问题标签 [spotify-scio]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
268 浏览

scala - SCIO 用 scio parquet 读取 parquet 文件,未找到生成的案例类

我有个问题。

我通过 sbt 原型创建了一个 SCIO(Apache Beam)项目:sbt new spotify/scio.g8

此作业的目标是从 GS 读取镶木地板文件

当我直接在 SCIO 中使用 Apache Beam 提供的 ParquetIO 时,这项工作(GenericRecord):

但我想使用 scio-parquet:https ://spotify.github.io/scio/io/Parquet.html#read-avro-parquet-files

我在 build.sbt 文件中有以下依赖项:

我使用模式

然后在我使用的工作中:

但我有以下错误:

我不明白为什么,但模式@AvroType.fromSchema 可能没有正确生成案例类TestObject。

或者也许我没有正确使用 api,但我点击链接:https ://spotify.github.io/scio/io/Parquet.html#read-avro-parquet-files

谢谢你的帮助。

0 投票
2 回答
213 浏览

scala - 参数化测试 SCIO (JobTest) 和 Scala 测试 (forAll)

我想用 SCIO JobTest 和 Scala Test 进行参数化测试。我使用 TableDrivenPropertyChecks,它允许通过 aa forAll 进行参数化测试。

对于第一组参数,测试工作正常,但对于第二组,Job args 变为空。我不明白为什么(也许是一个状态..)。

当我单独执行所有组参数时(单独和没有其他人),它可以工作。

为什么会有这种行为?

是否可以使用 SCIO JobTest 进行参数化测试?

提前感谢您的帮助。

0 投票
1 回答
338 浏览

scala - 运行测试时调试 SCollection 内容

运行单元测试()时,有什么方法可以查看 SCollection 的内容PipelineSpec

在多台机器上运行生产时,无法在一台机器上查看整个集合,但我想知道有没有办法查看 SCollection 的内容(例如,在 intellij 中以调试模式运行单元测试时) .

0 投票
2 回答
167 浏览

google-cloud-storage - Cloud Dataflow GlobalWindow 触发器被忽略

在使用 Dataflow 运行程序运行时,使用 AfterPane.elementCountAtLeast 触发器不起作用,但在本地运行时可以正常工作。在 Dataflow 上运行时,它只生成一个窗格。

目标是从 Cloud SQL 中提取数据,转换并写入 Cloud Storage。但是,内存中存储的数据过多,因此需要将其拆分并分块写入 Cloud Storage。这就是我希望这样做的。

完整的代码是:

我认为问题的根源可能是JdbcIO该类被实现为 aPTransform<PBegin,PCollection>并且对 processElement 的单个调用输出了整个 SQL 查询结果:

0 投票
2 回答
226 浏览

google-cloud-dataflow - Apache Beam 批处理早期输出?

无论如何在批处理模式下运行时触发窗口的早期输​​出?我已经使用 Dataflow 运行器尝试了许多触发器以获得早期窗口输出,但它们始终保持到处理结束。

0 投票
1 回答
64 浏览

spotify-scio - 如何处理自定义案例类的编码器

我尝试运行一个简单的 SCIO 代码。

尝试Foo在 a中使用SCollection它会导致错误:

错误信息中写了很多。我似乎无法理解,尽管所有这些文本如何为我应该围绕 Coders 做的事情提供任何信息来解决这个问题。

任何人都可以阐明如何解决这个问题。

不使用implicits的奖励积分。

0 投票
0 回答
660 浏览

java - 如何修复无法序列化 DoFnWithExecutionInformation

尝试DirectRunner在 Scio/Beam 中运行管道时,出现以下错误:

代码如下:

不确定实际问题是什么。尝试内联writeFoo但仍然无法正常工作。

0 投票
1 回答
198 浏览

google-cloud-dataflow - Beam/Dataflow 状态处理,ParDo 从不运行

我正在尝试在 Dataflow 上使用 Beam 的有状态处理,但每次尝试输出数据时,我都会在日志中收到这些错误。结果是有状态的ParDo+没有输出任何内容DoFn

编辑这似乎是巧合。窗口触发之前,有状态的似乎ParDo不输出任何元素。这个对吗?

此示例使用 Scio 复制错误.batchByKey(它在后台使用有状态处理):

final .map,它只是一个带有单个输出的ParDo+ ,永远不会运行。DoFn

在输出中,我看到五行递增的数字(从 开始new Increasing),然后是上面的两条消息。这重复。

任何人都知道错误可能是什么?这似乎是源apache/beam/../WorkerCustomSources.java#L698

0 投票
1 回答
436 浏览

spotify-scio - 如何将 SCollection[String] 转换为 Seq[String] 或 List[String]?

我想转换SCollection[String]Seq[String]or List[String]

例如,我有一个名为 ids 的变量。

当我将它保存到 Cloud Storage 时,文本文件的内容是一个 ID 表。

我想将文件的内容保留为 Seq 或 List。

val seqOdIds: Seq[String] = ~

0 投票
2 回答
673 浏览

google-cloud-dataflow - Apache Beam Stateful DoFn 周期性输出所有 K/V 对

我正在尝试使用有状态的 DoFn(使用元素)在 Apache Beam(通过 Scio)中聚合(每个键)流数据@ProcessElement@StateId ValueState。我认为这最适合我要解决的问题。要求是:

  • 对于给定的键,记录在所有时间都被聚合(基本上求和) -我不关心以前计算的聚合,只关心最近的
  • 根据我控制的某些条件,密钥可能会被逐出状态 ( )state.clear()
  • 每 5 分钟,无论是否看到任何新密钥,都应输出所有尚未从状态中驱逐的密钥

鉴于这是一个流式管道并且将无限期地运行,使用combinePerKey具有累积触发窗格的全局窗口似乎会继续增加其内存占用以及它需要随时间运行的数据量,所以我想避免它。此外,在测试这一点时,(可能如预期的那样)它只是将新计算的聚合与历史输入一起附加到输出中,而不是使用每个键的最新值。

我的想法是,使用 StatefulDoFn 只会让我输出所有全局状态直到 now(),但这似乎不是一个简单的解决方案。我已经看到使用计时器为此人为执行回调的提示,以及可能使用缓慢增长的侧面输入映射(如何在创建 PCollectionView<Map<String,String>> 时解决重复值异常)并以某种方式刷新它,但这本质上需要迭代地图中的所有值,而不是加入地图。

我觉得我可能忽略了一些简单的东西来让它工作。我对 Beam 中的许多窗口和计时器概念相对较新,正在寻找有关如何解决此问题的任何建议。谢谢!