2

在 Scala 中,我编写了两个 MongoDB 可观察对象,并在传递自定义执行上下文时调用了 observable。对 observable 的调用是在第一个 observable 上进行的,但自定义执行上下文不会传播到第二个 observable。

为了帮助说明这一点,我编写了以下自包含的代码:

import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

import org.apache.commons.lang3.concurrent.BasicThreadFactory.Builder
import org.mongodb.scala.bson.collection.immutable.Document
import org.mongodb.scala.{MongoClient, Observable}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}

object Test extends App {

  val client = MongoClient("mongodb://localhost")

  def insertObs = {
    client.getDatabase("test").getCollection("test").insertOne(Document("test" -> 1))
  }

  val threadPool = new ThreadPoolExecutor(2, 2, 0L,
    TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable],
    new Builder().namingPattern("Custom pool").build())
  val executionContext = ExecutionContext fromExecutor (threadPool)

  val obs = Observable(List(1, 2, 3))
  val res =
    obs.observeOn(executionContext).map {
      i =>
    println("OBS " + Thread.currentThread().getName)
    i
    }.flatMap(_ => insertObs.map {
      i =>
    println("INSERT " + Thread.currentThread().getName)
    i
    })
  Await.result(res.toFuture(), Duration(20, TimeUnit.SECONDS))
}

输出如下:

OBS Custom pool
INSERT Thread-2
OBS Custom pool
INSERT Thread-2
OBS Custom pool
INSERT Thread-4

我期望在执行“INSERT”可观察对象时仅将“自定义池”用作执行上下文,而不是使用 Thread-2 和 Thread-4。如此处文档中所述:

MongoDB 观察 API

具体来说,它说:为未来的操作使用特定的执行上下文

为什么自定义线程池不用于“插入”可观察对象?

4

1 回答 1

0

这似乎按预期工作:见这张票:https ://jira.mongodb.org/browse/SCALA-437

于 2018-07-30T15:55:20.780 回答