我想在窗口化的 KTable 上做一些进一步的操作。为了提供一些背景知识,我有一个主题的数据形式为:{clientId, txTimestamp, txAmount}
。从这个主题,我创建了一个流,由 clientId 分区,底层主题时间戳等于 txTimestamp 事件字段。从这个流开始,我想在每 1 小时窗口中聚合每个 clientId 的事务数。这是通过类似于以下内容完成的:
CREATE TABLE transactions_per_client WITH (kafka_topic='transactions_per_client_topic') AS SELECT clientId, COUNT(*) AS transactions_per_client, WINDOWSTART AS window_start, WINDOWEND AS window_end FROM transactions_stream WINDOW TUMBLING (SIZE 1 HOURS) GROUP BY clientId;
聚合按预期工作并产生类似于以下内容的值:
客户 ID | Transactions_per_client | windows开始 | 窗口结束 |
---|---|---|---|
1 | 12 | 1 | 2 |
2 | 8 | 1 | 2 |
1 | 24 | 2 | 3 |
1 | 19 | 3 | 4 |
我现在要做的是进一步处理此表以添加一个列,该列表示同一客户端的 2 个相邻窗口之间每个客户端的事务数差异。对于上一张表,这将是这样的:
客户 ID | Transactions_per_client | windows开始 | 窗口结束 | 偏差 |
---|---|---|---|---|
1 | 12 | 1 | 2 | 0 |
2 | 8 | 1 | 2 | 0 |
1 | 24 | 2 | 3 | 12 |
1 | 19 | 3 | 4 | -5 |
实现这一目标的最佳方法是什么(使用 kafka 流或 ksql)?我尝试使用用户定义的聚合函数来尝试创建此列,但它不能应用于 KTable,只能应用于 KStream。