0

假设我在 Akka Actor 中有一个带有方法的 Actor 类:

class SecureActor extends Actor{
  def methodWithThread(): Unit={
}

可以在这种方法中运行一个新线程吗?Akka自己的线程不会有问题吗?

class ThreadExample extends Thread{  
    override def run(){  
      println("Thread is running...");  
}
def methodWithThread(): Unit={
  var t = new ThreadExample()  
  t.start()
}
4

2 回答 2

1

可以在 Actor 内部的单独线程中启动代码(例如 using Future),但您必须小心它如何与 Akka 交互。线程不应读取或修改 Actor 中的任何可变状态,并且只能通过发送消息与 Actor 进行通信。

于 2021-01-19T09:13:34.627 回答
1

既然您说您正在寻找如何确保以这种方式从 Akka Actor 创建异步任务将破坏它提供给您的 Akka 并发模式,这里有一些基于您的类的非常简单的示例。

正如我在评论中所建议的那样,这不是演示如何从父演员生成子演员的答案。正如你所说,你有兴趣展示如何打破 Akka 并发模式。我建议您寻找如何做到这一点的简单示例。也许这个例子有帮助。

import akka.actor.{Actor, ActorSystem, Props}

import scala.util.Random

object BreakingAkkaConcurrency {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("BreakingAkkaConcurrency")
    val unsecureActor = actorSystem.actorOf(Props[UnsecureActor], "unsecureActor")
    unsecureActor ! "1"
    unsecureActor ! "2"
    unsecureActor ! "3"
    unsecureActor ! "4"
    unsecureActor ! "5"
    unsecureActor ! "6"
    unsecureActor ! "7"
    unsecureActor ! "8"
    unsecureActor ! "9"
    unsecureActor ! "10"

    Thread.sleep(10000)
  }
}

class UnsecureActor extends Actor {
  def methodWithThread(): Unit = {
  }

  override def receive: Receive = {
    case msg: String =>
      var t = new ThreadExample(msg)
      // by starting a new thread inside an Akka actor you break the synchronous pattern provided by Akka
      t.start()
  }
}

class ThreadExample(id: String) extends Thread {
  override def run() {
    // simulate a random computation which will potentially break the order of messages processed by Akka actors
    // This is where the Akka actors don't have control anymore.
    Thread.sleep(Random.nextInt(10) * 1000)
    println(s"finished $id");
  }
}

输出的顺序与unsecureActor ! "1"发送消息的顺序不同。

finished 9
finished 4
finished 1
finished 7
finished 3
finished 2
finished 8
finished 5
finished 6
finished 10
于 2021-01-19T08:45:57.593 回答