The problem I'm running into is that I have a streaming scio pipeline running on Dataflow which deduplicates messages and performs some counts per key. When I try to drain the pipeline, I get a large number of None.get exceptions, supposedly thrown in my deduplication step (I base this assumption on the labels I see in the Stackdriver logs).
We are currently running scio version 0.7.0-beta1 and beam version 2.8.0. I have tried to guard against any potential None as much as I can in my own code, but this seems to happen deeper down, inside the deduplication step itself.
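For reference, the relevant dependency pins look roughly like the following build.sbt fragment (a minimal sketch; our real build declares more modules, but these are the standard artifact names for those versions):

// Sketch of the dependency pins only; the actual build has more modules.
libraryDependencies ++= Seq(
  "com.spotify" %% "scio-core" % "0.7.0-beta1",
  "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % "2.8.0"
)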
The error I am getting is the following:
"java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at com.spotify.scio.util.Functions$$anon$2.mergeAccumulators(Functions.scala:227)
at com.spotify.scio.util.Functions$$anon$2.mergeAccumulators(Functions.scala:220)
at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.getAccum(WindmillStateInternals.java:958)
at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.read(WindmillStateInternals.java:920)
at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
at org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:135)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:45)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:50)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:202)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:160)
at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1226)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:141)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:965)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
As you can see, this never really gets into my own code, and I'm not sure how I should go about tracking this down. Perhaps it has something to do with the "LateDataDroppingDoFnRunner"? Our allowed lateness is relatively large (3 days, with 1-hour windows).
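For context, the windowing parameters referenced in the snippet below are set roughly as follows. The window size and allowed lateness match what I described above; the early/late firing delays are illustrative placeholders, not our exact production values:

import org.joda.time.Duration

// windowSize and allowedLateness match the description above;
// earlyFiring and lateFiring are placeholder values for illustration.
val windowSize      = Duration.standardHours(1)
val allowedLateness = Duration.standardDays(3)
val earlyFiring     = Duration.standardMinutes(1)
val lateFiring      = Duration.standardMinutes(10)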
val input = PubsubIO.readStrings()
  .fromSubscription(subscription)
  .withTimestampAttribute("ts")
  .withName("Window messages")
  .withFixedWindows(
    duration = windowSize,
    options = WindowOptions(
      trigger = AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
          .plusDelayOf(earlyFiring))
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
          .plusDelayOf(lateFiring)),
      accumulationMode = ACCUMULATING_FIRED_PANES,
      allowedLateness = allowedLateness
    )
  )
  .withName(s"Deduplicate messages")
  .distinctBy[String](f = getId)
...
// I am being overly cautious here because I have been having
// so much trouble debugging this
def getId(message: Map[String, Any]): String = {
  message match {
    case null =>
      logger.warn("message is null when getting id")
      ""
    case message =>
      message.get("id") match {
        case None =>
          logger.warn("id is null in message")
          ""
        case Some(id) => id.toString
      }
  }
}
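To make it concrete what the dedup step is doing, here is an explicit key-and-reduce formulation that keeps one representative message per id. This is only an illustrative sketch, not what distinctBy actually does under the hood; windowedMessages is a placeholder name for the windowed SCollection just before the dedup step, and getId is the function shown above.

// Illustrative equivalent of .distinctBy[String](f = getId):
// key each message by its id, keep one representative per key,
// then drop the key again. Not the actual scio implementation.
val deduped = windowedMessages
  .withName("Deduplicate messages (explicit)")
  .keyBy(getId)
  .reduceByKey((first, _) => first)
  .values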
I'm confused as to how I could possibly get a None.get here, and why this only happens when I drain the pipeline.
Can I get some advice on how to debug this error, or where I should be looking?