2.5 grouping策略和并发度
Grouping
策略及并发度是理解Storm的关键。本文将以前所未有的详细程度,来深入的介绍。
1、并发度介绍
回顾之前,我们已经介绍过,一个Topology可以运行在多个worker上,这样可以提高数据处理能力。因为一个worker就是一个进程,很自然的,我们可以想到,如果一个worker中可以再起多个线程的话,那么效率就会更高。事实上,Storm就是这么干的,worker并不是Storm集群中的最小运行单位。Executer才是Storm集群中的最小运行单位。Executer实际上就是一个线程。你可以这样理解,worker是Topology的最小运行单位,而Executer是Spout或者Bolt的最小运行单位。回顾一下我们的WordCountApp案例中,创建Topology的代码
//定义拓扑 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word-reader" , new WordReader()); builder.setBolt("word-normalizer" , new WordNormalizer()).shuffleGrouping("word-reader" ); builder.setBolt("word-counter" , new WordCounter()).fieldsGrouping("word-normalizer" , new Fields("word")); StormTopology topology = builder .createTopology();
在这段代码中,我们没有设置并发度,也没有设置worker的数量。Storm默认就会给这个Topology分配1个Worker,在这个Worker启动三个线程,1个用来运行WordReader,1个线程用来运行WordNormalizer,1个线程用来运行WordCounter。
如果我们用图形来表示的话,应该是这样:
这个图的这个图意思是,WordCountApp这个Topology运行在一个Worker上,在这个worker中,分别其了三个线程(executor
),分别用来执行Topology的三个组件:WordReader、WordNormalizer、WordCounter。
那么假设我们想用两个线程来执行WordNormalizer,行不行呢?
很简单,目前我们setSpout和setBolt的时候,调用的分别是TopologyBuilder以下两个方法:
setSpout(String id, IRichSpout spout ) setBolt(String id, IRichBolt bolt ) |
这两个方法,表示使用默认的并发度,也就是1.
我们可以调用另外两个方法,显示的指定并发度。
setSpout(String id, IRichSpout spout , Number parallelism_hint) setBolt (String id , IRichBolt bolt, Number parallelism_hint ) |
现在我们修改WordCountApp,设置WordNormalizer 并发度为2
builder.setBolt( "word-normalizer" , new WordNormalizer(),2).shuffleGrouping("word-reader" );
为了更加方面的观察,我们将WordNormalizer中prepare方法打印的内容修改一下:
System.out.println( "WordNormalizer.prepare(),taskId:" +context.getThisTaskId()+ ",hashcode:" +this);
现在运行程序,观察输出,输出的日志中,应该包含以下两句话:
WordNormalizer.prepare(),taskId:3,hashcode:com.tianshouzhi.study.wordcountapp.bolts.WordNormalizer@41106aa2 WordNormalizer.prepare(),taskId:4,hashcode:com.tianshouzhi.study.wordcountapp.bolts.WordNormalizer@3f4c04b1
我们可以看到,我们设置并行度为2的时候, WordNormalizer被实例化了2次,意味着有2个 WordNormalizer实例,而Storm会分配2个executer来分别运行一个实例。所以,此时我们的Topology运行时,是这样的:
图中,用蓝色特别标记出,有2个线程分别用来执行一个WordNormalizer实例。
理论上,我们也同样可以给WordReader和WordCounter来设置并发度。但是具体问题要具体分析,在本案例,并不适合给WordReader和WordCounter设置过高的并发度。
WordReader:在WordCountApp案例中,我们使用的是一个文本文件,如果我们给WordReader设置并发度为2,那么就会有2个WordReader去读取words.txt,最终导致我们的结果是实际的2倍,读者可以自行测试。这实际上就是数据源的问题,因为我们的数据源无法保证一行数据只被读取一次。
WordCounter:WordCounter是一个汇总的Bolt,统计个每个单词出现的次数,如果我们将其并发度设置为2甚至更高。最终会导致每个WordCounter实例的统计的只是实际结果的一部分。因此也是不合适的。一般情况下,我们如果我们的Topology中的最后一个Bolt如果是汇总型的,并发度一般都设置默认为1。
虽然我们的WordCountApp案例不适合WordReader和WordCounter都不适合使用2个或更高的并发度。但是我们可以一种相反的角度来学习。我们可以提高WordReader的并发度,甚至设置多个Worker,虽然最终统计结果会出现错误,但是只要我们预计的是一样的就可以了。
我们将WordCountApp类main方法中的代码改为:
//定义拓扑 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word-reader" , new WordReader(),4); builder.setBolt("word-normalizer" , new WordNormalizer(),3).shuffleGrouping("word-reader" ); builder.setBolt("word-counter" , new WordCounter(),1).fieldsGrouping("word-normalizer" , new Fields("word")); StormTopology topology = builder .createTopology(); //配置 Config conf = new Config(); conf.setNumWorkers(2); String fileName ="wordsFile.dic" ; conf.put("fileName" , fileName ); conf.setDebug(false); //运行拓扑 if(args ==null||args.length==0){ //以本地模式运行 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Getting-Started-Topologie" , conf , topology); Thread. sleep(5000); cluster.shutdown(); } else{//提交到Storm集群运行,以第一个参数为topology的名称 StormSubmitter. submitTopology(args[0], conf, topology ); }
在这个Topology中,我们设置WordReader的并发度为4,WordNormalizer的并发度为3,并且通过调用Config对象的setNumWorkers方法设置有2个Worker。
因为WordReader并发度设置为了4,所以words.txt文件会被读取四次,最终的统计结果应该是原来的四倍。
运行代码看输出:
a: 4 cluster: 4 storm: 4 topology: 8 bolts: 4 this: 4 is: 4 run: 4 many: 4 example: 4 topologies: 4 the: 8 can: 4 at: 4 same: 4 and: 4 of: 4 consists: 4 spouts: 4 time: 4 first: 4
可以看到,输出结果中的每个单词刚好都是原来的四倍。
既然结果和我们预料的一样,现在我们就可以愉快的分析在有多个Worker和和高并发度的情况下,Topology的图应该是什么样的。
我们需要将4个WordReader实例,3个WordNomalize实例和1个WordCounter实例分配在2个worker中,因为4+3+1=8,意味着我们要在2个worker中,总共创建8个线程来运行这个组件。具体分配情况是无法预知的,以下是一种可能的情况:
这张图表示,Topology运行在2个worker上。worker1和worker2各启动了四个线程(这只是一种可能情况,也可能是其他情况,如,一个worker启动5个线程,另一个启动3个)。其中worker1运行了2个WorkerReader实例和2个WorkerNormalizer实例,worker2运行了2个WorkerReader实例,1个WorkerNormalizer实例和1个WordCounter实例。
相信到目前为止,你对并发度的概念已经比较了解了,那么我们再来加点料。因为我们设置了2个worker,在集群中运行的时候,这两个worker可能运行一台supervisor上,也有可能不在。下图演示了,worker不在同一台机器上的情况
二、Grouping策略介绍
storm里面有6种类型的stream grouping:
1.Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同。轮询,平均分配。
2. Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts。
3. All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。
4. Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
5. Non Grouping: 不分组, 这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果,不平均分配。
6. Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来或者处理它的消息的taskid(OutputCollector.emit方法也会返回taskid)