1

我想实现一个非常简单的光束管道:

read google storage links to text files from PubSub topic->read each text line by line->write to BigQuery.

Apache Beam 为每个进程预先实现了 PTransform。

所以管道将是:

Pipeline | ReadFromPubSub("topic_name") | ReadAllFromText() | WriteToBigQuery("table_name")

但是,ReadAllFromText() 以某种方式阻塞了管道。创建自定义 PTransform 在从 PubSub 读取并将其写入 BigQuery 表后返回随机行正常工作(无阻塞)。添加 3 秒的固定窗口或触发每个元素也不能解决问题。

每个文件大约 10MB 和 23K 行。

不幸的是,我找不到有关ReadAllFromText应该如何工作的文档。如果它试图阻止管道直到读取所有文件,那将是非常奇怪的。而且我希望该函数在读取该行后立即将每行推送到管道。

上述行为是否有任何已知原因?这是一个错误还是我做错了什么?

管道代码:

pipeline_options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | ReadFromPubSub(subscription=source_dict["input"]) \
                | 'window' >> beam.WindowInto(window.FixedWindows(3, 0)) \
                | ReadAllFromText(skip_header_lines=1)

            elements = lines | beam.ParDo(SplitPayload())

            elements | WriteToBigQuery(source_dict["output"], write_disposition=BigQueryDisposition.WRITE_APPEND)
    .
    .
    .
    class SplitPayload(beam.DoFn):
        def process(self, element, *args, **kwargs):

            timestamp, id, payload = element.split(";")

            return [{
                'timestamp': timestamp,
                'id': id,
                'payload': payload
            }]
4

0 回答 0