我正在尝试使用 scala 演员并行化代码。那是我第一次使用演员的真正代码,但我对 C 中的 Java 多线程和 MPI 有一些经验。但是我完全迷路了。
我想实现的工作流是一个循环管道,可以描述如下:
- 每个工人演员都有对另一个人的引用,从而形成一个圆圈
- 有一个协调演员可以通过发送
StartWork()
消息来触发计算 - 当一个工人收到一条
StartWork()
消息时,它会在本地处理一些东西并将DoWork(...)
消息发送给它在圈子中的邻居。 - 邻居做一些其他的事情,然后向
DoWork(...)
自己的邻居发送消息。 - 这种情况一直持续到初始工作人员收到
DoWork()
消息为止。 - 协调者可以向初始工作人员发送
GetResult()
消息并等待回复。
关键是协调器应该只在数据准备好时接收结果。工人如何在回复GetResult()
消息之前等待工作返回给它?
为了加快计算速度,任何工作人员都可以随时收到一个StartWork()
。
这是我第一次尝试对工作人员进行伪实现:
class Worker( neighbor: Worker, numWorkers: Int ) {
var ready = Foo()
def act() {
case StartWork() => {
val someData = doStuff()
neighbor ! DoWork( someData, numWorkers-1 )
}
case DoWork( resultData, remaining ) => if( remaining == 0 ) {
ready = resultData
} else {
val someOtherData = doOtherStuff( resultData )
neighbor ! DoWork( someOtherData, remaining-1 )
}
case GetResult() => reply( ready )
}
}
在协调器方面:
worker ! StartWork()
val result = worker !? GetResult() // should wait