3

我正在寻找一个关于如何使用响应式扩展(RxPY)和 Twisted 的非常基本的示例。这是一个使用 Twisted 流式传输消息的最小 hello 应用程序。

def hello():
    print 'Hello from the reactor loop!'
    print 'Lately I feel like I\'m stuck in a rut.'

from twisted.internet import reactor

reactor.callWhenRunning(hello)

print 'Starting the reactor.'
reactor.run()

我想使用 RxPY 库来挂钩这些流(如果这样更容易,它们不必打印到屏幕上),并执行映射、过滤等规范操作......

我可以找到的所有 RxPY 示例都可以生成它们自己的流,例如从可迭代的,以下代码流整数 0-9:

xs = Observable.from_(range(10))
xs.map(
    lambda x: x * 2
      ).subscribe(print)

或者包含在更复杂的示例中(例如子类化 WebSocket 处理程序)。知道如何拦截打印消息吗?EG,从 Twisted 反应器生成一个可观察的流?

4

1 回答 1

3

我一直在试图弄清楚同样的事情。这是我的发现。

启动 RX 数据流的最简单方法是创建Subject()结合了ObserverObservable. 然后,您可以使用on_next方法将数据输入其中。

接收部分:

from rx.subjects import Subject
subject=Subject()
subject.filter(...).map(...).subscribe(my_observer)

从您扭曲的回调中,您只需执行以下操作:

subject.on_next(data_item)
...
subject.on_completed()

或者您可以发出错误信号:

subject.on_error(my_error)

在学习 RxPy 时,我使用 RxPy 和 Twisted 编写了简单的 Web 服务器。它是单个python文件,可以用作示例。

于 2015-11-21T16:30:12.653 回答