
I am trying to send JSON data as a message from a Spring AMQP application to a consumer (written in Ruby).

Every setting I have tried results in a connection reset error.

Note: I deleted the default guest user and added an admin user with the same permissions on the default virtual host.

Here is the configuration:

<beans:bean id="cachingConnectionFactory"
    class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <beans:property name="channelCacheSize" value="5" />
    <beans:property name="username" value="admin" />
    <beans:property name="password" value="admin" />
</beans:bean>

<rabbit:queue id='analytics.persistence.queue' name='analytics.persistence.queue' />

<rabbit:direct-exchange name="amq.direct">
    <rabbit:bindings>
        <rabbit:binding queue="analytics.persistence.queue">
        </rabbit:binding>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:connection-factory id="cachingConnectionFactory" />

<rabbit:admin connection-factory="cachingConnectionFactory" />

<beans:bean id="rabbitTemplate"
    class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <beans:property name="connectionFactory" ref="cachingConnectionFactory" />
    <beans:property name="exchange" value="amq.direct" />
    <beans:property name="routingKey" value="analytics.persistence.queue"/>
</beans:bean>

<beans:bean id="analyticsMessageProducer"
    class="hoodibaba.analytics.publish.AnalyticsPersistenceMessageProducer">
    <beans:property name="rabbitTemplate" ref="rabbitTemplate" />
</beans:bean>

In my producer class:

public class AnalyticsPersistenceMessageProducer extends RabbitGatewaySupport implements
    AnalyticsPersistenceMessageGateway {

  private static final Logger logger = LoggerFactory.getLogger(AnalyticsPersistenceMessageProducer.class);

  @Override
  public void sendAnalyticsMessage(String jsonData) {
    try {
      getRabbitTemplate().convertAndSend("analytics.persistence.queue",jsonData);
    } catch (AmqpException e) {
      logger.error(e.getMessage());
      logger.error("Exception Stacktrace",e);
      logger.error("flushing JSON data to logs: " + jsonData);
    }
  }

}

I get a

com.rabbitmq.client.PossibleAuthenticationFailureException: Possibly caused by authentication failure

The stack trace shows a connection reset:

Caused by: com.rabbitmq.client.PossibleAuthenticationFailureException: Possibly caused by authentication failure
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:348)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:545)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:160)
    ... 43 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:202)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:340)
    ... 46 more
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:189)
    at java.net.SocketInputStream.read(SocketInputStream.java:121)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:508)

The message consumer is as follows:

#!/usr/bin/env ruby
# encoding: UTF-8
require 'bunny'

conn = Bunny.new("amqp://admin:admin@localhost:5672")
conn.start

ch = conn.create_channel
puts 'channel created'
q = ch.queue("analytics.persistence.queue",:exclusive => false, :auto_delete => false)
x = ch.direct("amq.direct")
q.bind(x)
q.subscribe(:block => true, :ack => true) do |delivery_info, properties, payload|
  puts "Received #{payload}, message properties are #{properties.inspect}"
end
ch.close
conn.close

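Update: It seems the updated username and password settings are not being propagated; the RabbitMQ log shows that my application is still connecting as guest: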

=ERROR REPORT==== 19-Mar-2013::00:16:12 ===
closing AMQP connection <0.13815.0> (127.0.0.1:44736 -> 127.0.0.1:5672):
{handshake_error,starting,0,
                 {amqp_error,access_refused,
                             "PLAIN login refused: user 'guest' - invalid credentials",
                             'connection.start_ok'}}

1 Answer


(The OP edited the answer into the question. See Question with no answers, but issue solved in the comments (or extended in chat).)

The OP wrote:

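The problem was in my configuration: the caching connection factory was defined twice, once using the beans namespace and once using the rabbit namespace, and the latter used the default values. After changing the configuration to the following, I no longer get the connection reset error: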

<rabbit:queue id='analytics.persistence.queue' name='analytics.persistence.queue' />

<rabbit:direct-exchange name="amq.direct">
    <rabbit:bindings>
        <rabbit:binding queue="analytics.persistence.queue">
        </rabbit:binding>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:connection-factory id="cachingConnectionFactory"
    username="analytics" password="analytics" channel-cache-size="5"
    virtual-host="analytics" />

<rabbit:admin connection-factory="cachingConnectionFactory" />

<beans:bean id="rabbitTemplate"
    class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <beans:property name="connectionFactory" ref="cachingConnectionFactory" />
    <beans:property name="exchange" value="amq.direct" />
    <beans:property name="queue" value="analytics.persistence.queue" />
    <beans:property name="routingKey" value="save.analytics"></beans:property>
</beans:bean>

<beans:bean id="analyticsMessageProducer"
    class="hoodibaba.analytics.publish.AnalyticsPersistenceMessageProducer"
    autowire="byName" />
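
To picture why the broker saw guest rather than admin, here is a toy model in plain Java. It is not Spring's actual code: `Registry` and `FactoryDefinition` are hypothetical names, and the sketch assumes that a later definition with the same id silently replaces an earlier one, which matches the behavior reported above. The guest/guest values stand for the RabbitMQ client defaults that apply when no credentials are configured.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BeanOverrideSketch {

    /** Hypothetical stand-in for a connection factory definition; not a Spring class. */
    static final class FactoryDefinition {
        final String username;
        final String password;

        FactoryDefinition(String username, String password) {
            this.username = username;
            this.password = password;
        }
    }

    /** Toy registry: registering an id that already exists replaces the earlier definition. */
    static final class Registry {
        private final Map<String, FactoryDefinition> definitions = new LinkedHashMap<>();

        void register(String id, FactoryDefinition definition) {
            definitions.put(id, definition); // last registration wins
        }

        FactoryDefinition get(String id) {
            return definitions.get(id);
        }
    }

    /** Replays the two definitions from the original configuration, in file order. */
    static String effectiveUsername() {
        Registry registry = new Registry();
        // <beans:bean id="cachingConnectionFactory" ...> with explicit credentials
        registry.register("cachingConnectionFactory", new FactoryDefinition("admin", "admin"));
        // <rabbit:connection-factory id="cachingConnectionFactory" /> with no attributes,
        // so the client defaults (guest/guest) apply
        registry.register("cachingConnectionFactory", new FactoryDefinition("guest", "guest"));
        return registry.get("cachingConnectionFactory").username;
    }

    public static void main(String[] args) {
        System.out.println("effective username: " + effectiveUsername());
    }
}
```

Under that assumption, the admin credentials set on the first definition never reach the broker, which is consistent with the "user 'guest'" entry in the RabbitMQ log. Keeping a single definition that carries the credentials, as in the corrected configuration, removes the ambiguity.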
answered 2015-01-26T22:31:29.290