36

我发现TaskCompletionSource.SetResult();在返回之前调用等待任务的代码。就我而言,这会导致死锁。

这是一个普通的开始的简化版Thread

void ReceiverRun()
    while (true)
    {
        var msg = ReadNextMessage();
        TaskCompletionSource<Response> task = requests[msg.RequestID];

        if(msg.Error == null)
            task.SetResult(msg);
        else
            task.SetException(new Exception(msg.Error));
    }
}

代码的“异步”部分看起来像这样。

await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();

Wait 实际上嵌套在非异步调用中。

SendAwaitResponse(简化)

public static Task<Response> SendAwaitResponse(string msg)
{
    var t = new TaskCompletionSource<Response>();
    requests.Add(GetID(msg), t);
    stream.Write(msg);
    return t.Task;
}

我的假设是第二个 SendAwaitResponse 将在 ThreadPool 线程中执行,但它会在为 ReceiverRun 创建的线程中继续。

无论如何设置任务的结果而不继续其等待的代码?

该应用程序是一个控制台应用程序

4

4 回答 4

37

我发现 TaskCompletionSource.SetResult(); 在返回之前调用等待任务的代码。就我而言,这会导致死锁。

是的,我有一篇博客文章记录了这一点(AFAIK 没有记录在 MSDN 上)。发生死锁是因为两件事:

  1. 有混合async代码和阻塞代码(即,一个async方法正在调用Wait)。
  2. 任务继续使用TaskContinuationOptions.ExecuteSynchronously.

我建议从最简单的解决方案开始:删除第一件事 (1)。即,不要混合asyncWait调用:

await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();

相反,await始终如一地使用:

await SendAwaitResponse("first message");
await SendAwaitResponse("second message");

如果需要,您可以Wait在调用堆栈的另一位置(不在方法中async)。

这是我最推荐的解决方案。但是,如果您想尝试删除第二件事 (2),您可以做一些技巧:或者将 a 包装SetResultTask.Run一个单独的线程上(我的AsyncEx 库具有*WithBackgroundContinuations执行此操作的扩展方法),或者给出您的线程实际上下文(例如我的AsyncContext类型)并指定ConfigureAwait(false),这将导致继续忽略ExecuteSynchronously标志

但是这些解决方案比仅仅分离async代码和阻塞代码要复杂得多。

作为旁注,看看TPL Dataflow;听起来您可能会发现它很有用。

于 2013-10-21T14:31:42.040 回答
6

由于您的应用程序是控制台应用程序,因此它在默认同步上下文中运行,其中await继续回调将在等待任务已完成的同一线程上调用。如果你想在之后切换线程await SendAwaitResponse,你可以这样做await Task.Yield()

await SendAwaitResponse("first message");
await Task.Yield(); 
// will be continued on a pool thread
// ...
SendAwaitResponse("second message").Wait(); // so no deadlock

您可以通过将其存储Thread.CurrentThread.ManagedThreadId在内部Task.Result并将其与await. 如果您仍在同一线程上,请执行await Task.Yield().

虽然我知道这SendAwaitResponse是您实际代码的简化版本,但它内部仍然是完全同步的(就像您在问题中展示的方式一样)。为什么你会期望那里有任何线程切换?

无论如何,您可能应该重新设计您的逻辑,使其不对您当前所在的线程做出假设。避免混合await并使Task.Wait()所有代码异步。Wait()通常,可以只在顶层的某个地方(例如 inside )坚持一个Main

[编辑]由于默认同步上下文的行为,调用task.SetResult(msg)fromReceiverRun实际上将控制流转移到您await在- 没有线程切换的位置。task因此,执行实际消息处理的代码正在接管ReceiverRun线程。最终,SendAwaitResponse("second message").Wait()在同一个线程上被调用,导致死锁。

下面是一个控制台应用程序代码,以您的示例为蓝本。它使用await Task.Yield()insideProcessAsync在单独的线程上安排继续,因此控制流返回ReceiverRun并且没有死锁。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    class Program
    {
        class Worker
        {
            public struct Response
            {
                public string message;
                public int threadId;
            }

            CancellationToken _token;
            readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
            readonly ConcurrentDictionary<string, TaskCompletionSource<Response>> _requests = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();

            public Worker(CancellationToken token)
            {
                _token = token;
            }

            string ReadNextMessage()
            {
                // using Thread.Sleep(100) for test purposes here,
                // should be using ManualResetEvent (or similar synchronization primitive),
                // depending on how messages arrive
                string message;
                while (!_messages.TryDequeue(out message))
                {
                    Thread.Sleep(100);
                    _token.ThrowIfCancellationRequested();
                }
                return message;
            }

            public void ReceiverRun()
            {
                LogThread("Enter ReceiverRun");
                while (true)
                {
                    var msg = ReadNextMessage();
                    LogThread("ReadNextMessage: " + msg);
                    var tcs = _requests[msg];
                    tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId });
                    _token.ThrowIfCancellationRequested(); // this is how we terminate the loop
                }
            }

            Task<Response> SendAwaitResponse(string msg)
            {
                LogThread("SendAwaitResponse: " + msg);
                var tcs = new TaskCompletionSource<Response>();
                _requests.TryAdd(msg, tcs);
                _messages.Enqueue(msg);
                return tcs.Task;
            }

            public async Task ProcessAsync()
            {
                LogThread("Enter Worker.ProcessAsync");

                var task1 = SendAwaitResponse("first message");
                await task1;
                LogThread("result1: " + task1.Result.message);
                // avoid deadlock for task2.Wait() with Task.Yield()
                // comment this out and task2.Wait() will dead-lock
                if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                    await Task.Yield();

                var task2 = SendAwaitResponse("second message");
                task2.Wait();
                LogThread("result2: " + task2.Result.message);

                var task3 = SendAwaitResponse("third message");
                // still on the same thread as with result 2, no deadlock for task3.Wait()
                task3.Wait();
                LogThread("result3: " + task3.Result.message);

                var task4 = SendAwaitResponse("fourth message");
                await task4;
                LogThread("result4: " + task4.Result.message);
                // avoid deadlock for task5.Wait() with Task.Yield()
                // comment this out and task5.Wait() will dead-lock
                if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                    await Task.Yield();

                var task5 = SendAwaitResponse("fifth message");
                task5.Wait();
                LogThread("result5: " + task5.Result.message);

                LogThread("Leave Worker.ProcessAsync");
            }

            public static void LogThread(string message)
            {
                Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId);
            }
        }

        static void Main(string[] args)
        {
            Worker.LogThread("Enter Main");
            var cts = new CancellationTokenSource(5000); // cancel after 5s
            var worker = new Worker(cts.Token);
            Task receiver = Task.Run(() => worker.ReceiverRun());
            Task main = worker.ProcessAsync();
            try
            {
                Task.WaitAll(main, receiver);
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: " + e.Message);
            }
            Worker.LogThread("Leave Main");
            Console.ReadLine();
        }
    }
}

Task.Run(() => task.SetResult(msg))这和在里面做的没有太大区别ReceiverRun。我能想到的唯一优点是您可以明确控制何时切换线程。这样,您可以尽可能长时间地停留在同一个线程上(例如,for task2, task3, task4,但之后您仍然需要另一个线程切换task4以避免死锁task5.Wait())。

这两种解决方案最终都会使线程池增长,这在性能和可扩展性方面都很糟糕。

现在,如果我们在上面的代码中用任何地方替换task.Wait()await taskProcessAsync,我们将不必使用await Task.Yield并且仍然不会出现死锁。但是,第一个内部await之后的整个调用链实际上将在线程上执行。只要我们不使用其他风格的调用阻塞这个线程,并且在我们处理消息时不要做大量的 CPU 密集型工作,这种方法就可以正常工作(异步 IO 密集型调用仍然应该是好的,它们实际上可能会触发隐式线程切换)。await task1ProcessAsyncReceiverRunWait()await

也就是说,我认为您需要一个单独的线程,并在其上安装一个序列化同步上下文来处理消息(类似于WindowsFormsSynchronizationContext)。这就是你的异步代码awaits应该运行的地方。您仍然需要避免Task.Wait在该线程上使用。如果单个消息处理需要大量 CPU 密集型工作,您应该使用Task.Run此类工作。对于异步 IO 绑定调用,您可以留在同一个线程上。

您可能希望查看ActionDispatcher/ActionDispatcherSynchronizationContext来自@StephenClearyNito 异步库,了解您的异步消息处理逻辑。希望斯蒂芬能够加入并提供更好的答案。

于 2013-10-21T07:43:12.490 回答
0

“我的假设是第二个 SendAwaitResponse 将在 ThreadPool 线程中执行,但它会在为 ReceiverRun 创建的线程中继续执行。”

这完全取决于您在 SendAwaitResponse 中所做的事情。异步和并发不是一回事

查看:C# 5 Async/Await - 它是 *concurrent* 吗?

于 2013-10-20T19:56:48.417 回答
0

聚会有点晚了,但这是我认为具有附加值的解决方案。

我也一直在为此苦苦挣扎,我通过在等待的方法上捕获 SynchronizationContext 来解决它。

它看起来像:

// just a default sync context
private readonly SynchronizationContext _defaultContext = new SynchronizationContext();

void ReceiverRun()
{
    while (true)    // <-- i would replace this with a cancellation token
    {
        var msg = ReadNextMessage();
        TaskWithContext<TResult> task = requests[msg.RequestID];

        // if it wasn't a winforms/wpf thread, it would be null
        // we choose our default context (threadpool)
        var context = task.Context ?? _defaultContext;

        // execute it on the context which was captured where it was added. So it won't get completed on this thread.
        context.Post(state =>
        {
            if (msg.Error == null)
                task.TaskCompletionSource.SetResult(msg);
            else
                task.TaskCompletionSource.SetException(new Exception(msg.Error));
        });
    }
}

public static Task<Response> SendAwaitResponse(string msg)
{
    // The key is here! Save the current synchronization context.
    var t = new TaskWithContext<Response>(SynchronizationContext.Current); 

    requests.Add(GetID(msg), t);
    stream.Write(msg);
    return t.TaskCompletionSource.Task;
}

// class to hold a task and context
public class TaskWithContext<TResult>
{
    public SynchronizationContext Context { get; }

    public TaskCompletionSource<TResult> TaskCompletionSource { get; } = new TaskCompletionSource<Response>();

    public TaskWithContext(SynchronizationContext context)
    {
        Context = context;
    }
}
于 2019-03-19T10:26:20.013 回答