0

该方法从远程端startPositionInfoPolling每 2 秒执行一次轮询PositionInfo并更新 UI。

private Subscription mPollingTimerSubscription;
private Observable<Long> mPollingTimerObservable = Observable.timer(0, 2, TimeUnit.SECONDS);

private void startPositionInfoPolling() {
    LOGGER.trace("...");
    mPollingTimerSubscription = mPollingTimerObservable
            .flatMap(new Func1<Long, Observable<PositionInfo>>() {
                @Override
                public Observable<PositionInfo> call(Long ticker) {
                    LOGGER.debug("XXX ticker = {}", ticker);
                    return mMediaRendererClient.createPositionInfoObservable();
                }
            })
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<PositionInfo>() {
                @Override
                public void call(final PositionInfo positionInfo) {
                    LOGGER.debug("XXX positionInfo = {}", positionInfo);
                    mMusicMediaTrackDetailsFragment.updateView(updatedPositionInfo);
                }
            });
}

该方法stopPositionInfoPolling停止 UI 更新。

private void stopPositionInfoPolling() {
    LOGGER.trace("...");
    mPollingTimerSubscription.unsubscribe();
}

我想更改代码,使其不定期(例如每 20 秒)获取远程数据,并定期(例如每 1 秒)使用外推值更新 UI。

我使用 RxJava 的第一个解决方案如下所示:

private Subscription mPollingTimerSubscription, mUpdatingTimerSubscription;
private Observable<Long> mPollingTimerObservable = Observable.timer(0, 20, TimeUnit.SECONDS);
private Observable<Long> mUpdatingTimerObservable = Observable.timer(0, 1, TimeUnit.SECONDS);

private void startPositionInfoPolling() {
    LOGGER.trace("...");
    mPollingTimerSubscription = mPollingTimerObservable
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long ticker) {
                    LOGGER.debug("XXX ticker = {}", ticker);
                    mMediaRendererClient.createPositionInfoObservable()
                            .retry(2)
                            .subscribe(new Action1<PositionInfo>() {
                                @Override
                                public void call(final PositionInfo positionInfo) {
                                    LOGGER.debug("XXX positionInfo = {}", positionInfo);
                                    mUpdatingTimerSubscription = mUpdatingTimerObservable
                                            .take(20, TimeUnit.SECONDS)
                                            .subscribeOn(AndroidSchedulers.mainThread())
                                            .subscribe(new Action1<Long>() {
                                                @Override
                                                public void call(Long ticker) {
                                                    LOGGER.debug("XXX ticker = {}", ticker);
                                                    String updatedRelTime = ModelUtil.toTimeString(ModelUtil.fromTimeString(positionInfo.getRelTime()) + ticker);
                                                    PositionInfo updatedPositionInfo = new PositionInfo(positionInfo, updatedRelTime, positionInfo.getAbsTime());
                                                    LOGGER.debug("XXX positionInfo = {}", updatedPositionInfo);
                                                    mMusicMediaTrackDetailsFragment.updateView(updatedPositionInfo);
                                                }
                                            });
                                }
                            });
                }
            });
}

private void stopPositionInfoPolling() {
    LOGGER.trace("...");
    mPollingTimerSubscription.unsubscribe();
    mUpdatingTimerSubscription.unsubscribe();
}

有人可以帮我把这段代码转换成看起来不像回调的东西吗?!我觉得flatMap是关键,但我的头脑仍然没有反应性地思考;-)

还有一个问题是mPollingTimerSubscription.unsubscribe();没有取消订阅/取消/停止mUpdatingTimerObservable,因此正在维护另一个订阅。

提前感谢您的任何评论。

更新:感谢@hello_world 降低了原始复杂性;-)

private Subscription mPollingTimerSubscription, mUpdatingTimerSubscription;
private Observable<Long> mPollingTimerObservable = Observable.timer(0, 20, TimeUnit.SECONDS);
private Observable<Long> mUpdatingTimerObservable = Observable.timer(0, 1, TimeUnit.SECONDS);

private void startPositionInfoPolling() {
    LOGGER.trace("...");
    mPollingTimerSubscription = mPollingTimerObservable
            .flatMap(new Func1<Long, Observable<PositionInfo>>() {
                @Override
                public Observable<PositionInfo> call(Long ticker) {
                    LOGGER.debug("XXX ticker = {}", ticker);
                    return mMediaRendererClient.createPositionInfoObservable();
                }
            })
            .retry(2)
            .subscribe(new Action1<PositionInfo>() {
                @Override
                public void call(final PositionInfo positionInfo) {
                    LOGGER.debug("XXX positionInfo = {}", positionInfo);
                    mUpdatingTimerSubscription = mUpdatingTimerObservable
                            .take(20, TimeUnit.SECONDS)
                            .subscribeOn(AndroidSchedulers.mainThread())
                            .subscribe(new Action1<Long>() {
                                @Override
                                public void call(Long ticker) {
                                    LOGGER.debug("XXX ticker = {}", ticker);
                                    String updatedRelTime = ModelUtil.toTimeString(ModelUtil.fromTimeString(positionInfo.getRelTime()) + ticker);
                                    PositionInfo updatedPositionInfo = new PositionInfo(positionInfo, updatedRelTime, positionInfo.getAbsTime());
                                    LOGGER.debug("XXX positionInfo = {}", updatedPositionInfo);
                                    mMusicMediaTrackDetailsFragment.updateView(updatedPositionInfo);
                                }
                            });
                }
            });
}

private void stopPositionInfoPolling() {
    LOGGER.trace("...");
    mPollingTimerSubscription.unsubscribe();
    mUpdatingTimerSubscription.unsubscribe();
}
4

2 回答 2

1

我无法发表评论,所以我将发布答案。

您为什么不尝试使用计时器运算符?

Observable
    .timer(delay, interval, TimeUnit.SECONDS)
    .subscribe(/*  ...  */);

此外,您还可以使用其他一些计时器变体。

也许这些会有所帮助-

试试这个代码:

mPollingTimerObservable
    .flatMap(new Func1<Long, Observable<PositionInfo>>() {
        @Override
        public Observable<PositionInfo> call(Long Long) {
            LOGGER.debug("XXX ticker = {}", ticker);
            return mMediaRendererClient.createPositionInfoObservable();
        }
    })
    .retry(2)
    .subscribe(new Action1<PositionInfo>() {
        @Override
        public void call(final PositionInfo positionInfo) {
            LOGGER.debug("XXX positionInfo = {}", positionInfo);
            mUpdatingTimerSubscription = mUpdatingTimerObservable
                .take(20, TimeUnit.SECONDS)
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long ticker) {
                        LOGGER.debug("XXX ticker = {}", ticker);
                        String updatedRelTime = ModelUtil.toTimeString(ModelUtil.fromTimeString(positionInfo.getRelTime()) + ticker);
                        PositionInfo updatedPositionInfo = new PositionInfo(positionInfo, updatedRelTime, positionInfo.getAbsTime());
                        LOGGER.debug("XXX positionInfo = {}", updatedPositionInfo);
                        mMusicMediaTrackDetailsFragment.updateView(updatedPositionInfo);
                    }
                });
        }
    });

更新

我想了一个办法让它变得简单。我把整个 zip 概念弄错了。:P 我的错!无论如何,这是一个新的“扁平化”代码

mPollingTimerObservable
    .flatMap(new Func1<Long, Observable<PositionInfo>>() {
        @Override
        public Observable<PositionInfo> call(Long Long) {
            LOGGER.debug("XXX ticker = {}", ticker);
            return mMediaRendererClient.createPositionInfoObservable();
        }
    })
    .retry(2)
    .zipWith(mUpdatingTimerObservable, new Func2<PositionInfo, Long, PositionInfo>() {
            @Override
            public PositionInfo call(PositionInfo p, Long l) {
                return p;
            }
        })
    .take(20)
    .subscribeOn(AndroidSchedulers.mainThread())
    .subscribe(/* ... */);

我没有时间对此进行测试,但我认为它应该可以工作。如果没有,请告诉我!

于 2015-07-15T10:53:18.877 回答
0

经过与@hello_world 的一些讨论和一些思考,两个带时间戳的 observable 的组合 observable 似乎可以完成这项工作。确实非常强大的概念,但由于 Java 语法,代码有点难看。

private Subscription mPollingAndUpdatingTimerSubscription;
private Observable<Long> mPollingTimerObservable = Observable.timer(0, 20, TimeUnit.SECONDS);
private Observable<Long> mUpdatingTimerObservable = Observable.timer(0, 1, TimeUnit.SECONDS);

private void startPositionInfoPolling() {
    LOGGER.trace("...");
    Observable<Timestamped<PositionInfo>> timestampedPositionInfoObservable =
            mPollingTimerObservable
                    .flatMap(new Func1<Long, Observable<PositionInfo>>() {
                        @Override
                        public Observable<PositionInfo> call(Long ticker) {
                            LOGGER.debug("XXX ticker = {}", ticker);
                            return mMediaRendererClient.createPositionInfoObservable();
                        }
                    })
                    .retry()
                    .timestamp();
    Observable<Timestamped<Long>> timestampedUpdateObservable = mUpdatingTimerObservable.timestamp();
    Observable<PositionInfo> combinedPositionInfoObservable = Observable.combineLatest(
            timestampedPositionInfoObservable,
            timestampedUpdateObservable,
            new Func2<Timestamped<PositionInfo>, Timestamped<Long>, PositionInfo>() {
                @Override
                public PositionInfo call(Timestamped<PositionInfo> timestampedPositionInfo, Timestamped<Long> timestampedUpdate) {
                    PositionInfo orgPositionInfo = timestampedPositionInfo.getValue();
                    long delayInMillis = timestampedUpdate.getTimestampMillis() - timestampedPositionInfo.getTimestampMillis();
                    LOGGER.debug("XXX delayInMillis = {}", delayInMillis);
                    String updatedRelTime = DurationUtil.format(DurationUtil.parse(orgPositionInfo.getRelTime()).getTime() + delayInMillis);
                    return new PositionInfo(orgPositionInfo, updatedRelTime, orgPositionInfo.getAbsTime());
                }
            });
    mPollingAndUpdatingTimerSubscription =
            combinedPositionInfoObservable
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<PositionInfo>() {
                        @Override
                        public void call(final PositionInfo positionInfo) {
                            LOGGER.debug("XXX positionInfo = {}", positionInfo);
                            mMusicMediaTrackDetailsFragment.updateView(positionInfo);
                        }
                    });
}

private void stopPositionInfoPolling() {
    LOGGER.trace("...");
    mPollingAndUpdatingTimerSubscription.unsubscribe();
}
于 2015-07-19T00:25:15.640 回答