2.5 grouping策略和并发度

2016-03-02 21:55:19 9,414 3

Grouping策略及并发度是理解Storm的关键。本文将以前所未有的详细程度,来深入的介绍。

1、并发度介绍

       回顾之前,我们已经介绍过,一个Topology可以运行在多个worker上,这样可以提高数据处理能力。因为一个worker就是一个进程,很自然的,我们可以想到,如果一个worker中可以再起多个线程的话,那么效率就会更高。事实上,Storm就是这么干的,worker并不是Storm集群中的最小运行单位。Executer才是Storm集群中的最小运行单位。Executer实际上就是一个线程。你可以这样理解,worker是Topology的最小运行单位,而Executer是Spout或者Bolt的最小运行单位。回顾一下我们的WordCountApp案例中,创建Topology的代码

        //定义拓扑
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader" , new WordReader());
        builder.setBolt("word-normalizer" , new WordNormalizer()).shuffleGrouping("word-reader" );
        builder.setBolt("word-counter" , new WordCounter()).fieldsGrouping("word-normalizer" , new Fields("word"));
        StormTopology topology = builder .createTopology();

    在这段代码中,我们没有设置并发度,也没有设置worker的数量。Storm默认就会给这个Topology分配1个Worker,在这个Worker启动三个线程,1个用来运行WordReader,1个线程用来运行WordNormalizer,1个线程用来运行WordCounter。

如果我们用图形来表示的话,应该是这样:
53B34B1E-9DD1-425F-9E3A-C72B6DC7885A.png

这个图的这个图意思是,WordCountApp这个Topology运行在一个Worker上,在这个worker中,分别其了三个线程(executor),分别用来执行Topology的三个组件:WordReader、WordNormalizer、WordCounter。

那么假设我们想用两个线程来执行WordNormalizer,行不行呢?

很简单,目前我们setSpout和setBolt的时候,调用的分别是TopologyBuilder以下两个方法:

setSpout(String id, IRichSpout spout )
setBolt(String id, IRichBolt bolt )

这两个方法,表示使用默认的并发度,也就是1.

我们可以调用另外两个方法,显示的指定并发度。

setSpout(String id, IRichSpout spout , Number parallelism_hint)
setBolt (String id , IRichBolt bolt, Number parallelism_hint )

现在我们修改WordCountApp,设置WordNormalizer 并发度为2

builder.setBolt( "word-normalizer" , new WordNormalizer(),2).shuffleGrouping("word-reader" );

为了更加方面的观察,我们将WordNormalizer中prepare方法打印的内容修改一下:

System.out.println( "WordNormalizer.prepare(),taskId:" +context.getThisTaskId()+ ",hashcode:" +this);

现在运行程序,观察输出,输出的日志中,应该包含以下两句话:

WordNormalizer.prepare(),taskId:3,hashcode:com.tianshouzhi.study.wordcountapp.bolts.WordNormalizer@41106aa2
WordNormalizer.prepare(),taskId:4,hashcode:com.tianshouzhi.study.wordcountapp.bolts.WordNormalizer@3f4c04b1

我们可以看到,我们设置并行度为2的时候, WordNormalizer被实例化了2次,意味着有2个 WordNormalizer实例,而Storm会分配2个executer来分别运行一个实例。所以,此时我们的Topology运行时,是这样的:

Image.png


图中,用蓝色特别标记出,有2个线程分别用来执行一个WordNormalizer实例。

理论上,我们也同样可以给WordReader和WordCounter来设置并发度。但是具体问题要具体分析,在本案例,并不适合给WordReader和WordCounter设置过高的并发度。
WordReader:在WordCountApp案例中,我们使用的是一个文本文件,如果我们给WordReader设置并发度为2,那么就会有2个WordReader去读取words.txt,最终导致我们的结果是实际的2倍,读者可以自行测试。这实际上就是数据源的问题,因为我们的数据源无法保证一行数据只被读取一次。
WordCounter:WordCounter是一个汇总的Bolt,统计个每个单词出现的次数,如果我们将其并发度设置为2甚至更高。最终会导致每个WordCounter实例的统计的只是实际结果的一部分。因此也是不合适的。一般情况下,我们如果我们的Topology中的最后一个Bolt如果是汇总型的,并发度一般都设置默认为1。

虽然我们的WordCountApp案例不适合WordReader和WordCounter都不适合使用2个或更高的并发度。但是我们可以一种相反的角度来学习。我们可以提高WordReader的并发度,甚至设置多个Worker,虽然最终统计结果会出现错误,但是只要我们预计的是一样的就可以了。

我们将WordCountApp类main方法中的代码改为:

//定义拓扑
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader" , new WordReader(),4);
        builder.setBolt("word-normalizer" , new WordNormalizer(),3).shuffleGrouping("word-reader" );
        builder.setBolt("word-counter" , new WordCounter(),1).fieldsGrouping("word-normalizer" , new Fields("word"));
        StormTopology topology = builder .createTopology();
    //配置
        Config conf = new Config();
        conf.setNumWorkers(2);
        String fileName ="wordsFile.dic" ;
        conf.put("fileName" , fileName );
        conf.setDebug(false);
      //运行拓扑
             if(args ==null||args.length==0){ //以本地模式运行
             LocalCluster cluster = new LocalCluster();
             cluster.submitTopology("Getting-Started-Topologie" , conf , topology);
             Thread. sleep(5000);
             cluster.shutdown();
        } else{//提交到Storm集群运行,以第一个参数为topology的名称
            StormSubmitter. submitTopology(args[0], conf, topology );
        }

在这个Topology中,我们设置WordReader的并发度为4,WordNormalizer的并发度为3,并且通过调用Config对象的setNumWorkers方法设置有2个Worker。

因为WordReader并发度设置为了4,所以words.txt文件会被读取四次,最终的统计结果应该是原来的四倍。
运行代码看输出:

a: 4
cluster: 4
storm: 4
topology: 8
bolts: 4
this: 4
is: 4
run: 4
many: 4
example: 4
topologies: 4
the: 8
can: 4
at: 4
same: 4
and: 4
of: 4
consists: 4
spouts: 4
time: 4
first: 4

可以看到,输出结果中的每个单词刚好都是原来的四倍。

既然结果和我们预料的一样,现在我们就可以愉快的分析在有多个Worker和和高并发度的情况下,Topology的图应该是什么样的。
我们需要将4个WordReader实例,3个WordNomalize实例和1个WordCounter实例分配在2个worker中,因为4+3+1=8,意味着我们要在2个worker中,总共创建8个线程来运行这个组件。具体分配情况是无法预知的,以下是一种可能的情况:
Image.png

这张图表示,Topology运行在2个worker上。worker1和worker2各启动了四个线程(这只是一种可能情况,也可能是其他情况,如,一个worker启动5个线程,另一个启动3个)。其中worker1运行了2个WorkerReader实例和2个WorkerNormalizer实例,worker2运行了2个WorkerReader实例,1个WorkerNormalizer实例和1个WordCounter实例。

相信到目前为止,你对并发度的概念已经比较了解了,那么我们再来加点料。因为我们设置了2个worker,在集群中运行的时候,这两个worker可能运行一台supervisor上,也有可能不在。下图演示了,worker不在同一台机器上的情况

Image.png

二、Grouping策略介绍

storm里面有6种类型的stream grouping:

1.Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同。轮询,平均分配。

2. Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts。

3. All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。

4. Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。

5. Non Grouping: 不分组, 这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果,不平均分配。

6. Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来或者处理它的消息的taskid(OutputCollector.emit方法也会返回taskid)