0

我正在构建一个应用程序,它侦听多个 Rabbit MQ QUEUES 并执行逻辑并创建一个文件到 FTP,我使用“RabbitListenerContainerFactory”控制了每个队列的侦听器。

根据 FTP 位置的可用性,我将控制侦听器以侦听消息或处于空闲状态。

例如 - 我们有两个队列

  1. 队列-a
  2. 队列-b

Queue-a 消息将由

 @Component
public class ListenerForQueueA {

    @RabbitListener
    (queues = "queue-a",
        id = "queue-a", containerFactory = "myRabbitListenerContainerFactory")
    public void processOrder(String message) throws Exception {
        //businesslogic
    
    }

Queue-b 消息将被

 @Component
public class ListenerForQueueB {

    @RabbitListener
    (queues = "queue-b",
        id = "queue-b", containerFactory = "myRabbitListenerContainerFactory")
    public void processOrder(String message) throws Exception {
        //businesslogic
    
    }

我的兔子配置 -

 @Bean
  public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(connectionFactory());
      factory.setMaxConcurrentConsumers(10);
      factory.setBatchListener(true);
      return factory;
  }

控制器 -

    @PostMapping("/startListener")
    ResponseEntity<HttpStatus> startListener(@RequestParam String queueName) {
        start(queueName);
        return new ResponseEntity<>(HttpStatus.OK);
    }
    
    
    @PostMapping("/stopListener")
    ResponseEntity<HttpStatus> getRegisters(@RequestParam String queueName) {
        stop(queueName);
        return new ResponseEntity<>(HttpStatus.OK);
    }
    
    
    public void start(String queueName) {
        MessageListenerContainer containers =  registry.getListenerContainer(queueName);
        if(!containers.isRunning()){
            logger.info(queueName+" is resuming its listening");
            containers.start();
        }
    }
    
    public void stop(String queueName) {
        
        MessageListenerContainer containers =  registry.getListenerContainer(queueName);
        if(containers.isRunning()) {
            logger.info(queueName+ " is running and It's about to be stoped");
            containers.stop();
        }
            }

现在,当我在具有单个实例的 Portainer 服务中部署此应用程序时,它按预期工作。但是,当我将可伸缩性从 1 个实例增加到 3 个实例并尝试停止侦听器时,它仅在其中一个实例中停止,但仍然选择了队列消息。:(

请帮我解决这个问题。一站式 API 调用以停止所有侦听器以停止侦听消息队列。

我是码头工人和搬运工的新手。

4

0 回答 0