2

我玩的是akka-stream-and-http-experimental1.0。到目前为止,我有一个可以接受和响应 HTTP 请求的用户服务。我还将有一个可以管理约会的约会服务。为了进行约会,必须是现有用户。如果用户存在,约会服务将与用户服务进行检查。现在这显然可以通过 HTTP 完成,但我宁愿让约会服务向用户服务发送消息。作为新手,我不清楚如何使用演员(作为akka-http抽象)来发送和接收消息。文档中提到了ActorRefand ActorPublisher,但没有前者的例子,后者看起来像我需要的过度杀伤力。我的代码如下所示,位于Github上:

trait UserReadResource extends ActorPlumbing {
  val userService: UserService
  val readRoute = {
  // route stuff
  }
}

trait ActorPlumbing {
  implicit val system: ActorSystem
  implicit def executor: ExecutionContextExecutor
  implicit val materializer: Materializer

  def config: Config
  val logger: LoggingAdapter
}

trait UserService { // Implemented by Slick and MongoDB in the backend
  def findByFirstName(firstName: String): Future[immutable.Seq[User]]
}

object UserApp extends App with UserReadResource with UserWriteResource with ActorPlumbing {
  override implicit val system = ActorSystem()
  override implicit def executor = system.dispatcher
  override implicit val materializer = ActorMaterializer()

  override def config = ConfigFactory.load()
  override val logger = Logging(system, getClass)

  private val collection = newCollection("users")
  val userRepository = new MongoDBUserRepository(collection)
  val userService: UserService = new MongoDBUserRepositoryAdapter(userRepository) with UserBusinessDelegate { 
    // implicitly finds the executor in scope. Ain't that cute?
    override implicit def executor = implicitly
  } 

  Http().bindAndHandle(readRoute ~ writeRoute, config.getString("http.interface"), config.getInt("http.port"))
}

编辑:我想出了如何发送消息,这可以使用Source.actorRef. 那只会将消息发送到流中。我想做的是让路由处理程序类接收响应。这样,当我创建约会服务时,它的参与者可以调用用户服务参与者并以与我示例中的用户路由处理程序相同的方式接收响应。伪代码:

val src = Source.single(name) \\ How to send this to an actor and get the response

编辑 2:基于@yardena 的回答,我想出了以下内容,但最后一行没有编译。我的演员发布者返回 aFuture我猜它会被包装在 a 中Promise,然后作为 a 传递Future给路由处理程序。

get {
  parameters("firstName".?, "lastName".?).as(FindByNameRequest) { name =>
    type FindResponse = Future[FindByNameResponse]

    val src: Source[FindResponse, Unit] = Source.actorPublisher[FindResponse](businessDelegateProps).mapMaterializedValue {
      _ ! name
    }
    val emptyResponse = Future.apply(FindByNameResponse(OK, Seq.empty))

    val sink = Sink.fold(emptyResponse)((_, response: FindResponse) => response)

    complete(src.runWith(sink)) // doesn't compile
  }
}
4

2 回答 2

1

我最终使用了Actor.ask. 简单的。

于 2015-09-29T07:30:38.240 回答
1

此链接可能会有所帮助:http: //zuchos.com/blog/2015/05/23/how-to-write-a-subscriber-for-akka-streams/和@Noah回答akka 流 Source 由 Source.actorRef 创建

基本上你有2个选择:1)如果你想要一个“简单”的actor,它将接收到的所有消息转发到流中,你可以使用Source.actorRef。然后,您可以通过使用 mapAsync 创建处理阶段将消息通过管道传输到 UserService。2) 另一种选择,如果您希望演员有一些自定义行为,是编写您自己的 ActorPublisher。

高温高压

于 2015-09-13T10:18:22.113 回答