I am configuring a TableSource with a rowtime attribute, following the documentation. I register the timestamp field as follows:
KafkaTableSource source = Kafka08JsonTableSource.builder()
    // set Kafka topic
    .forTopic("alerting")
    // set Kafka consumer properties
    .withKafkaProperties(getKafkaProperties())
    // set Table schema
    .withSchema(TableSchema.builder()
        .field("tenant", Types.STRING())
        .field("message", Types.STRING())
        .field("frequency", Types.LONG())
        .field("timestamp", Types.SQL_TIMESTAMP()).build())
    .failOnMissingField(true)
    .withRowtimeAttribute(
        // "timestamp" is rowtime attribute
        "timestamp",
        // value of "timestamp" is extracted from existing field with same name
        new ExistingField("timestamp"),
        // values of "timestamp" are at most out-of-order by 1 day
        new BoundedOutOfOrderTimestamps(TimeUnit.DAYS.toMillis(1)))
    .build();
// register the alerting topic as kafka
tEnv.registerTableSource("kafka", source);
Table results = tEnv.sqlQuery("SELECT tenant, message, SUM(frequency) " +
    "FROM kafka " +
    "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message");
tEnv.toAppendStream(results, Row.class).print();
I get the following error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table
    at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93)
    at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561)
    at oracle.flink.demo.KafkaSQLStream.main(KafkaSQLStream.java:62)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl