9

作为我们应用程序的一部分(现在生产大约 4 个月),我们有一个来自外部设备的数据流,我们将其转换为 IObservable

到目前为止,我们一直在使用以下内容来生成它,并且效果很好。

IObservable<string> ObserveStringStream(Stream inputStream)
{
    var streamReader = new StreamReader(inputStream);
    return Observable
            .Create<string>(observer => Scheduler.ThreadPool
            .Schedule(() => ReadLoop(streamReader, observer)));
}

private void ReadLoop(StreamReader reader, IObserver<string> observer)
{
    while (true)
    {
        try
        {
            var line = reader.ReadLine();
            if (line != null)
            {
                observer.OnNext(line);
            }
            else
            {
                observer.OnCompleted();
                break;
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
            break;
        }
    }
}

昨晚我想知道是否有一种方法可以使用yield return语法来实现相同的结果并想出了这个:

IObservable<string> ObserveStringStream(Stream inputStream)
{
    var streamReader = new StreamReader(inputStream);
    return ReadLoop(streamReader)
            .ToObservable(Scheduler.ThreadPool);
}

private IEnumerable<string> ReadLoop(StreamReader reader)
{
    while (true)
    {
        var line = reader.ReadLine();
        if (line != null)
        {
            yield return line;
        }
        else
        {
            yield break;
        }
    }
}

它似乎工作得很好,而且更干净,但我想知道一种方式是否比另一种方式有任何优点或缺点,或者是否有更好的方法。

4

3 回答 3

14

我认为你有一个好主意(Stream变成Enumerable<string>then IObservable<string>)。但是,IEnumerable代码可以更简洁:

IEnumerable<string> ReadLines(Stream stream)
{
    using (StreamReader reader = new StreamReader(stream))
    {
        while (!reader.EndOfStream)
            yield return reader.ReadLine();
    }
}

然后对于IObservable

IObservable<string> ObserveLines(Stream inputStream)
{
    return ReadLines(inputStream).ToObservable(Scheduler.ThreadPool);
}

这更短,更易读,并且可以正确处理流。它也很懒惰。

ToObservable扩展负责捕获OnNext事件(新行)以及OnCompleted事件(可枚举结束)和OnError.

于 2012-04-12T07:36:35.283 回答
2

我手头没有代码,但这里是异步预异步 CTP 的方法。

[略读的注意事项:如果您不需要扩展太多,则无需费心]

创建一个 AsyncTextReader 实现,它本身就是 Observable。ctor 接收一个 Stream,并对流执行 BeginRead(256bytes),将自身作为延续传递,然后返回。

进入延续时,调用 EndRead,并将返回的字节添加到类上的一个小缓冲区中。重复此操作,直到缓冲区包含一个或多个行尾序列(根据 TextWriter)。发生这种情况时,通过 Observable 接口将缓冲区的这些位作为字符串发送出去,然后重复。

完成后,发出 OnComplete 等信号...(并处理流)。如果您在 continuation 中收到 EndReadByte 引发的异常,请捕获它并将其传递给 OnError 接口。

调用代码如下所示:

IObservable = new AsyncTextReader(stream);

这可以很好地扩展。只需要确保您不会对缓冲区处理做任何太愚蠢的事情。

伪代码:

public ctor(Stream stream){
    this._stream = stream;
    BeginRead();
    return;
}

private void BeginRead(){
    // kick of async read and return (synchronously)
    this._stream.BeginRead(_buffer,0,256,EndRead,this);
}

private void EndRead(IAsyncResult result){
    try{
        // bytesRead will be *up to* 256
        var bytesRead = this._stream.EndRead(result);
        if(bytesRead < 1){
            OnCompleted();
            return;
        }
        // do work with _buffer, _listOfBuffers
        // to get lines out etc...
        OnNext(aLineIFound); // times n
        BeginRead(); // go round again
    }catch(Exception err){
        OnException(err);
    }
}

好的,这就是 APM,只有母亲才会喜欢的东西。我热切地等待着另一种选择。

ps:读者是否应该关闭流是一个有趣的问题。我说不,因为它没有创造它。

于 2012-04-13T15:29:00.600 回答
1

使用 async/await 支持,以下很可能是您最好的选择:

IObservable<string> ObserveStringStream(Stream inputStream)
{
    return Observable.Using(() => new StreamReader(inputStream), 
        sr => Observable.Create<string>(async (obs, ct) =>
        {
            while (true)
            {
                ct.ThrowIfCancellationRequested();
                var line = await sr.ReadLineAsync().ConfigureAwait(false);
                if (line == null)
                    break;
                obs.OnNext(line);
            }
            obs.OnCompleted();
    }));
}
于 2015-11-16T14:02:06.400 回答