0

我有一个响应式 Spring Boot 应用程序,它使用来自 RabbitMQ 的消息并将它们保存在(MongoDB)存储库中:

@RabbitListener(...)
public void processMessage(Message message) {
    repository.persist(message).subscribe();
}

假设多条消息在短时间内到达,此代码可能会耗尽数据库的已配置 ConnectionPool。如果我会在 a 中接收消息Flux,我可以将concatMap()它们放入数据库或将它们插入到 n 个文档的存储桶中。

这就是为什么我尝试将给定的 RabbitMQ 侦听器连接到自我管理的 Flux 的原因:

@Component
public class QueueListenerController {

    private final MyMongoRepository repository;
    private final FluxProcessor<Message, Message> fluxProcessor;
    private final FluxSink<Message> fluxSink;

    public QueueListenerController(MyMongoRepository repository) {
        this.repository = repository;
        this.fluxProcessor = DirectProcessor.<Message>create().serialize();
        this.fluxSink = fluxProcessor.sink();
    }

    @PostConstruct
    private void postConstruct() {
        fluxProcessor.concatMap(repository::persist)
                .subscribe();
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "my-queue", durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = "amq.direct", durable = "true", autoDelete = "false")
    ))
    public void processMessage(Message message) {
        fluxSink.next(message);
    }
}

这在本地和一段时间内有效,但在一段时间后(我预计 12-24 小时)它会停止在数据库中存储消息,所以我很确定我做错了什么。

将传入的 RabbitMQ 消息转换Flux为消息的正确方法是什么?

4

0 回答 0