我已经开始在我们的 scala akka-http 项目中使用 mongo scala 驱动程序,它有很大的帮助,尤其是 v2.0.0 中的案例类支持非常好。我试图围绕如何使用具有非默认执行上下文的mongo scala驱动程序来使用observeOn。
由于我们的 java 库依赖项的性质,我使用阻塞调用从 MongoDB 获取结果,如此处所示Helpers。我已经使用如下所示的 observeOn 稍微修改了 MongoDB Helpers的结果和 headResult 函数,但我注意到一些奇怪的竞争条件,我不知道如何解决。
trait ImplicitObservable[C] {
val observable: Observable[C]
val converter: (C) => String
def headResult()(implicit executionContext: ExecutionContext) = Await.result(observable.observeOn(executionContext).head(), Duration(10, TimeUnit.SECONDS))
def results()(implicit executionContext: ExecutionContext): List[C] = Await.result(observable.observeOn(executionContext).toFuture(), Duration(20, TimeUnit.SECONDS)).toList
}
结果函数不会返回我期望的所有记录,并且每次行为都不同,除非我使用只允许一个线程的 akka PinnedDispatcher 。由于它是一个阻塞操作,我想使用一个非默认的 akka 调度程序,这样它就不会阻塞我的 HTTP 请求。如果有人可以帮助我,我真的很感激。
# looking up dispatcher
val executionContext = system.dispatchers.lookup("mongo-dispatcher")
# application.conf
mongo-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 100
}
throughput = 1
}
我的示例数据库客户端代码:
def getPersonById(personId: String): PersonCaseClass = {
personCollection.find[PersonCaseClass](equal("_id", "person_12345")).first().headResult()
}