20

我有一个演员,它在收到消息时在文件系统中搜索文件并返回文件的完整路径。

为了保持异步,我做了:

def receive = {
  case s:String => {

    val f = future{
      val ans = search(s)
      println("Input Request: "+s+" output:"+ans+" "+sender.path)
    }
    f.onComplete{
      case Success(x) => sender ! x
      case Failure(y) => println("Could not complete it")
    }
  } 
}

但我观察到它将消息返回到akka://FileSystem/deadLetters而不是sender. 文档说:

仅在 Actor 本身内有效,因此不要关闭它并 * 将其发布到其他线程!

那么这是否意味着,我必须保持同步?还有其他方法吗?

4

3 回答 3

46

您犯了一个非常常见的错误,即“关闭可变状态”。您传递给的闭包onComplete不会复制this.sender,因此当您onComplete被调用时,您会将消息发送到this.sender当时发生的指向,而不是创建闭包时指向的内容。

您可以通过创建自己的当前内容的本地不可变副本this.sender并在闭包中引用该值来避免此问题:

val origSender = sender
f.onComplete {
    case Successs(x) => origSender ! x
    ...
}
于 2013-06-03T13:52:22.787 回答
3
import akka.pattern.pipe

行得通。正在做:

val reply = sender
future {
  val ans = searchAndCache(s)
  println("Input Request: "+s+" output:"+ans+" "+reply.path)
  ans
} pipeTo reply

回复发件人

于 2013-06-03T13:44:23.590 回答
1

我知道这很旧,但我必须添加它,
pipeTo正确的方法,但您不需要复制您的 sender
您已经处于相同的上下文中。
实际上 pipeTo 会为您执行此操作。
它将获取当前的发件人参考(通过传递给它的参数)
并使用它为您解决未来(查看它的实现)
只需执行以下操作:

future {
  val ans = searchAndCache(s)
  println("Input Request: "+s+" output:"+ans+" "+reply.path)
  ans
} pipeTo reply
于 2018-06-13T17:45:59.427 回答