如何设置 1+ 个频道以在多个队列中发布/消费?
您可以使用线程和通道来实现。您所需要的只是一种对事物进行分类的方法,即登录中的所有队列项目、security_events 中的所有队列元素等。可以使用routingKey 来实现分类。
即:每次将项目添加到队列时,您都指定路由键。它将作为属性元素附加。通过这种方式,您可以从特定事件中获取值,例如logging。
以下代码示例说明了如何在客户端完成它。
例如:
路由键用于识别通道的类型并检索类型。
例如,如果您需要获取有关 Login 类型的所有通道,那么您必须将路由键指定为 login 或其他一些关键字来识别它。
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
string routingKey="login";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
您可以在此处查看有关分类的更多详细信息..
螺纹部分
发布部分结束后,您可以运行线程部分..
在这部分中,您可以根据类别获取已发布的数据。IE; 路由键,在您的情况下是日志记录、security_events 和 customer_orders 等。
查看示例以了解如何在线程中检索数据。
例如:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//**The threads part is as follows**
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
// This part will biend the queue with the severity (login for eg:)
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
}
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.contentType;
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
现在创建了一个处理登录类型(路由键)的队列中的数据的线程。通过这种方式,您可以创建多个线程。每个服务于不同的目的。
在这里查看有关线程部分的更多详细信息..