2.2 WordCountApp
在本节中,我们创建一个名字叫WordCountApp的Topology。这个Topology的作用是统计一个名为words.txt文本中每个单词出现的次数。我们一步一步的分析,是如何根据我们需求,最终创建出Topology。
一、Topology组件分析
假设我们的words.txt
中内容如下:
A Storm Cluster can run many topologies at the same time Topology consists of spouts and bolts This is the first topology example
这个Topology可能会由以下的组件组成:
WordReader(Spout)
:我们需要从外数据源wrods.txt中获取数据,这里的words.txt其实就是外部数据源。为了明确这个Spout的作用,我们将其称之为WordReader。
WordNormalizer(Bolt)
:这个Bolt作用是格式化单词。通常情况下,我们统计一个单词出现次数的时候,是不区分大小写的。所以我们,必须要有这样一个环节来转换大小写。事实上,如果我们的文本中,还出现了标点符号",","."等,我们还要将这些符号去除,这就是WordNormalizer的作用。为了简单,我们这里并没有出现符号。
WordCounter(Bolt)
:用于统计经过WordNormalizer处理后的单词的出现次数。
因次,我们的WordCountApp这个Topology可能如下所示:
二、API简介
Components组件API介绍
构建一个Topology有很多,根据我们需求的不同,我们可能会使用不同的API,下图列出了,本节中我们会提到的,或者在WordCountApp案例中会涉及到的API。
我们之前提过,Topology中的Spout和Bolt称之为Topology的组件(Components)。
在Storm API中,定义了一个IComponent
接口表示组件,用ISpout表示一个Spout,IBolt表示一个Bolt。IRichSpout接口分别继承了IComponent和ISpout接口,意味着这个接口的实现类,既是Spout,又是Topology的组件。
IRichBolt接口集成了IComponent接口和IBolt接口,表示这个接口的实现类即是Component,又是Bolt。
因此在开发中,我们通常只要实现IRichSpout和IRichBolt即可。由于IRichBolt和IRichSpout接口中定义的方法比较多。有些方法我们可能并不想实现,因此分别提供了一个实现类,BaseRichSpout
和BaseRichBolt
。把一些并不是一定要用户实现的方法,提供了一个默认的实现,来简化我们的开发,这实际上就适配器设计模式。
作为第一个案例,为了尽量熟悉各个API中的方法,我们编写的WordReader
会去实现IRichSpout,因此其作用就是从外部数据源获取数据,WordNormalizer
和WordCounter
会实现IRichBolt,它们的作用是分析处理数据。
三 编写Topology
准备工作,创建一个maven工程wordcountapp,pom.xml如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion> 4.0.0</modelVersion > <groupId> com.tianshouzhi</groupId > <artifactId> wordcountapp</artifactId > <version> 0.0.1-SNAPSHOT</version > <dependencies> <dependency> <groupId> org.apache.storm</groupId > <artifactId> storm-core</artifactId > <version> 0.9.2-incubating</version > </dependency> </dependencies> </project>
1、创建WordReader(Spout)
我们的WordReader要实现IRichSpout接口,这个接口中,定义的方法包括:
目前我们最关心的是,用红色框标记出的三个方法,对于其他的方法,我们只打印出一句话。
WordReader.java
package com.tianshouzhi.study.wordcountapp.spouts; import java.io.BufferedReader; import java.io.FileReader; import java.io.InputStream; import java.io.InputStreamReader; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * WordReader(Spout),用于从外部数据源words.txt中获取数据 */ public class WordReader implements IRichSpout { private SpoutOutputCollector collector ; private FileReader fileReader ; BufferedReader reader; private boolean completed = false; /** * 这个方法做的惟一一件事情就是分发文件中的文本行 */ public void nextTuple() { /** * 这个方法会不断的被调用,直到整个文件都读完了,我们将等待并返回。 */ if (completed ) { try { Thread. sleep(1000); } catch (InterruptedException e ) { // 什么也不做 } return; } String str; try { int i = 0; // 读所有文本行 while ((str = reader.readLine()) != null) { System. out.println("WordReader.nextTuple(),emits time:" + i++); /** * 按行发布一个新值 */ this.collector .emit(new Values( str), str ); } } catch (Exception e ) { throw new RuntimeException("Error reading tuple", e); } finally { completed = true ; } } /** * * 当Spout被创建之后,这个方法会被条用 */ public void open(Map conf, TopologyContext context , SpoutOutputCollector collector ) { System.out.println( "WordReader.open(Map conf, TopologyContext context, SpoutOutputCollector collector)"); String fileName = conf .get("fileName").toString(); InputStream inputStream=WordReader.class.getClassLoader().getResourceAsStream( fileName); reader =new BufferedReader(new InputStreamReader(inputStream )); this.collector = collector; } /** * 声明数据格式,即输出的一个Tuple中,包含几个字段 */ public void declareOutputFields(OutputFieldsDeclarer declarer) { System. out.println("WordReader.declareOutputFields(OutputFieldsDeclarer declarer)"); declarer.declare(new Fields("line")); } @Override public void activate() { System. out.println("WordReader.activate()" ); } @Override public void deactivate() { System. out.println("WordReader.deactivate()" ); } @Override public Map<String, Object> getComponentConfiguration() { System. out.println("WordReader.getComponentConfiguration()" ); return null ; } /** * 当一个Tuple处理成功时,会调用这个方法 */ public void ack(Object msgId) { System. out.println("WordReader.ack(Object msgId):" + msgId); } /** * 当Topology停止时,会调用这个方法 */ public void close() { System. out.println("WordReader.close()" ); } /** * 当一个Tuple处理失败时,会调用这个方法 */ public void fail(Object msgId) { System. out.println("WordReader.fail(Object msgId):" + msgId); } }
目前我们主要关注的是以下三个方法:
1、open方法
这个方法的声明如下:
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector )
当Spout被实例化之后,open方法会被调用一次。通常情况下,这个方法中,我们会做一些初始化的动作。例如建立与外部数据源的链接,在本例中,就是获取到了外部数据文件words.txt的读取器。
这个方法还传递进来了3个参数。
参数1:Map conf
conf对象中维护的是一些配置信息。例如在这个方法中,我们获取words.txt文件的位置就是通过
conf .get("wordsFile")这种方式来进行的。conf对象中还维护了一些其他的默认参数,读者自己可以将这个对象中的参数打印出来进行观察。
参数2:TopologyContext
context
这个参数中,包含了Topology运行时的上下文信息。要知道这个对象的作用,最简单的方式就是查看这个对象中,有哪些方法和字段,此处不做过多讲解。
参数3:SpoutOutputCollector
collector
从名字中,我们就可以看出,这个对象的作用,Spout输出收集器,我们之前说过Spout需要将数据以Tuple的形式发送给Bolt,就是通过这个对象的emit方法来实现的。
2、declareOutputFields方法
方法的声明如下:
public void declareOutputFields(OutputFieldsDeclarer declarer)
这个方法的作用是,声明由这个Spout输出的流中的每一个Tuple,包含哪些字段。 之前我们提到过,在Topology中,不同的Spout和Bolt以及Bolt与Bolt之间形成数据流中,同一个Stream中Tuple的数据格式一定是相同的,不同的数据流中的Tuple的数据格式可能相同也可能不同。那么一个Tuple中的数据格式到底是怎么样的呢?就是在这个方法中定义的。
例如在这里,我们声明了一个line字段。declarer .declare(new Fields( "line"));,这就相当于宣告,每个Tuple中只能含有一个字段,这个字段的名字叫做line。如果一个Tuple中有多个字段,那么我们在这里就要声明多个字段。
3、nextTuple方法
这个方法会被不断的调用,因为Spout需要不断的从外部数据源中获取最新的数据,然后使用SpoutOutputCollector的emit方法来进行发射。
补充:SpoutOutputCollector的emit方法
SpoutOutputCollector的emit方法有很多种重载形式,比较常用的是红色框中的几种。
在本例中,我们使用的是第二个emit方法。这个方法第一个参数是List集合。,但是我们的代码中使用的却是Values
这个对象。如下:
this .collector.emit( new Values(str ), str);
这是因为Values对象是ArrayList集合的子类。读者可以自行查看这个类的源码。这里主要说明,我们构造Values对象时,其构造方法接受一个可变参数,我们这里只传递了一个str,说明最终List集合中只有一个元素。而且只能是一个元素。因为我们我们已经在declareOutputFields
这个方法中声明过,只有一个字段line。
对于其他的几个方法,后面我们也会详细介绍,目前只是在每句话里面打印出了一句话。
2、创建WordNormalizer(实现IRichBolt)
WordNormalizer.java
package com.tianshouzhi.study.wordcountapp.bolts; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class WordNormalizer implements IRichBolt{ /** * */ private static final long serialVersionUID = 3644849073824009317L; private OutputCollector collector ; /** * *bolt*从单词文件接收到文本行,并标准化它。 * 文本行会全部转化成小写,并切分它,从中得到所有单词。 */ public void execute(Tuple input ){ System.out.println( "WordNormalizer.execute()" ); String sentence = input .getString(0); String[] words = sentence .split(" "); for(String word : words){ word = word .trim(); if(!word .isEmpty()){ word=word .toLowerCase(); /*//发布这个单词*/ collector.emit(input ,new Values(word)); } } //对元组做出应答 collector.ack(input ); } public void prepare(Map stormConf, TopologyContext context , OutputCollector collector ) { System. out.println("WordNormalizer.prepare()" ); this. collector=collector ; } /** * 这个*bolt*只会发布“word”域 */ public void declareOutputFields(OutputFieldsDeclarer declarer) { System. out.println("WordNormalizer.declareOutputFields()" ); declarer.declare(new Fields("word")); } @Override public Map<String, Object> getComponentConfiguration() { System. out.println("WordNormalizer.getComponentConfiguration()" ); return null ; } public void cleanup(){ System. out.println("WordNormalizer.cleanup()" ); } }
在这个Bolt中,我们关心的方法是prepare
、execute
和declareOutputFields
,对于其他方法,我们依然是打印出一句话。
prepare 方法:
声明如下:
public void prepare( Map stormConf, TopologyContext context, OutputCollector collector )
这个方法与Spout的open方法参数很类似,唯一不同的就是最后一个参数,Spout中的open方法传递的是SpoutOutPutCollector,这里传递的是OutputCollector
。不过都是数据收集器。主要的区别是,emit方法可以多出一个anchor或者anchors的参数,这与消息的可靠性
保证有关,我们将会在后面详细讲解。如下红色框中全部的部分:
declareOutputFields方法:
因为Bolt还可以发送数据到下一级Bolt,因此,我们同样要指定这个Tuple中的数据格式。如果这个Bolt不要输出的话,我们就可以不声明。通常最后一个Bolt,因为没有下一级Bolt,这个方法就可以直接返回null。如我们下面要编写的WordCounter。
execute方法:
声明如下:
public void execute(Tuple input)
每当Bolt接受到一个Tuple的时候,就会调用一次execute方法。Tuple就是Spout传递给Bolt的数据。额..等等,我们在之前刚说过,本案例中WordReader发射数据时,使用的是下面这个方法:
明明发送的是一个List集合。但是这里接受参数却是Tuple类型。实际上是没有错的,Tuple是一个接口,我们可以看看其实现类TupleImpl中的源码。
相信看到这个读者应该明白了,Tuple实际上是将Spout端发送的数据进行了一层封装,原来的List集合还在,此外还封装了一些元数据。
因为我们原来的Spout每次发送的List<Object>集合中,只有一个元素,因此,在这里,就可以通过input.getString(0);来获取。
3、编写WordCounter(实现IRichBolt)
这个Bolt将统计到的最终结果放到一个map集合中。并在Bolt销毁时,调用
cleanup
方法时,打印出统计结果。
WordCounter.java
package com.tianshouzhi.study.wordcountapp.bolts; import java.util.HashMap; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class WordCounter implements IRichBolt{ Integer id; String name; Map<String,Integer> counters; private OutputCollector collector; /** * 当Bolt销毁时,我们会显示单词数量 */ @Override public void cleanup(){ for(Map.Entry<String,Integer> entry : counters.entrySet()){ System. out.println(entry .getKey()+": "+ entry.getValue()); } System.out.println( "WordCounter.cleanup()" ); } /** * 为每个单词计数 */ @Override public void execute(Tuple input ) { System.out.println( "WordCounter.execute()" ); String str=input .getString(0); /** * 如果单词尚不存在于map,我们就创建一个,如果已在,我们就为它加1 */ if(!counters .containsKey(str)){ counters.put(str ,1); } else{ Integer c = counters.get(str ) + 1; counters.put(str ,c ); } //对元组作为应答 collector.ack(input ); } /** * 初始化 */ @Override public void prepare(Map stormConf, TopologyContext context , OutputCollector collector ){ this.counters = new HashMap<String, Integer>(); this.collector = collector; this.name = context.getThisComponentId(); this.id = context.getThisTaskId(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { System. out.println("WordCounter.declareOutputFields()" ); } @Override public Map<String, Object> getComponentConfiguration() { System. out.println("WordCounter.getComponentConfiguration()" ); return null ;} }
这个Bolt因为不需要继续输出,所以declareOutputFields方法中仅仅是打印了一句话。
4、创建WordCountApp类
前面我们已经将构建Topology的各个组件都编写好了。现在我们需要编写一个类,来将组件组合成一个真正的Topology。
WordCountApp.java
package com.tianshouzhi.study.wordcountapp; import com.tianshouzhi.study.wordcountapp.bolts.WordCounter; import com.tianshouzhi.study.wordcountapp.bolts.WordNormalizer; import com.tianshouzhi.study.wordcountapp.spouts.WordReader; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; public class WordCountApp { public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException { //定义拓扑 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word-reader" , new WordReader()); builder.setBolt("word-normalizer" , new WordNormalizer()).shuffleGrouping("word-reader" ); builder.setBolt("word-counter" , new WordCounter()).fieldsGrouping("word-normalizer" , new Fields("word")); StormTopology topology = builder .createTopology(); //配置 Config conf = new Config(); String fileName ="words.txt" ; conf.put("fileName" , fileName ); conf.setDebug(false); //运行拓扑 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Getting-Started-Topologie" , conf , topology ); Thread. sleep(5000); cluster.shutdown(); } }
TopologyBuilder
对象用于创建Topology。在我们设置完Spout和Bolt之后,最后通过对象的createTopology()方法来创建StormTopology
对象。
Config
对象是Storm中提供的一个配置类,其实Map的子类,还记得我们之前在WordReader中的open方法中,是通过conf对象来获取words.txt的文件的路径,从而创建BufferedReader。
Storm支持两种运行方式,本地模式和远程模式。在本例中,我们使用LocalCluster
这个类,模拟一个本地集群,从而使我们创建的代码在本地就可以运行,不用提交到Storm集群中。通常情况下,在开发的时候,我们会使用本地模式,在提测和产品环境下,我们才会将代码真正的提交到Storm集群去运行。
5、添加日志配置文件
Storm使用的日志框架是logback,默认的日志输出级别是INFO,因此在运行的时候会输出很多日志,影响我们的观察。因此,此处使用一个简单的logback.xml配置文件,将日志级别调整为ERROR。在后面,我们将会有一个章节,详细的来说明Storm集群中的日志相关问题。
logback.xml
<?xml version="1.0" encoding= "UTF-8"?> <configuration> <root level="ERROR" /> </configuration>
最终,我们的项目目录结构如下所示:
四、本地模式运行Topology
本地模式运行很简单,我们只需要右击-->Run as-->JavaApplication即可
运行后,控制台输出
WordReader.getComponentConfiguration() WordNormalizer.getComponentConfiguration() WordCounter.getComponentConfiguration() WordCounter.declareOutputFields() WordNormalizer.declareOutputFields() WordReader.declareOutputFields(OutputFieldsDeclarer declarer) filepath:/D:/git/cloud/storm/target/classes/wordsFile.dic WordNormalizer.prepare() WordReader.open(Map conf, TopologyContext context, SpoutOutputCollector collector) WordReader.activate() WordReader.nextTuple(),emits time:0 WordReader.nextTuple(),emits time:1 WordReader.nextTuple(),emits time:2 WordNormalizer.execute() WordNormalizer.execute() WordNormalizer.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordCounter.execute() WordReader.ack(Object msgId): WordReader.ack(Object msgId):A Storm Cluster can run many topologies at the same time WordReader.ack(Object msgId):Topology consists of spouts and bolts WordReader.ack(Object msgId):This is the first topology example a: 1 cluster: 1 storm: 1 topology: 2 bolts: 1 this: 1 run: 1 is: 1 many: 1 topologies: 1 example: 1 the: 2 can: 1 at: 1 same: 1 and: 1 of: 1 consists: 1 spouts: 1 time: 1 first: 1 WordCounter.cleanup() WordNormalizer.cleanup() WordReader.close()
从输出结果中,我们可以看到,单词个数的确被准确的统计出来了。而且从打印的日志中,我们也可以看到Spout和Bolt方法被调用的顺序。
Spout实例方法被调用的顺序为:getComponentConfiguration()-->declareOutputFields----->open-->active-->nextTuple(死循环)-->close。
我们可以类比J2EE中Servlet生命周期方法,把这个几个方法看做是一个IRichSpout实例生命周期方法。所谓声明周期方法,指的是,一个类实例从创建到销毁时,必定会执行的方法,而且有着一定的顺序。ack方法、fail方法、deactive等方法,因为不是Spout生命周期方法,所以并不一定会执行,根据设置,有些情况下会执行,有些情况下不会执行。(注意Storm官方并没有Spout生命周期这种说法,本人是为了方便读者理解,进行了对比)。
类似的,我们可以找出IRichBolt实例的生命周期方法为:getComponentConfiguration-->declareOutputFields()-->execute(执行多次)-->cleanup。prepare方法并不是生命周期方法。为什么这么说?我们可以看到在打印的日志中,WordNormalizer.prepare()执行了,但是WordCounter的prepare方法并没有执行。通常情况下,如果一个IRichBolt如果没有声明任何输出,即declareOutputFields方法返回的是null,则prepare方法不会执行。在案例中,Toplogy中数据流向的最后一个WordCounter因为不要发送数据给其他Bolt了,declareOutputFields返回的结果就是null,因此prepare方法没执行。分析这个有用吗?其实还是有点用的,因为prepare方法的名字很具有诱惑性,我们通常认为可以在这个方法中做一些初始化操作。不过我们现在已经理解了,如果是最后一个Bolt,这个代码是不会被调用的。
最后我们要注意的是,IRichSpout的active方法必须是在Spout数据流向经过的所有Bolt都实例化完成之后,才会被调用。举例来说:
对于这张图中,Spout1必须要在Bolt1、Bolt2、Bolt3、Bolt4都准备好的情况下,active方法才会被调用;而Spout2,只需要Bolt3准备好了之后,就会被调用。
在下一节中,我们将讲解如何以远程模式运行WordCountApp。