I created a cluster with two RabbitMQ nodes, rabbit1 and rabbit2. The configuration is as follows.
1> CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("rabbit1:5672,rabbit2:5672");
2> Node types: rabbit1 - disk node, rabbit2 - RAM node
3> The producer and consumer programs run on the rabbit2 node (i.e. the RAM node)
4> Producer sample code -
String QueueName = "Queue.";
for(int m=0; m<50000; m++){
// send message
System.out.println(this.rabbitTemplate.getConnectionFactory().getHost());
this.rabbitTemplate.convertAndSend(m);
/*Thread.sleep(100);*/
}
5> Consumer sample code -
String queueName = "Queue."; // field name matches this.queueName used below
public void run() {
System.out.println("Consumer running host : " + this.connectionFactory.getHost());
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(this.connectionFactory);
container.setQueueNames(this.queueName);
container.setMessageListener(new MessageListenerAdapter(new TestMessageHandler(this.connectionFactory.getHost()), new JsonMessageConverter()));
container.start();
}
TestMessageHandler class sample code-
public TestMessageHandler(String hostName){
System.out.println("Host: " + hostName);
this.hostName = hostName;
}
// Handle message
public void handleMessage(int message) {
System.out.println("handleMessage Host: " + this.hostName);
System.out.println("Int : " + message);
}
6> On each node I applied the policy below
cmd> rabbitmqctl set_policy ha-all "^Queue\." "{""ha-mode"":""all""}"
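To double-check which queue names the pattern in this policy should cover, here is a minimal sketch in plain Java. It assumes the policy pattern behaves like an ordinary regular expression searched against the queue name; the class name `PolicyPatternSketch` and the example queue names other than "Queue." are made up for illustration.

```java
import java.util.regex.Pattern;

// Illustrative only: PolicyPatternSketch is a hypothetical helper,
// not part of RabbitMQ or Spring AMQP.
class PolicyPatternSketch {

    // Same pattern as in the set_policy command above.
    // The backslash is doubled because this is a Java string literal.
    static final Pattern HA_PATTERN = Pattern.compile("^Queue\\.");

    // find() is used rather than matches(): the pattern only has to occur
    // in the name, and the leading ^ ties it to the start of the name.
    static boolean matchesPolicy(String queueName) {
        return HA_PATTERN.matcher(queueName).find();
    }

    public static void main(String[] args) {
        String[] names = {"Queue.", "Queue.orders", "QueueA", "MyQueue.x"};
        for (String name : names) {
            System.out.println(name + " -> " + matchesPolicy(name));
        }
    }
}
```

Under that assumption, "Queue." and "Queue.orders" are covered by the policy, while "QueueA" (no dot) and "MyQueue.x" (does not start with "Queue.") are not.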
7> I started the producer and consumer simultaneously and could see the host name "rabbit1". I then stopped the "rabbit1" node with the "rabbitmqctl stop_app" command to test the fail-over scenario, and got the error below:
WARN [.listener.SimpleMessageListenerContainer]: Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error; reason: {#method<connection.close>(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0), null, ""}
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:678)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:668)
at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:624)
at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:598)
at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:96)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:523)
INFO [.listener.SimpleMessageListenerContainer]: Restarting Consumer: tag=[amq.ctag-5CJ3YJYfMZDnJOnXsds6_Q], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@192.168.97.70:5672/,1), acknowledgeMode=AUTO local queue size=0
After this warning, I am still getting the host name "rabbit1". As I understand it, it should now be "rabbit2", but that is not happening.
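To make the expectation concrete, this is roughly the behaviour I assumed the address list from step 1 would give me: try each configured address in order and use the first one that accepts a connection. It is only a sketch in plain Java. The class and method names are hypothetical, the reachability check is a stand-in for a real connection attempt, and none of this is the actual code of Spring AMQP or the RabbitMQ Java client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Illustrative only: AddressFailoverSketch is a hypothetical class,
// not part of Spring AMQP or the RabbitMQ Java client.
class AddressFailoverSketch {

    // Split "host1:port1,host2:port2" into individual "host:port" entries.
    static List<String> parseAddresses(String addresses) {
        List<String> result = new ArrayList<>();
        for (String part : addresses.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    // Return the first address for which the supplied check passes.
    // isReachable stands in for "a connection attempt succeeded".
    static Optional<String> firstReachable(List<String> addresses,
                                           Predicate<String> isReachable) {
        for (String address : addresses) {
            if (isReachable.test(address)) {
                return Optional.of(address);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> nodes = parseAddresses("rabbit1:5672,rabbit2:5672");
        // Simulate rabbit1 being stopped: only rabbit2 accepts connections.
        Predicate<String> rabbit1Down = address -> !address.startsWith("rabbit1:");
        System.out.println(firstReachable(nodes, rabbit1Down).orElse("none"));
    }
}
```

With rabbit1 simulated as down, the sketch selects "rabbit2:5672", which is the host I expected to see reported after the consumer restarted.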
So, here are my questions:
1> Why am I still getting the host name "rabbit1" even after stopping that node?
2> Do we need a load balancer to test fail-over?
3> If my steps for testing fail-over are wrong, what are the correct steps?
4> How can I distribute queues/messages to particular nodes, for example messages/queues 1-500 to node1, 501-1000 to node2, and so on?
5> Is there any other approach to testing the fail-over scenario?
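To clarify what I mean by the range-based distribution in question 4, here is a minimal sketch of the mapping in plain Java. The class name, the range size of 500, and the queue names "Queue.1", "Queue.2", and so on are assumptions for illustration only. Which cluster node each queue would actually live on is a separate matter that this sketch does not address.

```java
// Illustrative only: RangeRoutingSketch and its queue-naming scheme are
// hypothetical and do not come from RabbitMQ or Spring AMQP.
class RangeRoutingSketch {

    // Assumed size of each range: messages 1-500, 501-1000, ...
    static final int RANGE_SIZE = 500;

    // Map a 1-based message number to a queue name:
    // 1..500 -> "Queue.1", 501..1000 -> "Queue.2", and so on.
    static String queueFor(int messageNumber) {
        if (messageNumber < 1) {
            throw new IllegalArgumentException("messageNumber must be >= 1");
        }
        int bucket = (messageNumber - 1) / RANGE_SIZE + 1;
        return "Queue." + bucket;
    }

    public static void main(String[] args) {
        int[] samples = {1, 500, 501, 1000, 1001};
        for (int n : samples) {
            System.out.println(n + " -> " + queueFor(n));
        }
    }
}
```

The queue names keep the "Queue." prefix so that they would still be covered by the ha-all policy pattern from step 6.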
Thanks for any help with this.