我正在尝试使用 Rx 从 TCPClient 接收流中读取数据并将数据解析为字符串的 IObservable,由换行符“\r\n”分隔以下是我从套接字流接收的方式...
var messages = new Subject<string>();
var functionReceiveSocketData =
Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
(client.Client.BeginReceive, client.Client.EndReceive);
Func<byte[], int, byte[]> copy = (bs, n) =>
{
var rs = new byte[buffer.Length];
bs.CopyTo(rs, 0);
return rs;
};
Observable
.Defer(() =>
{
var buffer = new byte[50];
return
from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None)
select copy(buffer, n);
}).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x)));
这是我想出的解析字符串的方法。这目前不起作用...
obsStrings = messages.Buffer<string,string>(() =>
messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
);
消息主题以块的形式接收消息,因此我尝试连接它们并测试连接的字符串是否包含换行符,从而指示缓冲区关闭并输出缓冲的块。不知道为什么它不起作用。似乎我只从 obsStrings 中得到第一块。
所以我正在寻找两件事。我想简化 io 流的阅读并消除消息主题的使用。其次,我想让我的字符串解析工作。我已经对此进行了一段时间的破解,但无法提出可行的解决方案。我是 Rx 的初学者。
编辑:这是问题解决后的成品....
var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None)
.SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray())
.Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
.Where(x => x.EndsWith("\r\n"))
.Select(buffered => String.Join("", buffered))
.Select(a => a.Replace("\n", ""));
“ReceiveUntilCompleted”是 RXX 项目的扩展。