2

假设我们有一个case class MyCaseClass(name: String, value: Int). 给定一个fs2.Stream[F, MyCaseClass]我想对相同的元素进行分组name

val sourceStream: fs2.Stream[F, MyCaseClass] = //
val groupedSameNameStream: fs2.Stream[F, fs2.Stream[F, MyCaseClass]] = //

我需要这样做的原因是我想应用有效的转换

val transform: MyCaseClass => F[Unit] = //

到流的所有元素,如果一组失败,另一组应该继续工作。

这样的事情可能吗?

4

1 回答 1

1

这是可能的,但需要注意。

如果您接受具有无限数量的键的 a 以及每个关联的无限数量的s,那么执行此操作相对简单。MapQueue

我们在生产中使用了基于github 用户 kiambogo 的要点的代码(尽管我们的代码已经过调整),并且运行良好:

import fs2.concurrent.Queue
import cats.implicits._
import cats.effect.Concurrent
import cats.effect.concurrent.Ref
 
def groupBy[F[_], A, K](selector: A => F[K])(implicit F: Concurrent[F]): Pipe[F, A, (K, Stream[F, A])] = {
  in =>
  Stream.eval(Ref.of[F, Map[K, Queue[F, Option[A]]]](Map.empty)).flatMap { st =>
    val cleanup = {
      import alleycats.std.all._
      st.get.flatMap(_.traverse_(_.enqueue1(None)))
    }

    (in ++ Stream.eval_(cleanup))
      .evalMap { el =>
        (selector(el), st.get).mapN { (key, queues) =>
          queues.get(key).fold {
            for {
              newQ <- Queue.unbounded[F, Option[A]] // Create a new queue
              _ <- st.modify(x => (x + (key -> newQ), x)) // Update the ref of queues
              _ <- newQ.enqueue1(el.some)
            } yield (key -> newQ.dequeue.unNoneTerminate).some
          }(_.enqueue1(el.some) as None)
        }.flatten
      }.unNone.onFinalize(cleanup)
  }
}

如果我们假设每个 Map 条目的开销为 64 字节(我认为这被高估),那么 100,000 个唯一键的基数给我们大约 6.1MiB -在 jvm 进程的合理大小范围内

于 2020-07-31T13:50:53.110 回答