0

我目前在两个应用程序(web/worker)之间实现 RabbitMQ 消息传递时遇到了问题。我的 RabbitMQ 服务托管在 CloudAMQP(Heroku 插件)上。但是,我声明的任何 @RabbitListener 似乎都尝试连接到 localhost,而不是云服务。

将以下组件添加到我的 worker 应用程序后:

/**
 * Message listener for the {@code worker.rpc.requests} queue.
 *
 * <p>The listener is created through the container factory bean named
 * {@code rabbitListenerContainerFactory}.
 */
@Service
public class TaskConsumer {

    /**
     * Handles one incoming message by echoing its payload.
     *
     * @param p the payload of the received message
     * @return the same value that was received, unchanged
     */
    @RabbitListener(
            queues = "worker.rpc.requests",
            containerFactory = "rabbitListenerContainerFactory")
    public String fetch(String p) {
        return p;
    }
}

我遇到以下错误:

2021-07-05 14:38:23.006  INFO 18840 --- [ntContainer#0-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2021-07-05 14:38:32.145  WARN 18840 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
2021-07-05 14:38:32.145  INFO 18840 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@32c8d668: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0

如何绑定 RabbitListener 以使其连接到 AMQP 环境?这是我的配置:

/**
 * Spring AMQP configuration for the worker application.
 *
 * <p>Declares the broker connection (taken from the {@code CLOUDAMQP_URL}
 * environment variable), the listener container factory used by
 * {@code @RabbitListener} methods, a {@link RabbitTemplate}, and the
 * queue / exchange / binding topology.
 */
@Configuration
@EnableRabbit
public class RabbitConfig {

    protected final String workerQueueName = "worker.rpc.requests";
    protected final String routingKeyName = "rpc";
    protected final String directExcName = "worker.exchange";

    /**
     * Builds the connection factory from the {@code CLOUDAMQP_URL} environment variable.
     *
     * <p>The whole URI is handed to the RabbitMQ client's {@code setUri}, which takes
     * host, port, credentials and virtual host from it. Parsing the user-info by hand
     * is deliberately avoided: it fails when the URI carries no credentials and
     * mis-splits passwords that contain a colon.
     *
     * @return the connection factory pointing at the configured broker
     * @throws IllegalStateException if the variable is unset, is not a valid URI,
     *         or cannot be applied to the underlying RabbitMQ connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        final URI ampqUrl;
        try {
            ampqUrl = new URI(getEnvOrThrow("CLOUDAMQP_URL"));
        } catch (URISyntaxException e) {
            // The URL embeds credentials, so it is intentionally kept out of the message.
            throw new IllegalStateException("CLOUDAMQP_URL is not a valid URI", e);
        }

        final CachingConnectionFactory factory = new CachingConnectionFactory();
        try {
            factory.getRabbitConnectionFactory().setUri(ampqUrl);
        } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) {
            // Fail fast: returning a factory that silently kept its defaults would
            // make the application try to reach a broker on localhost.
            throw new IllegalStateException(
                    "Could not apply CLOUDAMQP_URL to the RabbitMQ connection factory", e);
        }

        return factory;
    }

    /**
     * Container factory referenced by {@code @RabbitListener(containerFactory = ...)}.
     *
     * <p>All tuning is applied to the factory itself so that it reaches every
     * container the factory creates. The maximum consumer count is kept above
     * the initial consumer count so the two settings stay consistent.
     *
     * @return the configured listener container factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(50);
        factory.setMaxConcurrentConsumers(100);
        factory.setStartConsumerMinInterval(3000L);
        return factory;
    }

    /**
     * Template whose default routing key and default receive queue are both
     * the worker queue name.
     *
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setRoutingKey(this.workerQueueName);
        template.setDefaultReceiveQueue(this.workerQueueName);
        return template;
    }

    /**
     * Declares the worker request queue.
     *
     * @return the queue named by {@code workerQueueName}
     */
    @Bean
    public Queue queue() {
        return new Queue(this.workerQueueName);
    }

    /**
     * Declares the direct exchange.
     *
     * @return the exchange named by {@code directExcName}
     */
    @Bean
    public DirectExchange direct() {
        return new DirectExchange(this.directExcName);
    }

    /**
     * Binds the worker queue to the direct exchange using {@code routingKeyName}.
     *
     * @param direct the exchange to bind to
     * @param queue the queue to bind
     * @return the binding between the two
     */
    @Bean
    public Binding binding(DirectExchange direct,
                           Queue queue) {
        return BindingBuilder.bind(queue)
                .to(direct)
                .with(this.routingKeyName);
    }

    /**
     * Required for executing administration functions against an AMQP Broker
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /**
     * Reads an environment variable that must be present.
     *
     * @param name the name of the environment variable
     * @return its value, never {@code null}
     * @throws IllegalStateException if the variable is not set
     */
    private static String getEnvOrThrow(String name) {
        final String env = System.getenv(name);
        if (env == null) {
            throw new IllegalStateException("Environment variable [" + name + "] is not set.");
        }
        return env;
    }

}
4

0 回答 0