问题标签 [google-dataflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
39 浏览

publish-subscribe - 如何自动缩放谷歌数据流(流)管道?

我们在 Google Dataflow 中运行流式传输管道。它提取 Pub/Sub 消息并保存到 BigQuery。出于某种原因,在过去的几天里,我们有积压。系统延迟显示 9-15 小时。我在这里关注文档,并添加了以下参数:

然而,它似乎并没有产生新的管道。我们有拉 Pub/Sub。

0 投票
0 回答
340 浏览

google-cloud-platform - 谷歌数据流问题

我们正在 Google bigquery 上新实现 DataWareHouse,我们所有的资源都在 prim 数据库上。因此,我们将用于 ETL 和 Maven 的数据流与 Apache Beam SDK 一起使用,以便在 Google Cloud Dataflow 服务上运行 30 个管道。

如果表中的数据较少,则它运行成功。如果数据以百万为单位,则会引发如下错误

为了使用参数编译和运行 Java 类的 main 方法,我正在执行以下命令。

请让我知道我是否正确。什么是正确的参数,数据流可以并行执行多个管道吗?

0 投票
1 回答
1211 浏览

python - How to deploy Google Cloud Dataflow with connection to PostgreSQL (beam-nuggets) from Google Cloud Functions

I'm trying to create ETL in GCP which will read part of data from PostgreSQL and put it in the suitable form to BigQuery. I was able to perform this task deploying Dataflow from my computer, but I failed to make it dynamic, so it will read last transferred record and transfer next 100. So I figured out, that I'll create Dataflows from Cloud Function. Everything was working OK, reading/writing to BigQuery works like a charm, but I'm stuck on PostgreSQL requited package: beam-nuggets.

In the function I'm creating pipe arguments:

Then create pipeline:

and run it:

If I omit:

everything is fine except Dataflow cannot use PostgeQSL as import:

fails.

When I add

line, testing function from GCP Function web portal returns:

running

from local computer works OK.

setup.py is deployed along with function code (main.py) and requirements.txt to the Cloud Function.

Requirements.txt is used during Function deploy and looks like this:

setup.py looks like this:

I'm stuck for couple days, tried different setup.py approaches, tried to use requirements.txt instead of setup.py - no luck.

log just says:

Supplementary info:

if I'm using

instead of

I'm getting:

in GCP Functions web portal while running test function.

Afrer I increased memory to 2BG it says:

Logs in this case:

From local machine Dataflow deployment works OK.

Command from logs:

also works OK although it seems like downloading half of the internet for couple of minutes, even if I reduce requirements.txt to beam-nuggets==0.15.1 only.

It stucks on

exactly during setup from this package, function:

0 投票
1 回答
685 浏览

python-3.x - 从 Dataflow 作业中删除 Google 存储中的文件

我在 python 3.7 中有一个用 apache-beam 制作的数据流,我在其中处理一个文件,然后我必须删除它。该文件来自谷歌存储桶,问题是当我使用 DataflowRunner 运行器时,我的工作不起作用,因为谷歌数据流 python 3.7 环境中没有安装 google-cloud-storage API。你们知道不使用这个 API 我怎么能在我的数据流中删除这个文件?我见过像https://beam.apache.org/releases/pydoc/2.22.0/apache_beam.io.filesystem.html这样的 apache_beam 模块,但我不知道如何使用它,也没有找到有关如何使用此模块的教程或示例。

0 投票
1 回答
595 浏览

google-cloud-platform - 数据流作业 GCS 到 Pub/sub 最大批量大小

我正在使用默认数据流模板 GCS 来发布/订阅。在云存储中输入文件,大小为 300MB,每个文件有 2-3 百万行。

启动数据流批处理作业时出现以下错误

来自工作人员的错误消息:javax.naming.SizeLimitExceededException:Pub/Sub 消息大小 (1089680070) 超过了最大批处理大小 (7500000) org.apache.beam.sdk.io.gcp.pubsub.PubsubIO$Write$PubsubBoundedWriter.processElement(PubsubIO.爪哇:1160)

来自文档:Pub/Sub 一个批次最多接受 1,000 条消息,并且一个批次的大小不能超过 10 兆字节。

这是否意味着我必须将输入文件拆分为 10MB 块或 1000 条消息才能发布?

将如此大的文件(每个 300MB)加载到 pubsub 的推荐方法是什么?

在此先感谢您的帮助。

0 投票
1 回答
149 浏览

java - 如何使用 Apache Beam 创建 tar.gz 文件

我使用下面的内容创建了一个tar.gz文件,并且.gz文件已创建但 tar 文件不可用。如何达到结果?

以及如何 tar 多个文件。

提前致谢

0 投票
2 回答
451 浏览

java - 从 Pubsub 读取并写入 GCS 的 Google 数据流作业非常慢(WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards)耗时太长

目前我们有一个数据流作业,它从 pubsub 读取并使用 FileIO.writeDynamic 将 avro 文件写入 GCS,当我们使用 10000 events/sec 进行测试时,由于 WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards 非常慢,因此无法更快地处理。下面是我们用来编写的代码片段。我们如何改进

我们使用自定义文件命名,格式为 gs://tenantID.<>/eventname/dddd-mm-dd/<uniq_id-shardInder-of-numOfShards-pane-paneIndex.avro>

0 投票
2 回答
541 浏览

google-cloud-platform - Google Dataflow 文件处理的输入文件如何统计行数?

我正在尝试计算输入文件中的行数,并且我正在使用 Cloud dataflow Runner 来创建模板。在下面的代码中,我从 GCS 存储桶中读取文件,对其进行处理,然后将输出存储在 Redis 实例中。

但我无法计算输入文件的行数。

主班

示例输入文件

执行管道的命令

我尝试使用StackOverflow 解决方案中的以下代码,但它对我不起作用。

我也浏览了 Apache Beam 文档,但没有发现任何帮助。对此的任何帮助将不胜感激。

0 投票
1 回答
191 浏览

python - 通过依赖管道处理 Dataflow/Apache Beam 中的拒绝

我有一个从 BigQuery 获取数据并将其写入 GCS 的管道,但是,如果我发现任何拒绝,我想将它们正确地写入 Bigquery 表。我将拒绝收集到全局列表变量中,然后将列表加载到 BigQuery 表中。当我在本地运行它时,这个过程运行良好,因为管道以正确的顺序运行。当我使用dataflowrunner运行它时,它不能保证顺序(我希望pipeline1在pipeline2之前运行。有没有办法使用python在Dataflow中拥有依赖管道?或者也请建议是否可以用更好的方法解决这个问题。提前致谢。

0 投票
1 回答
104 浏览

google-bigquery - 在 beam.io.writetobigquery 中使用模式更新选项

我正在使用 apache 光束数据流将一堆日志文件加载到 BigQuery 中。通过向文件添加新列,文件格式可以在一段时间内发生变化。我看到了 Schema Update Option ALLOW_FILED_ADDITION

有谁知道如何使用它?这是我的WriteToBQ步骤的样子: