问题标签 [google-dataflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
289 浏览

google-cloud-platform - Google Dataflow SQL 中的 SQL 启动器错误 (java.lang.NullPointerException)

我正在尝试使用 Google 数据流 SQL 从 Pubsub 主题中读取数据并收到“NullPointerException”错误。谁能指导我做错了什么。

下面是 SQL 查询。我也尝试选择几列。同样的错误即将到来。

来自 LogsExplorer 的日志

0 投票
0 回答
94 浏览

debugging - CoGroupByKey 在大数据上总是失败(PythonSDK)

我有大约 4000 个文件(每个平均约 7MB)输入。

当数据大小达到大约 4GB 时,我的管道在步骤 CoGroupByKey 上总是失败。我试图限制只使用 300 个文件然后它运行得很好。

如果失败,GCP 数据流上的日志仅显示:

我在日志资源管理器中挖掘所有日志。除上述内容外,没有其他任何指示错误,即使是我的logging.infotry...except代码。

认为这与实例的记忆有关,但我没有深入那个方向。因为它有点像我在使用 GCP 服务时不想担心的事情。

谢谢。

0 投票
0 回答
39 浏览

google-cloud-platform - 谷歌的数据流可能有时间限制?

我已经设法使用 Google Cloud Scheduler 来安排数据流管道的运行,但我也希望管道最多运行一个小时。是否可以安排数据流的结束时间?

编辑:我创建了一个管道,它会等待一定时间然后取消,但我在 cancel() 行上收到错误IOError: Failed to get the Dataflow job id.

这是管道代码:

0 投票
1 回答
410 浏览

google-cloud-platform - 创建 Google Dataflow 模板文件时出错

我正在尝试使用模板安排在一段时间后结束的数据流。使用命令行时,我能够成功执行此操作,但是当我尝试使用 Google Cloud Scheduler 执行此操作时,我在创建模板时遇到错误。

错误是

我用来制作模板的命令是

我拥有的管道文件是这个

有谁知道为什么我可能会收到此错误?

0 投票
1 回答
179 浏览

google-cloud-platform - Apache Beam - Bigquery Upsert

我有一个数据流作业,它将单个文件拆分为 x 个记录(表)。这些流入 bigquery 没问题。

但我发现没有办法在结果之后在管道中执行另一个阶段。

例如

按照上述内容,我想运行以下内容:

无论如何,在对 bigquery 的 upsert 之后运行管道的另一部分还是这是不可能的?提前致谢。

0 投票
2 回答
164 浏览

google-cloud-platform - 使用 flexRSGoal 运行数据流批处理

我发现这篇关于在抢占式机器上运行数据流批处理的文章。

我尝试使用此脚本使用此功能:

但这是结果:

错误:(gcloud.beta.dataflow.jobs.run)无法识别的参数:--flexRSGoal=COST_OPTIMIZED

要搜索 gcloud 命令的帮助文本,请运行: gcloud help -- SEARCH_TERMS

我运行命令gcloud beta dataflow jobs run help,似乎没有这个选项flexRSGoal......

我错过了什么?

0 投票
1 回答
82 浏览

google-cloud-functions - Google Dataflow pipeline for varying schema

I have a product to define and configure business workflows. A part of this product is a form-builder which enables users to setup different forms.

This entire forms data is backed on MongoDB in the following structure

As seen above the form can be for various different businesses. However, I am looking at data pipeline to transport the data to Google Bigquery at periodic intervals for analysis.

On BQ side, I am maintaining separate tables for each workflows

I have a current working solution which is completely written on Google Cloud Functions. I have a Google Scheduler Job run at periodic intervals invoking the different cloud functions. The cloud functions is doing the following things at high level

  • Iterate for each schema
  • Read the data mongodb for every schema since the last run (as cursor)
  • For each row of data, run the custom transformation logic (this includes transforming various nested data types like grids/lookup etc)
  • Write each row of transformed data directly as stream as ndjson on Google Cloud Storage

I above solution provides me with,

  • Complete control on transformation
  • Simple deployment

However since its all on CF, I am bound by limitation of 9 minutes per run. This essentially puts a lot of pagination requirements especially if there is a need to regenerate the complete data from beginning of time

While the above solution works fine for now, I was looking at other serverless options like Google data-flow. Since I am just starting on data-flow/apache beam, I was wondering

If I were to write a pipeline on beam, should I go with same approach of

  1. Extract(Row by Row) -> Transform -> Load (GCS) -> Load (BQ)

or

  1. Extract (entire data as JSON) -> Load to GCS -> Transform (Beam) -> Load to GCS -> Load to BQ

Let me know if there is any better option for entire data processing.

0 投票
1 回答
336 浏览

python-3.x - 通过 POST 请求将 Dataflow 模板作为 URL 传递时,云调度程序出现错误 401

我为 Dataflow Batch Jobs 创建了一个自定义模板。现在我需要使用云调度程序每 5 分钟运行一次。模板存储在云存储中。但是,每当我在调度程序的 POST 请求中传递模板的 URI 时,我都会收到 401 错误

我不确定身体应该包含什么

错误:

任何人都可以请帮助如何去做。因为我是 GCP 的新手

0 投票
1 回答
118 浏览

python - 与字符串列表相比,带有字典列表的 beam.Create() 非常慢

我正在使用 Dataflow 处理具有大约 400 万个特征(总共约 2GB)的 Shapefile 并将几何加载到 BigQuery 中,因此在我的管道开始之前,我将 shapefile 特征提取到一个列表中,并使用beam.Create(features). 有两种方法可以创建初始功能列表:

  1. 将每个特征导出为 json 字符串,后续DoFns 需要将其解析为 dict:
  1. 导出从 JSON 字符串预解析的 python dict

使用选项 1 时,beam.Create(features)需要一分钟左右,管道继续。使用选项 2,beam.Create(features)在 6 核 i7 上需要 3 多个小时,并且似乎在这里花费了很多时间:

这是传递字典列表时trivial_inference变慢的原因吗?beam.Create我可以配置beam.Create为不做它试图在那里做的任何事情,或者以其他方式加快它的速度,这样字典列表就不会比字符串列表慢 100 倍?

0 投票
1 回答
70 浏览

google-app-engine - 从 Google App Engine Mapreduce 迁移到 Apache Beam

我一直是 Google App Engine 的 Mapreduce 库的长期用户,用于处理 Google Datastore 中的数据。Google 不再支持它,它在 Python 3 中根本不起作用。我正在尝试将我们较旧的 Mapreduce 作业迁移到 Google 的 Dataflow / Apache Beam 运行器,但官方文档很糟糕,它只是描述了 Apache Beam,它确实不告诉你如何迁移。

特别是,问题是这样的:

  1. 在 Mapreduce 中,作业将在您现有的已部署应用程序上运行。但是在 Beam 中,您必须创建和部署自定义 Docker 映像来构建 Dataflow 环境,对吗?
  2. 要在 Mapreduce 中创建新的作业模板,您只需编辑一个 yaml 文件并部署它。要在 Apache Beam 中创建一个,您需要创建自定义运行器代码、部署到谷歌云存储的模板文件,并与 docker 映像链接,对吗?

以上准确吗?如果是这样,通常情况下使用 Dataflow 比使用 Mapreduce 困难得多吗?是否有任何图书馆或提示可以使这更容易?