
I'm running a Beam streaming job on a Flink cluster, and I get the following exception.

Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
        at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:544)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:941)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:895)
        at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:252)
        at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:74)
        at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:576)
        at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
        at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds have the same scheme, but received alluxio, file.
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
        at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:544)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:941)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:895)
        at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:252)
        at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:74)
        at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:576)
        at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
        at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
        at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:544)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds have the same scheme, but received alluxio, file.
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
        at org.apache.beam.sdk.io.FileSystems.validateSrcDestLists(FileSystems.java:428)
        at org.apache.beam.sdk.io.FileSystems.rename(FileSystems.java:308)
        at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:755)
        at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:850)

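The streaming job reads data from an Apache Pulsar source and writes the output as Parquet files to an Alluxio data lake. I'm writing the job in Scala using Spotify's Scio. Here is a small code block that shows what I'm trying to achieve: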

    pulsarSource
      .open(sc)
      .withFixedWindows(Duration.standardSeconds(windowDuration))
      .toSinkTap(sink)

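From the exception I can see that the source and destination paths are expected to have the same URI scheme, but I don't understand how this happens, since I'm using an Alluxio path as the output directory. Some temporary directories are created under the Alluxio output directory, but when the output files are created at the end of each window (windowDuration), this exception is thrown. I suspected that the temp location might default to the local file system, so I set it to the output directory path (the Alluxio directory path), but that didn't change anything.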

    sc.options.setTempLocation(outputDir)
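Read literally, the message reports the source scheme first and the destination scheme second ("alluxio, file"). That suggests the temp files are on Alluxio, as you observed, and it is the final destination that resolves to a local `file` path, not the temp location. A quick way to check the paths your job actually uses is a small scheme check like the one below. `SchemeCheck` is a hypothetical helper, not a Beam or Scio API. It only approximates Beam's scheme parsing, assuming that a path without an explicit scheme is treated as `file`. The hostnames and paths are placeholders.

```scala
import java.net.URI

// Hypothetical helper (not part of Beam or Scio): approximates the
// "same scheme" precondition that Beam checks before renaming temp files.
object SchemeCheck {
  // Assumption: like Beam, treat a path with no explicit scheme as "file".
  def schemeOf(path: String): String =
    Option(new URI(path).getScheme).map(_.toLowerCase).getOrElse("file")

  // Some(message) if the two locations use different schemes, None otherwise.
  def mismatch(tempLocation: String, outputDir: String): Option[String] = {
    val src = schemeOf(tempLocation)
    val dest = schemeOf(outputDir)
    if (src == dest) None
    else Some(s"temp location uses '$src' but output uses '$dest'")
  }

  def main(args: Array[String]): Unit = {
    // Placeholder paths: substitute the values your job actually resolves.
    println(mismatch("alluxio://master:19998/tmp/beam", "alluxio://master:19998/out"))
    println(mismatch("alluxio://master:19998/tmp/beam", "/data/out"))
  }
}
```

If the second case matches what you see, the output path is likely losing its `alluxio://` prefix somewhere before it reaches the sink.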

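I'd like to use remote debugging to find out what's going wrong. I tried remote debugging on the task executor node following this documentation, but once my IntelliJ IDE is connected to that node, my breakpoints are never hit.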

Can anyone suggest how I can debug this or get more information about the issue? Thanks.


1 Answer


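Remote debugging can be quite hard, but let's try it first: make sure you are connected to the task manager and not the job manager (this is easy to verify from the thread names). Then make sure to allow plenty of retries so that you don't miss the task execution, since attaching the debugger may take a while.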
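To check which JVM and thread a piece of user code runs in, you could log something like the following once from inside your function. `RuntimeInfo` is a hypothetical helper. It assumes that Flink task threads are typically named after the operator chain and subtask, so a matching thread name in the debugger's thread list indicates you are attached to the task manager that runs your code.

```scala
import java.lang.management.ManagementFactory

// Hypothetical debugging helper: describes where the calling code is running.
object RuntimeInfo {
  def describe(): String = {
    // On HotSpot JVMs this is commonly "pid@hostname"; the format is not guaranteed.
    val jvm = ManagementFactory.getRuntimeMXBean.getName
    val thread = Thread.currentThread.getName
    s"thread='$thread' jvm='$jvm'"
  }

  def main(args: Array[String]): Unit = println(describe())
}
```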

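It also helps to double-check that the line numbers in the stack trace match the version of the code in your IDE. If Flink/Beam are preinstalled on the cluster, they may be running slightly different versions, in which case your breakpoints are invalid. Just paste the stack trace into your IDE and check that each line is what you expect. Finally, add more breakpoints at central places, such as org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202), to make sure the setup works at all.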
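As a complement to comparing stack traces, you could log which jar a class was actually loaded from on the cluster, along with the manifest's `Implementation-Version` if it is present, and compare that with your IDE's dependencies. `ClassOrigin` is a hypothetical helper built on standard JDK reflection. It assumes the thread's context class loader can see the user-code classes, which may not hold in every deployment.

```scala
// Hypothetical helper: reports where a class was loaded from and its manifest version.
object ClassOrigin {
  private def load(className: String): Option[Class[_]] =
    try Some(Class.forName(className, false, Thread.currentThread.getContextClassLoader))
    catch { case _: ClassNotFoundException | _: LinkageError => None }

  // Location of the jar or directory; None for JDK bootstrap classes, which have no CodeSource.
  def locate(className: String): Option[String] =
    load(className).flatMap(c => Option(c.getProtectionDomain.getCodeSource)).map(_.getLocation.toString)

  // Implementation-Version from the jar manifest, if the jar declares one.
  def version(className: String): Option[String] =
    load(className).flatMap(c => Option(c.getPackage)).flatMap(p => Option(p.getImplementationVersion))

  def main(args: Array[String]): Unit = {
    val names = Seq(
      "org.apache.flink.streaming.runtime.io.StreamInputProcessor",
      "org.apache.beam.sdk.io.FileSystems"
    )
    names.foreach { n =>
      println(s"$n -> ${locate(n).getOrElse("not found")} (version ${version(n).getOrElse("unknown")})")
    }
  }
}
```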

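However, remote debugging is usually not the recommended approach for big data systems. You first want to make sure that most things work on their own locally, using some integration tests and a local runner. Then you may want to add end-to-end tests with Docker containers and a local mini cluster. You would also add plenty of logging statements that you can switch on and off through the logging configuration. Likewise, if you set the log level to debug, the framework's existing log statements may already be enough to give you some insight. One important thing you should always look at is the generated topology, which you can see in the web UI. It may already show you the problematic path.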

Answered 2020-09-02T07:19:57.697