我正在构建一个应用程序,它侦听多个 Rabbit MQ QUEUES 并执行逻辑并创建一个文件到 FTP,我使用“RabbitListenerContainerFactory”控制了每个队列的侦听器。
根据 FTP 位置的可用性,我将控制侦听器以侦听消息或处于空闲状态。
例如 - 我们有两个队列
- 队列-a
- 队列-b
Queue-a 消息将由
@Component
public class ListenerForQueueA {
@RabbitListener
(queues = "queue-a",
id = "queue-a", containerFactory = "myRabbitListenerContainerFactory")
public void processOrder(String message) throws Exception {
//businesslogic
}
Queue-b 消息将被
@Component
public class ListenerForQueueB {
@RabbitListener
(queues = "queue-b",
id = "queue-b", containerFactory = "myRabbitListenerContainerFactory")
public void processOrder(String message) throws Exception {
//businesslogic
}
我的兔子配置 -
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMaxConcurrentConsumers(10);
factory.setBatchListener(true);
return factory;
}
控制器 -
@PostMapping("/startListener")
ResponseEntity<HttpStatus> startListener(@RequestParam String queueName) {
start(queueName);
return new ResponseEntity<>(HttpStatus.OK);
}
@PostMapping("/stopListener")
ResponseEntity<HttpStatus> getRegisters(@RequestParam String queueName) {
stop(queueName);
return new ResponseEntity<>(HttpStatus.OK);
}
public void start(String queueName) {
MessageListenerContainer containers = registry.getListenerContainer(queueName);
if(!containers.isRunning()){
logger.info(queueName+" is resuming its listening");
containers.start();
}
}
public void stop(String queueName) {
MessageListenerContainer containers = registry.getListenerContainer(queueName);
if(containers.isRunning()) {
logger.info(queueName+ " is running and It's about to be stoped");
containers.stop();
}
}
现在,当我在具有单个实例的 Portainer 服务中部署此应用程序时,它按预期工作。但是,当我将可伸缩性从 1 个实例增加到 3 个实例并尝试停止侦听器时,它仅在其中一个实例中停止,但仍然选择了队列消息。:(
请帮我解决这个问题。一站式 API 调用以停止所有侦听器以停止侦听消息队列。
我是码头工人和搬运工的新手。