0

我正试图围绕 pyflink 数据流 api。我的用例如下:源是一个 kinesis 数据流,包含以下内容:

曲奇饼 昏暗0 昏暗1 昏暗2 时间事件
1 1 5 5 5 1分钟
1 2 1 0 6 30分钟
2 1 1 2 3 45分钟
1 1 10 10 15 70分钟
2 1 5 5 10 120分钟

我想创建一个间隔为 60 分钟的会话窗口聚合,计算每个 cookie 集群组合的平均值。窗口分配应该基于cookie,聚合基于cookie和cluster。

因此,结果将是这样的(每一行都被立即转发):

曲奇饼 昏暗0 昏暗1 昏暗2 时间事件
1 1 5 5 5 1分钟
1 2 1 0 6 30分钟
2 1 1 2 3 45分钟
1 1 7.5 7.5 10 70 分钟
2 1 5 5 10 120分钟

用 SQL 表示,对于新记录,我想执行此聚合:

INSERT INTO `input` (`cookie`, `cluster`, `dim0`, `dim1`, `dim2`, `time_event`) VALUES
    ("1", "1", 0, 0, 0, 125)

WITH RECURSIVE by_key AS (
    SELECT *,
    (time_event - lag(time_event) over (partition by cookie order by time_event)) as "time_passed"
    FROM input
    WHERE cookie = "1"
    ),
new_session AS (
    SELECT *, 
    CASE WHEN time_passed > 60 THEN 1 ELSE 0 END as "new_session"
    FROM by_key),
by_session AS (
    SELECT *, SUM(new_session) OVER(partition by cookie order by time_event) as "session_number"
    FROM new_session)
SELECT cookie, cluster, avg(dim0), avg(dim1), avg(dim2), max(time_event)
    FROM by_session
    WHERE cluster = "1"
    GROUP BY session_number
    ORDER BY session_number DESC
    LIMIT 1

我尝试使用 table api 来完成此操作,但我需要在将新记录添加到 cookie-cluster 组合后立即更新结果。这是我第一个使用 flink 的项目,数据流 API 是一个完全不同的野兽,特别是因为 python 还没有包含很多东西。

我目前的方法如下所示:

  1. 从 kinesis 数据流创建表(数据流没有 kinesis 连接器)
  2. 将其转换为数据流以执行聚合。根据我的阅读,水印被传播并且生成的行对象包含列名,即我可以像处理 python 字典一样处理它们。请纠正我,如果我在这方面错了。
  3. 通过 cookie键入数据流。
  4. 带有自定义 SessionWindowsAssigner 的窗口,借用了 Table API。我正在写一个单独的帖子
  5. 通过计算每个集群的平均值来处理窗口
table_env = StreamTableEnvironment.create(stream_env, environment_settings=env_settings)
table_env.execute_sql(
        create_table(input_table_name, input_stream, input_region, stream_initpos)
    )
ds = table_env.to_append_stream(input_table_name)
ds.key_by(lambda r: r["cookie"])\
  .window(SessionWindowAssigner(session_gap=60, is_event_time=True)\
  .trigger(OnElementTrigger()).\
  .process(MeanWindowProcessFunction())

我对 ProcessWindowFunction 的基本想法是这样的:

class MeanWindowProcessFunction(ProcessWindowFunction[Dict, Dict, str, TimeWindow]):

    def process(self,
                key: str,
                content: ProcessWindowFunction.Context,
                elements: Iterable) -> Iterable[Dict]:

            clusters = {}
            cluster_records = {}
            for element in inputs:
                if element["cluster"] not in clusters:
                    clusters[element["cluster"]] = {key: val for key, val in element.as_dict().items()}
                    cluster_records[element["cluster"]] = 0
                else:
                    for dim in range(3):
                        clusters[element["cluster"]][f"dim{dim}"] += element[f"dim{dim}"]
    
                clusters[element["cluster"]]["time_event"] = element["time_event"]
                cluster_records[element["cluster"]] += 1
    
            for cluster in clusters.keys():
                for dim in range(3):
                    clusters[cluster][f"dim{dim}"] /= cluster_records[cluster]

            return clusters.values()

    def clear(self, context: 'ProcessWindowFunction.Context') -> None:
        pass
  • 这是解决这个问题的正确方法吗?
  • 我是否需要为 ProcessWindowFunction 考虑其他任何事情,例如实际实现 clear 方法?

我将非常感谢任何帮助,或者 pyflink 中任何更详细的窗口分析应用程序示例。谢谢!

4

0 回答 0