我正在尝试使用 Apache Beam(通过 Scio)运行来自流式源的最后 3 天数据(处理时间)的连续聚合,并每 5 分钟从最早的活动窗口输出结果。Earliest表示开始时间最早的窗口,active表示窗口结束时间还没有过去。本质上,我试图通过删除滑动窗口之间的非重叠时段来获得“滚动”聚合。
我正在尝试使用大小为 3 天和周期为 1 天的示例滑动窗口来实现的可视化:
early firing - ^ no firing - x
|
** stop firing from this window once time passes this point
^ ^ ^ ^ ^ ^ ^ ^
| | | | | | | | ** stop firing from this window once time passes this point
w1: +====================+^ ^ ^
x x x x x x x | | |
w2: +====================+^ ^ ^
x x x x x x x | | |
w3: +====================+
time: ----d1-----d2-----d3-----d4-----d5-----d6-----d7---->
我尝试过使用滑动窗口(大小=3 天,周期=5 分钟),但它们会在未来每 3 天/5 分钟组合生成一个新窗口,并且会为每个窗口发出早期结果。我尝试使用trigger = AfterWatermark.pastEndOfWindow()
,但在工作刚开始时我需要早期结果。我尝试比较窗口之间的pane
数据(isLast
,timestamp
等),但它们看起来相同。
我最近的尝试,这似乎有点 hack,包括将窗口信息附加到 DoFn 中的每个键,重新窗口到固定窗口,并尝试从附加数据中分组并减少到最旧的窗口,但最后reduceByKey
没有'似乎没有输出任何东西。
DoFn 附加窗口信息
// ValueType is just a case class I'm using for objects
type DoFnT = DoFn[KV[String, ValueType], KV[String, (ValueType, Instant)]]
class Test extends DoFnT {
// Window.toString looks like the following:
// [2020-05-16T23:57:00.000Z..2020-05-17T00:02:00.000Z)
def parseWindow(window: String): Instant = {
Instant.parse(
window
.stripPrefix("[")
.stripSuffix(")")
.split("\\.\\.")(1))
}
@ProcessElement
def process(
context: DoFnT#ProcessContext,
window: BoundedWindow): Unit = {
context.output(
KV.of(
context.element().getKey,
(context.element().getValue, parseWindow(window.toString))
)
)
}
}
sc
.pubsubSubscription(...)
.keyBy(_.key)
.withSlidingWindows(
size = Duration.standardDays(3),
period = Duration.standardMinutes(5),
options = WindowOptions(
accumulationMode = DISCARDING_FIRED_PANES,
allowedLateness = Duration.ZERO,
trigger = Repeatedly.forever(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1)))))))
.reduceByKey(ValueType.combineFunction())
.applyPerKeyDoFn(new Test())
.withFixedWindows(
duration = Duration.standardMinutes(5),
options = WindowOptions(
accumulationMode = DISCARDING_FIRED_PANES,
trigger = AfterWatermark.pastEndOfWindow(),
allowedLateness = Duration.ZERO))
.reduceByKey((x, y) => if (x._2.isBefore(y._2)) x else y)
.saveAsCustomOutput(
TextIO.write()...
)
有什么建议么?