4.0 Trident快速入门

2016-02-01 00:57:09 6,588 0

Trident是在storm基础上,一个以realtime计算为目标的高度抽象

它在提供处理大吞吐量数据能力的同时,也提供了低延时分布式查询(DRPC)和有状态(实时保存最新的聚合信息,重启无影响)流式处理的能力。如果你对Pig和Cascading这种高级批量处理工具很了解的话,那么应该毕竟容易理解Trident,因为他们之间很多的概念和思想都是类似的。Tident提供了 joins, aggregations, grouping, functions, 以及 filters等能力。

除此之外,Trident 还提供了一些专门的原语,从而在基于数据库或者其他存储的前提下来应付有状态的递增式处理。

Trident是完全容错的,拥有有且只有一次处理的语义,其实就是transactional的高级封装。这就让你可以很轻松的使用Trident来进行实时数据处理。Trident会把状态以某种形式保持起来,当有错误发生时,它会根据需要来恢复这些状态。

有了前边事务的基础,学习Trident会容易一些。

举例说明

让我们一起来看一个Trident的例子。在这个例子中,我们主要做了两件事情:

1. 从一个流式输入中读取语句病计算每个单词的个数

2. 提供查询给定单词列表中每个单词当前总数的功能

因为这只是一个例子,我们会从如下这样一个无限的输入流中读取语句作为输入:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
               new Values("the cow jumped over the moon"),
               new Values("the man went to the store and bought some candy"),
               new Values("four score and seven years ago"),
               new Values("how many apples can you eat"),spout.setCycle(true);

这个spout会循环输出列出的那些语句到sentence stream当中,下面的代码会以这个stream作为输入并计算每个单词的个数:

TridentTopology topology = new TridentTopology();       
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))               
       .parallelismHint(6);

让我们一起来读一下这段代码。我们首先创建了一个TridentTopology对象。TridentTopology类相应的接口来构造Trident计算过程中的所有内容。我们在调用了TridentTopology类的newStream方法时,传入了一个spout对象,spout对象会从外部读取数据并输出到当前topology当中,从而在topology中创建了一个新的数据流。在这个例子中,我们使用了上面定义的FixedBatchSpout 对象。输入数据源同样也可以是如Kestrel或者Kafka这样的队列服务。Trident会再Zookeeper中保存一小部分状态信息来追踪数据的 处理情况,而在代码中我们指定的字符串“spout1”就是Zookeeper中用来存储metadata信息的Znode节点

Trident在处理输入stream的时候会把输入转换成若干个tuple的batch来处理。比如说,输入的sentence stream可能会被拆分成如下的batch:

Image.png

一般来说,这些小的batch中的tuple可能会在数千或者数百万这样的数量级,这完全取决于你的输入的吞吐量。

Trident提供了一系列非常成熟的批量处理的API来处理这些小batch. 这些API和你在Pig或者Cascading中看到的非常类似, 你可以做group by's, joins, aggregations, 运行 functions, 执行 filters等等。当然,独立的处理每个小的batch并不是非常有趣的事情,所以Trident提供了很多功能来实现batch之间的聚合的结果并可 以将这些聚合的结果存储到内存,Memcached, Cassandra或者是一些其他的存储中。同时,Trident还提供了非常好的功能来查询实时状态。这些实时状态可以被Trident更新,同时它也 可以是一个独立的状态源。

回到我们的这个例子中来,spout输出了一个只有单一字段“sentence”的数据流。在下一行,topology使用了Split函数来拆分 stream中的每一个tuple,Split函数读取输入流中的“sentence”字段并将其拆分成若干个word tuple。每一个sentence tuple可能会被转换成多个word tuple,比如说"the cow jumped over the moon" 会被转换成6个 "word" tuples. 下面是Split的定义:

public class Split extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       String sentence = tuple.getString(0);
       for(String word: sentence.split(" ")) {
           collector.emit(new Values(word));               
       }
   }}

如你所见,真的很简单。它只是简单的根据空格拆分sentence,并将拆分出的每个单词作为一个tuple输出。

topology的其他部分计算单词的个数并将计算结果保存到了持久存储中。首先,word stream被根据“word”字段进行group操作,然后每一个group使用Count聚合器进行持久化聚合。persistentAggregate会帮助你把一个状态源聚合的结果存储或者更新到存储当中。在这个例子中,单词的数量被保持在内存中,不过我们可以很简单的把这些数据保存到其他的存储当中,如 Memcached, Cassandra等。如果我们要把结果存储到Memcached中,只是简单的使用下面这句话替换掉persistentAggregate就可以,这当中的"serverLocations"是Memcached cluster的主机和端口号列表:

.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))       
MemcachedState.transactional()

persistentAggregate存储的数据就是所有batch聚合的结果。

Trident非常酷的一点就是它是完全容错的,拥有者有且只有一次处理的语义。这就让你可以很轻松的使用Trident来进行实时数据处理。Trident会把状态以某种形式保持起来,当有错误发生时,它会根据需要来恢复这些状态。

persistentAggregate方法会把数据流转换成一个TridentState对象。在这个例子当中,TridentState对象代表了所有的单词的数量。我们会使用这个TridentState对象来实现在计算过程中的进行分布式查询。

下面这部分实现了一个低延时的单词数量的分布式查询。这个查询以一个用空格分割的单词列表为输入,并返回这些单词当天的个数。这些查询是像普通的RPC调用那样被执行的,要说不同的话,那就是他们在后台是并行执行的。下面是执行查询的一个例子:

DRPCClient client = new DRPCClient("drpc.server.location", 3772);
// prints the JSON-encoded result, e.g.: "[[5078]]"
System.out.println(client.execute("words", "cat dog the man");

如你所见,除了这是并发执行在storm cluster上之外,这看上去就是一个正常的RPC调用。这样的简单查询的延时通常在10毫秒左右。当然,更负责的DRPC调用可能会占用更长的时间,尽管延时很大程度上是取决于你给计算分配了多少资源。

这个分布式查询的实现如下所示:

topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
       .each(new Fields("count"), new FilterNull())
       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

我们仍然是使用TridentTopology对象来创建DRPC stream,并且我们将这个函数命名为“words”。这个函数名会作为第一个参数在使用DRPC Client来执行查询的时候用到。

每个DRPC请求会被当做只有一个tuple的batch来处理。在处理的过程中,以这个输入的单一tuple来表示这个请求。这个tuple包含了一个 叫做“args”的字段,在这个字段中保存了客户端提供的查询参数。在这个例子中,这个参数是一个以空格分割的单词列表。

首先,我们使用Split功能把入参拆分成独立的单词。然后对“word” 进行group by操作,之后就可以使用stateQuery来在上面代码中创建的TridentState对象上进行查询。stateQuery接受一个数据源(在这个例子中,就是我们的topolgoy所计算的单词的个数)以及一个用于查询的函数作为输入。在这个例子中,我们使用了MapGet函数来获取每个单词的出现个数。由于DRPC stream是使用跟TridentState完全同样的group方式(按照“word”字段进行group),每个单词的查询会被路由到TridentState对象管理和更新这个单词的分区去执行。

接下来,我们用FilterNull这个过滤器把从未出现过的单词给去掉,并使用Sum这个聚合器将这些count累加起来。最终,Trident会自动把这个结果发送回等待的客户端。

Trident在如何最大程度的保证执行topogloy性能方面是非常智能的。在topology中会自动的发生两件非常有意思的事情:

读取和更新状态的操作 (比如说 persistentAggregate 和 stateQuery) 会自动的是batch的形式操作状态。 如果有20次更新需要被同步到存储中,Trident会自动的把这些操作汇总到一起,只做一次读一次写,而不是进行20次读20次写的操作。因此你可以在 很方便的执行计算的同时,保证了非常好的性能。

Trident 的聚合器已经是被优化的非常好了的。Trident并不是简单的把一个group中所有的tuples都发送到同一个机器上面进行聚合,而是在发送之前已 经进行过一次部分的聚合。打个比方,Count聚合器会先在每个partition上面进行count,然后把每个分片count汇总到一起就得到了最终 的count。这个技术其实就跟MapReduce里面的combiner是一个思想。