7

出于某种原因,我无法解决这个问题。我有一个与Play一起运行的应用程序,它调用Elastic Search。作为我设计的一部分,我的服务使用了用 scala future 包装的 Java API,如这篇文所示。我已经更新了该帖子中的代码,以提示 ExecutionContext 它将执行一些阻塞 I/O,如下所示:

    import scala.concurent.{blocking, Future, Promise}
    import org.elasticsearch.action.{ActionRequestBuilder, ActionListener, ActionResponse }
    def execute[RB <: ActionRequestBuilder[_, T, _, _]](request: RB): Future[T] = {
        blocking {
            request.execute(this)
            promise.future
        }
    }

我构造查询以发送到 ES 的实际服务将 executionContext 作为构造函数参数,然后用于调用弹性搜索。我这样做是为了让 play 使用的全局执行上下文不会因为对 ES 的阻塞调用而束缚住它的线程。这个 SO 评论提到只有全局上下文是阻塞感知的,所以我不得不创建自己的。在同一个帖子/答案中,有很多关于使用 ForkJoin 池的信息,但我不确定如何将这些文档中的内容与阻塞文档中的提示结合起来,以创建响应阻塞的执行上下文提示。

我认为我遇到的一个问题是我不确定首先如何响应阻塞上下文?我正在阅读最佳实践,它使用的示例是无限的线程缓存:

请注意,这里我更喜欢使用无界的“缓存线程池”,因此它没有限制。在进行阻塞 I/O 时,想法是您必须有足够的线程可以阻塞。但是,如果 unbounded 太多了,根据用例,您可以稍后对其进行微调,这个示例的想法是让球滚动。

那么这是否意味着对于我的 ForkJoin 支持的线程池,我应该在处理非阻塞 I/O 时尝试使用缓存线程并为阻塞 IO 创建一个新线程?或者是其他东西?我在网上找到的关于使用单独线程池的几乎所有资源都倾向于执行Neophytes 指南所做的事情,也就是说:

如何调整各种线程池高度依赖于您的个人应用程序,超出了本文的范围。

我知道这取决于您的应用程序,但在这种情况下,如果我只想创建某种类型的阻塞感知 ExecutionContext 并了解管理线程的体面策略。如果上下文专门针对应用程序的单个部分,我应该只制作固定的线程池大小而不是blocking首先使用/忽略关键字吗?

我倾向于漫无边际,所以我会尝试分解我在答案中寻找的内容:

  1. 代码!阅读所有这些文档仍然让我感觉自己无法编写阻塞感知上下文,我真的很感激一个例子。
  2. 关于如何处理阻塞线程的任何链接或提示,即无休止地为它们创建一个新线程,检查可用线程的数量,如果太多则拒绝,其他一些策略
  3. 我不是在这里寻找性能提示,我知道我只能通过测试来获得它,但是如果我一开始就无法弄清楚如何对上下文进行编码,我就无法测试!我确实找到了 ForkJoins 与线程池的示例,但我错过了blocking那里的关键部分。

抱歉,这里的问题很长,我只是想让您了解我在看什么,并且我已经尝试了一天多的时间来解决这个问题,并且需要一些外部帮助。


编辑:为了清楚起见,ElasticSearch 服务的构造函数签名是:

//Note that these are not implicit parameters!
class ElasticSearchService(otherParams ..., val executionContext: ExecutionContext)

在我的应用程序启动代码中,我有这样的东西:

object Global extends GlobalSettings {
    val elasticSearchContext = //Custom Context goes here
    ...
    val elasticSearchService = new ElasticSearchService(params, elasticSearchContext);
    ...
}

我也在阅读Play 对 contexts 的建议,但还没有看到任何关于阻止提示的内容,我怀疑我可能需要查看源代码以查看它们是否扩展了BlockContext特征。

4

1 回答 1

3

因此,我深入研究了文档,并且针对我正在处理的情况, Play 的最佳实践是

在某些情况下,您可能希望将工作分派到其他线程池。这可能包括 CPU 繁重的工作或 IO 工作,例如数据库访问。为此,您应该首先创建一个线程池,这可以在 Scala 中轻松完成:

并提供了一些代码:

object Contexts {
    implicit val myExecutionContext: ExecutionContext = Akka.system.dispatchers.lookup("my-context")
}

上下文来自 Akka,所以我跑到那里搜索他们提供的默认值和上下文类型,最终导致我找到了关于 dispatchers 的文档。默认是ForkJoinPool,其管理块的默认方法是调用managedBlock(blocker). 这使我阅读了以下说明的文档:

根据给定的阻止程序阻止。如果当前线程是 ForkJoinWorkerThread,则此方法可能会安排在必要时激活备用线程,以确保在当前线程被阻塞时有足够的并行性。

所以看起来如果我有一个ForkJoinWorkerThread那么我认为我想要的行为就会发生。再看一下 ForkJoinPool 的来源,我注意到默认线程工厂是:

val defaultForkJoinWorkerThreadFactory: ForkJoinWorkerThreadFactory = juc.ForkJoinPool.defaultForkJoinWorkerThreadFactory

这对我来说意味着,如果我使用 Akka 中的默认值,我将获得一个以我期望的方式处理阻塞的上下文。

因此,再次阅读 Akka 文档似乎指定我的上下文是这样的:

my-context {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 8
    parallelism-factor = 3.0
    parallelism-max = 64
    task-peeking-mode = "FIFO"
  }
  throughput = 100
}

会是我想要的。

当我在源代码中搜索时,我做了一些寻找blocking或调用的用途,并找到了一个在ThreadPoolBuildermanagedBlock中覆盖 ForkJoin 行为的示例

private[akka] class AkkaForkJoinWorkerThread(_pool: ForkJoinPool) extends ForkJoinWorkerThread(_pool) with BlockContext {
    override def blockOn[T](thunk: ⇒ T)(implicit permission: CanAwait): T = {
      val result = new AtomicReference[Option[T]](None)
      ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
        def block(): Boolean = {
          result.set(Some(thunk))
          true
        }
        def isReleasable = result.get.isDefined
      })
      result.get.get // Exception intended if None
    }
  }

似乎是我最初要求的作为如何制作实现 BlockContext 的示例的示例。该文件还包含显示如何制作 ExecutorServiceFactory 的代码,我认为这是executor配置部分的引用。所以我想如果我想要一个完全自定义的上下文我会做的是扩展某种类型的 WorkerThread 并编写我自己的使用自定义工作线程的 ExecutorServiceFactory ,然后像这篇文章建议的那样在属性中指定完全限定的类名。

我可能会使用 Akka 的 forkjoin :)

于 2016-01-28T17:34:37.370 回答