0

我检查了 clustered-static-discovery,也检查了基于 udp 的集群,如果我在集群中有 2 个节点,则必须需要 2 个连接,如果我有 4 个,则必须需要 4 个连接才能循环使用消息。

假设我有 2 个服务器需要 2 个连接,如果我只创建了一个连接或侦听器,并且将产生 10 条消息,那么我将错过 5 条消息。

我们如何在一个连接中接收消息而不是创建多个连接(取决于使用了多少服务器节点)。因为有添加运行时节点的场景,所以我们会错过那些将要运行时添加节点的消息

这是示例我有 2 个节点(在集群中)和一个连接

import javax.jms.*;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.util.ServerUtil;
/**
 * A simple example that demonstrates server side load-balancing of messages between the queue instances on different
 * nodes of the cluster. The cluster is created from a static list of nodes.
 */
public class StaticClusteredQueueExample {
   public static void main(final String[] args) throws Exception {
      Connection connection0 = null;
      try {
         
         Topic topic = ActiveMQJMSClient.createTopic("exampleTopic");
        
         ConnectionFactory cf0 = new ActiveMQConnectionFactory("tcp://localhost:9616");
         Thread.sleep(2000);
         
         connection0 = cf0.createConnection();
         final String clientID = "admin";
         connection0.setClientID(clientID);
         final String subscriptionName = "mySub";
      
         Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
         connection0.start();
         
         MessageConsumer subscriber0 = session0.createDurableSubscriber(topic, subscriptionName);
         Thread.sleep(2000);
         
         MessageProducer producer = session0.createProducer(topic);
         //  We send 20 messages to server 
         final int numMessages = 20;
         for (int i = 0; i < numMessages; i++) {
            TextMessage message = session0.createTextMessage("This is text message " + i);
            producer.send(message);
            System.out.println("Sent message: " + message.getText());
         }
         Thread.sleep(2000);
                  
         for (int i = 0; i < numMessages; i += 2) {
            try {
               TextMessage message0 = (TextMessage) subscriber0.receive(5000);
               System.out.println("" + message0.getText() + ": from node " + ServerUtil.getServer(connection0));
            } catch (Exception e) {}
         }
      } finally {
         // Step 15. Be sure to close our resources!
         if (connection0 != null) {
            connection0.close();
         }
      }
   }
}

在上面的示例中,生产者发送 20 条消息,但是当我打印输出时,它只打印 10 条消息而不是 20 条

4

1 回答 1

0

正如集群文档<message-load-balancing>所述,集群中消息负载平衡的方式取决于<cluster-connection>. 该文档指出:

此参数确定消息是否/如何在集群的其他节点之间分发。它可以是三个值之一 - OFFSTRICTON_DEMAND(默认值)。此参数替换不推荐使用的forward-when-no-consumers参数。

如果将其设置为,OFF则消息将永远不会转发到集群中的另一个节点

如果将其设置为,STRICT那么即使集群的其他节点上的相同队列可能根本没有消费者,或者它们可能具有具有不匹配消息过滤器(选择器)的消费者,每个传入消息都将被轮询。请注意,如果其他节点上没有同名队列,则 Apache ActiveMQ Artemis 不会将消息转发到其他节点,即使此参数设置为STRICT. 使用STRICT就像将 legacyforward-when-no-consumers参数设置为true.

如果将其设置为,ON_DEMAND则 Apache ActiveMQ Artemis 将仅将消息转发到集群的其他节点,前提是它们被转发到的地址具有具有消费者的队列,并且如果这些消费者具有消息过滤器(选择器),则至少有一个选择器必须匹配消息。使用ON_DEMAND就像将legacy forward-when-no-consumers参数设置为false.

默认为ON_DEMAND

clustered-static-discovery示例使用STRICT<message-load-balancing>就是为什么需要连接到每个节点的原因。

另外,请记住,这会<redistribution-delay>影响此处的行为。您可以在集群文档的“消息重新分发”部分阅读更多相关信息。

如果您使用ON_DEMANDfor<message-load-balancing>和 a <redistribution-delay>that is,>=0那么您将能够使用集群中特定队列上的每条消息,从连接到单个节点。

于 2017-12-12T19:49:55.597 回答