2

我目前正在努力实现对 API 的客户端 http 请求,并决定探索 sttp 和 monix 来完成这项任务。由于我是 Monix 的新手,我仍然不确定如何运行任务并检索它们的结果。我的目标是获得一系列 http 请求结果,我可以并行调用 -> 解析 -> 加载。

以下是我迄今为止尝试过的一个片段:

import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task

object SO extends App {

  val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
      .header("accept", "application/json")
      .response(asString)
      .body()
      .send()

    val tasks = Seq(r1).map(i => Task(i))
    Task.parSequenceUnordered(tasks).guarantee(backend.close())
  }
  
  import monix.execution.Scheduler.Implicits.global

  postTask.runToFuture.foreach(println) // prints: List(Task.FlatMap$2052527361)
}

我的困惑相当简单(我猜)。如何运行Task.parSequenceUnordered我创建的任务,并处理(解析 http 结果)序列中的任务?

很高兴:出于好奇,在处理请求的任务序列时是否可以天真地引入速率限制/节流?我并不是真的在寻找构建复杂的东西。它可以像间隔批量请求一样简单。想知道 Monix 是否已经为此提供了帮助。

4

1 回答 1

2

感谢Oleg Pyzhcovmonix gitter 社区帮助我解决这个问题。

在这里引用奥列格:

由于您已经在使用支持 monix 的后端,因此 r1 的类型是Task[Response[Either[String,String]]]. 所以当你在做的时候 Seq(r1).map(i => Task(i)),你把它变成一个不做任何事情的任务序列,除了给你其他给你结果的任务(类型是Seq[Task[Task[Response[...]]]])。然后,您的代码会并行化外层任务,即给予任务的任务,并且您会得到开始时使用的任务作为结果。您只需要处理 Seq(r1) 即可并行运行请求。

如果您使用的是 Intellij,则可以按Alt + =查看选择的类型 - 如果您不能仅从代码中分辨出类型,这会有所帮助(但随着经验的积累会变得更好)。

至于速率限制,我们有 parSequenceN 可让您设置并行度限制。请注意,无序仅意味着您以结果在输出中的随机顺序为代价获得轻微的性能优势,无论如何它们都是非确定性地执行的。

我最终得到了一个看起来像这样的(简化的)实现:

import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task

object SO extends App {

  val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
      .header("accept", "application/json")
      .response(asString)
      .body()
      .send()

    val items = Seq(r1.map(x => x.body))
    Task.parSequenceN(1)(items).guarantee(backend.close())
  }
  
  import monix.execution.Scheduler.Implicits.global

   postTask.runToFuture.foreach(println)
}
于 2020-08-05T13:33:36.797 回答