0

我通过以下方式使用 rxjava 进行维护任务:

在需要定期维护的类中,我使用以下静态订阅,这会导致 Observable 在类加载到内存时首次启动,然后按指定的定期间隔启动。

private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS)
        .flatMap(new Func1<Long, Observable<String>>() {
            @Override public Observable<String> call(Long aLong) {

                // some code

                return Observable.just(null);
            }
        }).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread())
        .subscribe();

现在我有一种情况,我想向 UI 报告我的维护结果。

通常,我会使用以下架构

    Observable.create(new Observable.OnSubscribe<String>() {

        @Override public void call(Subscriber<? super String> subscriber) {

            // some code

            subscriber.onNext(result);
            subscriber.onCompleted();            }

    }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<String>() {

                @Override public void call(String result) {

                    // write to the UI

                }

            });

但是这里我们有一个只执行一次的 Observable。

对于定期执行的 Observable,我找不到在 Subscriber 中调用 Action 的方法,因此我可以使用subscriber.onNext() 传递结果。看起来 Observable 没有合适的签名,它可以从 timer() 中花费很长时间,同时允许订阅操作。但是知道 rxjava 我确信有一个技巧 ;-)

我可以使用 zip 压缩 Timer Observable 和一次性 Observable 来完成这项工作(基本上将两个版本压缩在一起),但我宁愿使用第一个结构,因为它的行为略有不同。

--

我尝试通过以下方式将两个版本合并为一个:

private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS)
        .flatMap(new Func1<Long, Observable<String>>() {
            @Override public Observable<String> call(Long aLong) {

                // some code // stays here to ensure there is no concurrency while executing

                final String result = "result"; // I store the result in a final variable after some code has been finished

                return Observable.create(new Observable.OnSubscribe<String>() {
                    @Override public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext(result); // then I use it in a new Observable and emit it
                        subscriber.onCompleted(); // not sure if this is needed here (haven't tested this yet)
                    }
                });

            }
        }).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread())
        .subscribe(new Action1<String>() {
            @Override public void call(String result) {
                // so I can finally consume the result on the UI thread
            }
        });

我没有创建和发出“null” Observable,而是创建了一个允许我向订阅者发送结果的 Observable。

相当混乱,但这应该工作,对吧?有更简单的解决方案吗?你觉得呢?你有没有什么想法?

4

2 回答 2

1

我不理解你的问题。

你想从你的 Observable Timer 和你的其他 Observable 使用相同的订阅?

也许您可以使用 Subject 作为 Observable 之间的桥梁。

Subject<?, ?> bridge = PublishSubject.create();
Observable.timer(5, SECONDS).flatMap(/* whatever*/).mergeWith(bridge).subscribe(/* toDo */);
Observable.create(/* subscription */).subscribe(bridge);

您的“One time Observable”发出的每个项目都将被推入 brige,这会将其推入“timer” observable。

那是你要找的吗?

于 2014-12-11T10:59:10.097 回答
0

我知道为这个问题添加答案已经有一段时间了。但是,它可能会帮助其他人。据我了解,您想要创建一个可观察对象并例如每 3 秒运行一次。所以请在下面找到我的示例(您可以发出像 Observable.just 这样的现成运算符,甚至可以使用 Observable.create 运算符创建自己的可观察对象):

Observable.interval(3,TimeUnit.SECONDS).flatMap(new Function<Long, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Long aLong) throws Exception {

            return Observable.just("Hello " + Thread.currentThread().getName()+aLong);
           /*
            return Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    try{
                        String data = fetchData("http://www.yahoo.com");
                        emitter.onNext(data.length()+"");
                        emitter.onComplete();
                    }catch (Exception e){
                        emitter.onError(e);
                    }
                }
            });
            */
        }
    }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("RxResult","--  " + Thread.currentThread().getName() + " -- " +s);
        }
    });

public String fetchData(String url) throws IOException {
    BufferedReader in = new BufferedReader(new InputStreamReader(new URL(url).openStream()));
    String inputLine;
    String result = "";

    while((inputLine = in.readLine()) != null){
        result += inputLine;
    }

    return result.length()+"";
}
于 2018-05-15T13:44:02.277 回答