1

我的问题是关于处理来自其他参与者的参与者的响应并使用这些结果执行一些操作。

这是接收方法的简短伪示例:

case data: Data =>
    val data2 = actor1 ? data.fieldA
    val almostFinished = data2.flatMap { d2 => 
        val data3 = actor2 ? d2.fieldB
        data3.map { d3 => 
            actor3 ? someStuff(d3)
        }
    }
    almostFinished.map { r =>
        someFinalStuff(r)
    } pipeTo sender

这里是一些通用的业务处理逻辑。

首先,它看起来完全不可读。其次 - 地图中的故障没有得到处理,也没有无处报告。

您能否解释一下我应该如何处理演员和消息方面的这种逻辑?

谢谢!

4

3 回答 3

2

除了使用 ask 和 futures 之外的另一种方法是使用临时参与者以及Akka 2.2.3 中引入的聚合器模式。

于 2013-11-08T12:54:29.540 回答
1

你可以试试这个:

case data: Data =>
  val data2Fut = (actor1 ? data.fieldA).mapTo[Data]
  val result = 
    for{
      data2 <- data2Fut
      data3 <- (actor2 ? data2.fieldB).mapTo[Data]
      data4 <- (actor3 ? someStuff(data3)).mapTo[Data]
    } yield someFinalStuff(data4)
  result pipeTo sender

任何时候你发现自己链接在一起flatMapmap你应该考虑使用这样的理解来清理。

于 2013-11-07T17:59:40.890 回答
0

至于代码不可读,一些for语法可以给我们一些提神

case data: Data =>
  val combinedFuture: Future[Stuff] =
    for {
      data2 <- actor1 ? data.fieldA
      data3 <- actor2 ? data2.fieldB
      stuff <- actor3 ? someStuff(data3)
    } yield someFinalStuff(stuff)

  combinedFuture pipeTo sender

实际上,您的示例似乎完全是为了展示for-comprehension


至于在“未来调用管道”中检测故障的问题,您可能想知道一个Future[A]类型只能有两个可能的实例之一:

Success[A]
Failure[T <: Throwable]

您可以测试类型或更好,您应该使用FutureonComplete中定义onSuccessonFailure回调方法

正如库恩博士所澄清的那样,使用失败的 Future 管道pipeTo将向接收者传递一条Status.Failure消息,您可以从中获取错误作为Throwable实例

编辑

正如 cmbaxter 所注意到的,该?模式将返回一个“无类型”的 Future,这将需要某种强制来返回正确类型的值,使用mapTo

正确的代码将变为

  val combinedFuture: Future[Stuff] =
    for {
      data2 <- (actor1 ? data.fieldA).mapTo[Data2]
      data3 <- (actor2 ? data2.fieldB).mapTo[Data3]
      stuff <- (actor3 ? someStuff(data3)).mapTo[Intermediate]
    } yield someFinalStuff(stuff)
于 2013-11-07T17:58:37.453 回答