1

我正在使用 ReactiveMongoTemplate MongoDB ChangeStream 来监听 MongoDB 集合的更新,执行一些查询,并将文档持久保存到另一个集合。虽然它在本地运行良好,但在部署到体积很大的 UAT 后开始出现以下错误:

Too many operations are already waiting for a collection. Max number of operations (maxWaitQueueSize) of 500 has been exceeded.

有什么方法可以解决这个问题?

我在 application.yml 文件中有以下内容

spring:
   data: 
      mongodb:
          uri: mongodb://host:port/db?authMechanism=<val1>&authSource=<val2>&authechanismProperties=<val3>

这就是简化的变更流媒体的样子:

@Autowired
ReactiveMongoTemplate reactiveMongoTemplate;

reactiveMongoTemplate
  .changeStream(Sample.class)
  .watchCollection("sample_collection")
  .filter(
     new Criteria.orOperator(
        where("operationType").is("update"),
        where("operationType").is("insert")
     )
  )
  .listen()
  .flatMap(r->processMessage(r)). // processMessage does some queries to collections including this collection being listened to and upserts to same mongodb in a different collection
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe();

我知道我可能需要添加一些连接池才能处理多个连接?但是如何使用 Reactive MongoDB 进行配置呢?我是反应式编程的新手。任何指针都会非常有帮助。

4

1 回答 1

1

您可以在这里做几件事:

  1. 检查是否有一些长时间的阻塞调用,导致线程被阻塞,并导致创建大量连接,因为之前的连接仍然被占用执行繁重的任务。尝试检查阻止这些调用的代码的一些优化。在反应式编程中,您可以使用BlockHound.

  2. 通过指定waitQueueMultiple或增加连接限制maxPoolSize- https://docs.mongodb.com/manual/reference/connection-string/#connection-pool-options

在此之前,您可以检查您的 mongo db stats 以查看当前和允许的连接,使用

db.serverStatus().connections
于 2020-08-21T17:07:55.983 回答