问题标签 [apache-beam-io]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
442 浏览

google-cloud-dataflow - apache Beam bigtable 可迭代突变

我正在将我的谷歌数据流 java 1.9 迁移到梁 2.0,我正在尝试使用 BigtableIO.Write

在 BigtableIO 之前的 ParDo 中,我正在努力使 Iterable 成为可能。

上面的代码抛出以下异常 InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag

v 是一个对象列表(Vitals.class)。hbase api 使用 Put 方法来创建突变。如何创建一个可与 BigtableIO 接收器一起使用的 BigTable 突变?

0 投票
1 回答
3503 浏览

java - 简单的 Apache Beam 操作工作非常缓慢

我对 Apache Beam 非常陌生,而且我的 Java 技能相当低,但我想了解为什么我的简单条目操作在 Apache Beam 上运行如此缓慢。

我正在尝试执行以下操作:我有一个 CSV 文件,其中包含以下方案的 100 万条记录(Alexa 排名前 100 万的站点):(NUMBER,DOMAIN例如1,google.com),我想“剥离”第一个(数字)字段和仅获取域部分。我的这个管道的代码如下:

当我用 Maven 执行这段代码时,在我的笔记本电脑上需要超过四分钟才能成功:

cut(1)虽然在你眨眼之前简单的工作:

那么,这种 Apache Beam 行为是否被认为是可以接受的(可能它在处理大量数据时会更好地工作)还是我的代码效率低下?

2014 年 1 月 7 日更新:

正如 Kenn Knowles建议的那样,我尝试在其他运行器上运行管道,而不是DirectRunnerDataflowRunner. 所以更新后的代码如下所示:

与直接运行器相比,在 Google Dataflow 上运行的运行时间更短,但仍然足够慢 - 多于3 分钟

谷歌数据流作业

Google 数据流作业日志

0 投票
1 回答
86 浏览

bigtable - 为什么 BigtableIO 在 GroupBy/Combine DoFn 之后会一一写入记录?

有人知道捆绑包在 BigtableIO 中是如何工作的吗?GroupBy在使用或CombineDoFn之前,一切看起来都很好。此时,管道会将我们PCollection元素的窗格从更改PaneInfo.NO_FIRINGPaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0},然后BigtableIO将输出以下日志INFO o.a.b.sdk.io.gcp.bigtable.BigtableIO - Wrote 1 records。当有数百万条记录要输出时,日志记录是否会导致性能问题,或者是否BigtableIO为每条记录打开和关闭写入器?

0 投票
1 回答
1871 浏览

java - Apache Beam - 无法使用 hadoop-file-system sdk 从 S3 读取文本文件

我正在尝试从使用 beam-sdks-java-io-hadoop-file-system v2.0.0 和 Spark 作为运行器的梁应用程序中的 AWS EMR 集群中读取 S3。我可以在纱线日志中看到管道能够检测到 S3 中存在的文件,但它无法读取该文件。请参阅下面的日志。


17/06/27 03:29:33 INFO BlockManagerInfo:在内存中添加了广播_0_piece0 ip-10-130-237-237.vpc.internal:40063(大小:4.6 KB,免费:3.5 GB)17/06/27 03 :29:36 WARN TaskSetManager:在 0.0 阶段丢失任务 0.0(TID 0,ip-10-130-237-237.vpc.internal):java.lang.RuntimeException:读取数据失败。在 org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.tryProduceNext(SourceRDD.java:198) 在 org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.hasNext(SourceRDD. java:239) 在 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 在 org.apache.spark.storage.MemoryStore 的 scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41) .unrollSafely(MemoryStore.scala:284) 在 org.apache.spark.CacheManager。

当我使用输入文件系统运行相同的代码时HDFS,它可以完美运行。有人可以帮我弄清楚如何从 S3 读取数据吗?输入格式是 gzip 压缩的文本文件。

代码:

使用 S3 运行:

使用 HDFS 运行:

0 投票
1 回答
895 浏览

python - 从 pub/sub 流式传输到大查询 python 时出错

通过插入以下两个,我无法创建将 pub/sub 源连接到大型查询接收器的 dataflowRunner 作业:

分别进入 beam/sdks/python/apache_beam/examples/streaming_wordcount.py 中的第 59 行和第 74 行(https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py) github上的例子。删除第 61-70 行并指定正确的 pub/sub 和 bigquery 参数后,脚本运行时不会出现错误,不会构建管道。

旁注:脚本提到流管道支持不可用于 Python。但是,在梁文档中提到 apache_beam.io.gcp.pubsub.PubSubSource 仅可用于流式传输(“apache_beam.io.gcp.pubsub 模块”标题下的第一句:https ://beam.apache.org/documentation /sdks/pydoc/2.0.0/apache_beam.io.gcp.html#module-apache_beam.io.gcp.pubsub

0 投票
2 回答
1439 浏览

python - 在 Beam 管道中以编程方式生成 BigQuery 架构

我有一组同质字典,如何在不知道架构的情况下将它们写入 BigQuery?

BigQuerySink 要求我在构造它时指定架构。但是,我不知道架构:它是由我要编写的字典的键定义的。

有没有办法让我的管道推断架构,然后将其提供回(作为侧输入?)到接收器?

例如:

但是,我如何将架构作为参数提供给 BigQuerySink,并在 beam.io.Write 中使用它?

我知道这是不正确的,但我想做的是:

tl;dr 有没有办法从 apache Beam 以编程方式从数据中推断架构来创建和编写 bigquery 表?

0 投票
1 回答
2278 浏览

google-cloud-dataflow - Apache Beam - org.apache.beam.sdk.util.UserCodeException:java.sql.SQLException:无法创建 PoolableConnectionFactory(不支持方法)

我正在尝试使用 Apache Beam-dataflow 连接到安装在云实例中的配置单元实例。当我运行它时,我得到了以下异常。当我使用 Apache Beam 访问这个数据库时,就会发生这种情况。我见过许多与 apache Beam 或 google 数据流无关的相关问题。

使用相同的连接字符串和驱动程序文件,我可以使用普通的 java-jdbc 程序连接到这个实例。

现在对此进行了一段时间的窃听,我无法找到解决方案。任何人都可以对此提出任何想法吗?

请参阅下面连接到 hive 的代码片段:

0 投票
0 回答
686 浏览

google-cloud-dataflow - Dataflow Worker 无法通过 Cloud VPN 连接到 Kafka

我在将 KafkaIO 源连接到只能通过 Cloud VPN 隧道使用的代理时遇到问题。

隧道设置为允许来自特定子网 ( secure) 的流量,并且路由已设置并适用于该子网中的计算引擎。

使用 KafkaIO 执行管道DirectRunner能够连接到代理,无论是通过secure子网中标准计算引擎上的 VPN,还是通过具有 ssh 隧道设置的本地机器sshuttle

运行与DataflowRunner代理的连接的管道失败: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata。管道在secure子网中执行。

连接到作业跨越的计算引擎实例,可以看到以下路由:

代理的 IPv4 地址位于172.17.0.0/16(远程)网络内。VPN 配置了远程网络范围172.16.0.0/12

远程172.17.0.0/16网络是否会被虚拟网络设置遮蔽并被 docker 使用?

0 投票
1 回答
1336 浏览

google-cloud-dataflow - 读取大量文件时如何提高 TextIO 或 AvroIO 的性能?

TextIO.read()并且AvroIO.read()(以及其他一些 Beam IO)默认情况下在当前 Apache Beam 运行器中在读取扩展为大量文件(例如 1M 文件)的文件模式时表现不佳。

如何有效地读取如此大量的文件?

0 投票
1 回答
1947 浏览

google-cloud-dataflow - Apache Beam 管道中的组元素

我有一个从 AVRO 文件中解析记录的管道。

我需要将传入的记录拆分为 500 个项目的块,以便调用同时接受多个输入的 API。

有没有办法用 Python SDK 做到这一点?