4

我正在使用 Java 实现 Akka Alpakka,以便从 ActiveMQ 队列中消费和生产。我可以成功地从队列中消费,但我还不能实现应用程序级的消息确认。

我的目标是使用队列中的消息并将它们发送给另一个参与者进行处理。当该参与者完成处理后,我希望它能够控制 ActiveMQ 中消息的确认。据推测,这可以通过向另一个可以进行确认的参与者发送消息、在消息本身上调用确认函数或其他方式来完成。

在我的测试中,有 2 条消息被放入 AlpakkaTest 队列,然后这段代码尝试消费并确认它们。但是,我看不到将 ActiveMQ 会话设置为 CLIENT_ACKNOWLEDGE 的方法,并且无论是否调用m.acknowledge();. 因此,我认为消息仍在被自动确认。

有谁知道为 CLIENT_ACKNOWLEDGE 配置 ActiveMQ 会话并使用 Alpakka 在 Java Akka 系统中手动确认 ActiveMQ 消息的公认方法?

相关的测试函数为:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2999"); // An embedded broker running in the test.

Source<Message, NotUsed> jmsSource = JmsSource.create(
    JmsSourceSettings.create(connectionFactory)
        .withQueue("AlpakkaTest")
        .withBufferSize(2)
);

Materializer materializer = ActorMaterializer.create(system); // `system` is an ActorSystem passed to the function.

try {
    List<Message> messages = jmsSource
        .take(2)
        .runWith(Sink.seq(), materializer)
        .toCompletableFuture().get(4, TimeUnit.SECONDS);

    for(Message m:messages) {
        System.out.println("Found Message ID: " + m.getJMSMessageID());

        try {
            m.acknowledge();
        } catch(JMSException jmsException) {
            System.out.println("Acknowledgement Failed for Message ID: " + m.getJMSMessageID() + " (" + jmsException.getLocalizedMessage() + ")");
        }
    }
} catch (InterruptedException e1) {
    e1.printStackTrace();
} catch (ExecutionException e1) {
    e1.printStackTrace();
} catch (TimeoutException e1) {
    e1.printStackTrace();
} catch (JMSException e) {
    e.printStackTrace();
}

此代码打印:

Found Message ID: ID:jmstest-43178-1503343061195-1:26:1:1:1
Found Message ID: ID:jmstest-43178-1503343061195-1:27:1:1:1
4

2 回答 2

4

更新:自 Alpakka 0.15起,确认模式可在 JMS 连接器中配置。从链接的文档中:

Source<Message, NotUsed> jmsSource = JmsSource.create(JmsSourceSettings
    .create(connectionFactory)
    .withQueue("test")
    .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge())
);

CompletionStage<List<String>> result = jmsSource
    .take(msgsIn.size())
    .map(message -> {
        String text = ((ActiveMQTextMessage)message).getText();
        message.acknowledge();
        return text;
    })
    .runWith(Sink.seq(), materializer);

从 0.11 版开始,Alpakka 的 JMS 连接器不支持应用程序级消息确认。Alpakka 在内部创建 a Sessionwith CLIENT_ACKNOWLEDGEmode here并internal 中确认每个消息MessageListener。API 不会公开这些设置以进行覆盖。

有一个开放的票证讨论了启用基于队列的源的下游确认,但该票证已经有一段时间不活动了。

目前,您无法阻止 Alpakka 在 JMS 级别确认消息。但是,这并不妨碍您向流中添加一个阶段,该阶段将每条消息发送给参与者进行处理,并将参与者的回复用作背压信号。Akka Streams文档mapAsync描述了如何使用和ask或的组合来做到这一点Sink.actorRefWithAck。例如,要使用前者:

Timeout askTimeout = Timeout.apply(4, TimeUnit.SECONDS);

jmsSource
    .mapAsync(2, msg -> ask(processorActor, msg, askTimeout))
    .runWith(Sink.seq(), materializer);

(旁注:在相关的 Streamz 项目中,有一个最近打开的票证允许应用程序级确认。Streamz 是旧的 akka-camel 模块的替代品,并且与 Alpakka 一样,构建在 Akka Streams 之上。Streamz 也有一个 Java API 并在 Alpakka 文档中列为外部连接器。)

于 2017-08-22T11:02:01.397 回答
2

查看 Alpakka JmsSourceStage 的源代码,它已经为您确认每条传入的消息(并且它的会话是 Client Ack 会话)。据我所知,没有任何模式可以让您确认消息。

您可以在此处查看 Alpakka 的源代码。

于 2017-08-21T22:47:57.507 回答