
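I have managed to use Google Cloud Scheduler to schedule runs of a Dataflow pipeline, but I also want the pipeline to run for at most one hour. Is it possible to schedule an end time for a Dataflow job?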

Edit: I created a pipeline that waits for a set amount of time and then cancels itself, but the cancel() line raises the error IOError: Failed to get the Dataflow job id.

Here is the pipeline code:

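import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# TOPIC, PROJECT and schema are assumed to be defined elsewhere in the script.
p = beam.Pipeline(options=PipelineOptions(region='us-central1'))

(p
    | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
    | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:MarkTest.scraped'.format(PROJECT), schema=schema, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
# duration is in milliseconds, so one hour is 3600000 (3000 would be 3 seconds).
result.wait_until_finish(duration=60 * 60 * 1000)

result.cancel()  # If the pipeline has not finished, you can cancel it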
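
The wait-then-cancel step can also be pulled into a small helper, which makes the time limit explicit and skips cancel() when the job has already ended. This is only a minimal sketch: the helper name cancel_if_still_running and the ONE_HOUR_MS constant are hypothetical, and the set of terminal state names is an assumption meant to mirror the string values of Beam's PipelineState. It relies only on the result object exposing wait_until_finish(duration=...) and cancel(), as in the code above.

```python
# Terminal state names as plain strings. Assumption: these match the
# values of apache_beam.runners.runner.PipelineState.
TERMINAL_STATES = {"DONE", "FAILED", "CANCELLED", "UPDATED", "DRAINED"}

# wait_until_finish takes its duration in milliseconds.
ONE_HOUR_MS = 60 * 60 * 1000


def cancel_if_still_running(result, limit_ms=ONE_HOUR_MS):
    """Wait up to limit_ms, then cancel the job unless it already ended.

    Returns True if cancel() was called, False otherwise.
    """
    state = result.wait_until_finish(duration=limit_ms)
    if state in TERMINAL_STATES:
        # The job ended on its own within the limit; nothing to cancel.
        return False
    result.cancel()
    return True
```

With the pipeline above, it would be called as cancel_if_still_running(p.run()). The helper does not by itself resolve the IOError; it assumes the result object refers to a job that was actually submitted.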