1

我正在尝试使用有状态的 DoFn(使用元素)在 Apache Beam(通过 Scio)中聚合(每个键)流数据@ProcessElement@StateId ValueState。我认为这最适合我要解决的问题。要求是:

  • 对于给定的键,记录在所有时间都被聚合(基本上求和) -我不关心以前计算的聚合,只关心最近的
  • 根据我控制的某些条件,密钥可能会被逐出状态 ( )state.clear()
  • 每 5 分钟,无论是否看到任何新密钥,都应输出所有尚未从状态中驱逐的密钥

鉴于这是一个流式管道并且将无限期地运行,使用combinePerKey具有累积触发窗格的全局窗口似乎会继续增加其内存占用以及它需要随时间运行的数据量,所以我想避免它。此外,在测试这一点时,(可能如预期的那样)它只是将新计算的聚合与历史输入一起附加到输出中,而不是使用每个键的最新值。

我的想法是,使用 StatefulDoFn 只会让我输出所有全局状态直到 now(),但这似乎不是一个简单的解决方案。我已经看到使用计时器为此人为执行回调的提示,以及可能使用缓慢增长的侧面输入映射(如何在创建 PCollectionView<Map<String,String>> 时解决重复值异常)并以某种方式刷新它,但这本质上需要迭代地图中的所有值,而不是加入地图。

我觉得我可能忽略了一些简单的东西来让它工作。我对 Beam 中的许多窗口和计时器概念相对较新,正在寻找有关如何解决此问题的任何建议。谢谢!

4

2 回答 2

1

你是对的,有状态的 DoFn 应该在这里帮助你。这是您可以做什么的基本草图。请注意,这仅输出没有密钥的总和。它可能不完全是您想要的,但它应该可以帮助您前进。

class CombiningEmittingFn extends DoFn<KV<Integer, Integer>, Integer> {

  @TimerId("emitter")
  private final TimerSpec emitterSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @StateId("done")
  private final StateSpec<ValueState<Boolean>> doneState = StateSpecs.value();

  @StateId("agg")
  private final StateSpec<CombiningState<Integer, int[], Integer>>
      aggSpec = StateSpecs.combining(
          Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of()), Sum.ofIntegers());

  @ProcessElement
  public void processElement(ProcessContext c,
      @StateId("agg") CombiningState<Integer, int[], Integer> aggState,
      @StateId("done") ValueState<Boolean> doneState,
      @TimerId("emitter") Timer emitterTimer) throws Exception {
        if (SOME CONDITION) {
          countValueState.clear();
          doneState.write(true);
        } else {
          countValueState.addAccum(c.element().getValue());
          emitterTimer.align(Duration.standardMinutes(5)).setRelative();
        }
      }
    }

  @OnTimer("emitter")
  public void onEmit(
      OnTimerContext context,
      @StateId("agg") CombiningState<Integer, int[], Integer> aggState,
      @StateId("done") ValueState<Boolean> doneState,
      @TimerId("emitter") Timer emitterTimer) {
      Boolean isDone = doneState.read();
      if (isDone != null && isDone) {
        return;
      } else {
        context.output(aggState.getAccum());
        // Set the timer to emit again
        emitterTimer.align(Duration.standardMinutes(5)).setRelative();
      }
    }
  }
  }

很高兴与您一起迭代一些可行的方法。

于 2020-05-07T01:15:46.060 回答
0

@Pablo 确实是正确的,StatefulDoFn 和计时器在这种情况下很有用。这是我能够开始工作的代码。

有状态的Do Fn

// DomainState is a custom case class I'm using
type DoFnT = DoFn[KV[String, DomainState], KV[String, DomainState]]

class StatefulDoFn extends DoFnT {

  @StateId("key")
  private val keySpec = StateSpecs.value[String]()

  @StateId("domainState")
  private val domainStateSpec = StateSpecs.value[DomainState]()

  @TimerId("loopingTimer")
  private val loopingTimer: TimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)

  @ProcessElement
  def process(
      context: DoFnT#ProcessContext,
      @StateId("key") stateKey: ValueState[String],
      @StateId("domainState") stateValue: ValueState[DomainState],
      @TimerId("loopingTimer") loopingTimer: Timer): Unit = {

    ... logic to create key/value from potentially null values

    if (keepState(value)) {
      loopingTimer.align(Duration.standardMinutes(5)).setRelative()

      stateKey.write(key)
      stateValue.write(value)

      if (flushState(value)) {
        context.output(KV.of(key, value))
      }
    } else {
      stateValue.clear()
    }
  }

  @OnTimer("loopingTimer")
  def onLoopingTimer(
      context: DoFnT#OnTimerContext,
      @StateId("key") stateKey: ValueState[String],
      @StateId("domainState") stateValue: ValueState[DomainState],
      @TimerId("loopingTimer") loopingTimer: Timer): Unit = {

    ... logic to create key/value checking for nulls

    if (keepState(value)) {

      loopingTimer.align(Duration.standardMinutes(5)).setRelative()

      if (flushState(value)) {
        context.output(KV.of(key, value))
      }
    }
  }
}

带管道

sc
  .pubsubSubscription(...)
  .keyBy(...)
  .withGlobalWindow()
  .applyPerKeyDoFn(new StatefulDoFn())
  .withFixedWindows(
    duration = Duration.standardMinutes(5),
    options = WindowOptions(
      accumulationMode = DISCARDING_FIRED_PANES,
      trigger = AfterWatermark.pastEndOfWindow(),
      allowedLateness = Duration.ZERO,
      // Only take the latest per key during a window
      timestampCombiner = TimestampCombiner.END_OF_WINDOW
    ))
  .reduceByKey(mostRecentEvent())
  .saveAsCustomOutput(TextIO.write()...)
于 2020-05-14T21:57:13.727 回答