我正在使用 Java 实现 Akka Alpakka,以便从 ActiveMQ 队列中消费和生产。我可以成功地从队列中消费,但我还不能实现应用程序级的消息确认。
我的目标是使用队列中的消息并将它们发送给另一个参与者进行处理。当该参与者完成处理后,我希望它能够控制 ActiveMQ 中消息的确认。据推测,这可以通过向另一个可以进行确认的参与者发送消息、在消息本身上调用确认函数或其他方式来完成。
在我的测试中,有 2 条消息被放入 AlpakkaTest 队列,然后这段代码尝试消费并确认它们。但是,我看不到将 ActiveMQ 会话设置为 CLIENT_ACKNOWLEDGE 的方法,并且无论是否调用m.acknowledge();
. 因此,我认为消息仍在被自动确认。
有谁知道为 CLIENT_ACKNOWLEDGE 配置 ActiveMQ 会话并使用 Alpakka 在 Java Akka 系统中手动确认 ActiveMQ 消息的公认方法?
相关的测试函数为:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2999"); // An embedded broker running in the test.
Source<Message, NotUsed> jmsSource = JmsSource.create(
JmsSourceSettings.create(connectionFactory)
.withQueue("AlpakkaTest")
.withBufferSize(2)
);
Materializer materializer = ActorMaterializer.create(system); // `system` is an ActorSystem passed to the function.
try {
List<Message> messages = jmsSource
.take(2)
.runWith(Sink.seq(), materializer)
.toCompletableFuture().get(4, TimeUnit.SECONDS);
for(Message m:messages) {
System.out.println("Found Message ID: " + m.getJMSMessageID());
try {
m.acknowledge();
} catch(JMSException jmsException) {
System.out.println("Acknowledgement Failed for Message ID: " + m.getJMSMessageID() + " (" + jmsException.getLocalizedMessage() + ")");
}
}
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (ExecutionException e1) {
e1.printStackTrace();
} catch (TimeoutException e1) {
e1.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
此代码打印:
Found Message ID: ID:jmstest-43178-1503343061195-1:26:1:1:1
Found Message ID: ID:jmstest-43178-1503343061195-1:27:1:1:1