问题标签 [apache-beam-io]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
2467 浏览

google-cloud-dataflow - 使用 Apache Beam 从数据库中读取批量数据

我想知道,如果我的查询返回数百万行,JdbcIO 将如何并行执行查询。我已经提到了https://issues.apache.org/jira/browse/BEAM-2803和相关的拉取请求。我完全无法理解。

ReadAll expand方法使用ParDo. 因此,它会创建到数据库的多个连接以并行读取数据吗?如果我限制可以创建到数据源中数据库的连接数,它会坚持连接限制吗?

谁能帮我理解这将如何处理JdbcIO?我在用2.2.0

更新 :

上面的代码显示 ReadFn 与 ParDo 一起应用。我认为,ReadFn 将并行运行。如果我的假设是正确的,我将如何使用该readAll()方法从一次只能建立有限数量的连接的数据库中读取?

谢谢巴鲁

0 投票
0 回答
480 浏览

hive - Apache Hive 与 Apache Beam 的集成

我正在做一个POC连接到 Apache Beam 管道中的 Apache Hive 的操作,我得到了类似于下面的 SO 链接的异常。我确实将版本更改为JDBC driver最新版本。但仍然面临这个问题。

如以下链接中所述,这是由于集群问题。如果有人可以清楚地帮助我解决这个问题,我可以指导我的相应团队解决这个问题并解决它。

如果您需要任何其他信息,我会为您提供。

Apache Beam - org.apache.beam.sdk.util.UserCodeException:java.sql.SQLException:无法创建 PoolableConnectionFactory(不支持方法)

0 投票
1 回答
1284 浏览

google-cloud-platform - 每个元素使用 Apache Beam 流式写入 gcs

当前的光束管道正在使用FileIO.matchAll().continuously(). 这将返回 PCollection 。我想用相同的名称将这些文件写回另一个 gcs 存储桶,即每个PCollection文件都是一个文件metadata/readableFile,经过一些处理后应该写回另一个存储桶。我应该使用任何接收器来实现将每个PCollection项目写回 GCS 还是有什么方法可以做到这一点?是否可以为每个元素创建一个窗口,然后使用一些 GCS 接收器 IO 来执行此操作。在窗口上操作时(即使它有多个元素),Beam 是否保证窗口被完全处理或根本不被处理,换句话说,GCS or bigquery对于给定窗口的写入操作是原子的而不是部分的,以防出现任何故障?

0 投票
2 回答
1010 浏览

google-cloud-dataflow - 从 Beam 中的另一个管道读取泡菜?

我在 Google Cloud Dataflow 中运行批处理管道。我需要在一个管道中读取另一个管道先前写入的对象。最简单的 wa 对象是泡菜/莳萝。

写作效果很好,编写了许多文件,每个文件都有一个腌制对象。当我手动下载文件时,我可以解压文件。编写代码:beam.io.WriteToText('gs://{}', coder=coders.DillCoder())

但是每次读数都会中断,并出现以下错误之一。阅读代码:beam.io.ReadFromText('gs://{}*', coder=coders.DillCoder())

任何一个...

...或者...

(对象的类位于带有 的路径中measure,但不确定为什么它错过了那里的最后一个字符)

我尝试使用默认编码器,以及 aBytesCoder和 pickling & unpickling 作为管道中的自定义任务。

我的工作假设是读者逐行拆分文件,因此将单个泡菜(其中包含新行)视为多个对象。如果是这样,有没有办法避免这种情况?

我可以尝试自己构建一个阅读器,但我很犹豫,因为这似乎是一个很好解决的问题(例如,Beam 已经有一种格式可以将对象从一个管道阶段移动到另一个阶段)。

切向相关:如何在 Google Cloud DataFlow 作业中从 GCS 读取 blob(pickle)文件?

谢谢!

0 投票
1 回答
696 浏览

google-cloud-platform - apache 束流管道来观看 gcs 文件正则表达式

我有一个流束管道,我尝试在其中监控多个 glob/regex 模式。这些模式中很少有文件匹配,并且将来会生成很少的模式。

预期的行为是将现有文件与提供的模式匹配,并监视它们以查看是否正在将与这些模式匹配的新文件写入 GCS。我强制执行的终止条件是,如果生成的最后一个与特定模式匹配的文件是一个多小时前生成的,则不要尝试匹配模式。观察到的行为是我们匹配了很多文件,但是在获得无限 f 之后的转换根本没有被执行。日志只显示

我给出了 2 种不同的正则表达式模式来监视。现有的正则表达式模式之一已经有大约 500k 文件匹配,并且每分钟都在添加更多文件,我从未看到过输出,只是上面的日志行。第二个正则表达式模式匹配 0 个文件(在启动管道时),但一旦在未来某个时间点开始与新出现的文件匹配,这些输出文件就会被写入 gcs。

有人可以解释这种行为吗,如果我连续正确使用匹配。我没有在这里创建任何窗口,因为我的用例非常简单,流文件 -> 读取文件 -> 过滤一些事件 -> 将这些文件写回一些 gcs 存储桶。

0 投票
2 回答
2335 浏览

python - 在 Beam 中读取和写入序列化的 protobuf

我想将序列化的protobuf消息的PCollection写入文本文件并将它们读回应该很容易。但经过几次尝试,我没有这样做。如果有人有任何意见,将不胜感激。

我有下面的 python 代码,它实现了一个简单的 Beam 管道,将文本写入序列化的 protobufs。

管道可以成功运行并生成一个包含内容的文件:

然后我编写另一个管道来读取序列化的 protobuf 并使用ParDo.

运行时收到此错误消息。

所以看起来无法解析序列化的 protobuf 字符串。我错过了什么吗?谢谢你的帮助!

0 投票
1 回答
301 浏览

google-cloud-dataflow - 如何在 Java 中将运行时参数传递给 BigtableIO?

根据此页面,BigtableIO 不支持运行时参数(仅适用于 BigQuery、PubSub 和 Text)。在不重新实现类的情况下是否有可能的解决方法或示例?

实际上,我使用的是来自 bigtable-hbase-beam 依赖项的 CloudBigtableIO。是否期望在某个时候同时支持这两个库?

0 投票
1 回答
904 浏览

python - 在 Google App Engine Flex 上运行 Apache Beam 时,“模块”对象没有属性“WriteToBigQuery”

我有一个触发 Cloud DataFlow 管道的 Google App Engine。该管道应该将最终的 PCollection 写入 Google BigQuery,但我找不到安装正确 apache_beam.io 依赖项的方法。

我在本地运行 Apache Beam 2.2.0 版。

项目结构遵循此博客文章中的代码。

这是相关的代码:

当我在本地运行此代码时,beam.io.WriteToBigQuery()会正确调用。它是apache_beam/io/gcp/bigquery.py从我的虚拟环境中获取的。

但是我无法lib在部署应用程序随附的文件夹上安装此依赖项。

即使我有一个包含apache-beam[gcp]==2.2.0作为要求的要求文件,但当我运行时pip install -r requirements.txt -t libapache_beam/io/gcp/bigquery.py下载到我的lib文件夹中的文件不包含 class WriteToBigQuery,然后'module' object has no attribute 'WriteToBigQuery'在 Google App Engine 上运行应用程序时出现错误。

有谁知道我怎样才能得到正确的bigquery.py

0 投票
1 回答
767 浏览

python - Apache Beam 数据流使用 splittable=True 读取大 CSV 导致重复条目

我使用下面的代码片段将 CSV 文件作为字典读入管道。

这个片段几乎是从How to convert csv into a dictionary in apache beam dataflow (Pablo 的回答)中的一篇文章中复制过来的。

后来我注意到这一切都适用于相对较小的文件(例如 35k 行)。但是对于更大的文件,例如。70 万行,我看到在输出 (BigQuery) 中生成了重复项。几乎是 5 倍,所以我最终得到了超过 3M 行。

我仔细查看beam.io.filebasedsource.FileBasedSource并看到默认splitted设置为的参数。True

文档是这样说的:

当参数设置为时,True它能够并行读取源文件。

我注意到,如果我将此参数设置为False,则文件读取正常并且我没有重复。

目前我将此splittable参数设置为,False因为它可以防止重复,但我不确定当我的文件将成行增长时,这是否是未来的证据。

并行读取源文件是否可能存在一些问题?有什么我忽略或没有以正确的方式处理的事情吗?

0 投票
2 回答
1162 浏览

python-2.7 - 在 Python 中的数据流中写入到云存储的动态目标

我试图从云存储中的一个大文件中读取并根据给定的字段对它们进行分片。

我打算阅读 | 映射(lambda x: (x[key field], x)) | GroupByKey | 使用关键字段的名称写入文件。

但是我找不到动态写入云存储的方法。是否支持此功能?

谢谢你,一清