4

我想构建一个包装类,它在 Observable 的每个发射值之前和之后做一些事情。

这是我想出的:

class Wrapper<T> {
    wrapped$: Observable<T>;

    _dataSubject = new Subject<T>();
    data$ = this._dataSubject.pipe(
        tap(_ => console.log("BEFORE"),
        //
        // map( ??? )
        //
    );

    constructor(wrapped$: Observable<T>) {
        this.wrapped$ = wrapped$.pipe(
            tap(_ => console.log("AFTER")
        );
    }
}

let subject = new Subject<string>();
let wrapper = new Wrapper(subject);
wrapper.data$.subscribe(val => console.log(val));
subject.next("foo")

控制台输出应该是:

BEFORE
foo
AFTER

我不知道如何连接$wrappedObservable 与_dataSubject.

但也许我完全错了,它需要一种不同的方法。

4

4 回答 4

11

这样的事情怎么办

import {Observable} from 'rxjs';

export class DoBeforeAfter<T> {
    wrapped$: Observable<T>;


    constructor(wrapped$: Observable<T>, doFunction: (data: any) => void) {
        this.wrapped$ = Observable.of(null)
            .do(_ => console.log("BEFORE"))
            .switchMap(_ => wrapped$)
            .do(doFunction)
            .do(_ => console.log('AFTER'));
    }

}

像这样被消费

const source = Observable.of('NOW');
const beforeAfter = new DoBeforeAfter(source, data => console.log(data));

beforeAfter.wrapped$.subscribe(
        null,
        error => console.error(error),
        () => console.log('DONE')

)

它看起来有点麻烦,但也许它可以帮助

于 2018-03-22T09:06:09.817 回答
3

因此,据我了解,您可以执行以下操作:

class Wrapper<T> {

    _dataSubject: Subject<T>;
    wrapped$: Observable<T>;

    constructor(wrapped$: Subject<T>) {
        this._dataSubject = wrapped$;
        this.wrapped$ = this._dataSubject.pipe(
          tap(_ => console.log("BEFORE")),
          tap(data => console.log(data)),
          tap(_ => console.log("AFTER"))
        );
    }
}

接着:

let subject = new Subject<string>();
let wrapper = new Wrapper(subject);
wrapper.wrapped$.subscribe();
subject.next("foo")
于 2018-03-22T09:20:30.397 回答
3

最好的方法(虽然很复杂)是创建一个新的运算符,类似于tap但在发出值之前和之后做一些事情。

您可以在示例中看到它的工作原理(它在 ES6 中,因为 SO 代码片段不接受 TypeScript,但您会明白的)

function wrap(before, after) {
    return function wrapOperatorFunction(source) {
        return source.lift(new WrapOperator(before, after));
    };
}
class WrapOperator {
    constructor(before, after) {
        this.before = before;
        this.after = after;
    }
    call(subscriber, source) {
        return source.subscribe(new WrapSubscriber(subscriber, this.before, this.after));
    }
}

class WrapSubscriber extends Rx.Subscriber {
    constructor(destination, before, after) {
        super(destination);
        this.before = before;
        this.after = after;
    }
    _next(value) {
        this.before ? this.before(value) : null;
        this.destination.next(value);
        this.after ? this.after(value) : null;
    }
}

// Now:

const observable = Rx.Observable.from([1, 2, 3, 4]);

observable.pipe(
   wrap(value => console.log('before', value), value => console.log('after', value))
).subscribe(value => console.log('value emitted', value), null, () => console.log('complete'));


// For what you want:
// let's simulate that, for each value in the array, we'll fetch something from an external service:
// we want the before to be executed when we make the request, and the after to be executed when it finishes. In this // case, we just combine two wrap operators and flatMap, for example:

observable.pipe(
  wrap(value => console.log('BEFORE REQUEST', value)),
  Rx.operators.flatMap(value => {
    const subject = new Rx.Subject();
    setTimeout(() => { subject.next(value); subject.complete(); }, 5000);
    return subject;
  }),
  wrap(undefined, value => console.log('AFTER REQUEST', value))
).subscribe(value => console.log('value emitted', value), null, () => console.log('complete'));
<script src="https://unpkg.com/@reactivex/rxjs@5.5.0/dist/global/Rx.js"></script>

如前所述,可能有点复杂,但它与 RxJS 运算符无缝集成,它始终是了解如何创建我们自己的运算符的好例子 :-)

对于您在评论中所说的内容,您可以查看最后一个示例。在那里,我结合了两个wrap运算符。第一个仅使用before回调,因此它仅在发出值之前执行某些操作。如您所见,因为源 observable 来自一个数组,所以四个before回调会立即执行。然后,我们申请flatMap. 为此,我们应用了一个新的wrap,但这次只使用了after回调。因此,此回调仅在flatMapyield 它们的值返回后调用。

当然,如果取而代之的是一个来自数组的可观察对象,那么您将拥有一个由事件侦听器制作的对象,您将拥有:

  1. before回调在触发的事件被 observable 推送之前执行。
  2. 异步 observable 执行。
  3. 异步 observable 在时间 t 后产生值。
  4. after回调被执行。

这是让运营商获得回报的地方,因为它们很容易组合。希望这适合你。

于 2018-03-22T09:19:02.217 回答
0

感谢您的输入!

我现在想出了自己的解决方案。AFTER和输出的DATA顺序错误,但我发现这并不重要,只要它们同时出现

代码仍然需要一些重构,但至少现在可以工作!

class Wrapper {
    _taskDoneSubject = new Subject<boolean>();
    taskDone$ = this._taskDoneSubject.asObservable();

    wrappedSwitchMap<T, R>(wrapped: (val: T) => Observable<R>): OperatorFunction<T, R> {
        return switchMap<T, R>(val => { // can also be done using mergeMap
            this._taskDoneSubject.next(false);
            return wrapped(val).pipe(
                tap(_ => this._taskDoneSubject.next(true))
            );
        });
    }
}

用法:

test(valueEmittingObservable: Observable<string>) {

    let wrapper = new Wrapper();
    wrapper.taskDone$.subscribe(val => console.log("task done? ", val);

    valueEmittingObservable.pipe(
        // wrapper.wrappedSwitchMap(val => anObservableThatTakesAWhile(val))
        wrapper.wrappedSwitchMap(val => of("foo").pipe(delay(5000)) // simulated
    ).subscribe(val => console.log("emitted value: ", val);

}

输出:

task done? false
(... 5 seconds delay ...)
task done? true
emitted value: foo

或者如果它的发射速度超过 5 秒:

task done? false
(... 1 second until next emit ...)
task done? false
(... 5 seconds delay ...)
task done? true
emitted value: foo
于 2018-03-22T12:18:10.357 回答