问题标签 [spotify-scio]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
261 浏览

serialization - 我可以在 Scio 中设置/取消设置默认编码器吗?

我想始终如一地RicherIndicatorCoder为我的案例类应用自定义RicherIndicator。此外,如果我无法为TuplesKVs包含新的编码器,RicherIndicator那么我希望获得编译时或运行时错误,而不是求助于次优编码器。

然而 Scio 似乎并不尊重@DefaultCoder注释:

Scio 也不会优先考虑使用 注册的自定义编码器CoderRegistry,而是使用自己的默认编码器:

因此,我必须在出现这种类型的setCoder(RicherIndicatorCoder.of)任何地方使用SCollection,并仔细梳理管道,以防有复合类型包含RicherIndicator.

有没有办法将我的自定义编码器设置为默认值,或者禁用默认的基于 Magnolia 或 Kryo 的编码器?

0 投票
1 回答
445 浏览

scala - Apache Beam 使用 Scio 保存到 BigQuery 并明确指定 TriggeringFrequency

我正在使用Spotify Scio创建一个由Pub/Sub消息触发的 scala Dataflow 管道。它从我们的私有中读取DB,然后将信息插入到BigQuery.

问题是:

  • 我需要删除以前的数据
  • 为此,我需要使用 write dispositionWRITE_TRUNCATE
  • 但是,该作业自动注册为流式传输,因此出现以下错误:WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection
  • 所以我需要手动将管道改为Batch管道,指定触发频率。

所以直到现在我有以下管道:

scio当我使用api ( saveAsBigQuery)时,我似乎找不到指定触发频率的方法。

它仅存在于本机beamapi 中:

如果我使用BigQueryIO我将不得不使用sc.pipeline.apply而不是我当前的管道。

有没有办法以某种方式将其集成BigQueryIO到我当前的管道或以某种方式withTriggeringFrequency在当前管道上指定?

0 投票
1 回答
376 浏览

google-cloud-storage - 如何使用 Apache Beam 库 [org.apache.beam.sdk.io.*] 获取/添加 GCS 文件用户定义的元数据

我正在设置一个 Dataflow 管道,其中一项操作是获取/添加 GCS 文件的元数据[用户提供的元数据]。

在一个独立的 Java 应用程序中,我使用下面的方法来获取来自 StorageObject 类的元数据,但在 Apache Beam 库中没有找到类似的方法/api。任何指针都会非常有帮助。

0 投票
1 回答
366 浏览

scala - 将 Scio 类型的 bigquery api 与 apache-beam 一起使用时编译管道时出错

我正在尝试使用网站bigquery中显示的类型化 api :scio

sbt pack -Dbigquery.project=sandbox-data在命令行中运行并收到以下错误:

我的build.sbt文件是:

如果您需要任何其他信息,请发表评论,我会提供。

0 投票
2 回答
136 浏览

google-cloud-dataflow - 奇怪的 Google Dataflow 作业日志条目

最近,我在工作详细信息视图中的工作日志中充满了以下条目:

作业本身似乎工作正常,但这些条目之前没有出现,我担心因此我无法发现有意义的日志条目。

这是我应该担心的事情吗?你知道如何摆脱它们吗? 在此处输入图像描述

0 投票
1 回答
66 浏览

google-cloud-dataflow - Beam SQL 未触发

我正在构建一个简单的原型,其中我正在从 Pubsub 读取数据并使用 BeamSQL,代码片段如下

我正在使用 Directrunner 在本地对其进行测试,并且在执行梁 sql 之前我能够看到结果。beam sql 的输出没有被打印出来。

0 投票
3 回答
497 浏览

scala - Scio Apache Beam - 如何正确分离管道代码?

我有一个带有一组 PTransforms 的管道,我的方法变得很长。

我想将我的 DoFns 和我的复合变换写在一个单独的包中,然后在我的 main 方法中使用它们。使用 python 非常简单,我如何使用 Scio 来实现呢?我没有看到任何这样做的例子。:(

0 投票
2 回答
1044 浏览

scala - 如何从 SBT(本地)在 Dataflow 上运行 Scio 管道

我正在尝试在Dataflow上运行我的第一个Scio管道。

有问题的代码可以在这里找到。不过我不认为这太重要了。
我的第一个实验是使用DirecRunner读取一些本地 CSV 文件并编写另一个本地 CSV 文件。这按预期工作。

现在,我正在尝试从GCS读取文件,将输出写入BigQuery并使用DataflowRunner运行管道。我已经做了所有必要的改变(或者这就是我所相信的)。但我无法让它运行。

我已经gcloud auth application-default login和当我这样做了

我可以看到 Jb 是在Dataflow中提交的。但是,一小时后作业失败并显示以下错误消息。

工作流失败。原因:Dataflow 作业似乎卡住了,因为在过去 1 小时内没有看到任何工作人员活动。

(请注意,这项工作在那段时间里什么也没做,因为这是一个实验,所以数据太简单了,需要几分钟时间)

检查StackDriver我可以发现以下错误:

java.lang.ClassNotFoundException:scala.collection.Seq

与杰克逊的一些事情有关:

java.util.ServiceConfigurationError: com.fasterxml.jackson.databind.Module: 提供者 com.fasterxml.jackson.module.scala.DefaultScalaModule 无法实例化

这就是在一开始就杀死每个执行者的原因。我真的不明白为什么我找不到 Scala 标准库。

我还尝试先创建一个模板,然后使用以下命令对其进行缩减:

但是,在运行模板后,我得到了同样的错误。
另外,我注意到在暂存文件夹中有很多罐子,但scala-library.jar不在那里。

我错过了一些明显的东西?

0 投票
0 回答
279 浏览

scala - sbt 卡住并进入 OutOfMemory

sbt尝试编译这个项目snowplow/snowplow,分支)时卡住了,sbt_issue然后去OutOfMemory

它编译得很好,很快,在我更改了一些函数的一些返回类型后,问题就出现了。

问题首先发生在:

  • Debian 伸展
  • 斯卡拉 2.12.8
  • sbt 1.2.8
  • java 1.8 (openjdk 和 oracle jdk)

我尝试了几个版本的 Scala,sbt但同样的问题。该错误可以在Travis和 Mac OS 上重现。

我不认为内存是这里的问题,因为我分配了很多:

知道可能出了什么问题或如何解决这个问题吗?

0 投票
1 回答
633 浏览

java - Apache Beam 通配符递归搜索文件

我正在使用 Spotify 的 Scio 库在 scala 中编写 apache 光束管道。我想在可以是hdfs、alluxio或GCS的文件系统上以递归方式搜索目录下的文件。像 *.j​​ar 应该找到提供的目录和子目录下的所有文件。

Apache beam sdkorg.apache.beam.sdk.io.FileIO为此目的提供了类,我可以使用pipeline.apply(FileIO.match().filepattern(filesPattern)).

如何递归搜索与提供的模式匹配的所有文件?

目前,我正在尝试另一种方法,我正在创建所提供模式的 resourceId 并获取所提供模式的当前目录,然后我正在尝试使用resourceId.resolve()方法解析当前目录中的所有子目录。但它为它抛出了一个异常。

因为currentDir.resolve我收到以下异常:

请建议使用 apache Beam 递归搜索文件的正确方法是什么?

参考资料: https ://beam.apache.org/releases/javadoc/2.11.0/index.html?org/apache/beam/sdk/io/fs/ResourceId.html