这是我想要实现的目标:
- 创建 2 个返回异步消息的独立对象
- 转换 Observable 中的消息
这些函数很长,因此每个对象都在一个单独的文件中。这是我的对象的基本结构:
class Object:
def message(self, observer, scheduler):
_task = None
def _teardown():
if _task:
_task.cancel()
observer.on_completed()
async def _loop():
while True:
#DO STUFF
observer.on_next(message)
async def _run_loop():
try:
await _loop()
except asyncio.CancelledError:
print('error')
finally:
_teardown()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
_task = loop.create_task(_run_loop())
loop.run_forever()
这是RXPY的代码:
object1 = Object1()
object1_observable = defer(lambda _: create(object1.message)).pipe(
op.observe_on(EventLoopScheduler()),
op.share()
)
object1_observable.subscribe(lambda value: print('Object1 Observer Received'))
object2 = Object2()
object2_observable = defer(lambda _: create(object2.message)).pipe(
op.observe_on(EventLoopScheduler()),
op.share()
)
object2_observable.subscribe(lambda value: print('Object2 Observer Received'))
问题 1:使用此代码,我只收到来自 object1 的消息。我应该如何调整此代码以使其工作并从两个对象接收消息?
问题 2:我希望来自 object1 的消息成为另一个类的参数,比如说 Object3。然后 Object3 将监听 Object2。对于来自 Object2 的每条消息,Object3 都会做一些事情。对于来自 Object1 的每条消息,Object3 的属性都会发生变化。但是,在 Object3 使用来自 Object2 的最新消息完成之前,属性不应更改。我不知道如何用 RXPY 做到这一点。任何想法?
使用踏板更新,代码可以工作。不过,我不确定这是继续进行的好方法。我将使用适当的杀戮过程对其进行更新,并且我仍在处理问题 2
class Object:
def message(self, observer, scheduler):
def _loop():
while True:
#DO STUFF
observer.on_next(message)
thread = threading.Thread(target=_loop)
thread.daemon = False
thread.start()