5.2 入门案例

2016-03-09 21:13:57 4,627 1


一、准备工作

本节讲解MapReduce入门案例,统计一个文本文件中的单词出现次数。首先我们要下载相应的依赖。

pom.xml

    <dependencies>
    <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.4.7</version>
            <scope>provided</scope>
        </dependency>
       <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.4.7</version>
        </dependency> 
       <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-app</artifactId>
            <version>2.6.0-cdh5.4.7</version>
        </dependency> 
        <dependency>
		    <groupId>jdk.tools</groupId>
		    <artifactId>jdk.tools</artifactId>
		    <version>1.7</version>
		    <scope>system</scope>
		    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
	</dependency>
	<dependency>
     	<groupId>junit</groupId>
     	<artifactId>junit</artifactId>
     	<version>4.9</version>
     </dependency>
  </dependencies>
  <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

src/main/resources/log4j.properties

log4j.rootLogger=DEBUG,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout

准备单词文本word.txt

hello you
helo me

接下来,将这个文件上传到hdfs中"/mapreduce"目录下。

hadoop fs -mkdir /mapreduce
hadoop fs -put word.txt /mapreduce

按照上一节的讲解,我们知道一个MapReduce任务是分为2个阶段的Map阶段,和Reduce阶段。Mapper阶段负责映射,Reduce阶段负责按照相同key进行归一。

MapReduce框架提供了MapperReducer类,分别用于执行Mapper阶段和Reducer阶段的任务。我们只要自定义自己的类,分别继承这两个类并覆盖map方法和reduce方法,写上我们自己的业务逻辑即可。

在这里,我们将自定一个类TokenizerMapper继承Mapper,自定义IntSumReducer继承Reducer。

二 Mapper实现

分析

Mapper类的声明如下:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {....}

这个类的声明上有四个泛型,对于KEYOUTVALUEOUT表示的经过mapper阶段映射后输出的内容。这个很好理解。我们在上一节已经讲过,Map阶段作用就是将原始的内容映射为key-value集合,然后将结果传递给Reducer。因为我们这里统计的是单词的个数,因此KEYOUT表示的应该就是单词内容(字符串类型),VALUEOUT表示的是单词的出现的次数(整数类型)。

对于KEYINKEYOUT是什么呢?因为我们统计的时候文本的内容是一行一行读取的,所以KEYIN就是行号(整数类型),VALUEIN是这一行的内容(字符串类型)。

在确定了四个泛型参数之后,我们就可以开始写代码了,不过需要注意的是,在Hadoop中,定义了自己的数据类型,字符串类型用Text表示,而整数类型用IntWriteable表示。在后文,我们将会对Hadoop的数据类型进行详细讲解。

实现:

TokenizerMapper.java

 public static class TokenizerMapper
       extends Mapper<IntWritable, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(IntWritable key, Text value, Context context
                    ) throws IOException, InterruptedException {
      //StringTokenizer是java工具类,将字符串按照空格进行分割
      StringTokenizer itr = new StringTokenizer(value.toString());
      
      //每次出现一个单词,单词次数加1
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

三 Reducer实现

分析:

Reducer的声明如下所示:

public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {...}

这个类依然有四个泛型。我们已经知道,Reducer是接受Map阶段的输出,按照相同key进行归一。那么map阶段的输出类型肯定就是reduce阶段的输入类型。

因此,在我们的案例中,KEYIN应该单词(字符串),VALUEIN应该是单词出现的次数(整数)。那么KEYOUT、VALUEOUT是什么?因为我们统计的就是各个单词出现的次数(字符串),所以还应该是单词和出现的次数(整数)。

实现

 public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

四、Job实现

一个mapreduce任务分为map阶段和reduce阶段,现在我们只是单独了实现了这两个这段的代码。

我们还需要将上述代码组合成一个完整的MapReduce任务,用Job对象表示。

public class WordCount {
    public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final String nameNodeUrl = "hdfs://115.28.65.149:9000";
		conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, nameNodeUrl);

		//1 构建job对象,因为hadoop中可能会同时运行多个任务,每个任务都会有一个名字,以示区分
		final String jobName = "word count";
		Job job = Job.getInstance(conf, jobName);
		job.setJarByClass(WordCount.class);
		// 任务内容的输入路径
		FileInputFormat.addInputPath(job, new Path("/mapreduce/word.txt"));
		// 任务计算结果的而输出路径,如果输出目录已经存在,就删除
		final Path outputPath = new Path("/out");
		FileSystem fileSystem = outputPath.getFileSystem(conf);
		if(fileSystem.exists(outputPath)){
			fileSystem.delete(outputPath, true);
		}
		FileOutputFormat.setOutputPath(job, outputPath);
		
		
		//2 设置Mapper
		job.setMapperClass(TokenizerMapper.class);
		//规约,后文会详细介绍
//		job.setCombinerClass(IntSumReducer.class);
		
		//3 设置Reducer
		//设置分区
		job.setReducerClass(IntSumReducer.class);
		//设置reducer任务数,默认为1
		job.setNumReduceTasks(1);
		//设置分区类
		job.setPartitionerClass(HashPartitioner.class);
		//设置输出的key与Value
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// 提交任务并等待执行完成
		job.waitForCompletion(true);
	}
	}
}

运行程序,如果没有抛出异常,则说明成功(不过在正常情况下,一般都是会出错的)。

此时登录Linux服务器,查看输出内容:

QQ截图20160309211140.png


不幸的是,大多数人在Windows平台下开发的用户,第一次运行WordCount代码时,都会出错。


运行程序可能遇到的问题:

1、 

Exception in thread "main" java.lang.NullPointerException
       atjava.lang.ProcessBuilder.start(Unknown Source)
       atorg.apache.hadoop.util.Shell.runCommand(Shell.java:482)
       atorg.apache.hadoop.util.Shell.run(Shell.java:455)
       atorg.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
       atorg.apache.hadoop.util.Shell.execCommand(Shell.java:808)
       atorg.apache.hadoop.util.Shell.execCommand(Shell.java:791)
       at

这是因为缺少 winutils.exe

2、

Exception in thread "main"java.lang.UnsatisfiedLinkError:
    org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
       atorg.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
       atorg.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:557)
       atorg.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
       atorg.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:187)
       atorg.apache.hadoop.util.DiskChecker.checkDirAccess(DiskChecker.java:174)
       atorg.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:108)
       atorg.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:285)
       ....

这是因为缺少hadoop.dll


要解决这两个问题很简单,只要对应的将上述文件下载下来放入PATH路径下即可,需要注意的是,hadoop.dllwinutils.exe版本一定要与hadoop的版本是对应的。本地提供了一个下载链接

hadoop_dll2.6.0.zip

否则即使添加了这两个文件可能还是会出现问题。解压之后将所有的文件放入C:\Windows\System32即可,因为这个目录本来就是PATH路径下。