0

我有一个流式输入,比如股票价格数据(包括多只股票),我想每 1 分钟按价格进行一次排名。排名基于所有股票的最新价格,无论前1分钟是否更新,都需要对所有股票进行排序。我尝试在 flink 流 SQL 中使用 ORDER BY。

我未能实现我的逻辑,我对两个部分感到困惑:

  1. 为什么ORDER BY只能使用时间属性作为主要且仅支持ASC?如何按价格等其他类型执行订单?

  2. 下面的 SQL(来自 Flink 文档)是什么意思?没有窗口也没有窗口,所以我假设每个订单都会立即执行 SQL,在这种情况下,对一个元素进行排序看起来毫无意义。

[更新]:当我阅读 ProcimeSortProcessFunction.scala 的代码时,似乎 Flink 对接下来一毫秒内收到的元素进行了排序。

SELECT *
FROM Orders
ORDER BY orderTime

最后,有没有办法在 SQL 中实现我的逻辑?

4

1 回答 1

1

ORDER BY在流式查询中很难计算,因为当我们必须发出一个需要转到结果表开头的结果时,我们不想更新整个结果。因此,我们仅ORDER BY time-attribute在可以保证结果具有(大致)增加时间戳的情况下才支持。

在未来(Flink 1.6 或更高版本),我们还将支持一些查询,例如ORDER BY x ASC LIMIT 10,这将导致更新表,其中包含具有 10 个最小值的记录x

无论如何,您不能(轻松)使用GROUP BY滚动窗口计算每分钟的前 k 名排名。GROUP BY查询将组的记录(在 的情况下也是窗口GROUP BY TUMBLE(rtime, INTERVAL '1' MINUTE))聚合到单个记录中。所以每分钟不会有多条记录,而只有一条。

如果您希望查询以a每分钟计算前 10 个字段,您将需要一个类似于此的查询:

SELECT a, b, c 
FROM (
  SELECT 
    a, b, c, 
    RANK() OVER (ORDER BY a PARTITION BY CEIL(t TO MINUTE) BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as rank 
  FROM yourTable)
WHERE rank <= 10

但是,Flink(1.4 版本)尚不支持此类查询,因为子句中使用了时间属性,PARTITION BY而不是窗口的ORDER BY子句。OVER

于 2018-03-20T09:43:01.250 回答