我最近开始使用 spring webflux 和 Rabbitmq 以及 cassandra 反应存储库。我注意到的是,即使保存在 cassandra 中的某些元素没有成功,该消息也会得到确认。我在保存过程中抛出异常,但即使消息已从队列中删除。我想知道我应该怎么做才能让 Rabbitmq 知道该消息应该被视为失败(我想拒绝消息以将其发送到死信队列)
@RabbitListener(queues = Constants.SOME_QUEUE, returnExceptions = "true")
public void receiveMessage(final List<ItemList> itemList) {
log.info("Received message from queue: {}", Constants.SOME_QUEUE);
itemService.saveAll(itemList)
.subscribe(
item-> log.info("Saving item with {}", item.getId()),
error -> {
log.error("Error during saving item", error);
throw new AmqpRejectAndDontRequeueException(error.getMessage());
},
() -> log.info(Constants.SOME_QUEUE+
" queue - {} items saved", itemList.size())
);
}