由于您的应用程序是控制台应用程序,因此它在默认同步上下文中运行,其中await
继续回调将在等待任务已完成的同一线程上调用。如果你想在之后切换线程await SendAwaitResponse
,你可以这样做await Task.Yield()
:
await SendAwaitResponse("first message");
await Task.Yield();
// will be continued on a pool thread
// ...
SendAwaitResponse("second message").Wait(); // so no deadlock
您可以通过将其存储Thread.CurrentThread.ManagedThreadId
在内部Task.Result
并将其与await
. 如果您仍在同一线程上,请执行await Task.Yield()
.
虽然我知道这SendAwaitResponse
是您实际代码的简化版本,但它内部仍然是完全同步的(就像您在问题中展示的方式一样)。为什么你会期望那里有任何线程切换?
无论如何,您可能应该重新设计您的逻辑,使其不对您当前所在的线程做出假设。避免混合await
并使Task.Wait()
所有代码异步。Wait()
通常,可以只在顶层的某个地方(例如 inside )坚持一个Main
。
[编辑]由于默认同步上下文的行为,调用task.SetResult(msg)
fromReceiverRun
实际上将控制流转移到您await
在- 没有线程切换的位置。task
因此,执行实际消息处理的代码正在接管ReceiverRun
线程。最终,SendAwaitResponse("second message").Wait()
在同一个线程上被调用,导致死锁。
下面是一个控制台应用程序代码,以您的示例为蓝本。它使用await Task.Yield()
insideProcessAsync
在单独的线程上安排继续,因此控制流返回ReceiverRun
并且没有死锁。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
class Program
{
class Worker
{
public struct Response
{
public string message;
public int threadId;
}
CancellationToken _token;
readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
readonly ConcurrentDictionary<string, TaskCompletionSource<Response>> _requests = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();
public Worker(CancellationToken token)
{
_token = token;
}
string ReadNextMessage()
{
// using Thread.Sleep(100) for test purposes here,
// should be using ManualResetEvent (or similar synchronization primitive),
// depending on how messages arrive
string message;
while (!_messages.TryDequeue(out message))
{
Thread.Sleep(100);
_token.ThrowIfCancellationRequested();
}
return message;
}
public void ReceiverRun()
{
LogThread("Enter ReceiverRun");
while (true)
{
var msg = ReadNextMessage();
LogThread("ReadNextMessage: " + msg);
var tcs = _requests[msg];
tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId });
_token.ThrowIfCancellationRequested(); // this is how we terminate the loop
}
}
Task<Response> SendAwaitResponse(string msg)
{
LogThread("SendAwaitResponse: " + msg);
var tcs = new TaskCompletionSource<Response>();
_requests.TryAdd(msg, tcs);
_messages.Enqueue(msg);
return tcs.Task;
}
public async Task ProcessAsync()
{
LogThread("Enter Worker.ProcessAsync");
var task1 = SendAwaitResponse("first message");
await task1;
LogThread("result1: " + task1.Result.message);
// avoid deadlock for task2.Wait() with Task.Yield()
// comment this out and task2.Wait() will dead-lock
if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId)
await Task.Yield();
var task2 = SendAwaitResponse("second message");
task2.Wait();
LogThread("result2: " + task2.Result.message);
var task3 = SendAwaitResponse("third message");
// still on the same thread as with result 2, no deadlock for task3.Wait()
task3.Wait();
LogThread("result3: " + task3.Result.message);
var task4 = SendAwaitResponse("fourth message");
await task4;
LogThread("result4: " + task4.Result.message);
// avoid deadlock for task5.Wait() with Task.Yield()
// comment this out and task5.Wait() will dead-lock
if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId)
await Task.Yield();
var task5 = SendAwaitResponse("fifth message");
task5.Wait();
LogThread("result5: " + task5.Result.message);
LogThread("Leave Worker.ProcessAsync");
}
public static void LogThread(string message)
{
Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId);
}
}
static void Main(string[] args)
{
Worker.LogThread("Enter Main");
var cts = new CancellationTokenSource(5000); // cancel after 5s
var worker = new Worker(cts.Token);
Task receiver = Task.Run(() => worker.ReceiverRun());
Task main = worker.ProcessAsync();
try
{
Task.WaitAll(main, receiver);
}
catch (Exception e)
{
Console.WriteLine("Exception: " + e.Message);
}
Worker.LogThread("Leave Main");
Console.ReadLine();
}
}
}
Task.Run(() => task.SetResult(msg))
这和在里面做的没有太大区别ReceiverRun
。我能想到的唯一优点是您可以明确控制何时切换线程。这样,您可以尽可能长时间地停留在同一个线程上(例如,for task2
, task3
, task4
,但之后您仍然需要另一个线程切换task4
以避免死锁task5.Wait()
)。
这两种解决方案最终都会使线程池增长,这在性能和可扩展性方面都很糟糕。
现在,如果我们在上面的代码中用任何地方替换task.Wait()
await task
ProcessAsync
,我们将不必使用await Task.Yield
并且仍然不会出现死锁。但是,第一个内部await
之后的整个调用链实际上将在线程上执行。只要我们不使用其他风格的调用阻塞这个线程,并且在我们处理消息时不要做大量的 CPU 密集型工作,这种方法就可以正常工作(异步 IO 密集型调用仍然应该是好的,它们实际上可能会触发隐式线程切换)。await task1
ProcessAsync
ReceiverRun
Wait()
await
也就是说,我认为您需要一个单独的线程,并在其上安装一个序列化同步上下文来处理消息(类似于WindowsFormsSynchronizationContext
)。这就是你的异步代码awaits
应该运行的地方。您仍然需要避免Task.Wait
在该线程上使用。如果单个消息处理需要大量 CPU 密集型工作,您应该使用Task.Run
此类工作。对于异步 IO 绑定调用,您可以留在同一个线程上。
您可能希望查看ActionDispatcher
/ActionDispatcherSynchronizationContext
来自@StephenCleary的
Nito 异步库,了解您的异步消息处理逻辑。希望斯蒂芬能够加入并提供更好的答案。