2

我正在使用 RabbitMQ 3.6.10 UI 发布一条消息,该消息由使用 Spring Integration AMQP 4.3.11 的 Java 应用程序接收。该消息是对使用拆分器创建的较早消息的回复,因此它具有 asequenceNumbersequenceSize标头。我将这些标头复制到回复中,并将它们设置为NumberRabbitMQ UI 中的类型。但是,在 Java 方面,我遇到了一个异常:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Message conversion failed
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:223)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: The 'sequenceNumber' header value must be an Integer.
    at org.springframework.util.Assert.isTrue(Assert.java:92)
    at org.springframework.integration.IntegrationMessageHeaderAccessor.verifyType(IntegrationMessageHeaderAccessor.java:143)
    at org.springframework.messaging.support.MessageHeaderAccessor.setHeader(MessageHeaderAccessor.java:298)
    at org.springframework.messaging.support.MessageHeaderAccessor.copyHeaders(MessageHeaderAccessor.java:389)
    at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:177)
    at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:47)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:243)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
    ... 14 more

我检查了 Java 端的sequenceNumber和标头的类型是,而不是。然而,在 RabbitMQ UI 中没有选项可以做出这种改变。消息将由非 Java 应用程序发送,那么如何确保标头被 Spring Integration 识别为整数?sequenceSizeLongInteger

当我使用 Java 客户端发布回复并将标头值设置为 时Integer,消费者会接受它们。所以这可能是 RabbitMQ UI 没有足够具体的标头类型(例如 32 位与 64 位数字)或 Java 客户端对预期值类型过于严格的限制。任何人都可以确认其中一个吗?

4

1 回答 1

1

将一个添加MessagePostProcessor到适配器的侦听器容器...

@Bean
public AmqpInboundChannelAdapter adapter(ConnectionFactory cf) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer(cf));
    adapter.setOutputChannelName("someChannel");
    return adapter;
}

@Bean
public AbstractMessageListenerContainer listenerContainer(ConnectionFactory cf) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
    container.setQueueNames("foo");
    container.setAfterReceivePostProcessors(m -> {
        if (m.getMessageProperties().getHeaders()
                .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER) instanceof Long) {
            Integer sequenceNumber = ((Long) m.getMessageProperties().getHeaders()
                    .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)).intValue();
            m.getMessageProperties().getHeaders().put(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER,
                    sequenceNumber);
        }
        return m;
    });
    return container;
}

请打开一个JIRA 问题- 我们可能应该更加宽容,特别是如果值为 < Integer.MAX_VALUE

于 2017-09-26T13:57:53.877 回答