0

我有一个连接 JMS 侦听器的 spring 集成代码(侦听 mq 系列本地队列)

    <int-jms:message-driven-channel-adapter>

并通过一个将消息转发给rabbitmq(不受我控制)

    <int-amqp:outbound-channel-adapter>

我试图保持这种事务性意味着如果 rabbitmq 没有收到消息,我想将其保留在 MQ 系列本地队列中。但是我注意到,如果 rabbitmq 配置中提到的交换不存在,我会在日志中看到这一行:

ERROR (CachingConnectionFactory.java:292)     - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'xxx' in vhost 'xxx', class-id=60, method-id=40)

但是对于 mq 系列本地队列,我的消息消失了。

如果rabbitmq代理失败或交换不存在,我该怎么做才能重试发送消息?

感谢您的帮助,我的配置:

   <int-jms:message-driven-channel-adapter
        id="x.y.z" channel="channel-in"
        error-channel="errorChannel" header-mapper="jmsIntegrationHeaderMapper"
        acknowledge="transacted" destination-name="a.b.c" />

    <int:channel id="channel-in">

    </int:channel>

    <int:header-enricher input-channel="channel-in"
        output-channel="channel-out">
        <int:header name="url"
            expression="'amqp://${amqp.user}@${amqp.host}:${amqp.port}/${amqp.vhost}'"></int:header>
    </int:header-enricher>


    <int:channel id="channel-out">
    </int:channel>

    <int-amqp:outbound-channel-adapter
        channel="channel-out" amqp-template="amqpTemplate"
        routing-key="crd" mapped-request-headers="*" exchange-name="${amqp.exchange}">
    </int-amqp:outbound-channel-adapter>


    <rabbit:connection-factory id="amqpConnectionFactory" addresses="${amqp.host}:${amqp.port}"
                           cache-mode="CONNECTION"
                           channel-cache-size="25"
                           username="${amqp.user}"
                           password="${amqp.pass}"
                           virtual-host="${amqp.vhost}"/>

    <rabbit:template id="amqpTemplate"
                 connection-factory="amqpConnectionFactory" mandatory="true" channel-transacted="true"/>
4

1 回答 1

0

与 JMS 不同,发布到 RabbitMQ 是异步的(这就是它通常更快的原因),因此缺少交换的失败会在不同的线程上报告;您可以添加一个侦听器来获取通知,而不是仅仅记录它。

您可以启用事务template.setChannelTransacted(true);,然后在发布线程上报告失败,因为它将被阻塞等待来自txCommit()IO 错误发生时的回复。

但是,事务非常昂贵,因此会损害性能。

对于其他错误(例如交换存在,但没有到队列的路由),您可以使用发布者确认并返回,但同样,等待发布的每个单独消息的确认会降低性能。

编辑

你应该得到一个例外txCommit

Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
    at org.springframework.amqp.rabbit.connection.RabbitUtils.commitIfNecessary(RabbitUtils.java:107) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:2003) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.lambda$send$3(RabbitTemplate.java:865) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1841) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1784) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:864) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:927) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:921) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at com.example.So47454769Application.lambda$0(So47454769Application.java:29) [classes/:na]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:780) [spring-boot-2.0.0.M7.jar:2.0.0.M7]
    ... 5 common frames omitted
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1519) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:52) ~[amqp-client-5.0.0.jar:5.0.0]
    at org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl.txCommit(PublisherCallbackChannelImpl.java:588) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_181]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_181]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_181]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:981) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    at com.sun.proxy.$Proxy60.txCommit(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.connection.RabbitUtils.commitIfNecessary(RabbitUtils.java:104) ~[spring-rabbit-2.0.2.BUILD-SNAPSHOT.jar:2.0.2.BUILD-SNAPSHOT]
    ... 14 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'junk' in vhost '/', class-id=60, method-id=40)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138) ~[amqp-client-5.0.0.jar:5.0.0]
    ... 24 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'junk' in vhost '/', class-id=60, method-id=40)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:504) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:643) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) ~[amqp-client-5.0.0.jar:5.0.0]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) ~[amqp-client-5.0.0.jar:5.0.0]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_181]

于 2019-08-08T13:15:09.940 回答