我正在尝试使用 Apache Beam Java SDK 加入 2 个无限源。加入时我收到以下错误消息。
线程“主”java.lang.UnsupportedOperationException 中的异常:当前仅支持非全局窗口加入无界 PCollections,其触发器已知每个窗口产生一次输出,例如允许延迟为零的默认触发器。在这些情况下,Beam 可以保证它在每个窗口中加入一次所有输入元素。WindowingStrategy{windowFn=org.apache.beam.sdk.transforms.windowing.SlidingWindows@1b87117, allowedLateness=PT0S, trigger=Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute)), 累积模式=DISCARDING_FIRED_PANES, timestampCombiner=EARLIEST org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel 的 org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.verifySupportedTrigger(BeamJoinRel.java:341) 不支持。
我尝试使用固定和滑动窗口以及触发(pastEndOfWindow 和 pastFirstElementInPane),允许延迟为零。尝试了 Accumalate 和 Discard 烧制的窗格。我每次都收到相同的错误消息。
下面是我尝试使用固定和滑动窗口的 2 个片段。
p1.apply("window",
Window
.<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
p1.apply("window2",
Window.<Row>into(
SlidingWindows
.of(Duration.standardSeconds(30))
.every(Duration.standardSeconds(5)))
.triggering(
Repeatedly
.forever(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1))))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
我只是想实现一个带有滑动窗口的 sql 转换,延迟触发并允许延迟。请指导我完成它。
谢谢, 戈瑟姆