1

我们有一个用例,我们从 kafka 接收需要聚合的消息。这必须以某种方式聚合,如果更新出现在相同的 id 上,那么如果需要减去现有值,则必须添加新值。

从各种论坛我了解到,jet 不存储原始值,而是汇总结果和一些内部数据。

在这种情况下,我该如何实现?

例子

Balance 1 {id:1, amount:100} // aggregated result 100
Balance 2 {id:2, amount:200} // 300
Balance 3 {id:1, amount:400} // 600 after removing 100 and adding 400

我可以在每次添加时实现一个简单的使用。但是我无法实现需要减去现有值并添加新值的聚合。

rollingAggregation(AggregatorOperations.summingDouble(<login to add remove>))
    .drainTo(Sinks.logger()).
  1. 余额 1,2,3 是消息序列
  2. 注释显示了 jet 执行的每条消息的聚合值。
  3. 我的目标是添加新金额(如果 id 是第一次出现)并在更新的余额出现时减去金额,即 Id 与之前的相同。
4

1 回答 1

2

您可以尝试自定义聚合操作,该操作将发出以前和当前看到的值,如下所示:

public static <T> AggregateOperation1<T, ?, Tuple2<T, T>> previousAndCurrent() {
    return AggregateOperation
            .withCreate(() -> new Object[2])
            .<T>andAccumulate((acc, current) -> {
                acc[0] = acc[1];
                acc[1] = current;
            })
            .andExportFinish((acc) -> tuple2((T) acc[0], (T) acc[1]));
}

输出应该是一个 Tuple 的形式(previous, current)。然后,您可以再次将滚动聚合应用于输出。为了将问题简化为输入,我有一(id, amount)对。

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer, Long>mapJournal("map", START_FROM_OLDEST)) // (id, amount)
        .groupingKey(Entry::getKey)
        .rollingAggregate(previousAndCurrent(), (key, val) -> val)
        .rollingAggregate(AggregateOperations.summingLong(e -> {
            long prevValue = e.f0() == null ? 0 : e.f0().getValue();
            long newValue = e.f1().getValue();
            return newValue - prevValue;
        }))
        .drainTo(Sinks.logger());

JetConfig config = new JetConfig();
config.getHazelcastConfig().addEventJournalConfig(new EventJournalConfig().setMapName("map"));
JetInstance jet = Jet.newJetInstance(config);

IMapJet<Object, Object> map = jet.getMap("map");

map.put(0, 1L);
map.put(0, 2L);
map.put(1, 10L);
map.put(1, 40L);

jet.newJob(p).join();

这应该作为输出产生:1, 2, 12, 42.

于 2019-01-31T13:20:32.940 回答