8

我有一个async方法是一个长时间运行的方法,它读取一个流,当它发现某些东西时会触发一个事件:

public static async void GetStream(int id, CancellationToken token)

它需要一个取消令牌,因为它是在新任务中创建的。await它在读取流时在内部调用:

var result = await sr.ReadLineAsync()

现在,我想将其转换为返回 IObservable<> 的方法,以便可以将其与反应式扩展一起使用。从我读过的内容来看,最好的方法是使用Observable.Create,并且由于 RX 2.0 现在也支持异步,因此我可以使用以下方式来完成所有工作:

public static IObservable<Message> ObservableStream(int id, CancellationToken token)
{
    return Observable.Create<Message>(
        async (IObserver<Message> observer) =>
            {

里面的其余代码是相同的,但我调用的不是触发事件observer.OnNext()。但是,这感觉不对。一方面,我在其中混合了 CancellationTokens,虽然添加了 async 关键字使它起作用,但这实际上是最好的做法吗?我这样称呼我的 ObservableStream:

Client.ObservableStream(555404, token).ObserveOn(Dispatcher.CurrentDispatcher).SubscribeOn(TaskPoolScheduler.Default).Subscribe(m => Messages.Add(m));
4

2 回答 2

13

你是对的。一旦你通过一个 来表示你的接口IObservable,你应该避免要求调用者提供一个CancellationToken. 这并不意味着您不能在内部使用它们。Rx 提供了几种机制来生成CancellationToken实例,当观察者取消订阅您的 observable 时,这些实例会被取消。

有很多方法可以解决您的问题。最简单的方法几乎不需要更改您的代码。它使用一个重载,如果调用者取消订阅,Observable.Create它会为您提供CancellationToken触发:

public static IObservable<Message> ObservableStream(int id)
{
    return Observable.Create<Message>(async (observer, token) =>
    {
         // no exception handling required.  If this method throws,
         // Rx will catch it and call observer.OnError() for us.
         using (var stream = /*...open your stream...*/)
         {
             string msg;
             while ((msg = await stream.ReadLineAsync()) != null)
             {
                 if (token.IsCancellationRequested) { return; }
                 observer.OnNext(msg);
             }
             observer.OnCompleted();
         }
    });
}
于 2013-07-05T20:12:43.763 回答
1

您应该更改 GetStream 以返回 Task,而不是 void(返回 async void 不好,除非绝对需要,正如 svick 评论的那样)。返回任务后,您只需调用 .ToObservable() 即可完成。

例如:

public static async Task<int> GetStream(int id, CancellationToken token) { ... }

然后,

GetStream(1, new CancellationToken(false))
   .ToObservable()
   .Subscribe(Console.Write);
于 2013-07-04T14:10:02.697 回答