注意:让我为这个问题的长度道歉,我不得不在其中输入很多信息。我希望这不会导致太多人简单地略读并做出假设。请完整阅读。谢谢。
我有一个通过套接字传入的数据流。该数据是面向行的。
我正在使用 .NET(BeginRead 等)的 APM(异步编程方法)。这排除了使用基于流的 I/O,因为异步 I/O 是基于缓冲区的。可以重新打包数据并将其发送到流,例如内存流,但也存在问题。
问题是我的输入流(我无法控制)没有给我任何关于流有多长的信息。它只是一个看起来像这样的换行符流:
COMMAND\n
...Unpredictable number of lines of data...\n
END COMMAND\n
....repeat....
因此,使用 APM,并且由于我不知道任何给定的数据集会有多长时间,数据块很可能会跨越需要多次读取的缓冲区边界,但这些多次读取也将跨越多个数据块。
例子:
Byte buffer[1024] = ".................blah\nThis is another l"
[another read]
"ine\n.............................More Lines..."
我的第一个想法是使用 StringBuilder 并将缓冲区行简单地附加到 SB。这在某种程度上有效,但我发现很难提取数据块。我尝试使用 StringReader 读取换行数据,但无法知道您是否获得了完整的行,因为 StringReader 在添加的最后一个块的末尾返回部分行,然后返回 null 。没有办法知道返回的是否是完整的换行数据行。
例子:
// Note: no newline at the end
StringBuilder sb = new StringBuilder("This is a line\nThis is incomp..");
StringReader sr = new StringReader(sb);
string s = sr.ReadLine(); // returns "This is a line"
s = sr.ReadLine(); // returns "This is incomp.."
更糟糕的是,如果我只是继续追加数据,缓冲区会变得越来越大,而且由于这可能一次运行数周或数月,这不是一个好的解决方案。
我的下一个想法是在阅读时从 SB 中删除数据块。这需要编写我自己的 ReadLine 函数,但后来我在读写过程中卡住了数据。此外,较大的数据块(可能包含数百个读取和数兆字节的数据)需要扫描整个缓冲区以查找换行符。它效率不高,而且很丑陋。
我正在寻找具有 StreamReader/Writer 的简单性和异步 I/O 的便利性的东西。
我的下一个想法是使用 MemoryStream,并将数据块写入内存流,然后将 StreamReader 附加到流并使用 ReadLine,但我再次遇到问题,知道缓冲区中的最后一次读取是否是完整的行或不是,而且从流中删除“陈旧”数据更加困难。
我还考虑过使用具有同步读取的线程。这样做的好处是,使用 StreamReader,它将始终从 ReadLine() 返回整行,除非在连接断开的情况下。但是,这会导致取消连接的问题,并且某些类型的网络问题可能会导致长时间挂起阻塞套接字。我正在使用异步 IO,因为我不想在程序阻塞数据接收的整个生命周期中占用一个线程。
连接是持久的。随着时间的推移,数据将继续流动。在初始连接期间,有大量数据流,一旦该流完成,套接字保持打开状态等待实时更新。我不确切知道初始流程何时“完成”,因为唯一知道的方法是不会立即发送更多数据。这意味着我不能等待初始数据加载在处理之前完成,因为它进入时我几乎被“实时”处理。
那么,任何人都可以提出一种不太复杂的方式来处理这种情况的好方法吗?我真的希望它尽可能简单和优雅,但由于所有边缘情况,我不断提出越来越复杂的解决方案。我想我想要的是某种 FIFO,我可以在其中轻松地继续追加更多数据,同时从中弹出符合某些条件的数据(即换行符终止的字符串)。