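I am trying to send JSON data as a message from a Spring AMQP application to a consumer (written in Ruby).
Every setting I have tried results in a connection reset error.
Note: I deleted the default guest user and added an admin user with the same permissions on the default virtual host.
Here is the configuration: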
<beans:bean id="cachingConnectionFactory"
    class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <beans:property name="channelCacheSize" value="5" />
    <beans:property name="username" value="admin" />
    <beans:property name="password" value="admin" />
</beans:bean>

<rabbit:queue id="analytics.persistence.queue" name="analytics.persistence.queue" />

<rabbit:direct-exchange name="amq.direct">
    <rabbit:bindings>
        <rabbit:binding queue="analytics.persistence.queue">
        </rabbit:binding>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:connection-factory id="cachingConnectionFactory" />
<rabbit:admin connection-factory="cachingConnectionFactory" />

<beans:bean id="rabbitTemplate"
    class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <beans:property name="connectionFactory" ref="cachingConnectionFactory" />
    <beans:property name="exchange" value="amq.direct" />
    <beans:property name="routingKey" value="analytics.persistence.queue" />
</beans:bean>

<beans:bean id="analyticsMessageProducer"
    class="hoodibaba.analytics.publish.AnalyticsPersistenceMessageProducer">
    <beans:property name="rabbitTemplate" ref="rabbitTemplate" />
</beans:bean>
In my producer class:
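import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
// Package location in Spring AMQP 1.x
import org.springframework.amqp.rabbit.core.support.RabbitGatewaySupport;

public class AnalyticsPersistenceMessageProducer extends RabbitGatewaySupport
        implements AnalyticsPersistenceMessageGateway {

    private static final Logger logger =
            LoggerFactory.getLogger(AnalyticsPersistenceMessageProducer.class);

    @Override
    public void sendAnalyticsMessage(String jsonData) {
        try {
            // The first argument is the routing key; the exchange comes from the template
            getRabbitTemplate().convertAndSend("analytics.persistence.queue", jsonData);
        } catch (AmqpException e) {
            logger.error(e.getMessage());
            logger.error("Exception Stacktrace", e);
            // Keep the payload in the logs so it is not lost
            logger.error("flushing JSON data to logs: {}", jsonData);
        }
    }
}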
I get a
com.rabbitmq.client.PossibleAuthenticationFailureException: Possibly caused by authentication failure
The stack trace shows a connection reset:
Caused by: com.rabbitmq.client.PossibleAuthenticationFailureException: Possibly caused by authentication failure
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:348)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:545)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:160)
... 43 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:202)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:340)
... 46 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:189)
at java.net.SocketInputStream.read(SocketInputStream.java:121)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:508)
The message consumer is as follows:
#!/usr/bin/env ruby
# encoding: UTF-8
require 'bunny'
conn = Bunny.new("amqp://admin:admin@localhost:5672")
conn.start
ch = conn.create_channel
puts 'channel created'
q = ch.queue("analytics.persistence.queue",:exclusive => false, :auto_delete => false)
x = ch.direct("amq.direct")
q.bind(x)
q.subscribe(:block => true, :ack => true) do |delivery_info, properties, payload|
  puts "Received #{payload}, message properties are #{properties.inspect}"
end
ch.close
conn.close
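Update: it seems the updated username and password settings are not being propagated; the RabbitMQ log shows that my application is still connecting as guest: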
=ERROR REPORT==== 19-Mar-2013::00:16:12 ===
closing AMQP connection <0.13815.0> (127.0.0.1:44736 -> 127.0.0.1:5672):
{handshake_error,starting,0,
{amqp_error,access_refused,
"PLAIN login refused: user 'guest' - invalid credentials",
'connection.start_ok'}}
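
One possible explanation, offered as a guess rather than a confirmed diagnosis: the configuration above declares the id cachingConnectionFactory twice, once as a <beans:bean> carrying the admin credentials and once as a bare <rabbit:connection-factory />. If the second definition replaces the first, the template would connect with the client's default guest credentials, which would match the log entry. A minimal sketch of a single definition, assuming the username, password and channel-cache-size attributes of the rabbit namespace are available in the Spring AMQP version in use:

```xml
<!-- Sketch only: a single connection factory definition instead of two sharing one id. -->
<!-- It assumes the <beans:bean id="cachingConnectionFactory"> definition is removed. -->
<rabbit:connection-factory id="cachingConnectionFactory"
    username="admin"
    password="admin"
    channel-cache-size="5" />

<rabbit:admin connection-factory="cachingConnectionFactory" />
```

If the broker log then shows a login attempt for admin rather than guest, the duplicate id was likely the cause; if it still shows guest, the credentials are being lost somewhere else.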