I am fairly new to Rx and still learning my way through it. I have looked at a lot of examples, but none of them fits my need.

Scenario: I have one server socket (created with a plain Socket object, not a TcpListener). Multiple clients (client Socket objects, not TcpClient) are connected to this server socket. I am trying to use Rx (Reactive Extensions) to receive the messages sent by the client sockets, using the "FromAsyncPattern" operator over the asynchronous BeginReceive and EndReceive operations.

I have started receiving messages in the buffer. The issue is that the buffer either returns the same message as the previous receive operation, or contains a mix of content from the current and previous receive operations.

Here is the code I am using:

    private void ReceiveMessageFromClient(Socket socket)
    {
        try
        {
            if (socket != null)
            {
                byte[] buffer = new byte[400];

                var functionReceiveSocketData = Observable.FromAsyncPattern
                                                            <byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);

                Observable.Defer(() =>
                                functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
                                ).Repeat().Subscribe(onNext: x => OnMessageReceived(buffer));
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }

The OnMessageReceived method is just a delegate that raises the message-received event and passes the buffer to another class for further processing.

Problem: The same buffer is reused every time I receive a message. How can I get each received message clean of the contents left over from the previous receive?

-- What should be done if I have received only part of a message and the next receive is actually the rest of that same message?

Please help me with the above issue; some code snippets would be greatly appreciated. The reason for posting this question is that the implementations I have found so far use TcpListener, whereas my constraint is to use the Socket object :(

Thanks in advance.

1 Answer

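The problem you are having is that the same buffer instance is shared across the repeated calls to the observable. You need to make sure there is a fresh buffer instance each time. It also looks like you should truncate the buffer based on the returned integer. Here is what I suggest you put inside your `if` statement: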

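    var functionReceiveSocketData =
        Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
            (socket.BeginReceive, socket.EndReceive);

    // Returns a new array holding only the first n bytes of bs.
    Func<byte[], int, byte[]> copy = (bs, n) =>
    {
        var rs = new byte[n];
        // Copy just n bytes; bs.CopyTo(rs, 0) copies all of bs and throws when n < bs.Length.
        Array.Copy(bs, 0, rs, 0, n);
        return rs;
    };

    Observable
        .Defer(() =>
        {
            // A fresh buffer for every receive, so no two results share state.
            var buffer = new byte[400];
            return
                from n in functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
                select copy(buffer, n);
        })
        .Repeat()
        .Subscribe(x => OnMessageReceived(x));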

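Edit: Added a section on rejoining message parts in response to the comments. Also fixed the solution above: the destination index used in the `copy` lambda should be `0`, not `n`.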

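I'm no expert on sockets, but since a socket (correct me if I'm wrong) is just a stream of bytes, you need to be able to determine when each message in the stream is complete.

Here is one possible approach: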

    /* include `functionReceiveSocketData` & `copy` from above */

    // Concatenates two byte arrays into a new array.
    Func<byte[], byte[], byte[]> append = (bs1, bs2) =>
    {
        var rs = new byte[bs1.Length + bs2.Length];
        bs1.CopyTo(rs, 0);
        bs2.CopyTo(rs, bs1.Length);
        return rs;
    };

    var messages = new Subject<byte[]>();
    messages.Subscribe(x => OnMessageReceived(x));

    Observable
        .Defer(() => /* as above */)
        .Repeat()
        .Scan(new byte[] { }, (abs, bs) =>
        {
            // Accumulate received chunks until they form a complete message.
            var current = append(abs, bs);
            if (isCompleteMessage(current))
            {
                messages.OnNext(current);
                current = new byte[] { };
            }
            return current;
        })
        .Subscribe();

The only thing you then need to do is work out how to implement the `isCompleteMessage` function.
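How `isCompleteMessage` should look depends entirely on your protocol, which the question does not describe. As a rough sketch only, suppose each message is sent as a 4-byte big-endian length header followed by that many payload bytes. That wire format, the `Framing` class and the `HeaderSize` constant are all assumptions made for illustration. Under those assumptions the check could look something like this:

```csharp
using System;

static class Framing
{
    // ASSUMPTION (not from the question): every message starts with a
    // 4-byte big-endian payload length, followed by the payload itself.
    const int HeaderSize = 4;

    // True once the accumulated bytes hold the header plus the whole payload.
    public static bool IsCompleteMessage(byte[] data)
    {
        if (data.Length < HeaderSize)
            return false; // the header itself has not fully arrived yet

        int payloadLength =
            (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3];

        // A negative length means a corrupt header; this sketch simply never
        // reports such data as complete. Real code would likely drop the connection.
        return payloadLength >= 0 && data.Length - HeaderSize >= payloadLength;
    }
}

static class Demo
{
    static void Main()
    {
        // Header says 5 payload bytes; only 2 have arrived so far.
        byte[] partial = { 0, 0, 0, 5, (byte)'h', (byte)'e' };
        // Header plus the full 5-byte payload "hello".
        byte[] full = { 0, 0, 0, 5, (byte)'h', (byte)'e', (byte)'l', (byte)'l', (byte)'o' };

        Console.WriteLine(Framing.IsCompleteMessage(partial)); // False
        Console.WriteLine(Framing.IsCompleteMessage(full));    // True
    }
}
```

To plug it into the `Scan` above you could declare `Func<byte[], bool> isCompleteMessage = Framing.IsCompleteMessage;`. Note that this check only answers "is there at least one whole message?". If a single receive can carry the end of one message and the start of the next, the surplus bytes would have to be split off and carried into the next accumulation rather than emitted along with the completed message.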

answered 2011-07-28T11:35:21.433