1

我想使用Session窗口聚合,然后Tumble在生成的结果之上运行窗口聚合Table API/Flink SQL

是否可以在第一次聚合后修改rowtime属性session以使其等于.rowtime会话中最后观察到的事件?

我正在尝试做这样的事情:

table
  .window(Session withGap 2.minutes on 'rowtime as 'w)
  .groupBy('w, 'userId)
  .select(
    'userId,
    ('w.end.cast(Types.LONG) - 'w.start.cast(Types.LONG)) as 'sessionDuration,
    ('w.rowtime - 2.minutes) as 'rowtime
  )
  .window(Tumble over 5.minutes on 'rowtime as 'w)
  .groupBy('w)
  .select(
    'w.start,
    'w.end,
    'sessionDuration.avg as 'avgSession,
    'sessionDuration.count as 'numberOfSession
  )

关键部分是:

('w.rowtime - 2.minutes) as 'rowtime

所以我想重新分配.rowtime会话中最新事件的记录,没有会话间隙(2.minutes在本例中)。

这在 BatchTable 中工作正常,但在 StreamTable 中不起作用:

Exception in thread "main" org.apache.flink.table.api.ValidationException: TumblingGroupWindow('w, 'rowtime, 300000.millis) is invalid: Tumbling window expects a time attribute for grouping in a stream environment.

是的,我知道,感觉就像我不想发明时间机器并改变时间的顺序。但实际上有可能以某种方式实现所描述的行为吗?

4

1 回答 1

0

不,很遗憾,您不能在当前版本 (1.6.0) 中使用 SQL 或 Table API 来做到这一点。一旦你修改了一个时间属性(rowtime 或 proctime),它就变成了一个常规TIMESTAMP属性并且失去了它特殊的时间特性。

对于行时间属性,原因是我们不能保证时间戳仍然与水印对齐。原则上,我们可以将水印延迟减去的时间间隔,但这还不支持。

于 2018-10-12T19:01:29.563 回答