1

我将在 android 应用程序中使用 rxandroid。我现在正在尝试在 rxpy 中对行为进行建模,因为它对我来说是最容易设置和使用的。在下面的示例中,source3 发出了正确的数据;这是需要一些时间的初始化和我刚刚伪造的永久订阅的串联。我想要 BehaviorSubject 因为我需要立即使用最后一个值进行字段初始化。

我无法弄清楚如何将 BehaviorSubject 链接到 source3 之上,以便它在记住最后一个值的同时发出源 3。我在互联网上搜索了两天,没有找到关于这个用例的明确方向。这是我的代码,问题是为什么我没有从观察者那里得到任何排放。

from rx import Observable, Observer
from rx.subjects import BehaviorSubject
import time, random

def fake_initialization(observer):
    time.sleep(5)  # It takes some time
    observer.on_next("Alpha")
    observer.on_completed()

def fake_subscription(observer):
    iter = 0 # Subscription emits forever
    while True:
        observer.on_next("message %02d"%(iter))
        time.sleep(random.randrange(2,5))
        iter += 1

class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))
        #bsubject.on_next(value)

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source1 = Observable.create(fake_initialization)
source2 = Observable.create(fake_subscription)
source3 = source1 + source2

bsubject = BehaviorSubject(False)
source4 = source3.multicast(bsubject)
source4.connect()
source4.subscribe(PrintObserver())
4

1 回答 1

0

对于某人来说,这实际上是一个相当容易的答案。我发布这个以防其他人最终陷入这种情况。诚然,我没有仔细阅读 rxpy 页面。你需要自己添加并发,大概是因为Python中有太多的并发解决方案。这是最终的工作代码:

import random
import time

import multiprocessing
from rx import Observable,Observer
from rx.concurrency import ThreadPoolScheduler
from rx.subjects import Subject

class PrintObserver1(Observer):

    def on_next(self, value):
        print("Received 1 {0}".format(value))
        #bsubject.on_next(value)

    def on_completed(self):
        print("Done 1!")

    def on_error(self, error):
        print("Error Occurred: 1 {0}".format(error))

class PrintObserver2(Observer):

    def on_next(self, value):
        print("Received 2 {0}".format(value))
        #bsubject.on_next(value)

    def on_completed(self):
        print("Done 2!")

    def on_error(self, error):
        print("Error Occurred: 2 {0}".format(error))

def fake_initialization(observer):
    time.sleep(5)  # It takes some time
    observer.on_next("Alpha")
    observer.on_completed()

def fake_subscription(observer):
    iter = 0 # Subscription emits forever
    while True:
        observer.on_next("message %02d"%(iter))
        time.sleep(random.randrange(2,5))
        iter += 1

optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)

source1 = Observable.create(fake_initialization).subscribe_on(pool_scheduler)
source2 = Observable.create(fake_subscription).subscribe_on(pool_scheduler)
catted_source = source1 + source2

native_source = Observable.interval(1000)
print native_source,catted_source
#source = source3
subject = Subject()
# native_source = works
# catted_source = not works
subSource = catted_source.subscribe(subject)
#####

subSubject1 = subject.subscribe(PrintObserver1())
subSubject2 = subject.subscribe(PrintObserver2())
time.sleep(30)
subject.on_completed()
subSubject1.dispose()
subSubject2.dispose()

另请注意,您必须安装“futures”包才能在 Python 2.7 上运行并发。

如果您收到此错误:

from concurrent.futures import ThreadPoolExecutor 
ImportError: No module named concurrent.futures

阅读此内容(链接是针对略有不同的错误,但解决方案有效):

ImportError:没有名为 concurrent.futures.process 的模块

于 2018-03-29T15:49:09.217 回答