5.2 入门案例
一、准备工作
本节讲解MapReduce入门案例,统计一个文本文件中的单词出现次数。首先我们要下载相应的依赖。
pom.xml
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0-cdh5.4.7</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0-cdh5.4.7</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-app</artifactId> <version>2.6.0-cdh5.4.7</version> </dependency> <dependency> <groupId>jdk.tools</groupId> <artifactId>jdk.tools</artifactId> <version>1.7</version> <scope>system</scope> <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.9</version> </dependency> </dependencies> <repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories>
src/main/resources/log4j.properties
log4j.rootLogger=DEBUG,console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout
准备单词文本word.txt
hello you helo me
接下来,将这个文件上传到hdfs中"/mapreduce"目录下。
hadoop fs -mkdir /mapreduce hadoop fs -put word.txt /mapreduce
按照上一节的讲解,我们知道一个MapReduce任务是分为2个阶段的Map阶段,和Reduce阶段。Mapper阶段负责映射,Reduce阶段负责按照相同key进行归一。
MapReduce框架提供了Mapper
和Reducer
类,分别用于执行Mapper阶段和Reducer阶段的任务。我们只要自定义自己的类,分别继承这两个类并覆盖map方法和reduce方法,写上我们自己的业务逻辑即可。
在这里,我们将自定一个类TokenizerMapper继承Mapper,自定义IntSumReducer继承Reducer。
二 Mapper实现
分析
Mapper类的声明如下:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {....}
这个类的声明上有四个泛型,对于KEYOUT
、VALUEOUT
表示的经过mapper阶段映射后输出的内容。这个很好理解。我们在上一节已经讲过,Map阶段作用就是将原始的内容映射为key-value集合,然后将结果传递给Reducer。因为我们这里统计的是单词的个数,因此KEYOUT表示的应该就是单词内容(字符串类型),VALUEOUT表示的是单词的出现的次数(整数类型)。
对于KEYIN
和KEYOUT
是什么呢?因为我们统计的时候文本的内容是一行一行读取的,所以KEYIN就是行号(整数类型),VALUEIN是这一行的内容(字符串类型)。
在确定了四个泛型参数之后,我们就可以开始写代码了,不过需要注意的是,在Hadoop中,定义了自己的数据类型,字符串类型用Text表示,而整数类型用IntWriteable表示。在后文,我们将会对Hadoop的数据类型进行详细讲解。
实现:
TokenizerMapper.java
public static class TokenizerMapper extends Mapper<IntWritable, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override public void map(IntWritable key, Text value, Context context ) throws IOException, InterruptedException { //StringTokenizer是java工具类,将字符串按照空格进行分割 StringTokenizer itr = new StringTokenizer(value.toString()); //每次出现一个单词,单词次数加1 while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
三 Reducer实现
分析:
Reducer的声明如下所示:
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {...}
这个类依然有四个泛型。我们已经知道,Reducer是接受Map阶段的输出,按照相同key进行归一。那么map阶段的输出类型肯定就是reduce阶段的输入类型。
因此,在我们的案例中,KEYIN应该单词(字符串),VALUEIN应该是单词出现的次数(整数)。那么KEYOUT、VALUEOUT是什么?因为我们统计的就是各个单词出现的次数(字符串),所以还应该是单词和出现的次数(整数)。
实现
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
四、Job实现
一个mapreduce任务分为map阶段和reduce阶段,现在我们只是单独了实现了这两个这段的代码。
我们还需要将上述代码组合成一个完整的MapReduce任务,用Job
对象表示。
public class WordCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); final String nameNodeUrl = "hdfs://115.28.65.149:9000"; conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, nameNodeUrl); //1 构建job对象,因为hadoop中可能会同时运行多个任务,每个任务都会有一个名字,以示区分 final String jobName = "word count"; Job job = Job.getInstance(conf, jobName); job.setJarByClass(WordCount.class); // 任务内容的输入路径 FileInputFormat.addInputPath(job, new Path("/mapreduce/word.txt")); // 任务计算结果的而输出路径,如果输出目录已经存在,就删除 final Path outputPath = new Path("/out"); FileSystem fileSystem = outputPath.getFileSystem(conf); if(fileSystem.exists(outputPath)){ fileSystem.delete(outputPath, true); } FileOutputFormat.setOutputPath(job, outputPath); //2 设置Mapper job.setMapperClass(TokenizerMapper.class); //规约,后文会详细介绍 // job.setCombinerClass(IntSumReducer.class); //3 设置Reducer //设置分区 job.setReducerClass(IntSumReducer.class); //设置reducer任务数,默认为1 job.setNumReduceTasks(1); //设置分区类 job.setPartitionerClass(HashPartitioner.class); //设置输出的key与Value job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 提交任务并等待执行完成 job.waitForCompletion(true); } } }
运行程序,如果没有抛出异常,则说明成功(不过在正常情况下,一般都是会出错的)。
此时登录Linux服务器,查看输出内容:
不幸的是,大多数人在Windows平台下开发的用户,第一次运行WordCount代码时,都会出错。
运行程序可能遇到的问题:
1、
Exception in thread "main" java.lang.NullPointerException atjava.lang.ProcessBuilder.start(Unknown Source) atorg.apache.hadoop.util.Shell.runCommand(Shell.java:482) atorg.apache.hadoop.util.Shell.run(Shell.java:455) atorg.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715) atorg.apache.hadoop.util.Shell.execCommand(Shell.java:808) atorg.apache.hadoop.util.Shell.execCommand(Shell.java:791) at
这是因为缺少 winutils.exe
2、
Exception in thread "main"java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z atorg.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method) atorg.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:557) atorg.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977) atorg.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:187) atorg.apache.hadoop.util.DiskChecker.checkDirAccess(DiskChecker.java:174) atorg.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:108) atorg.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:285) ....
这是因为缺少hadoop.dll
要解决这两个问题很简单,只要对应的将上述文件下载下来放入PATH路径下即可,需要注意的是,hadoop.dll
与winutils.exe
版本一定要与hadoop的版本是对应的。本地提供了一个下载链接
否则即使添加了这两个文件可能还是会出现问题。解压之后将所有的文件放入C:\Windows\System32即可,因为这个目录本来就是PATH路径下。