5

我正在使用 RxJava,我需要做两件事:

  • 获取从发出的最后一个元素Observable
  • 确定是否onError被调用,vs。onCompleted

我已经研究过使用lastand lastOrDefault(这实际上是我需要的行为),但我无法解决onError隐藏最后一个元素的问题。我可以使用 Observable 两次,一次获取last值,一次获取完成状态,但到目前为止,我只能通过创建自己的来完成此操作Observer

public class CacheLastObserver<T> implements Observer<T> {

    private final AtomicReference<T> lastMessageReceived = new AtomicReference<>();
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    @Override
    public void onCompleted() {
        // Do nothing
    }

    @Override
    public void onError(Throwable e) {
        error.set(e);
    }

    @Override
    public void onNext(T message) {
        lastMessageReceived.set(message);
    }

    public Optional<T> getLastMessageReceived() {
        return Optional.ofNullable(lastMessageReceived.get());
    }

    public Optional<Throwable> getError() {
        return Optional.ofNullable(error.get());
    }
}

我自己制作没有问题Observer,但感觉 Rx 应该能够更好地满足“获取完成前发出的最后一个元素”的用例。关于如何做到这一点的任何想法?

4

4 回答 4

4

尝试这个:

source.materialize().buffer(2).last()

在错误情况下,最后的发射将是两个项目的列表,即最后发射的值包装为 aNotification和错误通知。如果没有错误,第二项将是完成通知。

另请注意,如果未发出任何值,则结果将是一个项目列表,即终端通知。

于 2016-06-09T22:32:32.547 回答
1

我解决了:

source.materialize().withPrevious().last()

哪里withPrevious是(科特林):

fun <T> Observable<T>.withPrevious(): Observable<Pair<T?, T>> =
    this.scan(Pair<T?, T?>(null, null)) { previous, current -> Pair(previous.second, current) }
        .skip(1)
        .map { it as Pair<T?, T> }
于 2017-10-09T14:35:31.287 回答
0

你有没有尝试过 onErrorResumeNext 在这里你可以看到其余的或错误处理操作符https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

于 2016-06-10T08:09:58.303 回答
0

我用这种方法来解决你的问题。

public class ExampleUnitTest {
    @Test
    public void testSample() throws Exception {
        Observable.just(1, 2, 3, 4, 5)
                .map(number -> {
                    if (number == 4)
                        throw new NullPointerException();
                    else
                        return number;
                })
                .onErrorResumeNext(t -> Observable.empty())
                .lastOrDefault(15)
                .subscribe(lastEmittedNumber -> System.out.println("onNext: " + lastEmittedNumber));
    }
}

它会发出onNext: 3

希望它有所帮助。

于 2016-06-10T13:12:53.637 回答