0

我想处理一条 AMQP 消息,然后将其发送到另一个队列以进行进一步处理。

我正在使用 Spring 集成 DSL 将其存档,如下所示,

@Bean
public IntegrationFlow ocr(ConnectionFactory connectionFactory, AmqpTemplate amqpTemplate,
                           OCRService ocrService) {
    return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_INCOMING_QUEUE)
            .concurrentConsumers(2))
            .transform(new JsonToObjectTransformer(Note.class))
            .handle(msg -> {
                // doing ocr here
                amqpTemplate.convertAndSend(NOTE_EXCHANGE, NOTE_OCRED_BINDING, note);
            })
            .get();
}

@Bean
public IntegrationFlow typeProcess(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_OCRED_QUEUE)
            .concurrentConsumers(4))
            .transform(new JsonToObjectTransformer(Note.class))
            .handle(msg -> {
                // doing some work for type processing
            }).get();
}

但是我发现当新消息在类型处理阶段处理时, NOTE_INCOMING_QUEUE队列中的消息仍然未被确认。请参阅下面的 rabbitmq 管理截图。

在此处输入图像描述

我想知道为什么即使处理程序已经成功执行, NOTE_INCOMING_QUEUE中的消息仍然未被确认。是spring集成amqp的设计还是我的代码有问题?

4

1 回答 1

1

使用Amqp.inboundAdapter()而不是网关 - 网关正在等待永远不会到达的回复。

网关用于请求/回复场景;通道适配器适用于单向方案。

于 2017-05-15T12:58:30.860 回答