我正在寻找一个关于如何使用响应式扩展(RxPY)和 Twisted 的非常基本的示例。这是一个使用 Twisted 流式传输消息的最小 hello 应用程序。
def hello():
print 'Hello from the reactor loop!'
print 'Lately I feel like I\'m stuck in a rut.'
from twisted.internet import reactor
reactor.callWhenRunning(hello)
print 'Starting the reactor.'
reactor.run()
我想使用 RxPY 库来挂钩这些流(如果这样更容易,它们不必打印到屏幕上),并执行映射、过滤等规范操作......
我可以找到的所有 RxPY 示例都可以生成它们自己的流,例如从可迭代的,以下代码流整数 0-9:
xs = Observable.from_(range(10))
xs.map(
lambda x: x * 2
).subscribe(print)
或者包含在更复杂的示例中(例如子类化 WebSocket 处理程序)。知道如何拦截打印消息吗?EG,从 Twisted 反应器生成一个可观察的流?