0

我正在尝试使用 Apache Beam(通过 Scio)运行来自流式源的最后 3 天数据(处理时间)的连续聚合,并每 5 分钟从最早活动窗口输出结果。Earliest表示开始时间最早的窗口,active表示窗口结束时间还没有过去。本质上,我试图通过删除滑动窗口之间的非重叠时段来获得“滚动”聚合。

我正在尝试使用大小为 3 天和周期为 1 天的示例滑动窗口来实现的可视化:

early firing -  ^       no firing - x
                |

                               ** stop firing from this window once time passes this point
          ^  ^  ^  ^  ^  ^  ^  ^      
          |  |  |  |  |  |  |  |      ** stop firing from this window once time passes this point
w1:       +====================+^  ^  ^
                  x x x x x x x |  |  |
w2:              +====================+^  ^  ^
                         x x x x x x x |  |  |
w3:                     +====================+

time: ----d1-----d2-----d3-----d4-----d5-----d6-----d7---->

我尝试过使用滑动窗口(大小=3 天,周期=5 分钟),但它们会在未来每 3 天/5 分钟组合生成一个新窗口,并且会为每个窗口发出早期结果。我尝试使用trigger = AfterWatermark.pastEndOfWindow(),但在工作刚开始时我需要早期结果。我尝试比较窗口之间的pane数据(isLast,timestamp等),但它们看起来相同。

我最近的尝试,这似乎有点 hack,包括将窗口信息附加到 DoFn 中的每个键,重新窗口到固定窗口,并尝试从附加数据中分组并减少到最旧的窗口,但最后reduceByKey没有'似乎没有输出任何东西。

DoFn 附加窗口信息

// ValueType is just a case class I'm using for objects

type DoFnT = DoFn[KV[String, ValueType], KV[String, (ValueType, Instant)]]

class Test extends DoFnT {
  // Window.toString looks like the following:
  // [2020-05-16T23:57:00.000Z..2020-05-17T00:02:00.000Z)
  def parseWindow(window: String): Instant = {
    Instant.parse(
      window
        .stripPrefix("[")
        .stripSuffix(")")
        .split("\\.\\.")(1))
  }

  @ProcessElement
  def process(
        context: DoFnT#ProcessContext,
        window: BoundedWindow): Unit = {
    context.output(
      KV.of(
        context.element().getKey,
        (context.element().getValue, parseWindow(window.toString))
      )
    )
  }
}
sc
  .pubsubSubscription(...)
  .keyBy(_.key)
  .withSlidingWindows(
    size = Duration.standardDays(3),
    period = Duration.standardMinutes(5),
    options = WindowOptions(
      accumulationMode = DISCARDING_FIRED_PANES,
      allowedLateness = Duration.ZERO,
      trigger = Repeatedly.forever(
        AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(
            AfterProcessingTime
              .pastFirstElementInPane()
              .plusDelayOf(Duration.standardMinutes(1)))))))
  .reduceByKey(ValueType.combineFunction())
  .applyPerKeyDoFn(new Test())
  .withFixedWindows(
    duration = Duration.standardMinutes(5),
    options = WindowOptions(
      accumulationMode = DISCARDING_FIRED_PANES,
      trigger = AfterWatermark.pastEndOfWindow(),
      allowedLateness = Duration.ZERO))
  .reduceByKey((x, y) => if (x._2.isBefore(y._2)) x else y)
  .saveAsCustomOutput(
    TextIO.write()...
  )

有什么建议么?

4

1 回答 1

0

首先,关于处理时间:如果要根据处理时间进行窗口化,则应将事件时间设置为处理时间。这很好 - 这意味着您正在处理的事件是摄取记录的事件,而不是记录所代表的事件。

现在您可以使用现成的滑动窗口来获得您想要的聚合,按照您想要的方式进行分组。

但是你说得对,以你想要的方式触发有点让人头疼。触发器不容易表达到“输出最后 3 天的聚合,但仅在窗口结束后 5 分钟时开始”,更不能表达“从管道启动后的前 3 天,输出整个时间”。

我相信有状态的ParDo(DoFn)将是您的最佳选择。状态是按键和窗口划分的。由于您希望跨 3 天聚合进行交互,因此您需要DoFn在全局窗口中运行并自己管理聚合的分区。您标记了您的问题google-cloud-dataflow并且 Dataflow 不支持MapState,因此您将需要使用一个ValueState包含活动 3 天聚合的地图,根据需要启动新的聚合并在完成后删除旧的聚合。另外,您可以轻松地跟踪要从中定期输出的聚合,并拥有一个定期发出活动聚合的计时器回调。类似于以下伪Java;您可以翻译成 Scala 并插入您自己的类型:

DoFn<> {
  @StateId("activePeriod") StateSpec<ValueState<Period>> activePeriod = StateSpecs.value();

  @StateId("accumulators") StateSpec<ValueState<Map<Period, Accumulator>>> accumulators = StateSpecs.value();

  @TimerId("nextPeriod") TimerSpec nextPeriod = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @TimerId("output") TimerSpec outputTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);


  @ProcessElement
  public void process(
      @Element element,
      @TimerId("nextPeriod") Timer nextPeriod,
      @TimerId("output") Timer output,
      @StateId("activePeriod") ValueState<Period> activePeriod
      @StateId("accumulators") ValueState<Map<Period, Accumulator>> accumulators) {

    // Set nextPeriod if it isn't already running
    // Set output if it isn't already running
    // Set activePeriod if it isn't already set
    // Add the element to the appropriate accumulator
  }

  @OnTimer("nextPeriod")
  public void onNextPeriod(
      @TimerId("nextPeriod") Timer nextPeriod,
      @StateId("activePriod") ValueState<Period> activePeriod {

    // Set activePeriod to the next one
    // Clear the period we will never read again
    // Reset the timer (there's a one-time change in this logic after the first window; add a flag for this)
  }

  @OnTimer("output")
  public void onOutput(
      @TimerId("output") Timer output,
      @StateId("activePriod") ValueState<Period> activePeriod,
      @StateId("accumulators") ValueState<MapState<Period, Accumulator>> {

    // Output the current accumulator for the active period
    // Reset the timer
  }
}

我对此确实有一些保留意见,因为我们努力压制的输出与“替换”它们的输出无法相提并论。我会很高兴了解更多关于用例的信息。可能有一种更直接的方式来表达您感兴趣的结果。

于 2020-06-01T22:15:58.497 回答