问题标签 [google-dataflow]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
json - 如何使用 Google Cloud Platform 的数据流的 Pub/Sub 主题 -> BigQuery 模板将嵌套的 JSON 数据放入 BigQuery 表
我正在尝试将从 IoT 设备发送的消息存储在 BigQuery 表中。
云架构如下:
本地设备 -> json_message -> mqtt_client -> GC IoT 设备 -> 设备注册表 -> Pub/Sub 主题 -> 带有 Pub/Sub 主题到 BigQuery 模板的数据流 -> BigQuery 表
我已经让这个系统使用这样构造的非嵌套 JSON 消息
BigQuery 中成功存储此数据的表具有以下架构:
现在我试图让这个系统使用嵌套的 JSON 消息,它的构造如下:
我正在尝试将其存储在具有以下架构的 BigQuery 表中:
不幸的是,当我尝试这种方法时,我收到以下错误,该错误由 DataFlow 输出到 BigQuery 中的错误表。
解决此问题的最佳方法是什么?我无法更改嵌套的 JSON 结构,因为我正在构建一个测试套件,这是必需的格式。
go - GroupByKey 始终将所有内容保存在 RAM 中,导致 OOM
GroupByKey
我正在编写一个管道代码,它将在 DataFlow 的批处理和流模式下使用,并且在批处理模式下使用时我遇到了 OOM 问题。下面的代码显示了问题:当我有一个大文件时,GroupByKey
似乎将所有内容都保存在内存中,仅在输入完成后才发出值。我尝试使用触发器来强制触发事件,但失败了。我找不到在大文件上使用此转换的任何方法。
如何在 beam go 中实现一个包含分组并且可以有效处理大文件的管道?
输出:
编辑:我发现与触发器和窗口相关的 Jira 票证,在撰写本文时,让人相信触发器,特别是触发器传播是 WIP。
python - 多次执行“finish_bundle”方法:Apache beam、Google Dataflow
我正在尝试使用 apache 光束管道作为 Google Dataflow 作业分批创建 100 条记录的 JSON 文件。
我正在从 BigQuery 读取记录并尝试创建每个有 100 条记录的 JSON 文件,即batch_size = 100
因此,我希望7 JSON
在执行700 records
从 BQ 读取的 Dataflow 时创建文件,但是我看到创建了更多文件并且批处理大小与预期不符。
我希望该"finish_bundle"
方法执行一次,但我看到它通过创建记录少于 100 的 JSON 批处理文件在那里多次执行。
这是当前 DF 执行的日志详细信息
我希望创建具有正确序列和批量大小的 JSON 文件批次,每条记录有 100 条记录,如下所示
这是我创建 JSON 批处理文件并存储在 GCS 存储桶中的管道代码。
我不确定为什么多次调用“finish_bundle”?
我的管道代码中的哪些修改会使文件以给定的批量大小创建?
编辑:我尝试使用“DirectRunner”执行相同的程序,它创建了正确数量的文件。
google-cloud-dataflow - Google 对数据流生成的数据流作业日志收费
Google 对管道本身生成的数据流作业日志收费,我们不会从作业中生成任何日志,我们正在使用 python-apache 束通过 composer 运行数据流作业。
这没有任何意义,因为我们没有生成任何日志,而且我们没有太多关于如何关闭生成的日志并将其摄取到云日志记录的信息。
我们收到的 Google 邮件
我们写信提醒您,从 2021 年 10 月 12 日开始,您的 Dataflow 应用程序生成的日志(并被提取并存储在 Google Cloud Logging 中)将按标准 Cloud Logging 价格收费。您仍将收到每个项目每月 50 GB 总(跨所有日志卷)摄取日志的免费层级。这是我们在 2021 年 7 月 12 日发送的通信的后续行动。
我需要做什么?
我们建议您在 2021 年 10 月 12 日之前通过执行以下步骤来检查 Cloud Logging 中 Dataflow 作业日志记录的使用情况:使用 Cloud Monitoring 中的 Metrics Explorer 查看您的使用情况指标。将您在上一步中使用的图表保存到 Stackdriver Monitoring 信息中心或使用数据流使用开源信息中心模板。有关优化 Dataflow 作业的日志量的信息,请参阅控制 Dataflow 日志量。
google-dataflow - 部署数据流作业时如何以代码 0 退出?
如果我创建或更新 Dataflow 作业,例如
python path/to/dataflow_file.py
它在 GCP 云数据流中创建了一个作业,它打印了从我用来启动作业的 shell 创建作业所需的步骤,最后到达了一个点
工人成功启动。
此时我希望进程以代码 0 退出,但它会继续等待。我怎么做?
google-cloud-platform - 包括自定义 PTransform 导致在 GCP 的 Dataflow 作业中找不到依赖项
我试图创建一个复合 PTransform 如下(Python):
然后在 main run() 方法中使用它,如下所示:
run() 和 LimitVolume 都在同一个 main.py 脚本中,然后将其作为作业提交/部署到 GCP
当我通过 DirectRunner 在本地运行这项工作时 - 一切正常;如果我在 GCP 中使用 DataflowRunner 提交并运行它 - 它开始抛出如下错误:
基本上没有找到很多在 requirements.txt 文件中定义并在部署作业时通过 --requirements_file 选项指定的依赖项
请参阅下面的完整错误堆栈跟踪(缩写)。
现在,妙语:
如果我将 LimitVolume PTransform 中的相同逻辑放入 run() 方法并直接在我的管道中指定:
并从 main.py 文件中删除 LimitVolume 类的定义 -它在本地和 GCP 中都可以正常工作!依赖关系没有问题。
所以,很明显,管道中唯一存在的自定义 PTransform 有一些非常“特别”的东西——有人知道那可能是什么吗?
我找不到任何关于自定义 PTransforms 的信息,或者用它打包的细节,或者像这样的错误——这本身就令人担忧......
谢谢!!
这是错误的更大输出:
java - 调整 Datastream-BigQuery 数据流模板以保持审计
Dataflow 和 Datastream 的新功能。有一个用 Java 编写的 Google Dataflow 模板,可以将Datastream
数据从存储桶读取到github repo中的 BigQuery
它正确地将 MySQL 数据复制到指定的 BigQuery 数据集
目前,每个更改Datastream
都将通过从特定表中插入或更新来合并
如何调整此模板以添加一个新BigQuery Nested & Repeatable
列,该列将审核由Datastream
google-bigquery - Google Cloud Dataflow,apache Beam 无法设置 BQ 查询参数:
要求是从 BQ 读取最新更新的记录并加载到 CloudSQL:
这是执行的步骤,
读取大于 LAST_UPD_TS 的 BQ 表记录。PCollection read_from_bq = pipeline.apply("从 bq 读取", BigQueryIO.readTableRows() .withTemplateCompatibility() .fromQuery("select * from
pdata.DEPT
WHERE LAST_UPD_TS >='parametervalue'";) .withoutValidation().usingStandardSql());记录将插入 CloudSQL read_from_bq.apply("Insert and Update", JdbcIO.write() .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(ValueProvider.StaticValueProvider.of("com.mysql.jdbc.Driver"), jdbcUrlValueProvider )) .withStatement("插入 DEPT (LOC_DIM_ID,DIVN_NBR,DEPT_NBR,END_DT,START_DT,PRC_OPT_CD,PRN_LVL_CD,PRICE_LOC_NBR,LAST_UPD_TS,LAST_UPD_USERID)" + "值(?,?,?,?,?,?,?,? ,?,?)" + "在重复密钥更新 START_DT=?,PRC_OPT_CD=?,PRN_LVL_CD=?,PRICE_LOC_NBR=?,LAST_UPD_TS=?,LAST_UPD_USERID=?") .withPreparedStatementSetter(new DEPT_BULKPreparedStatementSetters()) ); PipelineResult.State state = pipeline.run().waitUntilFinish();
上面的代码完美执行,没有任何问题
要求是,首先需要从 CloudSQL 表现有记录中读取 LAST_UPD_TS 列,并将其设置为管道中 BigQuery 查询的参数值,在使用 System.out.println(tableRowTypedRead.getTable( ).get("loc_dim")); 在下面的代码中,我们如何将从 CloudSQL 获取的 LAST_UPD_TS 值作为管道中的第一步作为内联参数传递给同一管道中的 BQ 查询?