7

我在这里挣扎。通常我会读一本书,但现在还没有。我发现了无数与使用 RX 读取流有关的各种示例,但我发现很难理解。

我知道我可以使用 Observable.FromAsyncPattern 创建 Stream 的 BeginRead/EndRead 或 BeginReadLine/EndReadLine 方法的包装。

但这只会读取一次——当第一个观察者订阅时。

我想要一个 Observable ,它将继续读取和泵送 OnNext 直到流错误或结束。

除此之外,我还想知道如何与多个订阅者共享该可观察对象,以便他们都获得项目。

4

4 回答 4

5

您可以使用Repeat为了保持阅读行直到流结束,Publish或者Replay为了控制跨多个阅读器的共享。

一个简单、完整的 Rx 解决方案的示例,用于从任何流中读取行直到结束:

public static IObservable<string> ReadLines(Stream stream)
{
    return Observable.Using(
        () => new StreamReader(stream),
        reader => Observable.FromAsync(reader.ReadLineAsync)
                            .Repeat()
                            .TakeWhile(line => line != null));
}

该解决方案还利用了到达流末尾时ReadLine返回的事实。null

于 2016-11-15T21:10:34.993 回答
4

添加到李的答案,使用rxx

using (new FileStream(@"filename.txt", FileMode.Open)
       .ReadToEndObservable()
       .Subscribe(x => Console.WriteLine(x.Length)))
{
  Console.ReadKey();
}

将输出读取缓冲区的长度。

于 2013-01-22T13:37:45.440 回答
1

嘿 - 将在这里重用我的其他答案之一(好吧,它的一部分,无论如何):

参考:从 NetworkStream 读取会破坏缓冲区

在那,我有一个这样的扩展方法:

public static class Ext
{        
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
    {        
        // to hold read data
        var buffer = new byte[bufferSize];
        // Step 1: async signature => observable factory
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead, 
            stream.EndRead);
        return Observable.While(
            // while there is data to be read
            () => stream.CanRead, 
            // iteratively invoke the observable factory, which will
            // "recreate" it such that it will start from the current
            // stream position - hence "0" for offset
            Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
                .Select(readBytes => buffer.Take(readBytes).ToArray()));
    }
}

您可能可以以如下形式使用它:

// Note: ToEnumerable works here because your filestream
// has a finite length - don't do this with infinite streams!
var blobboData  = stream
     .ReadObservable(bufferSize)
     // take while we're still reading data
     .TakeWhile(returnBuffer => returnBuffer.Length > 0)
     .ToEnumerable()
     // mash them all together
     .SelectMany(buffer => buffer)
     .ToArray();
于 2013-01-22T17:06:43.067 回答
1

解决方案是使用 Observable.Create

这是一个可以适应读取任何类型流的示例

    public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader)
    {

       return Observable.Create<Command>(async (subject, token) =>
        {


            try
            {

                while (true)
                {

                    if (token.IsCancellationRequested)
                    {
                        subject.OnCompleted();
                        return;
                    }

                    //this part here can be changed to something like this
                    //int received = await Task.Factory.FromAsync<int>(innerSocket.BeginReceive(data, offset, size, SocketFlags.None, null, null), innerSocket.EndReceive);

                    Command cmd = await reader.ReadCommandAsync();

                    subject.OnNext(cmd);

                }

            }

            catch (Exception ex)
            {
                try
                {
                    subject.OnError(ex);
                }
                catch (Exception)
                {
                    Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere");
                    throw;
                }
            }

        }).Publish();

    }

不要忘记在返回的 IConnectableObservable 上调用 Connect()

于 2013-02-12T21:27:34.680 回答