1

我有一个处理 web 请求的 Observable,我想在不同的流中处理成功或失败,这与这个 example非常相似。我的脚本和示例之间的主要区别是我不想合并流然后订阅。我将 RxPY 1.6.1 与 Python 2.7 一起使用。

request = Observable.of(requests.get(self.URL, params=request_params))

request_success, request_failed = request.partition(lambda r: r.status_code == requests.codes.ok)          

request_failed.subscribe(lambda r: print_msg('failure!'))
request_success.subscribe(lambda r: print_msg('success!'))

当请求失败时,脚本failure!会按预期打印。但是,当响应正常时,脚本不会打印success!. 有趣的是,当您切换订阅的顺序时,success!确实会打印出来,而failure!永远不会到达。

我想可能request无法多播,所以我尝试添加publish()requestobservable 并connect()在创建订阅后调用。这没有帮助(所以我把它排除在上面的最小示例之外)。

我错过了什么?

4

1 回答 1

1

从将您的代码与RxPy 对分区运算符进行的单元测试进行比较,看起来代码几乎是正确的。

您走在正确的轨道上,您确实需要将请求 Observable 转换为多播的 observable。

这是工作代码(在 Repl.it 上测试,您必须将请求列表转换回您在代码中使用的类/对象)

from rx import Observable

def print_msg(message):
  print(message)

class Request(object):
  def __init__(self, status_code):
    self.status_code = status_code

request = Observable.of(
  Request(200),
  Request(404),
  Request(412),
  Request(200),
).publish()

request_success, request_failed = request.partition(lambda r: \
  r.status_code == 200)

request_success.subscribe(lambda r: print_msg('success!'))
request_failed.subscribe(lambda r: print_msg('failure!'))
request.connect()

请注意,一旦请求列表变成了一个 Observable,它就会被发布(Observable.of(...).publish()),并且只有我们订阅了分区的 observable 之后,我们才调用 connect。

输出是:

success!
failure!
failure!
success!
于 2018-10-22T19:37:37.953 回答