I've been struggling with some async/await stuff. I am using RabbitMQ to send and receive messages between several programs.
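As background, the RabbitMQ client uses 3 or so threads that I can see: a connection thread and two heartbeat threads. Whenever a message is received over TCP, the connection thread handles it and invokes a callback that I supply through an interface. The documentation says it is best to avoid doing much work during this call, because it runs on the same thread as the connection, and that thread needs to keep moving. They supply a QueueingBasicConsumer, which has a blocking "Dequeue" method that is used to wait for a message to be received.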
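I wanted my consumers to actually release their thread context during this waiting time so that somebody else could do some work, so I decided to use async/await tasks. I wrote an AwaitableBasicConsumer class that uses TaskCompletionSources in the following way:
I have an awaitable dequeue method: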
    public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
    {
        //we are enqueueing a TCS. This is a "read"
        rwLock.EnterReadLock();
        try
        {
            TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>();
            //if we are cancelled before we finish, this will cause the tcs to become cancelled
            cancellationToken.Register(() =>
            {
                tcs.TrySetCanceled();
            });
            //if there is something in the undelivered queue, the task will be immediately completed
            //otherwise, we queue the task into deliveryTCS
            if (!TryDeliverUndelivered(tcs))
            {
                deliveryTCS.Enqueue(tcs);
            }
            return tcs.Task;
        }
        finally
        {
            rwLock.ExitReadLock();
        }
    }
The callback that the RabbitMQ client calls completes the tasks. It is called from the context of the AMQP connection thread:
    public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
    {
        //we want nothing added while we remove. We also block until everybody is done.
        rwLock.EnterWriteLock();
        try
        {
            RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
            bool sent = false;
            TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
            while (deliveryTCS.TryDequeue(out tcs))
            {
                //once we manage to actually set somebody's result, we are done with handling this
                if (tcs.TrySetResult(e))
                {
                    sent = true;
                    break;
                }
            }
            //if nothing was sent, we queue up what we got so that somebody can get it later.
            /**
             * Without the rwlock, this logic would cause concurrency problems in the case where after the while block completes without sending, somebody enqueues themselves. They would get the
             * next message and the person who enqueues after them would get the message received now. Locking prevents that from happening since nobody can add to the queue while we are
             * doing our thing here.
             */
            if (!sent)
            {
                undelivered.Enqueue(e);
            }
        }
        finally
        {
            rwLock.ExitWriteLock();
        }
    }
rwLock is a ReaderWriterLockSlim. The two queues (deliveryTCS and undelivered) are ConcurrentQueues.
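To make the field types above concrete, here is a minimal, RabbitMQ-free sketch of the same pattern. The class name AwaitableQueue<T> and the method name Deliver are hypothetical and only for illustration; T stands in for BasicDeliverEventArgs, and cancellation is left out to keep it short.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for AwaitableBasicConsumer, without any RabbitMQ types.
public sealed class AwaitableQueue<T>
{
    private readonly ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();
    private readonly ConcurrentQueue<TaskCompletionSource<T>> deliveryTCS =
        new ConcurrentQueue<TaskCompletionSource<T>>();
    private readonly ConcurrentQueue<T> undelivered = new ConcurrentQueue<T>();

    // "Read" side: many callers may register a waiter at the same time.
    public Task<T> DequeueAsync()
    {
        var tcs = new TaskCompletionSource<T>();
        rwLock.EnterReadLock();
        try
        {
            T item;
            if (undelivered.TryDequeue(out item))
                tcs.TrySetResult(item);   // something was already waiting for us
            else
                deliveryTCS.Enqueue(tcs); // otherwise wait for the next delivery
        }
        finally
        {
            rwLock.ExitReadLock();
        }
        return tcs.Task;
    }

    // "Write" side: plays the role of HandleBasicDeliver.
    public void Deliver(T item)
    {
        rwLock.EnterWriteLock();
        try
        {
            TaskCompletionSource<T> tcs;
            while (deliveryTCS.TryDequeue(out tcs))
            {
                if (tcs.TrySetResult(item))
                    return;               // handed to a waiter, done
            }
            undelivered.Enqueue(item);    // nobody waiting, keep it for later
        }
        finally
        {
            rwLock.ExitWriteLock();
        }
    }
}

public static class AwaitableQueueDemo
{
    public static void Main()
    {
        var queue = new AwaitableQueue<int>();
        Task<int> pending = queue.DequeueAsync(); // waiter registered first
        queue.Deliver(1);                         // completes the waiter
        Console.WriteLine(pending.Result);
    }
}
```

Note that in this sketch, as in the code above, TrySetResult is called while the write lock is still held.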
The problem:
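Every once in a while, the method that awaits the dequeue method throws an exception. This would not normally be a problem, since that method is also async, so its task simply enters the faulted ("exception") completion state that tasks enter.
The problem arises when the task that called DequeueAsync is resumed after the await on the AMQP connection thread that the RabbitMQ client creates. Normally I have seen tasks resume on the main thread or on one of the worker threads floating around. However, when the task resumes on the AMQP thread and an exception is thrown, everything stalls. The task does not enter its faulted state, and the AMQP connection thread is left reporting that it is executing the method in which the exception occurred.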
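A way to observe which thread a continuation resumes on, without involving RabbitMQ at all, is the following minimal sketch. It assumes a console application with no SynchronizationContext; the class and method names are hypothetical, and a plain Thread stands in for the AMQP connection thread. With a default TaskCompletionSource, the code after the await typically runs synchronously on whichever thread calls SetResult.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ResumeThreadDemo
{
    // Returns true if the code after the await ran on the thread that called SetResult.
    public static bool ResumesOnCompletingThread()
    {
        var tcs = new TaskCompletionSource<int>();
        Thread resumedOn = null;

        Func<Task> waiter = async () =>
        {
            await tcs.Task;
            resumedOn = Thread.CurrentThread; // record where we were resumed
        };
        Task waiting = waiter(); // runs until the await, then returns an incomplete task

        // Stand-in for the AMQP connection thread completing the task.
        var completer = new Thread(() => tcs.SetResult(1));
        completer.Start();
        completer.Join();
        waiting.Wait();

        return ReferenceEquals(resumedOn, completer);
    }

    public static void Main()
    {
        Console.WriteLine("resumed on completing thread: " + ResumesOnCompletingThread());
    }
}
```

If the same thing happens in the consumer, then everything after `await result` in the awaiting method, including the thrown exception and the disposal of the channel at the end of the using block, would be running on the AMQP connection thread inside HandleBasicDeliver. That is only a hypothesis about the stall, not something I have confirmed.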
My main confusion here is why this doesn't work:
    var task = c.RunAsync(); //<-- This method awaits the DequeueAsync and throws an exception afterwards
    ConsumerTaskState state = new ConsumerTaskState()
    {
        Connection = connection,
        CancellationToken = cancellationToken
    };
    //if there is a problem, we execute our faulted method
    //PROBLEM: If task fails when its resumed onto the AMQP thread, this method is never called
    task.ContinueWith(this.OnFaulted, state, TaskContinuationOptions.OnlyOnFaulted);
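For comparison, here is a minimal, self-contained sketch of the same ContinueWith pattern in the case where it does work. All names are hypothetical: FailAfterAwaitAsync stands in for RunAsync and a plain string stands in for ConsumerTaskState. It assumes a console application.

```csharp
using System;
using System.Threading.Tasks;

public static class FaultedContinuationDemo
{
    // Hypothetical stand-in for RunAsync: awaits something, then throws.
    private static async Task FailAfterAwaitAsync()
    {
        await Task.Yield();
        throw new NotImplementedException("test exception");
    }

    // Returns the text produced by the OnlyOnFaulted continuation.
    public static string Run()
    {
        Task task = FailAfterAwaitAsync();
        object state = "consumer-1"; // hypothetical state object

        // Runs only if 'task' ends up faulted; the exception is wrapped in an AggregateException.
        Task<string> onFaulted = task.ContinueWith(
            (t, s) => s + ": " + t.Exception.InnerException.Message,
            state,
            TaskContinuationOptions.OnlyOnFaulted);

        return onFaulted.Result;
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

The continuation can only fire once the async method has actually finished, which includes leaving any using block it is in, so a method that is stuck before returning never reaches the faulted state.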
Here is the RunAsync method, set up for the test:
    public async Task RunAsync()
    {
        using (var channel = this.Connection.CreateModel())
        {
            ...
            AwaitableBasicConsumer consumer = new AwaitableBasicConsumer(channel);
            var result = consumer.DequeueAsync(this.CancellationToken);
            //wait until we find something to eat
            await result;
            throw new NotImplementedException(); //<-- the test exception. Normally this causes OnFaulted to be called, but sometimes, it stalls
            ...
        } //<-- This is where the debugger says the thread is sitting at when I find it in the stalled state
    }
Reading over what I have written, I see that I may not have explained my problem very well. If clarification is needed, just ask.
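The solutions I have come up with are:
- Remove all the async/await code and just use threads and block. Performance will drop, but at least it won't occasionally stall.
- Somehow exempt the AMQP threads from being used to resume tasks. I assume they were sleeping or something, and the default TaskScheduler then decided to use them. If I could find a way to tell the task scheduler that those threads are off limits, that would be great.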
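For the second option, one possibility is to create the TaskCompletionSource with TaskCreationOptions.RunContinuationsAsynchronously, which asks for continuations to be scheduled instead of being run inline by the thread that completes the task. The sketch below is an assumption on my part rather than anything from the RabbitMQ documentation: it requires .NET Framework 4.6 or later, assumes a console application with no SynchronizationContext, uses hypothetical names, and has a plain Thread standing in for the AMQP connection thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncContinuationDemo
{
    // Returns true if the code after the await ran on a thread-pool thread
    // rather than on the dedicated thread that completed the task.
    public static bool ResumesOnThreadPool()
    {
        // The option below is the only difference from a default TaskCompletionSource.
        var tcs = new TaskCompletionSource<int>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        bool resumedOnPool = false;

        Func<Task> waiter = async () =>
        {
            await tcs.Task;
            resumedOnPool = Thread.CurrentThread.IsThreadPoolThread;
        };
        Task waiting = waiter();

        // Dedicated (non-pool) thread, standing in for the AMQP connection thread.
        var completer = new Thread(() => tcs.SetResult(1));
        completer.Start();
        completer.Join();
        waiting.Wait();

        return resumedOnPool;
    }

    public static void Main()
    {
        Console.WriteLine("resumed on thread pool: " + ResumesOnThreadPool());
    }
}
```

On older framework versions, a similar effect could probably be had by completing the task from a pool thread, for example by wrapping the TrySetResult call in Task.Run, at the cost of an extra hop per message.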
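Does anyone have an explanation for why this happens, or any suggestions for solving it? Right now I am removing the async code so that the program is reliable, but I would really like to understand what is going on here.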