
I have a Dataflow job that splits a single file into x records (tables). These stream into BigQuery without issue.

What I can't find, though, is a way to run a further stage of the pipeline after those results.

For example:

# Collection1 - filtered on first two characters = 95
collection1 = (
    rows    | 'Build pCollection1' >> beam.Filter(lambda s: data_ingestion.filterRowCollection(s, '95'))
            | 'p1 Entities to JSON' >> beam.Map(lambda s: data_ingestion.SplitRowDict(s, '95'))
            | 'Load p1 to BIGQUERY' >> beam.io.WriteToBigQuery(
                    data_ingestion.spec1,
                    schema=parse_table_schema_from_json(data_ingestion.getBqSchema('95')),
                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) # Write to Bigquery
            )

# Collection2 - filtered on first two characters = 99
collection2 = (
    rows    | 'Build pCollection2' >> beam.Filter(lambda s: data_ingestion.filterRowCollection(s, '99'))
            | 'p2 Split Entities to JSON' >> beam.Map(lambda s: data_ingestion.SplitRowDict(s, '99'))
            | 'Load p2 to BIGQUERY' >> beam.io.WriteToBigQuery(
                    data_ingestion.spec2,
                    schema=parse_table_schema_from_json(data_ingestion.getBqSchema('99')),
                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) # Write to Bigquery
            )

Following on from the above, I'd like to run something like:

final_output = (
    collection1, collection2
       | 'Log Completion' >> beam.io.WriteToPubSub('<topic>'))

Is there any way to run another part of the pipeline after the upsert to BigQuery, or is this just not possible? Thanks in advance.


1 Answer


Technically, there's no way to do exactly what you're asking: beam.io.WriteToBigQuery consumes the pCollection it receives, so nothing is left over to feed a downstream stage.

However, it is simple to duplicate the input in a ParDo (a DoFn with multiple outputs) before the call to beam.io.WriteToBigQuery, and send a copy of the pCollection down each path: one copy to the BigQuery write, the other to whatever stage should run next. See this answer, which references this example from the docs.
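
A minimal sketch of that duplicate-and-fan-out approach, reusing the '95' branch from the question. The CopyFn class and the tag names ('to_bq', 'to_pubsub') are illustrative assumptions, not the exact code from the linked answer:

import apache_beam as beam

class CopyFn(beam.DoFn):
    # Emit each element twice: once on the main output (bound for BigQuery)
    # and once on a tagged output (bound for the follow-on stage).
    def process(self, element):
        yield element
        yield beam.pvalue.TaggedOutput('to_pubsub', element)

branched = (
    rows    | 'Build pCollection1' >> beam.Filter(lambda s: data_ingestion.filterRowCollection(s, '95'))
            | 'p1 Entities to JSON' >> beam.Map(lambda s: data_ingestion.SplitRowDict(s, '95'))
            | 'Duplicate p1' >> beam.ParDo(CopyFn()).with_outputs('to_pubsub', main='to_bq'))

# One copy feeds the BigQuery write, exactly as before.
branched.to_bq | 'Load p1 to BIGQUERY' >> beam.io.WriteToBigQuery(
        data_ingestion.spec1,
        schema=parse_table_schema_from_json(data_ingestion.getBqSchema('95')),
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

# The other copy continues down the pipeline. Note that WriteToPubSub
# expects bytes and a streaming pipeline, and that this branch does not
# wait for the BigQuery write to complete.
(branched.to_pubsub
        | 'To bytes' >> beam.Map(lambda d: str(d).encode('utf-8'))
        | 'Log Completion' >> beam.io.WriteToPubSub('<topic>'))

Keep in mind the caveat in the comment above: because the two branches run independently, the Pub/Sub message is not a guarantee that the BigQuery write has finished.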

answered Nov 16, 2020 at 2:36