-1

我有一个用例,其中我有一个演员层次结构

parent -> childABC -> workerchild

现在工人孩子工作并将其结果发送给它的父母(childABC,它是父母的孩子)并且那个孩子演员(childABC)将结果发送回我正在使用的父母演员pipeTo,这里得到死信是我的代码

parent演员:

final case object GetFinalValue

class MyActor extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myManageActor = context.actorOf(Props[ManagerMyActor],"Managemyactor")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

  override def receive: Receive = {
    case GetFinalValue=>
      ask(myManageActor,GetValue).pipeTo(sender())

    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }

}

childABC(根据我上面给出的示例)

final case object GetValue

class ManagerMyActor extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myTokenActor = context.actorOf(Props[TokenMyActor2],"toknMyActor2")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

  override def receive: Receive = {
    case GetValue=>
      ask(myTokenActor,CalculateValue).pipeTo(sender())
 
    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }

}

child演员:

final case object CalculateValue

class TokenMyActor2 extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)

  override def receive: Receive = {
    case CalculateValue=>
      val future = Future{ "get the string"
      }
      val bac = future.map{result =>
          sender ! result
      }//.pipeTo(sender())


    case message =>
      log.warn("Actor MyActor: Unhandled message received : {}", message)
      unhandled(message)
  }

}


def main(args: Array[String]): Unit = {
    implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

    val myActor = system.actorOf(Props[MyActor],"myActor")
    val future = ask(myActor, GetFinalValue).mapTo[String]
    future.map {str =>
      log.info ("string is {}",str)
    }

以下是日志:

[INFO] [akkaDeadLetter][01/12/2021 19:17:22.000] [api-akka.actor.default-dispatcher-5] [akka://api/deadLetters] Message [java.lang.String] from Actor[akka://api/user/myActor/Managemyactor/toknMyActor2#1239397461] to Actor[akka://api/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.989] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor#1829301550] to Actor[akka://api/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.996] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor/Managemyactor#-269929265] to Actor[akka://api/deadLetters] was not delivered. [3] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

请指导我我在哪里弄错了,或者pipeTo不应该这样使用?如果是这样,我应该怎么做才能使它工作

4

2 回答 2

3

不确定它是否有意,但ask(myManageActor,GetValue).pipeTo(sender())可以实现为forward.

class MyActor extends Actor {
  lazy val myManageActor: ActorRef = ???

  override def receive: Receive = {
    case GetFinalValue =>
      myManageActor.forward(GetValue)
  }
}

forward相同,tell但它保留了消息的原始发件人。

这可以应用于MyActorManagerMyActor

在 的情况下TokenMyActor2,您不应该使用

future.map{ result =>
          sender ! result
      }

因为它破坏了 akka 上下文封装,如文档中所述

当使用未来的回调,如 onComplete,或映射,如 thenRun,或 thenApply 内部 Actor 时,您需要小心避免关闭包含 Actor 的引用,即不要在回调中调用方法或访问封闭 Actor 上的可变状态。这会破坏actor封装,并可能引入同步错误和竞争条件,因为回调将同时调度到封闭的actor。不幸的是,目前还没有一种方法可以在编译时检测这些非法​​访问。另请参阅:Actor 和共享可变状态

您应该改为依赖Future(???).pipeTo(sender()),它可以安全地与 一起使用sender()

应用这些更改后,代码确实按预期工作

case object GetFinalValue
case object GetValue
case object CalculateValue

class MyActor extends Actor {
  private val myManageActor: ActorRef =
    context.actorOf(Props[ManagerMyActor], "myManageActor")

  override def receive: Receive = { case GetFinalValue =>
    myManageActor.forward(GetValue)
  }
}

class ManagerMyActor extends Actor {
  private val myTokenActor =
    context.actorOf(Props[TokenMyActor2], "toknMyActor2")

  override def receive: Receive = { case GetValue =>
    myTokenActor.forward(CalculateValue)
  }

}

class TokenMyActor2 extends Actor {
  import context.dispatcher

  override def receive: Receive = { case CalculateValue =>
    val future = Future { "get the string" }
    future.pipeTo(sender())
  }
}
implicit val timeout = Timeout(3, SECONDS)
implicit val system = ActorSystem("adasd")
import system.dispatcher
val myActor = system.actorOf(Props[MyActor], "myActor")
val future = ask(myActor, GetFinalValue).mapTo[String]
future.foreach { str =>
  println(s"got $str")
}

产生got get the string.

最后一点,我建议不要ask在演员中使用模式。的基本功能ask只需tell和即可轻松实现forward。此外,代码更短,并且不会因不断需要而过载implicit val timeout

于 2021-01-12T14:48:04.380 回答
1

只是为了补充@IvanStanislavciuc 的精彩帖子。您已经注意到您在期货中丢失了对发件人的引用。一个简单的解决方案是保持领先。

这意味着改变MyActor

ask(myManageActor,GetValue).pipeTo(sender()) // Won't work

进入:

val originalSender = sender()
ask(myTokenActor,CalculateValue).pipeTo(originalSender)

ManagerMyActor中,更改:

ask(myTokenActor,CalculateValue).pipeTo(sender()) // Won't work

进入:

val originalSender = sender()
ask(myManageActor,GetValue).pipeTo(originalSender)

并在TokenMyActor2

val originalSender = sender()
Future{ "get the string" }.pipeTo(originalSender)

代码在Scastie运行。

于 2021-01-13T10:16:00.733 回答