我通过以下方式使用 rxjava 进行维护任务:
在需要定期维护的类中,我使用以下静态订阅,这会导致 Observable 在类加载到内存时首次启动,然后按指定的定期间隔启动。
private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<String>>() {
@Override public Observable<String> call(Long aLong) {
// some code
return Observable.just(null);
}
}).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread())
.subscribe();
现在我有一种情况,我想向 UI 报告我的维护结果。
通常,我会使用以下架构
Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) {
// some code
subscriber.onNext(result);
subscriber.onCompleted(); }
}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override public void call(String result) {
// write to the UI
}
});
但是这里我们有一个只执行一次的 Observable。
对于定期执行的 Observable,我找不到在 Subscriber 中调用 Action 的方法,因此我可以使用subscriber.onNext() 传递结果。看起来 Observable 没有合适的签名,它可以从 timer() 中花费很长时间,同时允许订阅操作。但是知道 rxjava 我确信有一个技巧 ;-)
我可以使用 zip 压缩 Timer Observable 和一次性 Observable 来完成这项工作(基本上将两个版本压缩在一起),但我宁愿使用第一个结构,因为它的行为略有不同。
--
我尝试通过以下方式将两个版本合并为一个:
private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<String>>() {
@Override public Observable<String> call(Long aLong) {
// some code // stays here to ensure there is no concurrency while executing
final String result = "result"; // I store the result in a final variable after some code has been finished
return Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) {
subscriber.onNext(result); // then I use it in a new Observable and emit it
subscriber.onCompleted(); // not sure if this is needed here (haven't tested this yet)
}
});
}
}).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread())
.subscribe(new Action1<String>() {
@Override public void call(String result) {
// so I can finally consume the result on the UI thread
}
});
我没有创建和发出“null” Observable,而是创建了一个允许我向订阅者发送结果的 Observable。
相当混乱,但这应该工作,对吧?有更简单的解决方案吗?你觉得呢?你有没有什么想法?