1

我需要接收异步消息。

在所有消息中,前 2 个字节表示下一个字节数组的长度。我的问题是在少数情况下我会收到意外的数据包。

如果我使用Thread.Sleep(200)这个问题不会发生,或者很少发生。

我哪里错了?

protected void StartListening()
{
    StateObject state = new StateObject() { ProcessHeader = true };
    state.PrepareBuffer(HeaderLength);
    lock (_secureConnection)
        _secureConnection.BeginReceive(state.Buffer, 0, HeaderLength, 0, new AsyncCallback(ReadCallback), state);
}

private void ReadCallback(IAsyncResult ar)
{
    if (_disposing)
        return;
    StateObject state = (StateObject)ar.AsyncState;
    try
    {
        lock (_secureConnection)
            _secureConnection.EndReceive(ar);
        if (state.ProcessHeader)
        {
            state.ProcessHeader = !state.ProcessHeader;
            var bodyLength = GetBodyLength(state.Buffer);
            //Thread.Sleep(200);
            state.CompleteMessage.AddRange(state.Buffer);
            state.PrepareBuffer(bodyLength);
            lock (_secureConnection)
                _secureConnection.BeginReceive(state.Buffer, 0, bodyLength, 0, new AsyncCallback(ReadCallback), state);
        }
        else
        {
            state.CompleteMessage.AddRange(state.Buffer);
            ProcessMessage(state.CompleteMessage); //process this message
            //Thread.Sleep(200);
            state.ProcessHeader = !state.ProcessHeader;
            state.CompleteMessage.Clear();
            state.PrepareBuffer(HeaderLength);
            lock (_secureConnection)
                _secureConnection.BeginReceive(state.Buffer, 0, HeaderLength, 0, new AsyncCallback(ReadCallback), state);
        }
    }
    catch (Exception e)
    {
        Close(true);
    }
}

class StateObject
{
    public StateObject()
    {
        ProcessHeader = true;
    }
    public byte[] Buffer { get; private set; }
    public bool ProcessHeader { get; set; }
    public List<byte> CompleteMessage = new List<byte>();
    public void PrepareBuffer(int size)
    {
        Buffer = new byte[size];
    }
}
4

2 回答 2

2

您假设 TCP 是基于消息的协议。不过,它是一个字节流。您的读取可以读取任何大于零的数量。这就像使用 FileStream 一样。文件也没有消息。

你的代码必须处理这个事实。搜索“TCP 消息帧”。

于 2014-03-21T14:14:28.963 回答
0

SOLUTION FOUND 经过很长时间,我写了一个很好的解决方案,希望对某人有所帮助。非常感谢您的建议。

    int HeaderLength = 2;
    int bodyLength;
    int bytesReceived;
    int totalBytesReceived;

    protected void StartListening()
    {
        StateObject state = new StateObject() { ProcessHeader = true };
        state.PrepareBuffer(HeaderLength);
        lock (_secureConnection)
            _secureConnection.BeginReceive(state.Buffer, 0, HeaderLength, 0, new AsyncCallback(ReadCallback), state);
    }

    /// <summary>
    /// Reads the callback.
    /// 
    /// A message is in this form:
    /// 
    /// 2 bytes indicate the leght of body message (n)
    /// n bytes for body message
    /// 
    /// </summary>
    /// <param name="ar">IAsyncResult.</param>
    private void ReadCallback(IAsyncResult ar)
    {
        if (_disposing)
            return;
        StateObject state = (StateObject)ar.AsyncState;
        try
        {
            lock (_secureConnection)
                bytesReceived = _secureConnection.EndReceive(ar);

            if (state.ProcessHeader)    //In this phase I receive 2 bytes that indicate the total length of the next message
            {
                state.ProcessHeader = !state.ProcessHeader;
                bodyLength = GetBodyLength(state.Buffer);   //I interpret 2 bytes to know body message length
                state.CompleteMessage.AddRange(state.Buffer);
                state.PrepareBuffer(bodyLength);
                totalBytesReceived = bytesReceived = 0;
                lock (_secureConnection)
                    _secureConnection.BeginReceive(state.Buffer, 0, bodyLength, 0, new AsyncCallback(ReadCallback), state);
            }
            else    //In this phase I receive the message, with one or more recursive CallBack
            {
                state.CompleteMessage.AddRange(state.Buffer.ToList().GetRange(0, bytesReceived));

                totalBytesReceived += bytesReceived;
                int totalBytesMissing = bodyLength - totalBytesReceived;


                if (totalBytesReceived < bodyLength)
                {
                    state.PrepareBuffer(totalBytesMissing);
                    lock (_secureConnection)
                        _secureConnection.BeginReceive(state.Buffer, 0, totalBytesMissing, 0, new AsyncCallback(ReadCallback), state);
                    return;
                }
                //totalMessageLenght = body length plus first 2 bytes indicate body length
                int totalMessageLenght = bodyLength + 2;    

                var completeMessage = state.CompleteMessage.GetRange(0, totalMessageLenght).ToList();
                ProcessMessage(completeMessage);
                state.ProcessHeader = !state.ProcessHeader; //I prepare Callback to read 2 bytes indicate the total length of the next message

                state.CompleteMessage.Clear();
                state.PrepareBuffer(HeaderLength);
                lock (_secureConnection)
                    _secureConnection.BeginReceive(state.Buffer, 0, HeaderLength, 0, new AsyncCallback(ReadCallback), state);
            }
        }
        catch (Exception e)
        {
            Close(true);
        }
    }
于 2014-04-01T08:28:57.363 回答