有没有办法使用单个同步调用接收多条消息?
当我知道队列中有 N 条消息(N 可能是小于 10 的小值)时,我应该能够执行类似 channel.basic_get(String queue, boolean autoAck, int numberofMsg) 的操作。我不想向服务器发出多个请求。
有没有办法使用单个同步调用接收多条消息?
当我知道队列中有 N 条消息(N 可能是小于 10 的小值)时,我应该能够执行类似 channel.basic_get(String queue, boolean autoAck, int numberofMsg) 的操作。我不想向服务器发出多个请求。
不幸的是,如文档中所见, RabbitMQbasic.get
不支持多条消息。检索多条消息的首选方法是使用basic.consume将消息推送到客户端,避免多次往返。是异步的,因此您的客户端不会等待服务器响应。还具有允许 RabbitMQ 在客户端断开连接时重新传递消息的好处,这是无法做到的。这也可以关闭,也可以设置为。acks
basic.consume
basic.get
no-ack
true
设置basic.qos
prefetch-count
将设置随时推送到客户端的消息数量。如果在客户端没有等待消息(将立即返回),客户端库往往会因可选的超时而阻塞。
您可以使用允许您在单个请求中检索多条消息QueueingConsumer
的接口实现。Consumer
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(plugin.getQueueName(), false, queueingConsumer);
for(int i = 0; i < 10; i++){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(100);//read timeout in ms
if(delivery == null){
break;
}
}
这不是一个优雅的解决方案,也不能解决多次调用问题,但您可以使用 MessageCount 方法。例如:
bool noAck = false;
var messageCount = channel.MessageCount("hello");
BasicGetResult result = null;
if (messageCount == 0)
{
// No messages available
}
else
{
while (messageCount > 0)
{
result = channel.BasicGet("hello", noAck);
var message = Encoding.UTF8.GetString(result.Body);
//process message .....
messageCount = channel.MessageCount("hello");
}
首先声明包装模型的 QueueingBasicConsumer() 实例。
从模型执行 model.BasicConsume(QueueName, false, consumer)
然后实现一个循环,该循环将围绕来自队列的消息进行循环,然后处理
下一行 - consumer.Queue.Dequeue() 方法 - 等待接收消息队列。
然后将字节数组转换为字符串并显示出来。
Model.BasicAck() - 从队列中释放消息以接收下一条消息
然后在服务器端可以开始等待下一条消息通过:
public string GetMessagesByQueue(string QueueName)
{
var consumer = new QueueingBasicConsumer(_model);
_model.BasicConsume(QueueName, false, consumer);
string message = string.Empty;
while (Enabled)
{
//Get next message
var deliveryArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
//Serialize message
message = Encoding.Default.GetString(deliveryArgs.Body);
_model.BasicAck(deliveryArgs.DeliveryTag, false);
}
return message;
}