如果您想随着时间的推移向数组添加/删除可观察对象,这意味着您需要一个状态(因为添加/删除)。如果你想坚持使用 rxjs,我知道的唯一无副作用的方法是使用扫描运算符。以下代码显示了一个实现:
界面
interface MappableObservable <T>{
key: number,
observable: Observable<T>
}
功能
// Function that adds to an array of MappableObservables
const addToState = (update: MappableObservable<string>) => (state: MappableObservable<string>[]): MappableObservable<string>[] =>
[...state, update];
// Function that deletes all items with given key from an array of MappableObservables
const deleteFromState = (key: number) => (state: MappableObservable<string>[]): MappableObservable<string>[] =>
state.filter(item => item.key !== key);
// Function that executes the above add or delete function inside a scan
const scanFn = (state: MappableObservable<string>[], fn: (state: MappableObservable<string>[]) => MappableObservable<string>[]) =>
fn(state)
执行
const DEFAULT_MAPPABLE_OBSERVABLE_STATE: MappableObservable<string>[] = [];
const add$: Subject<MappableObservable<string>> = new Subject();
const delete$: Subject<number> = new Subject();
const source$: Observable<string[]> = merge(
add$.pipe(map(addToState)),
delete$.pipe(map(deleteFromState))
).pipe(
scan(scanFn, DEFAULT_MAPPABLE_OBSERVABLE_STATE),
map(state => state.map(mappableObservable => mappableObservable.observable)),
switchMap(observables => combineLatest(observables))
)
向这个源添加或删除一些 observable$
add$.next({key: 1, observable: of('uno')}) // Output: 'uno'
add$.next({key: 2, observable: of('duo')}) // Output: 'uno', 'duo'
add$.next({key: 3, observable: of('tri')}) // Output: 'uno', 'duo', 'tri'
add$.next({key: 2, observable: of('duo-duo')}) // Output: 'uno', 'duo', 'tri' 'duo-duo'
delete$.next(2); // Output: 'uno', 'tri'
仅供参考:只有当您重放它们或使用 of(...) 时,才会直接观察到,就像我在这个例子中所做的那样。如果您有非重放或即时可观察对象,它们只会在它们全部新完成时发出
运行堆栈闪电战