2

我正在从 kafka 流中读取数据,创建 Table 环境并计算平均值并将数据写回 kafka [SIMPLECUSTOMER]。

这在 Flink 1.12.5 中有效。我正在使用 Flink 1.13.2 和 Flink 1.14.0

customerId 被读取为 Avro Generated Java 类中定义的 RAW('org.apache.avro.util.Utf8', '...')。在写回接收器时,我收到以下错误。

org.apache.flink.table.api.ValidationException:注册表'default_catalog.default_database.SIMPLECUSTOMER'的查询结果和接收器的列类型不匹配。

原因:位置 0 的接收器列“customerId”的类型不兼容。

查询架构:[customerId: RAW('org.apache.avro.util.Utf8', '...'), age: INT NOT NULL] 接收器架构:[customerId: STRING, age: INT]

接收器表架构:

    TableResult sinkTable =
    tableEnv.executeSql(
        "CREATE TABLE SIMPLECUSTOMER (\n"
            + "  `customerId` STRING, \n"
            + "  `age` INT NOT NULL,\n"
            + "   PRIMARY KEY (customerId) NOT ENFORCED\n"
            + ") WITH (\n"
            + "  'connector' = 'upsert-kafka',\n"
            + "  'topic' = 'simple-customer',\n"
            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
            + "  'properties.group.id' = 'testGroup',\n"
            + "  'value.format' = 'avro',\n"
            + "  'key.format' = 'raw')");

这是我的接收器代码

TableResult table3 =
        tableEnv.executeSql(
            "insert into SIMPLECUSTOMER  select customerId, avg(age) as age from customer group by customerId ");

尝试将其转换为字符串,但这也不起作用

 TableResult table3 =
    tableEnv.executeSql(
        "insert into SIMPLECUSTOMER  select CAST(customerId as STRING), avg(age) as age from customer group by customerId ");

原因:org.apache.calcite.sql.validate.SqlValidatorException:Cast 函数无法将 RAW('org.apache.avro.util.Utf8', '...') 类型的值转换为 java 中的 VARCHAR(2147483647) 类型.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 在 java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 在 java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl。 java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) 上的 newInstance(DelegatingConstructorAccessorImpl.java:45)

4

0 回答 0