1

我正在做一个项目,我通过 TCP/IP 通信序列化命令。当我在本地主机上时它可以工作,但是当我在不同的服务器上运行我的侦听器时,它在尝试反序列化侦听器端的命令时随机失败。抛出的异常是:“尝试反序列化一个空流。” 和“在解析完成之前遇到流结束。” 从序列化器

当我单独运行一系列命令时,它可以正常工作,但是当我创建线程并同时运行多个序列时,它会失败。

侦听器在 4 个不同的端口上创建侦听器,客户端为每个端口运行 1 个线程。当其中一个线程到达序列的末尾时,它终止。

我试图让我的客户单身,也尝试过 Mutex。但仍然是同样的问题。

这是我的客户:

public class TcpIpCommunicator : ICommunicator, IDisposable
{
    private Dictionary<int,TcpClient> clientSockets = new Dictionary<int,TcpClient>();
    public IInverterCommand ReadAsyncCommand { set; get; }
    private static TcpIpCommunicator tcpIpCommunicator;

    private TcpIpCommunicator()
    {
    }

    public static TcpIpCommunicator GetInstance()
    {
        if(tcpIpCommunicator == null)
            tcpIpCommunicator = new TcpIpCommunicator();

        return tcpIpCommunicator;
    }

    public void Send(IInverterCommand command, int id)
    {
        var serializer = new Serializer();
        MemoryStream stream = serializer.SerializeMultipleObjects(command);
        var _bytes = stream.GetBuffer();

        var networkStream = clientSockets[id].GetStream();
        networkStream.Write(_bytes, 0, _bytes.Length);
        networkStream.Flush();
    }

    public IInverterCommand Read(int id)
    {
        var memoryStream = new MemoryStream();

        byte[] buffer;
        var networkStream = clientSockets[id].GetStream();
        do
        {
            buffer = new byte[clientSockets[id].ReceiveBufferSize];
            int sizeRead = networkStream.Read(buffer, 0, buffer.Length);
            memoryStream.Write(buffer, 0, sizeRead);
        } while (networkStream.DataAvailable);

        networkStream.Flush();

        memoryStream.Position = 0;
        var serializer = new Serializer();
        return serializer.DeSerializeMultipleObject(memoryStream);

    }

    public void ReadAsync(object id)
    {
        ReadAsyncCommand = Read((int)id);
    }

    public void Dispose()
    {
        foreach (var tcpClient in clientSockets.Values)
        {
            tcpClient.Close();
        }
    }

    public int Connect(string ip, int port)
    {
        var tcpClient = new TcpClient();
        tcpClient.ReceiveTimeout = int.MaxValue;
        tcpClient.SendTimeout = int.MaxValue;
        tcpClient.Connect(ip, port);
        int key = findKey();
        clientSockets.Add(key, tcpClient);

        return key;
    }

    public void DestroyConnection(int id)
    {
        clientSockets[id].Close();
        clientSockets.Remove(id);
    }

    private int findKey()
    {
        int key = 0;
        while(clientSockets.ContainsKey(key))
        {
            key++;
        }

        return key;
    }
}

我的服务器端代码在这里:

public class TCPListener : IDisposable
{
    private readonly TcpListener _serverSocket;
    private NetworkStream _networkStream;
    private readonly TcpClient _clientSocket = default(TcpClient);

    public TCPListener(int port)
    {
        _serverSocket = new TcpListener(port);
        _serverSocket.Server.ReceiveTimeout = int.MaxValue;
        _serverSocket.Server.SendTimeout = int.MaxValue;
        _serverSocket.Start();
        _clientSocket = _serverSocket.AcceptTcpClient();
    }

    public void Send(IInverterCommand message)
    {
        _networkStream = _clientSocket.GetStream();
        var serialize = new Serializer();
        var stream = serialize.SerializeMultipleObjects(message);
        var _bytes = stream.GetBuffer();

        if (_bytes.Length > _clientSocket.ReceiveBufferSize)
        {
            byte[] bytes = new byte[_clientSocket.ReceiveBufferSize];
            for (int i = 0; i < _bytes.Length; i += _clientSocket.ReceiveBufferSize)
            {
                for (int j = 0; j < _clientSocket.ReceiveBufferSize && i + j != _bytes.Length; ++j)
                {
                    bytes[j] = _bytes[i + j];
                }

                _networkStream.Write(bytes, 0, _clientSocket.ReceiveBufferSize);
            }
        }
        else
        {
            _networkStream.Write(_bytes, 0, _bytes.Length);
        }

        Thread.Sleep(50);

        _networkStream.Flush();
    }

    public IInverterCommand ReadCommand()
    {
        _networkStream = _clientSocket.GetStream();
        var memoryStream = new MemoryStream();

        do
        {
            var buffer = new byte[_clientSocket.ReceiveBufferSize];
            int sizeRead = _networkStream.Read(buffer, 0, buffer.Length);
            memoryStream.Write(buffer, 0, sizeRead);
            Thread.Sleep(50);
        } while (_networkStream.DataAvailable);

        _networkStream.Flush();
        memoryStream.Position = 0;
        var serializer = new Serializer();
        return serializer.DeSerializeMultipleObject(memoryStream);
    }

    public void Dispose()
    {
        _clientSocket.Close();
        _serverSocket.Stop();
    }
}

这里通常是在客户端调用代码:

IInverterCommand command = new SoftwareUpdateInverterCommand();
        tcpClient.Send(command, tcpId);

        var thread = new Thread(tcpClient.ReadAsync);
        thread.Start(tcpId);

        if (!thread.Join(timeout))
        {
            thread.Abort();
            tcpClient.DestroyConnection(tcpId);
            return;
        }

和服务器端调用代码:

//Recieving CMD on software update
            TcpListener = new TCPListener((int)port);
            var command = TcpListener.ReadCommand();

            //Sending OK back to server
            command.Message = "OK";
            TcpListener.Send(command);
4

1 回答 1

0

我建议您阅读线程同步的主题以及它的含义以及如何使用它。此外,当您在非线程安全的线程之间共享内容时会发生什么。

您的代码中根本没有线程同步,但是您从至少 2 个不同的线程读取和写入字典(不安全)而没有任何同步。

仅此一项就可能导致难以追踪的错误以及许多您不愿处理的不同现象。

确实需要阅读有关如何正确执行线程的信息。

于 2012-10-04T10:50:03.363 回答