8

正如我自己对自己问题的回答一样,我遇到了一种情况,即我正在处理大量到达队列的事件。每个事件都以完全相同的方式处理,甚至可以独立于所有其他事件进行处理。

我的程序利用了 Scala 并发框架,其中涉及的许多进程都被建模为Actors。由于Actors 顺序处理他们的消息,因此它们不太适合这个特定问题(即使我的其他参与者正在执行顺序操作)。由于我希望 Scala 能够“控制”所有线程创建(我认为这是它首先具有并发系统的关键),因此我似乎有两个选择:

  1. 将事件发送到我控制的事件处理器池
  2. 让我Actor通过其他机制同时处理它们

我会认为#1 否定了使用演员子系统的意义:我应该创建多少个处理器演员?是一个明显的问题。据说这些事情对我来说是隐藏的,并由子系统解决。

我的回答是执行以下操作:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

有更好的方法吗?这是不正确的吗?

编辑:一个可能更好的方法是:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(e))
    }
  }
}
4

5 回答 5

8

这似乎是另一个问题的重复。所以我会重复我的答案

Actor 一次处理一条消息。处理多条消息的经典模式是在一组消费者参与者的前面设置一个协调参与者。如果你使用 react,那么消费者池可能会很大,但仍然只会使用少量的 JVM 线程。这是一个示例,我创建了一个由 10 个消费者组成的池,并为他们创建了一个协调员。

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

此代码测试以查看哪个消费者可用并向该消费者发送请求。替代方案是随机分配给消费者或使用循环调度程序。

根据您所做的事情,Scala 的 Futures 可能会为您提供更好的服务。例如,如果你真的不需要演员,那么上述所有机器都可以写成

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))
于 2009-06-18T00:56:23.467 回答
3

如果事件都可以独立处理,为什么要排队?对您的设计一无所知,这似乎是一个不必要的步骤。如果您可以process使用触发这些事件的任何内容来组合该函数,则您可能会消除队列。

演员本质上是配备队列的并发效果。如果你想同时处理多条消息,你真的不需要演员。您只希望在某个方便的时间安排一个函数 (Any => ()) 执行。

话虽如此,如果您想留在演员库中并且事件队列不在您的控制范围内,那么您的方法是合理的。

Scalaz对 Actor 和并发效果进行了区分。虽然它Actor的重量很轻,scalaz.concurrent.Effect但仍然更轻。这是您的代码大致翻译到 Scalaz 库:

val eventProcessor = effect (x => process x)

这是最新的行李箱头,尚未发布。

于 2009-06-17T14:56:37.240 回答
1

这听起来像是一个简单的消费者/生产者问题。我会使用一个带有消费者池的队列。您可能可以使用 java.util.concurrent 用几行代码来编写它。

于 2009-06-17T15:21:28.577 回答
1

演员(好吧,其中之一)的目的是确保演员内的状态一次只能由单个线程访问。如果消息的处理不依赖于actor中的任何可变状态,那么将任务提交给调度程序或线程池进行处理可能更合适。演员提供的额外抽象实际上妨碍了你。

scala.actors.Scheduler 中有一些方便的方法,或者您可以使用 java.util.concurrent 中的 Executor。

于 2009-06-17T17:22:53.430 回答
1

Actor 比线程轻得多,因此另一种选择是使用 Actor 对象,例如您习惯于提交到线程池的 Runnable 对象。主要区别在于您无需担心 ThreadPool - 线程池由 Actor 框架为您管理,并且主要是配置问题。

def submit(e: MyEvent) = actor {
  // no loop - the actor exits immediately after processing the first message
  react {
    case MyEvent(x) =>
      process(x)
  }
} ! e // immediately send the new actor a message

然后提交消息,这样说:

submit(new MyEvent(x))

,对应于

eventProcessor ! new MyEvent(x)

从你的问题。

在四核 i7 笔记本电脑上大约 10 秒内发送和接收了 100 万条消息,成功测试了这种模式。

希望这可以帮助。

于 2011-05-20T14:56:47.703 回答