0

我想在窗口化的 KTable 上做一些进一步的操作。为了提供一些背景知识,我有一个主题的数据形式为:{clientId, txTimestamp, txAmount}。从这个主题,我创建了一个流,由 clientId 分区,底层主题时间戳等于 txTimestamp 事件字段。从这个流开始,我想在每 1 小时窗口中聚合每个 clientId 的事务数。这是通过类似于以下内容完成的: CREATE TABLE transactions_per_client WITH (kafka_topic='transactions_per_client_topic') AS SELECT clientId, COUNT(*) AS transactions_per_client, WINDOWSTART AS window_start, WINDOWEND AS window_end FROM transactions_stream WINDOW TUMBLING (SIZE 1 HOURS) GROUP BY clientId;

聚合按预期工作并产生类似于以下内容的值:

客户 ID Transactions_per_client windows开始 窗口结束
1 12 1 2
2 8 1 2
1 24 2 3
1 19 3 4

我现在要做的是进一步处理此表以添加一个列,该列表示同一客户端的 2 个相邻窗口之间每个客户端的事务数差异。对于上一张表,这将是这样的:

客户 ID Transactions_per_client windows开始 窗口结束 偏差
1 12 1 2 0
2 8 1 2 0
1 24 2 3 12
1 19 3 4 -5

实现这一目标的最佳方法是什么(使用 kafka 流或 ksql)?我尝试使用用户定义的聚合函数来尝试创建此列,但它不能应用于 KTable,只能应用于 KStream。

4

0 回答 0