1

我正在尝试使用 Apache Beam Java SDK 加入 2 个无限源。加入时我收到以下错误消息。

线程“主”java.lang.UnsupportedOperationException 中的异常:当前仅支持非全局窗口加入无界 PCollections,其触发器已知每个窗口产生一次输出,例如允许延迟为零的默认触发器。在这些情况下,Beam 可以保证它在每个窗口中加入一次所有输入元素。WindowingStrategy{windowFn=org.apache.beam.sdk.transforms.windowing.SlidingWindows@1b87117, allowedLateness=PT0S, trigger=Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute)), 累积模式=DISCARDING_FIRED_PANES, timestampCombiner=EARLIEST org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel 的 org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.verifySupportedTrigger(BeamJoinRel.java:341) 不支持。

我尝试使用固定和滑动窗口以及触发(pastEndOfWindow 和 pastFirstElementInPane),允许延迟为零。尝试了 Accumalate 和 Discard 烧制的窗格。我每次都收到相同的错误消息。

下面是我尝试使用固定和滑动窗口的 2 个片段。

p1.apply("window",
    Window
      .<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
      .triggering(AfterWatermark.pastEndOfWindow())
      .withAllowedLateness(Duration.ZERO)
      .accumulatingFiredPanes());
p1.apply("window2",
    Window.<Row>into(
        SlidingWindows
          .of(Duration.standardSeconds(30))
          .every(Duration.standardSeconds(5)))
      .triggering(
        Repeatedly
          .forever(
             AfterProcessingTime
               .pastFirstElementInPane()
               .plusDelayOf(Duration.standardMinutes(1))))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes());

我只是想实现一个带有滑动窗口的 sql 转换,延迟触发并允许延迟。请指导我完成它。

谢谢, 戈瑟姆

4

2 回答 2

3

直到现在(2.13.0),BeamSQL 不支持使用非默认触发器的无界连接无界 PCollection。此类连接只允许使用默认触发器(因此每个窗口只会发出一个结果)。

主要原因是,在当前的 Beam Java SDK 实现中,在 Join 这样的情况下,缺少一种机制(称为收回和累积)来细化数据。

于 2019-07-08T17:51:57.300 回答
1

从评论中,如果我理解正确,所需的行为是:

  • 加入两个流;
  • 在现实世界时间每 30 秒发出一次结果;
  • 如果无法匹配到数据,则等待对应的匹配记录最长 30 分钟;
  • 30 分钟后删除记录;

基本上,这是两个流中最后 30 分钟数据的连续滑动匹配,结果每 30 秒发出一次。

好消息是它应该可以在 Beam Java 中实现(也可能在 Python 中)。坏消息在 Java 中可能不是微不足道的,我认为目前在 SQL 中根本不可能。

它可能会是什么样子:

  • 输入应该在全局窗口中;
  • 一个有状态的ParDo(或this),它通过将所有看到的元素存储在状态单元中来跟踪它们:
    • 您可能需要使用侧面输入CoGroupByKey预先应用 a 才能访问同一输入中的两个输入的元素ParDo
    • 侧输入并CoGroupByKey具有不同的语义,可能不容易使用;
  • 在每个输入上手动检查匹配记录的状态;
  • 要么立即发出结果,要么将它们保存在另一个状态单元中;
  • 一个计时器可以清除旧的不匹配记录:
    • 您可能需要手动跟踪时间戳和其他内容;
  • 如果需要,将所需的窗口/触发器应用于输出;

我建议你通读这个例子,它完成了你需要的定时器和状态部分(它等待匹配的记录,将不匹配的记录保持在状态,并在定时器触发时清除状态)并使用CoGroupByKey. 在理解了这个例子之后,你可能会更好地了解它是如何工作的。

于 2019-07-10T16:07:12.947 回答