0

我在多线程应用程序中遇到问题,过去 3 天一直在调试它,但终其一生都无法弄清楚。我正在写这个,希望我在输入这个时有一个 DUH 时刻,或者有人在我提供的代码片段中看到一些明显的东西。这是发生了什么:

我一直在研究一个新的 UDP 网络库,并且有一个数据生成器,它将 UDP 数据报多播到多个接收器应用程序。发送者在两个不同的套接字上发送,这些套接字绑定到单独的 UDP 多播地址和单独的端口。接收方应用程序还创建两个套接字并将每个套接字绑定到发送方的多播地址/端口之一。

当接收器接收到数据报时,它会从 MemoryStream 中的缓冲区复制它,然后将其放入线程安全队列中,另一个线程从中读取数据并从 MemoryStream 中解码数据。

两个套接字都有自己的队列。

现在发生的事情真的很奇怪,它是随机发生的,不可重现,当我运行多个接收器应用程序时,它只会不时地在其中一个上随机发生。

基本上,从队列中读取 MemoryStream 的线程,通过像 ReadInt32() 等 BinaryReader 读取它,从而解码数据。然而,当它时不时地读取数据时,它从中读取的数据是不正确的,例如发送者永远不会编码的负整数。

然而,如前所述,解码仅在一个接收器应用程序中失败,在其他应用程序中数据报解码良好。现在您可能会说,好吧,UDP 数据报可能存在字节损坏或其他问题,但我已经记录了每个传入的数据报,并在所有接收器上进行了比较,并且每个应用程序接收到的数据报都是完全相同的。现在它变得更奇怪了,当我将解码失败的数据报转储到磁盘并编写一个读取它并通过解码器运行它的单元测试时,它解码得很好。此外,当我在解码器周围包裹一个 try/catch 时,重置 catch 中的 MemoryStream 位置并再次通过解码器运行它,它解码得很好。更奇怪的是,这也只会在我绑定两个套接字以从发送方读取数据时发生,如果我只绑定一个,它不会

以下是一些与正在发生的事情相对应的代码:

这是套接字的接收回调:

        private void ReceiveCompleted(object sender, SocketAsyncEventArgs args)
    {
        if (args.SocketError != SocketError.Success)
        {
            InternalShutdown(args.SocketError);
            return;
        }

        if (args.BytesTransferred > SequencedUnitHeader.UNIT_HEADER_SIZE)
        {
            DataChunk chunk = new DataChunk(args.BytesTransferred);
            Buffer.BlockCopy(args.Buffer, 0, chunk.Buffer, 0, args.BytesTransferred);

            chunk.MemoryStream = new MemoryStream(chunk.Buffer);
            chunk.BinaryReader = new BinaryReader(chunk.MemoryStream);

            chunk.SequencedUnitHeader.SequenceID = chunk.BinaryReader.ReadUInt32();
            chunk.SequencedUnitHeader.Count = chunk.BinaryReader.ReadByte();

            if (prevSequenceID + 1 != chunk.SequencedUnitHeader.SequenceID)
            {
                log.Error("UdpDatagramGap\tName:{0}\tExpected:{1}\tReceived:{2}", unitName, prevSequenceID + 1, chunk.SequencedUnitHeader.SequenceID);
            }
            else if (chunk.SequencedUnitHeader.SequenceID < prevSequenceID)
            {
                log.Error("UdpOutOfSequence\tName:{0}\tExpected:{1}\tReceived:{2}", unitName, prevSequenceID + 1, chunk.SequencedUnitHeader.SequenceID);
            }

            prevSequenceID = chunk.SequencedUnitHeader.SequenceID;

            messagePump.Produce(chunk);
        }
        else
            UdpStatistics.FramesRxDiscarded++;

        Socket.InvokeAsyncMethod(Socket.ReceiveAsync, ReceiveCompleted, asyncReceiveArgs);
    }

这是一些解码数据的存根代码:

        public static void OnDataChunk(DataChunk dataChunk)
    {
        try
        {
            for (int i = 0; i < dataChunk.SequencedUnitHeader.Count; i++)
            {
                int val = dataChunk.BinaryReader.ReadInt32();

                if(val < 0)
                    throw new Exception("EncodingException");

                // do something with that value
            }
        }
        catch (Exception ex)
        {
            writer.WriteLine("ID:" + dataChunk.SequencedUnitHeader.SequenceID + " Count:" + dataChunk.SequencedUnitHeader.Count + " " + BitConverter.ToString(dataChunk.Buffer, 0, dataChunk.Size));
            writer.Flush();
            log.ErrorException("OnDataChunk", ex);

            log.Info("RETRY FRAME:{0} Data:{1}", dataChunk.SequencedUnitHeader.SequenceID, BitConverter.ToString(dataChunk.Buffer, 0, dataChunk.Size));
            dataChunk.MemoryStream.Position = 0;

            dataChunk.SequencedUnitHeader.SequenceID = dataChunk.BinaryReader.ReadUInt32();
            dataChunk.SequencedUnitHeader.Count = dataChunk.BinaryReader.ReadByte();

            OnDataChunk(dataChunk);
        }
    }

您在 catch{} 部分中看到,我只是将 MemoryStream.Position 重置为 0 并再次调用相同的方法,下次它就可以正常工作了吗?在这一点上我真的没有想法,不幸的是没有 DUH 时刻写这个。任何人都知道可能发生了什么,或者我还能做些什么来解决这个问题?

谢谢,

汤姆

4

1 回答 1

0

看起来数据损坏是由于多个线程的不同步访问而发生的。您能否确认以线程安全的方式访问数据块成员?

于 2010-06-15T10:21:15.313 回答