1

当前的 Hazelcast Jet 0.6.1 代码示例演示了基于单个字段(例如ticker)的聚合。

这是一个参考。

\code-samples\streaming\stock-exchange\src\main\java\StockExchange.java

如何将其扩展到不止一个,例如ticker、traderId 等。

这是来自的当前示例代码 StockExchange.java

 private static Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();

    p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
            alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
     .addTimestamps(Trade::getTime, 3000)
     .groupingKey(Trade::getTicker)
     .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
     .aggregate(counting(),
             (winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), key, result))
     .drainTo(Sinks.logger());

    return p;
}
4

2 回答 2

1

对于ticker & traderId,您可以使用:

.groupingKey(trade -> Tuple2.tuple2(trade.getTicker(), trade.getTraderId()))

通常,密钥可以通过任何可以正确执行的方式来equals实现hashCodeTuple2是两个值的通用容器。

于 2018-06-14T06:19:14.587 回答
0

我们还可以为分组提供逗号分隔的键。

.aggregate(AggregateOperations.groupingBy(data -> {
    StringBuilder stringBuilder = new StringBuilder();
    stringBuilder.append(StringUtils.defaultString(data.getSource1().get(dataValue) + "", "")).append(",");
    return stringBuilder.substring(0, stringBuilder.toString().length() - 1); 
}));
于 2021-06-01T12:42:44.460 回答