0

考虑一个代码:

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

class Scratch {
  public static void main(String[] args) {

    StreamTableEnvironment tableEnv = /*some init code here*/;

    tableEnv.executeSql("CREATE TABLE my_table (\n" +
        "                                  id STRING,\n" +
        "                                  createdDate DATE,\n" +
        "                                  `date` STRING " +
        "                                ) PARTITIONED BY (`date`) \n" +
        "                                WITH (\n" +
        "                                  'connector' = 'filesystem',\n" +
        "                                  'path' = 's3://my-bucket/',\n" +
        "                                  'format' = 'json'\n" +
        "                                )");

    tableEnv.executeSql("CREATE TABLE output_table  (\n" +
        "  id STRING,\n" +
        "  created_date DATE,\n" +
        "  count_value BIGINT,\n" +
        "  PRIMARY KEY (id, created_date) NOT ENFORCED\n" +
        ") WITH (\n" +
        "   'connector' = 'filesystem', \n" +
        "   'path' = 's3://some-bucket/output-table/',\n" +
        "   'format' = 'json'\n" +
        " )");
    Table temp = tableEnv.sqlQuery(
        " SELECT id as id, " +
            " max(createdDate) as created_date, " +
            " COUNT(DISTINCT(id)) as count_value  " +
            " from my_table\n" +
            "    GROUP BY createdDate, id"
    );
    temp.executeInsert("output_table");

  }
}

这会给我错误:

org.apache.flink.client.program.ProgramInvocationException:主要方法导致错误:表接收器'default_catalog.default_database.output_table'不支持消费节点GroupAggregate产生的更新更改(select = [MIN($ f0) AS id, MAX(createdDate) AS created_date, COUNT(DISTINCT $f2) AS count_value ])

有没有办法通过 flink 将聚合写入 s3?(flink 以批处理模式运行)

4

1 回答 1

1

因为您正在以流模式运行查询,这需要一个可以处理来自聚合的更新和删除的接收器。

如果你要么

  • 以 CDC(更改日志)格式生成结果,例如 debezium,
  • 或以批处理模式运行作业

要以批处理模式运行,您可以执行以下操作:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inBatchMode()
    .build();

TableEnvironment tEnv = TableEnvironment.create(settings);

如果你需要在批量执行模式下使用 Table API,同时还可以访问 DataStream API,这只能从 Flink 1.14 开始。

于 2021-10-04T13:06:12.177 回答