0

我是 alpakka 的新手。我在 Alpakka 中使用以下代码使用 MongoDB 连接器来获取和循环 100K 记录

// Using Stream
def getAllContacts(user_id: Int, list_id: Int): Source[ListContact, NotUsed] = {
        MongoSource[ListContact](mongoDB.getCollection(getCollectionName(user_id, list_id)).find())
    }
// Mongo Observable
    def getAllContacts2(user_id: Int, list_id: Int): FindObservable[ListContact] = {
        mongoDB.getCollection(getCollectionName(user_id, list_id)).find()
    }

执行:

def validateUserList2(user_id: Int, list_id: Int, prefix: String, filter: Option[String]): Future[Seq[PhoneNumber]] = {
    val prefixTrim = prefix.trim
    val listContact = new ListContactRepository
    listContact.getAllContacts2(user_id, list_id).map{ line =>
        validateNumber(line.phone, prefixTrim)
    }.toFuture()
}

def validateUserList(user_id: Int, list_id: Int, prefix: String, filter: Option[String]): Future[immutable.Seq[PhoneNumber]] = {
    val prefixTrim = prefix.trim
    val listContact = new ListContactRepository
    listContact.getAllContacts(user_id, list_id).runWith(Sink.seq).map(lines =>
        lines map { line =>
            validateNumber(line.phone, prefixTrim)
        }
    )
}

和路线:

// Taking around 2.5 seconds to fetch 100K data
private def mapUserListNetwork: Route = {
    get {
        path("validate" / Segment / Segment / Segment) { (userId, listId, prefix) =>
            parameters('filter.?) { filter =>
                complete((phoneValidationActor ? ValidateUserList(userId.toInt, listId.toInt, prefix.toUpperCase, filter)).mapTo[Seq[PhoneNumber]])
            }
        }
    }
}
// Taking around 10 seconds to fetch 100K data
private def mapUserListNetwork2: Route = {
    get {
        path("validate2" / Segment / Segment / Segment) { (userId, listId, prefix) =>
            parameters('filter.?) { filter =>
                complete(PhoneNumberActor.validateUserList2(userId.toInt, listId.toInt, prefix, filter))
            }
        }
    }
}

我想在没有任何内存不足问题/堆空间的情况下将 mongodb 集合流式传输到 akka-http。请提出更好的方法。

4

1 回答 1

0

在实现validateUserList而不是运行 ( .runWith(Sink.seq)) 源时,从该方法返回它。您仍然可以map在源上应用该操作。

然后completeSource. Akka Http 可以使用源完成请求,在这种情况下,它将负责物化并将结果作为分块响应流式传输到客户端。

于 2019-01-02T12:36:48.827 回答