当前的 Hazelcast Jet 0.6.1 代码示例演示了基于单个字段(例如ticker)的聚合。
这是一个参考。
\code-samples\streaming\stock-exchange\src\main\java\StockExchange.java
如何将其扩展到不止一个,例如ticker、traderId 等。
这是来自的当前示例代码 StockExchange.java
private static Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
.addTimestamps(Trade::getTime, 3000)
.groupingKey(Trade::getTicker)
.window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
.aggregate(counting(),
(winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), key, result))
.drainTo(Sinks.logger());
return p;
}