我有一个带有 2 个节点的 HornetQ 独立集群,并且 Mule ESB 配置为连接 HornetQ 集群发现组。
我写了一个 Mule 回调监听器,它使用来自 HornetQ 集群的消息。但是集群中的第二个节点表现得像一个备份或故障转移服务器。当我发送多条消息时,它总是只传递到 node1。当我停止 node1 时, node2 正在挑选消息。对于 node2,JConsole 始终将消费者数量显示为 0。而 node1 有 6 个使用者,如 Mule JMS 连接器->numberOfConsumers 属性中定义的,如下面的配置所示。
<jms:connector name="hornetq-connector" username="guest"
maxRedelivery="5" password="guest" specification="1.1"
connectionFactory-ref="connectionFactory" numberOfConsumers="6" >
<spring:property name="retryPolicyTemplate" ref="ThreadingPolicyTemplate" />
</jms:connector>
我只有一个在 mule 流中定义的消费者,如下所示。
<flow name="Flow2" doc:name="Flow2">
<jms:inbound-endpoint queue="InboundQueue"
connector-ref="hornetq-connector">
<jms:transaction action="ALWAYS_BEGIN" timeout="10000" />
</jms:inbound-endpoint>
<component class="com.test.Consumer" />
</flow>
Mule Callback java 类作为组件添加,如上面的代码()所示。请在下面找到代码。
公共类消费者实现 org.mule.api.lifecycle.Callable{
private static DateFormat df1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
@Override
public Object onCall(MuleEventContext eventContext) throws Exception {
// TODO Auto-generated method stub
System.out.println(df1.format(new Date().getTime())+"Consumer:Message Received,"+ Thread.currentThread()+ "," + eventContext.getMessageAsString());
Thread.sleep(5000);
System.out.println(df1.format(new Date().getTime())+"Consumer: Message process complete, "+ Thread.currentThread()+ "," + eventContext.getMessageAsString());
return "Success";
}
}
如何在所有 HornetQ 集群节点上分配这个消费者?