0

我正在使用Java处理我的第一个Kafka Stream,基本上我正在尝试从一个主题中读取记录,然后将这些记录中的一些值合并到一条记录中以写入一个单独的主题,即批量处理一系列记录成一条记录。理想情况下,该批次将由记录大小或记录数控制,但此时很高兴让一个简单的示例正常工作!我在想 KStream.groupBy 函数将是解决方案的基础,但无法提出一个工作示例

4

1 回答 1

0

如果您有一个事件流并希望根据一个键将它们合并到一个列表中,您可以从下面的基本示例开始:

KTable<String, List<Object>> aggregatedMetrics = eventStream
        .selectKey((k,v)-> k // Pick your key here)
        .groupByKey()
        .aggregate(() -> ArrayList::new, 
         (key, value, aggregate) -> aggregate.add(value), arrayListSerde());
于 2019-01-21T20:52:23.760 回答