我是 RabbitMQ 的新手,想知道我正在考虑的这个问题的好方法。我想创建一个订阅队列并且只提取满足特定条件的消息的服务;例如,如果消息中包含特定的主题标头。
我仍在学习 RabbitMQ,并正在寻找有关如何解决此问题的提示。我的问题包括:消费者如何仅从队列中提取特定消息?生产者如何在消息中设置主题标头(如果这甚至是正确的术语?)
我是 RabbitMQ 的新手,想知道我正在考虑的这个问题的好方法。我想创建一个订阅队列并且只提取满足特定条件的消息的服务;例如,如果消息中包含特定的主题标头。
我仍在学习 RabbitMQ,并正在寻找有关如何解决此问题的提示。我的问题包括:消费者如何仅从队列中提取特定消息?生产者如何在消息中设置主题标头(如果这甚至是正确的术语?)
RabbitMQ 非常适合这种情况。你有很多选择可以做你想做的事。我建议阅读文档以更好地理解。我建议您使用主题或直接交流。主题更灵活。它是这样的。
生产者代码连接到 RabbitMQ 代理并使用特定名称创建和交换。
生产者发布交换。发布的每条消息都将使用路由键发布。
消费者连接到 RabbitMQ 代理。
消费者创建队列
Consumer 将 Queue 绑定到交换器,与生产者中定义的交换器相同。绑定还包括此特定消费者所需的每条消息的路由键。
假设您正在发布日志消息。路由键可能类似于“log.info”、“log.warn”、“log.error”。生产者发布的每条消息都将附加相关的路由密钥。然后,您将有一个消费者发送所有错误消息并通过电子邮件发送,另一个消费者将所有错误消息写入文件。因此,电子邮件发送者将使用路由键“log.error”定义从其队列到交换的绑定。这种方式虽然交换接收所有消息,但为电子邮件发送者定义的队列将只包含错误消息。文件记录器将定义一个新的单独队列绑定到同一个交换并设置不同的路由键。您可以为三个不同的路由键要求进行三个单独的绑定,或者只使用通配符“log.*”
这是一个简单的示例,展示了如何实现您想要做的事情。
在此处查找代码示例,特别是编号为 5 的教程。
建议充分利用rabbitmq的交换/路由。如果您确实想根据消息内容进行检查,以下代码是一个可行的解决方案。
从队列中检索消息并检查,有选择地确认您感兴趣的消息。
拉一条消息
GetResponse resp = channel.basicGet(QUEUE_NAME, false);
确认一条消息
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
例子
import com.rabbitmq.client.*;
public class ReceiveLogs {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel();){
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// pull one message and ack manually and exit
GetResponse resp = channel.basicGet(QUEUE_NAME, false);
if( resp != null ){
String message = new String(resp.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
}
System.out.println();
}
}
}
依赖
compile group: 'com.rabbitmq', name: 'amqp-client', version: '5.8.0'
要从 RabbitMQ 检索消息,我们需要先连接 RabbitMQ 服务器
public WebClient GetRabbitMqConnection(string userName, string password)
{
var client = new WebClient();
client.Credentials = new NetworkCredential(userName, password);
return client;
}
现在使用以下代码从 RabbitMQ 检索消息。
public string GetRabbitMQMessages(string domainName, string port,
string queueName, string virtualHost, WebClient client, string methodType)
{
string messageResult = string.Empty;
string strUri = "http://" + domainName + ":" + port +
"/api/queues/" + virtualHost + "/";
var data = client.DownloadString(strUri + queueName + "/");
var queueInfo = JsonConvert.DeserializeObject<QueueInfo>(data);
if (queueInfo == null || queueInfo.messages == 0)
return string.Empty;
if (methodType == "POST")
{
string postbody = "
{\"ackmode\":\"ack_requeue_true\",\"count\":
\"$totalMessageCount\",\"name\":\"${DomainName}\",
\"requeue\":\"false\",\"encoding\":\"auto\",\"vhost\" :
\"${QueueName}\"}";
postbody = postbody
.Replace("$totalMessageCount", queueInfo.messages.ToString())
.Replace("${DomainName}", domainName)
.Replace("${QueueName}", queueName);
messageResult = client.UploadString(strUri + queueName +
"/get", "POST", postbody);
}
return messageResult;
}
我认为这将帮助您实现 RabbitMQ。
如果您想一次检索单个消息,请在您的检索代码中添加以下属性。
Boolean autoAck = false;
model.BasicConsume(Queuename, autoAck);
model.BasicGet("Queuename", false);
model.BasicGet("Queuename", false);
通过添加 RabbitMQ 的此属性,您可以从队列中一一检索消息。与 FIFO 标准相同