4

我已经开始在我们的 scala akka-http 项目中使用 mongo scala 驱动程序,它有很大的帮助,尤其是 v2.0.0 中的案例类支持非常好。我试图围绕如何使用具有非默认执行上下文的mongo scala驱动程序来使用observeOn

由于我们的 java 库依赖项的性质,我使用阻塞调用从 MongoDB 获取结果,如此处所示Helpers。我已经使用如下所示的 observeOn 稍微修改了 MongoDB Helpers的结果和 headResult 函数,但我注意到一些奇怪的竞争条件,我不知道如何解决。

  trait ImplicitObservable[C] {
    val observable: Observable[C]
    val converter: (C) => String

    def headResult()(implicit executionContext: ExecutionContext) = Await.result(observable.observeOn(executionContext).head(), Duration(10, TimeUnit.SECONDS))

    def results()(implicit executionContext: ExecutionContext): List[C] = Await.result(observable.observeOn(executionContext).toFuture(), Duration(20, TimeUnit.SECONDS)).toList
  }

结果函数不会返回我期望的所有记录,并且每次行为都不同,除非我使用只允许一个线程的 akka PinnedDispatcher 。由于它是一个阻塞操作,我想使用一个非默认的 akka 调度程序,这样它就不会阻塞我的 HTTP 请求。如果有人可以帮助我,我真的很感激。

  # looking up dispatcher 
  val executionContext = system.dispatchers.lookup("mongo-dispatcher")

  # application.conf
  mongo-dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"
    thread-pool-executor {
      fixed-pool-size = 100
    }
    throughput = 1
  }

我的示例数据库客户端代码:

def getPersonById(personId: String): PersonCaseClass = {
    personCollection.find[PersonCaseClass](equal("_id", "person_12345")).first().headResult()
}
4

0 回答 0