我正在为一个项目使用 PyFlink 1.13,我正在尝试执行以下操作:
- 从消息包含 UserId 的 Kafka 主题中读取数据
- 对数据执行超过 2 秒的翻转窗口
- 用我的 windows 值调用 Python UDF
我正在使用 PyFlink 的 Table API,并且我的两个表都是使用 SQL DDL 声明的。
我的查询执行如下所示:
SELECT UserId, Timestamp, my_udf(Data) AS Result,
FROM InputTable
GROUP BY TUMBLE(Timestamp, interval 2 SECONDS), UserId, Data
这是我的 Python UDF 函数:
@udf(input_types=SOME_INPUT_TYPE, result_type=SOME_OUTPUT_TYPE)
def my_udf(window_data):
# ...business logic here with window_data
return some_result
我目前的问题是,由于某种原因,该my_udf
函数分别接收每一行,因此在上面的示例中将被调用 4 次而不是 2 次。
我一直在研究 PyFlink 文档,但我无法找到如何实现我想要的。
该信息可能在文档中,但似乎我无法找到/理解它。
任何帮助,将不胜感激。
谢谢 !