这是可能的,但需要注意。
如果您接受具有无限数量的键的 a 以及每个关联的无限数量的s,那么执行此操作相对简单。Map
Queue
我们在生产中使用了基于github 用户 kiambogo 的要点的代码(尽管我们的代码已经过调整),并且运行良好:
import fs2.concurrent.Queue
import cats.implicits._
import cats.effect.Concurrent
import cats.effect.concurrent.Ref
def groupBy[F[_], A, K](selector: A => F[K])(implicit F: Concurrent[F]): Pipe[F, A, (K, Stream[F, A])] = {
in =>
Stream.eval(Ref.of[F, Map[K, Queue[F, Option[A]]]](Map.empty)).flatMap { st =>
val cleanup = {
import alleycats.std.all._
st.get.flatMap(_.traverse_(_.enqueue1(None)))
}
(in ++ Stream.eval_(cleanup))
.evalMap { el =>
(selector(el), st.get).mapN { (key, queues) =>
queues.get(key).fold {
for {
newQ <- Queue.unbounded[F, Option[A]] // Create a new queue
_ <- st.modify(x => (x + (key -> newQ), x)) // Update the ref of queues
_ <- newQ.enqueue1(el.some)
} yield (key -> newQ.dequeue.unNoneTerminate).some
}(_.enqueue1(el.some) as None)
}.flatten
}.unNone.onFinalize(cleanup)
}
}
如果我们假设每个 Map 条目的开销为 64 字节(我认为这被高估了),那么 100,000 个唯一键的基数给我们大约 6.1MiB -在 jvm 进程的合理大小范围内。