问题标签 [google-dataflow]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
publish-subscribe - 如何自动缩放谷歌数据流(流)管道?
我们在 Google Dataflow 中运行流式传输管道。它提取 Pub/Sub 消息并保存到 BigQuery。出于某种原因,在过去的几天里,我们有积压。系统延迟显示 9-15 小时。我在这里关注文档,并添加了以下参数:
然而,它似乎并没有产生新的管道。我们有拉 Pub/Sub。
google-cloud-platform - 谷歌数据流问题
我们正在 Google bigquery 上新实现 DataWareHouse,我们所有的资源都在 prim 数据库上。因此,我们将用于 ETL 和 Maven 的数据流与 Apache Beam SDK 一起使用,以便在 Google Cloud Dataflow 服务上运行 30 个管道。
如果表中的数据较少,则它运行成功。如果数据以百万为单位,则会引发如下错误
为了使用参数编译和运行 Java 类的 main 方法,我正在执行以下命令。
请让我知道我是否正确。什么是正确的参数,数据流可以并行执行多个管道吗?
python - How to deploy Google Cloud Dataflow with connection to PostgreSQL (beam-nuggets) from Google Cloud Functions
I'm trying to create ETL in GCP which will read part of data from PostgreSQL and put it in the suitable form to BigQuery. I was able to perform this task deploying Dataflow from my computer, but I failed to make it dynamic, so it will read last transferred record and transfer next 100. So I figured out, that I'll create Dataflows from Cloud Function. Everything was working OK, reading/writing to BigQuery works like a charm, but I'm stuck on PostgreSQL requited package: beam-nuggets.
In the function I'm creating pipe arguments:
Then create pipeline:
and run it:
If I omit:
everything is fine except Dataflow cannot use PostgeQSL as import:
fails.
When I add
line, testing function from GCP Function web portal returns:
running
from local computer works OK.
setup.py is deployed along with function code (main.py) and requirements.txt to the Cloud Function.
Requirements.txt is used during Function deploy and looks like this:
setup.py looks like this:
I'm stuck for couple days, tried different setup.py approaches, tried to use requirements.txt instead of setup.py - no luck.
log just says:
Supplementary info:
if I'm using
instead of
I'm getting:
in GCP Functions web portal while running test function.
Afrer I increased memory to 2BG it says:
Logs in this case:
From local machine Dataflow deployment works OK.
Command from logs:
also works OK although it seems like downloading half of the internet for couple of minutes, even if I reduce requirements.txt to beam-nuggets==0.15.1 only.
It stucks on
exactly during setup from this package, function:
python-3.x - 从 Dataflow 作业中删除 Google 存储中的文件
我在 python 3.7 中有一个用 apache-beam 制作的数据流,我在其中处理一个文件,然后我必须删除它。该文件来自谷歌存储桶,问题是当我使用 DataflowRunner 运行器时,我的工作不起作用,因为谷歌数据流 python 3.7 环境中没有安装 google-cloud-storage API。你们知道不使用这个 API 我怎么能在我的数据流中删除这个文件?我见过像https://beam.apache.org/releases/pydoc/2.22.0/apache_beam.io.filesystem.html这样的 apache_beam 模块,但我不知道如何使用它,也没有找到有关如何使用此模块的教程或示例。
google-cloud-platform - 数据流作业 GCS 到 Pub/sub 最大批量大小
我正在使用默认数据流模板 GCS 来发布/订阅。在云存储中输入文件,大小为 300MB,每个文件有 2-3 百万行。
启动数据流批处理作业时出现以下错误
来自工作人员的错误消息:javax.naming.SizeLimitExceededException:Pub/Sub 消息大小 (1089680070) 超过了最大批处理大小 (7500000) org.apache.beam.sdk.io.gcp.pubsub.PubsubIO$Write$PubsubBoundedWriter.processElement(PubsubIO.爪哇:1160)
来自文档:Pub/Sub 一个批次最多接受 1,000 条消息,并且一个批次的大小不能超过 10 兆字节。
这是否意味着我必须将输入文件拆分为 10MB 块或 1000 条消息才能发布?
将如此大的文件(每个 300MB)加载到 pubsub 的推荐方法是什么?
在此先感谢您的帮助。
java - 如何使用 Apache Beam 创建 tar.gz 文件
我使用下面的内容创建了一个tar.gz
文件,并且.gz
文件已创建但 tar 文件不可用。如何达到结果?
以及如何 tar 多个文件。
提前致谢
java - 从 Pubsub 读取并写入 GCS 的 Google 数据流作业非常慢(WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards)耗时太长
目前我们有一个数据流作业,它从 pubsub 读取并使用 FileIO.writeDynamic 将 avro 文件写入 GCS,当我们使用 10000 events/sec 进行测试时,由于 WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards 非常慢,因此无法更快地处理。下面是我们用来编写的代码片段。我们如何改进
我们使用自定义文件命名,格式为 gs://tenantID.<>/eventname/dddd-mm-dd/<uniq_id-shardInder-of-numOfShards-pane-paneIndex.avro>
google-cloud-platform - Google Dataflow 文件处理的输入文件如何统计行数?
我正在尝试计算输入文件中的行数,并且我正在使用 Cloud dataflow Runner 来创建模板。在下面的代码中,我从 GCS 存储桶中读取文件,对其进行处理,然后将输出存储在 Redis 实例中。
但我无法计算输入文件的行数。
主班
示例输入文件
执行管道的命令
我尝试使用StackOverflow 解决方案中的以下代码,但它对我不起作用。
我也浏览了 Apache Beam 文档,但没有发现任何帮助。对此的任何帮助将不胜感激。
python - 通过依赖管道处理 Dataflow/Apache Beam 中的拒绝
我有一个从 BigQuery 获取数据并将其写入 GCS 的管道,但是,如果我发现任何拒绝,我想将它们正确地写入 Bigquery 表。我将拒绝收集到全局列表变量中,然后将列表加载到 BigQuery 表中。当我在本地运行它时,这个过程运行良好,因为管道以正确的顺序运行。当我使用dataflowrunner运行它时,它不能保证顺序(我希望pipeline1在pipeline2之前运行。有没有办法使用python在Dataflow中拥有依赖管道?或者也请建议是否可以用更好的方法解决这个问题。提前致谢。
google-bigquery - 在 beam.io.writetobigquery 中使用模式更新选项
我正在使用 apache 光束数据流将一堆日志文件加载到 BigQuery 中。通过向文件添加新列,文件格式可以在一段时间内发生变化。我看到了 Schema Update Option ALLOW_FILED_ADDITION
。
有谁知道如何使用它?这是我的WriteToBQ
步骤的样子: