-1

我试图依靠反应式编程与许多订阅者分享 http 调用的结果。同时我希望能够再次执行呼叫(刷新)。

我从一个执行 http 调用的冷 Observable 开始,然后立即完成。

我想包装它以获得一个像这样工作的热可观察对象:每个订阅者在订阅时应该总是收到最后一个事件(如果有的话),并且在取消订阅之前收到所有其他事件。我应该有一种方法(该可观察的外部)来触发刷新,从而触发所有订阅者的新事件。

更详细:

对于由retrofit2. 为了完整起见,这是我的服务接口

@GET("/path")
Observable<MyData> httpCall();

我要求改造服务:

Retrofit retrofit = new Retrofit.Builder()
                    .baseUrl(REST_BASE_URL)
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                    .build();
MyService service = retrofit.create(MyServiceInterface.class);

从服务中我得到了可观察的:

Observable<MyData> coldObservable = service.httpCall();

这是一个的observable,每次调用它都会执行 http 调用subscribe(),然后立即完成。

我想公开一个热门的observable,我读过我可以这样做:

Observable<MyData>hotObservable = coldObservable.publish()
    .autoConnect();

这样,http 调用首先subscribe()在其上执行,如果我多次订阅,所有这些都将“连接”到同一个coldObservable.

调用完成后,如果我subscribe()再次调用什么都不会发生,甚至没有回调完成。

相反,我希望它接收最后一个事件。

如果用户请求它,我想强制刷新(重复 http 调用)。所有订阅者都应该收到新的结果/错误。

我想象这样的事情:

Observable<MyData> theSmartObservable = helperClass.getObservable();

// at some point later
helperClass.triggerRefresh();

触发的刷新应该在theSmartObservable.

我如何构建这样一个可观察的?

我希望我解释了自己,如果没有,请在评论中说明。

4

1 回答 1

0

子类化 Observable 很棘手。将主题作为“桥梁”使用构图应该更容易。就像是

public class HelperClass {

    private Subscription bridgeSubscription;
    private ReplaySubject<MyData> bridgeSubject;
    private MyService service;
    private Subscriber<MyData> mSubscriber = new Subscriber<MyData>() {
                @Override
                public void onCompleted() {
                  // To DO
                }

                @Override
                public void onError(Throwable e) {
                  // TO DO
                }

                @Override
                public void onNext(MyData d) {
                    bridgeSubject.onNext(d);
                }
            };
    public HelperClass(MyService service) {
        this.service = service;
        bridgeSubject = ReplaySubject.create(1);
        bridgeSubscription = this.service.httpCall().publish().autoConnect().subscribe(mSubscriber);
    }

    public Observable<MyData> getObservable() {
        // Might not work too well to hide the observable identity  
        // return bridgeSubject.asObservable();
        return bridgeSubject;
    }

    public void triggerRefresh() {
        if (bridgeSubscription != null) {
            bridgeSubscription.unsubscribe();
            bridgeSubscription = null;
        }
        bridgeSubscription = service.httpCall().publish().autoConnect().subscribe(mSubscriber);
    }
}
于 2016-04-06T16:34:44.923 回答