1

我目前正在使用 Monix 后端学习和玩 STTP。在处理完所有请求(每个请求都是一个任务)后,我主要坚持关闭后端。

我创建了类似于我的问题的示例/模拟代码(据我了解,我的问题更普遍,而不是特定于我的代码):

import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}

import scala.concurrent.duration.DurationInt

object ObservableTest extends App {

  val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val ids: Task[List[Int]] = Task { (1 to 3).toList }
    val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
    val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
    data.guarantee(backend.close()) // If I close the backend here, I can' generate requests after (when processing the actual requests in the list)
    // I have attempted to return a Task containing a tuple of (data, backend) but closing the backend from outside of the scope did not work as I expected
  }
  import monix.execution.Scheduler.Implicits.global
  val obs = Observable
    .fromTask(activities)
    .flatMap { listOfFetches =>
      Observable.fromIterable(listOfFetches)
    }
    .throttle(3 second, 1)
    .map(_.runToFuture)

  obs.subscribe()
}

我的 fetch (api call maker) 函数看起来像:

  def fetch(uri: Uri, auth: String)(implicit
      backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
  ) = {
    println(uri)
    val task = basicRequest
      .get(uri)
      .header("accept", "application/json")
      .header("Authorization", auth)
      .response(asString)
      .send()

    task
  }

由于我的主要任务包含稍后需要处理的其他任务,因此我需要找到一种替代方法来从外部关闭 Monix 后端。在我消费请求后,有没有一种干净的方法来关闭后端List[Task[Response[Either[String, String]]]]

4

1 回答 1

1

问题来自这样一个事实,即在打开 sttp 后端的情况下,您正在计算要执行的任务列表 - List[Task[Response[Either[String, String]]]],但您没有运行它们。因此,我们需要在后端关闭之前按顺序运行这些任务。

这里要做的关键是创建一个任务的单一描述,在后端仍然打开时运行所有这些请求。

一旦你计算data(它本身是一个任务 - 计算的描述 - 运行时会产生一个任务列表 - 也是计算的描述),我们需要将其转换为单个非嵌套的Task. 这可以通过多种方式完成(例如使用简单的排序),但在您的情况下,这将使用Observable

AsyncHttpClientMonixBackend().flatMap { implicit backend =>
  val ids: Task[List[Int]] = Task { (1 to 3).toList }
  val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
  val data: Task[List[Task[Response[Either[String, String]]]]] =
    ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))

  val activities = Observable
    .fromTask(data)
    .flatMap { listOfFetches =>
      Observable.fromIterable(listOfFetches)
    }
    .throttle(3 second, 1)
    .mapEval(identity)
    .completedL

  activities.guarantee(
    backend.close()
  )
}

首先要注意的Observable.fromTask(...)是在最外层里面flatMap,所以是在后端仍然打开的时候创建的。我们创建 observable,对其进行限制等,然后是关键事实:一旦我们有了限制流,我们Task[...]使用mapEval. 我们得到一个流Either[String, String],它是请求的结果。

最后,我们将流转换为Taskusing .completedL(丢弃结果),等待整个流完成。

然后通过关闭后端对最终任务进行排序。如上所述,最终会发生的副作用是:

  1. 打开后端
  2. 创建任务列表 ( data)
  3. 创建一个流,它限制由计算的列表中的元素data
  4. 评估流中的每个项目(发送请求)
  5. 关闭后端
于 2020-08-11T07:42:47.833 回答