问题标签 [apache-beam-io]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
541 浏览

apache-beam - 是否可以按顺序读取文本文件?

beam.io.ReadFromText用来处理来自文本文件的数据。

解析文件比逐行读取更复杂(有一些状态需要携带并逐行更改)。

我可以让 Beam 只用一个处理器读取我的文件吗?(未并行化)这些情况下的任何其他最佳实践?

0 投票
1 回答
238 浏览

java - Apache Beam - BigQueryIO 读取投影

我有一个从 BigQuery 表中读取数据的数据流管道。但是,在读取数据时,除了使用read(SerializableFunction)readTableRows()方法读取所有记录之外别无选择。我想知道,在使用这些方法时,是否有提供类似列的投影的选项(类似于 HBaseIO Scan addColumn()过滤器)。

我知道从 BigQuery 加载数据时可以选择执行查询(使用readTableRows().fromQuery()),但我想知道是否有类似的选项可以在 HBaseIO 中进行投影。

0 投票
1 回答
410 浏览

tensorflow - 使用 Snappy 使用 BeamIO 进行张量流预处理时出现问题

在使用 Apache beamIO预处理数据时,snappy库是一个很好的压缩模块,但看起来文件转换似乎不起作用,因为它在库中找不到crc32压缩函数!我使用的是 snappy-0.5.2 版本

错误看起来像这样 -

如果有人可以帮助我正确使用带有 tensorflow 的 snappy!谢谢

0 投票
0 回答
368 浏览

google-cloud-platform - 当数据来自 Apache Beam JdbcIO 时,BigQuery 加载作业缓慢

我正在尝试使用 BigQuery 加载作业从我的 Apache Beam 管道向 BigQuery 添加行。我正在处理的初始数据来自 Postgresql 数据库,并使用JdbcIO数据源读入 Beam:

BigQuery 编写器:

当我执行管道时,我得到了数千个以下日志条目(我猜每行一个),这个过程需要很长时间(+10 分钟)。

我检查了谷歌云存储中的临时文件(BigQuery 加载作业需要),并注意到每个文件只包含 1 行的数据。

我对此很担心,因为当我使用相同的数据执行完全相同的操作,但从文件而不是数据库中读取时(使用TextIO),我只会得到几十个日志条目,而临时文件包含数千条 BigQuery 记录。在这种情况下,该过程在不到一分钟的时间内完成

我没有指定任何窗口或触发,因为我只是想读取一次源。

我想我必须启用某种批处理,但我不知道什么和在哪里。

0 投票
1 回答
1392 浏览

python - 如何在 Apache Beam python 中创建任务之间的依赖关系

我是 apache beam 的新手,正在探索 apache Beam 数据流的 python 版本。我想以特定顺序执行我的数据流任务,但它以并行模式执行所有任务。如何在 apache Beam python 中创建任务依赖项?

示例代码:(在下面的代码中 sample.json 文件包含 5 行)

我预计它将按照一、二、三、四的顺序执行。但它以并行模式运行。

上述代码的输出:

0 投票
1 回答
987 浏览

python - 带有 Pub/Sub 源的 Apache Beam Python SDK 在运行时卡住

我正在使用 Python SDK 在 Apache Beam 中编写一个程序,以从 Pub/Sub 读取 JSON 文件的内容,并对接收到的字符串进行一些处理。这是程序中我从 Pub/Sub 中提取内容并进行处理的部分:

运行程序时,代码在创建 PCollection 后卡住tupled(此后没有执行任何代码行)。奇怪的是,当我将源从 Pub/Sub 更改为包含完全相同内容的本地文件(使用ReadFromText())时,程序运行良好。这种行为的原因可能是什么?

0 投票
1 回答
847 浏览

google-cloud-platform - apache Beam 全局组合洗牌

在我的 apache 梁和数据流管道中,我做了一些需要全局组合操作的转换,例如 min 、 max 、自定义全局组合函数。pcollection 中要处理的项目数量为 2-40 亿。

我读到大多数组合操作都是建立在 groupBykey 之上的,这会导致 shuffle ,我相信这会使我当前的管道变慢,或者从 UI 中观察到,这是全局组合操作中最高的挂壁时间。我查看了代码, groupByKey 尝试为所有元素添加一个静态 void 键,然后执行 groupby ,这是否意味着我们正在改组数据(特别是当我们只有一个键时)?有没有办法有效地做到这一点

我自己理解的另一个问题:梁/数据流文档说密钥的所有元素都由单个工作线程/线程处理。以在整数的 pcollection 中查找最大值为例,此全局操作是完全可并行化的,其中我的组合器/累加器在数据的部分/子集上工作以找到最大值,然后在树中合并部分结果(合并两个最大值以获得最大值)像这样的结构,叶子的结果可以合并得到父节点,每个节点基本上可以以分布式的方式进行评估。那么究竟是什么操作强制一个键必须由一个工作线程/线程处理。似乎任何具有可交换和关联组合器的全局操作都可以轻松并行化。全局组合的哪一部分需要通过单个工作线程?

0 投票
2 回答
1774 浏览

google-cloud-dataflow - 将额外的输入传递给 ParDo

将其他输入传递给 ParDo 转换的选项是什么。就我而言,我需要将大约 5000 个字符串对象传递给我的 ParDo。据我了解,这些是我的选择:

a) 将其作为侧输入传递:但我想,像巨大的侧输入这样的传递可能会降低我的管道的性能

b)将其作为参数传递给我的 ParDo 类的构造函数,并将其保留为类成员:有人可以告诉我,在内部,这与将其作为侧面输入传递有什么不同?

还有其他方法可以将额外的输入传递给 ParDo 吗?

0 投票
3 回答
2797 浏览

python - 有没有办法使用 ReadFromText 转换(Python)在 Apache Beam 中读取多行 csv 文件?

ReadFromText有没有办法使用Python中的转换来读取多行 csv 文件?我有一个包含一行的文件,我试图让 Apache Beam 将输入读取为一行,但无法使其正常工作。

上面的代码将输入解析为两行,即使多行 csv 文件的标准是将多行元素包含在双引号中。

0 投票
2 回答
610 浏览

google-cloud-datastore - 如何使用多个工作人员加快批量导入谷歌云数据存储的速度?

我有一个基于 apache-beam 的数据流作业,可以使用vcf 源从单个文本文件(存储在谷歌云存储中)读取,将文本行转换为数据存储Entities并将它们写入数据存储接收器。工作流程运行良好,但我注意到的缺点是:

  • 写入数据存储的速度最多约为每秒 25-30 个实体。
  • 我尝试使用--autoscalingAlgorithm=THROUGHPUT_BASED --numWorkers=10 --maxNumWorkers=100,但执行似乎更喜欢一名工作人员(见下图:目标工作人员曾经增加到 2,但“基于在当前运行步骤中并行化工作的能力”减少到 1)。

我没有使用祖先路径作为键;所有的实体都是一样的kind

管道代码如下所示:

因为我有数百万行要写入数据存储区,所以以 30 个实体/秒的速度写入会花费很长时间。

问题:输入只是一个巨大的 gzip 文件。我是否需要将其拆分为多个小文件以触发多个工作人员?有没有其他方法可以加快导入速度?我错过了num_workers设置中的某些内容吗?谢谢!