5

我需要创建一个 Observable 流,它定期发出异步协程的结果。

intervalRead是一个返回 Observable 的函数,并将间隔rate和异步协程函数作为参数fun,该函数需要在定义的间隔内调用。

我的第一个方法是用区间工厂方法创建一个observable,然后用map调用协程,用from_future把它包装在一个Observable中,然后获取协程返回的值。

async def foo():
    await asyncio.sleep(1)
    return 42

def intervalRead(rate, fun) -> Observable:
    loop = asyncio.get_event_loop()
    return rx.interval(rate).pipe(
        map(lambda i: rx.from_future(loop.create_task(fun()))),
    )

async def main():
    obs = intervalRead(5, foo)
    obs.subscribe(
        on_next= lambda item: print(item)
    )

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()

然而我得到的输出不是协程的结果,而是 from_future 返回的 Observable,在指定的时间间隔发出

输出:<rx.core.observable.observable.Observable object at 0x033B5650>

我怎么能得到那个 Observable 返回的实际值呢?我希望 42

我的第二种方法是创建一个自定义的 observable:


def intervalRead(rate, fun) -> rx.Observable:
    interval = rx.interval(rate)
    def subs(observer: Observer, scheduler = None):
        loop = asyncio.get_event_loop()
        def on_timer(i):
            task = loop.create_task(fun())
            from_future(task).subscribe(
                on_next= lambda i: observer.on_next(i),
                on_error= lambda e: observer.on_error(e),
                on_completed= lambda: print('coro completed')
            )
        interval.subscribe(on_next= on_timer, on_error= lambda e: print(e))        
    return rx.create(subs)

但是,订阅时from_future(task)永远不会发出值,为什么会发生这种情况?

然而,如果我intervalRead这样写:

def intervalRead(rate, fun):
    loop = asyncio.get_event_loop()
    task = loop.create_task(fun())
    return from_future(task)

我得到了预期的结果:42. 显然这并不能解决我的问题,但它让我感到困惑,为什么它在我的第二种方法中不起作用?

最后,我尝试了第三种方法,使用该方法rx.concurrency CurrentThreadScheduler并定期安排一个动作schedule_periodic。然而,我面临着与第二种方法相同的问题。

def funWithScheduler(rate, fun):
    loop = asyncio.get_event_loop()
    scheduler = CurrentThreadScheduler()
    subject = rx.subjects.Subject()
    def action(param):
        obs = rx.from_future(loop.create_task(fun())).subscribe(
            on_next= lambda item: subject.on_next(item),
            on_error= lambda e: print(f'error in action {e}'),
            on_completed= lambda: print('action completed')
        )     
        obs.dispose()   
    scheduler.schedule_periodic(rate,action)
    return subject

将不胜感激任何洞察我错过了什么或任何其他建议来完成我需要的。这是我第一个使用 asyncio 和 RxPY 的项目,我只在 Angular 项目的上下文中使用 RxJS,所以欢迎任何帮助。

4

1 回答 1

11

你的第一个例子几乎可以工作。只需进行两项更改即可使其正常工作:

首先 from_future 的结果是一个可观察的,它发出单个项目(未来完成时的值)。所以 map 的输出是一个更高阶的 observable(一个发出 observables 的 observable)。这些子 observables 可以通过在 map 之后使用 merge_all 运算符或使用 flat_map 而不是 map 来展平。

然后间隔运算符必须在 AsyncIO 循环上调度其计时器,默认情况下并非如此:默认调度程序是 TimeoutScheduler,它会产生一个新线程。所以在原始代码中,任务不能在 AsyncIO 事件循环上调度,因为 create_task 是从另一个线程调用的。在调用 subscribe 时使用 scheduler 参数声明了用于整个运营商链的默认调度程序。

以下代码有效(每 5 秒打印 42):

import asyncio
import rx
import rx.operators as ops
from rx.scheduler.eventloop import AsyncIOScheduler


async def foo():
    await asyncio.sleep(1)
    return 42


def intervalRead(rate, fun) -> rx.Observable:
    loop = asyncio.get_event_loop()
    return rx.interval(rate).pipe(
        ops.map(lambda i: rx.from_future(loop.create_task(fun()))),
        ops.merge_all()
    )


async def main(loop):
    obs = intervalRead(5, foo)
    obs.subscribe(
        on_next=lambda item: print(item),
        scheduler=AsyncIOScheduler(loop)
    )

loop = asyncio.get_event_loop()
loop.create_task(main(loop))
loop.run_forever()
于 2019-06-23T21:28:50.527 回答