0

我使用 RxPY 处理文件,想构建一个按顺序依次加载的管道序列:

# Thread pool shared by all three pipelines below, sized to multiprocessing.cpu_count().
pool_scheduler = ThreadPoolScheduler(multiprocessing.cpu_count())

    # Pipeline 1: run the "independing" files through self._build_dataflow().
    # NOTE: ops.subscribe_on(pool_scheduler) moves the subscription onto the
    # thread pool, so .subscribe() returns without waiting for this pipeline to
    # finish and execution falls straight through to the next pipeline -- this
    # is the "no stop point" behavior the question describes.
    rx.from_list(independing_files).pipe(
        self._build_dataflow(),
        ops.subscribe_on(pool_scheduler),
    ).subscribe(
        on_next=lambda file: logger.info(f'file: {file}'),
        on_error=print,  # errors are only printed; later pipelines still start
        on_completed=lambda: logger.info("independing frames loaded!"))

    # Collect the filtered files whose table name resolves to 'mellow'.
    withdraw_file = []
    for file in filtered_files:
        if self._table_name_on_contain(file) == 'mellow':
            withdraw_file += [file]

    # Pipeline 2: run the 'mellow' files through self._build_apples_dataflow().
    # Starts immediately, concurrently with pipeline 1 (see NOTE above).
    rx.from_list(withdraw_file).pipe(
        self._build_apples_dataflow(),
        ops.subscribe_on(pool_scheduler)
    ).subscribe(
        on_next=lambda file: logger.info(f'file: {file}'),
        on_error=print,
        on_completed=lambda: logger.info("apples loaded!"))

    # Pipeline 3: run the "depending" files through self._build_dataflow();
    # calls self._complete_action() when this pipeline completes.
    # Also starts immediately, concurrently with pipelines 1 and 2.
    rx.from_list(depending_files).pipe(
        self._build_dataflow(),
        ops.subscribe_on(pool_scheduler)
    ).subscribe(
        on_next=lambda file: logger.info(f'file: {file}'),
        on_error=print,
        on_completed=lambda: self._complete_action())

但结果出乎我的意料:每个管道似乎都是异步运行的,因为我没有设置任何“停止点”。我希望第二个和第三个管道只在第一个管道完成之后才开始。该如何实现?

4

2 个回答

1

您可以使用 multiprocessing.Event 来同步各个管道:

# Event used as the "stop point" between the first pipeline and the others.
# NOTE(review): assuming the pipelines run on a ThreadPoolScheduler (as in the
# question), everything stays inside one process and threading.Event would
# suffice here.
event = multiprocessing.Event()

# First pipeline (placeholder): its on_completed callback sets the event.
# NOTE(review): no on_error is passed, so if this pipeline fails the event is
# never set and event.wait() below blocks forever.
rx.pipe(...).subscribe(on_completed=event.set)

# Block the calling thread until the first pipeline has completed.
event.wait()

# Second and third pipelines (placeholders): only started after the wait above.
rx.pipe(...)
rx.pipe(...)
于 2020-06-23T03:38:32.213 回答
0

如上所述,我使用事件对象(Event,一种类似条件变量的同步原语)让主线程在每个同步点等待,直到上一个管道完成。我的代码大致如下,运行良好。

import logging
import threading

import rx
from EventMonitoringETL.tools import logging_config
from rx import operators as ops
import multiprocessing
import rx.scheduler as scheduler
def _run_and_wait(source, pool):
    """Run *source* on *pool* and block until it completes or fails.

    Each emitted item is printed together with the id of the thread that
    delivered it. Because of ``ops.subscribe_on``, ``subscribe()`` returns
    immediately, so an event is used as the "stop point" that makes the
    caller wait for this stage before starting the next one.

    Args:
        source: the observable to run.
        pool: the scheduler the subscription is moved onto.

    Raises:
        Exception: the error the pipeline terminated with, re-raised in the
            calling thread so a failed stage stops the script instead of
            leaving it blocked forever.
    """
    # All work runs in threads of this process, so a threading.Event is
    # sufficient; the cross-process multiprocessing.Event is not needed.
    done = threading.Event()
    errors = []

    def on_error(error):
        # Without this handler the event is never set when the stage fails,
        # and done.wait() below would block forever.
        errors.append(error)
        done.set()

    source.pipe(
        ops.subscribe_on(pool),
    ).subscribe(
        lambda i: print(f'{i} - {threading.get_ident()}'),
        on_error=on_error,
        on_completed=done.set,
    )

    done.wait()
    if errors:
        raise errors[0]


if __name__ == '__main__':
    logging_config.InitLogging()
    logger = logging.getLogger('c4t_etl')

    thread_count = multiprocessing.cpu_count()
    thread_pool_scheduler = scheduler.ThreadPoolScheduler(thread_count)

    # Each stage starts only after the previous one has finished.
    _run_and_wait(rx.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
                  thread_pool_scheduler)
    print("AAAAA")

    _run_and_wait(rx.of(11, 12, 13, 14, 15, 16, 17, 18, 19, 110),
                  thread_pool_scheduler)
    print("BBBBB")

    _run_and_wait(rx.of(21, 22, 23, 24, 25, 26, 27, 28, 29, 210),
                  thread_pool_scheduler)
    print("CCCCC")

    _run_and_wait(rx.of(31, 32, 33, 34, 35, 36, 37, 38, 39, 310),
                  thread_pool_scheduler)
    print("DDDDDDD")
于 2020-07-15T11:01:21.040 回答