我的 Kafka 主题包含由 deviceId 键入的状态。我想KStreamBuilder.stream().groupByKey().aggregate(...)
用来只保留状态的最新值TimeWindow
。我猜想,只要对主题进行 key 分区,聚合函数总能以这种方式返回最新的值:
(key, value, older_value) -> value
这是我可以从 Kafka Streams 获得的保证吗?我应该推出自己的处理方法来检查时间戳吗?
我的 Kafka 主题包含由 deviceId 键入的状态。我想KStreamBuilder.stream().groupByKey().aggregate(...)
用来只保留状态的最新值TimeWindow
。我猜想,只要对主题进行 key 分区,聚合函数总能以这种方式返回最新的值:
(key, value, older_value) -> value
这是我可以从 Kafka Streams 获得的保证吗?我应该推出自己的处理方法来检查时间戳吗?
Kafka Streams 保证按偏移量而不是按时间戳进行排序。因此,默认情况下,“最后更新获胜”策略基于偏移量而不是时间戳。迟到的记录(在时间戳上定义的“迟到”)基于时间戳是无序的,它们不会被重新排序以保持原始偏移量的顺序。
如果你想让你的窗口包含基于时间戳的最新值,你需要使用处理器 API (PAPI) 来完成这项工作。
在 Kafka Streams 的 DSL 中,您无法访问获得正确结果所需的记录时间戳。一种简单的方法可能是在记录(即它的值)本身添加一个.transform()
之前并将时间戳添加到记录本身。因此,您可以在您的(btw: a使用更简单的也可以代替).groupBy()
中使用时间戳。最后,您需要再次从值中删除时间戳。Aggregator
.reduce()
.aggregate()
.mapValues()
.aggregate()
使用 DSL 和 PAPI 的这种混合搭配方法应该可以简化您的代码,因为您可以使用 DSL 窗口支持并且KTable
不需要进行低级别的时间窗口和状态管理。
当然,您也可以在单个低级有状态处理器中完成所有这些操作,但我不推荐这样做。