下面的代码仅处理第一条消息到达并正确发布。但是之后不再处理任何消息(我在终端中使用kafka-console-consumer.bat来监控发布到total-amount-by-id的消息)
卡夫卡流:
KStream<String, String> totalAmount = builder.stream("data-consumed", Consumed.with(Serdes.String(), Serdes.String()));
totalAmount
.mapValues(v -> Integer.valueOf(v))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(100)))
.aggregate(
() -> new Integer(0),
(key, value, aggregate) -> {
System.out.println("value: "+value);
System.out.println("aggregate: "+aggregate);
return value+aggregate;
},,
Materialized.with(Serdes.String(), Serdes.Integer())
)
.toStream()
.map(((key, aggregate) -> new KeyValue<>(key.key(), aggregate)))
.to("total-amount-by-id", Produced.with(Serdes.String(), Serdes.Integer()));
测试:
- 我每 100 毫秒发布 1 条消息,始终使用相同的键,主题“数据消耗”
- 发布到“数据消耗”主题的前四个 (k,v) 是:(1,1),(1,2),(1,4),(1,1)
- Kafka 流发布 (1,1) 到“total-amount-by-id”,但之后没有其他内容
- 上面代码中的 System.out.println() 仅打印:值:1 聚合:0 值:2 聚合:0
任何猜测这个问题背后的原因?
*我期望第二个聚合等于 1(聚合:1)