0

我正在尝试在 Dataflow 上使用 Beam 的有状态处理,但每次尝试输出数据时,我都会在日志中收到这些错误。结果是有状态的ParDo+没有输出任何内容DoFn

16:45:56.948 CEST Proposing dynamic split of work unit myproject;2020-03-31_07_34_07-7523868393961495218;8536385410242733529 at {"fractionConsumed":0.5}
16:45:56.948 CEST Rejecting split request because custom reader returned null residual source.

编辑这似乎是巧合。窗口触发之前,有状态的似乎ParDo不输出任何元素。这个对吗?

此示例使用 Scio 复制错误.batchByKey(它在后台使用有状态处理):

    val create = Create.of(()).withCoder(CoderMaterializer.beam(sc, Coder[Unit]))
    sc.customInput("Unit input", create)
      .map(_ => println("STARTING"))
      .applyTransform(ParDo.of(new Increasing)) // Outputs infinite stream of increasing numbers, one per second, prints each number to stdout
      .keyBy(1 -> _)
      .batchByKey(5)
      .map {
        case (key, vs) => vs.foreach(v => println(s"GOT batch with $v"))
      }
    sc.run()

final .map,它只是一个带有单个输出的ParDo+ ,永远不会运行。DoFn

在输出中,我看到五行递增的数字(从 开始new Increasing),然后是上面的两条消息。这重复。

任何人都知道错误可能是什么?这似乎是源apache/beam/../WorkerCustomSources.java#L698

4

1 回答 1

0

该错误仅意味着您在管道中使用的源无法拆分。因此,它与您的管道进度没有直接关系,但在这种情况下可能与此相关。我看到您使用的唯一来源是Create,您的问题可能是由于您Create使用空元组进行了初始化。可以尝试使用Create具有一个或多个元素的管道播种。

于 2020-03-31T16:02:57.840 回答