6

我试图用有限数量的线程创建 blaze 客户端,如下所示:

object ReactiveCats extends IOApp {
  private val PORT = 8083
  private val DELAY_SERVICE_URL = "http://localhost:8080"
  
  // trying create client with limited number of threads
  val clientPool: ExecutorService = Executors.newFixedThreadPool(64)
  val clientExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(clientPool)

  private val httpClient = BlazeClientBuilder[IO](clientExecutor).resource

  private val httpApp = HttpRoutes.of[IO] {
    case GET -> Root / delayMillis =>
      httpClient.use { client =>
        client
          .expect[String](s"$DELAY_SERVICE_URL/$delayMillis")
          .flatMap(response => Ok(s"ReactiveCats: $response"))
      }
  }.orNotFound

  // trying to create server on fixed thread pool
  val serverPool: ExecutorService = Executors.newFixedThreadPool(64)
  val serverExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(serverPool)

  // start server
  override def run(args: List[String]): IO[ExitCode] =
    BlazeServerBuilder[IO](serverExecutor)
      .bindHttp(port = PORT, host = "localhost")
      .withHttpApp(httpApp)
      .serve
      .compile
      .drain
      .as(ExitCode.Success)
}

完整的代码和负载测试  

但是负载测试结果看起来像是一个请求一个线程: 在此处输入图像描述

如何为我的 blaze 客户端限制线程数?

4

2 回答 2

3

您的代码有两个明显的问题:

  1. 您正在创建一个 Executor 而不在完成后将其关闭。
  2. 您在 HTTP 路由中使用 Resource 的use方法httpClient,这意味着每次调用该路由时,都会创建、使用和销毁 http 客户端。相反,您应该在启动期间创建一次。

执行器,像任何其他资源(例如文件句柄等)一样,应该始终使用Resource.make如下方式分配:

  val clientPool: Resource[IO, ExecutorService] = Resource.make(IO(Executors.newFixedThreadPool(64)))(ex => IO(ex.shutdown()))
  val clientExecutor: Resource[IO, ExecutionContextExecutor] = clientPool.map(ExecutionContext.fromExecutor)

  private val httpClient = clientExecutor.flatMap(ex => BlazeClientBuilder[IO](ex).resource)

第二个问题可以通过在构建 HTTP 应用程序之前分配 httpClient 来轻松解决:

  private def httpApp(client: Client[IO]): Kleisli[IO, Request[IO], Response[IO]] = HttpRoutes.of[IO] {
    case GET -> Root / delayMillis =>
      client
        .expect[String](s"$DELAY_SERVICE_URL/$delayMillis")
        .flatMap(response => Ok(s"ReactiveCats: $response"))
  }.orNotFound

…

  override def run(args: List[String]): IO[ExitCode] =
    httpClient.use { client =>
      BlazeServerBuilder[IO](serverExecutor)
        .bindHttp(port = PORT, host = "localhost")
        .withHttpApp(httpApp(client))
        .serve
        .compile
        .drain
        .as(ExitCode.Success)
    }

另一个潜在的问题是您正在使用IOApp,它带有自己的线程池。解决这个问题的最好方法可能是混合IOApp.WithContext特征并实现这个方法:

  override protected def executionContextResource: Resource[SyncIO, ExecutionContext] = ???
于 2020-06-30T16:51:09.190 回答
0

从我的评论中复制。

为 Blaze 客户端正确设置了性能问题的答案 - 对我来说,这是 .withMaxWaitQueueLimit(1024) 参数。

于 2021-10-08T10:28:26.373 回答