问题标签 [spotify-scio]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1024 浏览

google-bigquery - 在 Apache Beam 中维护全局状态

我们有一个 PubSub 主题,其中包含进入 BigQuery 的事件(尽管特定数据库在这里几乎无关紧要)。事件可能带有新的未知属性,这些属性最终应该作为单独的 BigQuery 列结束。

所以,基本上我在这里有两个问题:

  1. 在 Pipeline 中维护全局状态的正确方法是什么(在我的例子中是一组遇到的属性)?
  2. 一旦遇到新属性并执行直到ALTER TABLE执行,什么是缓冲/保持事件流的好策略

现在我尝试使用以下(我正在使用 Spotify scio):

但这非常低效,因为我们至少需要将整个加载rowsIterable到内存中,甚至遍历它。

0 投票
2 回答
613 浏览

google-cloud-dataflow - PubSub 水印不前进

我使用 Scio 编写了一个 Apache Beam 作业,目的是为传入的数据记录生成会话 ID,然后以某种方式丰富它们,然后再将它们输出到 BigQuery。这是代码:

我正在使用事件时间戳,它是 中的属性键的值tsPubsubMessage作为我的时间戳属性。.timestampBy这与我在窗口化数据之前使用的时间戳完全相同。我期望的是一旦水印超过 sessionGap(默认 30 分钟),输出触发器就会触发。

使用 Dataflow 运行器和 DirectRunner 时,触发器永远不会触发,即使我模拟时间戳间隔超过 30 分钟的数据。在 Dataflow UI 中,我可以看到水印永远不会根据事件时间戳前进,而是每隔一分钟前进一次,就好像没有收到数据一样。

我已经验证了数据是实际收到的,因为在窗口化之前执行了转换。我还测试了每秒大约 10 条记录,但也许这仍然不足以更新水印?我还设置了一个 JobTest,在其中我得到了预期的输出,同时也向我表明问题是基于时间戳/水印的。

我确定我错过了文档中的重要内容或在某处犯了愚蠢的错误,并希望有人能指出我正确的方向。

0 投票
1 回答
619 浏览

google-bigquery - Scio saveAsTypedBigQuery 写入分区,用于 SCollection of Typed Big Query 案例类

我正在尝试使用以下方法将 SCollection 写入 Big Query 中的分区:

我得到的错误是 表 ID 必须是字母数字(加上下划线),并且必须最多 1024 个字符长。此外,不能使用表装饰器。”

如何写入分区?我没有看到任何通过 saveAsTypedBigQuery 方法指定分区的选项,所以我尝试了 Legacy SQL 表装饰器。

0 投票
2 回答
485 浏览

google-cloud-dataflow - Scio所有saveAs txt文件方法输出一个带有part前缀的txt文件

如果我想将 TableRow 或 String 的 SCollection 输出到谷歌云存储 (GCS),我分别使用 saveAsTableRowJsonFile 或 saveAsTextFile。这两种方法最终都使用

它强制文件名以“part”开头。是通过使用 saveAsCustomOutput 输出自定义分片文件的唯一方法吗?

0 投票
1 回答
544 浏览

google-cloud-dataflow - 更新 Apache Beam (Scio) 中的 BigTable 行

我有以下用例:

有一个 PubSub 主题,其中包含我想使用 Scio 聚合的数据,然后将这些聚合保存到 BigTable 中。

在我的管道中有一个CountByKey聚合。我想做的是能够为给定键增加BigTable 中的值,最好使用ReadModifyWrite. 在scio-examples其中只有与设置列值相关的更新,但没有使用原子增量。

我知道我需要创建Mutation才能在 BigTable 上执行任何操作,如下所示:

如何从 Scio / Apache Beam 转换创建UPDATE突变以原子更新 BigTable 中的行?

0 投票
1 回答
326 浏览

apache-beam - Scio/Apache Beam,如何映射分组结果

我有一个简单的管道,可以从固定窗口中的 pubsub 读取,解析消息并按特定属性对它们进行分组。但是,如果我mapgroupBy我的函数之后似乎没有被执行。

我错过了什么吗?

0 投票
1 回答
127 浏览

serialization - Scio 如何回退到 Kryo

我看到 Scio 回退到 Kryo 编码器而不是 Java Serializer,后者是当无法通过 CoderRegistry 推断/找到编码器时用于 Dataflow 的默认编码器。我没有看到对任何地方的任何引用setFallbackCoderProvider,Scio 如何将回退注册到 KryoAtomicCoder?

0 投票
1 回答
304 浏览

google-cloud-dataflow - 在 Scio Spotify 中使用 ContextAndArgs 创建 scioContext 时作为参数传递的内容

我是 Scio 的新手,并试图了解更多有关它的信息。我在 Scio 源代码中看到了一些示例,并想运行它。但它要求一些我不知道且在文档中未指定的论点。

对于这部分代码,我需要传递一些参数,但我不知道它到底需要什么以及采用什么格式。另外,如果我没有在系统中安装 gsutil,如何将 Google Cloud Credential 直接传递给代码?

0 投票
3 回答
889 浏览

google-cloud-dataflow - 如何部署 Apache Beam/Spotify Scio 管道?

我使用 Apache Beam 的 Scio 包装器创建了一个管道。我想将它部署在 Google Dataflow 中。

我希望有一个特定的按钮或端点或函数可以定期执行此作业。

我能找到的所有指令都涉及运行 sbt runMain/pack,它会构建工件并每次都上传它们。

如何上传工件一次,然后尽可能轻松地基于管道创建作业?

0 投票
0 回答
85 浏览

spotify-scio - 为什么在排空流式管道时会收到很多 None.get 异常?

我遇到的问题是,我在 Dataflow 上运行了一个流式 scio 管道,该管道正在删除重复消息并按键执行一些计数。当我尝试排空管道时,我得到了大量的None.get异常,据说是在我的重复数据删除步骤中抛出的(我的这个假设是基于我在堆栈驱动程序日志中观察到的标签)。

我们目前在 scio 版本 0.7.0-beta1 和 beam 版本 2.8.0 上运行。我已经尝试在我的代码中尽可能多地保护任何潜在的无,但这似乎发生在重复数据删除步骤内部的更深处。

我得到的错误如下:

正如你所看到的,这从来没有真正进入我的代码,我不确定我应该如何去寻找这个问题。也许它与“LateDataDroppingDoFnRunner”有关?我们允许的迟到相对较大(3 天,窗口为 1 小时)。

我很困惑我怎么可能在None.get这里得到一个,为什么这只会在我排空时发生。

我可以就如何调试此错误或应该在哪里寻找一些建议吗?