1

我在 Play 框架的控制器中遇到了奇怪的行为。我正在使用 elastic4s 连接到 Elasticsearch 集群,并且取决于建立连接的精确时间,它要么工作要么不工作。我目前为止的最小示例如下所示:

class EsController extends Controller {


  def buildClient() = ElasticClient.transport(Settings.builder
      .put("cluster.name", "es").build,
    "127.0.0.1:9300")

  val eagerClient = buildClient()
  lazy val lazyClient = buildClient()


  object ElasticConnection extends ElasticDsl {

    def eagerStats = eagerClient.execute(get cluster stats)

    def lazyStats = lazyClient.execute(get cluster stats)
  }

  // accessible via GET /eagerStats
  def eagerStats = Action.async {
    ElasticConnection.eagerStats map (s => Ok(s.toString))
  }

  // accessible via GET /lazyStats
  def lazyStats = Action.async {
    ElasticConnection.lazyStats map (s => Ok(s.toString))
  }
}

我通过启动应用程序sbt run。然后我可以尝试获取两个端点。curl localhost:9000/lazyStats/工作正常,与我的 ES 节点对话并返回正确的统计信息。curl localhost:9000/eagerStats/从 ES 传输层抛出异常(最后的堆栈跟踪)。由于我的代码在两种情况下都是相同的(唯一的区别是valvs lazy val)我想构造函数是以一种奇怪的方式实例化的。任何人都可以确认或否认吗?

作为旁注,我可以创建一个单独的类来处理 ES 连接和@Inject它(可能也使它成为一个@Singleton)——这可能是首选的解决方案。不过,我发现所描述的行为很奇怪,我很高兴看到解释。

elastic4s是官方 ES Java 驱动程序的一个小包装器,我非常有信心在这种情况下不会发生任何魔法。

更新:我在与独立进程相同的机器上运行单节点 ES 集群。它配置正确,甚至还启动并运行了一个 kopf 插件。REST 接口工作正常;如果我使用延迟初始化的连接,传输接口也可以工作。

我正在使用 ES 2.2.0(客户端和服务器端)、Play 2.4.6、scala 2.11.7 和 SBT 0.13.8。

来自急切连接的堆栈跟踪:

play.api.http.HttpErrorHandlerExceptions$$anon$1: Execution exception[[NoNodeAvailableException: None of the configured nodes are available: [{#transport#-1}{127.0.0.1}{127.0.0.1:9300}]]]
at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:265) ~[play_2.11-2.4.6.jar:2.4.6]
at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:191) ~[play_2.11-2.4.6.jar:2.4.6]
at play.api.GlobalSettings$class.onError(GlobalSettings.scala:179) [play_2.11-2.4.6.jar:2.4.6]
at play.api.DefaultGlobal$.onError(GlobalSettings.scala:212) [play_2.11-2.4.6.jar:2.4.6]
at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:94) [play_2.11-2.4.6.jar:2.4.6]
at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$9$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:151) [play-netty-server_2.11-2.4.6.jar:2.4.6]
at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$9$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:148) [play-netty-server_2.11-2.4.6.jar:2.4.6]
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) [scala-library-2.11.7.jar:na]
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216) [scala-library-2.11.7.jar:na]
at scala.util.Try$.apply(Try.scala:192) [scala-library-2.11.7.jar:na]
at scala.util.Failure.recover(Try.scala:216) [scala-library-2.11.7.jar:na]
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) [scala-library-2.11.7.jar:na]
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) [scala-library-2.11.7.jar:na]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) [scala-library-2.11.7.jar:na]
at play.api.libs.iteratee.Execution$trampoline$.executeScheduled(Execution.scala:109) [play-iteratees_2.11-2.4.6.jar:2.4.6]
at play.api.libs.iteratee.Execution$trampoline$.execute(Execution.scala:71) [play-iteratees_2.11-2.4.6.jar:2.4.6]
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) [scala-library-2.11.7.jar:na]
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) [scala-library-2.11.7.jar:na]
at scala.concurrent.Promise$class.complete(Promise.scala:55) [scala-library-2.11.7.jar:na]
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) [scala-library-2.11.7.jar:na]
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:23) [scala-library-2.11.7.jar:na]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [akka-actor_2.11-2.3.13.jar:na]
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) [akka-actor_2.11-2.3.13.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.7.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.7.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.7.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.7.jar:na]
Caused by: org.elasticsearch.client.transport.NoNodeAvailableException: None of the configured nodes are available: [{#transport#-1}{127.0.0.1}{127.0.0.1:9300}]
at org.elasticsearch.client.transport.TransportClientNodesService.ensureNodesAreAvailable(TransportClientNodesService.java:290) ~[elasticsearch-2.2.0.jar:2.2.0]
at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:207) ~[elasticsearch-2.2.0.jar:2.2.0]
at org.elasticsearch.client.transport.support.TransportProxyClient.execute(TransportProxyClient.java:55) ~[elasticsearch-2.2.0.jar:2.2.0]
at org.elasticsearch.client.transport.TransportClient.doExecute(TransportClient.java:286) ~[elasticsearch-2.2.0.jar:2.2.0]
at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:351) ~[elasticsearch-2.2.0.jar:2.2.0]
at org.elasticsearch.client.support.AbstractClient$ClusterAdmin.execute(AbstractClient.java:845) ~[elasticsearch-2.2.0.jar:2.2.0]
at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:85) ~[elasticsearch-2.2.0.jar:2.2.0]
at com.sksamuel.elastic4s.admin.ClusterDsl$ClusterStatsExecutable$$anonfun$apply$2.apply(ClusterDsl.scala:24) ~[elastic4s-core_2.11-2.2.0.jar:2.2.0]
at com.sksamuel.elastic4s.admin.ClusterDsl$ClusterStatsExecutable$$anonfun$apply$2.apply(ClusterDsl.scala:24) ~[elastic4s-core_2.11-2.2.0.jar:2.2.0]
at com.sksamuel.elastic4s.Executable$class.injectFuture(Executable.scala:30) ~[elastic4s-core_2.11-2.2.0.jar:2.2.0]
at com.sksamuel.elastic4s.admin.ClusterDsl$ClusterStatsExecutable$.injectFuture(ClusterDsl.scala:21) ~[elastic4s-core_2.11-2.2.0.jar:2.2.0]
at com.sksamuel.elastic4s.admin.ClusterDsl$ClusterStatsExecutable$.apply(ClusterDsl.scala:24) ~[elastic4s-core_2.11-2.2.0.jar:2.2.0]
at com.sksamuel.elastic4s.admin.ClusterDsl$ClusterStatsExecutable$.apply(ClusterDsl.scala:21) ~[elastic4s-core_2.11-2.2.0.jar:2.2.0]
at com.sksamuel.elastic4s.ElasticClient.execute(ElasticClient.scala:20) ~[elastic4s-core_2.11-2.2.0.jar:2.2.0]
at controllers.BookController$ElasticConnection$.eagerStats(BookController.scala:35) ~[classes/:na]
at controllers.BookController$$anonfun$eagerStats$1.apply(BookController.scala:41) ~[classes/:na]
at controllers.BookController$$anonfun$eagerStats$1.apply(BookController.scala:41) ~[classes/:na]
at play.api.mvc.ActionBuilder$$anonfun$async$1.apply(Action.scala:456) ~[play_2.11-2.4.6.jar:2.4.6]
at play.api.mvc.ActionBuilder$$anonfun$async$1.apply(Action.scala:456) ~[play_2.11-2.4.6.jar:2.4.6]
at play.api.mvc.Action$.invokeBlock(Action.scala:533) ~[play_2.11-2.4.6.jar:2.4.6]
at play.api.mvc.Action$.invokeBlock(Action.scala:530) ~[play_2.11-2.4.6.jar:2.4.6]
at play.api.mvc.ActionBuilder$$anon$1.apply(Action.scala:493) ~[play_2.11-2.4.6.jar:2.4.6]
at play.api.mvc.Action$$anonfun$apply$1$$anonfun$apply$4$$anonfun$apply$5.apply(Action.scala:105) ~[play_2.11-2.4.6.jar:2.4.6]
at play.api.mvc.Action$$anonfun$apply$1$$anonfun$apply$4$$anonfun$apply$5.apply(Action.scala:105) ~[play_2.11-2.4.6.jar:2.4.6]
at play.utils.Threads$.withContextClassLoader(Threads.scala:21) ~[play_2.11-2.4.6.jar:2.4.6]
at play.api.mvc.Action$$anonfun$apply$1$$anonfun$apply$4.apply(Action.scala:104) ~[play_2.11-2.4.6.jar:2.4.6]
at play.api.mvc.Action$$anonfun$apply$1$$anonfun$apply$4.apply(Action.scala:103) ~[play_2.11-2.4.6.jar:2.4.6]
at scala.Option.map(Option.scala:146) ~[scala-library-2.11.7.jar:na]
at play.api.mvc.Action$$anonfun$apply$1.apply(Action.scala:103) ~[play_2.11-2.4.6.jar:2.4.6]
at play.api.mvc.Action$$anonfun$apply$1.apply(Action.scala:96) ~[play_2.11-2.4.6.jar:2.4.6]
at play.api.libs.iteratee.DoneIteratee$$anonfun$mapM$2.apply(Iteratee.scala:741) ~[play-iteratees_2.11-2.4.6.jar:2.4.6]
at play.api.libs.iteratee.DoneIteratee$$anonfun$mapM$2.apply(Iteratee.scala:741) ~[play-iteratees_2.11-2.4.6.jar:2.4.6]
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) [scala-library-2.11.7.jar:na]
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) [scala-library-2.11.7.jar:na]
... 6 common frames omitted
4

1 回答 1

1

结果证明这是一个奇怪的比赛条件,所以你最好现在停止阅读。

这是我对发生的情况的假设:

竞争条件是由 ES java 驱动处理连接的方式引起的,与 Play 或 elastic4s 无关。

ElasticClient.transport(...)在建立连接之前不会阻塞;它只用给定的设置初始化驱动程序。这会导致驱动程序尝试连接到 ES 集群,但连接尝试在后台是异步的。如果驱动程序在请求任何 API 调用之前设法建立连接,则一切正常。另一方面,如果在驱动程序初始化之后立即进行任何 API 调用并且还没有连接,那么 API 调用将失败并出现这个确切的异常。在通常情况下,这一切都发生得非常快,不会造成任何麻烦。我怀疑从交互式 sbt 会话运行会增加复杂性,因为 Play 自动重新加载 - 内存是有限的(SBT 本身消耗的部分),并且 JVM 负载很重,因为编译、Play 引导和 ES 连接初始化都是根据请求进行的。

于 2016-02-17T20:24:21.960 回答