这是一个例子,但标记的答案是导致我这样做的原因。
var factory = new ConnectionFactory
{
HostName = "MY_HOST_NAME",
UserName = "USERNAME",
Password = "PASSWORD",
RequestedHeartbeat = 30
};
using (var connection = factory.CreateConnection())
{
connection.ConnectionShutdown += (o, e) =>
{
//handle disconnect
};
using (var model = connection.CreateModel())
{
model.ExchangeDeclare(EXCHANGE_NAME, "topic");
var queueName = model.QueueDeclare();
model.QueueBind(queueName, EXCHANGE_NAME, "#");
var consumer = new QueueingBasicConsumer(model);
model.BasicConsume(queueName, true, consumer);
while (!stop)
{
BasicDeliverEventArgs args;
consumer.Queue.Dequeue(5000, out args);
if (stop) return;
if (args == null) continue;
if (args.Body.Length == 0) continue;
Task.Factory.StartNew(() =>
{
//Do work here on different thread then this one
}, TaskCreationOptions.PreferFairness);
}
}
}
关于这一点有几点需要注意。
我正在使用 # 作为主题。这抓住了一切。通常你想限制一个主题。
我正在设置一个名为“stop”的变量来确定进程何时结束。您会注意到循环永远运行,直到该变量为真。
如果没有新消息,出队等待 5 秒然后离开而不获取数据。这是为了确保我们监听那个停止变量并在某个时候真正退出。根据您的喜好更改值。
当有消息进来时,我在一个新线程上生成处理代码。当前线程被保留用于仅收听 rabbitmq 消息,如果处理程序处理时间过长,我不希望它减慢其他消息的速度。根据您的实施,您可能需要也可能不需要。但是要小心编写代码来处理消息。如果运行需要一分钟并且您在亚秒级时间内收到消息,您将耗尽内存或至少会出现严重的性能问题。