2

最近我遇到了来自 mongo-cursor-readable 的 rx.js 的背压问题。任务基本上是:

  1. 查询 DB A (Mongo),使用它的可读性并将其转换为可观察的
  2. 对输入流执行各种异步转换,例如

    2.1 查询另一个 DB B(Mongo,等等)以获取一个项目(或一批项目)的附加数据,然后对其执行(可能是异步的)操作

    2.2 为一个项目(或一批项目)查询另一个 DB B(Mongo,无论如何),然后使用该附加数据执行过滤

  3. 计算输入流并将其与输出流进行比较(因为可能会发生过滤)

虽然我确实有解决所有问题的方法,但我正在处理来自第一个 Mongo-DB 的背压,因为 2. 或 2.1/2.2 可能非常耗时,但初始查询的性能要高得多 ( a.k.a consumers are slow, producers are fast)

目前,我唯一的解决方案是将 1. 限制在合理的数量以减少初始吞吐量,但这显然会导致整个转换链的吞吐量变慢(生产者需要长时间等待,直到整个链完成工作)

如何实现一个始终忙碌但又不受背压的转型链?

我在这里考虑根据其消费者的当前(或预计)性能(例如转换所花费的时间)动态降低生产者的速度,但这对我来说似乎不是很直观。

4

0 回答 0