我曾经fs2.concurrent.Queue
通过队列尝试生产者/消费者。
val program = fs2.Stream.eval(Queue.bounded[IO, Option[Int]](maxSlots)).flatMap {
q =>
val p = customers[IO](10.millis, 100.millis, openingSeconds).evalMap(writeToQueue[IO](q))
val c = q.dequeue.unNoneTerminate.parEvalMap(maxBarbers)(barbers[IO](300.millis, 100.millis))
c concurrently p
}
我现在使用的解决方案是p
停止生产Some(id)
,其中 id 是客户的 id,openingSeconds
然后更改为 Stream ofNone
以终止c
ie q.dequeue.unNoneTerminate
。
我可以使用单个None
而不是重复的流None
来终止c
吗?
我遇到的问题None
是当队列q
已满时,单None
将不会插入队列,因为writeToQueue
当队列已满时不会接受新元素。p
并c
以不同的速度生产和消费。谢谢