例如,我有这些批量大小为 5 的元组,其中包含来自用户的印象:
Batch 1:
[UUID1, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID3, clientId2]
Batch 2:
[UUID4, clientId1]
[UUID5, clientId1]
[UUID5, clientId1]
[UUID6, clientId2]
[UUID6, clientId2]
这是我保存计数状态的示例:
TridentState ClientState = impressionStream
.groupBy(new Fields("clientId"))
.persistentAggregate(getCassandraStateFactory("users", "DataComputation",
"UserImpressionCounter"), new Count(), new Fields("count));
Stream ClientStream = ClientState.newValuesStream();
我有清晰的数据库并运行我的拓扑。按 clientId 对流进行分组后,我使用 persistentAggregate 函数和 Count 聚合器保存状态。第一批是 newValuesStream 方法后的结果:[clientId1, 4]
, [clientId2, 1]
。对于第二批:[clientId1, 7]
,[clientId2, 3]
正如预期的那样。
ClientStream 在几个分支中使用,在其中一个分支中,我需要处理元组以便拥有大小为 1 的批次,因为我需要有关每个元组的计数的信息。大小为 1 的批次显然是废话,所以我必须在更新计数器之前以某种方式找出计数器的先前状态,并使用元组发出此信息,那里已经更新了计数器,例如第二批[clientId1, 7, 4]
。
有人知道怎么做吗?