1

我想在 RxJava 中实现一个下载一些文件的处理队列。我要下载的文件数量可能多达 100 个左右。

一切都是在 Android 上使用 RxJava 1.1.1 开发的

我当前的实现看起来像这样:

PublishSubject<URL> publishSubject = PublishSubject.create();
_subject = new SerializedSubject<>(publishSubject);

_subscription = _subject
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(_getObserver());  // Observer

_subject.onBackpressureBuffer(10000, new Action0() {
    @Override
    public void call() {
        Log.d(TAG, "onBackpressureBuffer called");
        return;
    }
});

// download a file
_subject.onNext(aValidURL);

Where_getObserver()返回一个新的观察者对象,该对象在“onNext”方法中下载到文件中。

但是,我的问题是我很快得到了一个MissingBackpreasureException我不明白的。我尝试实现 a backpreasurebuffer,但它似乎没有被调用。

我究竟做错了什么?

4

1 回答 1

2

在 RxJava 中,当你应用一个操作符时,你会得到一个新的 Observable 实例,该实例具有修改后的行为,但原始实例保持不变。在这里,您调用onBackpressureBuffer_subject但不使用它的结果,否则调用什么也不做。您需要按顺序应用它:

PublishSubject<URL> publishSubject = PublishSubject.create();
_subject = new SerializedSubject<>(publishSubject);

_subscription = _subject
                .onBackpressureBuffer(10000, new Action0() {
                    @Override
                    public void call() {
                        Log.d(TAG, "onBackpressureBuffer called");
                        return;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(_getObserver());  // Observer

// download a file
_subject.onNext(aValidURL);
于 2016-02-22T11:27:35.767 回答