我们有一种情况,我认为 Kafka Streams 可以提供帮助,但我找不到任何说明如何的文档或示例。
我发现了一个类似的问题,但它没有任何实现建议:Kafka Streams wait function withdependent objects
我想做的事:
我想将来自 Kafka 主题的相关记录关联到一个对象中,并将该新对象发布到一个单独的输出主题。例如,可能有五个消息记录通过唯一键相互关联 - 我想从这些相关对象构建一个新对象,并将其生成到一个新主题。
我希望聚合一小时的滑动窗口内的所有相关事件。换句话说,一旦 ID 为“123”的消息 A 到达消费者,应用程序必须等待至少一个小时,以便 ID 为“123”的剩余记录到达。在所有记录到达或一小时过去后,这些记录将过期。
最后,一小时内收集的所有相关消息都用于创建一个新对象,然后将其发送到另一个 Kafka 主题。
我遇到的问题。
Kafka 中的滑动窗口似乎只在将两个流连接在一起时才起作用。我们将只有一个流连接到该主题 - 我不知道为什么需要两个流或我们将如何实现这一点。我在网上找不到任何这样的例子。我在 Kafka 中看到的所有流函数在收集相同键的事件时都只是简单地聚合/减少到一个简单的值。例如,一个键出现的次数或加起来一些值
这是一些伪代码来描述我在说什么。如果功能存在,则功能名称/语义将有所不同。
KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
kstream.windowedBy(
// One hour sliding Window
)
.collectAllRelatedKeys(
// Collect all Records related to each key
// map == HashMap<Key, ArrayList<Value>>
map.get(key).add(value);
)
.transformAndProcess(
if(ALL_EVENTS_COLLECTED) {
// Create new Object from all related records
newObject =
createNewObjectFromRelatedRecordsFunction(map.get(key));
producer.send(newObject);
}
)
问题(感谢您的帮助):
- 我怎么能在单个流中使用滑动窗口?
- 如何自定义 KStream/KTable 函数以收集时间窗口内的所有相关事件并将新对象生成到另一个主题?
- 确认/偏移管理如何与滑动窗口流一起使用?
- 这能保证 Exactly Once 交货吗?供参考:https ://www.confluent.io/blog/enabling-exactly-kafka-streams/