我使用Flink SQL执行如下语句时,报错如下:</p>
要求
根据字段对user_behavior_kafka_table中的数据进行分组user_id
,然后取出ts
每组中字段值最大的那条数据
执行sql
SELECT user_id,item_id,ts FROM user_behavior_kafka_table AS a
WHERE ts = (select max(b.ts)
FROM user_behavior_kafka_table AS b
WHERE a.user_id = b.user_id );
Flink 版本
1.11.2
错误信息
AppendStreamTableSink doesn't support consuming update changes which is produced by node Join(joinType=[InnerJoin], where=[((user_id = user_id0) AND (ts = EXPR$0))], select=[user_id, item_id, ts, user_id0, EXPR$0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
作业部署
纱线上
表消息
- 来自消费者 kafka 主题的user_behavior_kafka_table 数据
{"user_id":"aaa","item_id":"11-222-333","comment":"aaa 访问项目在","ts":100}
{"user_id":"ccc","item_id":"11-222-334","comment":"ccc 访问项目在","ts":200}
{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项目在","ts":300}
{"user_id":"bbb","item_id":"11-222-334","comment":"bbb 访问项目在","ts":200}
{"user_id":"aaa","item_id":"11-222-333","comment":"aaa 访问项目在","ts":200}
{"user_id":"aaa","item_id":"11-222-334","comment":"aaa 访问项目在","ts":400}
{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项目在","ts":400}
{"user_id":"vvv","item_id":"11-222-334","comment":"vvv 访问项目在","ts":200}
{"user_id":"bbb","item_id":"11-222-333","comment":"bbb 访问项目在","ts":300}
{"user_id":"aaa","item_id":"11-222-334","comment":"aaa 访问项目在","ts":300}
{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项目在","ts":100}
{"user_id":"bbb","item_id":"11-222-334","comment":"bbb 访问项目在","ts":100}
- user_behavior_hive_table 预期结果
{"user_id":"aaa","item_id":"11-222-334","comment":"aaa 访问项目在","ts":400}
{"user_id":"bbb","item_id":"11-222-333","comment":"bbb 访问项目在","ts":300}
{"user_id":"ccc","item_id":"11-222-333","comment":"ccc 访问项目在","ts":400}
{"user_id":"vvv","item_id":"11-222-334","comment":"vvv 访问项目在","ts":200}