I am new to Apache Beam and am exploring the Python version of Apache Beam Dataflow. I want to execute my dataflow tasks in a specific order, but it executes all tasks in parallel. How do I create task dependencies in Apache Beam Python?
Sample code (in the code below, the sample.json file contains 5 lines):
import apache_beam as beam
import logging
from apache_beam.options.pipeline_options import PipelineOptions


class Sample(beam.PTransform):
    def __init__(self, index):
        self.index = index

    def expand(self, pcoll):
        logging.info(self.index)
        return pcoll


class LoadData(beam.DoFn):
    def process(self, context):
        logging.info("***")


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    pipeline = beam.Pipeline(options=PipelineOptions())
    (pipeline
     | "one" >> Sample(1)
     | "two: Read" >> beam.io.ReadFromText('sample.json')
     | "three: show" >> beam.ParDo(LoadData())
     | "four: sample2" >> Sample(2)
     )
    pipeline.run().wait_until_finish()
I expected it to execute in the order one, two, three, four, but it runs in parallel.
Output of the above code:
INFO:root:Missing pipeline option (runner). Executing pipeline using the
default runner: DirectRunner.
INFO:root:1
INFO:root:2
INFO:root:Running pipeline with DirectRunner.
INFO:root:***
INFO:root:***
INFO:root:***
INFO:root:***
INFO:root:***