我正在尝试在 Dataflow 上使用 Beam 的有状态处理,但每次尝试输出数据时,我都会在日志中收到这些错误。结果是有状态的ParDo
+没有输出任何内容DoFn
:
16:45:56.948 CEST Proposing dynamic split of work unit myproject;2020-03-31_07_34_07-7523868393961495218;8536385410242733529 at {"fractionConsumed":0.5}
16:45:56.948 CEST Rejecting split request because custom reader returned null residual source.
编辑这似乎是巧合。在窗口触发之前,有状态的似乎ParDo
不输出任何元素。这个对吗?
此示例使用 Scio 复制错误.batchByKey
(它在后台使用有状态处理):
val create = Create.of(()).withCoder(CoderMaterializer.beam(sc, Coder[Unit]))
sc.customInput("Unit input", create)
.map(_ => println("STARTING"))
.applyTransform(ParDo.of(new Increasing)) // Outputs infinite stream of increasing numbers, one per second, prints each number to stdout
.keyBy(1 -> _)
.batchByKey(5)
.map {
case (key, vs) => vs.foreach(v => println(s"GOT batch with $v"))
}
sc.run()
final .map
,它只是一个带有单个输出的ParDo
+ ,永远不会运行。DoFn
在输出中,我看到五行递增的数字(从 开始new Increasing
),然后是上面的两条消息。这重复。
任何人都知道错误可能是什么?这似乎是源apache/beam/../WorkerCustomSources.java#L698