问题标签 [google-dataflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
47 浏览

google-cloud-platform - 如何在 GCP 上同时处理数据流两个批处理文件

我想同时处理从 gcp 到数据流的两个文件。

我认为如果再有一个文件进入侧面输入是可能的。

但是,在这种情况下,我认为它会每次都被处理,而不仅仅是一次。

例如)如何同时读取和处理file1和file2(我是否必须将两个文件放在一个文件中并按照路径?)

如果您能给我一个很好的例子或建议,我将不胜感激。

谢谢你。

0 投票
2 回答
185 浏览

google-cloud-platform - 使用 Dataflow 将 csv 数据从 Pub/Sub 订阅流式传输到 BigQuery

使用 GCP 探索 ETL 过程。我在 Dataflow 中使用 Pub/Sub Subscription to BigQuery 模板。

Pub/Sub Subscription 中的消息数据为 csv 格式,如下所示

53466,06/30/2020,,特立尼达和多巴哥,2020-07-01 04:33:52,130.0,8.0,113.0

这会在加载到 BigQuery 表时出现错误。如何在模板中将 CSV 数据转换为 JSON?

0 投票
1 回答
353 浏览

python - AttributeError:模块 'apache_beam' 没有属性 'options'

运行 Apache Beam 管道时出现以下错误。完整的错误代码是:

产生错误的代码是当我尝试调用PipelineOptions该类时。

0 投票
0 回答
12 浏览

google-bigquery - 从 Oracle db 到实时大查询的 Google 数据流能力

Google 数据流是否能够实时(或接近实时)从 Oracle db 流式传输到针对大表和高事务表的 google 大查询?(实时数据复制)

(数据流看起来更适合低事务性应用数据)

谢谢

0 投票
2 回答
273 浏览

google-cloud-dataflow - Dataflow SQL (GCP) 不支持使用 STRUCT 的嵌套行

0 投票
1 回答
120 浏览

apache-beam - 使用历史数据回填 Beam 管道

我有一个 Google Cloud Dataflow 管道(使用 Apache Beam SDK 编写),它在正常操作模式下处理发布到 Cloud Pub/Sub 的事件数据。

为了使管道状态保持最新并创建正确的输出,必须首先处理大量历史事件数据。此历史数据可通过 JDBC 获得。在测试中,我可以使用JdbcIO.ReadPTransform 读取和处理所有历史状态,但我想使用此 JDBC 事件数据初始化我的生产管道,然后干净地转换为从 Pub/Sub 读取事件。如果流水线逻辑以向后不兼容的方式改变,那么同样的过程可能会在未来再次发生。

请注意,当这种历史读取发生时,新事件会继续到达 Pub/Sub(这些最终也会进入数据库),因此应该从仅从 JDBC 读取的历史事件和仅读取较新的事件进行干净的切换来自发布/订阅。

我考虑过的一些方法:

  1. 有一个从两个输入读取的管道,但在某个时间戳之前从 JDBC 过滤数据,在某个时间戳之后从 pub/sub 过滤数据。一旦管道被赶上,部署一个删除 JDBC 输入的更新。

    我认为这不会起作用,因为删除 I/O 转换不向后兼容。或者,管道的 JDBC 部分必须永远留在那里,无缘无故地消耗 CPU 周期。

  2. 编写一次性作业,用全部历史数据填充 pub/sub,然后启动仅从 pub/sub 读取的主管道。

    这似乎使用了比必要更多的发布/订阅资源,而且我认为在管道中交错的新数据与更旧的数据会导致水印过早推进。

  3. 选项 #2 的变体——在处理历史数据之前停止创建新事件,以避免弄乱水印。

    这需要停机时间。

将历史数据回填到管道中似乎是一种常见要求,但我一直无法找到解决此问题的好方法。

0 投票
0 回答
34 浏览

google-cloud-platform - 由于错误“计划从未来的 JDK 版本中删除 Nashorn 引擎”,Google 数据流作业未执行转换

参考:https ://cloud.google.com/composer/docs/how-to/using/using-dataflow-template-operator 代码:https ://github.com/kolban-google/composer-dataflow

使用以下脚本提交作业,但转换步骤不生成任何数据,仅显示以下错误。

消息:“警告:计划从未来的 JDK 版本中删除 Nashorn 引擎”步骤:“JavascriptTextTransformer.TransformTextViaJavascript/ParDo(Anonymous)”

因此,“插入 Bigquery”步骤失败。

0 投票
1 回答
57 浏览

google-cloud-platform - 通过使用“云存储上的文本文件到大查询”数据流模板 GCP 将大查询中的日期格式从 DD-MM-YYYY 转换为 YYYY-MM-DD

我是 GCP 的新手,需要一些帮助来解决我的问题。

我正在创建 CSV 文件、json 文件和 java 脚本文件并上传到 GCP 存储桶。创建“云存储上的文本文件到大查询”数据流模板以将数据填充到大查询中。

通过在 GCP 中使用“云存储上的文本文件到大查询”数据流模板,在大查询中将日期格式从 DD-MM-YYYY 转换为 YYYY-MM-DD 时需要一些帮助。

0 投票
0 回答
118 浏览

apache-beam - 将数据流用作订阅者时,未显示 PubSub Unacked 消息

我有一个从订阅中读取消息的数据流管道。当消息以正确的格式出现时,它工作正常。但是当消息格式不正确时,它会抛出错误。我决定在有一些错误消息时使用死信主题,因为我无法立即更改代码,所以它将被推送到主题。我用死信主题更新了订阅并提供了必要的权限。我可以看到订阅中的 unack 消息数在增加,但它永远不会到达死信队列。

问题:

0 投票
2 回答
596 浏览

python-3.x - 如何使用具有自定义格式的 Apache Beam 以 JSON 格式将 BigQuery 结果写入 GCS?

我正在尝试使用 Python 中的 Apache Beam 将 BigQuery 表记录写为 GCS 存储桶中的 JSON 文件。

我有一个 BigQuery 表 -my_project.my_dataset.my_table像这样

在此处输入图像描述

我希望将表记录/条目写入 GCS 存储桶位置中的 JSON 文件 - “gs://my_core_bucket/data/my_data.json”

预期的 JSON 格式:

但是,通过我当前的 apache 管道实现,我看到创建的 JSON 文件在文件“gs://my_core_bucket/data/my_data.json”中有类似这样的条目

如何创建一个干净的 JSON 文件,将 BigQuery 记录作为 JSON 数组元素?

这是我的管道代码。