1

使用RxJava2 RxKotlinand Room,我需要查询数据库以进行公开搜索。这意味着我搜索包含名为closedvalue的属性的狩猎false。找到狩猎后,它需要将查询切换到特定的狩猎。

对于这些查询,我有 2 种方法:

getOpenHunt(teamId:String): Flowable<List<Hunt>>
getHunt(huntId:String): Flowable<List<Hunt>>

它们都返回 a List,否则在没有找到寻线时查询会卡住。

我的想法是这样的

fun queryHunt(teamId:String):Flowable<Optional<Hunt>>{
   getOpenHunt(teamId)
      .map<Optional<Hunt>> {
                Optional.create(it.firstOrNull())
            }
      .switchToFlowableIf ( it is Optional.Some, getHunt(it.element().id)
}

//With switchToFlowableIf's being
fun <E:Any> switchToFlowableIf(condition: (E)->Boolean, newFlowable: Flowable<E>): Flowable<E>
//It should unsubscribe from getOpenHunt and subscribe to newFlowable

供参考,这是我的Optional

sealed class Optional<out T> {
class Some<out T>(val element: T) : Optional<T>()
object None : Optional<Nothing>()

fun element(): T? {
    return when (this) {
        is Optional.None -> null
        is Optional.Some -> element
    }
}

companion object {
    fun <T> create(element: T?): Optional<T> {
        return if (element != null) {
            Optional.Some(element)
        } else {
            Optional.None
        }
    }
}
}

RxJava2 中是否已经内置了类似的方法?如果没有,您将如何实施?

4

2 回答 2

1

我通过查看 onErrorResumeNext 解决了这个问题。复制粘贴一些代码并根据我的需要进行修改。我不认为这是完美的,但它确实完成了他的工作。如果您发现一些可能的错误,请发表评论。

public final class FlowableOnPredicateNext<T> extends AbstractFlowableWithUpstream<T, T> {

private final Predicate<? super T>                                  predicate;
private final Function<? super T, ? extends Publisher<? extends T>> next;

public FlowableOnPredicateNext(Flowable<T> source, Predicate<? super T> predicate,
                               Function<? super T, ? extends Publisher<? extends T>> next) {
    super(source);
    this.predicate = predicate;
    this.next = next;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
    OnPredicateNextSubscriber<T> parent = new OnPredicateNextSubscriber<>(s, next, predicate);
    s.onSubscribe(parent.arbiter);
    source.subscribe(parent);
}

static final class OnPredicateNextSubscriber<T> implements FlowableSubscriber<T> {

    private final Subscriber<? super T>                                 actual;
    private final Predicate<? super T>                                  predicate;
    private final SubscriptionArbiter                                   arbiter;
    private final Function<? super T, ? extends Publisher<? extends T>> nextSupplier;
    private boolean switched = false;

    OnPredicateNextSubscriber(Subscriber<? super T> actual,
                              Function<? super T, ? extends Publisher<? extends T>>
                                      nextSupplier, Predicate<? super T> predicate) {
        this.actual = actual;
        this.predicate = predicate;
        this.nextSupplier = nextSupplier;
        this.arbiter = new SubscriptionArbiter();
    }

    @Override
    public void onSubscribe(Subscription s) {
        arbiter.setSubscription(s);
    }

    @Override
    public void onNext(T t) {
        try {
            if (!switched && predicate.test(t)) {
                Publisher<? extends T> p;
                p = nextSupplier.apply(t);
                p.subscribe(this);
                switched = true;
            } else {
                actual.onNext(t);
            }
        } catch (Exception e) {
            actual.onError(e);
        }
    }

    @Override
    public void onError(Throwable t) {
        actual.onError(t);
    }

    @Override
    public void onComplete() {
        actual.onComplete();
    }
}
}

使用 Kotlin 我写了一个扩展函数:

@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
fun <E, T : Flowable<E>> T.onPredicateResumeNext(predicate: Predicate<E>, resumeFunction: io.reactivex.functions.Function<E, Publisher<E>>): Flowable<E> {
return RxJavaPlugins.onAssembly<E>(FlowableOnPredicateNext<E>(this,
                                                              predicate,
                                                              resumeFunction
                                                             ))
}

我现在像这样使用它:

override fun getOpenHunt(teamId: String): Flowable<Optional<Hunt>> {
    return created().getOpenHunt(teamId)
            .map {
                Optional.create(it.firstOrNull())
            }
            .onPredicateResumeNext(predicate = Predicate { it.element() != null },
                                   resumeFunction = Function {
                                       created()
                                               .getHunt(it.element()!!.id)
                                               .map<Optional<Hunt>> {
                                                   Optional.create(it.firstOrNull())
                                               }
                                   })
}
于 2017-10-02T13:30:56.770 回答
0

我会通过重用您的方法来做到这一点:

getOpenHunt(teamId:String): Flowable<List<Hunt>>
getHunt(huntId:String): Flowable<List<Hunt>>

getOpenHunt(yourId) // return your Flowable<List<Hunt>>
.flatMapIterable { it } // get each result of open hunts
.flatMapMaybe {  getHunt(it.id) } // querie each result 
.toList() // back to a Maybe/Single<List<Hunt>>
.toFlowable() // back to a Flowable else it wont emit more data
.subscribeBy(onNext = { /** process your results **/ },
             onError = { /** process if no results found **/  }
            )

这意味着您查询数据库是否有符合您条件的条目。然后它得到结果,将其拆分为单个项目,使用您的结果运行多个查询。如果找到结果,则将其转换回Single<List<Hunt>>. 由于您想继续订阅,您需要将其转换回 Flowable 使用.toFlowable()

顺便说一句, change 意味着返回值发生了变化,但在你的情况下是一样的List<Hunt>

整个流程对我来说没有任何意义,因为您也可以通过 getOpenHunt 获得结果并遍历它。

于 2017-10-02T10:22:46.280 回答