I am currently trying to use Monix to throttle API GET requests. I tried sttp's Monix backend, and it worked fine until I found that I couldn't shut the backend down after I was done. As this seems more like an sttp issue than a Monix one, I tried to approach the problem again using sttp's default backend, while still using Monix to throttle.

I am mainly struggling with closing the Monix backend once I am done consuming the Observable.

I have tried to reduce the problem to this:

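  import monix.eval.Task
  import monix.execution.CancelableFuture
  import monix.execution.Scheduler.Implicits.global
  import monix.reactive.Observable
  import scala.concurrent.duration._

  val someIter = List(Task(1), Task(2))

  val obs: Observable[CancelableFuture[Int]] = Observable
    .fromIterable(someIter)
    .throttle(3.second, 1)
    .map(_.runToFuture)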

However, I am still not sure how to shut the program down only after the Observable has been consumed, as it terminates prematurely here (unlike in the Monix backend case).

In other words, how can I keep the program from terminating until the Observable has been fully consumed?


2 Answers


You can create a Promise and complete it in .doOnComplete when the Observable completes.

Then await the promise's future on the main thread.

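import monix.eval.Task
import monix.execution.CancelableFuture
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

val someIter = List(Task(1), Task(2))
val promise = Promise[Unit]()
val obs: Observable[CancelableFuture[Int]] = Observable.fromIterable(someIter).throttle(3.second, 1)
  .map(_.runToFuture)
  .doOnComplete(Task { promise.success(()); () }) // complete the promise once the stream ends

obs.subscribe() // an Observable is lazy: nothing runs until it is subscribed
Await.ready(promise.future, Duration.Inf) // block the main thread until completion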
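One caveat with the promise approach: doOnComplete only runs when the stream ends successfully, so if the stream fails the promise is never completed and the await never returns. The following is a minimal sketch of one way to cover that case, assuming Monix 3.x. The object name `AwaitStreamEnd`, the helper `runThrottled`, the 100 ms period and the 10 second timeout are all made up for illustration. It also runs the tasks with mapEval instead of mapping them to futures, so a failing task actually fails the stream.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object AwaitStreamEnd {
  // Hypothetical helper: subscribes to a throttled stream and returns a
  // Future that finishes when the stream ends, successfully or not.
  def runThrottled(tasks: List[Task[Int]]): Future[Unit] = {
    val done = Promise[Unit]()
    Observable
      .fromIterable(tasks)
      .throttle(100.millis, 1)   // at most 1 element per 100 ms
      .mapEval(task => task)     // run each task inside the stream
      .doOnComplete(Task { done.trySuccess(()); () })
      .doOnError(e => Task { done.tryFailure(e); () })
      .subscribe()               // nothing runs before this call
    done.future
  }

  def main(args: Array[String]): Unit = {
    // Bounded wait instead of Duration.Inf, so a stuck stream cannot hang the JVM.
    Await.ready(runThrottled(List(Task(1), Task(2))), 10.seconds)
    println("stream finished")
  }
}
```

Await.ready does not rethrow a failure; use Await.result instead if the main thread should see the stream's exception.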
answered 2020-08-10T20:16:05.720

In addition to the accepted answer by Artem, and with insights from the Monix Gitter community, another potential implementation could be:

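  import monix.eval.Task
  import monix.execution.Scheduler.Implicits.global
  import monix.reactive.Observable
  import scala.concurrent.duration._

  val someIter = List(Task(1), Task(2)) // stand-ins for the tasks that send requests
  val result =
    Observable
      .fromIterable(someIter)
      .throttle(1.second, 10)
      // Runs up to 10 tasks in parallel. With sttp, the request is sent here,
      // e.g. task.map(_.send().body) when each task yields a request.
      .mapParallelUnordered(10)(task => task)
      .sumL // Sum just as an example
      .runSyncUnsafe() // blocks the calling thread until the stream completes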
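The same blocking idea works with any of the Observable consumers that return a Task, not only sumL. Here is a minimal sketch, assuming Monix 3.x, that collects the results into a list instead of summing them. The object name `CollectThrottled`, the helper `collect`, the 100 ms period and the 10 second timeout are hypothetical choices for illustration, and plain Int tasks stand in for real requests.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration._

object CollectThrottled {
  // Hypothetical helper: runs the tasks through a throttled stream and
  // blocks the calling thread until every result has been collected.
  def collect(tasks: List[Task[Int]]): List[Int] =
    Observable
      .fromIterable(tasks)
      .throttle(100.millis, 1)     // at most 1 element per 100 ms
      .mapEval(task => task)       // sequential, so input order is kept
      .toListL                     // Task[List[Int]]: still lazy at this point
      .runSyncUnsafe(10.seconds)   // subscribes and blocks, with a timeout

  def main(args: Array[String]): Unit =
    println(collect(List(Task(1), Task(2))))
}
```

Passing a timeout to runSyncUnsafe is optional; without it the call waits indefinitely, as in the snippet above.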
answered 2020-08-12T19:42:56.630