我目前正在使用 Monix 后端学习和玩 STTP。在处理完所有请求(每个请求都是一个任务)后,我主要坚持关闭后端。
我创建了类似于我的问题的示例/模拟代码(据我了解,我的问题更普遍,而不是特定于我的代码):
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}
import scala.concurrent.duration.DurationInt
object ObservableTest extends App {
val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val ids: Task[List[Int]] = Task { (1 to 3).toList }
val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
data.guarantee(backend.close()) // If I close the backend here, I can' generate requests after (when processing the actual requests in the list)
// I have attempted to return a Task containing a tuple of (data, backend) but closing the backend from outside of the scope did not work as I expected
}
import monix.execution.Scheduler.Implicits.global
val obs = Observable
.fromTask(activities)
.flatMap { listOfFetches =>
Observable.fromIterable(listOfFetches)
}
.throttle(3 second, 1)
.map(_.runToFuture)
obs.subscribe()
}
我的 fetch (api call maker) 函数看起来像:
def fetch(uri: Uri, auth: String)(implicit
backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
) = {
println(uri)
val task = basicRequest
.get(uri)
.header("accept", "application/json")
.header("Authorization", auth)
.response(asString)
.send()
task
}
由于我的主要任务包含稍后需要处理的其他任务,因此我需要找到一种替代方法来从外部关闭 Monix 后端。在我消费请求后,有没有一种干净的方法来关闭后端List[Task[Response[Either[String, String]]]]
?