2

我正在尝试将一些阻塞调用包装在Future. 返回类型是Seq[User]where Useris a case class。以下内容无法与存在各种重载版本的投诉一起编译。有什么建议么?我尝试了几乎所有的变化都Source.apply没有任何运气。

// All I want is Seq[User] => Future[Seq[User]]

def findByFirstName(firstName: String) = {
  val users: Seq[User] = userRepository.findByFirstName(firstName)

  val sink = Sink.fold[User, User](null)((_, elem) => elem)

  val src = Source(users) // doesn't compile

  src.runWith(sink)
}
4

2 回答 2

6

首先,我假设您使用的是 1.0 版,akka-http-experimental因为该 API 可能与之前的版本有所不同。

您的代码无法编译的原因是akka.stream.scaladsl.Source$.apply()requires scala.collection.immutable.Seq而不是scala.collection.mutable.Seq.

因此,您必须使用方法将可变序列转换为不可变序列to[T]

文档:akka.stream.scaladsl.Source

此外,正如您看到的文档,Source$.apply()接受,()=>Iterator[T]因此您也可以()=>users.iterator作为参数传递。

由于Sink.fold(...)返回最后一个计算的表达式,你可以给一个空作为第一个参数,通过将元素附加到序列来Seq()迭代,最后得到结果。users

但是,可能有一个更好的解决方案可以创建一个Sink将每个评估的表达式放入Seq中,但我找不到它。

以下代码有效。

import akka.actor._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source,Sink}
import scala.concurrent.ExecutionContext.Implicits.global

case class User(name:String)

object Main extends App{
    implicit val system = ActorSystem("MyActorSystem")
    implicit val materializer = ActorMaterializer()
    val users = Seq(User("alice"),User("bob"),User("charlie"))

    val sink = Sink.fold[Seq[User], User](Seq())(
      (seq, elem) => 
        {println(s"elem => ${elem} \t| seq => ${seq}");seq:+elem})

    val src = Source(users.to[scala.collection.immutable.Seq])
    // val src = Source(()=>users.iterator) // this also works

    val fut = src.runWith(sink) // Future[Seq[User]]
    fut.onSuccess({
      case x=>{
        println(s"result => ${x}")
      }
    })
}

上面代码的输出是

elem => User(alice)     | seq => List()
elem => User(bob)       | seq => List(User(alice))
elem => User(charlie)   | seq => List(User(alice), User(bob))
result => List(User(alice), User(bob), User(charlie))
于 2015-09-10T10:03:03.547 回答
3

如果您只需要 Future[Seq[Users]] 不要使用 akka 流,而是使用 期货

import scala.concurrent._
import ExecutionContext.Implicits.global
val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = Future {
  session.getFriends()
}
于 2015-09-10T07:31:14.763 回答