
For example, I have these tuples, in batches of size 5, containing impressions from users:

Batch 1:
[UUID1, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID3, clientId2]

Batch 2:
[UUID4, clientId1]
[UUID5, clientId1]
[UUID5, clientId1]
[UUID6, clientId2]
[UUID6, clientId2]

Here is how I persist the count state:

TridentState ClientState = impressionStream
    .groupBy(new Fields("clientId"))
    .persistentAggregate(getCassandraStateFactory("users", "DataComputation",
        "UserImpressionCounter"), new Count(), new Fields("count"));

Stream ClientStream = ClientState.newValuesStream();

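I start with an empty database and run my topology. After grouping the stream by clientId, I persist the state with persistentAggregate and the Count aggregator. After the first batch, newValuesStream emits [clientId1, 4], [clientId2, 1]; after the second batch it emits [clientId1, 7], [clientId2, 3], as expected.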
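To make the expected numbers concrete, here is a minimal plain-Java sketch of the counting behaviour described above. It does not use Storm at all: the class `RunningCountSketch` and its method `processBatch` are hypothetical names, and the in-memory map only stands in for the Cassandra-backed state.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Storm dependency) of grouping by clientId and
// keeping a running count across batches. All names here are hypothetical.
class RunningCountSketch {

    // Stands in for the persisted state: clientId -> running total.
    private final Map<String, Long> state = new LinkedHashMap<>();

    // Processes one batch of clientIds and returns the updated totals for the
    // keys touched by that batch, similar in spirit to newValuesStream().
    Map<String, Long> processBatch(List<String> clientIds) {
        Map<String, Long> updated = new LinkedHashMap<>();
        for (String clientId : clientIds) {
            long total = state.merge(clientId, 1L, Long::sum);
            updated.put(clientId, total);
        }
        return updated;
    }

    public static void main(String[] args) {
        RunningCountSketch sketch = new RunningCountSketch();
        // Batch 1: four impressions for clientId1, one for clientId2.
        System.out.println(sketch.processBatch(Arrays.asList(
            "clientId1", "clientId1", "clientId1", "clientId1", "clientId2")));
        // Batch 2: three impressions for clientId1, two for clientId2.
        System.out.println(sketch.processBatch(Arrays.asList(
            "clientId1", "clientId1", "clientId1", "clientId2", "clientId2")));
    }
}
```

Running `main` prints `{clientId1=4, clientId2=1}` followed by `{clientId1=7, clientId2=3}`, matching the values listed above.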

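ClientStream is used in several branches, and in one of them I need to know the count as of each individual tuple, which would mean processing tuples in batches of size 1. Batches of size 1 are obviously not practical, so I need some way to find the counter's value before the update and emit it together with the tuple that carries the updated counter, e.g. [clientId1, 7, 4] for the second batch.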

Does anyone know how to do this?


1 Answer


I solved this by adding a new aggregator and joining its output with the persistent aggregate:

TridentState ClientState = impressionStream
    .groupBy(new Fields("clientId"))
    .persistentAggregate(getCassandraStateFactory("users", "DataComputation",
        "UserImpressionCounter"), new Count(), new Fields("count"));

Stream ClientBatchAggregationStream = impressionStream
    .groupBy(new Fields("clientId"))
    .aggregate(new SumCountAggregator(), new Fields("batchCount"));

Stream GroupingPeriodCounterStateStream = topology
    .join(ClientState.newValuesStream(), new Fields("clientId"),
        ClientBatchAggregationStream, new Fields("clientId"), 
        new Fields("clientId", "count", "batchCount"));

SumCountAggregator:

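// Package names as in Storm 0.9.x; from Storm 1.0 on they live under org.apache.storm.
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseAggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// Counts the tuples of each group within a single batch.
public class SumCountAggregator extends BaseAggregator<SumCountAggregator.CountState> {

    // Mutable per-group counter for the current batch.
    static class CountState {
        long count = 0;
    }

    @Override
    public CountState init(Object batchId, TridentCollector collector) {
        // Start from zero for every group in every batch.
        return new CountState();
    }

    @Override
    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count += 1;
    }

    @Override
    public void complete(CountState state, TridentCollector collector) {
        // Emit the per-batch count once the group has been fully processed.
        collector.emit(new Values(state.count));
    }

}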
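
Note that the joined tuples carry the per-batch count, not the previous counter value itself. The value the question asks for can be recovered by subtraction, since the running total after a batch equals the previous total plus the number of tuples counted in that batch. A minimal plain-Java sketch of that arithmetic follows; `PreviousCountSketch` and `previousCount` are hypothetical names, not part of Storm.

```java
// Hypothetical helper (not a Storm API): derives the counter value before the
// batch was applied from the fields of one joined tuple.
class PreviousCountSketch {

    // count      = running total after the batch (from the persistent aggregate)
    // batchCount = tuples seen for this key in the batch (from SumCountAggregator)
    static long previousCount(long count, long batchCount) {
        return count - batchCount;
    }

    public static void main(String[] args) {
        // Second batch, clientId1: total 7, of which 3 arrived in this batch.
        System.out.println(previousCount(7, 3)); // prints 4
        // Second batch, clientId2: total 3, of which 2 arrived in this batch.
        System.out.println(previousCount(3, 2)); // prints 1
    }
}
```

In the topology itself, this subtraction would presumably be applied to the joined stream with `each` and a small function that reads `count` and `batchCount` and emits the difference as an extra field.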
Answered 2014-01-24T23:38:38.430