1

问题是用 RX 模拟双循环行为:

while True:
    try:
        token = get_token()
        while True:
            try:
                value = get_value_using_token(token)
                do_something(value)
            except:
                break
    except:
        break

如果将两个循环替换为两个可观察对象,则将是干净的,其中一个充当外部的观察者,而do_something(value)可以自己替换为观察者。任何异常也可以很好地处理。外循环需要阻塞,但内循环可能不是,因为我正在尝试使用外循环来处理异常,使用带有退避函数的重试函数。

到目前为止,我可以使用以下方法构建序列:

Observable.from_iterable(value for value in iter(get_token, None))
    .subscribe(do_something)

但是我怎样才能在外部的阻塞模式下制作类似的结构呢?

4

2 回答 2

1

您只需要使用Repeat运算符来创建一个循环。然后你需要 Retry操作员继续失败。

就像是

Observable.return(get_token())
    .flatMap(token->Observable.return(get_value_using_token(token))
        .repeat())
    .retry()
.subscribe(do_something)

*我不知道python,所以我希望你能转换那个伪代码

于 2016-04-07T02:36:11.047 回答
0

我最终做的是使用repeat运算符创建无限的函数流,并map对其进行调用。

def get_token():
    return some_value

def get_value_with_token(token):
    return some_value_using_token

Observable.repeat(get_token)\
    .map(lambda f: f())\
    .map(lambda n: O.repeat(lambda: get_value_with_token(n)))\
    .concat_all()\
    .map(lambda f: f())\
    .subscribe(logger.info)

whereget_tokenget_value_with_tokenare 函数。

通过对两者都使用阻塞函数,我可以创建一个双循环,并将其他rx运算符应用retry到 observable 上。

于 2017-04-19T15:15:45.673 回答