7

抽象的问题

有什么方法可以mergeMap按照外部 observable 的原始顺序消耗 a 的结果,同时仍然允许内部 observable 并行运行?


更详细的解释

让我们看一下两个合并映射运算符:

  • mergeMap

    ...它需要一个映射回调,以及可以同时运行的内部可观察对象的数量:

      of(1, 2, 3, 4, 5, 6).pipe(
          mergeMap(number => api.get('/double', { number }), 3)
      );
    

    在此处查看实际操作https ://codepen.io/JosephSilber/pen/YzwVYNb?editors=1010

    这将分别触发12和的 3 个并行请求3。一旦其中一个请求完成,它将触发另一个请求4。以此类推,始终保持 3 个并发请求,直到处理完所有值。

    但是,由于先前的请求可能在后续请求之前完成,因此产生的值可能是无序的。所以而不是:

      [2, 4, 6, 8, 10, 12]
    

    ...我们实际上可能会得到:

      [4, 2, 8, 10, 6, 12] // or any other permutation
    
  • concatMap

    ...输入concatMap。该运算符确保所有可观察对象都按原始顺序连接,因此:

      of(1, 2, 3, 4, 5, 6).pipe(
          concatMap(number => api.get('/double', { number }))
      );
    

    ...将始终产生:

      [2, 4, 6, 8, 10, 12]
    

    在这里查看它https ://codepen.io/JosephSilber/pen/OJMmzpy?editors=1010

    这是我们想要的,但现在请求不会并行运行。正如文档所说:

    concatMap等价于mergeMap参数concurrency设置为1

回到问题:是否有可能获得 的好处mergeMap,即可以并行运行给定数量的请求,同时仍然以原始顺序发出映射值?


我的具体问题

上面抽象地描述了这个问题。当您知道手头的实际问题时,有时会更容易推理问题,所以这里是:

  1. 我有一份必须发货的订单清单:

     const orderNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    
  2. 我有一种shipOrder实际发送订单的方法。它返回一个Promise

     const shipOrder = orderNumber => api.shipOrder(orderNumber);
    
  3. API 最多只能同时处理 5 个订单发货,所以我mergeMap用来处理:

     from(orderNumbers).pipe(
         mergeMap(orderNumber => shipOrder(orderNumber), 5)
     );
    
  4. 订单发货后,我们需要打印其发货标签。我有一个printShippingLabel功能,给定发货订单的订单号,将打印其发货标签。所以我订阅了我们的 observable,并在输入值时打印运输标签:

     from(orderNumbers)
         .pipe(mergeMap(orderNumber => shipOrder(orderNumber), 5))
         .pipe(orderNumber => printShippingLabel(orderNumber));
    
  5. 这可行,但现在运输标签打印乱序,因为mergeMap根据何时shipOrder完成其请求发出值。我想要的是标签以与原始列表相同的顺序打印

那可能吗?


可视化

有关问题的可视化,请参见此处:https ://codepen.io/JosephSilber/pen/YzwVYZb?editors=1010

您可以看到较早的订单在后续订单发货之前就已打印。

4

4 回答 4

4

我确实设法部分解决了它,所以我在这里发布它作为我自己问题的答案。

我仍然非常想知道处理这种情况的规范方法。


一个复杂的解决方案

  1. 创建一个自定义运算符,该运算符接受具有索引键的值({ index: number }Typescript 用语中),并保留值的缓冲区,仅根据它们index的顺序发出它们。

  2. 将原始列表映射到index嵌入对象的列表中。

  3. 将其传递给我们的自定义sortByIndex运算符。

  4. 将值映射回其原始值。

这就是它的sortByIndex样子:

function sortByIndex() {
    return observable => {
        return Observable.create(subscriber => {
            const buffer = new Map();
            let current = 0;
            return observable.subscribe({
                next: value => {
                    if (current != value.index) {
                        buffer.set(value.index, value);
                    } else {
                        subscriber.next(value);
                    
                        while (buffer.has(++current)) {
                            subscriber.next(buffer.get(current));
                            buffer.delete(current);
                        }
                    }
                },
                complete: value => subscriber.complete(),
            });
        });
    };
}

有了sortByIndex操作符,我们现在可以完成整个管道:

of(1, 2, 3, 4, 5, 6).pipe(
    map((number, index) => ({ number, index })),
    mergeMap(async ({ number, index }) => {
        const doubled = await api.get('/double', { number });
        return { index, number: doubled };
    }, 3),
    sortByIndex(),
    map(({ number }) => number)
);

在此处查看实际操作https ://codepen.io/JosephSilber/pen/zYrwpNj?editors=1010

创建concurrentConcat运算符

事实上,有了这个sortByIndex运算符,我们现在可以创建一个通用concurrentConcat运算符,它将在{ index: number, value: T }内部进行类型的转换:

function concurrentConcat(mapper, parallel) {
    return observable => {
        return observable.pipe(
            mergeMap(
                mapper,
                (_, value, index) => ({ value, index }),
                parallel
            ),
            sortByIndex(),
            map(({ value }) => value)
        );
    };
}

然后我们可以使用这个concurrentConcat操作符来代替mergeMap,它现在会按照原来的顺序发出值:

of(1, 2, 3, 4, 5, 6).pipe(
    concurrentConcat(number => api.get('/double', { number }), 3),
);

在此处查看实际操作https ://codepen.io/JosephSilber/pen/pogPpRP?editors=1010

所以要解决我原来的订单发货问题:

from(orderNumbers)
    .pipe(concurrentConcat(orderNumber => shipOrder(orderNumber), maxConcurrent))
    .subscribe(orderNumber => printShippingLabel(orderNumber));

在这里查看它https ://codepen.io/JosephSilber/pen/rNxmpWp?editors=1010

您可以看到,即使后来的订单最终可能会在较早的订单之前发货,但标签始终按原始顺序打印。


结论

这个解决方案甚至不完整(因为它不处理发出多个值的内部可观察对象),但它需要一堆自定义代码。这是一个常见的问题,我觉得必须有一种更简单(内置)的方法来解决这个问题:|

于 2019-07-15T19:13:04.637 回答
1

您可以使用此运算符:sortedMergeMap示例

const DONE = Symbol("DONE");
const DONE$ = of(DONE);
const sortedMergeMap = <I, O>(
  mapper: (i: I) => ObservableInput<O>,
  concurrent = 1
) => (source$: Observable<I>) =>
  source$.pipe(
    mergeMap(
      (value, idx) =>
        concat(mapper(value), DONE$).pipe(map(x => [x, idx] as const)),
      concurrent
    ),
    scan(
      (acc, [value, idx]) => {
        if (idx === acc.currentIdx) {
          if (value === DONE) {
            let currentIdx = idx;
            const valuesToEmit = [];
            do {
              currentIdx++;
              const nextValues = acc.buffer.get(currentIdx);
              if (!nextValues) {
                break;
              }
              valuesToEmit.push(...nextValues);
              acc.buffer.delete(currentIdx);
            } while (valuesToEmit[valuesToEmit.length - 1] === DONE);
            return {
              ...acc,
              currentIdx,
              valuesToEmit: valuesToEmit.filter(x => x !== DONE) as O[]
            };
          } else {
            return {
              ...acc,
              valuesToEmit: [value]
            };
          }
        } else {
          if (!acc.buffer.has(idx)) {
            acc.buffer.set(idx, []);
          }
          acc.buffer.get(idx)!.push(value);
          if (acc.valuesToEmit.length > 0) {
            acc.valuesToEmit = [];
          }
          return acc;
        }
      },
      {
        currentIdx: 0,
        valuesToEmit: [] as O[],
        buffer: new Map<number, (O | typeof DONE)[]>([[0, []]])
      }
    ),
    mergeMap(scannedValues => scannedValues.valuesToEmit)
  );
于 2020-06-25T10:42:08.867 回答
1

结果

https://youtu.be/NEr6qfPlahY

request 1
request 2
request 3
response 3
request 4
response 1
request 5
1
response 4
request 6
response 2
request 7
2
3
4
response 6
request 8
response 5
request 9
5
6
response 7
request 10
7
response 9
response 10
response 8
8
9
10

代码

https://stackblitz.com/edit/js-5kvwl6?file=index.js

import { range, Subject, from, of } from 'rxjs';
import { concatMap, share, map, concatAll, delayWhen } from 'rxjs/operators';

const pipeNotifier = new Subject().pipe(share());

range(1, 10)
  .pipe(
    // 1. Make Observable controlled by pipeNotifier
    concatMap((v) => of(v).pipe(delayWhen(() => pipeNotifier))),
    // 2. Submit the request
    map((v) =>
      from(
        (async () => {
          console.log('request', v);
          await wait();
          console.log('response', v);

          pipeNotifier.next();

          return v;
        })()
      )
    ),
    // 3. Keep order
    concatAll()
  )
  .subscribe((x) => console.log(x));

// pipeNotifier controler
range(0, 3).subscribe(() => {
  pipeNotifier.next();
});

function wait() {
  return new Promise((resolve) => {
    const random = 5000 * Math.random();
    setTimeout(() => resolve(random), random);
  });
}
于 2022-02-15T16:10:13.017 回答
0

你想要的是这样的:

from(orderNumbers)
  .pipe(map(shipOrder), concatAll())
  .subscribe(printShippingLabel)

解释:

管道中的第一个运算符是map。它立即为每个值调用shipOrder (因此后续值可能会启动并行请求)。

第二个运算符concatAll将解析的值按正确的顺序排列。

(我简化了代码;concatAll() 等价于 concatMap(identity)。)

于 2020-06-23T12:39:07.263 回答