1
from rx import Observable, Observer
from __future__ import print_function
import random

def create_observable(observer):
    while True:
        observer.on_next(random.randint(1,100))    

Observable.create(create_observable).take_while(lambda x: x>50).repeat(6).subscribe(print)

74 78 94 59 79 76

序列,而我希望每个数字将重复 6 次

所以“重复”永远不会对使用 create 方法创建的 observables 起作用。

4

2 回答 2

1

发布的代码在 [51, 100] 范围内获得 6 次随机整数序列。

尝试

(Observable.create(create_observable)
    .take_while(lambda x: x > 50)
    .select_many(lambda x: Observable.just(x).repeat(6))
    .subscribe(print))

要不就

(Observable.create(create_observable)
    .take_while(lambda x: x > 50)
    .select_many(lambda v: [v] * 6)
    .subscribe(print))
于 2016-10-17T14:24:01.030 回答
0

问题在于create函数中的无限循环。每当您订阅此类可观察对象时,它都会调用传递给创建的函数,并且无法退出此循环。您的程序此时卡住了,没有进一步的代码执行。你没有提到,但我想你不得不强行终止它。

生成无限序列时,您必须在每次迭代后检查订阅是否有效。最简单和最安全的方法是使用Observable.generate()函数:

Observable.generate(random.randint(1, 100), 
                    lambda x: True,
                    lambda x: random.randint(1, 100),
                    lambda x: x) \
    .take_while(lambda x: x > 50) \
    .repeat(6) \
    .subscribe(print)
于 2017-01-13T08:53:07.783 回答