我知道这违反了很多 Rx 规则,但我真的很喜欢RxJava-JDBC,我的队友也很喜欢。关系数据库是我们工作的核心,Rx 也是如此。
但是,在某些情况下,我们不想作为 an 发出,Observable<ResultSet>
而是希望只使用基于拉取的 Java 8Stream<ResultSet>
或 Kotlin Sequence<ResultSet>
。但是我们非常习惯 RxJava-JDBC 库,它只返回一个Observable<ResultSet>
.
因此,我想知道是否有一种方法可以使用扩展功能将 anObservable<ResultSet>
转换为 a Sequence<ResultSet>
,而不进行任何中间收集或toBlocking()
调用。以下是我目前所拥有的一切,但我现在正在尝试连接基于推拉的系统,我无法缓冲,因为ResultSet
每次onNext()
调用都是有状态的。这是不可能完成的任务吗?
import rx.Observable
import rx.Subscriber
import java.sql.ResultSet
fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() {
private var isComplete = false
override fun onCompleted() {
isComplete = true
}
override fun onError(e: Throwable?) {
throw UnsupportedOperationException()
}
override fun onNext(rs: ResultSet?) {
throw UnsupportedOperationException()
}
override fun hasNext(): Boolean {
throw UnsupportedOperationException()
}
override fun next(): ResultSet {
throw UnsupportedOperationException()
}
}.asSequence()