0

我最近一直在将 SocketAsyncEventArgs 用于一个项目,并且遇到了一些问题,即 ReceiveAsync 偶尔会以与通过 SendAsync 发送的顺序不同的顺序获取数据。在 SendAsync 方法中发送的每个数据块都会被维护,但这些块的顺序不一定正确。也许我对 SendAsync 方法的理解不正确,但我认为特别是使用 SocketType.Stream 和 ProtocolType.Tcp 可以确保保持顺序。我知道底层进程将不可避免地破坏消息,并且 ReceiveAsync 通常会读取少于缓冲区分配的内容。但我假设发送和接收流会保持秩序。

我开发了一个显示问题的测试控制台程序。它每次尝试使用一组不同的套接字和端口运行大约 20 次。在我的笔记本电脑上,它通常会通过一次然后第二次失败;通常在期待第二个块时会收到一个稍后的块。从其他测试中,我知道预期的块最终确实会出现,只是不按顺序出现。

需要注意的是,我能够在 Windows 2008 远程服务器上对其进行测试并且没有任何问题。但是,它从未接近在我的笔记本电脑上完成。事实上,如果我让调试执行在异常中断中挂起一段时间,我已经让它不止一次地完全冻结了我的笔记本电脑,并且不得不进行硬重启。这是我在 Windows 7 上使用 VS2017 运行的工作笔记本电脑。我不确定这是否是一个因素,但它正在运行 Symantec Endpoint Protection,尽管我在日志中没有发现任何内容。

所以我的问题是,我对 SocketAsyncEventArgs 的运作方式有错误的看法吗?还是我的代码是一场灾难(也许两者兼而有之)?它是我的笔记本电脑独有的吗?(最后一个让我觉得我正在为编程新手做准备,并且您认为编译器一定有问题。)

using System;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

static class DumTest
{

    static void Main(string[] args)
    {
        for (int i = 9177; i < 9199; i++)
        {
            RunDum(i);
            //Thread.Sleep(350);
        }

        Console.WriteLine("all done.");
        Console.ReadLine();
    }

    static void RunDum(int port)
    {
        var dr = new DumReceiver(port);
        var ds = new DumSender(port);

        dr.Acception.Wait();

        ds.Connection.Wait();

        dr.Completion.Wait();

        ds.Completion.Wait();

        Console.WriteLine($"Completed {port}. " +
            $"sent: {ds.SegmentsSent} segments, received: {dr.SegmentsRead} segments");
    }
}

class DumReceiver
{
    private readonly SocketAsyncEventArgs eva = new SocketAsyncEventArgs();
    private readonly TaskCompletionSource<object> tcsAcc = new TaskCompletionSource<object>();

    private TaskCompletionSource<object> tcsRcv;
    private Socket socket;

    internal DumReceiver(int port)
    {
        this.eva.Completed += this.Received;

        var lstSock = new Socket(
            AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        var localIP = Dns.GetHostEntry(Dns.GetHostName()).AddressList
            .First(i => i.AddressFamily == AddressFamily.InterNetwork);

        lstSock.Bind(new IPEndPoint(localIP, port));
        lstSock.Listen(1);

        var saea = new SocketAsyncEventArgs();
        saea.Completed += this.AcceptCompleted;
        lstSock.AcceptAsync(saea);
    }

    internal Task Acception => this.tcsAcc.Task;

    internal Task Completion { get; private set; }

    internal int SegmentsRead { get; private set; }

    private void AcceptCompleted(object sender, SocketAsyncEventArgs e)
    {
        if (e.SocketError == SocketError.Success)
        {
            this.socket = e.AcceptSocket;
            e.Dispose();
            try
            {
                this.Completion = this.ReceiveLupeAsync();
            }
            finally
            {
                this.tcsAcc.SetResult(null);
            }
        }
        else
        {
            this.tcsAcc.SetException(new SocketException((int)e.SocketError));
        }
    }

    private async Task ReceiveLupeAsync()
    {
        var buf = new byte[8196];
        byte bufSeg = 1;
        int pos = 0;

        while (true)
        {
            this.tcsRcv = new TaskCompletionSource<object>();
            this.eva.SetBuffer(buf, pos, 8196 - pos);
            if (this.socket.ReceiveAsync(this.eva))
            {
                await this.tcsRcv.Task.ConfigureAwait(false);
            }

            if (this.eva.SocketError != SocketError.Success)
            {
                throw new SocketException((int)eva.SocketError);
            }

            if (this.eva.BytesTransferred == 0)
            {
                if (pos != 0)
                {
                    throw new EndOfStreamException();
                }

                break;
            }

            pos += this.eva.BytesTransferred;
            if (pos == 8196)
            {
                pos = 0;
                for (int i = 0; i < 8196; i++)
                {
                    if (buf[i] != bufSeg)
                    {
                        var msg = $"Expected {bufSeg} but read {buf[i]} ({i} of 8196). " +
                            $"Last read: {this.eva.BytesTransferred}.";
                        Console.WriteLine(msg);
                        throw new Exception(msg);
                    }
                }

                this.SegmentsRead++;
                bufSeg = (byte)(this.SegmentsRead + 1);
            }
        }
    }

    private void Received(object s, SocketAsyncEventArgs e) => this.tcsRcv.SetResult(null);
}

class DumSender
{
    private readonly SocketAsyncEventArgs eva = new SocketAsyncEventArgs();
    private readonly Socket socket = new Socket(
        AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

    private readonly TaskCompletionSource<object> tcsCon = new TaskCompletionSource<object>();
    private TaskCompletionSource<object> tcsSnd;

    internal DumSender(int port)
    {
        this.eva.Completed += this.Sent;

        var saea = new SocketAsyncEventArgs();
        var localIP = Dns.GetHostEntry(Dns.GetHostName()).AddressList
            .First(i => i.AddressFamily == AddressFamily.InterNetwork);

        saea.RemoteEndPoint = new IPEndPoint(localIP, port);
        saea.Completed += this.ConnectionCompleted;
        this.socket.ConnectAsync(saea);
    }

    internal Task Connection => this.tcsCon.Task;

    internal Task Completion { get; private set; }

    internal int SegmentsSent { get; private set; }

    private void ConnectionCompleted(object sender, SocketAsyncEventArgs e)
    {
        if (e.SocketError == SocketError.Success)
        {
            e.Dispose();

            try
            {
                this.Completion = this.SendLupeAsync();
            }
            finally
            {
                this.tcsCon.SetResult(null);
            }
        }
        else
        {
            this.tcsCon.SetException(new SocketException((int)e.SocketError));
        }
    }

    private async Task SendLupeAsync()
    {
        var buf = new byte[8196];
        byte bufSeg = 1;

        while (true)
        {
            for (int i = 0; i < 8196; i++)
            {
                buf[i] = bufSeg;
            }

            this.tcsSnd = new TaskCompletionSource<object>();
            this.eva.SetBuffer(buf, 0, 8196);
            if (this.socket.SendAsync(this.eva))
            {
                await this.tcsSnd.Task.ConfigureAwait(false);
            }

            if (this.eva.SocketError != SocketError.Success)
            {
                throw new SocketException((int)this.eva.SocketError);
            }

            if (this.eva.BytesTransferred != 8196)
            {
                throw new SocketException();
            }

            if (++this.SegmentsSent == 299)
            {
                break;
            }

            bufSeg = (byte)(this.SegmentsSent + 1);
        }

        this.socket.Shutdown(SocketShutdown.Both);
    }

    private void Sent(object s, SocketAsyncEventArgs e) => this.tcsSnd.SetResult(null);
}
4

1 回答 1

0

我相信问题出在您的代码中。

您必须检查使用Socket. 如果它们返回,则不会引发事件,您必须同步处理结果。*AsyncSocketAsyncEventArgsfalseSocketAsyncEventArgs.Completed

参考文档:SocketAsyncEventArgs 类。搜索willRaiseEvent

DumReceiver' 的构造函数中,您不检查AcceptAsync' 结果,并且您不处理同步完成的情况。

DumSender' 的构造函数中,您不检查ConnectAsync' 结果,并且您不处理同步完成的情况。

最重要的是,该SocketAsyncEventArgs.Completed事件可能会在其他一些线程中引发,很可能是来自ThreadPool.

每次您分配DumReceiver.tcsRcvDumSender.tcsSnd没有正确同步时,您都无法确定DumReceiver.Received并且DumSender.Sent正在使用最新的TaskCompletionSource.

实际上,您可以NullReferenceException在第一次迭代中获得一个。

您在以下方面缺乏同步:

  • DumReceiver, 字段tcsRcvsocket和属性CompletionSegmentsRead

  • DumSender, 场tcsSnd和性质CompletionSegmentsSent

我建议您考虑使用单个而不是在每次调用时SemaphoreSlim创建一个新的and 。您将在构造函数中将信号量初始化为 0。如果操作处于挂起状态,您将在信号量上,而事件将在信号量上。TaskCompletionSourceReceiveAsyncSendAsync*Asyncawait WaitAsyncCompletedRelease

这应该足以摆脱TaskCompletionSource田间的竞争条件。您仍然需要在其他字段和属性上进行适当的同步。例如,没有理由Completion不能在构造函数中创建,并且SegmentsReadandSegmentsSent可以是只读的,并且可以引用一个可以在内部使用一个或多个Interlocked方法(例如Interlocked.IncrementInterlocked.Add)访问的字段。

于 2017-09-14T09:44:18.273 回答