I have a Dataflow job that splits a single file into x records (tables). These stream into BigQuery without any problem.
What I can't find, though, is a way to run a further stage in the pipeline after those writes.
For example:
# Collection1 - filtered on first two characters = 95
collection1 = (
    rows | 'Build pCollection1' >> beam.Filter(lambda s: data_ingestion.filterRowCollection(s, '95'))
    | 'p1 Entities to JSON' >> beam.Map(lambda s: data_ingestion.SplitRowDict(s, '95'))
    | 'Load p1 to BIGQUERY' >> beam.io.WriteToBigQuery(
        data_ingestion.spec1,
        schema=parse_table_schema_from_json(data_ingestion.getBqSchema('95')),
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)  # Write to BigQuery
)
# Collection2 - filtered on first two characters = 99
collection2 = (
    rows | 'Build pCollection2' >> beam.Filter(lambda s: data_ingestion.filterRowCollection(s, '99'))
    | 'p2 Split Entities to JSON' >> beam.Map(lambda s: data_ingestion.SplitRowDict(s, '99'))
    | 'Load p2 to BIGQUERY' >> beam.io.WriteToBigQuery(
        data_ingestion.spec2,
        schema=parse_table_schema_from_json(data_ingestion.getBqSchema('99')),
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)  # Write to BigQuery
)
After the above has run, I would like to run something like this:
final_output = (
    collection1, collection2
    | 'Log Completion' >> beam.io.WriteToPubSub('<topic>'))
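In case it clarifies the intent, this is the shape I was expecting that final step to take. It is only a minimal sketch: it assumes collection1 and collection2 come out of their branches as ordinary PCollections of strings (rather than whatever WriteToBigQuery actually returns), and the project/topic names are placeholders.

# Hypothetical sketch - assumes both branches yield plain PCollections of str.
final_output = (
    (collection1, collection2)                 # a tuple of PCollections...
    | 'Flatten branches' >> beam.Flatten()     # ...merged into one PCollection
    | 'Encode for Pub/Sub' >> beam.Map(lambda s: s.encode('utf-8'))  # WriteToPubSub publishes bytes
    | 'Log Completion' >> beam.io.WriteToPubSub(topic='projects/<project>/topics/<topic>'))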
Is there any way to run another part of the pipeline after the upsert to BigQuery, or is that simply not possible? Thanks in advance.