我通过查看 onErrorResumeNext 解决了这个问题。复制粘贴一些代码并根据我的需要进行修改。我不认为这是完美的,但它确实完成了他的工作。如果您发现一些可能的错误,请发表评论。
public final class FlowableOnPredicateNext<T> extends AbstractFlowableWithUpstream<T, T> {
private final Predicate<? super T> predicate;
private final Function<? super T, ? extends Publisher<? extends T>> next;
public FlowableOnPredicateNext(Flowable<T> source, Predicate<? super T> predicate,
Function<? super T, ? extends Publisher<? extends T>> next) {
super(source);
this.predicate = predicate;
this.next = next;
}
@Override
protected void subscribeActual(Subscriber<? super T> s) {
OnPredicateNextSubscriber<T> parent = new OnPredicateNextSubscriber<>(s, next, predicate);
s.onSubscribe(parent.arbiter);
source.subscribe(parent);
}
static final class OnPredicateNextSubscriber<T> implements FlowableSubscriber<T> {
private final Subscriber<? super T> actual;
private final Predicate<? super T> predicate;
private final SubscriptionArbiter arbiter;
private final Function<? super T, ? extends Publisher<? extends T>> nextSupplier;
private boolean switched = false;
OnPredicateNextSubscriber(Subscriber<? super T> actual,
Function<? super T, ? extends Publisher<? extends T>>
nextSupplier, Predicate<? super T> predicate) {
this.actual = actual;
this.predicate = predicate;
this.nextSupplier = nextSupplier;
this.arbiter = new SubscriptionArbiter();
}
@Override
public void onSubscribe(Subscription s) {
arbiter.setSubscription(s);
}
@Override
public void onNext(T t) {
try {
if (!switched && predicate.test(t)) {
Publisher<? extends T> p;
p = nextSupplier.apply(t);
p.subscribe(this);
switched = true;
} else {
actual.onNext(t);
}
} catch (Exception e) {
actual.onError(e);
}
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
}
}
使用 Kotlin 我写了一个扩展函数:
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
fun <E, T : Flowable<E>> T.onPredicateResumeNext(predicate: Predicate<E>, resumeFunction: io.reactivex.functions.Function<E, Publisher<E>>): Flowable<E> {
return RxJavaPlugins.onAssembly<E>(FlowableOnPredicateNext<E>(this,
predicate,
resumeFunction
))
}
我现在像这样使用它:
override fun getOpenHunt(teamId: String): Flowable<Optional<Hunt>> {
return created().getOpenHunt(teamId)
.map {
Optional.create(it.firstOrNull())
}
.onPredicateResumeNext(predicate = Predicate { it.element() != null },
resumeFunction = Function {
created()
.getHunt(it.element()!!.id)
.map<Optional<Hunt>> {
Optional.create(it.firstOrNull())
}
})
}