问题标签 [apache-beam-io]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
1123 浏览

google-cloud-dataflow - 使用 apache beam python sdk 通过 PubSub 读写转换的示例

我在这里看到示例https://cloud.google.com/dataflow/model/pubsub-io#reading-with-pubsubio for Java,但是当我看这里时https://github.com/apache/beam/blob/master /sdks/python/apache_beam/io/gcp/pubsub.py它说:

这意味着什么?Cloud Data Flow Python SDK PubSub Source/Sink 还没有准备好?

0 投票
2 回答
674 浏览

google-cloud-dataflow - 为什么 Dataflow-BigTable 连接器不支持增量?

我们在 Streaming 模式中有一个用例,我们希望从管道中跟踪 BigTable 上的计数器(#items 已完成处理),我们需要对其进行增量操作。通过查看https://cloud.google.com/bigtable/docs/dataflow-hbase,我发现此客户端不支持 HBase API 的追加/增量操作。说明的原因是批处理模式下的重试逻辑,但是如果 Dataflow 保证仅一次,为什么支持它是一个坏主意,因为我确定增量只被调用一次?我想了解我缺少什么部分。

此外,CloudBigTableIO可以在流模式下使用还是仅与批处理模式相关联?我想我们可以直接在管道中使用 BigTable HBase 客户端,但是连接器似乎具有很好的属性,例如我们想要利用的连接池,因此是问题所在。

0 投票
1 回答
1643 浏览

java - 确认 Apache Beam 上的 Google Pub/Sub 消息

我正在尝试使用以下代码从 pub/sub 读取

与我在 NodeJS 上收到的内容相比,我得到的消息将包含在该data字段中。我怎样才能得到这个ackId字段(我以后可以用它来确认消息)?我正在打印的属性图是null. 是否有其他方法可以确认所有消息而无需找出 ackId?

0 投票
2 回答
1154 浏览

hive - 如何使用 Apache Beam 从 Hive 读取数据?

如何使用 Apache Beam 从 Hive 读取/如何在 Apache Beam 中使用 Hive 作为源?

0 投票
1 回答
351 浏览

google-bigquery - 对 BigQuery 输出表进行分片

我从文档和这个答案中都读到了可以动态确定表目标的信息。我使用了完全相同的方法,如下所示:

但是,我收到以下编译错误:

任何帮助,将不胜感激。

编辑以澄清我如何在我的案例中使用窗口:

在这种情况下,我收到以下错误The method apply(PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (Window<TableRow>)

0 投票
2 回答
463 浏览

google-bigquery - 以编程方式创建日期分区表

有没有办法使用创建日期分区表Apache Beam BigQueryIO,换句话说,有没有办法为尚未创建的表使用分区装饰器?

我知道我可以先创建一个表,然后我可以在我的代码中使用分区装饰器,但由于我动态确定TableDestination行的 from 字段,我无法提前创建这些表。

我的代码是这样的:

有了这个,它尝试创建一个表,project-id:dataset-id.tableName$partition并抱怨$不能在表名中使用。

0 投票
1 回答
255 浏览

google-bigquery - 在 BigQuery 客户端库和 Beam IO 之间共享架构定义

背景:我们在 Beam 2.0 中使用云数据流运行器将我们的数据 ETL 到我们在 BigQuery 中的仓库。我们希望使用 BigQuery 客户端库 (Beta) 在光束管道向数据仓库填充数据之前创建数据仓库的架构。(原因:对表定义的完全控制,例如分区、易于创建 DW 实例(即数据集)、ETL 逻辑与 DW 设计的分离以及代码模块化)

问题:Beam 中的 BigQury IO 使用 com.google.api.services.bigquery.model 下的 TableFieldSchema 和 TableSchema 类来表示 BigQuery 字段和架构,而 BigQuery 客户端库使用 com.google.cloud.bigquerypackage 下的 TableDefinition 来表示相同的东西,所以字段和模式定义不能在一个地方定义并在另一个地方重复使用。

有没有办法在一个地方定义架构并重新使用它?

谢谢,索比

ps 我们在 Beam 中使用 Java SDK

0 投票
1 回答
561 浏览

google-cloud-dataflow - Apache Beam 抛出无法 setCoder(null):java

我是 Apache Beam 的新手,我正在尝试连接到 mysql 数据库的谷歌云实例。当我运行下面的代码片段时,它会抛出下面的异常。

线程“主”java.lang.IllegalArgumentException 中的异常:无法在 org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)处设置编码器(null)在 org.apache .beam.sdk.values.PCollection.setCoder(PCollection.java:265) 在 org.apache.beam.sdk.io.jdbc.JdbcIO$Read.expand(JdbcIO.java:325) 在 org.apache.beam.sdk .io.jdbc.JdbcIO$Read.expand(JdbcIO.java:272) 在 org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482) 在 org.apache.beam.sdk.Pipeline.applyTransform(Pipeline .java:422) 在 org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44) 在 org.apache.beam.sdk.Pipeline.apply(Pipeline.java:164) 在 com.neudesic。 com.GoogleSQLPipeline.main(GoogleSQLPipeline.java:24)

0 投票
2 回答
1479 浏览

google-cloud-sql - 如何在访问 google sql 实例的数据流中运行梁类?

当我从本地机器运行我的管道时,我可以更新驻留在云 Sql 实例中的表。但是,当我移动它以使用 DataflowRunner 运行时,同样会失败,并出现以下异常。

为了从我的 Eclipse 连接,我将数据源配置创建为 .create("com.mysql.jdbc.Driver", "jdbc:mysql://<ip of sql instance > :3306/mydb").

.create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://<project-id>:<instance-name>/my-db")我在运行 Dataflow 运行器时更改为相同 。

  1. 我应该将实例的区域信息添加到前缀吗?

我运行它时得到的异常如下:

非常感谢任何解决此问题的帮助。这是我第一次尝试将梁管道作为数据流作业运行。

0 投票
3 回答
2081 浏览

google-cloud-dataflow - Apache Beam with Dataflow - 从 BigQuery 读取时的 Nullpointer

我正在使用从 BigQuery 表和文件读取的 apache Beam 编写的 google 数据流上运行一项工作。转换数据并将其写入其他 BigQuery 表。工作“通常”成功,但有时我在从大查询表中读取时随机得到空指针异常并且我的工作失败:

我无法弄清楚这与什么有关。当我清除临时目录并重新上传我的模板时,作业再次通过。

我从 BQ 阅读的方式很简单:

我将不胜感激任何帮助。

任何人?