我曾经fs2.concurrent.Queue通过队列尝试生产者/消费者。
val program = fs2.Stream.eval(Queue.bounded[IO, Option[Int]](maxSlots)).flatMap {
q =>
val p = customers[IO](10.millis, 100.millis, openingSeconds).evalMap(writeToQueue[IO](q))
val c = q.dequeue.unNoneTerminate.parEvalMap(maxBarbers)(barbers[IO](300.millis, 100.millis))
c concurrently p
}
我现在使用的解决方案是p停止生产Some(id),其中 id 是客户的 id,openingSeconds然后更改为 Stream ofNone以终止cie q.dequeue.unNoneTerminate。
我可以使用单个None而不是重复的流None来终止c吗?
我遇到的问题None是当队列q已满时,单None将不会插入队列,因为writeToQueue当队列已满时不会接受新元素。p并c以不同的速度生产和消费。谢谢