0

使用 Flink 1.13.1 和一个 pyFlink 和一个用户定义的表聚合函数 (UDTAGG),并将 Hive 表作为源和接收器,我遇到了一个错误:

pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException:
 Table sink 'myhive.mydb.flink_tmp_model' doesn't support consuming update changes 
 which is produced by node PythonGroupAggregate

这是接收器的 SQL CREATE TABLE

table_env.execute_sql(
    """
    CREATE TABLE IF NOT EXISTS flink_tmp_model (
        run_id STRING,
        model_blob BINARY,
        roc_auc FLOAT
    )  PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
        'sink.partition-commit.delay'='1 s',
        'sink.partition-commit.policy.kind'='success-file'
    )
"""
)

这里有什么问题?

4

0 回答 0