0

我使用以下 url 来创建 ActiveMQConnactionFactory:

failover:(tcp://server1:port,tcp://server2:port,tcp://server2:port)

我想做的是从这个代理网络创建多个消息消费者。以下不是真正的代码,但有助于理解我是如何做到的:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("BROKER_URL");
connection = connectionFactory.createConnection();
connection.start();

for (int i=0; i<10; i++) {
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination queue = consumerSession.createQueue("QUEUE_NAME");
consumer = consumerSession.createConsumer(queue);
consumer.setMessageListener(new MessageListener());
}

问题是所有消费者都将连接到一个随机选择的代理。但我希望他们在经纪人网络上保持平衡。

我相信可以通过与工厂建立多个联系来做到这一点。

但是,最好的做法是什么?这是我想要的好事吗?:)

4

1 回答 1

0

实际上,消费者不会连接到随机选择的代理。

连接是连接到代理的部分。使用您提供的连接字符串,您将有一个连接映射到一个随机选择的代理。所有消费者都有自己的会话,但这些会话将使用与该 ONE 代理的相同 ONE 连接。

我知道的唯一设置是,您可以通过设置?randomize=false连接字符串来禁用故障转移协议的随机化行为。这意味着您的连接将首先尝试第一个,然后是第二个,然后是第三个,依此类推。

但要达到你的要求。我会让每个消费者都有自己的联系。这与故障转移协议中的随机化功能一起,可以平衡消费者的负载;但不是真的,那里没有情报,只是“随机化”它连接的经纪人。

这意味着,我将执行以下操作(来自您的代码)

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("BROKER_URL");

for (int i=0; i<10; i++) {

connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination queue = consumerSession.createQueue("QUEUE_NAME");
consumer = consumerSession.createConsumer(queue);
consumer.setMessageListener(new MessageListener());

}

这样,每个消费者都将拥有自己的连接到您的故障转移连接字符串的“一个”代理

问题更改后更新:

如果你想让 ActiveMQ 为每个消费者随机选择一个代理,上面提到的解决方案是要走的路。

最佳实践是让您的消费者和生产者尽可能靠近。为此,我建议降低网络消费者优先级,以便本地消费者和生产者具有最高优先级。只有当本地消费者没有空闲时,它才会通过网络进一步分发给其他消费者。

除此之外,如果消费者端的操作长时间运行以设置较低的预取值,这将是一个好主意,以便消息确实在代理网络周围得到负载平衡,而不是一个消费者抢夺 1,000 条消息而其他消费者消费者闲着。

于 2011-03-16T13:18:47.030 回答