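I am trying to compute an exponentially decaying moving average over a hopping window in Flink SQL. To do that I need access to one of the window's boundaries, namely HOP_START, as in the following query: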
SELECT
lb_index one_key,
-- I have access to this one:
HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,
-- Aggregation primitive:
SUM(
Y * EXP(TIMESTAMPDIFF(
SECOND,
proctime,
-- This one throws:
HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
)))
FROM write_position
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
I get the following stack trace:
11:55:37.011 [main] DEBUG o.a.c.p.RelOptPlanner - For final plan, using Aggregate(groupBy: (lb_index), window: (SlidingGroupWindow('w$, 'proctime, 5000.millis, 50.millis)), select: (lb_index, SUM($f2) AS Y, start('w$) AS w$start, end('w$) AS w$end, proctime('w$) AS w$proctime))
11:55:37.011 [main] DEBUG o.a.c.p.RelOptPlanner - For final plan, using Calc(select: (lb_index, proctime, *(payload.Y, EXP(/(CAST(/INT(Reinterpret(-(HOP_START(PROCTIME(proctime), 50, 5000), PROCTIME(proctime))), 1000)), 1000))) AS $f2))
11:55:37.011 [main] DEBUG o.a.c.p.RelOptPlanner - For final plan, using rel#459:DataStreamScan.DATASTREAM.true.Acc(table=[_DataStreamTable_0])
Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Unsupported call: HOP_START
If you think this function should be supported, you can create an issue and start a discussion for it.
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
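The exception says HOP_START is unsupported, yet the same call works when it is used outside the SUM aggregate. That is what makes me think this is a scoping issue.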
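Now, the thing is: I could transform this expression and do the final processing outside the aggregate, since exp(x+y) = exp(x)*exp(y). But I am stuck with TIMESTAMPDIFF (which worked wonders in my previous question). I have not found a way to convert a TIME ATTRIBUTE to a NUMERIC type, and besides, I am not comfortable exponentiating UNIX timestamps, even if I scale them down.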
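To spell out the factorization I have in mind, here is a sketch. The symbols are my own shorthand and not anything from Flink: s is the window start (HOP_START), t_i is the proctime of row i, Y_i is its value, and t_0 is an arbitrary fixed reference time introduced only for illustration, all in seconds.

```latex
\sum_i Y_i \, e^{\,s - t_i}
  \;=\; \sum_i Y_i \, e^{\,(s - t_0) + (t_0 - t_i)}
  \;=\; e^{\,s - t_0} \sum_i Y_i \, e^{\,t_0 - t_i}
```

The inner sum depends only on row values and the constant t_0, so it could live inside the aggregate, while the factor e^{s - t_0} would be applied outside it, where HOP_START is accepted. The catch is the one mentioned above: if t_0 is the UNIX epoch, t_0 - t_i is on the order of -10^9, and a double-precision exp underflows to zero for arguments below roughly -745, so t_0 would have to stay close to the data.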
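In any case, that workaround would be rather clunky, and there may be a better way. What I do not know is how to massage the scoping in this SQL snippet so that the expression is still "inside" the window scope and can read the start time without throwing.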