我想处理 ea。异步查询获取(每个查询可能多次获取)。为了做到这一点,我将处理函数(返回 a Future
)传递给我的查询方法,以便为 ea 调用它。拿来。我事先不知道查询的结果大小;我只知道我提取的最大尺寸。因此,我的查询返回一个Observable
(而不是一个List
for ex。我需要事先知道大小)。唯一的问题是,当我使用Observable
create
or时apply
,它会在内部阻塞,直到 myFuture
完成,然后再调用下一个 onNext ——实际上,消除了我希望从期货中获得的性能提升。Observable
from
工厂方法不会阻塞,但它需要一个Iterable
. 我可以传递一个可变的Iterable
并随着新的获取而增长。有人有一个更具参考性的透明sol'n?这是代码:
object Repository {
def query(fetchSize: Int)(f: Set[Int] => Future[Set[Int]]): Observable[Future[Set[Int]]] = {
// observable (as opposed to list) because modeling a process
// where the total result size is unknown beforehand.
// Also, not creating or applying because it blocks the futures
val mut = scala.collection.mutable.Set[Future[Set[Int]]]()
val obs = Observable.from(mut)
1 to 2100 by fetchSize foreach { i =>
mut += f(DataSource.fetch(i, fetchSize))
}
obs
}
}