2

我尝试更改以下配置以支持优先级队列。根据已完成的研究,它说我应该为每个优先级设置两个队列。我从以下调整了我的配置:

@Configuration
public class FixedReplyQueueConfig {

@Bean
public ConnectionFactory rabbitConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setUsername("urbanbuz");
    connectionFactory.setPassword("ub");
    connectionFactory.setVirtualHost("urbanbuzvhost");
    return connectionFactory;
}

/**
 * @return Rabbit template with fixed reply queue.
 */
@Bean
public RabbitTemplate fixedReplyQRabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
    template.setExchange(ex().getName());
    template.setRoutingKey("test");
    template.setReplyQueue(replyQueue());
    return template;
}

/**
 * @return The reply listener container - the rabbit template is the listener.
 */
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory());
    container.setQueues(replyQueue());
    container.setMessageListener(fixedReplyQRabbitTemplate());
    return container;
}

/**
 * @return The listener container that handles the request and returns the reply.
 */
@Bean
public SimpleMessageListenerContainer serviceListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory());
    container.setQueues(requestQueue());
    container.setMessageListener(new MessageListenerAdapter(new PojoListener()));
    return container;
}

@Bean
public DirectExchange ex() {
    return new DirectExchange("ub.exchange", false, true);
}

@Bean
public Binding binding() {
    return BindingBuilder.bind(requestQueue()).to(ex()).with("test");
}

@Bean
public Queue requestQueue() {
    return new Queue("ub.request");
}

@Bean
public Queue replyQueue() {
    return new Queue("ub.reply");
}

/**
 * @return an admin to handle the declarations.
 */
@Bean
public RabbitAdmin admin() {
    return new RabbitAdmin(rabbitConnectionFactory());
}
}

到以下:

@Configuration
public class FixedReplyQueueConfig {

@Bean
public ConnectionFactory rabbitConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setUsername("urbanbuz");
    connectionFactory.setPassword("ub");
    connectionFactory.setVirtualHost("urbanbuzvhost");
    return connectionFactory;
}

/**
 * @return Rabbit template with fixed reply queue.
 */
@Bean
public RabbitTemplate fixedReplyQRabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
    template.setExchange(ex().getName());
    template.setRoutingKey("high");
    template.setRoutingKey("normal");
    template.setReplyQueue(replyQueue());
    return template;
}


/**
 * @return The reply listener container - the rabbit template is the listener.
 */
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory());
    container.setQueues(replyQueue());
    container.setMessageListener(fixedReplyQRabbitTemplate());
    return container;
}


/**
 * @return The listener container that handles the request and returns the reply.
 */
@Bean
public SimpleMessageListenerContainer serviceListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory());
    container.setQueues(requestQueueHigh(), requestQueue());
    container.setMessageListener(new MessageListenerAdapter(new PojoListener()));
    return container;
}

@Bean
public DirectExchange ex() {
    return new DirectExchange("ub.exchange", false, true);
}

@Bean
public Binding binding() {
    return BindingBuilder.bind(requestQueue()).to(ex()).with("normal");
}

@Bean
public Binding bindingHigh() {
    return BindingBuilder.bind(requestQueueHigh()).to(ex()).with("high");
}

@Bean
public Queue requestQueue() {
    return new Queue("ub.request");
}

@Bean
public Queue requestQueueHigh() {
    return new Queue("ub.request.high");
}

@Bean
public Queue replyQueue() {
    return new Queue("ub.reply");
}

/**
 * @return an admin to handle the declarations.
 */
@Bean
public RabbitAdmin admin() {
    return new RabbitAdmin(rabbitConnectionFactory());
}
}

我是否以正确的方式解决这个问题?现在我应该如何继续让消费者消费一个而不是另一个?

这就是我调用测试的方式:

public class App {  
public static void main(String[] args) {        
    ApplicationContext context = new AnnotationConfigApplicationContext(FixedReplyQueueConfig.class);
    RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

    rabbitTemplate.convertSendAndReceive("ub.exchange", "normal" , "yalla");

    rabbitTemplate.convertSendAndReceive("ub.exchange", "high" , "hello");
}
}

这是我的 pojo 课:

public class PojoListener {
public String handleMessage(String foo) {
    System.out.println("IN MESSAGE RECEIVER");
    return "it's the weekend!!!!!!!!!!!!";
}
}

两条消息都是通过发送的,但不确定如何使用已实现的配置和类实现优先级。

请注意,我试图避免使用任何插件来支持优先级。

4

1 回答 1

1

Spring AMQP ( BlockingQueueConsumer) 中的代码如下所示:

for (String queueName : queues) {
    if (!this.missingQueues.contains(queueName)) {
        consumeFromQueue(queueName);
    }
}

操作系统,如果您的配置是:

container.setQueues(requestQueueHigh(), requestQueue());

你真的会收到来自requestQueueHigh()over的消息requestQueue()。它独立于concurrencyon ListenerContainer

您可以使用多条消息进行测试,但stopped在应用程序开始时使用侦听器。start()最终在发送这些消息后,侦听器容器。

于 2014-11-27T09:42:56.260 回答