我正在从 kafka 流中读取数据,创建 Table 环境并计算平均值并将数据写回 kafka [SIMPLECUSTOMER]。
这在 Flink 1.12.5 中有效。我正在使用 Flink 1.13.2 和 Flink 1.14.0
customerId 被读取为 Avro Generated Java 类中定义的 RAW('org.apache.avro.util.Utf8', '...')。在写回接收器时,我收到以下错误。
org.apache.flink.table.api.ValidationException:注册表'default_catalog.default_database.SIMPLECUSTOMER'的查询结果和接收器的列类型不匹配。
原因:位置 0 的接收器列“customerId”的类型不兼容。
查询架构:[customerId: RAW('org.apache.avro.util.Utf8', '...'), age: INT NOT NULL] 接收器架构:[customerId: STRING, age: INT]
接收器表架构:
TableResult sinkTable =
tableEnv.executeSql(
"CREATE TABLE SIMPLECUSTOMER (\n"
+ " `customerId` STRING, \n"
+ " `age` INT NOT NULL,\n"
+ " PRIMARY KEY (customerId) NOT ENFORCED\n"
+ ") WITH (\n"
+ " 'connector' = 'upsert-kafka',\n"
+ " 'topic' = 'simple-customer',\n"
+ " 'properties.bootstrap.servers' = 'localhost:9092',\n"
+ " 'properties.group.id' = 'testGroup',\n"
+ " 'value.format' = 'avro',\n"
+ " 'key.format' = 'raw')");
这是我的接收器代码
TableResult table3 =
tableEnv.executeSql(
"insert into SIMPLECUSTOMER select customerId, avg(age) as age from customer group by customerId ");
尝试将其转换为字符串,但这也不起作用
TableResult table3 =
tableEnv.executeSql(
"insert into SIMPLECUSTOMER select CAST(customerId as STRING), avg(age) as age from customer group by customerId ");
原因:org.apache.calcite.sql.validate.SqlValidatorException:Cast 函数无法将 RAW('org.apache.avro.util.Utf8', '...') 类型的值转换为 java 中的 VARCHAR(2147483647) 类型.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 在 java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 在 java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl。 java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) 上的 newInstance(DelegatingConstructorAccessorImpl.java:45)