2

使用响应式扩展,我如何创建一个 Observable,它将持续调用流上的 Read 方法并将结果传播给它的观察者?

或者这是完全错误的处理方式?我应该实现自己的 IObservable 吗?

4

1 回答 1

2

我从来没有遇到过实现我自己的 observable 有意义的情况。

试试这个:

public static IObservable<byte[]> ObservableRead(Stream stream, int bufferSize)
{
    return Observable.Create<byte[]>(o =>
    {
        var buffer = new byte[bufferSize];
        var read = 0;
        try
        {
            while (true)
            {
                read = stream.Read(buffer, 0, buffer.Length);
                if (read == 0)
                {
                    break;
                }
                var results = buffer.Take(read).ToArray();
                //Always return a copy
                //never the buffer for concurrency's sake.
                o.OnNext(results);
            }
        }
        catch (Exception ex)
        {
            o.OnError(ex);
        }
        finally
        {
            o.OnCompleted();
        }
        return Disposable.Empty;
    });
}
于 2013-01-20T03:40:10.617 回答