5

我正在尝试在 python 的响应式扩展中进行调度。我想用来subscribe_on并行处理多个可观察对象。如果 observable 是用创建的,这很好用just,但如果使用rangeorfrom_则不行。

just默认为Scheduler.immediate,而其他生成器默认为Scheduler.current_thread. 这导致了差异,但对我来说感觉不一致。可能是因为我没有掌握完整的问题。
考虑以下示例:

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
    print "processing %s on thread %s" % (x, threading.currentThread().name)
    time.sleep(1)


def finish(x):
    print "finished %s on thread %s" % (x, threading.currentThread().name)


# Creates new thread (I like)
rx.Observable.just(3)\
    .do_action(work)\
    .subscribe_on(Scheduler.new_thread)\
    .subscribe(finish)

# Runs on MainThread (I don't like)
rx.Observable.range(1, 3) \
    .do_action(work) \
    .subscribe_on(Scheduler.new_thread) \
    .subscribe(finish)

如果调度程序直接传递给生成器,它可以工作observe_on,但我想将可观察的创建与处理分离,并实现如下效果:

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
    print "processing %s on thread %s" % (x, threading.currentThread().name)
    time.sleep(1)


def finish(x):
    print "finished %s on thread %s" % (x, threading.currentThread().name)


def factory_single():
    return rx.Observable.just(1).do_action(work)


def factory_multiple():
    return rx.Observable.range(2, 4).do_action(work)


def process(factory):
    factory().subscribe_on(Scheduler.new_thread).subscribe(finish)

# Creates a new thread (I like)
process(factory_single)

# Runs on MainThread (I don't like)
process(factory_multiple)

我误会了subscribe_on吗?我的方法错了吗?

4

1 回答 1

4

您的示例中有三个可以独立安排的操作:

  1. 数据馈送操作。just并且range默认使用不同的调度器,但是它们之间并没有太大的区别。两者都在当前线程上输入初始值。您可以通过将默认调度程序作为参数传递给这些方法来覆盖它们的默认调度程序。

  2. 订阅动作。默认使用Scheduler.current_thread。即,它与数据馈送操作在同一线程上执行。可以被subscribe_on方法覆盖。

  3. 观察 ( on_next, on_error, on_completed) 动作。默认使用Scheduler.current_thread。即它与订阅操作在同一个线程上执行。可以被observe_on方法覆盖。

如果您仅针对其中一项操作覆盖调度程序,则其他操作应按照上述说明进行操作。

关于调度程序

Scheduler.immediate并没有真正安排任何事情。它立即在调度它的同一线程上调用操作。

Scheduler.current_thread通过排队操作避免递归,但仍然在调度它的同一线程上调用操作。

Scheduler.new_thread启动单个后台线程以一个接一个地执行操作。

Scheduler.timeout为需要执行的每个操作启动新的后台线程。

尝试并行处理

在不同线程中调度工作的最合适方法似乎是observe_on.

thread_pool但问题是现在 RxPy中没有调度程序。new_thread调度程序只启动一个线程,所以它不会对你有太大帮助。

timeout调度程序可用于并行,但它无法控制并发线程的数量,因此并发任务数量的爆炸式增长可能会溢出内存并有效地使您的系统崩溃。

不是observe_on中的错误

我尝试运行您的示例,observe_on(Scheduler.timeout)但任务仍然没有并行进行。在查看 RxPy 源代码后,我发现它仅在当前事件完成后才安排下一个事件,这有效地禁用了并行处理。我的第一反应是报告实施中的错误observe_on

但经过进一步调查,我发现串行执行不是错误,而是预期行为

并行执行任务的正确方法

这是有效的代码(基于此答案):

Observable.range(1, 3) \
  .select_many(lambda i: Observable.start(lambda: work(i), scheduler=Scheduler.timeout)) \
  .observe_on(Scheduler.event_loop) \
  .subscribe(finish)

Observable.start创建异步observable,通过Scheduler.timeout.

observe_on(Scheduler.event_loop)是可选的。它强制finish所有项目的方法在同一个线程上调用。

请注意,不能保证按初始顺序finish调用方法。range

于 2015-11-21T19:50:44.090 回答