问题标签 [spotify-scio]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1100 浏览

scala - 如何在 Cloud Dataflow 中使用 TextIO.Read 将多个文件与名称匹配

我有一个 gcs 文件夹如下:

我只想匹配Scio下dt=2017-12-02dt=2017-12-03使用的文件,据我所知,它在下面使用。sc.textFile()TextIO.Read.from()

我试过了

两者都匹配零文件:

这样做的有效文件模式应该是什么?

0 投票
1 回答
394 浏览

google-cloud-dataflow - 修复了无界输入 (PubSub) 上的窗口在工作人员自动扩展后停止触发

使用 scio 0.4.7 版,我有一个正在监听 pubsub 主题的流式作业,我在这里使用事件处理,RFC3339 中的消息属性上存在“时间戳”属性

我使用 windowedEvents 在管道中进行进一步的聚合和计算

如果我在谷歌数据流上使用以下自动缩放参数运行作业

如果只有一名工作人员在运行,则作业运行并且固定窗口正确触发。一旦作业自动扩展到超过 1 个工作人员,固定窗口就会停止触发,初始 pubsub 步骤的系统延迟和挂墙时间会持续增长,而数据水印不会向前移动。

我的触发器设置有问题吗?有没有其他人在数据流运行器或其他运行器上遇到过这种情况?任何帮助是极大的赞赏。如果我不能解决这个问题,我倾向于放弃 scio 并恢复到 apache-beam java sdk。

0 投票
2 回答
1830 浏览

jdbc - 将数据从 CloudSql 流式传输到 Dataflow

我们目前正在探索如何使用 Apache Beam/Google Dataflow 处理 Google Cloud SQL 数据库 (MySQL) 中的大量数据存储。

该数据库在一个表中存储了大约 200GB 的数据。

我们使用 成功地从数据库中读取了行JdbcIO,但到目前为止,这只有在我们LIMIT查询的行数时才有可能。否则我们会遇到内存问题。我假设默认情况下,SELECT查询会尝试将所有结果行加载到内存中。

什么是惯用的方法?批处理 SQL 查询?流式传输结果?

我们尝试调整fetch size执行的语句,但没有多大成功。

这是我们的 JDBC 读取设置的样子:

到目前为止,我还没有找到任何关于来自 sql 的流的资源。

编辑

我将列出我采用的视图方法,以便其他人可以学到一些东西(例如如何这样做)。为了了解更多上下文,所讨论的数据库表的结构非常糟糕:它有一个包含 JSON 字符串的列,以及一个id列(主键)加上一个addedmodified列(两种TIMESTAMP类型)。在第一种方法时,它没有进一步的索引。该表包含 25 个 mio 行。所以这可能更像是一个数据库问题,而不是 Apache Beam/JDBC 问题。但尽管如此:

方法 1(上) - 查询所有内容

基本上它看起来像这样:

如果我将 a 添加LIMIT到查询中,这将起作用。但显然很慢。

方法 2 - 键集分页

虽然我很快了解到LIMIT _ OFFSET _组合也从第一行开始扫描,但这效果更好一些。因此,每个后续查询都花费了更长的时间,收敛到很长时间。

方法 2.5 - 带排序的键集分页

与上述方法类似,但我们在added列上创建了一个索引并将查询更新为

这加快了速度,但最终查询时间变长了。

方法 3 - 无波束/数据流

这会将结果集逐行返回并将行写入文件。所有 25 条 mio 记录运行了大约 2 小时。最后。如果有人能指出如何使用 Beam 实现此解决方案,那就太好了。

顺便说一句:现在我有了原始数据作为用 Beam 处理的 CSV 文件是轻而易举的事。大约 80GB 的原始数据可以在大约 5 分钟内通过自动缩放等转换为另一种 CSV 格式。

0 投票
1 回答
908 浏览

google-cloud-dataflow - 长时间运行的流数据流作业的“超出 GC 开销限制”

长时间运行我的流式数据流作业往往会导致“超出 GC 开销限制”错误,从而导致作业停止。我怎样才能最好地继续调试呢?

  • 职位编号:2018-02-06_00_54_50-15974506330123401176
  • SDK:适用于 Java 2.2.0 的 Apache Beam SDK
  • Scio 版本:0.4.7
0 投票
1 回答
112 浏览

spotify-scio - Scio 测试未访问的计数器

我正在围绕我的管道构建一些测试,特别是我有两个分支(一个考虑错误,另一个成功),在错误方面我有一个递增计数器(ScioMetrics.counter("MetricName").inc())并且在为另一个分支构建测试时我想要断言错误计数器为 0。

当测试成功运行时,我收到一条NoSuchelementException消息,说明未找到该指标,因为它可能没有在管道内被访问,这没关系,因为我可以断言引发了异常,但是。难道不应该有一种“更好”的测试方式吗?

谢谢!

0 投票
1 回答
369 浏览

spotify-scio - Scio JobTest、PubSubIO、pubsubSubscriptionWithAttributes、timestampAttribute 和窗口问题

我正在构建一个管道来将 PubSub 中的数据备份到 GCS 中,并希望使用它创建一个测试JobTest,我正在努力让 PubSubIO 正确获取事件时间。

PubSub 使用sc.pubsubSubscriptionWithAttributes[String]("path/to/subscription", timestampAttribute = "doc_timestamp"). 在此之后,我应用窗口并将其发送到CustomIO

测试看起来像这样:

结果是值被放在了-290308-12-21T20:00:00.000Z..-290308-12-21T21:00:00.000Z窗口中!!。可能是因为"doc_timestamp"没有正确解释日期。实际上,无论"doc_timestamp"键上的值如何,窗口都不会改变。

幸运的是,这项工作在生产中运行时运行良好,但我想编写这个测试。

0 投票
1 回答
221 浏览

scala - Scio TypeSafe BigQuery 是否支持带子句

一直在给我:40: error: Missing query。一旦我关闭 legacySql 模式,此查询在 BigQuery 中运行良好。我不应该期望在 BigQuery 中运行的每个查询都能与 TypeSafe BigQuery 一起使用吗?

0 投票
1 回答
1447 浏览

scala - 为什么在 Scio 中您更喜欢聚合而不是 groupByKey?

从:

https://github.com/spotify/scio/wiki/Scio-data-guideline

“比 groupByKey 更喜欢 combine/aggregate/reduce 转换。请记住,reduce 操作必须是关联的和可交换的。”

为什么特别喜欢聚合而不是 groupByKey?

0 投票
2 回答
487 浏览

google-bigquery - 与 BigQuery 表输入大小相比,为什么我的 PCollection (SCollection) 大小如此之大?

在此处输入图像描述

上图是一个大查询表的表模式,它是在 spotify 的 scio 上运行的 apache Beam 数据流作业的输入。如果您不熟悉 scio,它是 Apache Beam Java SDK 的 Scala 包装器。特别是,“SCollection 包装 PCollection”。我在 BigQuery 磁盘上的输入表是 136 GB,但在数据流 UI 中查看我的 SCollection 的大小时,它是 504.91 GB。 在此处输入图像描述

我知道 BigQuery 在数据压缩和表示方面可能要好得多,但大小增加 3 倍以上似乎相当高。非常清楚,我使用的是类型安全大查询案例类(我们称之为 Clazz)表示,所以我的 SCollection 是 SCollection[Clazz] 类型而不是 SCollection[TableRow]。TableRow 是 Java JDK 中的原生表示。关于如何降低内存分配的任何提示?它与我输入中的特定列类型有关:字节、字符串、记录、浮点数等?

0 投票
1 回答
486 浏览

scala - scio 类型安全 BigQuery 类 - 注释问题

我正在尝试使用类型安全的 BigQuery 类。我还安装了intellij scio 插件。但我得到了错误,

错误:(37, 21) 类型参数 [RowElement] 不符合方法 typedBigQuery 的类型参数界限 [T <: com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation] sc.typedBigQueryRowElement

这是我的斯卡拉代码: