我玩的是akka-stream-and-http-experimental
1.0。到目前为止,我有一个可以接受和响应 HTTP 请求的用户服务。我还将有一个可以管理约会的约会服务。为了进行约会,必须是现有用户。如果用户存在,约会服务将与用户服务进行检查。现在这显然可以通过 HTTP 完成,但我宁愿让约会服务向用户服务发送消息。作为新手,我不清楚如何使用演员(作为akka-http
抽象)来发送和接收消息。文档中提到了ActorRef
and ActorPublisher
,但没有前者的例子,后者看起来像我需要的过度杀伤力。我的代码如下所示,位于Github上:
trait UserReadResource extends ActorPlumbing {
val userService: UserService
val readRoute = {
// route stuff
}
}
trait ActorPlumbing {
implicit val system: ActorSystem
implicit def executor: ExecutionContextExecutor
implicit val materializer: Materializer
def config: Config
val logger: LoggingAdapter
}
trait UserService { // Implemented by Slick and MongoDB in the backend
def findByFirstName(firstName: String): Future[immutable.Seq[User]]
}
object UserApp extends App with UserReadResource with UserWriteResource with ActorPlumbing {
override implicit val system = ActorSystem()
override implicit def executor = system.dispatcher
override implicit val materializer = ActorMaterializer()
override def config = ConfigFactory.load()
override val logger = Logging(system, getClass)
private val collection = newCollection("users")
val userRepository = new MongoDBUserRepository(collection)
val userService: UserService = new MongoDBUserRepositoryAdapter(userRepository) with UserBusinessDelegate {
// implicitly finds the executor in scope. Ain't that cute?
override implicit def executor = implicitly
}
Http().bindAndHandle(readRoute ~ writeRoute, config.getString("http.interface"), config.getInt("http.port"))
}
编辑:我想出了如何发送消息,这可以使用Source.actorRef
. 那只会将消息发送到流中。我想做的是让路由处理程序类接收响应。这样,当我创建约会服务时,它的参与者可以调用用户服务参与者并以与我示例中的用户路由处理程序相同的方式接收响应。伪代码:
val src = Source.single(name) \\ How to send this to an actor and get the response
编辑 2:基于@yardena 的回答,我想出了以下内容,但最后一行没有编译。我的演员发布者返回 aFuture
我猜它会被包装在 a 中Promise
,然后作为 a 传递Future
给路由处理程序。
get {
parameters("firstName".?, "lastName".?).as(FindByNameRequest) { name =>
type FindResponse = Future[FindByNameResponse]
val src: Source[FindResponse, Unit] = Source.actorPublisher[FindResponse](businessDelegateProps).mapMaterializedValue {
_ ! name
}
val emptyResponse = Future.apply(FindByNameResponse(OK, Seq.empty))
val sink = Sink.fold(emptyResponse)((_, response: FindResponse) => response)
complete(src.runWith(sink)) // doesn't compile
}
}