0

当消息侦听器容器上的 channelTransacted = true 时,确认模式.AUTO 和确认模式.手动有什么区别?

我知道当我想使用带有 channelTransacted=false 的 acknowledgeMode.MANUAL 时,侦听器必须通过调用 Channel.basicAck() 来确认消息。当侦听器容器具有 channelTransacted = true 时,是否不需要使用 acknowledgeMode.MANUAL?因为当我将 acknowledgeMode.MANUAL 和 channelTransacted=true 与调用 Channel.basicAck() 结合起来时,我得到了异常。

@Bean(name = "amqpInboundEsignRequest")
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory, PlatformTransactionManager transactionManager) {
    return IntegrationFlows.from(
            Amqp.inboundAdapter(connectionFactory, EsignContstants.QUEUE)
                    .acknowledgeMode(AcknowledgeMode.MANUAL)
                    .messageConverter(new Jackson2JsonMessageConverter())
                    .autoStartup(false)
                    .channelTransacted(true)
                    .transactionManager(transactionManager)
                    .txSize(1)
    )
            .enrichHeaders(s -> s.header(MessageHeaders.REPLY_CHANNEL, "basicAck.flow", true))
            .log("amqpInbound.start-process")
            .channel("insertAndStartProcess.input")
            .get();
}

例外

2018-02-19 09:57:27.478 ERROR 4664 --- [rContainer#0-45] o.s.t.s.TransactionSynchronizationUtils  : TransactionSynchronization.afterCompletion threw exception

org.springframework.amqp.AmqpException: failed to commit RabbitMQ transaction
    at org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll(RabbitResourceHolder.java:168) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$RabbitResourceSynchronization.afterCompletion(ConnectionFactoryUtils.java:264) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCompletion(TransactionSynchronizationUtils.java:168) ~[spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.invokeAfterCompletion(AbstractPlatformTransactionManager.java:1002) [spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCompletion(AbstractPlatformTransactionManager.java:977) [spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:806) [spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:730) [spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:150) [spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1198) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:102) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1318) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:50) ~[amqp-client-4.0.3.jar:4.0.3]
    at sun.reflect.GeneratedMethodAccessor151.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_131]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_131]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:980) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at com.sun.proxy.$Proxy191.txCommit(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll(RabbitResourceHolder.java:164) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    ... 11 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) ~[amqp-client-4.0.3.jar:4.0.3]
    ... 19 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:505) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:336) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:143) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:90) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:634) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) ~[amqp-client-4.0.3.jar:4.0.3]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572) ~[amqp-client-4.0.3.jar:4.0.3]
    ... 1 common frames omitted
4

1 回答 1

0

使用事务不应该有任何区别;该错误意味着在您收到消息和发送消息之间由于某种原因通道已关闭,basicAck在事务提交之前我们不会检测到它。

我们最近添加了一个修复程序来检测通道已关闭并在这种情况下拒绝该通道basicAck

修复在 2.0.2.RELEASE 中。

于 2018-02-19T17:20:51.623 回答