0

我使用 Spring Cloud Spring 服务连接器连接 CloudFoundry 上的 Rabbitmq 服务。

public class CloudConfig extends AbstractCloudConfig {

    @Bean
    public ConnectionFactory rabbitFactory()
    {
         return connectionFactory().rabbitConnectionFactory();
    }
}

但我需要声明一个 CachingConnectionFactory 并将其 PublisherConfirms 设置为 true。因为我们在向队列发送消息时需要使用 publisherConfirm 来检查 ack。我不知道如何注入从云弹簧服务连接器获得的 connectionFactory。或者我们如何处理这种情况。

4

3 回答 3

4

文档包括自定义连接器提供的连接详细信息的示例。

在您的情况下,您应该能够执行以下操作:

@Bean
public RabbitConnectionFactory rabbitFactory() {
    Map<String, Object> properties = new HashMap<String, Object>();
    properties.put("publisherConfirms", true);

    RabbitConnectionFactoryConfig rabbitConfig = new RabbitConnectionFactoryConfig(properties);
    return connectionFactory().rabbitConnectionFactory(rabbitConfig);
}
于 2017-11-28T19:39:35.420 回答
0

这是兔子模板

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMandatory(true);
    template.setMessageConverter(new Jackson2JsonMessageConverter());
    template.setConfirmCallback((correlationData, ack, cause) -> {
        if (!ack) {
            System.out.println("send message failed: " + cause + correlationData.toString());
        } else {
            System.out.println("Publisher Confirm" + correlationData.toString());
        }
    });
    return template;
}

这是弹簧云配置:

@Bean
public ConnectionFactory rabbitConnectionFactory() {
    Map<String, Object> properties = new HashMap<String, Object>();
    properties.put("publisherConfirms", true);
    RabbitConnectionFactoryConfig rabbitConfig = new RabbitConnectionFactoryConfig(properties);
    return connectionFactory().rabbitConnectionFactory(rabbitConfig);
}

当我使用此发件人发送消息时。结果不是预期的。

@Component
public class TestSender {

@Autowired
private RabbitTemplate rabbitTemplate;

@Scheduled(cron = "0/5 * *  * * ? ")
public void send() {
System.out.println("===============================================================");
    this.rabbitTemplate.convertAndSend(EXCHANGE, "routingkey", "hello world",
            (Message m) -> {
                m.getMessageProperties().setHeader("tenant", "aaaaa");
                return m;
            }, new CorrelationData(UUID.randomUUID().toString()));
    Date date = new Date();
    System.out.println("Sender Msg Successfully - " + date);
}

}

于 2017-11-30T09:52:47.913 回答
0

您可以重新配置连接器创建的 CCF,如下所示:

@Bean
public SmartInitializingSingleton factoryConfigurer() {
    return new SmartInitializingSingleton() {

        @Autowired
        private CachingConnectionFactory connectionFactory;

        @Override
        public void afterSingletonsInstantiated() {
            this.connectionFactory.setPublisherConfirms(true);
        }
    };
}

您必须确保在应用程序上下文完全初始化之前不要执行任何 RabbitMQ 操作(无论如何这是最佳实践)。

于 2017-11-28T17:59:12.057 回答