GitHub project: example-java-read-and-write-from-hdfs
Maven Dependencies
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
</dependency>
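The hadoop.version property is expected to be defined in the pom.xml of the project, for instance (the version below is only an illustration; use the one matching your cluster):
<properties>
  <hadoop.version>2.7.3</hadoop.version>
</properties>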
HDFS URI
HDFS connection URIs have the following format:
hdfs://namenodedns:port/user/hdfs/folder/file.csv
The default port is 8020.
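As a side note, the FileSystem object can also be obtained from an explicit URI instead of the *-site.xml configuration files. A minimal sketch, where the NameNode host and the hdfs user are placeholders:
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Connect using an explicit HDFS URI ("namenodedns" is a placeholder)
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://namenodedns:8020"), conf, "hdfs");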
Init HDFS FileSystem Object
private static String HADOOP_CONF_DIR = System.getenv("HADOOP_CONF_DIR");
// ====== Init HDFS File System Object
Configuration conf = new Configuration();
conf.addResource(new Path("file:///" + HADOOP_CONF_DIR + "/core-site.xml"));
conf.addResource(new Path("file:///" + HADOOP_CONF_DIR + "/hdfs-site.xml"));
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
// Set HADOOP user
System.setProperty("HADOOP_USER_NAME", "hdfs");
System.setProperty("hadoop.home.dir", "/");
//Get the filesystem - HDFS
FileSystem fs = FileSystem.get(conf);
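Explicitly setting fs.hdfs.impl and fs.file.impl works around the well-known "No FileSystem for scheme: hdfs" error, which can appear when the META-INF/services entries of hadoop-common and hadoop-hdfs overwrite each other while building a fat jar.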
Init Subfolders
//==== Create folder if not exists
Path workingDir = fs.getWorkingDirectory();
Path newFolderPath = new Path(path);
if (!fs.exists(newFolderPath)) {
    // Create new directory
    fs.mkdirs(newFolderPath);
    logger.info("Path " + path + " created.");
}
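Note that mkdirs() also creates any missing parent directories, like mkdir -p on Linux.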
How to write a file to HDFS
//==== Write file
logger.info("Begin Write file into hdfs");
//Create a path
Path hdfswritepath = new Path(newFolderPath + "/" + fileName);
//Init output stream
FSDataOutputStream outputStream = fs.create(hdfswritepath);
//Classical output stream usage
outputStream.writeBytes(fileContent);
outputStream.close();
logger.info("End Write file into hdfs");
How to read a file from HDFS
//==== Read file
logger.info("Read file from hdfs");
//Create a path
Path hdfsreadpath = new Path(newFolderPath + "/" + fileName);
//Init input stream
FSDataInputStream inputStream = fs.open(hdfsreadpath);
//Classical input stream usage
String out = IOUtils.toString(inputStream, "UTF-8");
logger.info(out);
inputStream.close();
fs.close();
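Note that IOUtils.toString(InputStream, String) comes from Apache Commons IO (org.apache.commons.io.IOUtils), which is assumed to be on the classpath; Hadoop's own org.apache.hadoop.io.IOUtils does not provide this method.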
With Kerberos
GitHub project: kerberos-java-client
Maven Dependencies
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>2.6.0-cdh5.16.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.6.0-cdh5.16.1.1</version>
</dependency>
JAAS configuration
Add a jaas.conf file under src/main/resources containing the following content:
Main {
    com.sun.security.auth.module.Krb5LoginModule required client=TRUE;
};
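The entry name (Main here) must match the name passed to the LoginContext constructor; in the function below it is derived from Main.class.getSimpleName().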
Create login context function
private static String username;
private static String password;
private static String HADOOP_CONF_DIR = System.getenv("HADOOP_CONF_DIR");
private static LoginContext kinit(String username, String password) throws LoginException {
    LoginContext lc = new LoginContext(Main.class.getSimpleName(), callbacks -> {
        for (Callback c : callbacks) {
            if (c instanceof NameCallback)
                ((NameCallback) c).setName(username);
            if (c instanceof PasswordCallback)
                ((PasswordCallback) c).setPassword(password.toCharArray());
        }
    });
    lc.login();
    return lc;
}
Init HDFS FileSystem Object
if (args.length >= 2) { // Username and password provided as arguments
    username = args[0];
    password = args[1];
} else {
    System.out.println("Usage: java -jar <jar> <username> <password>");
    System.exit(0);
}
URL url = Main.class.getClassLoader().getResource("jaas.conf");
System.setProperty("java.security.auth.login.config", url.toExternalForm());
Configuration conf = new Configuration();
conf.addResource(new Path("file:///" + HADOOP_CONF_DIR + "/core-site.xml"));
conf.addResource(new Path("file:///" + HADOOP_CONF_DIR + "/hdfs-site.xml"));
UserGroupInformation.setConfiguration(conf);
LoginContext lc = kinit(username, password);
UserGroupInformation.loginUserFromSubject(lc.getSubject());
FileSystem fs = FileSystem.get(conf);
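From this point the FileSystem calls are executed as the Kerberos-authenticated user, and the read/write snippets below work unchanged. If a block of code must run explicitly under that identity, UserGroupInformation also exposes a doAs() wrapper; a minimal sketch:
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.UserGroupInformation;

// Run HDFS calls explicitly as the authenticated user
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
    FileSystem kerberizedFs = FileSystem.get(conf);
    logger.info("Home directory: " + kerberizedFs.getHomeDirectory());
    return null;
});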
Init Subfolders
//==== Create folder if not exists
Path workingDir = fs.getWorkingDirectory();
Path newFolderPath = new Path(path);
if (!fs.exists(newFolderPath)) {
    // Create new directory
    fs.mkdirs(newFolderPath);
    logger.info("Path " + path + " created.");
}
How to write a file to HDFS
//==== Write file
logger.info("Begin Write file into hdfs");
//Create a path
Path hdfswritepath = new Path(newFolderPath + "/" + fileName);
//Init output stream
FSDataOutputStream outputStream = fs.create(hdfswritepath);
//Classical output stream usage
outputStream.writeBytes(fileContent);
outputStream.close();
logger.info("End Write file into hdfs");
How to read a file from HDFS
//==== Read file
logger.info("Read file from hdfs");
//Create a path
Path hdfsreadpath = new Path(newFolderPath + "/" + fileName);
//Init input stream
FSDataInputStream inputStream = fs.open(hdfsreadpath);
//Classical input stream usage
String out = IOUtils.toString(inputStream, "UTF-8");
logger.info(out);
inputStream.close();
fs.close();