28

对我最近的一个问题的答复表明,一个演员一次处理一条消息。这是真的?我没有看到任何明确说明的内容(在 Scala 编程中),其中包含以下片段(第 593 页)

如果[该react方法]找到可以处理的消息,[它]将安排对该消息的处理以供以后执行并抛出异常

(强调我自己的)。两个相关(且相互排斥)的问题:

  1. 假设一个演员可以同时处理多条消息,我如何强制一个演员一次处理 1 条消息(如果这是我希望做的)?(使用receive?)
  2. 假设一个演员一次处理一个消息,我将如何最好地实现一个实际上可以同时处理消息的演员

编辑:做一些测试似乎证明我错了,演员确实是连续的。所以这是我需要回答的问题#2

4

4 回答 4

26

Actor 一次处理一条消息。处理多条消息的经典模式是在一组消费者参与者的前面设置一个协调参与者。如果你使用 react,那么消费者池可能会很大,但仍然只会使用少量的 JVM 线程。这是一个示例,我创建了一个由 10 个消费者组成的池,并为他们创建了一个协调员。

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

此代码测试以查看哪个消费者可用并向该消费者发送请求。替代方案是随机分配给消费者或使用循环调度程序。

根据您所做的事情,Scala 的 Futures 可能会为您提供更好的服务。例如,如果你真的不需要演员,那么上述所有机器都可以写成

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))
于 2009-06-17T17:41:26.417 回答
6

我认为答案是Actor不能异步处理消息。如果你有一个Actor应该监听可以异步处理这些消息的消息,那么它可以这样写:

val actor_ = actor {

  loop {
    react {
      case msg =>
        //create a new actor to execute the work. The framework can then 
        //manage the resources effectively
        actor {
          //do work here
        }
      }
    }
  }
于 2009-06-17T14:13:53.940 回答
0

如果你想做多件事,那么你应该使用多个演员。使用参与者的全部原因是将工作分配给多个独立的进程。

于 2009-06-17T13:39:19.907 回答
0

您可以尝试使用期货概念。请使用 Futures 存储所有这些消息,然后尝试处理所有这些消息。

于 2019-09-04T07:28:59.790 回答