0

我正在向用户提供 Flink SQL 接口,所以我不能真正使用 Table 或 Java/Scala 接口。一切都需要在 SQL 中指定。不过,我可以解析 SQL 文件中的注释,并添加指定的临时较低级别的 API 指令。

一个用户如何转换,比如:

SELECT b, AVG(a) "average" FROM source_data GROUP BY b

name: average_source_data_retracting
  b STRING
  average NUMERIC

- 将值撤回到将附加它们的表单。此附加表单可能具有以下架构:

name: average_source_data_appending
  flag BOOLEAN <-- indicating an accumulate or retract message
  b STRING
  average NUMERIC

Aka 具有等同于 AppendStreamTableSink 的RetractStreamTableSink,但它不是一个接收器。

所有这一切都是为了能够使用 average_source_data_appending 创建一个Temporal 表(过滤撤回消息),但是这种表只接受 append-only 源表。

我考虑过使用 Windows(如在此处讨论),但我希望对临时表的更新是即时的。

4

1 回答 1

1

请忽略这个问题,显然时间表函数可以接受(对我而言)正在缩回的表。

大意是:

SELECT b, AVG(a) "average", MAX(proctime) max_proctime FROM source_data GROUP BY b

可以接受为以 b 为键,max_proctime 作为时间属性的临时表函数。我猜 MAX(proctime) 以某种方式让它认为新行被发出,当它们只被覆盖时?我想我需要更多时间来理解这一点。

编辑:

挖掘源代码,我们发现 Temporal Table Functions 似乎接受撤回定义,但前提是它处于处理时间:

时间ProcessTime JoinOperator.java

@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
    if (BaseRowUtil.isAccumulateMsg(element.getValue())) {
        rightState.update(element.getValue());
        registerProcessingCleanupTimer();
    } else {
        rightState.clear();
        cleanupLastTimer();
    }
}

时间行时间JoinOperator.java

@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
    ...
    checkNotRetraction(row);
    ...
}
private void checkNotRetraction(BaseRow row) {
    if (BaseRowUtil.isRetractMsg(row)) {
        String className = getClass().getSimpleName();
        throw new IllegalStateException(
            "Retractions are not supported by " + className +
                ". If this can happen it should be validated during planning!");
    }
}

这是无证的;我不知道这是否是永久性的,以及文档是否会更新。

于 2019-09-23T15:40:46.253 回答