4.3 HDFS Java API
准备工作
因为HDFS 是一个分布式文件系统,其并不像我们的本地文件系统,可以直接访问,而是需要通过网络进行访问。Hadoop提供了Java客户端让我们可以远程对 HDFS进行操作,包括增删改查等。而其中的网络传入部分Hadoop的Java客户端已经封装好,我们只需要简单的调用即可。
首先我们需要下载hadoop的客户端依赖pom.xml,注意,下载hadoop客户端依赖版本最好与服务端的保持一致。因为我们服务端使用的cdh版本的hadoop,因此客户端,我们也也用cdh版本的hadoop客户端。
<!--hadoopHDFS依赖--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0-cdh5.4.7</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0-cdh5.4.7</version> </dependency> <!--hadoop使用到了jdk自带的tools.jar--> <dependency> <groupId>jdk.tools</groupId> <artifactId>jdk.tools</artifactId> <version>1.7</version> <scope>system</scope> <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> </dependency> <repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories>
1、获取文件系统对象
Hadoop提供FileSystem
类用于表示HDFS文件系统,我们对HDFS所有的操作都是要基于这个类来进行完成。因此我们首先要做的就是获取这个类的实例。
public class HDFSAPI { public static String nameNodeUrl="hdfs://115.28.65.149:9000"; public static Configuration configuration=new Configuration(); public static FileSystem fileSystem; @BeforeClass public static void beforeClass() throws Exception{ Configuration configuration=new Configuration(); configuration.set("fs.defaultFS", "hdfs://115.28.65.149:9000"); URI uri = new URI(nameNodeUrl); String user="root"; fileSystem=FileSystem.get(uri,configuration,user); System.out.println(fileSystem); } ... }
运行程序输出:
DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1459561864_1, ugi=root (auth:SIMPLE)]]
说明:
因为通过网络来进行访问,所以我们必须配置HDFS的网络路径,实际上就是我们hadoop集群中core-site.xml
配置的fs.default.name
项的值。我们通过Configuration
类来进行设置。只不过配置项的名字改为"fs.defaultFS
"。这里我们是通过字符串来指定配置项的名字,实际上,Hadoop还提供了2个类CommonConfigurationKeysPublic
CommonConf和CommonConfigurationKeys
两个类,这两个类中包含了Hadoop客户端所有配置项的key和默认值,例如我们这里的"fs.defaultFS
"就可以用CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY 来指定。
我们使用FileSystem.get(uri,configuration,user);这个方法来获取FileSystem的实例,这里指定的user为root,实际上也可以不指定。主要是因为我在Linux系统上是通过root用户来启动hadoop集群的。这里显示指定root,那么我就有权限访问HDFS文件系统中的所有内容。如果不指定,就会以启动我们这个客户端的程序的身份去访问HDFS,由于笔者是在Windows操作系统上写的代码,用户名为Administer,那么可能就会有一些文件没有权限访问。
2、创建文件夹
@Test public void mkdirs() throws IOException{ Path path=new Path("/hdfs/javaapi"); final boolean exists = fileSystem.exists(path); System.out.println(exists);//创建前 if(!exists){ fileSystem.mkdirs(path); } System.out.println(fileSystem.exists(path));//创建后 }
运行程序,控制台输出
false true
说明:
Hadoop使用PATH
对象表示HDFS文件中的路径,因为是仿照Linux系统的路径命名习惯,所以我们传入的路径是"/hdfs/javaapi"。在创建文件夹文件夹之前,我们依然要判断是否存在,不存在才可以创建,否则会抛出异常。
3、上传文件
@Test public void uploadFile() throws Exception{ String uploadPath="/hdfs/javaapi/upload.txt";//文件保存路径 String content="test upload file";//文件内容 boolean overwrite=true;//如果文件存在就覆盖 final Path hdfsPath = new Path(uploadPath); FSDataOutputStream out=fileSystem.create(hdfsPath, overwrite); boolean close=true;//上传成功关闭输入流和输出流 int bufferSize=1024;//缓冲大小 ByteArrayInputStream in = new ByteArrayInputStream(content.getBytes());//输入流 IOUtils.copyBytes(in, out, bufferSize,close); System.out.println(fileSystem.exists(hdfsPath));//判断文件是否上传到HDFS上 }
运行程序,控制台输出:
true
说明:
HDFS是提供了FSDataOutputStream
对象来表示输出流,对应的有FSDataInputStream
表示输入流。因为HSFS是需要通过流式的方式进行操作,所以我们把上传的内容content转换成了输入流。IOUtils
是hadoop提供的一个工具类,帮我们从输入流往输出流中写数据。
4、下载文件
@Test public void downloadFile() throws Exception { String downloadPath = "/hdfs/javaapi/upload.txt"; FSDataInputStream inputStream = fileSystem.open(new Path(downloadPath)); IOUtils.copyBytes(inputStream, System.out, 1024, true); }
运行程序,控制台输出
test upload file
说明:
FileSystem的open
方法返回的是一个FSDataInputStream
对象,表示指定文件的输入流。此处我们依然使用了工具类IOUtils,将获取到的内容显示在了控制台上(通过指定输出流为System.out).
5、递归列出文件夹中所有内容(包含子目录)
@Test public void recursive() throws IOException { FileStatus[] listStatus = fileSystem.listStatus(new Path("/hdfs")); for (FileStatus fileStatus : listStatus) { listFiles(fileStatus); } } private void listFiles(FileStatus fileStatus) throws IOException { Path path = fileStatus.getPath(); System.out.println(path); if (!fileStatus.isDirectory()) {// 如果不是目录 FsPermission permission = fileStatus.getPermission(); String owner = fileStatus.getOwner(); short replication = fileStatus.getReplication(); long len = fileStatus.getLen(); System .out.println("path:"+path+",permission:"+permission+",replication:" +replication+",owenr:"+owner+",size:"+len); } else {// 如果是目录 FileStatus[] listStatus = fileSystem.listStatus(path); for (FileStatus childStatus : listStatus) { listFiles(childStatus); } } }
运行程序,控制台输出:
hdfs://115.28.65.149:9000/hdfs/d1 hdfs://115.28.65.149:9000/hdfs/javaapi hdfs://115.28.65.149:9000/hdfs/javaapi/upload.txt path:hdfs://115.28.65.149:9000/hdfs/javaapi/upload.txt,permission:rw-r--r--,replication:3,owenr:root,size:16
说明:
FileStatus
对象用于表示HDFS中文件的元数据(包含文件的路径,文件大小、权限、备份数等信息)。
fileSystem.listStatus(new Path("/hdfs"))
方法的返回值是FileStatus[]数组,表示的是/hdfs目录下所有的文件/文件夹的源信息。
通过fileStatus.getPath()
f方法,我们可以获得一个文件的访问路径。通过fileStatus.isDirectory(),我们可以判断当前路径是文件还是文件夹。
FsPermission
对象表示的是一个文件的权限信息,通过fileStatus.getPermission()方法获得。
文件的备份数,通过fileStatus.getReplication()
获得,返回值是一个short整数。
文件的所有者通过fileStatus.getOwner()
f方法获得。
...