1

我正在尝试在Dataflow上运行我的第一个Scio管道。

有问题的代码可以在这里找到。不过我不认为这太重要了。
我的第一个实验是使用DirecRunner读取一些本地 CSV 文件并编写另一个本地 CSV 文件。这按预期工作。

现在,我正在尝试从GCS读取文件,将输出写入BigQuery并使用DataflowRunner运行管道。我已经做了所有必要的改变(或者这就是我所相信的)。但我无法让它运行。

我已经gcloud auth application-default login和当我这样做了

sbt run --runner=DataflowRunner --project=project-id --input-path=gs://path/to/data --output-table=dataset.table

我可以看到 Jb 是在Dataflow中提交的。但是,一小时后作业失败并显示以下错误消息。

工作流失败。原因:Dataflow 作业似乎卡住了,因为在过去 1 小时内没有看到任何工作人员活动。

(请注意,这项工作在那段时间里什么也没做,因为这是一个实验,所以数据太简单了,需要几分钟时间)

检查StackDriver我可以发现以下错误:

java.lang.ClassNotFoundException:scala.collection.Seq

与杰克逊的一些事情有关:

java.util.ServiceConfigurationError: com.fasterxml.jackson.databind.Module: 提供者 com.fasterxml.jackson.module.scala.DefaultScalaModule 无法实例化

这就是在一开始就杀死每个执行者的原因。我真的不明白为什么我找不到 Scala 标准库。

我还尝试先创建一个模板,然后使用以下命令对其进行缩减:

sbt run --runner=DataflowRunner --project=project-id --input-path=gs://path/to/data --output-table=dataset.table --stagingLocation=gs://path/to/staging --templateLocation=gs://path/to/templates/template-1

但是,在运行模板后,我得到了同样的错误。
另外,我注意到在暂存文件夹中有很多罐子,但scala-library.jar不在那里。

我错过了一些明显的东西?

4

2 回答 2

4

通过设置 sbt 修复classLoaderLayeringStrategy

run / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat

sbt 为使用run. 这会导致 JVM 已加载的其他类(例如 Predef)被重用,从而减少启动时间。有关详细信息,请参阅进程内类加载器

这不适用于 Beam DataflowRunner,因为它明确不从父类加载器暂存类,请参阅PipelineResources.java#L51

尝试检测类加载器可以访问的所有资源。这不会递归到类加载器父级阻止它从系统类加载器中提取资源。

因此,解决方法是强制将应用程序使用的所有类加载到同一个类加载器中,以便 DataflowRunner 暂存所有内容。

希望有帮助

于 2020-02-21T14:57:20.567 回答
4

这是 sbt 1.3.0 的一个已知问题,它引入了一些重大更改 wrt 类加载器。试试 1.2.8?

Jackson 问题也可能与 Java 11 或更高版本有关。现在继续使用 Java 8。

于 2019-09-18T19:36:09.653 回答