所以我有 2 个集合,假设它的新集合和其他集合,我想将来自这个其他集合的数据插入到一个全新的集合中,其中另一个集合包含可以处理为新形式的数据的数据,其中将是存储到这个新集合中。这个过程将一次性完成。问题是 Scala MongoDB 驱动程序的异步行为有点问题。我希望这个新集合能够自动递增,因为在将 ID 插入数据库后我需要它。
所以显而易见的步骤是:
- 订阅Other Collection findAll Observable,获取发射的数据
- 计算新集合上的数据以找出数量,递增一,您将获得发出数据的最后一个 id
- 处理数据(映射、变异等)
- 将新数据插入新集合
现在的问题是,在第 2 步和第 4 步之间,在插入数据之前会有一些延迟,因为它有时会抛出一个异常,说在向 MongoDB 插入数据时重复 ID。我无论如何都没有找到阻止文档。
我应该如何阻止 observable 以便在其他线程完成后启动进程?
还是有更好的方法来做我想做的事?我愿意接受建议
也欢迎任何参考阅读,谢谢