2.2 WordCountApp

2016-01-27 21:36:44 12,524 12

在本节中,我们创建一个名字叫WordCountApp的Topology。这个Topology的作用是统计一个名为words.txt文本中每个单词出现的次数。我们一步一步的分析,是如何根据我们需求,最终创建出Topology。

一、Topology组件分析

假设我们的words.txt中内容如下:

A Storm Cluster can run many topologies at the same time 
Topology consists of spouts and bolts
This is the first topology example

这个Topology可能会由以下的组件组成:

WordReader(Spout):我们需要从外数据源wrods.txt中获取数据,这里的words.txt其实就是外部数据源。为了明确这个Spout的作用,我们将其称之为WordReader。

WordNormalizer(Bolt):这个Bolt作用是格式化单词。通常情况下,我们统计一个单词出现次数的时候,是不区分大小写的。所以我们,必须要有这样一个环节来转换大小写。事实上,如果我们的文本中,还出现了标点符号",","."等,我们还要将这些符号去除,这就是WordNormalizer的作用。为了简单,我们这里并没有出现符号。

WordCounter(Bolt):用于统计经过WordNormalizer处理后的单词的出现次数。

因次,我们的WordCountApp这个Topology可能如下所示:

Image.png

二、API简介

Components组件API介绍

构建一个Topology有很多,根据我们需求的不同,我们可能会使用不同的API,下图列出了,本节中我们会提到的,或者在WordCountApp案例中会涉及到的API。

Strom Components API.png

我们之前提过,Topology中的Spout和Bolt称之为Topology的组件(Components)。

在Storm API中,定义了一个IComponent接口表示组件,用ISpout表示一个Spout,IBolt表示一个Bolt。IRichSpout接口分别继承了IComponent和ISpout接口,意味着这个接口的实现类,既是Spout,又是Topology的组件。

IRichBolt接口集成了IComponent接口和IBolt接口,表示这个接口的实现类即是Component,又是Bolt。

因此在开发中,我们通常只要实现IRichSpout和IRichBolt即可。由于IRichBolt和IRichSpout接口中定义的方法比较多。有些方法我们可能并不想实现,因此分别提供了一个实现类,BaseRichSpoutBaseRichBolt。把一些并不是一定要用户实现的方法,提供了一个默认的实现,来简化我们的开发,这实际上就适配器设计模式。

作为第一个案例,为了尽量熟悉各个API中的方法,我们编写的WordReader会去实现IRichSpout,因此其作用就是从外部数据源获取数据,WordNormalizerWordCounter 会实现IRichBolt,它们的作用是分析处理数据。


三 编写Topology

准备工作,创建一个maven工程wordcountapp,pom.xml如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
             http://maven.apache.org/xsd/maven-4.0.0.xsd" >
       <modelVersion> 4.0.0</modelVersion >
       <groupId> com.tianshouzhi</groupId >
       <artifactId> wordcountapp</artifactId >
       <version> 0.0.1-SNAPSHOT</version >
       <dependencies>
             <dependency>
                   <groupId> org.apache.storm</groupId >
                   <artifactId> storm-core</artifactId >
                   <version> 0.9.2-incubating</version >
             </dependency>
       </dependencies>

</project>


1、创建WordReader(Spout)

我们的WordReader要实现IRichSpout接口,这个接口中,定义的方法包括:

Image.png

目前我们最关心的是,用红色框标记出的三个方法,对于其他的方法,我们只打印出一句话。

WordReader.java

package com.tianshouzhi.study.wordcountapp.spouts;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * WordReader(Spout),用于从外部数据源words.txt中获取数据
 */
public class WordReader implements IRichSpout {
       private SpoutOutputCollector collector ;
       private FileReader fileReader ;
       BufferedReader reader;
       private boolean completed = false;

       /**
       * 这个方法做的惟一一件事情就是分发文件中的文本行
       */
       public void nextTuple() {
             /**
             * 这个方法会不断的被调用,直到整个文件都读完了,我们将等待并返回。
             */
             if (completed ) {
                   try {
                        Thread. sleep(1000);
                  } catch (InterruptedException e ) {
                         // 什么也不做
                  }
                   return;
            }
            String str;
             
             try {
                   int i = 0;
                   // 读所有文本行
                   while ((str = reader.readLine()) != null) {
                        System. out.println("WordReader.nextTuple(),emits time:" + i++);
                         /**
                         * 按行发布一个新值
                         */
                         this.collector .emit(new Values( str), str );
                  }
            } catch (Exception e ) {
                   throw new RuntimeException("Error reading tuple", e);
            } finally {
                   completed = true ;
            }
      }

       /**
       *
       * 当Spout被创建之后,这个方法会被条用
       */
       public void open(Map conf, TopologyContext context , SpoutOutputCollector collector ) {
      
    	   System.out.println( "WordReader.open(Map conf, TopologyContext context, SpoutOutputCollector collector)");
            String fileName = conf .get("fileName").toString();
            InputStream inputStream=WordReader.class.getClassLoader().getResourceAsStream( fileName);
             reader =new BufferedReader(new InputStreamReader(inputStream ));
             this.collector = collector;
      }

       /**
       * 声明数据格式,即输出的一个Tuple中,包含几个字段
       */
       public void declareOutputFields(OutputFieldsDeclarer declarer) {
            System. out.println("WordReader.declareOutputFields(OutputFieldsDeclarer declarer)");
             declarer.declare(new Fields("line"));
      }

       @Override
       public void activate() {
            System. out.println("WordReader.activate()" );
      }

       @Override
       public void deactivate() {
            System. out.println("WordReader.deactivate()" );
      }

       @Override
       public Map<String, Object> getComponentConfiguration() {
            System. out.println("WordReader.getComponentConfiguration()" );
             return null ;
      }

       /**
       * 当一个Tuple处理成功时,会调用这个方法
       */
       public void ack(Object msgId) {
            System. out.println("WordReader.ack(Object msgId):" + msgId);
      }

       /**
       * 当Topology停止时,会调用这个方法
       */
       public void close() {
            System. out.println("WordReader.close()" );
      }

       /**
       * 当一个Tuple处理失败时,会调用这个方法
       */
       public void fail(Object msgId) {
            System. out.println("WordReader.fail(Object msgId):" + msgId);
      }
}

目前我们主要关注的是以下三个方法:

1、open方法

这个方法的声明如下:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector )

当Spout被实例化之后,open方法会被调用一次。通常情况下,这个方法中,我们会做一些初始化的动作。例如建立与外部数据源的链接,在本例中,就是获取到了外部数据文件words.txt的读取器。

这个方法还传递进来了3个参数。

参数1:Map conf

     conf对象中维护的是一些配置信息。例如在这个方法中,我们获取words.txt文件的位置就是通过

conf .get("wordsFile")这种方式来进行的。conf对象中还维护了一些其他的默认参数,读者自己可以将这个对象中的参数打印出来进行观察。

参数2:TopologyContext context

     这个参数中,包含了Topology运行时的上下文信息。要知道这个对象的作用,最简单的方式就是查看这个对象中,有哪些方法和字段,此处不做过多讲解。

参数3:SpoutOutputCollector collector

     从名字中,我们就可以看出,这个对象的作用,Spout输出收集器,我们之前说过Spout需要将数据以Tuple的形式发送给Bolt,就是通过这个对象的emit方法来实现的。

2、declareOutputFields方法

方法的声明如下:

public void declareOutputFields(OutputFieldsDeclarer declarer)

这个方法的作用是,声明由这个Spout输出的流中的每一个Tuple,包含哪些字段。 之前我们提到过,在Topology中,不同的Spout和Bolt以及Bolt与Bolt之间形成数据流中,同一个Stream中Tuple的数据格式一定是相同的,不同的数据流中的Tuple的数据格式可能相同也可能不同。那么一个Tuple中的数据格式到底是怎么样的呢?就是在这个方法中定义的。

例如在这里,我们声明了一个line字段。declarer .declare(new Fields( "line"));,这就相当于宣告,每个Tuple中只能含有一个字段,这个字段的名字叫做line。如果一个Tuple中有多个字段,那么我们在这里就要声明多个字段。

3、nextTuple方法

这个方法会被不断的调用,因为Spout需要不断的从外部数据源中获取最新的数据,然后使用SpoutOutputCollector的emit方法来进行发射。

补充:SpoutOutputCollector的emit方法

SpoutOutputCollector的emit方法有很多种重载形式,比较常用的是红色框中的几种。

Image.png

在本例中,我们使用的是第二个emit方法。这个方法第一个参数是List集合。,但是我们的代码中使用的却是Values这个对象。如下:

this .collector.emit( new Values(str ), str);

这是因为Values对象是ArrayList集合的子类。读者可以自行查看这个类的源码。这里主要说明,我们构造Values对象时,其构造方法接受一个可变参数,我们这里只传递了一个str,说明最终List集合中只有一个元素。而且只能是一个元素。因为我们我们已经在declareOutputFields这个方法中声明过,只有一个字段line。

对于其他的几个方法,后面我们也会详细介绍,目前只是在每句话里面打印出了一句话。

2、创建WordNormalizer(实现IRichBolt)

WordNormalizer.java

package com.tianshouzhi.study.wordcountapp.bolts;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer implements IRichBolt{
    /**
       *
       */
       private static final long serialVersionUID = 3644849073824009317L;
       private OutputCollector collector ;
  
    /**
      * *bolt*从单词文件接收到文本行,并标准化它。
      * 文本行会全部转化成小写,并切分它,从中得到所有单词。
     */
    public void execute(Tuple input ){
System.out.println( "WordNormalizer.execute()" );
        String sentence = input .getString(0);
        String[] words = sentence .split(" ");
        for(String word : words){
            word = word .trim();
            if(!word .isEmpty()){
                word=word .toLowerCase();
                /*//发布这个单词*/
                collector.emit(input ,new Values(word));
            }
        }
        //对元组做出应答
        collector.ack(input );
    }
    public void prepare(Map stormConf, TopologyContext context , OutputCollector collector ) {
        System. out.println("WordNormalizer.prepare()" );
      this. collector=collector ;
    }

    /**
      * 这个*bolt*只会发布“word”域
      */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      System. out.println("WordNormalizer.declareOutputFields()" );
        declarer.declare(new Fields("word"));
    }
       @Override
       public Map<String, Object> getComponentConfiguration() {
            System. out.println("WordNormalizer.getComponentConfiguration()" );
             return null ;
      }
       public void cleanup(){
          System. out.println("WordNormalizer.cleanup()" );   
          }
}

在这个Bolt中,我们关心的方法是prepareexecutedeclareOutputFields,对于其他方法,我们依然是打印出一句话。

prepare 方法:

声明如下:

public void prepare( Map stormConf, TopologyContext context, OutputCollector collector )

这个方法与Spout的open方法参数很类似,唯一不同的就是最后一个参数,Spout中的open方法传递的是SpoutOutPutCollector,这里传递的是OutputCollector。不过都是数据收集器。主要的区别是,emit方法可以多出一个anchor或者anchors的参数,这与消息的可靠性保证有关,我们将会在后面详细讲解。如下红色框中全部的部分:

Image.png

declareOutputFields方法:

因为Bolt还可以发送数据到下一级Bolt,因此,我们同样要指定这个Tuple中的数据格式。如果这个Bolt不要输出的话,我们就可以不声明。通常最后一个Bolt,因为没有下一级Bolt,这个方法就可以直接返回null。如我们下面要编写的WordCounter。

execute方法:

声明如下:

public void execute(Tuple input)

每当Bolt接受到一个Tuple的时候,就会调用一次execute方法。Tuple就是Spout传递给Bolt的数据。额..等等,我们在之前刚说过,本案例中WordReader发射数据时,使用的是下面这个方法:

Image.png

明明发送的是一个List集合。但是这里接受参数却是Tuple类型。实际上是没有错的,Tuple是一个接口,我们可以看看其实现类TupleImpl中的源码。

Image.png

相信看到这个读者应该明白了,Tuple实际上是将Spout端发送的数据进行了一层封装,原来的List集合还在,此外还封装了一些元数据。

因为我们原来的Spout每次发送的List<Object>集合中,只有一个元素,因此,在这里,就可以通过input.getString(0);来获取。

3、编写WordCounter(实现IRichBolt)

这个Bolt将统计到的最终结果放到一个map集合中。并在Bolt销毁时,调用cleanup方法时,打印出统计结果。

WordCounter.java

package com.tianshouzhi.study.wordcountapp.bolts;

import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class WordCounter implements IRichBolt{
    Integer id;
    String name;
    Map<String,Integer> counters;
    private OutputCollector collector;

    /**
      * 当Bolt销毁时,我们会显示单词数量
      */
    @Override
    public void cleanup(){
        
        for(Map.Entry<String,Integer> entry : counters.entrySet()){
            System. out.println(entry .getKey()+": "+ entry.getValue());
        }
System.out.println( "WordCounter.cleanup()" );
    }

    /**
     *  为每个单词计数
     */
    @Override
    public void execute(Tuple input ) {
System.out.println( "WordCounter.execute()" );
        String str=input .getString(0);
        /**
         * 如果单词尚不存在于map,我们就创建一个,如果已在,我们就为它加1
         */
        if(!counters .containsKey(str)){
            counters.put(str ,1);
        } else{
            Integer c = counters.get(str ) + 1;
            counters.put(str ,c );
        }
        //对元组作为应答
        collector.ack(input );
    }

    /**
     * 初始化
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context , OutputCollector collector ){
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      System. out.println("WordCounter.declareOutputFields()" );
    }

       @Override
       public Map<String, Object> getComponentConfiguration() {
            System. out.println("WordCounter.getComponentConfiguration()" );
             return null ;}
}

这个Bolt因为不需要继续输出,所以declareOutputFields方法中仅仅是打印了一句话。

4、创建WordCountApp类

前面我们已经将构建Topology的各个组件都编写好了。现在我们需要编写一个类,来将组件组合成一个真正的Topology。

WordCountApp.java

package com.tianshouzhi.study.wordcountapp;

import com.tianshouzhi.study.wordcountapp.bolts.WordCounter;
import com.tianshouzhi.study.wordcountapp.bolts.WordNormalizer;
import com.tianshouzhi.study.wordcountapp.spouts.WordReader;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountApp {
    public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException {
    //定义拓扑
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader" , new WordReader());
        builder.setBolt("word-normalizer" , new WordNormalizer()).shuffleGrouping("word-reader" );
        builder.setBolt("word-counter" , new WordCounter()).fieldsGrouping("word-normalizer" , new Fields("word"));
        StormTopology topology = builder .createTopology();
    //配置
        
        Config conf = new Config();
        String fileName ="words.txt" ;
        conf.put("fileName" , fileName );
        conf.setDebug(false);

      //运行拓扑
      
             LocalCluster cluster = new LocalCluster();
             cluster.submitTopology("Getting-Started-Topologie" , conf , topology );
             Thread. sleep(5000);
             cluster.shutdown();
       
    }
}

TopologyBuilder对象用于创建Topology。在我们设置完Spout和Bolt之后,最后通过对象的createTopology()方法来创建StormTopology对象。

Config对象是Storm中提供的一个配置类,其实Map的子类,还记得我们之前在WordReader中的open方法中,是通过conf对象来获取words.txt的文件的路径,从而创建BufferedReader。

Storm支持两种运行方式,本地模式和远程模式。在本例中,我们使用LocalCluster这个类,模拟一个本地集群,从而使我们创建的代码在本地就可以运行,不用提交到Storm集群中。通常情况下,在开发的时候,我们会使用本地模式,在提测和产品环境下,我们才会将代码真正的提交到Storm集群去运行。

5、添加日志配置文件

Storm使用的日志框架是logback,默认的日志输出级别是INFO,因此在运行的时候会输出很多日志,影响我们的观察。因此,此处使用一个简单的logback.xml配置文件,将日志级别调整为ERROR。在后面,我们将会有一个章节,详细的来说明Storm集群中的日志相关问题。

logback.xml

<?xml version="1.0" encoding= "UTF-8"?>
<configuration>
  <root level="ERROR" />
</configuration>

最终,我们的项目目录结构如下所示:

QQ截图20160127021150.png

四、本地模式运行Topology

本地模式运行很简单,我们只需要右击-->Run as-->JavaApplication即可

运行后,控制台输出

WordReader.getComponentConfiguration()
WordNormalizer.getComponentConfiguration()
WordCounter.getComponentConfiguration()
WordCounter.declareOutputFields()
WordNormalizer.declareOutputFields()
WordReader.declareOutputFields(OutputFieldsDeclarer declarer)
filepath:/D:/git/cloud/storm/target/classes/wordsFile.dic
WordNormalizer.prepare()
WordReader.open(Map conf, TopologyContext context, SpoutOutputCollector collector)
WordReader.activate()
WordReader.nextTuple(),emits time:0
WordReader.nextTuple(),emits time:1
WordReader.nextTuple(),emits time:2
WordNormalizer.execute()
WordNormalizer.execute()
WordNormalizer.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordCounter.execute()
WordReader.ack(Object msgId):
WordReader.ack(Object msgId):A Storm Cluster can run many topologies at the same time 
WordReader.ack(Object msgId):Topology consists of spouts and bolts
WordReader.ack(Object msgId):This is the first topology example
a: 1
cluster: 1
storm: 1
topology: 2
bolts: 1
this: 1
run: 1
is: 1
many: 1
topologies: 1
example: 1
the: 2
can: 1
at: 1
same: 1
and: 1
of: 1
consists: 1
spouts: 1
time: 1
first: 1
WordCounter.cleanup()
WordNormalizer.cleanup()
WordReader.close()

从输出结果中,我们可以看到,单词个数的确被准确的统计出来了。而且从打印的日志中,我们也可以看到Spout和Bolt方法被调用的顺序。

Spout实例方法被调用的顺序为:getComponentConfiguration()-->declareOutputFields----->open-->active-->nextTuple(死循环)-->close。

我们可以类比J2EE中Servlet生命周期方法,把这个几个方法看做是一个IRichSpout实例生命周期方法。所谓声明周期方法,指的是,一个类实例从创建到销毁时,必定会执行的方法,而且有着一定的顺序。ack方法、fail方法、deactive等方法,因为不是Spout生命周期方法,所以并不一定会执行,根据设置,有些情况下会执行,有些情况下不会执行。(注意Storm官方并没有Spout生命周期这种说法,本人是为了方便读者理解,进行了对比)。

类似的,我们可以找出IRichBolt实例的生命周期方法为:getComponentConfiguration-->declareOutputFields()-->execute(执行多次)-->cleanup。prepare方法并不是生命周期方法。为什么这么说?我们可以看到在打印的日志中,WordNormalizer.prepare()执行了,但是WordCounter的prepare方法并没有执行。通常情况下,如果一个IRichBolt如果没有声明任何输出,即declareOutputFields方法返回的是null,则prepare方法不会执行。在案例中,Toplogy中数据流向的最后一个WordCounter因为不要发送数据给其他Bolt了,declareOutputFields返回的结果就是null,因此prepare方法没执行。分析这个有用吗?其实还是有点用的,因为prepare方法的名字很具有诱惑性,我们通常认为可以在这个方法中做一些初始化操作。不过我们现在已经理解了,如果是最后一个Bolt,这个代码是不会被调用的。

最后我们要注意的是,IRichSpout的active方法必须是在Spout数据流向经过的所有Bolt都实例化完成之后,才会被调用。举例来说:

Image.png

对于这张图中,Spout1必须要在Bolt1、Bolt2、Bolt3、Bolt4都准备好的情况下,active方法才会被调用;而Spout2,只需要Bolt3准备好了之后,就会被调用。

在下一节中,我们将讲解如何以远程模式运行WordCountApp。