3

我正在学习 Akka 2.1 中的远程参与者,并尝试修改Typesafe 提供的反例。我从控制台实现了一个快速'n'dirty UI 来发送滴答声。并退出询问(并显示结果)当前计数。

这个想法是启动一个将运行 Counter actor 的主节点和一些将通过远程处理向其发送消息的客户端节点。但是,我想通过配置和对代码的最小更改来实现这一点。因此,通过更改配置,可以使用本地演员。

我发现这个关于类似问题的博客条目,即使有许多实例正在运行,所有 API 调用都必须通过一个参与者。

我写了类似的配置,但我无法让它工作。我当前的代码确实使用了远程处理,但它在主节点上为每个新节点创建了一个新的参与者,如果没有明确地给它路径(并且无视配置点),我无法让它连接到现有参与者。然而,这不是我想要的,因为状态不能以这种方式在 JVM 之间共享。

可通过 git repo获得完整的可运行代码

这是我的配置文件

akka {
    actor {
        provider = "akka.remote.RemoteActorRefProvider"
        deployment {
            /counter {
                remote = "akka://ticker@127.0.0.1:2552"
            }
        }
    }
    remote {
        transport = "akka.remote.netty.NettyRemoteTransport"
        log-sent-messages = on
        netty {
            hostname = "127.0.0.1"
        }
    }
}

和完整的来源

import akka.actor._
import akka.pattern.ask
import scala.concurrent.duration._
import akka.util.Timeout
import scala.util._

case object Tick
case object Get

class Counter extends Actor {
  var count = 0

  val id = math.random.toString.substring(2)
  println(s"\nmy name is $id\ni'm at ${self.path}\n")
  def log(s: String) = println(s"$id: $s")

  def receive = {
    case Tick =>
      count += 1
      log(s"got a tick, now at $count")
    case Get  =>
      sender ! count
      log(s"asked for count, replied with $count")
  }
}

object AkkaProjectInScala extends App {
  val system = ActorSystem("ticker")
  implicit val ec = system.dispatcher

  val counter = system.actorOf(Props[Counter], "counter")

  def step {
    print("tick or quit? ")
    readLine() match {
      case "tick" => counter ! Tick
      case "quit" => return
      case _ =>
    }
    step
  }
  step

  implicit val timeout = Timeout(5.seconds)

  val f = counter ? Get
  f onComplete {
    case Failure(e) => throw e
    case Success(count) => println("Count is " + count)
  }

  system.shutdown()
}

我用sbt run和在另一个窗口sbt run -Dakka.remote.netty.port=0中运行它。

4

2 回答 2

1

我发现我可以使用某种模式。Akka remote 仅允许在远程系统上进行部署(无法找到一种仅通过配置使其在远程上查找的方法……我在这里弄错了吗?)。

所以我可以部署一个“侦察员”来传回 ActorRef。可运行代码在分支“scout-hack”下的原始仓库中可用。因为这感觉就像一个黑客。我仍然会欣赏基于配置的解决方案。

演员

case object Fetch

class Scout extends Actor{
  def receive = {
    case Fetch => sender ! AkkaProjectInScala._counter
  }
}

Counter actor 创建现在是懒惰的

lazy val _counter = system.actorOf(Props[Counter], "counter")

所以它只在master上执行(由端口决定)并且可以像这样获取

val counter: ActorRef = {
  val scout = system.actorOf(Props[Scout], "scout")
  val ref = Await.result(scout ? Fetch, timeout.duration) match {
    case r: ActorRef => r
  }
  scout ! PoisonPill
  ref
}

和完整的配置

akka {
    actor {
        provider = "akka.remote.RemoteActorRefProvider"
        deployment {
            /scout {
                remote = "akka://ticker@127.0.0.1:2552"
            }
        }
    }
    remote {
        transport = "akka.remote.netty.NettyRemoteTransport"
        log-sent-messages = on
        netty {
            hostname = "127.0.0.1"
        }
    }
}

编辑:我还找到了一种简洁的方法:检查“counterPath”的配置,如果存在则为actorFor(path),否则创建actor。很好,你可以在运行时注入 master,代码比“scout”干净得多,但它仍然必须决定天气来查找或创建一个演员。我想这是无法避免的。

于 2013-02-11T15:19:02.820 回答
0

我尝试了你的 git 项目,它实际上工作正常,除了编译错误,你必须使用-Dakka.remote.netty.port=0jvm 的参数启动 sbt 会话,而不是作为run.

您还应该了解,您不必在两个进程中都启动 Counter actor。在此示例中,它旨在从客户端创建并部署在服务器上(端口 2552)。您不必在服务器上启动它。对于这个例子来说,在服务器上创建actor系统就足够了。

于 2013-02-11T10:59:52.173 回答