3

下面的代码仅处理第一条消息到达并正确发布。但是之后不再处理任何消息(我在终端中使用kafka-console-consumer.bat来监控发布到total-amount-by-id的消息)

卡夫卡流

KStream<String, String> totalAmount = builder.stream("data-consumed", Consumed.with(Serdes.String(), Serdes.String()));


totalAmount
  .mapValues(v -> Integer.valueOf(v))
  .groupByKey()
  .windowedBy(TimeWindows.of(Duration.ofMillis(100)))
  .aggregate(
        () -> new Integer(0),
        (key, value, aggregate) -> {
                        System.out.println("value: "+value);
                        System.out.println("aggregate: "+aggregate);
                        return value+aggregate;
                    },,
        Materialized.with(Serdes.String(), Serdes.Integer())
  )
  .toStream()
  .map(((key, aggregate) -> new KeyValue<>(key.key(), aggregate)))
  .to("total-amount-by-id", Produced.with(Serdes.String(), Serdes.Integer()));

测试

  • 我每 100 毫秒发布 1 条消息,始终使用相同的键,主题“数据消耗”
  • 发布到“数据消耗”主题的前四个 (k,v) 是:(1,1),(1,2),(1,4),(1,1)
  • Kafka 流发布 (1,1) 到“total-amount-by-id”,但之后没有其他内容
  • 上面代码中的 System.out.println() 仅打印:值:1 聚合:0 值:2 聚合:0

任何猜测这个问题背后的原因?

*我期望第二个聚合等于 1(聚合:1)

4

0 回答 0