2

在我的应用程序中,我有多达 N 个并行工作的消费者和一个生产者。消费者从生产者那里获取资源,完成他们的工作,将结果附加到 anupdateQueue并请求更多资源。Producer 最初有一些可用资源,并且可以通过应用来自updateQueue. 在将新资源发送给消费者之前应用所有可用更新非常重要。我尝试使用以下生成器,每当消费者提出请求时“批量”请求更新,并在 a 中留出新资源(消费者不需要,但稍后可能会被其他消费者请求)ticketQueue

      def updatesOrFresh: Process[Task, Seq[OptimizerResult] \/ Unit] =
        Process.await(updateQueue.size.continuous.take(1).runLast) {
          case Some(size) =>
            println(s"size: $size")
            if (size == 0)
              wye(updateQueue.dequeueAvailable, ticketQueue.dequeue)(wye.either)
            else
              updateQueue.dequeueAvailable.map(_.left[Unit])
        }.take(1) ++ Process.suspend(updatesOrFresh)

它不起作用 - 最初可用的资源是从 发出的ticketQueue.dequeue,然后它似乎在wye, 日志记录中阻塞:

size: 0
<<got ticket>>
size: 0
<<got ticket>>
size: 0   // it appears the updateQueue did not receive the consumer output yet, but I can live with that, it should grab an update from the wye anyway
<<blocks>>

当最初在ticketQueue. 但是,如果我将其更改为

 val updatesOrFresh = wye(updateQueue.dequeueAvailable, ticketQueue.dequeue)(wye.either)

它按预期工作(尽管没有“在发出新资源之前应用更新”保证)。如何确保在正确的时间应用更新?


编辑:我已经使用以下代码解决了它:

  val updatesOrFresh: Process[Task, Seq[OptimizerResult] \/ Unit] =
    Process.repeatEval {
      for {
        sizeOpt <- updateQueue.size.continuous.take(1).runLast
        nextOpt <-
          if (sizeOpt.getOrElse(???) == 0)
            wye(updateQueue.dequeueAvailable, ticketQueue.dequeue)(wye.either).take(1).runLast
          else
            updateQueue.dequeueAvailable.map(_.left[Unit]).take(1).runLast
      } yield nextOpt.getOrElse(???)
    }

然而,为什么原版def不起作用的问题仍然存在......

4

0 回答 0