0

使用 spring 集成将消息从 RabbitMQ 传输到 MQ 效果很好。

如果我停止 RabbitMQ 服务器,那么我们的日志文件就会出错:

ERROR o.s.a.r.c.CachingConnectionFactory - Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Node was put into maintenance mode, class-id=0, method-id=0)

我们如何拦截这个异常?

ExceptionListener添加jms时效果很好DefaultMessageListenerContainer

按照bean的配置:

<bean id="connectionAmqpFactorySrc" class="com.rabbitmq.client.ConnectionFactory">
    <property name="automaticRecoveryEnabled" value="true"/>
    <property name="networkRecoveryInterval" value="10000"/>
</bean>

<rabbit:connection-factory  id="rabbitConnectionFactory" connection-factory="connectionAmqpFactorySrc"
    username="guest" 
    password="guest" 
    addresses="XX.XX.XX.XX"
    cache-mode="CONNECTION" 
    virtual-host="/"  
    shuffle-addresses="true" />


<bean id="fixedBackOffRabbitMQ" class="org.springframework.util.backoff.FixedBackOff">
    <constructor-arg index="0" value="10000" />
    <constructor-arg index="1" value="3" />
</bean>
    
<bean id="myListener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="rabbitConnectionFactory" />
    <property name="queueNames" value="MyQueue" />
    <property name="recoveryBackOff" ref="fixedBackOffRabbitMQ"/>
    <property name="channelTransacted" value="true"></property>
    <property name="errorHandler" ref="errorHandler"></property>
</bean>
    
<int-amqp:inbound-channel-adapter   channel="channelRmqMQ" 
        id="inboundChannelAdapter" 
        auto-startup="true" listener-container="myListener" error-channel="processChannel1" />

编辑1

正如你建议我的那样,我使用这样的 bean 的定义:

<bean id="listeners" class="java.util.ArrayList">
    <constructor-arg>
        <list>
            <ref bean="connectionAmqpListener" />
        </list>
    </constructor-arg>
</bean>

<bean id="rabbitConnectionFactory"  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="connectionAmqpFactorySrc"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="addresses" value="XX.XX.XX.XX"/>
    <property name="cacheMode" value="CONNECTION"/>
    <property name="virtualHost" value="/"/>
    <property name="shuffleAddresses" value="true"/>
    <property name="connectionListeners" ref="listeners"/>
</bean>

随着ConnectionAmqpListener.java

public class ConnectionAmqpListener implements ConnectionListener {
    
    private final Log LOG = LogFactory.getLog(ConnectionAmqpListener.class);
    
    public ConnectionAmqpListener() {
        super();
    }
    
    public void onCreate(Connection connection) {
        System.out.println("Open connection");
    }
    
    public void onClose(Connection connection) {
        System.out.println("Connection is closed");
    }
    
    public void onShutDown(ShutdownSignalException signal) {
        System.out.println("Connection is shutdown");
        System.exit(-1);
    }
}   

这很好用,当我停止代理时,onShutDown会调用该方法。

但是,如果我重新启动我的进程(代理关闭),我在日志文件中没有任何消息并且进程停止。

如果连接失败,您对如何获取信息有任何建议吗?

结束编辑1

谢谢你的帮助

问候,

埃里克

4

1 回答 1

0

ConnectionFactory

void addConnectionListener(ConnectionListener listener)

该回调有这个钩子:

/**
 * Called when a connection is force closed.
 * @param signal the shut down signal.
 * @since 2.0
 */
default void onShutDown(ShutdownSignalException signal) {
}
于 2021-03-17T13:49:17.550 回答