2

所以我有 2 个集合,假设它的集合和其他集合,我想将来自这个其他集合的数据插入到一个全新的集合中,其中另一个集合包含可以处理为新形式的数据的数据,其中将是存储到这个新集合中。这个过程将一次性完成。问题是 Scala MongoDB 驱动程序的异步行为有点问题。我希望这个新集合能够自动递增,因为在将 ID 插入数据库后我需要它。

所以显而易见的步骤是:

  1. 订阅Other Collection findAll Observable,获取发射的数据
  2. 计算新集合上的数据以找出数量,递增一,您将获得发出数据的最后一个 id
  3. 处理数据(映射、变异等)
  4. 将新数据插入新集合

现在的问题是,在第 2 步和第 4 步之间,在插入数据之前会有一些延迟,因为它有时会抛出一个异常,说在向 MongoDB 插入数据时重复 ID。我无论如何都没有找到阻止文档。

我应该如何阻止 observable 以便在其他线程完成后启动进程?

还是有更好的方法来做我想做的事?我愿意接受建议

也欢迎任何参考阅读,谢谢

4

1 回答 1

0

编辑:在进行了更多搜索之后,我发现了一种更类似于 Scala 的方法来解决这个问题:

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import org.mongodb.scala.{Observable,Completed}

...

val query: Observable[Completed] = collection.insertOne(food)
val result = Await.result(query.toFuture, Duration.Inf)

这正是它看起来的样子:运行查询,等待它完成,然后返回结果。


我写了这段代码来解决一个非常相似的问题。有点破解,但它的工作原理。

import java.util.concurrent.Semaphore

...

val sem: Semaphore = new Semaphore(1)                                                                                                
sem.acquire()
coll.find().subscribe(new Observer[T] {
      override def onNext(result: T) = processResult(result)
      override def onError(e: Throwable) = {
         sem.release()
         throw e
      }
      override def onComplete() = sem.release()
   })
sem.acquire()

获取信号量将其设置为零,然后导致下一次获取阻塞。然后,该线程将休眠,直到观察者完成或收到错误,在功能上实现与订阅已阻塞相同的结果。

于 2018-08-15T04:49:07.507 回答