1

我正在尝试使用 scala 演员并行化代码。那是我第一次使用演员的真正代码,但我对 C 中的 Java 多线程和 MPI 有一些经验。但是我完全迷路了。

我想实现的工作流是一个循环管道,可以描述如下:

  • 每个工人演员都有对另一个人的引用,从而形成一个圆圈
  • 有一个协调演员可以通过发送StartWork()消息来触发计算
  • 当一个工人收到一条StartWork()消息时,它会在本地处理一些东西并将DoWork(...)消息发送给它在圈子中的邻居。
  • 邻居做一些其他的事情,然后向DoWork(...)自己的邻居发送消息。
  • 这种情况一直持续到初始工作人员收到DoWork()消息为止。
  • 协调者可以向初始工作人员发送GetResult()消息并等待回复。

关键是协调器应该只在数据准备好时接收结果。工人如何在回复GetResult()消息之前等待工作返回给它?

为了加快计算速度,任何工作人员都可以随时收到一个StartWork()

这是我第一次尝试对工作人员进行伪实现:

class Worker( neighbor: Worker, numWorkers: Int ) {
   var ready = Foo()
   def act() {
     case StartWork() => { 
       val someData = doStuff()
       neighbor ! DoWork( someData, numWorkers-1 ) 
       }
     case DoWork( resultData, remaining ) => if( remaining == 0 ) {
         ready = resultData
       } else {
         val someOtherData = doOtherStuff( resultData )
         neighbor ! DoWork( someOtherData, remaining-1 )
      }
    case GetResult() => reply( ready )
  }
}

在协调器方面:

worker ! StartWork()
val result = worker !? GetResult() // should wait
4

2 回答 2

3

首先,您显然需要对构成单件作品的内容有一些标识符,以便GetResult可以获得正确的结果。我想显而易见的解决方案是让你的演员保留Map结果和Map任何等待的吸气剂

class Worker( neighbor: Worker, numWorkers: Int ) {
   var res: Map[Long, Result] = Map.empty
   var gets: Map[Long, OutputChannel[Any]] = Map.empty   
   def act() {
     ...
     case DoWork( id, resultData, remaining ) if remaining == 0 =>
       res += (id -> resultData)
       gets.get(id).foreach(_ ! res(id)) //reply to getters when result is ready
       gets -= id //clear out getter map now?
     case GetResult(id) if res.isDefinedAt(d) => //result is ready
       reply (res(id))
     case GetResult(id) => //no result ready 
       gets += (id -> sender)
   }
}

注意:在匹配条件中使用if可以让消息处理更清晰一点

于 2010-03-17T17:56:29.960 回答
1

一种选择是:

class Worker( neighbor: Worker, numWorkers: Int ) {
   var ready = Foo()
   def act() {
     case StartWork() => { 
       val someData = doStuff()
       neighbor ! DoWork( someData, numWorkers-1 ) 
       }
     case DoWork( resultData, remaining ) => if( remaining == 0 ) {
         ready = resultData
         react {
           case GetResult() => reply( ready )
         }
       } else {
         val someOtherData = doOtherStuff( resultData )
         neighbor ! DoWork( someOtherData, remaining-1 )
      }
  }
}

工作完成后,这个worker会一直卡住,直到收到GetResult消息。另一方面,协调者可以立即发送GetResult,因为它将保留在邮箱中,直到工作人员收到它。

于 2010-03-17T22:29:06.767 回答