0

我刚开始使用 ActiveMQ,我似乎有一个奇怪的问题。(以下来源)

有2个场景

  1. 消费者连接到代理,等待队列中的任务。生产者稍后到达,删除任务列表,它们被不同的消费者正确地占用并执行。这很好用,我也模拟了它。

  2. 生产者首先连接,删除任务列表。此时没有连接消费者。现在当让我们说 3 个消费者 - C1、C2 和 C3 连接到代理(按该顺序)时,我看到只有 C1 接手并执行生产者丢弃的任务。C2 和 C3 保持空闲。为什么会这样?

我还注意到关于第二种情况的另外一件事——如果生产者继续在队列中删除任务,C2 和 C3 会拾取任务,但是如果生产者之前已经删除了任务(如前所述),那么 C2 和 C3 不会做任何事情。

生产者代码

package com.activemq.apps;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.commons.helpers.Maths;

public class Publisher implements MessageListener {

    private static String _URL;
    private static String _TOPIC_PUBLISH;
    private static String _TOPIC_CONSUME;

    public Publisher (String URL, String TOPIC) {

        _URL = URL;
        _TOPIC_PUBLISH = TOPIC + "_REQUESTS";
        _TOPIC_CONSUME = TOPIC + "_RESPONSES";

    }

    public void initialize() {

        try
        {

            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(_URL);
            Connection connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destinationProducer = session.createQueue(_TOPIC_PUBLISH);
            Destination destinationConsumers = session.createQueue(_TOPIC_CONSUME);

            MessageProducer producer = session.createProducer(destinationProducer);
            MessageConsumer consumer = session.createConsumer(destinationConsumers);

            consumer.setMessageListener(this);

            int count = 0;

            System.out.println("Sending requests");

            while (true)
            {
                int randomSleepTime = Maths.rand(1000, 5000);

                String messageToSend = count + "_" + randomSleepTime;

                TextMessage message = session.createTextMessage(messageToSend);

                producer.send(message);

                System.out.println("Job #" + count + " | " + (randomSleepTime/1000) + "s");

                if (count++%10 == 0)
                    Thread.sleep(10000);

            }
        }

        catch (JMSException ex)
        {
            ex.printStackTrace();
        }

        catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void onMessage(Message message) {

        if (message instanceof TextMessage)
        {
            TextMessage msg = (TextMessage) message;

            try {

                String response = msg.getText();
                String[] responseSplit = response.split("_");

                String clientId = responseSplit[1];
                String count = responseSplit[0];

                System.out.println("Got response from " + clientId + " Job #" + count);
            } 

            catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }

}

消费者代码

package com.activemq.apps;

import java.util.UUID;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer implements MessageListener {

    private static String _URL;
    private static String _TOPIC_PUBLISH;
    private static String _TOPIC_CONSUME;
    private static String _CLIENTID;

    private MessageProducer producer;
    private Session session;

    public Consumer (String URL, String TOPIC) {

        _URL = URL;
        _TOPIC_PUBLISH = TOPIC + "_RESPONSES";
        _TOPIC_CONSUME = TOPIC + "_REQUESTS";

    }

    public void initialize() {

        try
        {

            _CLIENTID = UUID.randomUUID().toString();

            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(_URL);
            Connection connection = connectionFactory.createConnection();
            connection.start();

            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destinationProducer = session.createQueue(_TOPIC_PUBLISH);
            Destination destinationConsumers = session.createQueue(_TOPIC_CONSUME);

            producer = session.createProducer(destinationProducer);
            MessageConsumer consumer = session.createConsumer(destinationConsumers);

            consumer.setMessageListener(this);

            System.out.println("Client: " + _CLIENTID + "\nWaiting to pick up tasks");
        }

        catch (JMSException ex)
        {
            ex.printStackTrace();
        }

    }

    @Override
    public void onMessage(Message message) {

        if (message instanceof TextMessage)
        {
            TextMessage msg = (TextMessage) message;

            try 
            {

                String[] messageSplits = msg.getText().split("_");

                String count = messageSplits[0];
                String timeString = messageSplits[1];

                int sleepFor = Integer.parseInt(timeString);

                System.out.println("Job #" + count + " | Sleeping for " + (sleepFor/1000) + "s");

                Thread.sleep(sleepFor);

                TextMessage sendToProducer = session.createTextMessage(count + "_" + _CLIENTID);

                producer.send(sendToProducer);
            } 

            catch (JMSException e) {
                e.printStackTrace();
            } 

            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

}
4

3 回答 3

1

你提到过

现在让我们说 3 个消费者 - C1、C2 和 C3 连接到代理(按此顺序)

由于 C1 首先连接,它在连接后立即开始获取队列中的所有消息。这是意料之中的。所以我在这里没有看到任何问题。C2、C3 不是空闲的,但 C1 在 C2 和 C3 之前已经掌握了消息。

我不确定生产者发送了多少条消息。我假设消息的数量较少。要查看您的期望,请尝试从生产者发送许多消息,例如数千或数百万条消息,然后启动消费者。大量消息是主观的,取决于内存、网络和其他资源。你可能会看到你所期待的。

于 2013-07-04T02:37:57.910 回答
0

我不认为这里有什么奇怪的。队列代表 P2P 模式,它应该只有一个消费者。在我们的例子中,我们有 3 个消费者,这不是被禁止的,但不能保证消费者接收消息的任何特定顺序。

于 2013-07-03T05:15:40.023 回答
0

我相信您应该为您的消费者检查 prefetchPolicy 参数。默认情况下,预取策略的默认值为 1000。这意味着第一个消费者最多获取 1000 条消息,其他消费者无法获取其他任何内容。如果您正在尝试对消费者之间的消息进行负载平衡,请考虑将此参数修改为 0 或 1。

于 2019-08-22T06:58:31.600 回答