问题标签 [apache-beam-io]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
971 浏览

redis - 有没有办法使用内置的 Apache Beam Redis I/O 转换执行 Redis GET 命令?

我对 Google Cloud Dataflow 的用例是在管道期间使用 Redis 作为缓存,因为要发生的转换取决于一些缓存的数据。这意味着执行 Redis GET 命令。官方的内置 Redis I/O 转换的文档提到支持一些方法:

阅读 - “提供一个源,它返回一个包含键/值对作为 KV 的有界 PCollection”

readAll - “可用于使用输入 PCollection 元素作为键模式(作为字符串)来请求 Redis 服务器”

看起来 readAll 不对应于 GET 命令,因为输入 PCollection 将用于过滤扫描整个 Redis 源的结果,所以这不是我想要的。

我想知道在查看可以启用我的用例的内置 I/O 转换时是否缺少某些东西,或者是否有支持它的开源 3rd 方 I/O 转换等替代方案。或者,这是否与 Apache Beam 根本不兼容?

0 投票
1 回答
311 浏览

apache-beam - Windows10 上的 Apache Beam Python wordcount 示例错误

我正在使用 Python 2.7 运行 Anaconda - conda virtual env 我在运行时遵循了 Apache Beam Python SDK Quickstart -

我收到以下错误:

非常感谢任何帮助。

0 投票
1 回答
271 浏览

python - Apache Beam I/O 转换

Apache Beam 文档Authoring I/O Transforms - Overview指出:

在 Beam 中读取和写入数据是一项并行任务,使用 ParDos、GroupByKeys 等通常就足够了。很少,您将需要更专业的 Source 和 Sink 类来实现特定功能。

有人可以提供一个非常基本的例子来说明如何在 Python 中做到这一点吗?

例如,如果我有一个包含 100 个 jpeg 图像的本地文件夹,我将如何:

  1. 使用 ParDos 读取/打开文件。
  2. 在图像上运行一些任意代码(也许将它们转换为灰度)。
  3. 使用 ParDos 将修改后的图像写入不同的本地文件夹。

谢谢,

0 投票
0 回答
553 浏览

protocol-buffers - 读取 Apache Beam 中的协议缓冲区文件

我在 GCS 中有一堆 protobuff 文件,我想通过数据流(java sdk)处理它们,但我不知道该怎么做。

Apache Beam 提供 AvroIO 来读取 avro 文件

读取 protobuff 文件有什么类似的吗?

提前致谢

0 投票
2 回答
346 浏览

java - Apache Beam 管道从 csv 文件读取、拆分、groupbyKey 和写入文本文件时出现“IllegalStateException”错误。为什么?

我的输入数据如下所示:

我想构建一个 Beam 管道,它将从 csv 文件中读取此数据,获取 vin 并计算 vin 在文件中出现的次数。所以我想按 vin 分组并计算计数。我希望我的最终输出在一个平面文件中。我错过了注释,所以我现在添加了它,但是我得到了一个不同的错误,我在这里也找不到解决方案。下面是我的代码。

我尝试使用以下命令运行程序:

我收到以下错误:

我无法理解我做错了什么。谁能帮帮我吗?

0 投票
1 回答
768 浏览

apache-spark - IN Apache Beam 如何在 Pipeline-IO 级别处理异常/错误

我在 Apache Beam 中使用正在运行的 spark runner 作为管道运行器,发现一个错误。通过得到错误,我提出了问题。我知道错误是由于 sql 查询中的 Column_name 不正确,但我的问题是如何处理 IO 级别的错误/异常

0 投票
1 回答
1264 浏览

apache-beam - 你如何从Beam写入HDFS?

我正在尝试编写一个使用 SparkRunner 运行、从本地文件读取并写入 HDFS 的 Beam 管道。

这是一个最小的例子:

选项类 -

梁主类-

像这样运行它:

我期望发生的事情:它读取本地 testInput 文件中的行并将它们写入我的 hdfs 主目录中的新文件名 testOutput。

实际发生的情况:据我所知,什么也没有。Spark 说作业成功完成,我在日志中看到了 Beam 步骤,但是没有一个名为 testOutput 的文件或目录写入 hdfs 或我的本地目录。也许它是在火花执行器节点上本地编写的,但我无权访问这些节点来检查。

我猜要么我使用错误的 TextIO 接口,要么我需要做更多的事情来配置文件系统,而不仅仅是将它添加到我的 PipelineOptions 接口。但我找不到解释如何做到这一点的文档。

0 投票
1 回答
909 浏览

apache-beam - 如何从 Apache Beam 中的 HTTP 响应中读取大文件?

Apache Beam 的 TextIO 可用于读取某些文件系统中的 JSON 文件,但如何从 Java SDK 中的 HTTP 响应产生的大型 JSON (InputStream) 创建 PCollection?

0 投票
1 回答
1319 浏览

google-cloud-dataflow - 在运行时读取多个文件(数据流模板)

我正在尝试构建一个数据流模板。

目标是阅读 ValueProvider ,它会告诉我要阅读哪些文件。然后对于每个文件,我需要使用对象读取和丰富数据。我试过这个:

但我有以下错误:

从 CoderRegistry 推断编码器失败:无法为 org.apache.beam.sdk.values.PCollection 提供编码器。

我很想找到一种比自己使用 gcs 客户端读取文件更好的方法。

你有什么建议吗 ?

此致

0 投票
2 回答
155 浏览

java - 带有 MessagePack 的 Apache Beam - 如何从 Map 获取值?

在 Apache Beam 转换中,我可以成功地将我从 Google Cloud Pub/Sub 读取的 pub/sub 值(以 MessagePack 格式)转换为 MessagePackValue对象的映射,如下所示:

当我检查时,map我可以看到以下内容:

在此处输入图像描述

如果我然后尝试获取这样的值,它总是返回 null:

我如何获得价值?我需要以不同的方式转换值,还是需要以不同的方式检索它们?