12

当RxJS Observable的最后一次订阅被释放时,执行副作用的最干净的方法是什么?这可能发生在 Observable 终止之前

假设我需要一个函数返回一个Observable向资源发出更改的函数。我想在处理完所有订阅后执行清理操作。

var observable = streamResourceChanges(resource);
var subscription1 = observable.subscribe(observer1);
var subscription2 = observable.subscribe(observer2);
// ...
subscription1.dispose();  // Does not perform the cleanup
subscription2.dispose();  // Performs the cleanup

我发现定义订阅处置操作的唯一方法是使用Rx.Observable.create. 最后的处置可以通过共享订阅来处理,例如与Observable.prototype.singleInstance().

例如:

function streamResourceChanges(resource) {
    return Rx.Observable.create(function(observer) {
        // Subscribe the observer for resource changes...
        // Return a cleanup function
        return function() {
            // Perform cleanup here...
            console.log("Cleanup performed!");
        };
    }).singleInstance();
}

是否有一种更简洁的方法来定义订阅处置的副作用,doOnNext类似于doOnCompleteddoOnError

var withCleanup = withoutCleanup.doOnDispose(function() {
    // Perform cleanup here...
});
4

1 回答 1

13

根据您的实际用例,您会想到两种选择:

。最后()

source.finally(() => console.log("cleaning up")).singleInstance()

。使用()

Rx.Observable
    .using(
        // allocate some disposable resource during subscribe.
        // resource.dispose() will be called during unsubscribe.
        () => new SomeResource(),

        // use the disposable resource to create your observable
        // for example...
        resource => Rx.Observable.interval(resource.time))
    .singleInstance();
于 2015-10-26T20:41:09.500 回答