我想处理一条 AMQP 消息,然后将其发送到另一个队列以进行进一步处理。
我正在使用 Spring 集成 DSL 将其存档,如下所示,
@Bean
public IntegrationFlow ocr(ConnectionFactory connectionFactory, AmqpTemplate amqpTemplate,
OCRService ocrService) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_INCOMING_QUEUE)
.concurrentConsumers(2))
.transform(new JsonToObjectTransformer(Note.class))
.handle(msg -> {
// doing ocr here
amqpTemplate.convertAndSend(NOTE_EXCHANGE, NOTE_OCRED_BINDING, note);
})
.get();
}
@Bean
public IntegrationFlow typeProcess(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_OCRED_QUEUE)
.concurrentConsumers(4))
.transform(new JsonToObjectTransformer(Note.class))
.handle(msg -> {
// doing some work for type processing
}).get();
}
但是我发现当新消息在类型处理阶段处理时, NOTE_INCOMING_QUEUE队列中的消息仍然未被确认。请参阅下面的 rabbitmq 管理截图。
我想知道为什么即使处理程序已经成功执行, NOTE_INCOMING_QUEUE中的消息仍然未被确认。是spring集成amqp的设计还是我的代码有问题?