4

我正在实现一个角度服务,让消费者根据他们的 id 观察各种值:

它的本质是这样的:

private subjects = new Map<number, Subject<any>>();

public subscribe(id: number, observer: any): Subscription {
  // try getting subject for this id (or undefined if it does not yet exist)
  let subj = this.subjects.get(id);

  // create subject if it does not yet exist
  if (!subj) {
    subj = new Subject<any>();
    this.subjects.set(id, subj);
  }

  // subscribe observer
  const subscription = subj.subscribe(observer);

  // set up teardown logic (gets called when subscription is unsubscribed)
  subscription.add(() => { 
    // remove subject from the map, if this was the last subscription
    if (subj.observers.length == 0) {
      this.subjects.delete(id);
    }
  });

  // return subscription
  return subscription;
}

这是完整的 stackblitz 示例

以上工作正常,但 API 使用起来有点麻烦(在消费者中,我需要手动跟踪所有订阅并确保正确取消订阅)。

我希望有一个返回Observable这样的方法:

public subscribe(id: number): Observable<any> {
  // TODO: Return an observable for this id and make sure that
  // its corresponding subject is in the map iff at least one of the observables
  // for this id has at least one subscription.
  
  return ...;
}

因为这将允许我使用管道直接从组件模板订阅我需要的值async,其中 angular 将负责取消订阅观察者。

但是我不太清楚如何实现逻辑以从不再使用它们时删除未使用Subject的 s 。Map有什么好的方法吗?

这是一个带有一些测试用例的不完整的 stackblitz 示例

4

1 回答 1

3

我认为你可以尝试这样的事情:

function subscribe(id: number): Observable<any> {

  /* ... */

  return sbj
    .pipe(
      finalize(() => {
        if (subj.observers.length == 0) {
          this.subjects.delete(id);
        }
      })
    );
}

有了这个,您还可以使用带有返回的异步管道(作为 的结果调用)。确保观察者(例如来自模板)将被添加到“AnonymousSubject Subject”列表中。AnonymousSubjectSubject.liftSubject.pipe()AnonymousSubject's parent

finalize()当源(例如Subject)被取消订阅时调用。这可能在组件被销毁时发生,或者在complete/error事件发生时发生,这也包括Subject完成时的情况。当 aSubject完成时,它将向其所有订阅者发送完成通知,这意味着观察者最终将自动从Subject的观察者列表中删除。

编辑

app.component.ts

  show1 = true;
  show12 = true;
  show2 = true;

  v1$: Observable<any>;
  v12$: Observable<any>;
  v2$: Observable<any>;

  constructor(public valueService: ValueService) {
  }

  async ngOnInit() {
    await this.sleep(2000);
    // const s11 = this.valueService.subscribe(1, v => this.v1 = v);
    this.v1$ = this.valueService.subscribe(1);
    await this.sleep(2000);
    // const s21 = this.valueService.subscribe(2, v => this.v2 = v);
    this.v2$ = this.valueService.subscribe(2);
    await this.sleep(2000);
    // const s12 = this.valueService.subscribe(1, () => {});
    this.v12$ = this.valueService.subscribe(1);
    await this.sleep(2000);
    // s12.unsubscribe();
    this.show12 = false
    await this.sleep(2000);
    // s11.unsubscribe();
    this.show1 = false;
    await this.sleep(2000);
    // s21.unsubscribe();
    this.show2 = false
  }

app.component.html

<div *ngIf="show1">
  v1: {{ v1$ | async }}
</div>

<div *ngIf="show12">
  v12: {{ v12$ | async }}
</div>

<div *ngIf="show2">
  v2: {{ v2$ | async }}
</div>

价值.service.ts

public subscribe(id: number): Observable<any> {
  let subj = this.subjects.get(id);

  if (!subj) {
    subj = new Subject<any>();
    this.subjects.set(id, subj);
  }

  return subj.pipe(
    finalize(() => {
      if (subj.observers.length === 1) {
        this.subjects.delete(id);
      }
    })
  )
}

堆栈闪电战

正如@ggradnig 提到的,检查应该是subj.observers.length === 1,因为finalize()至少在 RxJs 中6.5.x,在发生任何其他取消订阅之前运行它的回调。

于 2020-06-25T16:38:12.937 回答