6

注意:让我为这个问题的长度道歉,我不得不在其中输入很多信息。我希望这不会导致太多人简单地略读并做出假设。请完整阅读。谢谢。

我有一个通过套接字传入的数据流。该数据是面向行的。

我正在使用 .NET(BeginRead 等)的 APM(异步编程方法)。这排除了使用基于流的 I/O,因为异步 I/O 是基于缓冲区的。可以重新打包数据并将其发送到流,例如内存流,但也存在问题。

问题是我的输入流(我无法控制)没有给我任何关于流有多长的信息。它只是一个看起来像这样的换行符流:

COMMAND\n
...Unpredictable number of lines of data...\n
END COMMAND\n
....repeat....

因此,使用 APM,并且由于我不知道任何给定的数据集会有多长时间,数据块很可能会跨越需要多次读取的缓冲区边界,但这些多次读取也将跨越多个数据块。

例子:

Byte buffer[1024] = ".................blah\nThis is another l"
[another read]
                    "ine\n.............................More Lines..."

我的第一个想法是使用 StringBuilder 并将缓冲区行简单地附加到 SB。这在某种程度上有效,但我发现很难提取数据块。我尝试使用 StringReader 读取换行数据,但无法知道您是否获得了完整的行,因为 StringReader 在添加的最后一个块的末尾返回部分行,然后返回 null 。没有办法知道返回的是否是完整的换行数据行。

例子:

// Note: no newline at the end
StringBuilder sb = new StringBuilder("This is a line\nThis is incomp..");
StringReader sr = new StringReader(sb);
string s = sr.ReadLine(); // returns "This is a line"
s = sr.ReadLine();        // returns "This is incomp.."

更糟糕的是,如果我只是继续追加数据,缓冲区会变得越来越大,而且由于这可能一次运行数周或数月,这不是一个好的解决方案。

我的下一个想法是在阅读时从 SB 中删除数据块。这需要编写我自己的 ReadLine 函数,但后来我在读写过程中卡住了数据。此外,较大的数据块(可能包含数百个读取和数兆字节的数据)需要扫描整个缓冲区以查找换行符。它效率不高,而且很丑陋。

我正在寻找具有 StreamReader/Writer 的简单性和异步 I/O 的便利性的东西。

我的下一个想法是使用 MemoryStream,并将数据块写入内存流,然后将 StreamReader 附加到流并使用 ReadLine,但我再次遇到问题,知道缓冲区中的最后一次读取是否是完整的行或不是,而且从流中删除“陈旧”数据更加困难。

我还考虑过使用具有同步读取的线程。这样做的好处是,使用 StreamReader,它将始终从 ReadLine() 返回整行,除非在连接断开的情况下。但是,这会导致取消连接的问题,并且某些类型的网络问题可能会导致长时间挂起阻塞套接字。我正在使用异步 IO,因为我不想在程序阻塞数据接收的整个生命周期中占用一个线程。

连接是持久的。随着时间的推移,数据将继续流动。在初始连接期间,有大量数据流,一旦该流完成,套接字保持打开状态等待实时更新。我不确切知道初始流程何时“完成”,因为唯一知道的方法是不会立即发送更多数据。这意味着我不能等待初始数据加载在处理之前完成,因为它进入时我几乎被“实时”处理。

那么,任何人都可以提出一种不太复杂的方式来处理这种情况的好方法吗?我真的希望它尽可能简单和优雅,但由于所有边缘情况,我不断提出越来越复杂的解决方案。我想我想要的是某种 FIFO,我可以在其中轻松地继续追加更多数据,同时从中弹出符合某些条件的数据(即换行符终止的字符串)。

4

2 回答 2

5

这是一个相当有趣的问题。正如您所建议的那样,我过去的解决方案是使用具有同步操作的单独线程。(我设法解决了使用锁和大量异常处理程序阻塞套接字的大部分问题。)不过,通常建议使用内置异步操作,因为它允许真正的操作系统级异步 I/O,所以我理解你的观点。

好吧,我已经写了一个类来完成我认为你需要的东西(我会说是一种相对干净的方式)。让我知道你的想法。

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public class AsyncStreamProcessor : IDisposable
{
    protected StringBuilder _buffer;  // Buffer for unprocessed data.

    private bool _isDisposed = false; // True if object has been disposed

    public AsyncStreamProcessor()
    {
        _buffer = null;
    }

    public IEnumerable<string> Process(byte[] newData)
    {
        // Note: replace the following encoding method with whatever you are reading.
        // The trick here is to add an extra line break to the new data so that the algorithm recognises
        // a single line break at the end of the new data.
        using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine))
        {
            // Read all lines from new data, returning all but the last.
            // The last line is guaranteed to be incomplete (or possibly complete except for the line break,
            // which will be processed with the next packet of data).
            string line, prevLine = null;
            while ((line = newDataReader.ReadLine()) != null)
            {
                if (prevLine != null)
                {
                    yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine;
                    _buffer = null;
                }
                prevLine = line;
            }

            // Store last incomplete line in buffer.
            if (_buffer == null)
                // Note: the (* 2) gives you the prediction of the length of the incomplete line, 
                // so that the buffer does not have to be expanded in most/all situations. 
                // Change it to whatever seems appropiate.
                _buffer = new StringBuilder(prevLine, prevLine.Length * 2);
            else
                _buffer.Append(prevLine);
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            if (disposing)
            {
                // Dispose managed resources.
                _buffer = null;
                GC.Collect();
            }

            // Dispose native resources.

            // Remember that object has been disposed.
            _isDisposed = true;
        }
    }
}

应该为每个 NetworkStream 创建一个此类的实例,并且每当接收到新数据时都应该调用 Process 函数(在 BeginRead 的回调方法中,在你调用下一个 BeginRead 之前,我会想象)。

注意:我仅使用测试数据验证了此代码,而不是通过网络传输的实际数据。但是,我预计不会有任何差异......

此外,警告该类当然不是线程安全的,但只要在处理当前数据之前不再次执行 BeginRead (正如我假设你正在做的那样),就不会有任何问题。

希望这对你有用。如果还有其他问题,请告诉我,我将尝试修改解决方案来处理它们。(尽管仔细阅读,我可能错过了一些微妙的问题!)

于 2009-02-08T01:33:22.243 回答
0

你在你的问题中解释的内容让我非常想起 ASCIZ 字符串。(链接文本)。这可能是一个有益的开始。

我不得不在大学里为我正在做的一个项目写类似的东西。不幸的是,我控制了发送套接字,所以我插入了一段消息字段作为协议的一部分。但是,我认为类似的方法可能会使您受益。

我如何接近我的解决方案是我会发送类似 5HELLO 的内容,所以首先我会看到 5,并且知道我的消息长度为 5,因此我需要的消息是 5 个字符。但是,如果在我的异步读取中,我只有 5HE,我会看到我的消息长度为 5,但我只能从线路读取 3 个字节(假设 ASCII 字符)。因此,我知道我丢失了一些字节,并将我拥有的内容存储在片段缓冲区中。每个套接字我有一个片段缓冲区,因此避免了任何同步问题。粗略的过程是。

  1. 从socket读入字节数组,记录读取了多少字节
  2. 逐字节扫描,直到找到换行符(如果您没有接收到 ascii 字符,这将变得非常复杂,但是可能是多个字节的字符,您只能自己处理)
  3. 将您的片段缓冲区转换为字符串,并将您的读取缓冲区追加到新行。将此字符串作为已完成的消息放入队列或要处理的它自己的委托中。(您可以通过实际读取套接字写入与片段相同的字节数组来优化这些缓冲区,但这很难解释)
  4. 继续循环,每次我们找到一个新行时,从记录的开始/结束位置的字节排列创建一个字符串,然后放到队列/委托上进行处理。
  5. 一旦我们到达读取缓冲区的末尾,就将剩下的任何内容复制到片段缓冲区中。
  6. 调用套接字上的 BeginRead,这将跳转到第 1 步。当套接字中的数据可用时。

然后你使用另一个线程来读取你的incommign消息队列,或者只是让线程池使用委托来处理它。并进行您必须做的任何数据处理。如果我错了,有人会纠正我,但这几乎没有线程同步问题,因为你只能在任何时候从套接字读取或等待读取,所以不用担心锁(除非你是填充队列,我在实现中使用了委托)。您需要自己解决一些细节,例如要留下多大的片段缓冲区,如果在读取时收到 0 个换行符,则必须将整个消息附加到片段缓冲区而不会覆盖任何事物。我认为它最终运行了大约 700 到 800 行代码,但其中包括连接设置、加密协商、

这个设置对我来说表现得非常好;使用这个实现一个包括加密处理的 1.8Ghz opteron,我能够在 100Mbps 以太网上执行高达 80Mbps 的速度。而且由于您与套接字绑定,因此服务器将扩展,因为可以同时处理多个套接字。如果您需要按顺序处理项目,则需要使用队列,但如果顺序无关紧要,那么委托将为您提供线程池之外的非常可扩展的性能。

希望这会有所帮助,这并不是一个完整的解决方案,而是一个开始寻找的方向。

*请注意,我的实现完全在字节级别并支持加密,我在示例中使用了字符以使其更易于可视化。

于 2009-02-08T01:13:51.800 回答