3

我对 GPars Actors 的理解可能不正确,所以如果我错了,请纠正我。我有一个 Groovy 应用程序,用于轮询 Web 服务以查找工作。当找到一个或多个工作时,它会将每个工作发送到DynamicDispatchActor我创建的一个,然后处理该工作。这些作业是完全独立的,不需要向主线程返回任何内容。当多个作业同时进入时,我希望它们被并行处理,但无论我尝试什么配置,演员都会先入先出地处理它们。

举一个代码示例:

def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))

def actor = poolGroup.messageHandler {
    when {Integer msg -> 
        println("I'm number ${msg} on thread ${Thread.currentThread().name}")
        Thread.sleep(1000)
    }
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

integers.each {
    actor << it
}

这打印出来:

我在线程 Actor 线程 31 上排名第一
我在线程 Actor 线程 31 上排名第二
我在线程 Actor 线程 31 上排名第三
我在线程 Actor 线程 31 上排名第 4
我在线程 Actor 线程 31 上排名第 5
我在线程 Actor 线程 31 上排名第 6
我在线程 Actor 线程 31 上排名第 7
我在线程 Actor 线程 31 上排名第 8
我在线程 Actor 线程 31 上排名第 9
我在线程 Actor 线程 31 上排名第 10

在每次打印输出之间稍作停顿。另请注意,每个打印输出都来自同一个 Actor/线程。

我想在这里看到的是前 5 个数字会立即打印出来,因为线程池设置为 5,然后随着这些线程的释放,接下来的 5 个数字会被打印出来。我完全不在这儿吗?

4

1 回答 1

3

要使其按预期运行,需要进行一些更改:

import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))

def closure = {
    when {Integer msg ->
        println("I'm number ${msg} on thread ${Thread.currentThread().name}")
        Thread.sleep(1000)
        stop()
    }
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

def actors = integers.collect { poolGroup.messageHandler(closure) << it }
actors*.join()

完整的要点文件:https ://gist.github.com/wololock/7f1348e04f68710e42d2

然后输出将是:

I'm number 5 on thread Actor Thread 5
I'm number 4 on thread Actor Thread 4
I'm number 1 on thread Actor Thread 1
I'm number 3 on thread Actor Thread 3
I'm number 2 on thread Actor Thread 2
I'm number 6 on thread Actor Thread 3
I'm number 9 on thread Actor Thread 4
I'm number 7 on thread Actor Thread 2
I'm number 8 on thread Actor Thread 5
I'm number 10 on thread Actor Thread 1

现在让我们来看看发生了什么变化。首先,在您之前的示例中,您只处理了一个演员。您定义poolGroup正确,但随后您创建了单个参与者并将计算转移到该单个实例。要并行运行这些计算,您必须依赖poolGroup并且仅将输入发送到某个消息处理程序 - 池组将处理参与者的创建及其生命周期管理。这就是我们所做的:

def actors = integers.collect { poolGroup.messageHandler(closure) << it }

它将创建一个从给定输入开始的演员集合。池组将注意不超过指定的池大小。然后你必须对join每个演员,这可以通过使用 groovy 的魔法来完成:actors*.join(). 感谢应用程序将等待终止,直到所有参与者停止他们的计算。这就是为什么我们必须在消息处理程序主体stop()的闭包中添加方法when- 没有它,它不会终止,因为池组不知道演员是否在工作 - 他们可能会等待例如另一条消息。

替代解决方案

我们还可以考虑使用 GPars 并行迭代的替代解决方案:

import groovyx.gpars.GParsPool

// This example is dummy, but let's assume that this processor is
// stateless and shared between threads component.
class Processor {
    void process(int number) {
        println "${Thread.currentThread().name} starting with number ${number}"
        Thread.sleep(1000)
    }
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Processor processor = new Processor()

GParsPool.withPool 5, {
    integers.eachParallel { processor.process(it) }
}

在此示例中,您有一个无状态组件Processor和并行计算,使用一个Processor具有多个输入值的无状态实例。

我试图弄清楚你在评论中提到的情况,但我不确定单个演员是否可以一次处理多条消息。Actor 的无状态仅意味着它在处理消息期间不会改变其内部状态,并且不得在 Actor 范围内存储任何其他信息。如果我的推理不正确,如果有人能纠正我,那就太好了:)

我希望这能帮到您。最好的!

于 2014-12-07T08:50:46.790 回答