0

我最近开始使用 spring webflux 和 Rabbitmq 以及 cassandra 反应存储库。我注意到的是,即使保存在 cassandra 中的某些元素没有成功,该消息也会得到确认。我在保存过程中抛出异常,但即使消息已从队列中删除。我想知道我应该怎么做才能让 Rabbitmq 知道该消息应该被视为失败(我想拒绝消息以将其发送到死信队列)

@RabbitListener(queues = Constants.SOME_QUEUE, returnExceptions = "true")
public void receiveMessage(final List<ItemList> itemList) {
    log.info("Received message from queue: {}", Constants.SOME_QUEUE);
    itemService.saveAll(itemList)
            .subscribe(
                    item-> log.info("Saving item with {}", item.getId()),
                    error -> {
                        log.error("Error during saving item", error);
                        throw new AmqpRejectAndDontRequeueException(error.getMessage());
                    },
                    () -> log.info(Constants.SOME_QUEUE+
                            " queue - {} items saved", itemList.size())
            );
}
4

2 回答 2

0

反应式是非阻塞的;一旦侦听器线程返回容器,消息就会被确认。您需要以某种方式阻止侦听器线程(例如使用 a Future<?>)并在 cassandra 操作完成时将其唤醒,如果成功则正常退出,或者在失败时抛出异常以便重新传递消息。

于 2018-05-28T14:51:18.690 回答
0

我通过向rabbitmq 发送明确的确认/拒绝消息解决了我的问题。这导致我被迫编写更多代码,但现在至少它可以工作并且我可以完全控制正在发生的事情。

于 2018-05-29T07:58:29.430 回答