我想实现一个非常简单的光束管道:
read google storage links to text files from PubSub topic->read each text line by line->write to BigQuery.
Apache Beam 为每个进程预先实现了 PTransform。
所以管道将是:
Pipeline | ReadFromPubSub("topic_name") | ReadAllFromText() | WriteToBigQuery("table_name")
但是,ReadAllFromText
() 以某种方式阻塞了管道。创建自定义 PTransform 在从 PubSub 读取并将其写入 BigQuery 表后返回随机行正常工作(无阻塞)。添加 3 秒的固定窗口或触发每个元素也不能解决问题。
每个文件大约 10MB 和 23K 行。
不幸的是,我找不到有关ReadAllFromText
应该如何工作的文档。如果它试图阻止管道直到读取所有文件,那将是非常奇怪的。而且我希望该函数在读取该行后立即将每行推送到管道。
上述行为是否有任何已知原因?这是一个错误还是我做错了什么?
管道代码:
pipeline_options = PipelineOptions(pipeline_args)
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromPubSub(subscription=source_dict["input"]) \
| 'window' >> beam.WindowInto(window.FixedWindows(3, 0)) \
| ReadAllFromText(skip_header_lines=1)
elements = lines | beam.ParDo(SplitPayload())
elements | WriteToBigQuery(source_dict["output"], write_disposition=BigQueryDisposition.WRITE_APPEND)
.
.
.
class SplitPayload(beam.DoFn):
def process(self, element, *args, **kwargs):
timestamp, id, payload = element.split(";")
return [{
'timestamp': timestamp,
'id': id,
'payload': payload
}]