在我的应用程序中,我有多达 N 个并行工作的消费者和一个生产者。消费者从生产者那里获取资源,完成他们的工作,将结果附加到 anupdateQueue
并请求更多资源。Producer 最初有一些可用资源,并且可以通过应用来自updateQueue
. 在将新资源发送给消费者之前应用所有可用更新非常重要。我尝试使用以下生成器,每当消费者提出请求时“批量”请求更新,并在 a 中留出新资源(消费者不需要,但稍后可能会被其他消费者请求)ticketQueue
:
def updatesOrFresh: Process[Task, Seq[OptimizerResult] \/ Unit] =
Process.await(updateQueue.size.continuous.take(1).runLast) {
case Some(size) =>
println(s"size: $size")
if (size == 0)
wye(updateQueue.dequeueAvailable, ticketQueue.dequeue)(wye.either)
else
updateQueue.dequeueAvailable.map(_.left[Unit])
}.take(1) ++ Process.suspend(updatesOrFresh)
它不起作用 - 最初可用的资源是从 发出的ticketQueue.dequeue
,然后它似乎在wye
, 日志记录中阻塞:
size: 0
<<got ticket>>
size: 0
<<got ticket>>
size: 0 // it appears the updateQueue did not receive the consumer output yet, but I can live with that, it should grab an update from the wye anyway
<<blocks>>
当最初在ticketQueue
. 但是,如果我将其更改为
val updatesOrFresh = wye(updateQueue.dequeueAvailable, ticketQueue.dequeue)(wye.either)
它按预期工作(尽管没有“在发出新资源之前应用更新”保证)。如何确保在正确的时间应用更新?
编辑:我已经使用以下代码解决了它:
val updatesOrFresh: Process[Task, Seq[OptimizerResult] \/ Unit] =
Process.repeatEval {
for {
sizeOpt <- updateQueue.size.continuous.take(1).runLast
nextOpt <-
if (sizeOpt.getOrElse(???) == 0)
wye(updateQueue.dequeueAvailable, ticketQueue.dequeue)(wye.either).take(1).runLast
else
updateQueue.dequeueAvailable.map(_.left[Unit]).take(1).runLast
} yield nextOpt.getOrElse(???)
}
然而,为什么原版def
不起作用的问题仍然存在......