我正在尝试在 python 的响应式扩展中进行调度。我想用来subscribe_on
并行处理多个可观察对象。如果 observable 是用创建的,这很好用just
,但如果使用range
orfrom_
则不行。
just
默认为Scheduler.immediate
,而其他生成器默认为Scheduler.current_thread
. 这导致了差异,但对我来说感觉不一致。可能是因为我没有掌握完整的问题。
考虑以下示例:
import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading
def work(x):
print "processing %s on thread %s" % (x, threading.currentThread().name)
time.sleep(1)
def finish(x):
print "finished %s on thread %s" % (x, threading.currentThread().name)
# Creates new thread (I like)
rx.Observable.just(3)\
.do_action(work)\
.subscribe_on(Scheduler.new_thread)\
.subscribe(finish)
# Runs on MainThread (I don't like)
rx.Observable.range(1, 3) \
.do_action(work) \
.subscribe_on(Scheduler.new_thread) \
.subscribe(finish)
如果调度程序直接传递给生成器,它可以工作observe_on
,但我想将可观察的创建与处理分离,并实现如下效果:
import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading
def work(x):
print "processing %s on thread %s" % (x, threading.currentThread().name)
time.sleep(1)
def finish(x):
print "finished %s on thread %s" % (x, threading.currentThread().name)
def factory_single():
return rx.Observable.just(1).do_action(work)
def factory_multiple():
return rx.Observable.range(2, 4).do_action(work)
def process(factory):
factory().subscribe_on(Scheduler.new_thread).subscribe(finish)
# Creates a new thread (I like)
process(factory_single)
# Runs on MainThread (I don't like)
process(factory_multiple)
我误会了subscribe_on
吗?我的方法错了吗?