0

我正在为一个项目使用 PyFlink 1.13,我正在尝试执行以下操作:

  • 从消息包含 UserId 的 Kafka 主题中读取数据
  • 对数据执行超过 2 秒的翻转窗口
  • 用我的 windows 值调用 Python UDF

这是我试图实现的数据流的可视化表示: 在此处输入图像描述

我正在使用 PyFlink 的 Table API,并且我的两个表都是使用 SQL DDL 声明的。

我的查询执行如下所示:

SELECT UserId, Timestamp, my_udf(Data) AS Result,
FROM InputTable 
GROUP BY TUMBLE(Timestamp, interval 2 SECONDS), UserId, Data

这是我的 Python UDF 函数:

@udf(input_types=SOME_INPUT_TYPE, result_type=SOME_OUTPUT_TYPE)
def my_udf(window_data):
    
    # ...business logic here with window_data
    
    return some_result

我目前的问题是,由于某种原因,该my_udf函数分别接收每一行,因此在上面的示例中将被调用 4 次而不是 2 次。

我一直在研究 PyFlink 文档,但我无法找到如何实现我想要的。

该信息可能在文档中,但似乎我无法找到/理解它。

任何帮助,将不胜感激。

谢谢 !

4

1 回答 1

0

如果我正确理解您要执行的操作,您希望修改您的查询,使其Data列或Timestamp

SELECT UserId, TUMBLE_END(Timestamp, interval '2' SECONDS), my_udf(Data) AS Result,
FROM InputTable 
GROUP BY TUMBLE(Timestamp, interval '2' SECONDS), UserId

然后你想实现一个用户定义的聚合函数,它将给定用户窗口中所有行的数据列的值聚合为单个值。我在上面链接到的文档中有一个示例。

于 2021-10-29T19:53:48.647 回答