GetMessages
用 Rx编写函数的最简洁的方法是什么:
static void Main()
{
Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var messages = GetMessages(socket, IPAddress.Loopback, 4000);
messages.Subscribe(x => Console.WriteLine(x));
Console.ReadKey();
}
static IObservable<string> GetMessages(Socket socket, IPAddress addr, int port)
{
var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);
// now will receive a stream of messages
// each message is prefixed with an 4 bytes/Int32 indicating it's length.
// the rest of the message is a string
// ????????????? Now What ?????????????
}
一个简单的服务器作为上述示例的驱动程序:http: //gist.github.com/452893#file_program.cs
关于使用 Rx 进行套接字编程
我一直在研究使用Reactive Extensions进行我正在做的一些套接字编程工作。我这样做的动机是它会以某种方式使代码“更简单”。这是否意味着更少的代码,更少的嵌套。
然而到目前为止,情况似乎并非如此:
- 我还没有找到很多将 Rx 与套接字一起使用的示例
- 我发现的示例似乎并不比我现有的 BeginXXXX、EndXXXX 代码
复杂 - 虽然
Observable
有FromAsyncPattern的扩展方法,但这不包括SocketEventArgs
Async API。
当前非工作解决方案
这是我到目前为止所拥有的。这不起作用,它因堆栈溢出而失败(呵呵)我还没有弄清楚语义,以便我可以创建一个IObservable
将读取指定字节数的。
static IObservable<int> GetMessages(Socket socket, IPAddress addr, int port)
{
var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);
// keep reading until we get the first 4 bytes
byte[] buffer = new byte[1024];
var readAsync = Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);
IObservable<int> readBytes = null;
var temp = from totalRead in Observable.Defer(() => readBytes)
where totalRead < 4
select readAsync(buffer, totalRead, totalRead - 4, SocketFlags.None);
readBytes = temp.SelectMany(x => x).Sum();
var nowDoSomethingElse = readBytes.SkipUntil(whenConnect);
}