0

我的 Flink 工作必须在每个工作班次后计算某个聚合。班次是可配置的,看起来像:

1st shift: 00:00am - 06:00am
2nd shift: 06:00am - 12:00pm
3rd shift: 12:00pm - 18:00pm

出于操作目的,每天的班次都是相同的,一周/一年中的天数之间没有区别。班次配置可以随时间变化并且可能是非单调的,因此这会在表格中留下一个微不足道的 EventTime 窗口,例如: TumblingEventTimeWindows.of(Time.of(6, HOURS))因为一些班次可能会缩小或跨越加班,或者可能会插入几个小时的休息时间。 ..

我想出了一些基于 GlobalWindow 和自定义触发器的东西:

LinkedList<Shift> shifts;

datastream.windowAll(GlobalWindows.create())
          .trigger(ShiftTrigger.create(shifts))
          .aggregate(myAggregateFunction)

在我的自定义触发器中,我尝试辨别传入事件是否超过了正在进行的工作班次的结束时间,并为班次触发窗口:

@Override
public TriggerResult onElement(T element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
    // compute the end time of the on-going shift
    final Instant currentShiftEnd = ...
    // fire window for the shift if the event passes the end line
    if (ShiftPredicate.of(currentShiftEnd).test(element)) {
        return TriggerResult.FIRE_AND_PURGE;
    }
    return TriggerResult.CONTINUE;
}

省略用于状态管理和一些记忆优化的代码,这似乎在流式用例中运行良好:在班次结束时间之后进入的第一个事件,触发最后班次的触发和聚合。

然而,作业可以在日期参数的范围内运行(例如:重新处理过去的时间段),或者由于一组预期的原因而过早关闭。当这种事情发生时,我观察到最后一个窗口没有被触发/刷新,

即:一天的最后一个班次在午夜结束,并且应该开始第二天的第一个班次。晚上 23:59 有一个活动,轮班即将结束。但是,该作业仅在今天运行,并在 00:00 完成。由于没有新元素到达自定义触发器并通过该行触发窗口触发,因此不会计算最后一次班次的聚合,但是,即使在下一次班次中没有发生任何事情或作业终止,仍然会出现一些部分结果在正在进行的转变中。

我读过这个的原因是:

Flink 保证只删除基于时间的窗口而不是其他类型的窗口,例如全局窗口(参见窗口分配器)

我已经查看了org.apache.flink.streaming.api.windowing包内部以寻找类似 aTumblingEventTimeWindows或我可以在一天DynamicEventTimeSessionWindows的结束时间使用或扩展的东西,这样我就可以在元素的水印时依赖这些触发的默认事件时间触发器超过了窗口限制,但我不知道该怎么做。直觉上,我希望得到类似的东西:

shifts.forEach(shift -> {
    datastream.windowAll(EventTimeWindow.fromTo(DAILY, shift.startTime, shift.endTime))
              .aggregate(myAggregateFunction);
});

我知道对于任意复杂的用例,有些人所做的是放弃 Windows API 以损害低级进程功能,他们在给定规则或它们适合并从定义的聚合函数或累加器中提取结果。同样在一个过程函数中,可以通过点击onClose钩子来确定任何未决的计算。

是否有一种方法可以通过扩展 Windows API 中的任何对象来获得每天特定时间的重复事件时间窗口的概念?

4

1 回答 1

1

如果我理解正确,这里有两个单独的问题/问题需要解决:

  1. 如何处理没有统一的窗口边界。
  2. 如何在不丢失最后一个窗口的结果的情况下终止作业。

对于 (1),您使用GlobalWindows自定义ShiftTrigger的方法是一种方法。如果您想探索使用流程函数的替代方案,我已经编写了一个示例,您可以在 Flink 文档中找到该示例。

对于更流畅的 API,您可以创建一个 custom WindowAssigner,然后可以利用内置EventTimeTrigger作为其默认触发器。为此,您需要实现WindowAssigner接口。

对于 (2),只要您依赖事件时间处理,就不会触发最后一组窗口,除非在作业终止之前到达足以关闭它们的水印。这通常要求您有一个事件,其时间戳在窗口结束之后足以创建一个足够大以触发窗口的水印(并且作业保持运行足够长的时间以发生这种情况)。

但是,当 Flink 意识到一个流式作业即将自然结束时,它会自动注入一个时间戳设置为 MAX_WATERMARK 的 Watermark,其作用是触发所有事件时间计时器,并关闭所有事件时间窗口。对于任何有界源,这都会自动发生。使用 Kafka(例如),您还可以通过让您的反序列化器从isEndOfStream.

处理此问题的另一种方法是避免在此类作业完成后取消它们,而是使用./bin/flink stop --drain [-p savepointPath] <jobID>干净地停止作业(使用保存点),同时耗尽所有剩余的窗口结果(通过注入最后一个大水印 (MAX_WATERMARK) )。

于 2020-11-06T19:28:59.157 回答