问题标签 [spotify-scio]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
serialization - 我可以在 Scio 中设置/取消设置默认编码器吗?
我想始终如一地RicherIndicatorCoder
为我的案例类应用自定义RicherIndicator
。此外,如果我无法为Tuples
或KVs
包含新的编码器,RicherIndicator
那么我希望获得编译时或运行时错误,而不是求助于次优编码器。
然而 Scio 似乎并不尊重@DefaultCoder
注释:
Scio 也不会优先考虑使用 注册的自定义编码器CoderRegistry
,而是使用自己的默认编码器:
因此,我必须在出现这种类型的setCoder(RicherIndicatorCoder.of)
任何地方使用SCollection
,并仔细梳理管道,以防有复合类型包含RicherIndicator
.
有没有办法将我的自定义编码器设置为默认值,或者禁用默认的基于 Magnolia 或 Kryo 的编码器?
scala - Apache Beam 使用 Scio 保存到 BigQuery 并明确指定 TriggeringFrequency
我正在使用Spotify Scio
创建一个由Pub/Sub
消息触发的 scala Dataflow 管道。它从我们的私有中读取DB
,然后将信息插入到BigQuery
.
问题是:
- 我需要删除以前的数据
- 为此,我需要使用 write disposition
WRITE_TRUNCATE
- 但是,该作业自动注册为流式传输,因此出现以下错误:
WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection
- 所以我需要手动将管道改为
Batch
管道,指定触发频率。
所以直到现在我有以下管道:
scio
当我使用api ( saveAsBigQuery
)时,我似乎找不到指定触发频率的方法。
它仅存在于本机beam
api 中:
如果我使用BigQueryIO
我将不得不使用sc.pipeline.apply
而不是我当前的管道。
有没有办法以某种方式将其集成BigQueryIO
到我当前的管道或以某种方式withTriggeringFrequency
在当前管道上指定?
google-cloud-storage - 如何使用 Apache Beam 库 [org.apache.beam.sdk.io.*] 获取/添加 GCS 文件用户定义的元数据
我正在设置一个 Dataflow 管道,其中一项操作是获取/添加 GCS 文件的元数据[用户提供的元数据]。
在一个独立的 Java 应用程序中,我使用下面的方法来获取来自 StorageObject 类的元数据,但在 Apache Beam 库中没有找到类似的方法/api。任何指针都会非常有帮助。
scala - 将 Scio 类型的 bigquery api 与 apache-beam 一起使用时编译管道时出错
我正在尝试使用网站bigquery
中显示的类型化 api :scio
我sbt pack -Dbigquery.project=sandbox-data
在命令行中运行并收到以下错误:
我的build.sbt
文件是:
如果您需要任何其他信息,请发表评论,我会提供。
google-cloud-dataflow - Beam SQL 未触发
我正在构建一个简单的原型,其中我正在从 Pubsub 读取数据并使用 BeamSQL,代码片段如下
我正在使用 Directrunner 在本地对其进行测试,并且在执行梁 sql 之前我能够看到结果。beam sql 的输出没有被打印出来。
scala - Scio Apache Beam - 如何正确分离管道代码?
我有一个带有一组 PTransforms 的管道,我的方法变得很长。
我想将我的 DoFns 和我的复合变换写在一个单独的包中,然后在我的 main 方法中使用它们。使用 python 非常简单,我如何使用 Scio 来实现呢?我没有看到任何这样做的例子。:(
scala - 如何从 SBT(本地)在 Dataflow 上运行 Scio 管道
我正在尝试在Dataflow上运行我的第一个Scio管道。
有问题的代码可以在这里找到。不过我不认为这太重要了。
我的第一个实验是使用DirecRunner读取一些本地 CSV 文件并编写另一个本地 CSV 文件。这按预期工作。
现在,我正在尝试从GCS读取文件,将输出写入BigQuery并使用DataflowRunner运行管道。我已经做了所有必要的改变(或者这就是我所相信的)。但我无法让它运行。
我已经gcloud auth application-default login
和当我这样做了
我可以看到 Jb 是在Dataflow中提交的。但是,一小时后作业失败并显示以下错误消息。
工作流失败。原因:Dataflow 作业似乎卡住了,因为在过去 1 小时内没有看到任何工作人员活动。
(请注意,这项工作在那段时间里什么也没做,因为这是一个实验,所以数据太简单了,需要几分钟时间)。
检查StackDriver我可以发现以下错误:
java.lang.ClassNotFoundException:scala.collection.Seq
与杰克逊的一些事情有关:
java.util.ServiceConfigurationError: com.fasterxml.jackson.databind.Module: 提供者 com.fasterxml.jackson.module.scala.DefaultScalaModule 无法实例化
这就是在一开始就杀死每个执行者的原因。我真的不明白为什么我找不到 Scala 标准库。
我还尝试先创建一个模板,然后使用以下命令对其进行缩减:
但是,在运行模板后,我得到了同样的错误。
另外,我注意到在暂存文件夹中有很多罐子,但scala-library.jar
不在那里。
我错过了一些明显的东西?
java - Apache Beam 通配符递归搜索文件
我正在使用 Spotify 的 Scio 库在 scala 中编写 apache 光束管道。我想在可以是hdfs、alluxio或GCS的文件系统上以递归方式搜索目录下的文件。像 *.jar 应该找到提供的目录和子目录下的所有文件。
Apache beam sdkorg.apache.beam.sdk.io.FileIO
为此目的提供了类,我可以使用pipeline.apply(FileIO.match().filepattern(filesPattern))
.
如何递归搜索与提供的模式匹配的所有文件?
目前,我正在尝试另一种方法,我正在创建所提供模式的 resourceId 并获取所提供模式的当前目录,然后我正在尝试使用resourceId.resolve()
方法解析当前目录中的所有子目录。但它为它抛出了一个异常。
因为currentDir.resolve
我收到以下异常:
请建议使用 apache Beam 递归搜索文件的正确方法是什么?