问题标签 [spotify-scio]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 使用 Scio 将 SCollection 从 textFile 放入 BigQuery
我已经阅读了一些文档textFile
,并做了flatMap
一个单词,为每个单词添加了一些额外的信息:
我目前正在轻松地将其保存为文本
但我不知道如何将地图保存到 BigQuery 架构。我见过的所有示例都从 BigQuery 创建初始 Scollection,然后将其保存到另一个表中,因此初始集合[TableRow]
不是[String]
.
这里的正确方法是什么?我是否应该调查如何将我的数据转换为 Big Query 可以接受的一种集合?或者我应该尝试进一步调查如何将这个纯文本直接推送到表格中?
google-cloud-dataflow - 如何处理 CoderException: cannot encode a null String with scio
我刚开始使用 scio 和数据流。将我的代码尝试到一个输入文件中,效果很好。但是当我向输入中添加更多文件时,出现以下异常:
我想我的一个输入文件可能包含一些格式错误的数据。但是如何绕过坏数据呢?Java Beam com.google.cloud.dataflow.sdk.coders.CoderException 有一个类似的问题:无法编码空字符串 所以我尝试了这个:
它没有帮助。有人可以帮我吗?谢谢。
google-cloud-storage - 数据流 TextIO.write 缩放问题
我创建了一个简单的数据流管道,它从 pubsub 读取字节数组,将它们窗口化,然后写入 GCS 中的文本文件。我发现对于流量较低的主题,这非常有效,但是我在一个每分钟大约 2.4GB 的主题上运行它,并且开始出现一些问题。
在启动管道时,我没有设置工作人员的数量(正如我想象的那样,它会根据需要自动扩展)。在摄取这么多数据时,worker 的数量保持在 1,但 TextIO.write() 需要 15 分钟以上才能写入 2 分钟的窗口。这将继续备份,直到内存不足。当这一步得到如此备份时,Dataflow 不自动扩展是否有充分的理由?
当我将工作人员的数量增加到 6 个时,写入文件的时间从 4 分钟左右开始,持续 5 分钟,然后下降到 20 秒。
另外,当使用 6 名工人时,计算挂墙时间似乎有问题?即使数据流已经赶上并且运行 4 小时后,我的写入步骤摘要看起来像这样,我的似乎也永远不会下降:
职位编号:2019-03-13_19_22_25-14107024023503564121
join - 将批处理数据与存储在 BigTable 中的数据连接起来
我在 GCS 中有不断增长的数据,并且将有一个批处理作业运行,可以说每天处理 100 万篇文章增量。我需要从 BigTable(包含数十亿条记录)中获取键的附加信息。对地图操作中的每个项目进行查找是否可行?批处理这些查找并执行诸如批量读取之类的操作是否有意义?或者使用 scio/beam 的这个用例的最佳方法是什么?
我在Pattern: Streaming mode large lookup tables中发现,对每个请求执行查找是推荐的流式方法,但是我不确定我是否不会通过批处理作业重载 BigTable。
你们对如何处理这个用例有任何总体或具体的建议吗?
java - DataFlow (Apache Beam) 中 Pub/Sub 的自定义时间戳和窗口化
我想使用 Apache Beam 中的流管道(并在 Google DataFlow 上运行)来实现以下场景:
- 从 Pub/Sub 读取消息(JSON 字符串)
- 反序列化 JSON
- 使用自定义字段(比如
timeStamp
)作为处理元素的时间戳值 - 应用固定窗口
60 seconds
- 从元素中提取密钥并按密钥分组
- <<进行进一步处理>>
我尝试使用 Java(Scala) 和 Python 解决这个问题,但没有一个解决方案有效。
- Python解决方案
add_timestamping
根据文档功能:
Python解决方案的输出:
- 使用时
DirectRunner
,会发出窗口,并且窗口本身或多或少是合适的,具体取决于延迟。 - 使用时
DataFlowRunner
,会跳过所有消息,并在 DataFlow UI 中显示计数器:dropDueToLateness。
- Java / Scala 解决方案 (我使用过Scio,但这也发生在 Java 中的干净 Beam SDK 中)
根据文档添加自定义时间戳:
Java / Scala 解决方案的输出:
我不能DoFn.getAllowedTimestampSkew
在这里使用,因为它已经被弃用了,而且我不知道将发送什么范围的历史数据。
处理历史数据的能力对我的项目至关重要(这些数据将从某个商店发送到 Pub/Sub)。管道必须同时处理当前数据和历史数据。
我的问题是: 如何使用自定义时间戳处理数据,并能够在使用 Beam API 定义的窗口上进行操作?
spotify-scio - 使用 SCIO 将 pubsub 数据导出到对象存储
我正在尝试将 Cloud Pub/Sub 流导出到 Cloud Storage,如 Spotify 的这篇文章所述可靠地将 Cloud Pub/Sub 流导出到 Cloud Storage或 Google 的这篇文章使用 Cloud Pub/Sub、Cloud 简单备份和重播流事件存储和云数据流
PubSub 创建有界PCollection
(或SCollection
在 SCIO 中)但saveastextfile
需要BoundedCollection
.
有什么办法可以克服这个吗?
regex - 如何从名称包含日期的 .txt 文件中提取日期?(斯卡拉)
我有一个 .txt 文件作为我的梁编程项目的输入,使用 scala spotify scio。
input= args.getOrElse("input", "/home/user/Downloads/trade-20181001.txt")
如何从文件名中提取日期 2018-10-01(10 月 1 日)?谢谢!
google-cloud-dataflow - Beam 管道在 GroupByKey 与窗口后不产生任何输出,我得到内存错误
目的:
我想加载流数据,然后添加一个键,然后按键计数。
问题:
当我尝试使用流式方法(无界数据)加载和按键分组大尺寸数据时,Apache Beam 数据流管道出现内存错误。因为似乎数据是按分组累积的,并且它不会在触发每个窗口时更早地触发数据。
如果我减小元素大小(元素数量不会改变),它会起作用!因为实际上 group-by step 等待所有数据被分组,然后触发所有新的窗口数据。
我对两者都进行了测试:
梁版本 2.11.0 和 scio 版本 0.7.4
梁版本 2.6.0 和 scio 版本 0.6.1
重新生成错误的方法:
- 读取包含文件名的 Pubsub 消息
- 从 GCS 读取并加载相关文件作为逐行迭代器
- 逐行展平(因此它会生成大约 10,000 个)元素
- 向元素添加时间戳(当前即时时间)
- 创建我的数据的键值(使用一些从 1 到 10 的随机整数键)
- 应用带触发的窗口(在行较小且没有内存问题的情况下会触发大约 50 次)
- 每个键计数(按键分组,然后组合它们)
- 最后,我们应该有大约 50 * 10 个元素来表示按窗口和键的计数(当行大小足够小时成功测试)
管道的可视化(步骤 4 到 7):
按键分组步骤摘要:
如您所见,数据是按组累积的,不会被发出。
窗口代码在这里:
错误:
是否有任何解决方案可以通过强制 group-by 发出每个窗口的早期结果来解决内存问题。
spotify-scio - 是否可以使用 scio JobTest 控制输入处理时间?
我们正在使用 com.spotify.scio.testing.JobTest 对我们的 scio 管道进行端到端测试。该管道包括一个对数据排序敏感的 DoFn,位于不常到达的配置数据流上。
我们将配置值的有序列表combinedSampleConfig
作为输入传递给 JobTest Builder。当我们运行端到端测试时,有没有办法让 JobTest 保留这个 CustomIO 输入流的顺序?
我看到测试框架advanceProcessingTime
在测试单个组件时可以很好地控制源到达时间(使用 ),但看不到如何使用 JobTest 将其应用于端到端测试。
scala - Apache Beam - 无法运行 Scio g8 入门项目
我正在尝试开始使用 Scio,并且我使用了他们的 giter8 入门项目。https://github.com/spotify/scio.g8
我在 macOS 上使用 Java 8,尝试使用target/pack/bin/word-count --output=wc
命令行或命令行运行项目时出现此错误:
堆栈跟踪:
我也试过java 12,同样的问题。
我做错了什么?
我怎样才能让它工作?