我目前有一些使用聚合构建 KTable 的代码:
inputTopic.groupByKey().aggregate(
Aggregator::new,
(key, value, aggregate) -> {
someProcessingDoneHere;
return aggregate;
},
Materialized.with(Serdes.String(), Serdes.String())
);
一旦为单个键接收并聚合了给定数量的消息,我想将最新的聚合状态推送到另一个主题,然后删除表中的键。
我显然可以使用一个普通的 Kafka 生产者,并且有类似的东西:
inputTopic.groupByKey().aggregate(
Aggregator::new,
(key, value, aggregate) -> {
someProcessingDoneHere;
if (count > threshold) {
producer.send(new ProducerRecord<String,String>("output-topic",
key, aggregate));
return null;
}
return aggregate;
},
Materialized.with(Serdes.String(), Serdes.String())
);
但我正在寻找一种更“流”的方法。
有什么提示吗?