问题标签 [apache-beam-io]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
97 浏览

google-cloud-dataflow - 谷歌数据流仅部分解压缩使用 pbzip2 压缩的文件

然后我在两个压缩文件上运行以下管道。

这导致我的存储桶处于以下状态:

工作后的文件

如您所见,通过 pbzip2 压缩的未压缩文件太小而无法正确解压缩。似乎只有第一个块被解压缩,其余的被丢弃。

pbzip2 版本:

并行 BZIP2 v1.1.12 [2014 年 12 月 21 日]

bzip2 版本:

bzip2,一个块排序文件压缩器。1.0.6 版,2010 年 9 月 6 日。

我正在使用 2.0.0 版的数据流 SDK。

我有很多用 pbzip2 压缩的文件,我不想改变它们的压缩方式。

关于如何解决这个问题的任何建议?这甚至假设可以处理使用 pbzip2 压缩的文件吗?

0 投票
2 回答
4377 浏览

google-cloud-dataflow - 将无界集合写入 GCS

我看到很多关于同一主题的问题。但是,我仍然无法写入 GCS。我正在阅读来自 pubsub 的主题并尝试将其推送到 GCS。我已经提到了这个链接。但是,在最新的光束包中找不到 IOChannelUtils。

这是我从堆栈溢出中的许多其他类似主题中获取的。现在,我明白了,TextIO 确实支持带有 withWindowedWrites 和 withNumShards 的无界 PCollection 写入选项。

参考:使用 DoFn 使用 Cloud Dataflow 从 PubSub 写入 Google Cloud Storage

但是,我不明白,我应该怎么做。

我正在尝试按如下方式写信给 GCS。

我没有足够的分数来为 Stack Overflow 中的这些主题添加评论,因此我将其作为一个不同的问题提出。

0 投票
1 回答
564 浏览

apache-beam - Apache Beam mongodb 源

我有一个以 mongodb 作为源的梁管道,但是当我尝试运行它时,它会引发异常。

An exception occured while executing the Java class. null: InvocationTargetException: org.apache.beam.sdk.io.BoundedSource.getDefaultOutputCoder()Lorg/apache/beam/sdk/coders/Coder

这是我在主函数中运行的代码片段

0 投票
1 回答
70 浏览

apache-beam - BigtableIO 读取具有给定前缀的键

我正在寻找读取具有给定前缀的所有行的最佳方式。我看到有一个withKeyRange方法,BigTableIO.Read但它要求您指定一个开始键和一个结束键。有没有办法指定从前缀读取?

0 投票
1 回答
1699 浏览

java - 如何在 Apache Beam 中将文件读取为 byte[]?

我们目前正在研究 Cloud Dataflow 上的 Apache Beam Pipeline 概念验证。我们将一些文件(无文本;自定义二进制格式)放入 Google Cloud Buckets,并希望将这些文件读取为 byte[] 并在流程中反序列化它们。但是,我们找不到能够读取非文本文件的 Beam 源。唯一的想法是扩展FileBasedSource类,但我们相信应该有一个更简单的解决方案,因为这听起来像是一个非常简单的任务。

谢谢大家帮助。

0 投票
1 回答
885 浏览

google-cloud-dataflow - 在 DataStoreIO.Write 之后链接另一个转换

我正在使用 Apache Beam Java SDK 创建一个 Google 数据流管道。我在那里进行了一些转换,最后我创建了一个实体集合( PCollection< Entity > )。我需要将其写入 Google DataStore,然后在写入所有实体后执行另一个转换。(例如通过 PubSub Message 将已保存对象的 ID 广播给多个订阅者)。

现在,存储 PCollection 的方法是:entities.DatastoreIO.v1().write().withProjectId("abc")

这将返回一个 PDone 对象,我不确定如何在此 Write() 完成后链接另一个转换发生。由于 DatastoreIO.write() 调用不返回 PCollection,因此我无法进一步处理管道。我有 2 个问题:

  1. 如何获取写入数据存储的对象的 ID?

  2. 如何附加另一个在保存所有实体后将起作用的转换?

0 投票
1 回答
107 浏览

apache-beam - 不使用 Maven 执行 Apache Beam 程序

我想使用 Apache Spark 运行器运行一个简单的示例 Beam 程序。1)我能够在本地成功编译程序。2) 我想将 JAR 文件推送到未安装 Maven 的 QA 框中。3) 我看到了使用 Maven 命令编译和执行示例程序的示例。4) 请告诉我在不安装 Maven 的情况下运行代码的步骤。5) spark-submit 命令运行良好。6)要不要我把所有依赖的JAR文件一一放到/opt/mapr/spark/spark-2.1.0/jars目录下执行程序

谢谢。

0 投票
1 回答
342 浏览

google-app-engine - 带有 Google DatastoreIO 的 Apache Beam 2.1.0 对 GAE 中不存在的函数调用 Guava Preconditions checkArgument

在构建应从数据存储区读取的数据流模板时,我在堆栈驱动程序日志中收到以下错误(来自 Google App Engine):

java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;I)V at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read.withQuery(DatastoreV1. java:494) .... 我的代码

这发生在将从 Datastore 生成读取的行中。pom 依赖

参考

但是这个版本在类 Preconditions 中不包含方法 checkArgument(String string),我看过的任何其他版本也没有。如上所述,模板应该是在GAE柔性环境项目中构建并稍后执行,但是模板生成失败。

如果我让 main 函数在本地生成模板,它可以正常工作,但是一旦项目在 GAE 中,它就会失败。

任何输入都受到高度赞赏

编辑:com.google.guava 的依赖树:

更新:

添加后

并更新处理 DatastoreEntities 的函数,它似乎又可以工作了!很抱歉打扰,有时它只是有助于解决问题,而 stackoverflow 对此很有帮助。

0 投票
1 回答
700 浏览

google-cloud-dataflow - Apache Beam 模板:运行时上下文错误

我目前正在尝试创建基于 Apache Beam SDK v2.1.0 的数据流模板,如Google 教程

这是我的主要课程

如果我执行

如果我使用该方法,命令它正在工作

但不是如果

错误

0 投票
0 回答
281 浏览

google-cloud-datastore - 将 DataStoreIO.read 中的“DISTINCT”功能与 Apache Beam Java SDK 一起使用

我正在运行一个数据流作业(Apache Beam SDK 2.1.0 Java,Google 数据流运行器),我需要从 Google DataStore 中“清楚地”读取一个特定属性。(就像 SQL 中古老的“DISTINCT”关键字一样)。这是我的代码片段:

当管道运行时,read() 由于以下错误而失败:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: com.google.datastore.v1.client.DatastoreException:当按属性设置分组时,上的不等式过滤器也必须是按属性分组。,代码=INVALID_ARGUMENT

有人可以告诉我哪里出错了。