9

我有一个代码,它调用 couchbase 来获取一些行,如下所示:

val gotValues: Observable[JsonDocument] = Observable.from(rowKeys).flatMap(id =>
      couchbaseBucket.async().get(id))

如果我有 1,2,3,4,5,6 作为输入行键并且数据库中只存在第 1,2,3 行,那么 observable 只会收到大约 1,2,3 的通知。

然而,我的要求是我返回一个 1、2、3 为真(存在于数据库中)和 4、5、6 为假的地图(意味着数据库中不存在)。我设法用 scala observable 做到了这一点,但是我使用中间地图数据结构来返回包含所有 id 的总地图。下面是一个模拟我的问题的示例代码..

object Main extends App {
  import rx.lang.scala.Observable

  val idsToFetch = Seq(1,2,3,4,5,6)

  println(isInDBOrNot()) // {1=true, 2=true, 3=true, 4=false, 5=false, 6=false}

  private def isInDBOrNot(): ConcurrentHashMap[Int, Boolean] = {
    val inAndNotInDB = new java.util.concurrent.ConcurrentHashMap[Int, Boolean]
    // - How can I avoid the additional data structure?
    // - In this case a map, so that the function will return
    //   a map with all numbers and for each if exist in DB?
    // - I mean I want the function to return a map I don't 
    //   want to populate that map inside the observer,
    //   it's like a mini side effect I would rather simply 
    //   manipulate the stream.

    Observable.from(idsToFetch)
      .filterNot(x => x == 4 || x == 5 || x == 6) // Simulate fetch from DB, 4,5,6 do not exist in DB, so not returned.
      .subscribe(
      x => inAndNotInDB.put(x, true),
      e => println(e),
      () => idsToFetch.filterNot(inAndNotInDB.containsKey)
        .foreach(inAndNotInDB.put(_, false)) // mark all non-found as false.
    )

    inAndNotInDB
  }

}

无论如何要在没有中间映射的情况下做到这一点(不填充中间数据结构,但只能通过操纵流)?看起来不干净!!. 谢谢。

4

3 回答 3

4

您的问题似乎源于您使用的事实,flatMap因此如果数据库中没有给定的数据id并且您得到一个空的Observable,则flatMap不会为此类产生任何输出id。所以看起来你需要的是defaultIfEmpty,它被翻译成 Scala 的orElse. 您可以使用orElse在内部返回一些默认值flatMap。所以要修改你的例子:

def fetchFromDb(id: Int): Observable[String] = {
  if (id <= 3)
    Observable.just(s"Document #$id")
  else
    Observable.empty
}

def gotValue(idsToFetch: Seq[Int]): Observable[(Int, Boolean)] = {
  Observable.from(idsToFetch).flatMap((id: Int) => fetchFromDb(id).map(_ => (id, true)).orElse((id, false)))
}

println(gotValue(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)

哪个打印

列表((1,true), (2,true), (3,true), (4,false), (5,false), (6,false))

或者您可以使用Option返回Some(JsonDocument)None诸如

def gotValueEx(idsToFetch: Seq[Int]): Observable[(Int, Option[String])] = {
  Observable.from(idsToFetch).flatMap((id: Int) => fetchFromDb(id).map(doc => (id, Option(doc))).orElse((id, None)))
}

println(gotValueEx(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)

哪个打印

List((1,Some(Document #1)), (2,Some(Document #2)), (3,Some(Document #3)), (4,None), (5,None), (6,没有任何))

于 2017-11-22T00:31:47.850 回答
1

一种方法如下:

(1)将 ids 序列转换为Observableand mapit with

id => (id, false)

...所以你会得到一个 observable 类型Observable[(Int, Boolean)](让我们称之为新的 observable first)。

(2)从数据库中获取数据,并将map每个获取的行从:

(some_id, true)

...内部Observable[(Int, Boolean)](让我们称之为 observable last

(3) 连接 firstlast

(4) toMap (3)的结果。来自的重复元素first将被丢弃。(这将是你的resultObsrvable

(5)(可能)收集可观察的(你的地图)的第一个也是唯一的元素。您可能根本不想这样做,但如果您这样做,您应该真正了解此时阻止收集结果的含义。无论如何,这一步实际上取决于您的应用程序细节(如何组织 threading\scheduling\io),但蛮力方法应该看起来像这样(有关更多细节,请参阅此演示):

Await.result(resultObsrvable.toBlocking.toFuture, 2 seconds)
于 2017-11-17T00:50:57.833 回答
1

这个怎么样

Observable.from(idsToFetch)
        .filterNot(x => x._1 == 4 || x._1 == 5 || x._1 == 6)
        .foldLeft(idToFetch.map{_->false}.toMap){(m,id)=>m+(id->true)}
于 2017-11-20T12:46:00.083 回答