6

我有以下代码:

val context = newFixedThreadPoolContext(nThreads = 10, name="myThreadPool")
val total = 1_000_000 //can be other number as well
val maxLimit = 1_000
return runBlocking {
  (0..total step maxLimit).map {
    async(context) {
      val offset = it
      val limit = it + maxLimit
      blockingHttpCall(offset, limit)
    }
  }.flatMap {
    it.await()
  }.associateBy {
    ...
  }.toMutableMap()
}

我希望阻塞 api 同时发生 10 个调用。但是,上面的代码似乎没有像我预期的那样做(我认为所有的调用都会立即开始),或者至少我不明白它是否这样做。
实施它的正确方法是什么?如果我使用改造的异步 api,相同的解决方案会起作用吗?

4

1 回答 1

2

我不完全了解您的情况,但最简单的方法 - 使用 OkHttp API 配置并发级别,例如,这是OkHttp 的默认并发策略

但是如果您将自己的Dispatcher实例设置为OkHttpClient.Builder

当然,你也可以使用协程

您当前的实现是不正确的,因为您为每个项目创建协程调度程序,但是要共享线程池,所有协程应该使用相同的调度程序,只需将newFixedThreadPoolContext创建移到循环之外(现在您有 1000 个调度程序,每个调度程序有 10 个线程)。

但是我不建议你使用协程+阻塞调用,最好配置OkHttp并发(更灵活)和使用非阻塞调用的协程(你可以编写自己的适配器或使用现有的库如kotlin-coroutines-retrofit) . 它将允许您混合您的 http 请求和 UI 代码或其他任务。

所以如果你使用非阻塞 API + OkHttp 内部并发,你不需要有特殊的代码来控制并发,当然你可以像上面的例子一样限制并发调用的数量(使用固定的调度器构造),但是我真的认为这没有多大意义,因为您可以降低并发级别,而不是提高并发级别。

迁移到非阻塞 API 后,您可以在任何协程调度程序中并行运行所有协程(甚至在 UI 线程中)并等待结果而不会阻塞。

此外,使用 OkHttpClient 配置隐式控制并发在架构方面看起来是一种更正确的方式(您可以拥有配置 Retrofit + OkHttp 的 DI 代码,并使用预配置的并发策略将其提供给您的客户端代码)。当然,您可以使用其他方法来实现这一点,但是这对我来说看起来更自然。

于 2017-09-25T03:24:08.540 回答