3

Observable.FromAsyncPattern 可用于从 BeginX EndX 样式的异步方法中生成 observable。

也许我误解了一些事情,但是是否有类似的功能可以从新的异步样式方法创建一个 observable - 即.. Stream.ReadAsync?

4

2 回答 2

6

您可以使用ToObservable创建IObservable<T>一个:Task<T>

using System.Reactive.Threading.Tasks;

Stream s = ...;
IObservable<int> o = s.ReadAsync(buffer, offset, count).ToObservable();
于 2013-01-19T21:56:44.210 回答
2

请注意,李的回答是正确的,但最终我最终做的是使用 Observable.Create 不断地从流中读取,见下文 -

    public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader)
    {

       return Observable.Create<Command>(async (subject, token) =>
        {


            try
            {

                while (true)
                {

                    if (token.IsCancellationRequested)
                    {
                        subject.OnCompleted();
                        return;
                    }

                    Command cmd = await reader.ReadCommandAsync();

                    subject.OnNext(cmd);

                }

            }

            catch (Exception ex)
            {
                try
                {
                    subject.OnError(ex);
                }
                catch (Exception)
                {
                    Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere");
                    throw;
                }
            }

        }).Publish();

    }
于 2013-02-12T21:29:57.547 回答