0

我需要在on_nextpythonrx订阅中调用一个异步函数,如下所示:

from rx.subject import Subject
import asyncio

async def asyncPrint(value: str):
    print(f'async print: {value}')


async def main():
    s = Subject()
    s.subscribe(
        on_error=lambda e: print(e),
        on_next=lambda value: asyncPrint(value)
    )
    s.on_next('Im from the subject')

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

但我收到异步错误:

$ python test.py 
rx\core\observer\autodetachobserver.py:26:
RuntimeWarning: coroutine 'asyncPrint' was never awaited
  self._on_next(value)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

我真的不想使用asyncio.get_event_loop().run_until_complete(...),因为我已经在运行主循环并且我不想开始一个新的循环也不使用嵌套循环。

我搜索了一下,发现一个lambda函数不能是 async。而且我认为这可能是这里的问题,因为我真的不知道如何在没有使用库on_next的 lambda 函数的情况下获取值。rx

我还搜索了async-rx库,但这看起来唯一可以做的不同的事情是await subscribe(...),但这不是我想要的。我想要类似的东西subscribe(on_next=await...)

那可能吗?从我的背景来看,在 a 中启动一个函数javascript很容易,所以这对我来说似乎是一项可能的任务。希望有人能找到解决方案。asyncsubscribe

非常感谢!

4

1 回答 1

0

最后我设法通过使用来完成这项工作asyncio.run_task()

from rx.subject import Subject
import asyncio

async def asyncPrint(value: str):
    print(f'async print: {value}')

async def main():
    s = Subject()
    s.subscribe(
        on_error=lambda e: print(e),
        on_next=lambda value: asyncio.create_task(asyncPrint(value))
    )
    s.on_next('Im from the subject')

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

输出:

$ python teste.py 
async print: Im from the subject
于 2021-04-03T04:53:54.883 回答