1

这是我想要实现的目标:

  1. 创建 2 个返回异步消息的独立对象
  2. 转换 Observable 中的消息

这些函数很长,因此每个对象都在一个单独的文件中。这是我的对象的基本结构:

class Object:
    def message(self, observer, scheduler):
        _task = None

        def _teardown():
            if _task:
                _task.cancel()
            observer.on_completed()

        async def _loop():
            while True:
                #DO STUFF
                observer.on_next(message)

        async def _run_loop():
            try:
                await _loop()
            except asyncio.CancelledError:
                print('error')
            finally:
                _teardown()

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        _task = loop.create_task(_run_loop())
        loop.run_forever()

这是RXPY的代码:

object1 = Object1()

object1_observable = defer(lambda _: create(object1.message)).pipe(
    op.observe_on(EventLoopScheduler()),
    op.share()
)

object1_observable.subscribe(lambda value: print('Object1 Observer Received'))

object2 = Object2()

object2_observable = defer(lambda _: create(object2.message)).pipe(
    op.observe_on(EventLoopScheduler()),
    op.share()
)

object2_observable.subscribe(lambda value: print('Object2 Observer Received'))

问题 1:使用此代码,我只收到来自 object1 的消息。我应该如何调整此代码以使其工作并从两个对象接收消息?

问题 2:我希望来自 object1 的消息成为另一个类的参数,比如说 Object3。然后 Object3 将监听 Object2。对于来自 Object2 的每条消息,Object3 都会做一些事情。对于来自 Object1 的每条消息,Object3 的属性都会发生变化。但是,在 Object3 使用来自 Object2 的最新消息完成之前,属性不应更改。我不知道如何用 RXPY 做到这一点。任何想法?

使用踏板更新,代码可以工作。不过,我不确定这是继续进行的好方法。我将使用适当的杀戮过程对其进行更新,并且我仍在处理问题 2

class Object:
    def message(self, observer, scheduler):
        def _loop():
            while True:
                #DO STUFF
                observer.on_next(message)

        thread = threading.Thread(target=_loop)
        thread.daemon = False
        thread.start()
4

0 回答 0