1

我有一个使用 akka 的应用程序,现在我想通过套接字连接连接到它。因此,我使用类似于scala page 的机制。但是,如果我尝试这样做tell,虽然我有一个 open OutputStream,但目标不会收到任何消息。

这是我的源代码:

object Connector {

  def main(args: Array[String]) {
    val port = 1337
    val conf = ConfigFactory.load
    val system = ActorSystem("SDDB", conf.getConfig("SDDB"))
    val master = system.actorOf(Props[TestActor])
    master ! "a"

    try {
      val listener = new ServerSocket(port)
      println("listening on port: " + port)
      while (true)
        new ConnectionThread(listener accept, master).start
      listener close
    } catch {
      case e: IOException =>
        System.err.println("Could not listen on port: " + port + ".")
        System.exit(-1)
    } finally {
      system.shutdown
    }
  }
}

case class ConnectionThread(socket: Socket, master: ActorRef) 
  extends Thread("ConnectionThread") {

  private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r
  private implicit var id = 0L
  private implicit val timeout = Timeout(25.0 seconds)

  master ! "b"

  override def run {
    master ! "c"
    try{
      master ! "d"
      val in = new ObjectInputStream(socket getInputStream)
      master ! "e"
      val out = new ObjectOutputStream(socket getOutputStream)

      out writeObject("listening")
      out flush

      master ! "f"
      val command = in.readObject.asInstanceOf[String]
      println("client sent: '" + command + "'")
      // process the command

      master ! "g"
      out.writeObject("EOF")
      out.flush

      out.close
      in.close
      socket.close
    } catch {
      case e: SocketException =>
      case e: IOException => e printStackTrace
    }
  }
}

class TestActor extends Actor with ActorLogging{

  log info("TestActor running")

  def receive = {
    case s: String =>
      log info("received: " + s)
  }

}

我得到输出:

listening on port: 1337
[INFO] TestActor running
[INFO] received: a
[INFO] received: b
[INFO] received: c
[INFO] received: d

现在我预计它会一直持续到 g,但我得到了:

client sent: 'select content from testdata on 2012-07-06'

我发现它可以工作,直到我打开套接字的流,可能是因为它也是基于套接字的tell并且ask使用套接字的输出流,胎面运行。之后套接字连接工作,但我无法发送任何消息到演员系统。
我无法删除连接器和 ConnectionThread。我该如何解决?

4

1 回答 1

0

我必须承认,我没有完全理解文档中的示例。但我发现使用 aConnectionHelper而不是直接寻址ActorRef效果很好。
我将代码更改为以下内容:

object Connector {

  def main(args: Array[String]) {
    val port = 1337
    val conf = ConfigFactory.load
    val system = ActorSystem("SDDB", conf.getConfig("SDDB"))

    //    val master = system.actorOf(Props[TestActor], "master")
    //    master ! "a"

    try {
      val listener = new ServerSocket(port)
      println("listening on port: " + port)
      while (true)
      //        new ConnectionThread(listener accept, master.asInstanceOf[TestActor]).start
        new ConnectionThread(listener accept, system).start
      listener close
    } catch {
      case e: IOException =>
        System.err.println("Could not listen on port: " + port + ".")
        System.exit(-1)
    } finally {
      //      master ! PoisonPill
      system.shutdown
    }
  }

}

case class ConnectionThread(socket: Socket, sys: ActorSystem) 
  extends Thread("ConnectionThread") {

  private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r
  private implicit var id = 0L
  private implicit val timeout = Timeout(25.0 seconds)
  private val conHelper = new ConnectionHelper

  override def run {
    try {
      val out = new ObjectOutputStream(socket getOutputStream)
      val in = new ObjectInputStream(socket getInputStream)

      conHelper tell "funzt"
      out writeObject ("Hi")
      out.flush
      val command = in.readObject.asInstanceOf[String]
      println("received: " + command)
      out writeObject ("test")
      out.flush
      out writeObject ("EOF")
      out.flush

      out.close
      in.close
      socket.close
    }
  }

  private class ConnectionHelper {
    val tester = sys.actorOf(Props[TestActor])

    def tell(s: String) { tester ! s }

  }

}

我真的不明白为什么这行得通,而我的问题中的代码却没有。我欢迎所有的解释。

于 2012-07-12T11:21:10.877 回答