I am fairly new to Rx and still learning my way through it. I have looked at many examples, but none of them fits my need.
Scenario: I have one server socket (created using a plain Socket object, not a TcpListener). Multiple clients (client Socket objects, not TcpClient) are connected to this server socket. I am trying to use Rx (Reactive Extensions) to receive the messages sent by the client sockets, using the "FromAsyncPattern" operator over the asynchronous BeginReceive and EndReceive operations.
I have started receiving messages in the buffer. The issue is that the buffer either holds the same message as the previous receive operation or contains a mix of content from the current and previous receive operations.
Here is the code used:
    private void ReceiveMessageFromClient(Socket socket)
    {
        try
        {
            if (socket != null)
            {
                byte[] buffer = new byte[400];
                var functionReceiveSocketData = Observable.FromAsyncPattern
                    <byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);
                Observable.Defer(() =>
                    functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
                ).Repeat().Subscribe(onNext: x => OnMessageReceived(buffer));
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
The OnMessageReceived method is just a delegate that raises the message-received event and passes the buffer to another class for further processing.
Problem: The same buffer is reused every time I receive a message. How can I get each received message cleanly, without leftover data from the previous receive?
-- What should be done if a message is received only partially and the next receive actually contains the rest of that same message?
Please help me with the above issues; some code snippets would be greatly appreciated. I am posting this question because the implementations I have found so far all use TcpListener, and my constraint is to use the Socket object :(
Thanks in advance.