问题标签 [spotify-scio]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
449 浏览

scala - 使用 Scio 将 SCollection 从 textFile 放入 BigQuery

我已经阅读了一些文档textFile,并做了flatMap一个单词,为每个单词添加了一些额外的信息:

我目前正在轻松地将其保存为文本

但我不知道如何将地图保存到 BigQuery 架构。我见过的所有示例都从 BigQuery 创建初始 Scollection,然后将其保存到另一个表中,因此初始集合[TableRow]不是[String].

这里的正确方法是什么?我是否应该调查如何将我的数据转换为 Big Query 可以接受的一种集合?或者我应该尝试进一步调查如何将这个纯文本直接推送到表格中?

0 投票
1 回答
1085 浏览

google-cloud-dataflow - 如何处理 CoderException: cannot encode a null String with scio

我刚开始使用 scio 和数据流。将我的代码尝试到一个输入文件中,效果很好。但是当我向输入中添加更多文件时,出现以下异常:

我想我的一个输入文件可能包含一些格式错误的数据。但是如何绕过坏数据呢?Java Beam com.google.cloud.dataflow.sdk.coders.CoderException 有一个类似的问题:无法编码空字符串 所以我尝试了这个:

它没有帮助。有人可以帮我吗?谢谢。

0 投票
1 回答
176 浏览

google-cloud-storage - 数据流 TextIO.write 缩放问题

我创建了一个简单的数据流管道,它从 pubsub 读取字节数组,将它们窗口化,然后写入 GCS 中的文本文件。我发现对于流量较低的主题,这非常有效,但是我在一个每分钟大约 2.4GB 的主题上运行它,并且开始出现一些问题。

在启动管道时,我没有设置工作人员的数量(正如我想象的那样,它会根据需要自动扩展)。在摄取这么多数据时,worker 的数量保持在 1,但 TextIO.write() 需要 15 分钟以上才能写入 2 分钟的窗口。这将继续备份,直到内存不足。当这一步得到如此备份时,Dataflow 不自动扩展是否有充分的理由?

当我将工作人员的数量增加到 6 个时,写入文件的时间从 4 分钟左右开始,持续 5 分钟,然后下降到 20 秒。

另外,当使用 6 名工人时,计算挂墙时间似乎有问题?即使数据流已经赶上并且运行 4 小时后,我的写入步骤摘要看起来像这样,我的似乎也永远不会下降:

职位编号:2019-03-13_19_22_25-14107024023503564121

0 投票
2 回答
327 浏览

join - 将批处理数据与存储在 BigTable 中的数据连接起来

我在 GCS 中有不断增长的数据,并且将有一个批处理作业运行,可以说每天处理 100 万篇文章增量。我需要从 BigTable(包含数十亿条记录)中获取键的附加信息。对地图操作中的每个项目进行查找是否可行?批处理这些查找并执行诸如批量读取之类的操作是否有意义?或者使用 scio/beam 的这个用例的最佳方法是什么?

我在Pattern: Streaming mode large lookup tables中发现,对每个请求执行查找是推荐的流式方法,但是我不确定我是否不会通过批处理作业重载 BigTable。

你们对如何处理这个用例有任何总体或具体的建议吗?

0 投票
1 回答
2598 浏览

java - DataFlow (Apache Beam) 中 Pub/Sub 的自定义时间戳和窗口化

我想使用 Apache Beam 中的流管道(并在 Google DataFlow 上运行)来实现以下场景:

  1. 从 Pub/Sub 读取消息(JSON 字符串)
  2. 反序列化 JSON
  3. 使用自定义字段(比如timeStamp)作为处理元素的时间戳值
  4. 应用固定窗口60 seconds
  5. 从元素中提取密钥并按密钥分组
  6. <<进行进一步处理>>

我尝试使用 Java(Scala) 和 Python 解决这个问题,但没有一个解决方案有效。

  1. Python解决方案

add_timestamping根据文档功能:

Python解决方案的输出

  1. 使用时DirectRunner,会发出窗口,并且窗口本身或多或少是合适的,具体取决于延迟。
  2. 使用时DataFlowRunner会跳过所有消息,并在 DataFlow UI 中显示计数器:dropDueToLateness

  1. Java / Scala 解决方案 (我使用过Scio,但这也发生在 Java 中的干净 Beam SDK 中)

根据文档添加自定义时间戳:

Java / Scala 解决方案的输出

我不能DoFn.getAllowedTimestampSkew在这里使用,因为它已经被弃用了,而且我不知道将发送什么范围的历史数据。


处理历史数据的能力对我的项目至关重要(这些数据将从某个商店发送到 Pub/Sub)。管道必须同时处理当前数据和历史数据。

我的问题是: 如何使用自定义时间戳处理数据,并能够在使用 Beam API 定义的窗口上进行操作?

0 投票
1 回答
147 浏览

spotify-scio - 使用 SCIO 将 pubsub 数据导出到对象存储

我正在尝试将 Cloud Pub/Sub 流导出到 Cloud Storage,如 Spotify 的这篇文章所述可靠地将 Cloud Pub/Sub 流导出到 Cloud Storage或 Google 的这篇文章使用 Cloud Pub/Sub、Cloud 简单备份和重播流事件存储和云数据流

PubSub 创建有界PCollection(或SCollection在 SCIO 中)但saveastextfile需要BoundedCollection.

有什么办法可以克服这个吗?

0 投票
1 回答
181 浏览

regex - 如何从名称包含日期的 .txt 文件中提取日期?(斯卡拉)

我有一个 .txt 文件作为我的梁编程项目的输入,使用 scala spotify scio。

input= args.getOrElse("input", "/home/user/Downloads/trade-20181001.txt")

如何从文件名中提取日期 2018-10-01(10 月 1 日)?谢谢!

0 投票
1 回答
2120 浏览

google-cloud-dataflow - Beam 管道在 GroupByKey 与窗口后不产生任何输出,我得到内存错误

目的:

我想加载流数据,然后添加一个键,然后按键计数。

问题:

当我尝试使用流式方法(无界数据)加载和按键分组大尺寸数据时,Apache Beam 数据流管道出现内存错误。因为似乎数据是按分组累积的,并且它不会在触发每个窗口时更早地触发数据。

如果我减小元素大小(元素数量不会改变),它会起作用!因为实际上 group-by step 等待所有数据被分组,然后触发所有新的窗口数据。

我对两者都进行了测试:

梁版本 2.11.0 和 scio 版本 0.7.4

梁版本 2.6.0 和 scio 版本 0.6.1

重新生成错误的方法:

  1. 读取包含文件名的 Pubsub 消息
  2. 从 GCS 读取并加载相关文件作为逐行迭代器
  3. 逐行展平(因此它会生成大约 10,000 个)元素
  4. 向元素添加时间戳(当前即时时间)
  5. 创建我的数据的键值(使用一些从 1 到 10 的随机整数键)
  6. 应用带触发的窗口(在行较小且没有内存问题的情况下会触发大约 50 次)
  7. 每个键计数(按键分组,然后组合它们)
  8. 最后,我们应该有大约 50 * 10 个元素来表示按窗口和键的计数(当行大小足够小时成功测试)

管道的可视化(步骤 4 到 7):

在此处输入图像描述

按键分组步骤摘要:

在此处输入图像描述

如您所见,数据是按组累积的,不会被发出。

窗口代码在这里:

错误:

是否有任何解决方案可以通过强制 group-by 发出每个窗口的早期结果来解决内存问题。

0 投票
1 回答
74 浏览

spotify-scio - 是否可以使用 scio JobTest 控制输入处理时间?

我们正在使用 com.spotify.scio.testing.JobTest 对我们的 scio 管道进行端到端测试。该管道包括一个对数据排序敏感的 DoFn,位于不常到达的配置数据流上。

我们将配置值的有序列表combinedSampleConfig作为输入传递给 JobTest Builder。当我们运行端到端测试时,有没有办法让 JobTest 保留这个 CustomIO 输入流的顺序?

我看到测试框架advanceProcessingTime在测试单个组件时可以很好地控制源到达时间(使用 ),但看不到如何使用 JobTest 将其应用于端到端测试。

0 投票
1 回答
1054 浏览

scala - Apache Beam - 无法运行 Scio g8 入门项目

我正在尝试开始使用 Scio,并且我使用了他们的 giter8 入门项目。https://github.com/spotify/scio.g8

我在 macOS 上使用 Java 8,尝试使用target/pack/bin/word-count --output=wc命令行或命令行运行项目时出现此错误:

堆栈跟踪:

我也试过java 12,同样的问题。

我做错了什么?

我怎样才能让它工作?