我有以下助手 rxjs 运算符:
import { share } from 'rxjs/operators';
export const shareResetOnError = <T>() => share<T>({
resetOnError: true,
resetOnComplete: false
});
我对此运算符也有以下规范:
import { Observable } from 'rxjs';
import { shareResetOnError } from './rxjs';
fdescribe('shareResetOnError', () => {
it('should share last emitted value', async () => {
const expectedValue = 123;
let count = 0;
const observable = new Observable(subscriber => {
count++;
subscriber.next(-expectedValue);
subscriber.next(expectedValue);
subscriber.complete();
}).pipe(shareResetOnError());
for (let i = 0; i < 3; i++) {
await expectAsync(observable.toPromise()).toBeResolvedTo(expectedValue);
}
expect(count).toBe(1);
});
it('should reset value on error', async () => {
const expectedError = new Error('test');
const expectedValue = 123;
let expectError = true;
let errorsCount = 0;
let valuesCount = 0;
const observable = new Observable(subscriber => {
if (expectError) {
errorsCount++;
subscriber.error(expectedError);
} else {
valuesCount++;
subscriber.next(expectedValue);
}
subscriber.complete();
}).pipe(shareResetOnError());
for (let i = 0; i < 4; i++) {
await expectAsync(observable.toPromise()).toBeRejectedWithError(expectedError.message);
}
expect(errorsCount).toBe(4);
expectError = false;
for (let i = 0; i < 3; i++) {
await expectAsync(observable.toPromise()).toBeResolvedTo(expectedValue);
}
expect(valuesCount).toBe(1);
});
});
由于某种原因expectAsync(observable.toPromise()).toBeResolvedTo(expectedValue)
失败,因为observable
被解析为 undefined 而不是expectedValue
. 我也尝试过lastValueFrom
,toPromise
但没有区别。在从 rxjs 6 切换到 7 之前,我有以下定义shareResetOnError
:
import { AsyncSubject, ConnectableObservable, Observable, pipe, Subscription } from 'rxjs';
import { refCount } from 'rxjs/operators';
function publishLastResetOnError<T>() {
return (source: Observable<T>) => {
let subject: AsyncSubject<T>;
let subscription: Subscription;
resetSubject();
return new ConnectableObservable(source, () => subject);
function resetSubject() {
subscription?.unsubscribe();
subject = new AsyncSubject<T>();
subscription = subject.subscribe({
error: resetSubject
});
}
};
}
export const shareResetOnError = <T>() => pipe(publishLastResetOnError<T>(), refCount());
它按预期工作,规范没有失败。为什么observable.toPromise()
使用 rxjs 7 运算符无法解析为预期值?