44

这里证明。
知道这段代码有什么问题吗?

    [TestMethod]
    public void TestTest()
    {
        var tcp = new TcpClient() { ReceiveTimeout = 5000, SendTimeout = 20000 };
        tcp.Connect(IPAddress.Parse("176.31.100.115"), 25);
        bool ok = Read(tcp.GetStream()).Wait(30000);
        Assert.IsTrue(ok);
    }

    async Task Read(NetworkStream stream)
    {
        using (var cancellationTokenSource = new CancellationTokenSource(5000))
        {
            int receivedCount;
            try
            {
                var buffer = new byte[1000];
                receivedCount = await stream.ReadAsync(buffer, 0, 1000, cancellationTokenSource.Token);
            }
            catch (TimeoutException e)
            {
                receivedCount = -1;
            }
        }
    }
4

6 回答 6

37

我终于找到了解决方法。使用 Task.WaitAny 将异步调用与延迟任务 (Task.Delay) 相结合。当延迟在 io 任务之前过去时,关闭流。这将强制任务停止。您应该正确处理 io 任务的异步异常。并且您应该为延迟任务和 io 任务添加一个延续任务。

它也适用于 tcp 连接。在另一个线程中关闭连接(您可以认为它是延迟任务线程)会强制使用/等待此连接停止的所有异步任务。

- 编辑 -

@vtortola 建议的另一个更清洁的解决方案:使用取消令牌注册对流的调用。关闭:

async ValueTask Read(NetworkStream stream, TimeSpan timeout = default)
{
    if(timeout == default(TimeSpan))
      timeout = TimeSpan.FromSeconds(5);

    using var cts = new CancellationTokenSource(timeout); //C# 8 syntax
    using(cts.Token.Register(() => stream.Close()))
    {
       int receivedCount;
       try
       {
           var buffer = new byte[30000];
           receivedCount = await stream.ReadAsync(buffer, 0, 30000, tcs.Token).ConfigureAwait(false);
       }
       catch (TimeoutException)
       {
           receivedCount = -1;
       }
    }
}
于 2012-10-15T09:43:35.857 回答
22

取消是合作的。NetworkStream.ReadAsync必须配合才能取消。这样做有点困难,因为这可能会使流处于未定义状态。哪些字节已经从 Windows TCP 堆栈中读取,哪些还没有?IO 不容易取消。

反射器显示NetworkStream不覆盖ReadAsync。这意味着它将获得Stream.ReadAsync只是丢弃令牌的默认行为。没有通用方法可以取消 Stream 操作,因此 BCLStream类甚至不尝试(它不能尝试 - 没有办法做到这一点)。

您应该在Socket.

于 2012-09-14T20:35:55.743 回答
6

根据 Softlion 的回答中的描述:

使用 Task.WaitAny 将异步调用与延迟任务 (Task.Delay) 相结合。当延迟在 io 任务之前过去时,关闭流。这将强制任务停止。您应该正确处理 io 任务的异步异常。你应该为delay任务和io任务添加一个延续任务。

我编写了一些代码,可以为您提供带超时的异步读取:

using System;
using System.Net.Sockets;
using System.Threading.Tasks;

namespace ConsoleApplication2013
{
    class Program
    {
        /// <summary>
        /// Does an async read on the supplied NetworkStream and will timeout after the specified milliseconds.
        /// </summary>
        /// <param name="ns">NetworkStream object on which to do the ReadAsync</param>
        /// <param name="s">Socket associated with ns (needed to close to abort the ReadAsync task if the timeout occurs)</param>
        /// <param name="timeoutMillis">number of milliseconds to wait for the read to complete before timing out</param>
        /// <param name="buffer"> The buffer to write the data into</param>
        /// <param name="offset">The byte offset in buffer at which to begin writing data from the stream</param>
        /// <param name="amountToRead">The maximum number of bytes to read</param>
        /// <returns>
        /// a Tuple where Item1 is true if the ReadAsync completed, and false if the timeout occurred,
        /// and Item2 is set to the amount of data that was read when Item1 is true
        /// </returns>
        public static async Task<Tuple<bool, int>> ReadWithTimeoutAsync(NetworkStream ns, Socket s, int timeoutMillis, byte[] buffer, int offset, int amountToRead)
        {
            Task<int> readTask = ns.ReadAsync(buffer, offset, amountToRead);
            Task timeoutTask = Task.Delay(timeoutMillis);

            int amountRead = 0;

            bool result = await Task.Factory.ContinueWhenAny<bool>(new Task[] { readTask, timeoutTask }, (completedTask) =>
            {
                if (completedTask == timeoutTask) //the timeout task was the first to complete
                {
                    //close the socket (unless you set ownsSocket parameter to true in the NetworkStream constructor, closing the network stream alone was not enough to cause the readTask to get an exception)
                    s.Close();
                    return false; //indicate that a timeout occurred
                }
                else //the readTask completed
                {
                    amountRead = readTask.Result;
                    return true;
                }
            });

            return new Tuple<bool, int>(result, amountRead);
        }

        #region sample usage
        static void Main(string[] args)
        {
            Program p = new Program();
            Task.WaitAll(p.RunAsync());
        }

        public async Task RunAsync()
        {
            Socket s = new Socket(SocketType.Stream, ProtocolType.Tcp);

            Console.WriteLine("Connecting...");
            s.Connect("127.0.0.1", 7894);  //for a simple server to test the timeout, run "ncat -l 127.0.0.1 7894"
            Console.WriteLine("Connected!");

            NetworkStream ns = new NetworkStream(s);

            byte[] buffer = new byte[1024];
            Task<Tuple<bool, int>> readWithTimeoutTask = Program.ReadWithTimeoutAsync(ns, s, 3000, buffer, 0, 1024);
            Console.WriteLine("Read task created");

            Tuple<bool, int> result = await readWithTimeoutTask;

            Console.WriteLine("readWithTimeoutTask is complete!");
            Console.WriteLine("Read succeeded without timeout? " + result.Item1 + ";  Amount read=" + result.Item2);
        }
        #endregion
    }
}
于 2014-01-03T18:06:51.617 回答
3

有几个问题在那里弹出:

  1. CancellationTokenthrows OperationCanceledException, not TimeoutException(取消并不总是由于超时)。
  2. ReceiveTimeout不适用,因为您正在执行异步读取。即使是这样,您也会在IOExceptionand之间出现竞争条件OperationCanceledException
  3. 由于您正在同步连接套接字,因此您需要在此测试中设置较长的超时时间(IIRC,默认连接超时时间约为 90 秒,但可以在 Windows 监控网络速度时进行更改)。
  4. 测试异步代码的正确方法是使用异步测试:

    [TestMethod]
    public async Task TestTest()
    {
        var tcp = new TcpClient() { ReceiveTimeout = 5000, SendTimeout = 20000 };
        tcp.Connect(IPAddress.Parse("176.31.100.115"), 25);
        await Read(tcp.GetStream());
    }
    
于 2012-09-14T10:37:45.313 回答
1

提供有关三种不同方法的更多背景信息。我的服务监控其他 Web 应用程序的可用性。因此,它需要与各种网站建立大量连接。其中一些崩溃/返回错误/变得无响应。

轴 Y - 挂起测试(会话)的数量。由于部署/重新启动而下降到 0。

I.(1 月 25 日)改造服务后,初始实现使用带有取消令牌的 ReadAsync。这导致大量测试挂起(对这些网站运行请求表明服务器有时确实没有返回内容)。

二、(2 月 17 日)部署了使用 Task.Delay 保护取消的更改。这完全解决了这个问题。

private async Task<int> StreamReadWithCancellationTokenAsync(Stream stream, byte[] buffer, int count, Task cancellationDelayTask)
{
    if (cancellationDelayTask.IsCanceled)
    {
        throw new TaskCanceledException();
    }

    // Stream.ReadAsync doesn't honor cancellation token. It only checks it at the beginning. The actual
    // operation is not guarded. As a result if remote server never responds and connection never closed
    // it will lead to this operation hanging forever.
    Task<int> readBytesTask = stream.ReadAsync(
        buffer,
        0,
        count);
    await Task.WhenAny(readBytesTask, cancellationDelayTask).ConfigureAwait(false);

    // Check whether cancellation task is cancelled (or completed).
    if (cancellationDelayTask.IsCanceled || cancellationDelayTask.IsCompleted)
    {
        throw new TaskCanceledException();
    }

    // Means that main task completed. We use Result directly.
    // If the main task failed the following line will throw an exception and
    // we'll catch it above.
    int readBytes = readBytesTask.Result;

    return readBytes;
}

III(3 月 3 日)在此 StackOverflow 之后实现了基于超时关闭流:

using (timeoutToken.Register(() => stream.Close()))
{
    // Stream.ReadAsync doesn't honor cancellation token. It only checks it at the beginning. The actual
    // operation is not guarded. As a result if a remote server never responds and connection never closed
    // it will lead to this operation hanging forever.
    // ReSharper disable once MethodSupportsCancellation
    readBytes = await targetStream.ReadAsync(
        buffer,
        0,
        Math.Min(responseBodyLimitInBytes - totalReadBytes, buffer.Length)).ConfigureAwait(false);
}

这个实现带来了后顾之忧(与最初的方法不同):

在此处输入图像描述

恢复为 Task.Delay 解决方案。

于 2020-04-09T01:50:00.410 回答
1

只是提醒一下,await _stream.WriteAsync(message,cancellationToken);(_stream 是一个 SslStream) 在执行 BeginEndWriteAsync 之前在后台检查取消令牌是否已被取消,因此您必须在开始写入之前取消您的令牌。

public virtual Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        // If cancellation was requested, bail early with an already completed task.
        // Otherwise, return a task that represents the Begin/End methods.
        return cancellationToken.IsCancellationRequested
                    ? Task.FromCanceled(cancellationToken)
                    : BeginEndWriteAsync(buffer, offset, count);
    }
于 2020-09-04T15:04:56.013 回答