0

我使用Flink SQL执行如下语句时,报错如下:</p>

要求

根据字段对user_behavior_kafka_table中的数据进行分组user_id,然后取出ts每组中字段值最大的那条数据

执行sql

SELECT user_id,item_id,ts FROM user_behavior_kafka_table AS a 
WHERE ts = (select max(b.ts) 
FROM user_behavior_kafka_table AS b 
WHERE a.user_id = b.user_id );

Flink 版本

1.11.2

错误信息

AppendStreamTableSink doesn't support consuming update changes which is produced by node Join(joinType=[InnerJoin], where=[((user_id = user_id0) AND (ts = EXPR$0))], select=[user_id, item_id, ts, user_id0, EXPR$0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])

作业部署

纱线上

表消息

  • 来自消费者 kafka 主题的user_behavior_kafka_table 数据

{"user_id":"aaa","item_id":"11-222-333","comment":"aaa 访问项目在","ts":100}

{"user_id":"ccc","item_id":"11-222-334","comment":"ccc 访问项目在","ts":200}

{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项目在","ts":300}

{"user_id":"bbb","item_id":"11-222-334","comment":"bbb 访问项目在","ts":200}

{"user_id":"aaa","item_id":"11-222-333","comment":"aaa 访问项目在","ts":200}

{"user_id":"aaa","item_id":"11-222-334","comment":"aaa 访问项目在","ts":400}

{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项目在","ts":400}

{"user_id":"vvv","item_id":"11-222-334","comment":"vvv 访问项目在","ts":200}

{"user_id":"bbb","item_id":"11-222-333","comment":"bbb 访问项目在","ts":300}

{"user_id":"aaa","item_id":"11-222-334","comment":"aaa 访问项目在","ts":300}

{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项目在","ts":100}

{"user_id":"bbb","item_id":"11-222-334","comment":"bbb 访问项目在","ts":100}

  • user_behavior_hive_table 预期结果

{"user_id":"aaa","item_id":"11-222-334","comment":"aaa 访问项目在","ts":400}

{"user_id":"bbb","item_id":"11-222-333","comment":"bbb 访问项目在","ts":300}

{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项目在","ts":400}

{"user_id":"vvv","item_id":"11-222-334","comment":"vvv 访问项目在","ts":200}

4

1 回答 1

2

要从该查询中获得您期望的结果,它需要以批处理模式执行。作为流式查询,Flink SQL 规划器无法处理它,如果可以,它会产生一个结果流,其中每个结果的最后一个结果user_id将与预期结果匹配,但会有额外的中间结果。

例如,对于用户 aaa,将出现以下结果:

aaa 11-222-333 100
aaa 11-222-333 200
aaa 11-222-334 400

但是 ts=300 的行将被跳过,因为它从来都不是 ts 最大值的行。

如果您想使其在流模式下工作,请尝试将其重新格式化为top-n 查询

SELECT user_id, item_id, ts FROM
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) AS row_num
  FROM user_behavior_kafka_table)
WHERE row_num = 1;

我相信这应该可行,但我无法轻松测试它。

于 2021-06-01T09:45:44.490 回答