4

给定一个要被多次调用的函数A => IO[B](又名Kleisli[IO, A, B]),并且有副作用,比如更新数据库,如何将这样的多次调用委托给一个流(我猜Pipe[IO, A, B])(fs2,monix observable/iterant)?这样做的原因是能够在一个时间窗口内累积状态、批量调用等。

更具体地说,http4s 服务器需要一个Request => IO[Response],所以我正在研究如何对流进行操作(为了上述好处),但最终为 http4s 提供了这样的功能。

我怀疑它在幕后需要一些相关 ID,我对此很好,我对如何从 FP 的角度安全和正确地进行操作更感兴趣。

最终,我期望的签名可能是这样的:

Pipe[IO, A, B] => (A => IO[B]), 这样对 Kleisli 的调用就会通过管道传输。

作为事后的想法,是否有可能背压?

4

1 回答 1

1

一种想法是使用 MPSC(多发布者单消费者)对其进行建模。我将举一个 Monix 的例子,因为我更熟悉它,但即使你使用 FS2,这个想法也保持不变。

object MPSC extends App {

  sealed trait Event
  object Event {
    // You'll need a promise in order to send the response back to user
    case class SaveItem(num: Int, promise: Deferred[Task, Int]) extends Event
  }

  // For backpressure, take a look at `PublishSubject`.
  val cs = ConcurrentSubject[Event](MulticastStrategy.Publish)

  def pushEvent(num: Int) = {
    for {
      promise <- Deferred[Task, Int]
      _ <- Task.delay(cs.onNext(SaveItem(num, promise)))
    } yield promise
  }

  // You get a list of events now since it is buffered
  // Monix has a lot of buffer strategies, check the docs for more details
  def processEvents(items: Seq[Event]): Task[Unit] = {
    Task.delay(println(s"Items: $items")) >>
      Task.traverse(items) {
        case SaveItem(_, promise) => promise.complete(Random.nextInt(100))
      }.void
  }

  val app = for {
    // Start the stream in the background
    _ <- cs
      .bufferTimed(3.seconds) // Buffer all events within 3 seconds
      .filter(_.nonEmpty)
      .mapEval(processEvents)
      .completedL
      .startAndForget

    _ <- Task.sleep(1.second)
    p1 <- pushEvent(10)
    p2 <- pushEvent(20)
    p3 <- pushEvent(30)

    // Wait for the promise to complete, you'll do this for each request
    x <- p1.get
    y <- p2.get
    z <- p3.get

    _ <- Task.delay(println(s"Completed promise: [$x, $y, $z]"))
  } yield ()

  app.runSyncUnsafe()
}
于 2019-11-18T09:51:53.480 回答