我正在使用 scala observables 从 couchbase 获取项目,然后我使用 map、flatMap、zip 来转换结果。问题是,如果 couchbase 中不存在某个项目,那么例如,.zip
它不会仅调用 onComplete。例子:
import rx.lang.scala._
def getIdsWithValues(ids: Seq[String]): Map[K, V] = {
val values = Observable.from(keyValueIds).flatMap(id => couchbaseBucket.async().get(id))
values.zip(Observable.from(ids)) // zip is not called if no row in couchbase with id.
...
}
所以我想要:
- 返回 k -> v 的映射
- 我让 .zip 将 k 耦合到返回的 v (我希望 v 类似于
None
if 不存在。 - 我看到如果 db 中不存在任何项目,则根本不调用 zip。
在运行上面的代码后,我想到了扫描ids
输入参数,并为每个没有用 value 压缩的参数添加一个 id 到它的值,但这就像添加另一个流,我希望 zip 处理现有和不存在的行。
我该如何处理?如何.zip
处理现有行和非现有行?