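I have a streaming scio pipeline running on Dataflow that deduplicates messages and performs some counts by key. When I try to drain the pipeline, I get a large number of None.get exceptions, supposedly thrown in my deduplication step (I am assuming this based on the labels I see in the Stackdriver logs).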

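We are currently running scio 0.7.0-beta1 and Beam 2.8.0. I have tried to guard against any potential None in my code as much as possible, but this seems to happen deeper inside the deduplication step.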

The error I get is as follows:

java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at com.spotify.scio.util.Functions$$anon$2.mergeAccumulators(Functions.scala:227)
    at com.spotify.scio.util.Functions$$anon$2.mergeAccumulators(Functions.scala:220)
    at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.getAccum(WindmillStateInternals.java:958)
    at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.read(WindmillStateInternals.java:920)
    at org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
    at org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
    at org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
    at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
    at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
    at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:135)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:45)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:50)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:202)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:160)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1226)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:141)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:965)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

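As you can see, this never actually enters my code, and I am not sure how to go about tracking the problem down. Could it be related to LateDataDroppingDoFnRunner? Our allowed lateness is relatively large (3 days, with 1-hour windows).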

val input = PubsubIO.readStrings()
  .fromSubscription(subscription)
  .withTimestampAttribute("ts")
  .withName("Window messages")
  .withFixedWindows(
    duration = windowSize,
    options = WindowOptions(
      trigger = AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
          .plusDelayOf(earlyFiring))
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
          .plusDelayOf(lateFiring)),
      accumulationMode = ACCUMULATING_FIRED_PANES,
      allowedLateness = allowedLateness
    )
  )
  .withName("Deduplicate messages")
  .distinctBy[String](f = getId)

...
// I am being overly cautious here because I have been having
// so much trouble debugging this
def getId(message: Map[String, Any]): String = {
  message match {
    case null => {
      logger.warn("message is null when getting id")
      ""
    }
    case message => {
      message.get("id") match {
        case None => {
          logger.warn("id is missing in message")
          ""
        }
        // Destructure the Option instead of calling .get on it
        case Some(id) => id.toString
      }
    }
  }
}
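
As a sanity check that the getId helper above cannot be the source of the exception, here is a minimal standalone sketch. It assumes a simplified copy of getId with the logging removed. The object name GetIdCheck is hypothetical and not part of the pipeline.

```scala
object GetIdCheck {
  // Simplified copy of getId (logging removed, for illustration only).
  // Option(message) turns a null map into None, and a missing "id" key
  // also yields None, so Option.get is never called here.
  def getId(message: Map[String, Any]): String =
    Option(message).flatMap(_.get("id")) match {
      case Some(id) => id.toString
      case None     => ""
    }

  def main(args: Array[String]): Unit = {
    assert(getId(null) == "")                  // null message
    assert(getId(Map.empty) == "")             // missing id
    assert(getId(Map("id" -> 42)) == "42")     // non-string id
    assert(getId(Map("id" -> "abc")) == "abc") // string id
    println("getId did not throw for any input")
  }
}
```

If these cases pass, that is consistent with the stack trace, which shows the None.get coming from mergeAccumulators in scio's Functions.scala rather than from this function.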

I am confused about how I could possibly be getting a None.get here, and why it only happens when I drain.

Could I get some advice on how to debug this error, or on where I should be looking?
