1

我是 Scala 和 Akka 的新手,但有一个简单的场景......我正在尝试获取一个 url 列表,将每个 url 作为消息发送到一个新的参与者中,该参与者将发出一个 http GET 请求。只有 16 个 url 的列表,我获得了大约 40-65% 的成功,而其他人得到死信“消息未传递”。但是,如果我在创建演员之前放置一个 Thread.sleep(50),我不会收到死信未命中。注意:HTTP Get 使用的是 java 类,而不是 scala 类,它可能是一个因素,也可能不是一个因素。请参阅下面的代码示例...我知道 Akka 不保证消息,但是这个成功率看起来不对,我一定是做错了。感谢您的任何指示。

  def ParallelTest(urls: List[String]): String =   {
     val system = ActorSystem("HelloSystem")

     var cnt: Int = 0
     for (item <- urls){
       Thread.sleep(50)
       createActor(system, "helloactor" + cnt, item)
       cnt += 1
       /*
       historical example
       helloActor ! "hello"
       helloActor ! "hello"
       helloActor ! "Buenos dias"
       */
      }
     system.shutdown

     println("parallel done");
     "done"   
   }

  def createActor(actorSystem: ActorSystem, actorName: String, urlItem: String) = {
    val helloActor = actorSystem.actorOf(Props[HelloActor], name = actorName)
    helloActor ! UrlTransport(urlItem)
  }
4

1 回答 1

2

你可能想考虑做这样的事情,让演员知道他们什么时候完成。

class HelloManager(urls: List[String]) extends Actor {

    var completed = 0
    def remaining = urls.size - completed

    def receive: Receive = {
        case StartSystem => startRequests
        case RequestComplete => handleComplete

    }

    def startRequests(): Unit = {
        for ((url, i) <- urls.zipWithIndex) {
            val helloActor = context.actorOf(Props[HelloActor], name = s"helloActor$i")
            helloActor ! UrlTransport(url)
        }
    }  

    def handleComplete(): Unit = {
        completed += 1
        if (remaining == 0) 
            // do something like
            // context.stop(self)
    }
}

然后你只需要 HelloActorsender ! RequestComplete在它完成它的工作时做一个。

于 2014-02-07T00:03:53.240 回答