5.1 MapReduce教程

2016-03-05 14:35:45 5,021 1

一、MapReduce简介

MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.MapReduce的核心,就是对一个需要计算的任务进行拆分,然后并行处理。

1.1 为什么要进行任务的拆分?

举例来 说,假设我们要从10亿个随机的数字中,挑选出值是最大的哪一个。如果一台机器来执行,可能需要执行很长时间。但是如果我们有10台机器,那么每台机器只 要处理1亿条,10台机器各自挑选出自己负责的1亿条数据的最大值,然后将10台机器各自的最大值汇总再进行比较,很明显计算的时间会大大缩短。这就是分 布式计算,将一个计算的任务由多台计算机联合进行处理。

二、 JobTracker与TaskTracker

 2.1 JobTracker

    我们已经知道MapReduce框架需要将任务进行拆分执行。那么肯定要有程序负责任务的拆分(JobTracker),也要有程序负责任务的计算(TaskTracker)。当然JobTracker不仅仅是任务的拆分这么简单,以下摘自官网上的一句话:

     Map/Reduce框架由一个单独的master JobTracker 和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执 行已经失败的任务。而slave仅负责执行由master指派的任务。

 2.2 TaskTracker

    TaskTracker负责执行实际的运算任务,在实际计算过程,又分为2个阶段:MapReduce

    Map阶段的作用

    Map的原本含义是“映射”意思,实际上就是将一个单列的集合映射成一个key-value的集合。以下代码展示了将一个List映射为一个Map的过程:

/**准备数据:构造一个姓名集合*/
        List<String> nameList = Arrays.asList("tianshouzhi","huhuamin","wanghanao","luyang");
        /**进行Map*/
        Map<Integer, String> nameMap=new HashMap<Integer, String>();
        for (int i=0;i< nameList.size(); i++) {
            nameMap.put(i, nameList.get(i));
        }

这段代码的作用是,将一个由名称组成的集合映射成一个Map,Map的key是名称在list集合中的index,map的value就是名称。

      读者可以想一下,我们要统计一段文本中各个单词出现的次数。例如如下的文本:

A Storm Cluster can run many topologies at the same time 
Topology consists of spouts and bolts
This is the first topology example

对于这段文本,在映射的时候,我们需要考虑把什么当做key,把什么当做value。事实上,这个key和value到底应该是什么是根据业务需求来的

1、假如我要知道的就是每一行的内容是什么,那么我们的key可能就是行号,value就是这一段文本的内容。那么Map应该是这样的Map<Integer,String>。

2、如果我要统计的是不同单词出现的次数,那么我的key可能就是单词的内容,而value就是单词出现的次数。那么Map应该是这样的Map<String,Integer>。当然,这种映射比上一种映射更加复杂。


上述只是文本内容比较少,如果我们是统计一个10亿行的文本中不同单词出现的次数。正确的做法应该是,同时有多个map任务来运行,例如10个,那么第1个map任务可能读取的1-1亿行的内容进行map阶段的映射,第2个map任务可能读取的1亿-2亿行的内容进行map阶段的映射...以此类推。每个map任务执行各自负责的内容的映射。这样map阶段的速度就会提高很多。


Reduce阶段的作用

Reduce阶段的作用我称之为"归一"。例如上述提到的用10个map任务进行统计10亿行文本中单词出现次数的映射。因为有多个Map任务,每个Map任务实际上统计的是各自负责范围的单词出现的次数。例如

map任务1统计的结果可能是:

me 3000W次
the 5000w次
...

map任务2统计的结果可能是:

me 5000W次
you 2000w次
...

...

因为我们最终需要的是所有的不同单词出现的次数。所以我们不同将不同的map任务执行的映射结果进行合并。下面展示了上面两个任务合并的结果:

me 8000W次
the 5000w次
you 2000w次
...

读者可能已经注意到,这里(Reduce阶段)是将key相同的部分进行归一。例如单词me出现次数=3000W+5000W。


总结:现在我们已经知道了MapReduce框架中实际上已经分为Map阶段和Reduce阶段,这两个阶段负责的任务不相同,一个负责映射,一个负责按照相同的key进行归一。在MapReduce框架,分别提供Mapper类和Reducer类,用于表示这两个阶段各自要执行的任务。


下图展示了MapReduce任务执行的流程。

11.jpg

    

    说明:首先Client向JobTracker提交一个任务。JobTracker将任务分配到一个或者多个TaskTracker进行处理。不同的TaskTracker上,有的运行的是Map阶段的任务,有的运行是Reduce阶段的任务。

    对于map阶段,首先对输入的内容进行分割(InputSplit),不同Mapper任务负责各自的分割后的内容的映射。

   对于Reduce阶段,接受多个Mapper的输出,进行归一后,得到最终的输出。


三、Mapreduce原理

22.png

执行步骤:

1. map任务处理

  1. 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。

  2. 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

  3. 对输出的key、value进行分区。

  4. 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。

  5.  (可选)分组后的数据进行归约。

2.reduce任务处理

  1. 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。

  2. 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

  3. 把reduce的输出保存到文件中。


上一篇:4.3 HDFS Java API 下一篇:5.2 入门案例