4

我正在使用 Vert.x 2.x ( http://vertx.io ),它广泛使用异步回调。由于典型的嵌套/回调地狱问题,这些很快就会变得笨拙。

我已经考虑过 Scala Futures/Promises(我认为这将是事实上的方法)和 Reactive Extensions (RxScala)。

从我的测试中,我发现了一些有趣的性能结果。

我的测试非常基础,我只是向 Vert.x verticle 发出一堆 HTTP 请求(通过 weighttp),它在 Vert.x 事件总线上进行异步调用,并处理一个响应,然后在 HTTP 200 中返回回复。

我发现如下(这里的性能是以每秒 HTTP 请求数来衡量的):

  • 异步回调性能 = 68,305 rps
  • 接收性能 = 64,656 rps
  • 未来/承诺性能 = 61,376 rps

测试条件为:

  • Mac Pro OS X 优胜美地 10.10.2
  • 甲骨文 JVM 1.8U25
  • weighttp 版本 0.3
  • 垂直 x 2.1.5
  • 斯卡拉 2.10.4
  • RxScala 0.23.0
  • 4 个 Web 服务垂直实例
  • 4 个后端服务 Verticle 实例

测试命令是

weighttp -n 1000000 -c 128 -7 8 -k "localhost:8888"

上面的数字是五次测试运行的平均值,最好和最差的结果。请注意,结果与呈现的平均值非常一致(不超过几百 rps 偏差)。

是否有任何已知原因导致上述情况发生 - 即每秒纯请求中的 Rx > Futures?

在我看来,响应式扩展是优越的,因为它们可以做更多的事情,但考虑到异步回调的标准方法通常似乎走上了 Futures/Promises 的轨道,我对性能下降感到惊讶。

编辑:这是 Web 服务垂直

class WebVerticle extends Verticle {
  override def start() {
    val port = container.env().getOrElse("HTTP_PORT", "8888").toInt
    val approach = container.env().getOrElse("APPROACH", "ASYNC")
    container.logger.info("Listening on port: " + port)
    container.logger.info("Using approach: " + approach)

    vertx.createHttpServer.requestHandler { req: HttpServerRequest =>
      approach match {
        case "ASYNC" => sendAsync(req, "hello")
        case "FUTURES" => sendWithFuture("hello").onSuccess { case body => req.response.end(body) }
        case "RX" => sendWithObservable("hello").doOnNext(req.response.end(_)).subscribe()
      }
    }.listen(port)
  }

  // Async callback
  def sendAsync(req: HttpServerRequest, body: String): Unit = {
    vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
      req.response.end(msg.body())
    })
  }

  // Rx 
  def sendWithObservable(body: String) : Observable[String] = {
    val subject = ReplaySubject[String]()
    vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
      subject.onNext(msg.body())
      subject.onCompleted()
    })
    subject
  }

  // Futures    
  def sendWithFuture(body: String) : Future[String] = {
    val promise = Promise[String]()
    vertx.eventBus.send("service.verticle", body, { msg: Message[String] =>
      promise.success(msg.body())
    })
    promise.future
  }
}

编辑:这是后端垂直

class ServiceVerticle extends Verticle {
  override def start(): Unit = {
    vertx.eventBus.registerHandler("service.verticle", { msg: Message[String] =>
      msg.reply("Hello Scala")
    })
  }
}
4

0 回答 0