我想对整个数据集做一个简单的最大值。我从 Kafka 示例开始:https ://github.com/hazelcast/hazelcast-jet-code-samples/blob/0.7-maintenance/kafka/src/main/java/avro/KafkaAvroSource.java
我刚刚将管道更改为:
p.drawFrom(KafkaSources.<Integer, User>kafka(brokerProperties(), TOPIC))
.map(Map.Entry::getValue)
.rollingAggregate(minBy(comparingInt(user -> (Integer) user.get(2))))
.map(user -> (Integer) user.get(2))
.drainTo(Sinks.list("result"));
然后去:
IListJet<Integer> res = jet.getList("result");
SECONDS.sleep(10);
System.out.println(res.get(0));
SECONDS.sleep(15);
System.out.println(res.get(0));
cancel(job);
获得话题中年龄最大的人。然而,它不返回 20 并且似乎在不同的运行中返回不同的值。知道为什么吗?