6

我的 Kafka 主题包含由 deviceId 键入的状态。我想KStreamBuilder.stream().groupByKey().aggregate(...)用来只保留状态的最新值TimeWindow。我猜想,只要对主题进行 key 分区,聚合函数总能以这种方式返回最新的值:

(key, value, older_value) -> value

这是我可以从 Kafka Streams 获得的保证吗?我应该推出自己的处理方法来检查时间戳吗?

4

1 回答 1

11

Kafka Streams 保证按偏移量而不是按时间戳进行排序。因此,默认情况下,“最后更新获胜”策略基于偏移量而不是时间戳。迟到的记录(在时间戳上定义的“迟到”)基于时间戳是无序的,它们不会被重新排序以保持原始偏移量的顺序。

如果你想让你的窗口包含基于时间戳的最新值,你需要使用处理器 API (PAPI) 来完成这项工作。

在 Kafka Streams 的 DSL 中,您无法访问获得正确结果所需的记录时间戳。一种简单的方法可能是在记录(即它的值)本身添加一个.transform()之前并将时间戳添加到记录本身。因此,您可以在您的(btw: a使用更简单的也可以代替).groupBy()中使用时间戳。最后,您需要再次从值中删除时间戳。Aggregator.reduce().aggregate().mapValues().aggregate()

使用 DSL 和 PAPI 的这种混合搭配方法应该可以简化您的代码,因为您可以使用 DSL 窗口支持并且KTable不需要进行低级别的时间窗口和状态管理。

当然,您也可以在单个低级有状态处理器中完成所有这些操作,但我不推荐这样做。

于 2017-01-09T16:38:13.233 回答