我是 alpakka 的新手。我在 Alpakka 中使用以下代码使用 MongoDB 连接器来获取和循环 100K 记录
// Using Stream
def getAllContacts(user_id: Int, list_id: Int): Source[ListContact, NotUsed] = {
MongoSource[ListContact](mongoDB.getCollection(getCollectionName(user_id, list_id)).find())
}
// Mongo Observable
def getAllContacts2(user_id: Int, list_id: Int): FindObservable[ListContact] = {
mongoDB.getCollection(getCollectionName(user_id, list_id)).find()
}
执行:
def validateUserList2(user_id: Int, list_id: Int, prefix: String, filter: Option[String]): Future[Seq[PhoneNumber]] = {
val prefixTrim = prefix.trim
val listContact = new ListContactRepository
listContact.getAllContacts2(user_id, list_id).map{ line =>
validateNumber(line.phone, prefixTrim)
}.toFuture()
}
def validateUserList(user_id: Int, list_id: Int, prefix: String, filter: Option[String]): Future[immutable.Seq[PhoneNumber]] = {
val prefixTrim = prefix.trim
val listContact = new ListContactRepository
listContact.getAllContacts(user_id, list_id).runWith(Sink.seq).map(lines =>
lines map { line =>
validateNumber(line.phone, prefixTrim)
}
)
}
和路线:
// Taking around 2.5 seconds to fetch 100K data
private def mapUserListNetwork: Route = {
get {
path("validate" / Segment / Segment / Segment) { (userId, listId, prefix) =>
parameters('filter.?) { filter =>
complete((phoneValidationActor ? ValidateUserList(userId.toInt, listId.toInt, prefix.toUpperCase, filter)).mapTo[Seq[PhoneNumber]])
}
}
}
}
// Taking around 10 seconds to fetch 100K data
private def mapUserListNetwork2: Route = {
get {
path("validate2" / Segment / Segment / Segment) { (userId, listId, prefix) =>
parameters('filter.?) { filter =>
complete(PhoneNumberActor.validateUserList2(userId.toInt, listId.toInt, prefix, filter))
}
}
}
}
我想在没有任何内存不足问题/堆空间的情况下将 mongodb 集合流式传输到 akka-http。请提出更好的方法。