我正在使用 RxJava,我需要做两件事:
- 获取从发出的最后一个元素
Observable
- 确定是否
onError
被调用,vs。onCompleted
我已经研究过使用last
and lastOrDefault
(这实际上是我需要的行为),但我无法解决onError
隐藏最后一个元素的问题。我可以使用 Observable 两次,一次获取last
值,一次获取完成状态,但到目前为止,我只能通过创建自己的来完成此操作Observer
:
public class CacheLastObserver<T> implements Observer<T> {
private final AtomicReference<T> lastMessageReceived = new AtomicReference<>();
private final AtomicReference<Throwable> error = new AtomicReference<>();
@Override
public void onCompleted() {
// Do nothing
}
@Override
public void onError(Throwable e) {
error.set(e);
}
@Override
public void onNext(T message) {
lastMessageReceived.set(message);
}
public Optional<T> getLastMessageReceived() {
return Optional.ofNullable(lastMessageReceived.get());
}
public Optional<Throwable> getError() {
return Optional.ofNullable(error.get());
}
}
我自己制作没有问题Observer
,但感觉 Rx 应该能够更好地满足“获取完成前发出的最后一个元素”的用例。关于如何做到这一点的任何想法?