0

我是 MongoDB 和 Scala 语言的新手

我正在使用scala语言在本地连接mongodb

我正在使用以下依赖项

// https://mvnrepository.com/artifact/org.mongodb.scala/mongo-scala-driver

libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "4.2.3"

我试过的

object Demo extends App {
  
  val mongoClient: MongoClient = MongoClient("mongodb://127.0.0.1:27017/")
  val database: MongoDatabase = mongoClient.getDatabase("DemoDB")
  println(database)

  val collection: MongoCollection[Document] =database.getCollection("demodata");
  val observable = collection.find();
}

上面的代码以以下格式返回数据

FindObservable(com.mongodb.reactivestreams.client.internal.FindPublisherImpl@6253c26)

我也试过

observable.subscribe ( new Observer[Document] {
  override def onNext(result: Document): Unit = println(result.toJson())
  override def onError(e: Throwable): Unit = println("Failed" + e.getMessage)
  override def onComplete(): Unit = println("Completed")
})

我也尝试过printResult()方法printHeadResult(),但没有一种方法有效

请帮助提前谢谢

4

2 回答 2

3

Mongo Scala 驱动程序以非阻塞方式工作,返回Observables需要Subsribed打开的内容以使用已发布的数据。

当您订阅observable以下类似内容时,

object Demo extends App {
  
  val mongoClient: MongoClient = MongoClient("mongodb://127.0.0.1:27017/")
  val database: MongoDatabase = mongoClient.getDatabase("DemoDB")
  println(database)

  val collection: MongoCollection[Document] = database.getCollection("demodata")
  val observable = collection.find()

  observable.subscribe ( new Observer[Document] {
    override def onNext(result: Document): Unit = println(result.toJson())
    override def onError(e: Throwable): Unit = println("Failed" + e.getMessage)
    override def onComplete(): Unit = println("Completed")
  })
}

您的代码不会等待observable实际发布任何内容,它会在订阅后立即完成。因此你什么也得不到。

您可以Thread.sleep(5000)在末尾添加一个类似 a 的东西来阻止并给obeservable一些时间(希望完成并)发布数据。

或者,您可以添加val resultSeq = observable.collect以阻止和收集单个序列中的所有已发布数据。

于 2021-05-26T08:23:56.767 回答
0

我找到了这个链接

它适用于 printResult() 和 printHeadResult() 方法

使用 mongo-scala-driver 在 Scala 中打印来自 Mongodb 的查询结果

于 2021-05-29T08:26:58.947 回答