1

我试图掌握创建 SyncOnSubscribe 的窍门,但不太确定如果在generateState状态初始化期间该方法失败该怎么办。

return Observable.create(new SyncOnSubscribe<MyState, String>() {
    @Override
    protected MyState generateState() {
        return new MyState();   // <---- what if this fails?
    }

    @Override
    protected MyState next(MyState state, Observer<? super String> observer) {
        // do something with state
    }
});

我可以想到几种可以临时处理的方法:

  1. 如果抛出运行时异常,库是否会自动调用o.onError?(见编辑)。
  2. 我可以包装MyState另一个存储错误的变量,我可以o.onError在第一次调用时自行next调用。

我只是好奇是否有建议的做法?

谢谢!

编辑:我尝试在generateState方法中抛出一个运行时异常,我认为它指向我将在方法MyState中检查的错误包装。next如果您有更好的建议,请发表评论/回答。

public static Observable<String> getEventsOnSubscribe1() {
    return Observable.create((s) -> {
        throw new UnsupportedOperationException("getEvents3");
    });
};

/** Like 'getEventsOnSubscribe1' but wrap exception and call onError manually.  */
public static Observable<String> getEventsOnSubscribe2() {
    return Observable.create((s) -> {
        try {
            throw new UnsupportedOperationException("getEvents3");
        } catch (Exception ex) {
            s.onError(ex);
        }
    });
};

public static Observable<String> getEventsSyncOnSubscribe() {
    return Observable.create(new SyncOnSubscribe<Channel, String>() {
        @Override
        protected Channel generateState() {
            System.out.println("SyncOnSubscribe.generateState");
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        protected Channel next(Channel state, Observer<? super String> observer) {
            System.out.println("SyncOnSubscribe.next");
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        protected void onUnsubscribe(Channel state) {
            System.out.println("SyncOnSubscribe.onUnsubscribe");
            throw new UnsupportedOperationException("Not supported yet.");
        }
    });
}

public static void main(String[] args) throws IOException, TimeoutException {
    getEventsOnSubscribe1()
    //getEventsOnSubscribe2()
    //getEventsOnSyncSubscribe()
            .toBlocking()
            .subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted");
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("onError: " + e.getLocalizedMessage());
                }

                @Override
                public void onNext(String t) {
                    System.out.println("onNext: " + t);
                }
            });
}

使用main上面的函数getEventsOnSubscribe1getEventsSyncOnSubscribe都调用了订阅者onError,但他们让程序挂起。使用getEventsOnSubscribe2which 包装异常并手动调用s.onError,程序能够退出。

4

0 回答 0