2.3 远程模式运行WordCountApp

2016-02-24 20:41:29 9,812 0

一、提交Topology到Storm集群中

所谓远程模式,就是将Topology提交到Storm集群中来运行。与Hadoop中map-reduce job一样,我们也是通过将代码打成一个jar包后,提交到Storm集群中运行。

我们并不能直接打包,需要做一点修改。如果我们的代码中,引用了除了Storm-core之外的第三方依赖jar包,我们必须要将依赖的包也打包进去,否则就会爆出找不到类的异常。虽然我们这里没有引入其他的第三方依赖,不过我们这里依然打算这样做。

有几个要注意的地方:

1、务必保证Maven中引入的storm依赖与我们安装在服务器上的storm集群的版本是一致的

因为如果不一致,例如服务器版本是0.9.2-incubating,我们再maven中引入的版本是0.9.5,这样可能会导致我们在编写top时,使用到的一些api可能在0.9.5版本中有,0.9.2-incubating版本中没有,因此可能会报错。

2、top的提交方法与LocalCluster是不同的

LocalCluster中,我们通常使用以下方式提交topology:

//提交top
 LocalCluster localCluster=new LocalCluster();
 localCluster.submitTopology(topName , config , topology );

在集群作业中,当提交一个topology的时候,我们要使用以下方式

StormSubmitter. submitTopology("topology-name" , config, topology );

不过为了测试方便,我们通长会在主方法中同时支持两种方式进行提交,有参数的时候使用StormSubmitter 提交;没有参数的时候,使用LocalCluster 提交。

如下所示:

//提交top
        if(args !=null&&args.length>0){ //有参数时,表示向集群提交作业,并把第一个参数当做topology名称
             StormSubmitter. submitTopology(args[0], config, topology);
       } else{//没有参数时,本地提交
             LocalCluster localCluster=new LocalCluster();
              localCluster.submitTopology("wordcountapp" , config , topology);
             Thread. sleep(10000);
              localCluster.shutdown();
       }

3、打包时,加入必要的依赖

我们开发的topology会使用到storm的api,但是由于storm集群本身已经有了这些api,所以我们在maven打包的时候,不需要打包进storm相关jar。所以要加上provided

<dependency>
    <groupId> org.apache.storm</groupId >
    <artifactId> storm-core</artifactId >
    <version>0.9.2-incubating</version >
    <scope> provided</scope >
</dependency>

但是对于一些storm集群本身没有提供的依赖,例如,我们之前提到的,Storm的最佳数据源是消息中间件,如果我们以后的案例使用到了RocketMq,我们需要将其打包进去。默认情况下,maven打包时只会将我们自己开发的代码进行打包,但是依赖的所有jar包都不会被打包进去。此时利用Maven Assembly插件来实现这个功能,在wordCountApp项目的pom.xml文件中添加以下代码

        <build>
             <plugins>
                   <plugin>
                <artifactId> maven-assembly-plugin</artifactId>
                <configuration>
                  <descriptorRefs>
                    <descriptorRef> jar-with-dependencies</descriptorRef >
                  </descriptorRefs>
                  <archive>
                    <manifest>
                      <mainClass> com.tianshouzhi.study.wordcountapp.WordCountApp</mainClass >
                    </manifest>
                  </archive>
                </configuration>
              </plugin>
             </plugins>
       </build>

打包的时候,使用命令

mvn assembly:assembly

我们可以看到target目录下回出现两个大包好的jar,一个是带依赖的,一个是不带依赖的。

Image.png

我们将名字比较长的那个提交到storm集群中进行运行。

提交Topology的命令格式:

storm jar path/to/allmycode.jar path.main.class arg1 arg2 arg3

在执行这个命令之前,我再次介绍以下我的storm集群配置

在static.tianshouzhi.com这台机器上:同时启动了nimbus和supervisor,还启动了Storm UI

在www.huhuamin.com这台机器上,启动了supervisor


现在我们在nimbus节点 static.tianshouzhi.com上执行以下命令

storm jar wordcountapp-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.tianshouzhi.study.wordcountapp.WordCountApp wordcountapp

(关于Storm命令客户端到底可以提交哪些参数,我们在后文会有专门的一节进行讲解。)

提交之后,我们可以通过storm list命令来查看现在storm集群中有几个Topology在运行

Image.png

可以看到我们的Topology已经成功运行在Storm集群中了。列出的结果中,还有几个其他的项,目前我们关心的是Num_Workers。我们之前提到过,一个Topology可以运行在多个Worker中,在我们的WordCountApp案例中,并没有设置,所以采用了默认值,1个Worker。