要使其按预期运行,需要进行一些更改:
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool
def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))
def closure = {
when {Integer msg ->
println("I'm number ${msg} on thread ${Thread.currentThread().name}")
Thread.sleep(1000)
stop()
}
}
def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def actors = integers.collect { poolGroup.messageHandler(closure) << it }
actors*.join()
完整的要点文件:https ://gist.github.com/wololock/7f1348e04f68710e42d2
然后输出将是:
I'm number 5 on thread Actor Thread 5
I'm number 4 on thread Actor Thread 4
I'm number 1 on thread Actor Thread 1
I'm number 3 on thread Actor Thread 3
I'm number 2 on thread Actor Thread 2
I'm number 6 on thread Actor Thread 3
I'm number 9 on thread Actor Thread 4
I'm number 7 on thread Actor Thread 2
I'm number 8 on thread Actor Thread 5
I'm number 10 on thread Actor Thread 1
现在让我们来看看发生了什么变化。首先,在您之前的示例中,您只处理了一个演员。您定义poolGroup
正确,但随后您创建了单个参与者并将计算转移到该单个实例。要并行运行这些计算,您必须依赖poolGroup
并且仅将输入发送到某个消息处理程序 - 池组将处理参与者的创建及其生命周期管理。这就是我们所做的:
def actors = integers.collect { poolGroup.messageHandler(closure) << it }
它将创建一个从给定输入开始的演员集合。池组将注意不超过指定的池大小。然后你必须对join
每个演员,这可以通过使用 groovy 的魔法来完成:actors*.join()
. 感谢应用程序将等待终止,直到所有参与者停止他们的计算。这就是为什么我们必须在消息处理程序主体stop()
的闭包中添加方法when
- 没有它,它不会终止,因为池组不知道演员是否在工作 - 他们可能会等待例如另一条消息。
替代解决方案
我们还可以考虑使用 GPars 并行迭代的替代解决方案:
import groovyx.gpars.GParsPool
// This example is dummy, but let's assume that this processor is
// stateless and shared between threads component.
class Processor {
void process(int number) {
println "${Thread.currentThread().name} starting with number ${number}"
Thread.sleep(1000)
}
}
def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Processor processor = new Processor()
GParsPool.withPool 5, {
integers.eachParallel { processor.process(it) }
}
在此示例中,您有一个无状态组件Processor
和并行计算,使用一个Processor
具有多个输入值的无状态实例。
我试图弄清楚你在评论中提到的情况,但我不确定单个演员是否可以一次处理多条消息。Actor 的无状态仅意味着它在处理消息期间不会改变其内部状态,并且不得在 Actor 范围内存储任何其他信息。如果我的推理不正确,如果有人能纠正我,那就太好了:)
我希望这能帮到您。最好的!