4

I have a jobs_queue collection in MongoDB. It's a capped collection which I'm polling using a tailable cursor:

val cur =
  jobsQueue
    .find(Json.obj("done" -> Json.obj("$ne" -> true)))
    .options(QueryOpts().tailable.awaitData)
    .cursor[JsObject]

cur.enumerate() |>>> Iteratee.foreach { queuedDoc =>
  // do some processing and store the results back in the DB
}

This is being called from a regular Scala App, so there's no Akka or Play wrapping at all.

What would be the most appropriate way to make sure the App doesn't exit until I explicitly break out of the Iteratee.foreach? Also, I don't have to use play-iteratees at all if there's a simpler (even if slightly less elegant) way.


P.S. I do ensure the collection is capped:

val jobsQueueMaybe = db.collection[JSONCollection]("jobs_queue")
val jobsQueue: JSONCollection =
  jobsQueueMaybe.stats()
    .flatMap {
      case stats if !stats.capped =>
        jobsQueueMaybe.convertToCapped(size = 1024 * 1024, maxDocuments = None)
      case _ =>
        Future(jobsQueueMaybe)
    }
    .recover { case _ => jobsQueueMaybe.createCapped(size = 1024 * 1024, maxDocuments = None) }
    .map { _ => jobsQueueMaybe }

P.P.S.

I will also appreciate any criticism as to how I've designed this bit of logic, and how I could solve this by rethinking my approach and slightly overhauling the implementation.

4

1 回答 1

1

作为当前的解决方法,我从 更改Iteratee.foreachIteratee.foldM以便每次迭代都返回一个 Future;这样看来,它似乎迫使 ReactiveMongo 继续计算直到被中断,而不是foreach似乎过早退出:

cur.enumerate() |>>> Iteratee.foldM(()) { (acc, queuedDoc) =>
  // always yield something like Future.successful(acc) or an actual `Future[Unit]`
}

然后,我只需要等到整个程序终止(这是由放入 a 的东西发出的信号stopSignal: ConcurrentLinkedQueue

while (stopSignal.isEmpty) Thread.sleep(1000)

但是,虽然它运作良好,但我并不特别喜欢这种解决方案。

也许我的担心是没有道理的,但我真的想要一个更权威的答案来说明我应该如何解决这个问题。

于 2015-01-18T13:41:37.237 回答