2

我有一个带有 2 个节点的 HornetQ 独立集群,并且 Mule ESB 配置为连接 HornetQ 集群发现组。

我写了一个 Mule 回调监听器,它使用来自 HornetQ 集群的消息。但是集群中的第二个节点表现得像一个备份或故障转移服务器。当我发送多条消息时,它总是只传递到 node1。当我停止 node1 时, node2 正在挑选消息。对于 node2,JConsole 始终将消费者数量显示为 0。而 node1 有 6 个使用者,如 Mule JMS 连接器->numberOfConsumers 属性中定义的,如下面的配置所示。

<jms:connector name="hornetq-connector" username="guest" 
    maxRedelivery="5" password="guest" specification="1.1"
    connectionFactory-ref="connectionFactory" numberOfConsumers="6" >
    <spring:property name="retryPolicyTemplate" ref="ThreadingPolicyTemplate"  />
</jms:connector>

我只有一个在 mule 流中定义的消费者,如下所示。

<flow name="Flow2" doc:name="Flow2">
    <jms:inbound-endpoint queue="InboundQueue"
     connector-ref="hornetq-connector">
        <jms:transaction action="ALWAYS_BEGIN" timeout="10000" />
    </jms:inbound-endpoint>

    <component class="com.test.Consumer" />
</flow>

Mule Callback java 类作为组件添加,如上面的代码()所示。请在下面找到代码。

公共类消费者实现 org.mule.api.lifecycle.Callable{

private static DateFormat df1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

@Override
public Object onCall(MuleEventContext eventContext) throws Exception {
    // TODO Auto-generated method stub

    System.out.println(df1.format(new Date().getTime())+"Consumer:Message Received,"+ Thread.currentThread()+ "," + eventContext.getMessageAsString());
    Thread.sleep(5000);
    System.out.println(df1.format(new Date().getTime())+"Consumer: Message process complete, "+ Thread.currentThread()+ ","  + eventContext.getMessageAsString());

    return "Success";
}

}

如何在所有 HornetQ 集群节点上分配这个消费者?

4

0 回答 0