I have created a small class using RabbitMQ that implements a publish/subscribe messaging pattern on a topic exchange. On top of this pub/sub I have the methods and properties:

  1. void Send(Message, Subject) - Publish message to destination topic for any subscribers to handle.

  2. MessageReceivedEvent - Subscribe to message received events on this messaging instance (messaging instance is bound to the desired subscribe topic when created).

  3. SendWaitReply(Message, Subject) - Send a message and block until a reply message is received with a correlation id matching the sent message id (or timeout). This is essentially a request/reply or RPC mechanism on top of the pub/sub pattern.

The messaging patterns I have chosen are somewhat set in stone due to the way the system is to be designed. I realize I could use reply-to queues to mitigate the potential issue with SendWaitReply, but that breaks some requirements.

Right now my issues are:

  • For the Listen event, the messages are processed synchronously through the event subscribers as the listener runs in a single thread. This causes some serious performance issues when handling large volumes of messages (i.e. in a back-end process consuming events from a web api). I am considering passing in a callback function as opposed to subscribing to an event and then dispatching the collection of callbacks in parallel using Task or Threadpool. Thread safety would obviously now be a concern of the caller. I am not sure if this is a correct approach.

  • For the SendWaitReply event, I have built what seems to be a hacky solution that takes all inbound messages from the message listener loop and places them in a ConcurrentDictionary if they contain a non-empty correlation guid. Then in the SendWaitReply method, I poll the ConcurrentDictionary for a message containing a key that matches the Id of the sent message (or timeout after a certain period). If there is a faster/better way to do this, I would really like to investigate it. Maybe a way to signal to all of the currently blocked SendWaitReply methods that a new message is available and they should all check their Ids instead of polling continuously?

Update 10/15/2014

After much exhaustive research, I have concluded that there is no "official" mechanism/helper/library to directly handle the particular use-case I have presented above for SendWaitReply in the scope of RabbitMQ or AMQP. I will stick with my current solution (and investigate more robust implementations) for the time being. There have been answers recommending I use the provided RPC functionality, but this unfortunately only works in the case that you want to use exclusive callback queues on a per-request basis. This breaks one of my major requirements of having all messages (request and reply) visible on the same topic exchange.

To further clarify, the typical message pair for a SendWaitReply request is in the format of:

  • Topic_Exchange.Service_A => some_command => Topic_Exchange.Service_B
  • Topic_Exchange.Service_B => some_command_reply => Topic_Exchange.Service_A

This affords me a powerful debugging and logging technique where I simply set up a listener on Topic_Exchange.# and can see all of the system traffic for tracing very deep 'call stacks' through various services.

TL; DR - Current Problem Below

Backing down from the architectural level - I still have an issue with the message listener loop. I have tried the EventingBasicConsumer and am still seeing a block. The way my class works is that the caller subscribes to the delegate provided by the instance of the class. The message loop fires the event on that delegate and those subscribers then handle the message. It seems as if I need a different way to pass the message event handlers into the instance such that they don't all sit behind one delegate which enforces synchronous processing.


1 回答 1


很难说为什么您的代码在没有示例的情况下会阻塞,但是为了防止在使用时阻塞,您应该使用 EventingBasicConsumer。

var consumer = new EventingBasicConsumer;
consumer.Received += (s, delivery) => { /* do stuff here */ };
channel.BasicConsume(queue, false, consumer);

需要注意的是,如果您使用 autoAck = false (就像我一样),那么您需要确保在执行 channel.BasicAck 时锁定通道,否则您可能会遇到 .NET 库中的并发问题。

对于 SendWaitReply,如果您只使用 RabbitMQ 客户端库中包含的 SimpleRpcClient,您的运气可能会更好:

var props = channel.CreateBasicProperties();
// Set your properties
var client = new RabbitMQ.Client.MessagePatterns.SimpleRpcClient(channel, exchange, ExchangeType.Direct, routingKey);
IBasicProperties replyProps;
byte[] response = client.Call(props, body, out replyProps);

SimpleRpcClient 将处理创建一个临时队列、相关 ID 等,而不是构建您自己的。如果你发现你想做一些更高级的事情,源代码也是一个很好的参考。

于 2014-10-11T04:42:50.030 回答