对此可能有一个愚蠢的简单答案,但我正在尝试使用 ActiveMQ 在生产者和消费者之间传递消息。我将有许多生产者和许多消费者,但我希望每条消息在消费者之间只传递一次。这似乎意味着我不能使用主题,因为它们会将消息传递给所有正在收听的消费者,而我希望只有一个消费者接收每条消息。
我的问题是我能够接收消息,但消息没有出队。所以如果我重新启动我的消费者进程,所有的消息都会被重新处理。 这个答案似乎是相关的,但似乎并不适用,因为我无法创建持久的队列消费者,只能创建持久的主题消费者(除非我在 API 文档中遗漏了一些东西)。
我的代码如下。
TopicConnectionFactory factory = new ActiveMQConnectionFactory(props.getProperty("mq.url"));
Connection conn = factory.createConnection();
Session session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(props.getProperty("mq.source_queue"));
conn.start();
MessageConsumer consumer = session.createConsumer(queue);
然后稍后
Message msg = consumer.receive();
msg.acknowledge();
if (!(msg instanceof TextMessage)) continue;
String msgStr = ((TextMessage)msg).getText();
这是我当前的代码。我尝试过使用 Session.AUTO_ACKNOWLEDGE 而没有使用 msg.acknowledge()。此代码的任何工作排列似乎都可以检索消息,但是当我重新启动消费者时,所有消息都会再次收到,即使它们是在重新启动之前收到的。