4

我是 RxJava 的新手,我正在努力弄清楚如何正确关闭资源,尤其是在处理多个订阅者时。

我有一些资源Observable<T>在哪里(例如,一个 Android 数据库.TCloseableCursor

我可能在 observable 上有多个订阅者。我想close()在每个订阅者处理完资源后获取资源。换句话说,在新资源交付/发出后关闭旧资源,并在最后一个订阅者取消订阅时最终关闭最后一个。

我尝试使用我调用的自定义运算符使其工作AutoCloseOperator,它几乎可以工作,但不太正确。即我仍然处于竞争状态和/或泄漏,例如资源没有被关闭。

在 RxJava 中执行此操作的正确方法是什么?

说我有这个代码:

final AutoCloseOperator<MyResource> autoClose = new AutoCloseOperator<MyResource>();
Subject<MyResource, MyResource> subject = PublishSubject.create();
Observable<MyResource> o = subject.lift(autoClose);

Subscription s1 = o.subscribe(new Action1<MyResource>() {
    public void call(MyResource myObj) {
        System.out.println("s1 handling " + myObj);
    }
});

subject.onNext(new MyResource(1));
subject.onNext(new MyResource(2)); // This should close Resource #1 after Resource #2 is delivered

Subscription s2 = o.subscribe(new Action1<MyResource>() {
    public void call(MyResource myObj) {
        System.out.println("s2 handling " + myObj);
    }
});

subject.onNext(new MyResource(3));
subject.onNext(new MyResource(4));

s1.unsubscribe();

subject.onNext(new MyResource(5));
subject.onNext(new MyResource(6));

s2.unsubscribe();

subject.onNext(new MyResource(7));
subject.onNext(new MyResource(8));

然后我期望以下行为:

s1 handling Resource #1
s1 handling Resource #2
Closing Resource #1
s1 handling Resource #3
Closing Resource #2
s2 handling Resource #3
s1 handling Resource #4
s2 handling Resource #4
Closing Resource #3
s2 handling Resource #5
Closing Resource #4
s2 handling Resource #6
Closing Resource #5
Closing Resource #6
Closing Resource #7
Closing Resource #8

注意:我没有PublishSubject在我的真实代码中使用它,我只是在这里用于说明目的,我使用它在每次更新数据库表时Observable.create发出...Cursor

概括问题:我可以只使用doOnNextdoOnUnsubscribe关闭旧项目,但这并没有考虑到这些事件会发生多次(对于每个订阅者),我只想在所有订阅者都有时关闭资源收到新商品。

是自定义运营商使用lift()的方式,还是有一些现有的运营商可能会对此有所帮助?

我已将我的问题简化为GitHub 上的一个小型命令行应用程序。感谢您的关注!

4

1 回答 1

6

Observable.using()是你需要的。

如果你t的类型T有一个.close()方法并且你想从t(你的光标)中提取一些东西,Observable<R>那么这里是如何做到的:

Func0<T> resourceFactory = () -> t;
Func1<T, Observable<R>> observableFactory = x -> ...
Action1<T> disposeAction = x -> x.close();

Observable<R> results = Observable.using(resourceFactory, observableFactory, disposeAction);

你提到你有Observable<T>。要从所有 T 中获取所有 R,请使用上面的代码,如下所示:

Observable<T> source = ...
Observable<R> results = 
    source.flatMap(t -> {
        Func0<T> resourceFactory = () -> t;
        Func1<T, Observable<R>> observableFactory = x -> ...
        Action1<T> disposeAction = x -> x.close();
        return Observable.using(resourceFactory, observableFactory, disposeAction);});
于 2015-08-11T05:21:12.997 回答