0

我是 apache beam 的新手,正在探索 apache Beam 数据流的 python 版本。我想以特定顺序执行我的数据流任务,但它以并行模式执行所有任务。如何在 apache Beam python 中创建任务依赖项?

示例代码:(在下面的代码中 sample.json 文件包含 5 行)

import apache_beam as beam
import logging
from apache_beam.options.pipeline_options import PipelineOptions

class Sample(beam.PTransform):
    def __init__(self, index):
        self.index = index

    def expand(self, pcoll):
        logging.info(self.index)
        return pcoll

class LoadData(beam.DoFn):
    def process(self, context):
        logging.info("***")

if __name__ == '__main__':

    logging.getLogger().setLevel(logging.INFO)
    pipeline = beam.Pipeline(options=PipelineOptions())

    (pipeline
        | "one" >> Sample(1)
        | "two: Read" >> beam.io.ReadFromText('sample.json')
        | "three: show" >> beam.ParDo(LoadData())
        | "four: sample2" >> Sample(2)
    )
    pipeline.run().wait_until_finish()

我预计它将按照一、二、三、四的顺序执行。但它以并行模式运行。

上述代码的输出:

INFO:root:Missing pipeline option (runner). Executing pipeline using the 
default runner: DirectRunner.
INFO:root:1
INFO:root:2
INFO:root:Running pipeline with DirectRunner.
INFO:root:***
INFO:root:***
INFO:root:***
INFO:root:***
INFO:root:***
4

1 回答 1

1

根据数据流的文档

当管道运行器为分布式执行构建您的实际管道时,可能会优化管道。例如,将某些变换一起运行或以不同的顺序运行可能在计算上更有效。Dataflow 服务完全管理管道执行的这一方面。

同样根据Apache Beam 的文档

API 强调并行处理元素,这使得难以表达诸如“为 PCollection 中的每个元素分配一个序列号”之类的操作。这是故意的,因为这样的算法更有可能遇到可伸缩性问题。并行处理所有元素也有一些缺点。具体来说,它使得无法批处理任何操作,例如将元素写入接收器或在处理期间检查点进度

所以问题是 Dataflow 和 Apache Beam 本质上是并行的;它们旨在处理令人尴尬的并行用例,如果您需要以特定顺序执行操作,它们可能不是最好的工具。正如@jkff 指出的那样,Dataflow 将优化 Pipeline,使其以最佳方式并行化操作。

如果您确实需要按连续顺序执行每个步骤,解决方法是使用阻塞执行,使用其他Stack Overflow 答案中解释waitUntilFinish()方法。但是,我的理解是,这样的实现只能在批处理管道中工作,因为流管道会连续消耗数据,因此您不能阻止执行以在连续步骤上工作。

于 2018-03-21T11:10:59.933 回答