我正试图围绕 pyflink 数据流 api。我的用例如下:源是一个 kinesis 数据流,包含以下内容:
曲奇饼 | 簇 | 昏暗0 | 昏暗1 | 昏暗2 | 时间事件 |
---|---|---|---|---|---|
1 | 1 | 5 | 5 | 5 | 1分钟 |
1 | 2 | 1 | 0 | 6 | 30分钟 |
2 | 1 | 1 | 2 | 3 | 45分钟 |
1 | 1 | 10 | 10 | 15 | 70分钟 |
2 | 1 | 5 | 5 | 10 | 120分钟 |
我想创建一个间隔为 60 分钟的会话窗口聚合,计算每个 cookie 集群组合的平均值。窗口分配应该基于cookie,聚合基于cookie和cluster。
因此,结果将是这样的(每一行都被立即转发):
曲奇饼 | 簇 | 昏暗0 | 昏暗1 | 昏暗2 | 时间事件 |
---|---|---|---|---|---|
1 | 1 | 5 | 5 | 5 | 1分钟 |
1 | 2 | 1 | 0 | 6 | 30分钟 |
2 | 1 | 1 | 2 | 3 | 45分钟 |
1 | 1 | 7.5 | 7.5 | 10 | 70 分钟 |
2 | 1 | 5 | 5 | 10 | 120分钟 |
用 SQL 表示,对于新记录,我想执行此聚合:
INSERT INTO `input` (`cookie`, `cluster`, `dim0`, `dim1`, `dim2`, `time_event`) VALUES
("1", "1", 0, 0, 0, 125)
WITH RECURSIVE by_key AS (
SELECT *,
(time_event - lag(time_event) over (partition by cookie order by time_event)) as "time_passed"
FROM input
WHERE cookie = "1"
),
new_session AS (
SELECT *,
CASE WHEN time_passed > 60 THEN 1 ELSE 0 END as "new_session"
FROM by_key),
by_session AS (
SELECT *, SUM(new_session) OVER(partition by cookie order by time_event) as "session_number"
FROM new_session)
SELECT cookie, cluster, avg(dim0), avg(dim1), avg(dim2), max(time_event)
FROM by_session
WHERE cluster = "1"
GROUP BY session_number
ORDER BY session_number DESC
LIMIT 1
我尝试使用 table api 来完成此操作,但我需要在将新记录添加到 cookie-cluster 组合后立即更新结果。这是我第一个使用 flink 的项目,数据流 API 是一个完全不同的野兽,特别是因为 python 还没有包含很多东西。
我目前的方法如下所示:
- 从 kinesis 数据流创建表(数据流没有 kinesis 连接器)
- 将其转换为数据流以执行聚合。根据我的阅读,水印被传播并且生成的行对象包含列名,即我可以像处理 python 字典一样处理它们。请纠正我,如果我在这方面错了。
- 通过 cookie键入数据流。
- 带有自定义 SessionWindowsAssigner 的窗口,借用了 Table API。我正在写一个单独的帖子。
- 通过计算每个集群的平均值来处理窗口
table_env = StreamTableEnvironment.create(stream_env, environment_settings=env_settings)
table_env.execute_sql(
create_table(input_table_name, input_stream, input_region, stream_initpos)
)
ds = table_env.to_append_stream(input_table_name)
ds.key_by(lambda r: r["cookie"])\
.window(SessionWindowAssigner(session_gap=60, is_event_time=True)\
.trigger(OnElementTrigger()).\
.process(MeanWindowProcessFunction())
我对 ProcessWindowFunction 的基本想法是这样的:
class MeanWindowProcessFunction(ProcessWindowFunction[Dict, Dict, str, TimeWindow]):
def process(self,
key: str,
content: ProcessWindowFunction.Context,
elements: Iterable) -> Iterable[Dict]:
clusters = {}
cluster_records = {}
for element in inputs:
if element["cluster"] not in clusters:
clusters[element["cluster"]] = {key: val for key, val in element.as_dict().items()}
cluster_records[element["cluster"]] = 0
else:
for dim in range(3):
clusters[element["cluster"]][f"dim{dim}"] += element[f"dim{dim}"]
clusters[element["cluster"]]["time_event"] = element["time_event"]
cluster_records[element["cluster"]] += 1
for cluster in clusters.keys():
for dim in range(3):
clusters[cluster][f"dim{dim}"] /= cluster_records[cluster]
return clusters.values()
def clear(self, context: 'ProcessWindowFunction.Context') -> None:
pass
- 这是解决这个问题的正确方法吗?
- 我是否需要为 ProcessWindowFunction 考虑其他任何事情,例如实际实现 clear 方法?
我将非常感谢任何帮助,或者 pyflink 中任何更详细的窗口分析应用程序示例。谢谢!