I am working on an integration flow between two RabbitMQ message brokers.

My IntegrationFlow code is:

    @Bean
    public IntegrationFlow messageFlow() {
        return IntegrationFlows.from(stompInboundChannelAdapter())
                .transform(inBoundStompMsgTransformer::transform)
                .headerFilter("stomp_subscription", "content-length")
                .handle(Amqp.outboundAdapter(outboundConfiguration.rabbitTemplate()))
                .log()
                .get();
    }

The inbound adapter code is:

    @Bean
    public MessageChannel stompInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter = new StompInboundChannelAdapter(stompSessionManager(), "/queue/myQueue");
        adapter.setOutputChannel(stompInputChannel());
        adapter.setPayloadType(ByteString.class);
        return adapter;
    }

I am receiving messages, and they are being transformed. However, the transformed messages do not reach the other RabbitMQ broker.

The rabbitTemplate code is:

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingkey);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host);
        cachingConnectionFactory.setUsername(username);
        cachingConnectionFactory.setPassword(password);
        return cachingConnectionFactory;
    }

    @Bean
    public AmqpTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setExchange(exchange);
        rabbitTemplate.setRoutingKey(routingkey);
        return rabbitTemplate;
    }

What is wrong with my IntegrationFlow?

Thanks,

Mahesh
