59

我刚刚阅读了RabbitMQ 的 Java API 文档,发现它内容丰富且直截了当。如何设置一个简单Channel的发布/消费的例子很容易理解。但这是一个非常简单/基本的示例,它给我留下了一个重要问题:如何设置 1+Channels来发布/使用多个队列?

假设我有一个带有 3 个队列的 RabbitMQ 服务器loggingsecurity_eventscustomer_orders. 因此,我们要么需要一个Channel能够发布/使用所有 3 个队列的能力,要么更可能需要 3 个单独Channels的 ,每个专用于一个队列。

最重要的是,RabbitMQ 的最佳实践要求我们Channel为每个消费者线程设置 1 个。对于这个例子,假设security_events只有 1 个消费者线程很好,但loggingcustomer_order需要 5 个线程来处理卷。所以,如果我理解正确,这是否意味着我们需要:

  • 1个Channel和1个消费者线程,用于发布/消费到和从security_events;和
  • 5个Channels和5个消费者线程,用于发布/消费到和从logging;和
  • 5 个Channels和 5 个消费者线程,用于发布/消费到和从customer_orders?

如果我的理解在这里被误导,请先纠正我。无论哪种方式,一些厌战的 RabbitMQ 资深人士能否帮助我用一个体面的代码示例“连接点”,以便在这里设置满足我要求的发布者/消费者?提前致谢!

4

2 回答 2

138
于 2013-09-09T15:09:04.870 回答
25

如何设置 1+ 个频道以在多个队列中发布/消费?

您可以使用线程和通道来实现。您所需要的只是一种对事物进行分类的方法,即登录中的所有队列项目、security_events 中的所有队列元素等。可以使用routingKey 来实现分类。

即:每次将项目添加到队列时,您都指定路由键。它将作为属性元素附加。通过这种方式,您可以从特定事件中获取值,例如logging

以下代码示例说明了如何在客户端完成它。

例如:

路由键用于识别通道的类型并检索类型。

例如,如果您需要获取有关 Login 类型的所有通道,那么您必须将路由键指定为 login 或其他一些关键字来识别它。

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            string routingKey="login";

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

您可以在此处查看有关分类的更多详细信息..


螺纹部分

发布部分结束后,您可以运行线程部分..

在这部分中,您可以根据类别获取已发布的数据。IE; 路由键,在您的情况下是日志记录、security_events 和 customer_orders 等。

查看示例以了解如何在线程中检索数据。

例如:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
//**The threads part is as follows** 
 channel.exchangeDeclare(EXCHANGE_NAME, "direct");      
 String queueName = channel.queueDeclare().getQueue();
    // This part will biend the queue with the severity (login for eg:)
    for(String severity : argv){
              channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
    }
    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "myConsumerTag",
    new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
                                Envelope envelope,
                                AMQP.BasicProperties properties,
                                byte[] body)
         throws IOException
     {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.contentType;
             long deliveryTag = envelope.getDeliveryTag();

             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
     }
 });

现在创建了一个处理登录类型(路由键)的队列中的数据的线程。通过这种方式,您可以创建多个线程。每个服务于不同的目的。

在这里查看有关线程部分的更多详细信息..

于 2013-09-09T15:06:57.530 回答