
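I'm learning the Trident framework. Trident Streams offer several methods for aggregating tuples within a batch, including this one, which performs a stateful mapping of the tuples using the Aggregator interface. Unfortunately, there is no built-in counterpart that additionally persists the map state, i.e. something like the 9 existing overloads of persistentAggregate() but taking only an Aggregator as its argument.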

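So how can I implement the desired functionality by combining lower-level Trident and Storm abstractions and tools? Exploring the API is pretty hard because there is almost no Javadoc documentation.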

In other words, the persistentAggregate() methods let you end stream processing by updating some persistent state:

stream of tuples ---> persistent state

I want to update the persistent state and, along the way, emit different tuples:

stream of tuples ------> stream of different tuples
                  with
            persistent state

Stream.aggregate(Fields, Aggregator, Fields) does not provide fault tolerance:

stream of tuples ------> stream of different tuples
                  with
          simple in-memory state

1 Answer


You can create a new stream from a state using the method TridentState#newValuesStream(). This will allow you to retrieve a stream of the aggregated values.

For illustrative purposes, we can extend the example in the Trident documentation by adding this method and a Debug filter:

// Test spout that emits the sentences below in batches of at most 3 tuples
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
    new Values("the cow jumped over the moon"),
    new Values("the man went to the store and bought some candy"),
    new Values("four score and seven years ago"),
    new Values("how many apples can you eat"));
spout.setCycle(true); // replay the sentences indefinitely

TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
    // Split is the sentence-splitting function from the documentation example
    .each(new Fields("sentence"), new Split(), new Fields("word"))
    .groupBy(new Fields("word"))
    // keep a running count per word in the map state
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
    // turn the state updates back into a stream and print the counts
    .newValuesStream().each(new Fields("count"), new Debug());

Running this topology will output (to the console) the aggregated counts.
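To make the data flow more concrete, here is a minimal plain-Java model of what I understand the persistentAggregate() plus newValuesStream() combination to do for a word count: each batch updates the state, and the updated (word, count) pairs become the outgoing tuples. This is only a sketch of the semantics, not Storm code. NewValuesModel and applyBatch are hypothetical names made up for illustration, and a HashMap stands in for the map state.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model only; none of these names are Storm or Trident APIs.
class NewValuesModel {

    // Applies one batch of words to the counting state and returns the
    // (word, updated count) pairs for the keys touched by this batch.
    static Map<String, Long> applyBatch(Map<String, Long> state, List<String> batch) {
        // Partial counts for this batch, in first-seen order.
        Map<String, Long> partial = new LinkedHashMap<>();
        for (String word : batch) {
            partial.merge(word, 1L, Long::sum);
        }
        // Merge the partial counts into the state and record each new value.
        Map<String, Long> updated = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            long newCount = state.merge(e.getKey(), e.getValue(), Long::sum);
            updated.put(e.getKey(), newCount);
        }
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>(); // stands in for the map state
        System.out.println(applyBatch(state, Arrays.asList("the", "cow", "the")));
        // prints {the=2, cow=1}
        System.out.println(applyBatch(state, Arrays.asList("the", "moon")));
        // prints {the=3, moon=1}
    }
}
```

The second batch only reports "the" and "moon", which mirrors the idea that the new-values stream carries the keys whose state changed rather than the full contents of the state.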

Hope it helps

Answered 2013-11-27T10:29:55.840