5

我正在尝试从 XML Spring amqp 配置迁移到基于 java 注释的配置,因为它“更简单”。不知道我在做什么错 XML 配置工作正常,但 java @Configurable 抛出“由:java.net.SocketException:连接重置”异常。

XML 配置(完美运行):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- define which properties files will be used -->
    <context:property-placeholder location="classpath:*.properties" />

    <rabbit:connection-factory id="connectionFactory"
                               addresses='${rabbitmq.hostname}'
                               username='${rabbitmq.username}'
                               password='${rabbitmq.password}' 
                               virtual-host='${rabbitmq.virtual_host}'  
                               cache-mode='${rabbitmq.cache_mode}'                             
                               channel-cache-size='${rabbitmq.channel_cache_size}'/>

    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="3"/>
        <property name="maxPoolSize" value="5"/>
        <property name="queueCapacity" value="15"/>                          
    </bean>                              


    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    <rabbit:admin connection-factory="connectionFactory"/>
       <rabbit:queue name="${rabbitmq.queue_name}" />
<rabbit:topic-exchange name="${rabbitmq.topic_exchange_name}">
    <rabbit:bindings>
        <rabbit:binding queue="${rabbitmq.queue_name}" pattern="${rabbitmq.topic_exchange_pattern}"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

    <bean id="listener" class="com.my.package.path.worker.DefaultMessageListener"/>


    <rabbit:listener-container id="listenerContainer" connection-factory="connectionFactory" task-executor="taskExecutor">
            <rabbit:listener ref="listener" queues="notification.main" />

    </rabbit:listener-container>
</beans>

Java配置:

@Configurable
@PropertySource("classpath:rabbitmq.properties")
public class RabbitMQConfig {

@Value("${rabbitmq.hostname}")
private String hostname;

@Value("${rabbitmq.port}")
private String port; 

@Value("${rabbitmq.username}")
private String username;

@Value("${rabbitmq.password}")
private String password; 

@Value("${rabbitmq.virtual_host}")
private String virtualHost; 

//@Value("${rabbitmq.cache_mode}")
//private String cacheMode;

@Value("${rabbitmq.channel_cache_size}")
private String channelCacheSize;

@Value("${rabbitmq.topic_exchange_name}")
private String topicExchangeName;

@Value("${rabbitmq.topic_exchange_pattern}")
private String topicExchangePattern;

@Value("${rabbitmq.queue_name}")
private String queueName; 

@Autowired
private ConnectionFactory cachingConnectionFactory;

@Bean(name="cachingConnectionFactory")
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(hostname,Integer.valueOf(port));

    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);

    //connectionFactory.setCacheMode(CacheMode.valueOf(cacheMode));
    connectionFactory.setChannelCacheSize(Integer.valueOf( channelCacheSize ));

    return connectionFactory;
}

@Bean(name="taskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor tpte = new ThreadPoolTaskExecutor();
    tpte.setCorePoolSize(3);
    tpte.setMaxPoolSize(5);
    tpte.setQueueCapacity(15);
    return tpte;
}

@Bean
public AmqpTemplate AmqpTemplate() {
    RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);

    return template;
}


@Bean
public AmqpAdmin amqpAdmin() {
    RabbitAdmin amqpAdmin = new RabbitAdmin(cachingConnectionFactory);

    return amqpAdmin;
}

@Bean
public Queue queue() {
    return new Queue(queueName);
}

@Bean
public TopicExchange topicExchange() {
    TopicExchange topicExchange = new TopicExchange(topicExchangeName);
    return topicExchange;
}

@Bean
public Binding dataBinding(TopicExchange topicExchange, Queue queue) {
    return BindingBuilder.bind(queue).to(topicExchange).with(topicExchangePattern);
}

@Bean 
public DefaultMessageListener defaultMessageListener() {
    return new DefaultMessageListener();
}

@Bean 
public SimpleMessageListenerContainer container(DefaultMessageListener defaultMessageListener) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(cachingConnectionFactory);
    container.setQueueNames(queueName);
    container.setAutoStartup(true);
    container.setMessageListener(defaultMessageListener);
    //container.setTaskExecutor(taskExecutor);
    return container;
}

@Bean
public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
   return new PropertySourcesPlaceholderConfigurer();
}

java配置错误:

INFO : org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 2147483647
DEBUG: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - No global properties bean
DEBUG: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Starting Rabbit listener container.
ERROR: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Failed to check/redeclare auto-delete queue(s).
org.springframework.amqp.AmqpIOException: java.io.IOException
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:217)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:444)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:80)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:130)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1004)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:254)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:963)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1081)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:376)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:603)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:637)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:208)
    ... 12 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 16 more
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:209)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534)
    ... 1 more

我踏入了 Spring amqp 代码,罪魁祸首是 RabbitAdmin#getQueueProperties 方法。在 XML 配置中它执行得很好......但是一旦它使用 java 配置执行它就会抛出上面的异常?我在做什么不一样?两种配置对我来说都一样。

package org.springframework.amqp.rabbit.core;

public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, InitializingBean {
//... 
    @Override
    public Properties getQueueProperties(final String queueName) {
        Assert.hasText(queueName, "'queueName' cannot be null or empty");
        return this.rabbitTemplate.execute(new ChannelCallback<Properties>() {
            @Override
            public Properties doInRabbit(Channel channel) throws Exception {
                try {
                    DeclareOk declareOk = channel.queueDeclarePassive(queueName);
                    Properties props = new Properties();
                    props.put(QUEUE_NAME, declareOk.getQueue());
                    props.put(QUEUE_MESSAGE_COUNT, declareOk.getMessageCount());
                    props.put(QUEUE_CONSUMER_COUNT, declareOk.getConsumerCount());
                    return props;
                }
                catch (Exception e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Queue '" + queueName + "' does not exist");
                    }
                    return null;
                }
            }
        });
    }
}

两个配置都在类路径上使用完全相同的 rabbitmq.properties 文件。我什至在运行时检查了两个配置的 RabbitAdmin 和 RabbitTemplate 类的属性,它们看起来完全一样......

4

2 回答 2

4

你应该使用@Configuration,而不是@Configurable

编辑:

看起来 rabbitmq-server 正在关闭连接:

Caused by: java.net.SocketException: Connection reset

查看服务器日志;如果那没有帮助;在某处发布完整的调试日志org.springframework(可能对这里来说太大了)。

编辑2:

您有身份验证问题...

{handshake_error,opening,0,
             {amqp_error,access_refused,
                         "access to vhost '/' refused for user 'gggdw'",
                         'connection.open'}}

...检查您的用户名和密码(和虚拟主机)。

于 2015-05-19T16:46:43.367 回答
1

我没有使用“/”根虚拟主机。我有自己的 virtual_host 自定义值。虽然,我确实通过 spel 将此属性注入到我的 java 配置中,但我没有在 connectionFactory 上明确设置它。

connectionFactory.setVirtualHost(virtualHost);

感谢@Gary Russell 帮助我解决问题。

@Bean(name="cachingConnectionFactory")
public ConnectionFactory connectionFactory() {

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(hostname,Integer.valueOf(port));

        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);

        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setChannelCacheSize(Integer.valueOf( channelCacheSize ));

        return connectionFactory;
    }
于 2015-05-19T17:20:28.820 回答