3

我在 Google Cloud DataFlow(带有 Scio SDK)上使用 Apache Beam 2.28.0。我有一个很大的输入PCollection(有界),我想将它限制/采样到固定数量的元素,但我想尽快开始下游处理。

目前,当我的输入PCollection有例如 20M 元素并且我想通过使用https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/transforms/Sample.html将其限制为 1M #any-long-

input.apply(Sample.<String>any(1000000))

它一直等到所有 20M 元素都被读取,这需要很长时间。

如何有效地将元素数量限制为固定大小并在达到限制后立即开始下游处理,丢弃其余的输入处理?

4

1 回答 1

2

好的,所以我最初的解决方案是像这样使用Stateful DoFn(我正在使用问题中提到的 Scio 的 Scala SDK):

import java.lang.{Long => JLong}

class MyLimitFn[T](limit: Long) extends DoFn[KV[String, T], KV[String, T]] {
  @StateId("count") private val count = StateSpecs.value[JLong]()

  @ProcessElement
  def processElement(context: DoFn[KV[String, T], KV[String, T]]#ProcessContext, @StateId("count") count: ValueState[JLong]): Unit = {
    val current = count.read()
    if(current < limit) {
      count.write(current + 1L)
      context.output(context.element())
    }
  }
}

此解决方案的缺点是我需要在使用之前将相同的键(例如空字符串)综合添加到所有元素。到目前为止,它比Sample.<>any().

我仍然期待看到更好/更有效的解决方案。

于 2021-06-08T13:40:35.703 回答