0

我目前正在使用 Akka Streams 和Alpakka MongoDB connector

是否可以指定类型MongoSource

val codecRegistry = fromRegistries(fromProviders(classOf[TodoMongo]), DEFAULT_CODEC_REGISTRY)
  private val todoCollection: MongoCollection[TodoMongo] = mongoDb
    .withCodecRegistry(codecRegistry)
    .getCollection("todo")

我想做这样的事情:

val t: FindObservable[Seq[TodoMongo]] = todoCollection.find()
MongoSource(t) // Stuck here

但我收到以下错误:

Expected Observable[scala.Document], Actual FindObservable[Seq[TodoMongo]].

我找不到关于这部分的正确文档。

4

1 回答 1

1

这还没有发布,但是在 Alpakka 的 master 分支中,MongoSource.apply采用了一个类型参数:

object MongoSource {
  def apply[T](query: Observable[T]): Source[T, NotUsed] =
    Source.fromPublisher(ObservableToPublisher(query))
}

因此,随着即将发布的 0.18 版本的 Alpakka,您将能够执行以下操作:

val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())

请注意,source这里假设todoCollection.find()返回一个Observable[TodoMongo]; 根据需要调整类型。

同时,您可以简单地手动添加上述代码。例如:

package akka.stream.alpakka.mongodb.scaladsl

import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable

object MyMongoSource {
  def apply[T](query: Observable[T]): Source[T, NotUsed] =
    Source.fromPublisher(ObservableToPublisher(query))
}

请注意,它MyMongoSource被定义为驻留在akka.stream.alpakka.mongodb.scaladsl包中(如MongoSource),因为ObservableToPublisher它是包私有类。您将以与使用MyMongoSource相同的方式使用MongoSource

val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find()) 
于 2018-03-10T14:55:28.267 回答