问题标签 [apache-beam-io]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
google-cloud-dataflow - apache Beam bigtable 可迭代突变
我正在将我的谷歌数据流 java 1.9 迁移到梁 2.0,我正在尝试使用 BigtableIO.Write
在 BigtableIO 之前的 ParDo 中,我正在努力使 Iterable 成为可能。
上面的代码抛出以下异常 InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag
v 是一个对象列表(Vitals.class)。hbase api 使用 Put 方法来创建突变。如何创建一个可与 BigtableIO 接收器一起使用的 BigTable 突变?
java - 简单的 Apache Beam 操作工作非常缓慢
我对 Apache Beam 非常陌生,而且我的 Java 技能相当低,但我想了解为什么我的简单条目操作在 Apache Beam 上运行如此缓慢。
我正在尝试执行以下操作:我有一个 CSV 文件,其中包含以下方案的 100 万条记录(Alexa 排名前 100 万的站点):(NUMBER,DOMAIN
例如1,google.com
),我想“剥离”第一个(数字)字段和仅获取域部分。我的这个管道的代码如下:
当我用 Maven 执行这段代码时,在我的笔记本电脑上需要超过四分钟才能成功:
cut(1)
虽然在你眨眼之前简单的工作:
那么,这种 Apache Beam 行为是否被认为是可以接受的(可能它在处理大量数据时会更好地工作)还是我的代码效率低下?
2014 年 1 月 7 日更新:
正如 Kenn Knowles建议的那样,我尝试在其他运行器上运行管道,而不是DirectRunner
在DataflowRunner
. 所以更新后的代码如下所示:
与直接运行器相比,在 Google Dataflow 上运行的运行时间更短,但仍然足够慢 - 多于3 分钟:
bigtable - 为什么 BigtableIO 在 GroupBy/Combine DoFn 之后会一一写入记录?
有人知道捆绑包在 BigtableIO 中是如何工作的吗?GroupBy
在使用或Combine
DoFn之前,一切看起来都很好。此时,管道会将我们PCollection
元素的窗格从更改PaneInfo.NO_FIRING
为PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
,然后BigtableIO
将输出以下日志INFO o.a.b.sdk.io.gcp.bigtable.BigtableIO - Wrote 1 records
。当有数百万条记录要输出时,日志记录是否会导致性能问题,或者是否BigtableIO
为每条记录打开和关闭写入器?
java - Apache Beam - 无法使用 hadoop-file-system sdk 从 S3 读取文本文件
我正在尝试从使用 beam-sdks-java-io-hadoop-file-system v2.0.0 和 Spark 作为运行器的梁应用程序中的 AWS EMR 集群中读取 S3。我可以在纱线日志中看到管道能够检测到 S3 中存在的文件,但它无法读取该文件。请参阅下面的日志。
17/06/27 03:29:33 INFO BlockManagerInfo:在内存中添加了广播_0_piece0 ip-10-130-237-237.vpc.internal:40063(大小:4.6 KB,免费:3.5 GB)17/06/27 03 :29:36 WARN TaskSetManager:在 0.0 阶段丢失任务 0.0(TID 0,ip-10-130-237-237.vpc.internal):java.lang.RuntimeException:读取数据失败。在 org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.tryProduceNext(SourceRDD.java:198) 在 org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.hasNext(SourceRDD. java:239) 在 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 在 org.apache.spark.storage.MemoryStore 的 scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41) .unrollSafely(MemoryStore.scala:284) 在 org.apache.spark.CacheManager。
当我使用输入文件系统运行相同的代码时HDFS
,它可以完美运行。有人可以帮我弄清楚如何从 S3 读取数据吗?输入格式是 gzip 压缩的文本文件。
代码:
使用 S3 运行:
使用 HDFS 运行:
python - 从 pub/sub 流式传输到大查询 python 时出错
通过插入以下两个,我无法创建将 pub/sub 源连接到大型查询接收器的 dataflowRunner 作业:
分别进入 beam/sdks/python/apache_beam/examples/streaming_wordcount.py 中的第 59 行和第 74 行(https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py) github上的例子。删除第 61-70 行并指定正确的 pub/sub 和 bigquery 参数后,脚本运行时不会出现错误,不会构建管道。
旁注:脚本提到流管道支持不可用于 Python。但是,在梁文档中提到 apache_beam.io.gcp.pubsub.PubSubSource 仅可用于流式传输(“apache_beam.io.gcp.pubsub 模块”标题下的第一句:https ://beam.apache.org/documentation /sdks/pydoc/2.0.0/apache_beam.io.gcp.html#module-apache_beam.io.gcp.pubsub)
python - 在 Beam 管道中以编程方式生成 BigQuery 架构
我有一组同质字典,如何在不知道架构的情况下将它们写入 BigQuery?
BigQuerySink 要求我在构造它时指定架构。但是,我不知道架构:它是由我要编写的字典的键定义的。
有没有办法让我的管道推断架构,然后将其提供回(作为侧输入?)到接收器?
例如:
但是,我如何将架构作为参数提供给 BigQuerySink,并在 beam.io.Write 中使用它?
我知道这是不正确的,但我想做的是:
tl;dr 有没有办法从 apache Beam 以编程方式从数据中推断架构来创建和编写 bigquery 表?
google-cloud-dataflow - Apache Beam - org.apache.beam.sdk.util.UserCodeException:java.sql.SQLException:无法创建 PoolableConnectionFactory(不支持方法)
我正在尝试使用 Apache Beam-dataflow 连接到安装在云实例中的配置单元实例。当我运行它时,我得到了以下异常。当我使用 Apache Beam 访问这个数据库时,就会发生这种情况。我见过许多与 apache Beam 或 google 数据流无关的相关问题。
使用相同的连接字符串和驱动程序文件,我可以使用普通的 java-jdbc 程序连接到这个实例。
现在对此进行了一段时间的窃听,我无法找到解决方案。任何人都可以对此提出任何想法吗?
请参阅下面连接到 hive 的代码片段:
google-cloud-dataflow - Dataflow Worker 无法通过 Cloud VPN 连接到 Kafka
我在将 KafkaIO 源连接到只能通过 Cloud VPN 隧道使用的代理时遇到问题。
隧道设置为允许来自特定子网 ( secure
) 的流量,并且路由已设置并适用于该子网中的计算引擎。
使用 KafkaIO 执行管道DirectRunner
能够连接到代理,无论是通过secure
子网中标准计算引擎上的 VPN,还是通过具有 ssh 隧道设置的本地机器sshuttle
。
运行与DataflowRunner
代理的连接的管道失败:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
。管道在secure
子网中执行。
连接到作业跨越的计算引擎实例,可以看到以下路由:
代理的 IPv4 地址位于172.17.0.0/16
(远程)网络内。VPN 配置了远程网络范围172.16.0.0/12
。
远程172.17.0.0/16
网络是否会被虚拟网络设置遮蔽并被 docker 使用?
google-cloud-dataflow - 读取大量文件时如何提高 TextIO 或 AvroIO 的性能?
TextIO.read()
并且AvroIO.read()
(以及其他一些 Beam IO)默认情况下在当前 Apache Beam 运行器中在读取扩展为大量文件(例如 1M 文件)的文件模式时表现不佳。
如何有效地读取如此大量的文件?
google-cloud-dataflow - Apache Beam 管道中的组元素
我有一个从 AVRO 文件中解析记录的管道。
我需要将传入的记录拆分为 500 个项目的块,以便调用同时接受多个输入的 API。
有没有办法用 Python SDK 做到这一点?