0

我有一个具有以下结构的 Flink 表:

Id1, Id2, myTimestamp, value

行时间基于myTimestamp.

我有以下处理效果很好:

Table processed = tableEnv.sqlQuery("SELECT " +
                "Id1, " +
                "MAX(myTimestamp) as myTimestamp, " +
                "SUM(value) as value " +
                "FROM MyTable " +
                "GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");

我想修改之前的代码,比如对于每个窗口,我只使用每个Id2. 因此,我认为按以下方式更改代码会起作用:

Table processed = tableEnv.sqlQuery("SELECT " +
                "Id1, " +
                "MAX(myTimestamp) as myTimestamp, " +
                "SUM(value) as value " +
                "FROM MyTable, " + 
                "(SELECT Id2, MAX(myTimestamp) as latestTimestamp FROM MyTable GROUP BY Id2) as RecordsLatest" +
                "WHERE  MyTable.Id2 = RecordsLatest.Id2 AND MyTable.myTimestamp = RecordsLatest.myTimestamp" +
                "GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");

但是当我这样做时,我收到以下错误:

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
    at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:387)
    at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302)
    at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:816)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
    at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
    at org.apache.flink.table.api.Table.insertInto(table.scala:1126)

看起来 Flink 不“理解”我加入的两个表是同一个表。

我怎样才能做我想做的事?

4

1 回答 1

1

您的查询不起作用的原因很少。

SELECT 
  Id1, MAX(myTimestamp) as myTimestamp, SUM(value) as value 
FROM 
  MyTable, 
  (SELECT Id2, MAX(myTimestamp) as latestTimestamp 
   FROM MyTable 
   GROUP BY Id2
  ) as RecordsLatest
WHERE 
  MyTable.Id2 = RecordsLatest.Id2 
  AND MyTable.myTimestamp = RecordsLatest.myTimestamp
GROUP BY 
  Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)

有些是由于 Flink 的限制,有些是更基础的。

  1. latestTimestamp不再是行时间属性。这是因为,它是计算出来的。一旦您在表达式中使用 rowtime 属性(包括聚合函数,如MAX),它们就会失去其 rowtime 属性并成为常规TIMESTAMP属性。
  2. 内部查询生成一个更新其结果的动态表。它不是一个只追加的表。一旦Id2更改的最大时间戳,就需要撤回先前的结果行并插入新的结果行。
  3. 由于RecordsLatest是更新表(而不是仅追加表)并且latestTimestamp不是行时间属性,因此RecordsLatest和的连接MyTable是“常规连接”(而不是时间窗口连接),它也产生更新结果而不是追加- 只有结果。常规连接不能产生任何行时间属性,因为不能保证输出行的顺序(这是行时间属性的先决条件,因为它们需要与水印对齐)并且结果可能需要在将来删除它们。这导致您看到的错误消息。
  4. 外部查询的GROUP BY子句需要一个带有 rowtime 属性的只追加输入表rowtime。但是,连接的输出不是仅追加的,而是更新的,并且该rowtime属性不能是行时间属性,如前所述。

不幸的是,解决你的任务并不简单,但应该是可能的。

首先,您应该返回一个查询,该查询为每个 ( Id1, Id2) 窗口返回具有最大时间戳的行的值:

SELECT 
  Id1, Id2,
  MAX(myTimestamp) AS maxT
  ValOfMaxT(valX, myTimestamp) AS valXOfMaxT,
  HOP_ROWTIME(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND) AS rowtime
FROM
  MyTable
GROUP BY
  Id1, Id2, HOP(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND)

ValOfMaxT函数是一个用户定义的聚合函数,用于标识最大时间戳的值并将其返回。rowtime是新的行时间属性和窗口结束时间戳之前的 1ms。

给定这张表,我们称之为它,Temp您可以将下一个查询定义为:


SELECT
  Id1, MAX(maxT) as myTimestamp, SUM(valXOfMaxT)
FROM Temp
GROUP BY
  Id1, TUMBLE(rowtime, INTERVAL '10' SECONDS)

此查询仅对Id1和一个TUMBLE窗口进行分组。这是一个TUMBLE窗口,因为第一个HOP窗口已经将每条记录分组到三个窗口中,我们不应该再这样做了。相反,我们将第一个查询的结果分组到 10 秒的窗口中,因为这是第HOP一个查询中窗口的滑动长度。

于 2019-08-30T09:16:01.523 回答