Using Flink 1.13.1 with pyFlink and a user-defined table aggregate function (UDTAGG), with Hive tables as both source and sink, I am running into this error:
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException:
Table sink 'myhive.mydb.flink_tmp_model' doesn't support consuming update changes
which is produced by node PythonGroupAggregate
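For context, the aggregation that produces the PythonGroupAggregate node looks roughly like the sketch below. It is simplified: the function name, the source table training_scores, and the toy aggregate are placeholders rather than my real pipeline, and table_env is the same TableEnvironment that already has the Hive catalog registered.

from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import AggregateFunction, udaf

class MaxAuc(AggregateFunction):
    """Toy aggregate standing in for the real model-training function."""

    def create_accumulator(self):
        return Row(0.0)

    def accumulate(self, accumulator, value):
        if value is not None and value > accumulator[0]:
            accumulator[0] = value

    def get_value(self, accumulator):
        return accumulator[0]

    def get_result_type(self):
        return DataTypes.FLOAT()

    def get_accumulator_type(self):
        return DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.FLOAT())])

table_env.create_temporary_function("max_auc", udaf(MaxAuc()))

# A GROUP BY with a Python aggregate over an unbounded source emits
# update (retract) changes; the planner turns it into the
# PythonGroupAggregate node named in the error.
# "training_scores" is a placeholder source table.
table_env.execute_sql(
    """
    INSERT INTO flink_tmp_model
    SELECT run_id, CAST(NULL AS BINARY) AS model_blob, max_auc(roc_auc) AS roc_auc, dt
    FROM training_scores
    GROUP BY run_id, dt
    """
)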
Here is the SQL CREATE TABLE for the sink:
table_env.execute_sql(
    """
    CREATE TABLE IF NOT EXISTS flink_tmp_model (
        run_id STRING,
        model_blob BINARY,
        roc_auc FLOAT
    ) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
        'sink.partition-commit.delay'='1 s',
        'sink.partition-commit.policy.kind'='success-file'
    )
    """
)
What is wrong here?