给定一个同步观察者,有没有办法做到这一点:
observable.SubscribeAsync(observer);
并且所有方法都observer
被异步调用,还是我在创建观察者时必须处理的事情?
给定一个同步观察者,有没有办法做到这一点:
observable.SubscribeAsync(observer);
并且所有方法都observer
被异步调用,还是我在创建观察者时必须处理的事情?
如果您需要在流吐出新值时调用异步方法,您会发现最常见的解决方案是使用SelectMany
. 问题是这不会等待方法完成,导致由创建的任何任务SelectMany
并行运行。
如果您想在等待异步函数完成时阻止流,这就是您需要的:
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(l => Observable.FromAsync(asyncMethod))
.Concat()
.Subscribe();
或者:
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
.Concat()
.Subscribe();
如果通过异步调用观察者上的方法,你的意思是你想要一个新的通知可以发布而不等待前一个通知的处理完成的情况,那么这就是你必须自己做的事情。这违反了 Rx 的约定,因为如果你可以同时有多个通知在飞行中,你就不能再保证通知是按顺序处理的。我认为这种方法还有其他问题 - 这是你需要小心的事情。
另一方面,如果您只是想在与创建通知的线程不同的线程上处理通知,那么 ObserveOn 和 SubscribeOn 就是您想要研究的。