我正在使用 ReactiveMongoTemplate MongoDB ChangeStream 来监听 MongoDB 集合的更新,执行一些查询,并将文档持久保存到另一个集合。虽然它在本地运行良好,但在部署到体积很大的 UAT 后开始出现以下错误:
Too many operations are already waiting for a collection. Max number of operations (maxWaitQueueSize) of 500 has been exceeded.
有什么方法可以解决这个问题?
我在 application.yml 文件中有以下内容
spring:
data:
mongodb:
uri: mongodb://host:port/db?authMechanism=<val1>&authSource=<val2>&authechanismProperties=<val3>
这就是简化的变更流媒体的样子:
@Autowired
ReactiveMongoTemplate reactiveMongoTemplate;
reactiveMongoTemplate
.changeStream(Sample.class)
.watchCollection("sample_collection")
.filter(
new Criteria.orOperator(
where("operationType").is("update"),
where("operationType").is("insert")
)
)
.listen()
.flatMap(r->processMessage(r)). // processMessage does some queries to collections including this collection being listened to and upserts to same mongodb in a different collection
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
我知道我可能需要添加一些连接池才能处理多个连接?但是如何使用 Reactive MongoDB 进行配置呢?我是反应式编程的新手。任何指针都会非常有帮助。