9

我一直在努力处理一些异步等待的东西。我正在使用 RabbitMQ 在某些程序之间发送/接收消息。

作为背景知识,RabbitMQ 客户端使用了 3 个左右的线程,我可以看到:一个连接线程和两个心跳线程。每当通过 TCP 接收到消息时,连接线程都会处理它并调用我通过接口提供的回调。文档说最好避免在此调用期间做大量工作,因为它在与连接相同的线程上完成,并且需要继续进行。他们提供了一个QueueingBasicConsumer具有阻塞“出队”方法的方法,该方法用于等待接收消息。

我希望我的消费者能够在这段等待时间内真正释放他们的线程上下文,以便其他人可以做一些工作,所以我决定使用 async/await 任务。我编写了一个以以下方式AwaitableBasicConsumer使用 s 的类:TaskCompletionSource

我有一个等待的出列方法:

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
    //we are enqueueing a TCS. This is a "read"
    rwLock.EnterReadLock();

    try
    {
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>();

        //if we are cancelled before we finish, this will cause the tcs to become cancelled
        cancellationToken.Register(() =>
        {
            tcs.TrySetCanceled();
        });

        //if there is something in the undelivered queue, the task will be immediately completed
        //otherwise, we queue the task into deliveryTCS
        if (!TryDeliverUndelivered(tcs))
            deliveryTCS.Enqueue(tcs);
        }

        return tcs.Task;
    }
    finally
    {
        rwLock.ExitReadLock();
    }
}

rabbitmq 客户端调用的回调完成任务:这是从 AMQP 连接线程的上下文中调用的

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
    //we want nothing added while we remove. We also block until everybody is done.
    rwLock.EnterWriteLock();
    try
    {
        RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

        bool sent = false;
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
        while (deliveryTCS.TryDequeue(out tcs))
        {
            //once we manage to actually set somebody's result, we are done with handling this
            if (tcs.TrySetResult(e))
            {
                sent = true;
                break;
            }
        }

        //if nothing was sent, we queue up what we got so that somebody can get it later.
        /**
         * Without the rwlock, this logic would cause concurrency problems in the case where after the while block completes without sending, somebody enqueues themselves. They would get the
         * next message and the person who enqueues after them would get the message received now. Locking prevents that from happening since nobody can add to the queue while we are
         * doing our thing here.
         */
        if (!sent)
        {
            undelivered.Enqueue(e);
        }
    }
    finally
    {
        rwLock.ExitWriteLock();
    }
}

rwLock是一个ReaderWriterLockSlim。两个队列 (deliveryTCSundelivered) 是 ConcurrentQueues。

问题:

每隔一段时间,等待出队方法的方法就会抛出一个异常。这通常不是问题,因为该方法也是async,因此它进入任务进入的“异常”完成状态。DequeueAsync问题出现在RabbitMQ 客户端创建的 AMQP 连接线程等待后恢复调用的任务的情况。通常我已经看到任务恢复到主线程或浮动的工作线程之一。但是,当它恢复到 AMQP 线程并引发异常时,一切都停止了。该任务没有进入其“异常状态”,并且 AMQP 连接线程留在说它正在执行发生异常的方法。

我在这里的主要困惑是为什么这不起作用:

var task = c.RunAsync(); //<-- This method awaits the DequeueAsync and throws an exception afterwards

ConsumerTaskState state = new ConsumerTaskState()
{
    Connection = connection,
    CancellationToken = cancellationToken
};

//if there is a problem, we execute our faulted method
//PROBLEM: If task fails when its resumed onto the AMQP thread, this method is never called
task.ContinueWith(this.OnFaulted, state, TaskContinuationOptions.OnlyOnFaulted);

这是RunAsync为测试设置的方法:

public async Task RunAsync()
{
    using (var channel = this.Connection.CreateModel())
    {
        ...
        AwaitableBasicConsumer consumer = new AwaitableBasicConsumer(channel);
        var result = consumer.DequeueAsync(this.CancellationToken);

        //wait until we find something to eat
        await result;

        throw new NotImplementeException(); //<-- the test exception. Normally this causes OnFaulted to be called, but sometimes, it stalls
        ...
    } //<-- This is where the debugger says the thread is sitting at when I find it in the stalled state
}

阅读我所写的内容,我发现我可能没有很好地解释我的问题。如果需要澄清,请问。

我提出的解决方案如下:

  • 删除所有 Async/Await 代码,直接使用线程并阻塞。性能会下降,但至少有时不会停顿
  • 以某种方式免除 AMQP 线程用于恢复任务。我假设他们正在睡觉或其他什么,然后默认TaskScheduler决定使用它们。如果我能找到一种方法来告诉任务调度程序这些线程是不受限制的,那就太好了。

有没有人解释为什么会发生这种情况或有任何解决这个问题的建议?现在我正在删除异步代码,以便程序可靠,但我真的很想了解这里发生了什么。

4

1 回答 1

5

我首先建议您阅读我的async介绍,它准确地解释了如何await捕获上下文并使用它来恢复执行。简而言之,它将捕获当前SynchronizationContext(或当前TaskSchedulerif SynchronizationContext.Currentis null)。

另一个重要的细节是async延续计划TaskContinuationOptions.ExecuteSynchronously(正如@svick 在评论中指出的那样)。我有一篇关于此的博客文章,但 AFAIK 并没有在任何地方正式记录。这个细节确实使编写async生产者/消费者队列变得困难。

原因await不是“切换回原始上下文”是(可能)因为 RabbitMQ 线程没有SynchronizationContextor TaskScheduler- 因此,当您调用时直接执行延续,TrySetResult因为这些线程看起来就像常规线程池线程。

顺便说一句,通读您的代码,我怀疑您使用读/写锁和并发队列不正确。如果没有看到整个代码,我无法确定,但这是我的印象。

我强烈建议您使用现有async队列并围绕它建立一个消费者(换句话说,让其他人来做困难的部分:)。TPL Dataflow中的BufferBlock<T>类型可以充当队列;如果您的平台上有可用的 Dataflow,那将是我的第一个建议。否则,我的AsyncEx 库中有一个类型,或者您可以编写自己的类型(正如我在博客中描述的那样)。asyncAsyncProducerConsumerQueue

这是一个使用示例BufferBlock<T>

private readonly BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs> _queue = new BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs>();

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
    RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
    _queue.Post(e);
}

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
    return _queue.ReceiveAsync(cancellationToken);
}

在这个例子中,我保留了你的DequeueAsyncAPI。但是,一旦您开始使用 TPL 数据流,请考虑在其他地方也使用它。当您需要这样的队列时,通常会发现代码的其他部分也将从数据流方法中受益。例如,DequeueAsync您可以将您的方法链接BufferBlock到一个ActionBlock.

于 2013-10-31T00:28:59.097 回答