问题标签 [apache-beam-io]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1430 浏览

google-cloud-dataflow - 将 CoGroupByKey 与自定义类型一起使用会导致 Coder 错误

我想加入两个 PCollection(分别来自不同的输入)并按照此处描述的步骤实现,“加入 CoGroupByKey”部分: https ://cloud.google.com/dataflow/model/group-by-key

就我而言,我想加入 GeoIP 的“区块”信息和“位置”信息。因此,我将 Block 和 Location 定义为自定义类,然后编写如下:

键具有 Long 类型的值。我以为它已经完成了,但是当我运行时mvn compile,它会输出以下错误:

输出错误的确切 DoFn 是ExtractGeoNameIDBlock,它只是创建其键(要连接)和自身的键值对。

loadFromCsvLine只需解析 CSV 行,将字段转换为每个相应的类型并分配给它的私有字段。

所以看起来我需要为我的自定义类设置一些编码器才能使其工作。我找到了一个引用编码器的文档,但仍然不确定如何实现我的。 https://cloud.google.com/dataflow/model/data-encoding

有没有我可以遵循的真实示例来为我的自定义类创建自定义编码器?

[更新 13:02 09/26/2017] 我添加了

然后得到一个错误

[更新 14:05 09/26/2017] 我改变了这样的实现:

(将@Nullable 设置为所有属性)

但仍然出现此错误:

谢谢。

0 投票
0 回答
149 浏览

google-cloud-datastore - Google Datastore 库与 Apache Beam DatastoreIO 冲突

我正在使用谷歌数据流 SDK

和谷歌数据存储 SDK

使用 DatastoreIO.v1().write() 时出现以下冲突:

0 投票
1 回答
548 浏览

google-cloud-dataflow - 数据流 GroupBy -> 基于键的多个输出

有没有什么简单的方法可以根据 Group 键将 GroupBy 的输出重定向到多个输出文件中?

如果 Sink 是解决方案,请您与我分享示例代码吗?

谢谢!

0 投票
1 回答
4976 浏览

google-bigquery - Apache Beam:将具有对象列表的对象转换为多个 TableRows 以写入 BigQuery

我正在研究一个梁管道来处理一个 json 并将其写入 bigquery。JSON是这样的。

}

我使用杰克逊将其解析为以下结构。


现在在我的管道中,我将集合转换为

根据类中的属性将其写入不同的表。我已经写了一个转换来做到这一点。要求是根据消息对象的数量创建多个 TableRows。我在 JSON 中还有一些属性以及将添加到 tableRow 和每个消息属性的 publishDate。所以表格如下。

我试图创建以下转换。但是,不确定它将如何根据消息列表输出多行。

但是,这也将是一个列表,我将无法应用 BigQueryIO.write 转换。


根据“Eugene”又名@jkff 的评论更新

谢谢@jkff。现在,我已经更改了您在第二段中提到的代码。在将表行设置为之后,messages.forEach 中的 context.output(row)

现在,当我尝试将此集合写入 BigQuery 时,如

我收到以下异常。

请帮忙。

谢谢你。

0 投票
0 回答
249 浏览

apache-spark - CopyAndReset 必须返回一个零值副本

我想在 apache beam 提供的 spark 集群中运行 wordcount 示例。但是当我将这个项目提交给 spark 时,它会抛出这个异常。

与stackoverflow中的相同问题相同的问题 但是我的spark集群是v2.0.0,无法通过将spark版本更改为v1.6.0来解决。有什么想法可以在不更改 spark 版本的情况下解决这个问题吗?

0 投票
1 回答
2060 浏览

http - DoFn 中的 HTTP 客户端

我想通过 DoFn 为在 Dataflow 上运行的 Apache Beam 管道发出 POST 请求。

为此,我创建了一个客户端,它实例化了在 PoolingHttpClientConnectionManager 上配置的 HttpClosableClient。

但是,我为我处理的每个元素实例化了一个客户端。

如何设置所有元素都使用的持久客户端?

我应该使用其他用于并行和高速 HTTP 请求的类吗?

0 投票
1 回答
228 浏览

java - FileBasedSink 的类型参数是什么?

我正在迁移将 FileBasedSink 从版本 2.0.0 扩展到 2.2.0 的自定义接收器。该类已更改并添加了两个额外的类型参数:UserTDestinationT

我检查了FileBasedSink的文档,但找不到它的用途。

所有类型参数中只有OutputT一个文档:

0 投票
1 回答
688 浏览

google-cloud-dataflow - 谷歌云数据流 java API 不读取其他项目的 pubsub 主题

我只有一个在生产项目中创建的主题。我想在需要使用生产 pubsub 主题的开发环境中运行我的数据流作业。当我在开发项目中提交我的数据流作业时,它不工作,它总是显示在数据流 UI 中运行,但没有从 pubsub 读取任何元素。如果我提交生产项目,它会完美运行。

为什么它不读取来自不同项目主题的消息?我正在使用 java-sdk 2.1,跑步者是“dataflowrunner”

使用 mvn 提交数据流作业

注意:如果我使用的是 directrunner,它可以工作并使用来自不同项目 pubsub 主题的消息。 在此处输入图像描述

队列中没有添加任何元素,也没有估计大小。

0 投票
1 回答
5971 浏览

mysql - Google Dataflow (Apache beam) JdbcIO 批量插入 mysql 数据库

我正在使用 Dataflow SDK 2.X Java API(Apache Beam SDK)将数据写入 mysql。我创建了基于Apache Beam SDK 文档的管道,以使用数据流将数据写入 mysql。它一次插入单行,因为我需要实现批量插入。我在官方文档中找不到启用批量插入模式的任何选项。

想知道是否可以在数据流管道中设置批量插入模式?如果是,请让我知道我需要在下面的代码中更改什么。

0 投票
1 回答
689 浏览

google-bigquery - Apache 将 KafkaIO 偏移管理到外部数据存储

我正在尝试在 apache Beam 上使用 KafkaIO 从多个 kafka 经纪人那里读取数据。偏移管理的默认选项是 kafka 分区本身(不再使用 kafka >0.9 的 zookeper)。使用此设置,当我重新启动作业/管道时,存在重复和丢失记录的问题。

根据我的阅读,处理此问题的最佳方法是管理外部数据存储的偏移量。是否可以使用当前版本的 apache beam 和 KafkaIO 来做到这一点?我现在使用的是 2.2.0 版本。

而且,从 kafka 阅读后,我会将其写入 BigQuery。KafkaIO 中是否有设置,只有在将消息插入 BigQuery 后才能设置提交的消息?我现在只能找到自动提交设置。