4.3 HDFS Java API

2016-03-05 12:59:14 7,536 0

准备工作

因为HDFS 是一个分布式文件系统,其并不像我们的本地文件系统,可以直接访问,而是需要通过网络进行访问。Hadoop提供了Java客户端让我们可以远程对 HDFS进行操作,包括增删改查等。而其中的网络传入部分Hadoop的Java客户端已经封装好,我们只需要简单的调用即可。

首先我们需要下载hadoop的客户端依赖pom.xml,注意,下载hadoop客户端依赖版本最好与服务端的保持一致。因为我们服务端使用的cdh版本的hadoop,因此客户端,我们也也用cdh版本的hadoop客户端。

<!--hadoopHDFS依赖--> 
<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.4.7</version>
        </dependency>
 <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.4.7</version>
 </dependency> 
<!--hadoop使用到了jdk自带的tools.jar-->
 <dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>1.7</version>
    <scope>system</scope>
    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>

  <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

1、获取文件系统对象

Hadoop提供FileSystem类用于表示HDFS文件系统,我们对HDFS所有的操作都是要基于这个类来进行完成。因此我们首先要做的就是获取这个类的实例。

public class HDFSAPI {
    public static String nameNodeUrl="hdfs://115.28.65.149:9000";
    public static Configuration configuration=new Configuration();
    public static FileSystem fileSystem;
    
    @BeforeClass
    public static void beforeClass() throws Exception{
        Configuration configuration=new Configuration();
        configuration.set("fs.defaultFS", "hdfs://115.28.65.149:9000");
        URI uri = new URI(nameNodeUrl);
        String user="root";
        fileSystem=FileSystem.get(uri,configuration,user);
        System.out.println(fileSystem);
    }
    ...
 }

运行程序输出:

DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1459561864_1, ugi=root (auth:SIMPLE)]]

说明:

因为通过网络来进行访问,所以我们必须配置HDFS的网络路径,实际上就是我们hadoop集群中core-site.xml配置的fs.default.name项的值。我们通过Configuration类来进行设置。只不过配置项的名字改为"fs.defaultFS"。这里我们是通过字符串来指定配置项的名字,实际上,Hadoop还提供了2个类CommonConfigurationKeysPublicCommonConf和CommonConfigurationKeys两个类,这两个类中包含了Hadoop客户端所有配置项的key和默认值,例如我们这里的"fs.defaultFS"就可以用CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY 来指定。

我们使用FileSystem.get(uri,configuration,user);这个方法来获取FileSystem的实例,这里指定的user为root,实际上也可以不指定。主要是因为我在Linux系统上是通过root用户来启动hadoop集群的。这里显示指定root,那么我就有权限访问HDFS文件系统中的所有内容。如果不指定,就会以启动我们这个客户端的程序的身份去访问HDFS,由于笔者是在Windows操作系统上写的代码,用户名为Administer,那么可能就会有一些文件没有权限访问。

2、创建文件夹

@Test
    public void mkdirs() throws IOException{
        Path path=new Path("/hdfs/javaapi");
        final boolean exists = fileSystem.exists(path);
        System.out.println(exists);//创建前
        if(!exists){
            fileSystem.mkdirs(path);
        }
        System.out.println(fileSystem.exists(path));//创建后
    }

运行程序,控制台输出

false
true

说明:

Hadoop使用PATH对象表示HDFS文件中的路径,因为是仿照Linux系统的路径命名习惯,所以我们传入的路径是"/hdfs/javaapi"。在创建文件夹文件夹之前,我们依然要判断是否存在,不存在才可以创建,否则会抛出异常。

3、上传文件

@Test
    public void uploadFile() throws Exception{
        String uploadPath="/hdfs/javaapi/upload.txt";//文件保存路径
        String content="test upload file";//文件内容
        boolean overwrite=true;//如果文件存在就覆盖
        final Path hdfsPath = new Path(uploadPath);
        FSDataOutputStream out=fileSystem.create(hdfsPath, overwrite);
        boolean close=true;//上传成功关闭输入流和输出流
        int bufferSize=1024;//缓冲大小
        ByteArrayInputStream in = new ByteArrayInputStream(content.getBytes());//输入流
        IOUtils.copyBytes(in, out, bufferSize,close);
        System.out.println(fileSystem.exists(hdfsPath));//判断文件是否上传到HDFS上
    }

运行程序,控制台输出:

true

说明:

HDFS是提供了FSDataOutputStream对象来表示输出流,对应的有FSDataInputStream表示输入流。因为HSFS是需要通过流式的方式进行操作,所以我们把上传的内容content转换成了输入流。IOUtils是hadoop提供的一个工具类,帮我们从输入流往输出流中写数据。

4、下载文件

@Test
    public void downloadFile() throws Exception {
        String  downloadPath = "/hdfs/javaapi/upload.txt";
        FSDataInputStream inputStream = fileSystem.open(new Path(downloadPath));
        IOUtils.copyBytes(inputStream, System.out, 1024, true);
    }

运行程序,控制台输出

test upload file

说明:

FileSystem的open方法返回的是一个FSDataInputStream对象,表示指定文件的输入流。此处我们依然使用了工具类IOUtils,将获取到的内容显示在了控制台上(通过指定输出流为System.out).

5、递归列出文件夹中所有内容(包含子目录)

@Test
    public void recursive() throws IOException {
        FileStatus[] listStatus = fileSystem.listStatus(new Path("/hdfs"));
        for (FileStatus fileStatus : listStatus) {
                listFiles(fileStatus);
        }
    }

   private void listFiles(FileStatus fileStatus) throws IOException {
        Path path = fileStatus.getPath();
        System.out.println(path);
        if (!fileStatus.isDirectory()) {// 如果不是目录
            
             FsPermission permission = fileStatus.getPermission(); 
             String owner = fileStatus.getOwner(); 
             short replication = fileStatus.getReplication(); 
             long len = fileStatus.getLen();
            System
             .out.println("path:"+path+",permission:"+permission+",replication:"
             +replication+",owenr:"+owner+",size:"+len);
            
        } else {// 如果是目录
            FileStatus[] listStatus = fileSystem.listStatus(path);
            for (FileStatus childStatus : listStatus) {
                listFiles(childStatus);
            }
        }
    }

运行程序,控制台输出:

hdfs://115.28.65.149:9000/hdfs/d1
hdfs://115.28.65.149:9000/hdfs/javaapi
hdfs://115.28.65.149:9000/hdfs/javaapi/upload.txt
path:hdfs://115.28.65.149:9000/hdfs/javaapi/upload.txt,permission:rw-r--r--,replication:3,owenr:root,size:16

说明:

FileStatus对象用于表示HDFS中文件的元数据(包含文件的路径,文件大小、权限、备份数等信息)。

fileSystem.listStatus(new Path("/hdfs"))方法的返回值是FileStatus[]数组,表示的是/hdfs目录下所有的文件/文件夹的源信息。

通过fileStatus.getPath()f方法,我们可以获得一个文件的访问路径。通过fileStatus.isDirectory(),我们可以判断当前路径是文件还是文件夹。

FsPermission对象表示的是一个文件的权限信息,通过fileStatus.getPermission()方法获得。

文件的备份数,通过fileStatus.getReplication()获得,返回值是一个short整数。

文件的所有者通过fileStatus.getOwner()f方法获得。

...

上一篇:4.2 HDFS权限管理 下一篇:5.1 MapReduce教程