我使用 RxPy 来处理文件,我想构建管道加载序列
pool_scheduler = ThreadPoolScheduler(multiprocessing.cpu_count())
rx.from_list(independing_files).pipe(
self._build_dataflow(),
ops.subscribe_on(pool_scheduler),
).subscribe(
on_next=lambda file: logger.info(f'file: {file}'),
on_error=print,
on_completed=lambda: logger.info("independing frames loaded!"))
withdraw_file = []
for file in filtered_files:
if self._table_name_on_contain(file) == 'mellow':
withdraw_file += [file]
rx.from_list(withdraw_file).pipe(
self._build_apples_dataflow(),
ops.subscribe_on(pool_scheduler)
).subscribe(
on_next=lambda file: logger.info(f'file: {file}'),
on_error=print,
on_completed=lambda: logger.info("apples loaded!"))
rx.from_list(depending_files).pipe(
self._build_dataflow(),
ops.subscribe_on(pool_scheduler)
).subscribe(
on_next=lambda file: logger.info(f'file: {file}'),
on_error=print,
on_completed=lambda: self._complete_action())
但是我得到了一个我没有预料到的结果:似乎每个管道都是异步运行的,因为我没有表示“停止点”。我希望第二个和第三个管道只有在第一个管道完成后才开始。如何解决?