2

我正在使用 RxJS 6 创建可管道操作符complete(),并且不清楚当操作是异步时如何观察观察者。

对于同步操作,逻辑很简单。在下面的示例中,来自源的所有值Observable都将传递给observer.next(),然后observer.complete()被调用。

const syncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => observer.next(x),
      error: (e) => observer.error(err),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(syncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

但是,对于异步操作,我有点不知所措。在下面的示例中,异步操作由对 的调用表示setTimeout()。显然,observer.complete()任何值传递给observer.next().

const asyncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => setTimeout(() => observer.next(x), 100),
      error: (e) => observer.error(err),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

所以问题是:什么是惯用的 RxJS 方法,以便observer.complete()仅在所有值都异步传递给之后才进行调用observer.next()?我应该手动跟踪待处理的呼叫还是有更“被动”的解决方案?

(请注意,上面的示例是对我的实际代码的简化,并且调用setTimeout()旨在表示“任何异步操作”。我正在寻找一种通用方法来处理可管道运算符中的异步操作,而不是关于如何处理的建议处理 RxJS 中的延迟或超时。)

4

3 回答 3

3

一种思路可能是重组您asyncOp以使用其他运算符,例如mergeMap.

这是使用这种方法重现您的示例的代码

const asyncOp = () => source => source.pipe(mergeMap(x => of(x).pipe(delay(100))));
from([1, 2, 3]).pipe(asyncOp1()).subscribe(x => console.log(x));

这是否值得考虑取决于您的asyncOp工作。如果它是异步的,因为它依赖于一些回调,例如在 https 调用或从文件系统读取的情况下,那么我认为这种方法可以工作,因为您可以将基于回调的函数转换为 Observable。

于 2018-07-30T06:59:29.220 回答
1

仍然希望获得关于更具反应性/惯用实施的输入,但以下是我暂时决定使用的内容。

本质上,我只是将一个计数器用于进行中的操作(pending),并使其仅在源 observable 完成(completed)并且没有待处理的操作(!pending)时才完成。

const asyncOp = () => (source) =>
  new rxjs.Observable(observer => {
    let pending = 0; // the number of in-flight operations
    let completed = false; // whether or not the source observable completed
    
    return source.subscribe({
      next: (x) => {
        pending++;
        
        setTimeout(() => {              
          observer.next(x);
          
          if (!--pending && completed) { // no ops pending and source completed
            observer.complete();
          }
        }, 100);
      },
      error: (e) => observer.error(err),
      complete: () => {
        completed = true;
        
        if (!pending) { // no ops pending
          observer.complete();
        }
      }
    })
  });
  
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

于 2018-07-26T02:40:11.057 回答
0

我创建了这个可运行的 StackBlitz 演示来展示我认为应该做什么。

这里的想法是用于toArray()将源 observable 中的所有值获取到数组中。之后的代码toArray()是单个值(数组)。

注意:有很多方法(操作员)可以解决问题,这只是基于我从这个问题中理解的一个例子——这对 RxJS Observables 来说既是好事也是坏事。希望这可以帮助。:-)

主要演示代码是:

// --- for each value, do the async service
of(...[1, 2, 3]).pipe(
  // let each value be processed by both async service...
  concatMap(no => myAsyncService$(no)),
  concatMap(no => myAsyncService2$(no)),

  // --- toArray() combines all the values (i.e. they completed)
  toArray(),

  // --- this will only be called once - with all completed values
  // --- testing: try commenting the toArray() to see the values as individual "next" value
  tap(val => {
    // see the combined values
    console.log(val)
  })
).subscribe();
于 2018-07-30T05:22:55.630 回答