抽象的问题
有什么方法可以mergeMap
按照外部 observable 的原始顺序消耗 a 的结果,同时仍然允许内部 observable 并行运行?
更详细的解释
让我们看一下两个合并映射运算符:
-
...它需要一个映射回调,以及可以同时运行的内部可观察对象的数量:
of(1, 2, 3, 4, 5, 6).pipe( mergeMap(number => api.get('/double', { number }), 3) );
在此处查看实际操作:https ://codepen.io/JosephSilber/pen/YzwVYNb?editors=1010
这将分别触发
1
、2
和的 3 个并行请求3
。一旦其中一个请求完成,它将触发另一个请求4
。以此类推,始终保持 3 个并发请求,直到处理完所有值。但是,由于先前的请求可能在后续请求之前完成,因此产生的值可能是无序的。所以而不是:
[2, 4, 6, 8, 10, 12]
...我们实际上可能会得到:
[4, 2, 8, 10, 6, 12] // or any other permutation
-
...输入
concatMap
。该运算符确保所有可观察对象都按原始顺序连接,因此:of(1, 2, 3, 4, 5, 6).pipe( concatMap(number => api.get('/double', { number })) );
...将始终产生:
[2, 4, 6, 8, 10, 12]
在这里查看它:https ://codepen.io/JosephSilber/pen/OJMmzpy?editors=1010
这是我们想要的,但现在请求不会并行运行。正如文档所说:
concatMap
等价于mergeMap
参数concurrency
设置为1
。
回到问题:是否有可能获得 的好处mergeMap
,即可以并行运行给定数量的请求,同时仍然以原始顺序发出映射值?
我的具体问题
上面抽象地描述了这个问题。当您知道手头的实际问题时,有时会更容易推理问题,所以这里是:
我有一份必须发货的订单清单:
const orderNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
我有一种
shipOrder
实际发送订单的方法。它返回一个Promise
:const shipOrder = orderNumber => api.shipOrder(orderNumber);
API 最多只能同时处理 5 个订单发货,所以我
mergeMap
用来处理:from(orderNumbers).pipe( mergeMap(orderNumber => shipOrder(orderNumber), 5) );
订单发货后,我们需要打印其发货标签。我有一个
printShippingLabel
功能,给定发货订单的订单号,将打印其发货标签。所以我订阅了我们的 observable,并在输入值时打印运输标签:from(orderNumbers) .pipe(mergeMap(orderNumber => shipOrder(orderNumber), 5)) .pipe(orderNumber => printShippingLabel(orderNumber));
这可行,但现在运输标签打印乱序,因为
mergeMap
根据何时shipOrder
完成其请求发出值。我想要的是标签以与原始列表相同的顺序打印。
那可能吗?
可视化
有关问题的可视化,请参见此处:https ://codepen.io/JosephSilber/pen/YzwVYZb?editors=1010
您可以看到较早的订单在后续订单发货之前就已打印。