3

免责声明:我是sttpMonix的新手,这是我尝试更多地了解这些库的尝试。我的目标是通过 HTTP GET 请求从给定 API 获取数据(客户端)-> 解析 JSON 响应-> 将此信息写入数据库。我的问题仅与第一部分有关。我的目标是以异步(希望是快速)的方式运行获取请求,同时有办法避免或处理速率限制。

以下是我已经尝试过的片段,似乎适用于单个请求:

package com.github.client

import io.circe.{Decoder, HCursor}
import sttp.client._
import sttp.client.circe._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task

object SO extends App {

  case class Bla(paging: Int)

  implicit val dataDecoder: Decoder[Bla] = (hCursor: HCursor) => {
    for {
      next_page <- hCursor.downField("foo").downArray.downField("bar").as[Int]
    } yield Bla(next_page)
  }

  val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val r = basicRequest
      .get(uri"https://foo.bar.io/v1/baz")
      .header("accept", "application/json")
      .header("Authorization", "hushh!")
      .response(asJson[Bla])

    r.send() // How can I instead of operating on a single request, operate on multiple
      .flatMap { response =>
        Task(response.body)
      }
      .guarantee(backend.close())
  } 

  import monix.execution.Scheduler.Implicits.global

  postTask.runSyncUnsafe() match {
    case Left(error) => println(s"Error when executing request: $error")
    case Right(data) => println(data)
  }
}

我的问题:

  1. 如何通过使用 Monix 对多个 GET 请求(而不是单个请求)进行操作,同时保持代码异步和可组合
  2. 如何避免或处理 api 服务器施加的速率限制

附带说明一下,如果这将支持速率限制目标,我在使用另一个后端方面也很灵活。

4

1 回答 1

2

您可以像这样使用 monix.reactive.Observable

  Observable.repeatEval(postTask) // we generate infinite observable of the task
    .throttle(1.second, 3) // set throttling
    .mapParallelOrderedF(2)(_.runToFuture) // set execution parallelism and execute tasks
    .subscribe() // start the pipline
  
  
  while (true) {}
于 2020-08-05T21:26:35.537 回答