我的 Flink 工作必须在每个工作班次后计算某个聚合。班次是可配置的,看起来像:
1st shift: 00:00am - 06:00am
2nd shift: 06:00am - 12:00pm
3rd shift: 12:00pm - 18:00pm
出于操作目的,每天的班次都是相同的,一周/一年中的天数之间没有区别。班次配置可以随时间变化并且可能是非单调的,因此这会在表格中留下一个微不足道的 EventTime 窗口,例如:
TumblingEventTimeWindows.of(Time.of(6, HOURS))
因为一些班次可能会缩小或跨越加班,或者可能会插入几个小时的休息时间。 ..
我想出了一些基于 GlobalWindow 和自定义触发器的东西:
LinkedList<Shift> shifts;
datastream.windowAll(GlobalWindows.create())
.trigger(ShiftTrigger.create(shifts))
.aggregate(myAggregateFunction)
在我的自定义触发器中,我尝试辨别传入事件是否超过了正在进行的工作班次的结束时间,并为班次触发窗口:
@Override
public TriggerResult onElement(T element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
// compute the end time of the on-going shift
final Instant currentShiftEnd = ...
// fire window for the shift if the event passes the end line
if (ShiftPredicate.of(currentShiftEnd).test(element)) {
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
省略用于状态管理和一些记忆优化的代码,这似乎在流式用例中运行良好:在班次结束时间之后进入的第一个事件,触发最后班次的触发和聚合。
然而,作业可以在日期参数的范围内运行(例如:重新处理过去的时间段),或者由于一组预期的原因而过早关闭。当这种事情发生时,我观察到最后一个窗口没有被触发/刷新,
即:一天的最后一个班次在午夜结束,并且应该开始第二天的第一个班次。晚上 23:59 有一个活动,轮班即将结束。但是,该作业仅在今天运行,并在 00:00 完成。由于没有新元素到达自定义触发器并通过该行触发窗口触发,因此不会计算最后一次班次的聚合,但是,即使在下一次班次中没有发生任何事情或作业终止,仍然会出现一些部分结果在正在进行的转变中。
我读过这个的原因是:
Flink 保证只删除基于时间的窗口而不是其他类型的窗口,例如全局窗口(参见窗口分配器)
我已经查看了org.apache.flink.streaming.api.windowing
包内部以寻找类似 aTumblingEventTimeWindows
或我可以在一天DynamicEventTimeSessionWindows
的结束时间使用或扩展的东西,这样我就可以在元素的水印时依赖这些触发的默认事件时间触发器超过了窗口限制,但我不知道该怎么做。直觉上,我希望得到类似的东西:
shifts.forEach(shift -> {
datastream.windowAll(EventTimeWindow.fromTo(DAILY, shift.startTime, shift.endTime))
.aggregate(myAggregateFunction);
});
我知道对于任意复杂的用例,有些人所做的是放弃 Windows API 以损害低级进程功能,他们在给定规则或它们适合并从定义的聚合函数或累加器中提取结果。同样在一个过程函数中,可以通过点击onClose
钩子来确定任何未决的计算。
是否有一种方法可以通过扩展 Windows API 中的任何对象来获得每天特定时间的重复事件时间窗口的概念?