0

我正在使用 scala observables 从 couchbase 获取项目,然后我使用 map、flatMap、zip 来转换结果。问题是,如果 couchbase 中不存在某个项目,那么例如,.zip它不会仅调用 onComplete。例子:

import rx.lang.scala._

def getIdsWithValues(ids: Seq[String]): Map[K, V] = {
  val values = Observable.from(keyValueIds).flatMap(id => couchbaseBucket.async().get(id))
  values.zip(Observable.from(ids)) // zip is not called if no row in couchbase with id.
  ...
}

所以我想要:

  1. 返回 k -> v 的映射
  2. 我让 .zip 将 k 耦合到返回的 v (我希望 v 类似于Noneif 不存在。
  3. 我看到如果 db 中不存在任何项目,则根本不调用 zip。

在运行上面的代码后,我想到了扫描ids输入参数,并为每个没有用 value 压缩的参数添加一个 id 到它的值,但这就像添加另一个流,我希望 zip 处理现有和不存在的行。

我该如何处理?如何.zip处理现有行和非现有行?

4

1 回答 1

1

不要使用zip()运算符。相反,只需使用flatMap()and materialize().take(1)materialize()会将onComplete()事件转换为Notification您可以映射到None的 a ,而Notification带有值的 a 将被映射到Some(value)

def getIdsWithValues(ids: Seq[String]): Map[K, V] = {
  val values = Observable.from(keyValueIds)
   .flatMap(id => couchbaseBucket.async()
     .get(id)
     .materialize()
     .take(1)
     .map( res => if ( res.isOnComplete() )
                     (id, None)
                  else 
                     (id, Some(res.getValue))
  ...
}
于 2017-11-08T21:37:54.217 回答