1

我正在使用如下所示的查询来跟踪特定字段的滚动总和:

SELECT id, SUM(quantity) AS quantity from stream \
WINDOW HOPPING (SIZE 1 MINUTE, ADVANCE BY 10 SECONDS) \
GROUP BY id;

现在,对于每个输入刻度,它似乎返回了 6 个不同的聚合值,我猜它们是针对以下时间段的:

[start, start+60] seconds
[start+10, start+60] seconds
[start+20, start+60] seconds
[start+30, start+60] seconds
[start+40, start+60] seconds
[start+50, start+60] seconds

如果我有兴趣只为每个进入的滴答获得 [start, start+60] 秒的结果。有没有办法只得到那个?

4

2 回答 2

1

因为您指定了一个跳跃窗口,所以每条记录都会落入多个窗口,并且在处理一条记录时需要更新所有窗口。只更新一个窗口是不正确的,结果也是错误的。

比较关于跳跃窗口的 Kafka Streams 文档(Kafka Streams 是 KSQL 的内部运行时引擎):https ://docs.confluent.io/current/streams/developer-guide/dsl-api.html#hopping-time-windows

更新

Kafka Streams 正在通过 KIP-450 ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL ) 添加适当的滑动窗口支持。这也应该允许稍后将滑动窗口添加到 ksqlDB。

于 2018-08-11T17:35:23.070 回答
0

我处于类似的情况并创建一个用户定义的函数来仅访问带有 collect_list(column).size() = window duration 的窗口似乎是一个有前途的轨道。

在 udf 中使用 List 类型来获取您的聚合基列值列表之一。然后评估形成的列表大小是否等于周期的跳跃窗口数,否则返回null。

从这里创建一个选择数据并使用 udf 转换它的表。

从这个最新的表中创建一个表并过滤掉转换列上的空值。

于 2019-10-16T14:04:46.227 回答