1

我有以下助手 rxjs 运算符:

import { share } from 'rxjs/operators';

export const shareResetOnError = <T>() => share<T>({
  resetOnError: true,
  resetOnComplete: false
});

我对此运算符也有以下规范:

import { Observable } from 'rxjs';
import { shareResetOnError } from './rxjs';

fdescribe('shareResetOnError', () => {
  it('should share last emitted value', async () => {
    const expectedValue = 123;

    let count = 0;
    const observable = new Observable(subscriber => {
      count++;
      subscriber.next(-expectedValue);
      subscriber.next(expectedValue);
      subscriber.complete();
    }).pipe(shareResetOnError());

    for (let i = 0; i < 3; i++) {
      await expectAsync(observable.toPromise()).toBeResolvedTo(expectedValue);
    }
    expect(count).toBe(1);
  });

  it('should reset value on error', async () => {
    const expectedError = new Error('test');
    const expectedValue = 123;

    let expectError = true;

    let errorsCount = 0;
    let valuesCount = 0;

    const observable = new Observable(subscriber => {
      if (expectError) {
        errorsCount++;
        subscriber.error(expectedError);
      } else {
        valuesCount++;
        subscriber.next(expectedValue);
      }
      subscriber.complete();
    }).pipe(shareResetOnError());

    for (let i = 0; i < 4; i++) {
      await expectAsync(observable.toPromise()).toBeRejectedWithError(expectedError.message);
    }
    expect(errorsCount).toBe(4);

    expectError = false;
    for (let i = 0; i < 3; i++) {
      await expectAsync(observable.toPromise()).toBeResolvedTo(expectedValue);
    }
    expect(valuesCount).toBe(1);
  });
});

由于某种原因expectAsync(observable.toPromise()).toBeResolvedTo(expectedValue)失败,因为observable被解析为 undefined 而不是expectedValue. 我也尝试过lastValueFromtoPromise但没有区别。在从 rxjs 6 切换到 7 之前,我有以下定义shareResetOnError

import { AsyncSubject, ConnectableObservable, Observable, pipe, Subscription } from 'rxjs';
import { refCount } from 'rxjs/operators';

function publishLastResetOnError<T>() {
  return (source: Observable<T>) => {
    let subject: AsyncSubject<T>;
    let subscription: Subscription;
    resetSubject();
    return new ConnectableObservable(source, () => subject);

    function resetSubject() {
      subscription?.unsubscribe();
      subject = new AsyncSubject<T>();
      subscription = subject.subscribe({
        error: resetSubject
      });
    }
  };
}

export const shareResetOnError = <T>() => pipe(publishLastResetOnError<T>(), refCount());

它按预期工作,规范没有失败。为什么observable.toPromise()使用 rxjs 7 运算符无法解析为预期值?

4

0 回答 0