2

我们有一些使用 ThreadPoolExecutor 和 CompletionService 的 Java 代码。任务大批量提交到池中;结果转到完成服务,我们在可用时收集已完成的任务,而无需等待整个批次完成:

 ThreadPoolExecutor _executorService =
            new ThreadPoolExecutor(MAX_NUMBER_OF_WORKERS, new LinkedBlockingQueue(20));
 CompletionService _completionService =
            new ExecutorCompletionService<Callable>(_executorService)

//submit tasks
_completionService.submit( some task);

//get results
while(...){
   Future result = _completionService.poll(timeout);
   if(result)
      //process result
}

池中的工人总数为 MAX_NUMBER_OF_WORKERS;在没有可用工作人员的情况下提交的任务排队;最多可以排队 20 个任务,之后,任务被拒绝。

这种方法的Gpars对应物是什么?

阅读有关 gpars 并行性的文档collectManyParallel(),我发现了许多潜在的选项: 、anyParallel()fork/join等,我什至不确定要测试哪些选项。我希望在文档中找到一些提及“完成”或“完成服务”的内容作为比较,但一无所获。我正在寻找一些关于从那些有 gpars 经验的人那里开始的方向/指针。

4

1 回答 1

1

即时收集结果,限制生产者——这需要数据流解决方案。请在下面找到一个可运行的示例示例:

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

int MAX_NUMBER_OF_WORKERS = 10

ThreadPoolExecutor _executorService =
        new ThreadPoolExecutor(MAX_NUMBER_OF_WORKERS, MAX_NUMBER_OF_WORKERS, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(200));

final group = new DefaultPGroup(new DefaultPool(_executorService))
final results = new DataflowQueue()

//submit tasks
30.times {value ->
    group.task(new Runnable() {
        @Override
        void run() {
            println 'Starting ' + Thread.currentThread()
            sleep 5000
            println 'Finished ' + Thread.currentThread()
            results.bind(value)
        }
    });
}
group.task {
    results << -1  //stop the consumer eventually
}

//get results
while (true) {
    def result = results.val
    println result
    if (result == -1) break
    //process result
}

group.shutdown()
于 2014-01-14T08:33:20.533 回答