Dataflow 和 Datastream 的新功能。有一个用 Java 编写的 Google Dataflow 模板,可以将Datastream
数据从存储桶读取到github repo中的 BigQuery
它正确地将 MySQL 数据复制到指定的 BigQuery 数据集
shuffledTableRows
.apply(
"Map To Replica Tables",
new DataStreamMapper(
options.as(GcpOptions.class),
options.getOutputProjectId(),
options.getOutputDatasetTemplate(),
options.getOutputTableNameTemplate())
.withDataStreamRootUrl(options.getDataStreamRootUrl())
.withDefaultSchema(BigQueryDefaultSchemas.DATASTREAM_METADATA_SCHEMA)
.withIgnoreFields(fieldsToIgnore))
.apply(
"BigQuery Merge/Build MergeInfo",
new MergeInfoMapper(
bigqueryProjectId,
options.getOutputStagingDatasetTemplate(),
options.getOutputStagingTableNameTemplate(),
options.getOutputDatasetTemplate(),
options.getOutputTableNameTemplate()))
.apply(
"BigQuery Merge/Merge into Replica Tables",
BigQueryMerger.of(
MergeConfiguration.bigQueryConfiguration()
.withMergeWindowDuration(
Duration.standardMinutes(options.getMergeFrequencyMinutes()))));
目前,每个更改Datastream
都将通过从特定表中插入或更新来合并
如何调整此模板以添加一个新BigQuery Nested & Repeatable
列,该列将审核由Datastream