我有一个async
方法是一个长时间运行的方法,它读取一个流,当它发现某些东西时会触发一个事件:
public static async void GetStream(int id, CancellationToken token)
它需要一个取消令牌,因为它是在新任务中创建的。await
它在读取流时在内部调用:
var result = await sr.ReadLineAsync()
现在,我想将其转换为返回 IObservable<> 的方法,以便可以将其与反应式扩展一起使用。从我读过的内容来看,最好的方法是使用Observable.Create
,并且由于 RX 2.0 现在也支持异步,因此我可以使用以下方式来完成所有工作:
public static IObservable<Message> ObservableStream(int id, CancellationToken token)
{
return Observable.Create<Message>(
async (IObserver<Message> observer) =>
{
里面的其余代码是相同的,但我调用的不是触发事件observer.OnNext()
。但是,这感觉不对。一方面,我在其中混合了 CancellationTokens,虽然添加了 async 关键字使它起作用,但这实际上是最好的做法吗?我这样称呼我的 ObservableStream:
Client.ObservableStream(555404, token).ObserveOn(Dispatcher.CurrentDispatcher).SubscribeOn(TaskPoolScheduler.Default).Subscribe(m => Messages.Add(m));