问题标签 [apache-beam-io]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
redis - 有没有办法使用内置的 Apache Beam Redis I/O 转换执行 Redis GET 命令?
我对 Google Cloud Dataflow 的用例是在管道期间使用 Redis 作为缓存,因为要发生的转换取决于一些缓存的数据。这意味着执行 Redis GET 命令。官方的内置 Redis I/O 转换的文档提到支持一些方法:
阅读 - “提供一个源,它返回一个包含键/值对作为 KV 的有界 PCollection”
readAll - “可用于使用输入 PCollection 元素作为键模式(作为字符串)来请求 Redis 服务器”
看起来 readAll 不对应于 GET 命令,因为输入 PCollection 将用于过滤扫描整个 Redis 源的结果,所以这不是我想要的。
我想知道在查看可以启用我的用例的内置 I/O 转换时是否缺少某些东西,或者是否有支持它的开源 3rd 方 I/O 转换等替代方案。或者,这是否与 Apache Beam 根本不兼容?
apache-beam - Windows10 上的 Apache Beam Python wordcount 示例错误
我正在使用 Python 2.7 运行 Anaconda - conda virtual env 我在运行时遵循了 Apache Beam Python SDK Quickstart -
我收到以下错误:
非常感谢任何帮助。
python - Apache Beam I/O 转换
Apache Beam 文档Authoring I/O Transforms - Overview指出:
在 Beam 中读取和写入数据是一项并行任务,使用 ParDos、GroupByKeys 等通常就足够了。很少,您将需要更专业的 Source 和 Sink 类来实现特定功能。
有人可以提供一个非常基本的例子来说明如何在 Python 中做到这一点吗?
例如,如果我有一个包含 100 个 jpeg 图像的本地文件夹,我将如何:
- 使用 ParDos 读取/打开文件。
- 在图像上运行一些任意代码(也许将它们转换为灰度)。
- 使用 ParDos 将修改后的图像写入不同的本地文件夹。
谢谢,
protocol-buffers - 读取 Apache Beam 中的协议缓冲区文件
我在 GCS 中有一堆 protobuff 文件,我想通过数据流(java sdk)处理它们,但我不知道该怎么做。
Apache Beam 提供 AvroIO 来读取 avro 文件
读取 protobuff 文件有什么类似的吗?
提前致谢
java - Apache Beam 管道从 csv 文件读取、拆分、groupbyKey 和写入文本文件时出现“IllegalStateException”错误。为什么?
我的输入数据如下所示:
我想构建一个 Beam 管道,它将从 csv 文件中读取此数据,获取 vin 并计算 vin 在文件中出现的次数。所以我想按 vin 分组并计算计数。我希望我的最终输出在一个平面文件中。我错过了注释,所以我现在添加了它,但是我得到了一个不同的错误,我在这里也找不到解决方案。下面是我的代码。
我尝试使用以下命令运行程序:
我收到以下错误:
我无法理解我做错了什么。谁能帮帮我吗?
apache-spark - IN Apache Beam 如何在 Pipeline-IO 级别处理异常/错误
我在 Apache Beam 中使用正在运行的 spark runner 作为管道运行器,发现一个错误。通过得到错误,我提出了问题。我知道错误是由于 sql 查询中的 Column_name 不正确,但我的问题是如何处理 IO 级别的错误/异常
apache-beam - 你如何从Beam写入HDFS?
我正在尝试编写一个使用 SparkRunner 运行、从本地文件读取并写入 HDFS 的 Beam 管道。
这是一个最小的例子:
选项类 -
梁主类-
像这样运行它:
我期望发生的事情:它读取本地 testInput 文件中的行并将它们写入我的 hdfs 主目录中的新文件名 testOutput。
实际发生的情况:据我所知,什么也没有。Spark 说作业成功完成,我在日志中看到了 Beam 步骤,但是没有一个名为 testOutput 的文件或目录写入 hdfs 或我的本地目录。也许它是在火花执行器节点上本地编写的,但我无权访问这些节点来检查。
我猜要么我使用错误的 TextIO 接口,要么我需要做更多的事情来配置文件系统,而不仅仅是将它添加到我的 PipelineOptions 接口。但我找不到解释如何做到这一点的文档。
apache-beam - 如何从 Apache Beam 中的 HTTP 响应中读取大文件?
Apache Beam 的 TextIO 可用于读取某些文件系统中的 JSON 文件,但如何从 Java SDK 中的 HTTP 响应产生的大型 JSON (InputStream) 创建 PCollection?
google-cloud-dataflow - 在运行时读取多个文件(数据流模板)
我正在尝试构建一个数据流模板。
目标是阅读 ValueProvider ,它会告诉我要阅读哪些文件。然后对于每个文件,我需要使用对象读取和丰富数据。我试过这个:
但我有以下错误:
从 CoderRegistry 推断编码器失败:无法为 org.apache.beam.sdk.values.PCollection 提供编码器。
我很想找到一种比自己使用 gcs 客户端读取文件更好的方法。
你有什么建议吗 ?
此致