0

我正在按照文档配置具有 rowtime 属性的 TableSource

我注册timestamp字段如下

KafkaTableSource source = Kafka08JsonTableSource.builder()// set Kafka topic
            .forTopic("alerting")
            // set Kafka consumer properties
            .withKafkaProperties(getKafkaProperties())
            // set Table schema
            .withSchema(TableSchema.builder()
                    .field("tenant", Types.STRING())
                    .field("message", Types.STRING())
                    .field("frequency", Types.LONG())
                    .field("timestamp", Types.SQL_TIMESTAMP()).build())
            .failOnMissingField(true)
            .withRowtimeAttribute(
                    // "timestamp" is rowtime attribute
                    "timestamp",
                    // value of "timestamp" is extracted from existing field with same name
                    new ExistingField("timestamp"),
                    // values of "timestamp" are at most out-of-order by 30 seconds
                    new BoundedOutOfOrderTimestamps(TimeUnit.DAYS.toMillis(1)))
            .build();

    //register the alerting topic as kafka
    tEnv.registerTableSource("kafka", source);

    Table results = tEnv.sqlQuery("SELECT tenant, message, SUM(frequency) " +
            "FROM kafka " +
            "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message");

    tEnv.toAppendStream(results, Row.class).print();

并得到以下错误:

线程“main”org.apache.flink.table.api.ValidationException 中的异常:SQL 验证失败。从第 1 行第 64 列到第 1 行第 70 列:在 org.apache.flink 的 org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93) 的任何表中都找不到列 'rowtime'。 table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561) at oracle.flink.demo.KafkaSQLStream.main(KafkaSQLStream.java:62) 原因:org.apache.calcite.runtime.CalciteContextException:从第 1 行第 64 列开始到第 1 行第 70 列:在 sun.reflect.DelegatingConstructorAccessorImpl 的 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 的 sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 的任何表中找不到列“rowtime”。

4

1 回答 1

1

表中的字段kafka被调用timestamp而不是rowtime。因此,您应该使用按名称调用属性timestamp而不是rowtime.

请注意,这TIMESTAMP是 SQL 中的关键字,因此您应该重命名timestamp属性或使用反引号 (`) 转义属性名称:

tEnv.sqlQuery(
  "SELECT tenant, message, SUM(frequency) " +
  "FROM kafka " +
  "GROUP BY HOP(`timestamp`, INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message");

顺便提一句。BoundedOutOfOrderTimestamps一天其实也不少。这可能会导致显着的处理延迟和状态大小,因为查询将在开始发出结果和丢弃状态之前收集一天的数据。

于 2017-12-22T21:35:04.803 回答