问题标签 [spotify-scio]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 如何在 Cloud Dataflow 中使用 TextIO.Read 将多个文件与名称匹配
我有一个 gcs 文件夹如下:
我只想匹配Scio下dt=2017-12-02
和dt=2017-12-03
使用的文件,据我所知,它在下面使用。sc.textFile()
TextIO.Read.from()
我试过了
和
两者都匹配零文件:
这样做的有效文件模式应该是什么?
google-cloud-dataflow - 修复了无界输入 (PubSub) 上的窗口在工作人员自动扩展后停止触发
使用 scio 0.4.7 版,我有一个正在监听 pubsub 主题的流式作业,我在这里使用事件处理,RFC3339 中的消息属性上存在“时间戳”属性
我使用 windowedEvents 在管道中进行进一步的聚合和计算
如果我在谷歌数据流上使用以下自动缩放参数运行作业
如果只有一名工作人员在运行,则作业运行并且固定窗口正确触发。一旦作业自动扩展到超过 1 个工作人员,固定窗口就会停止触发,初始 pubsub 步骤的系统延迟和挂墙时间会持续增长,而数据水印不会向前移动。
我的触发器设置有问题吗?有没有其他人在数据流运行器或其他运行器上遇到过这种情况?任何帮助是极大的赞赏。如果我不能解决这个问题,我倾向于放弃 scio 并恢复到 apache-beam java sdk。
jdbc - 将数据从 CloudSql 流式传输到 Dataflow
我们目前正在探索如何使用 Apache Beam/Google Dataflow 处理 Google Cloud SQL 数据库 (MySQL) 中的大量数据存储。
该数据库在一个表中存储了大约 200GB 的数据。
我们使用 成功地从数据库中读取了行JdbcIO
,但到目前为止,这只有在我们LIMIT
查询的行数时才有可能。否则我们会遇到内存问题。我假设默认情况下,SELECT
查询会尝试将所有结果行加载到内存中。
什么是惯用的方法?批处理 SQL 查询?流式传输结果?
我们尝试调整fetch size
执行的语句,但没有多大成功。
这是我们的 JDBC 读取设置的样子:
到目前为止,我还没有找到任何关于来自 sql 的流的资源。
编辑
我将列出我采用的视图方法,以便其他人可以学到一些东西(例如如何不这样做)。为了了解更多上下文,所讨论的数据库表的结构非常糟糕:它有一个包含 JSON 字符串的列,以及一个id
列(主键)加上一个added
和modified
列(两种TIMESTAMP
类型)。在第一种方法时,它没有进一步的索引。该表包含 25 个 mio 行。所以这可能更像是一个数据库问题,而不是 Apache Beam/JDBC 问题。但尽管如此:
方法 1(上) - 查询所有内容
基本上它看起来像这样:
如果我将 a 添加LIMIT
到查询中,这将起作用。但显然很慢。
方法 2 - 键集分页
虽然我很快了解到LIMIT _ OFFSET _
组合也从第一行开始扫描,但这效果更好一些。因此,每个后续查询都花费了更长的时间,收敛到很长时间。
方法 2.5 - 带排序的键集分页
与上述方法类似,但我们在added
列上创建了一个索引并将查询更新为
这加快了速度,但最终查询时间变长了。
方法 3 - 无波束/数据流
这会将结果集逐行返回并将行写入文件。所有 25 条 mio 记录运行了大约 2 小时。最后。如果有人能指出如何使用 Beam 实现此解决方案,那就太好了。
顺便说一句:现在我有了原始数据作为用 Beam 处理的 CSV 文件是轻而易举的事。大约 80GB 的原始数据可以在大约 5 分钟内通过自动缩放等转换为另一种 CSV 格式。
google-cloud-dataflow - 长时间运行的流数据流作业的“超出 GC 开销限制”
长时间运行我的流式数据流作业往往会导致“超出 GC 开销限制”错误,从而导致作业停止。我怎样才能最好地继续调试呢?
- 职位编号:2018-02-06_00_54_50-15974506330123401176
- SDK:适用于 Java 2.2.0 的 Apache Beam SDK
- Scio 版本:0.4.7
spotify-scio - Scio 测试未访问的计数器
我正在围绕我的管道构建一些测试,特别是我有两个分支(一个考虑错误,另一个成功),在错误方面我有一个递增计数器(ScioMetrics.counter("MetricName").inc()
)并且在为另一个分支构建测试时我想要断言错误计数器为 0。
当测试成功运行时,我收到一条NoSuchelementException
消息,说明未找到该指标,因为它可能没有在管道内被访问,这没关系,因为我可以断言引发了异常,但是。难道不应该有一种“更好”的测试方式吗?
谢谢!
spotify-scio - Scio JobTest、PubSubIO、pubsubSubscriptionWithAttributes、timestampAttribute 和窗口问题
我正在构建一个管道来将 PubSub 中的数据备份到 GCS 中,并希望使用它创建一个测试JobTest
,我正在努力让 PubSubIO 正确获取事件时间。
PubSub 使用sc.pubsubSubscriptionWithAttributes[String]("path/to/subscription", timestampAttribute = "doc_timestamp")
. 在此之后,我应用窗口并将其发送到CustomIO
测试看起来像这样:
结果是值被放在了-290308-12-21T20:00:00.000Z..-290308-12-21T21:00:00.000Z
窗口中!!。可能是因为"doc_timestamp"
没有正确解释日期。实际上,无论"doc_timestamp"
键上的值如何,窗口都不会改变。
幸运的是,这项工作在生产中运行时运行良好,但我想编写这个测试。
scala - Scio TypeSafe BigQuery 是否支持带子句
一直在给我:40: error: Missing query
。一旦我关闭 legacySql 模式,此查询在 BigQuery 中运行良好。我不应该期望在 BigQuery 中运行的每个查询都能与 TypeSafe BigQuery 一起使用吗?
scala - 为什么在 Scio 中您更喜欢聚合而不是 groupByKey?
从:
https://github.com/spotify/scio/wiki/Scio-data-guideline
“比 groupByKey 更喜欢 combine/aggregate/reduce 转换。请记住,reduce 操作必须是关联的和可交换的。”
为什么特别喜欢聚合而不是 groupByKey?
google-bigquery - 与 BigQuery 表输入大小相比,为什么我的 PCollection (SCollection) 大小如此之大?
上图是一个大查询表的表模式,它是在 spotify 的 scio 上运行的 apache Beam 数据流作业的输入。如果您不熟悉 scio,它是 Apache Beam Java SDK 的 Scala 包装器。特别是,“SCollection 包装 PCollection”。我在 BigQuery 磁盘上的输入表是 136 GB,但在数据流 UI 中查看我的 SCollection 的大小时,它是 504.91 GB。
我知道 BigQuery 在数据压缩和表示方面可能要好得多,但大小增加 3 倍以上似乎相当高。非常清楚,我使用的是类型安全大查询案例类(我们称之为 Clazz)表示,所以我的 SCollection 是 SCollection[Clazz] 类型而不是 SCollection[TableRow]。TableRow 是 Java JDK 中的原生表示。关于如何降低内存分配的任何提示?它与我输入中的特定列类型有关:字节、字符串、记录、浮点数等?
scala - scio 类型安全 BigQuery 类 - 注释问题
我正在尝试使用类型安全的 BigQuery 类。我还安装了intellij scio 插件。但我得到了错误,
错误:(37, 21) 类型参数 [RowElement] 不符合方法 typedBigQuery 的类型参数界限 [T <: com.spotify.scio.bigquery.types.BigQueryType.HasAnnotation] sc.typedBigQueryRowElement
这是我的斯卡拉代码: