有大量的任务。每个任务都属于一个组。要求是每组任务应该像在单个线程中执行一样串行执行,并且应该在多核(或多cpu)环境中最大化吞吐量。注意:还有大量与任务数量成正比的组。
天真的解决方案是使用 ThreadPoolExecutor 并同步(或锁定)。但是,线程会相互阻塞,吞吐量不会最大化。
有更好的主意吗?或者是否存在满足要求的第三方库?
有大量的任务。每个任务都属于一个组。要求是每组任务应该像在单个线程中执行一样串行执行,并且应该在多核(或多cpu)环境中最大化吞吐量。注意:还有大量与任务数量成正比的组。
天真的解决方案是使用 ThreadPoolExecutor 并同步(或锁定)。但是,线程会相互阻塞,吞吐量不会最大化。
有更好的主意吗?或者是否存在满足要求的第三方库?
一种简单的方法是将所有组任务“连接”成一个超级任务,从而使子任务串行运行。但这可能会导致其他组延迟,除非其他组完全完成并在线程池中腾出一些空间,否则这些组将无法启动。
作为替代方案,请考虑将组的任务链接起来。下面的代码说明了这一点:
public class MultiSerialExecutor {
private final ExecutorService executor;
public MultiSerialExecutor(int maxNumThreads) {
executor = Executors.newFixedThreadPool(maxNumThreads);
}
public void addTaskSequence(List<Runnable> tasks) {
executor.execute(new TaskChain(tasks));
}
private void shutdown() {
executor.shutdown();
}
private class TaskChain implements Runnable {
private List<Runnable> seq;
private int ind;
public TaskChain(List<Runnable> seq) {
this.seq = seq;
}
@Override
public void run() {
seq.get(ind++).run(); //NOTE: No special error handling
if (ind < seq.size())
executor.execute(this);
}
}
优点是没有使用额外的资源(线程/队列),并且任务的粒度比幼稚方法中的要好。缺点是要提前知道所有组的任务。
- 编辑 -
为了使这个解决方案通用和完整,您可能需要决定错误处理(即即使发生错误,链是否继续),实现 ExecutorService 并将所有调用委托给底层执行程序也是一个好主意。
我建议使用任务队列:
快速谷歌搜索表明 java api 本身没有任务/线程队列。但是,有很多关于编码的教程。如果您知道一些,每个人都可以随意列出好的教程/实现:
我基本同意 Dave 的回答,但是如果您需要跨所有“组”划分 CPU 时间,即所有任务组应该并行进行,您可能会发现这种构造很有用(使用删除作为“锁定”。这在我的情况虽然我想它往往会使用更多的内存):
class TaskAllocator {
private final ConcurrentLinkedQueue<Queue<Runnable>> entireWork
= childQueuePerTaskGroup();
public Queue<Runnable> lockTaskGroup(){
return entireWork.poll();
}
public void release(Queue<Runnable> taskGroup){
entireWork.offer(taskGroup);
}
}
和
class DoWork implmements Runnable {
private final TaskAllocator allocator;
public DoWork(TaskAllocator allocator){
this.allocator = allocator;
}
pubic void run(){
for(;;){
Queue<Runnable> taskGroup = allocator.lockTaskGroup();
if(task==null){
//No more work
return;
}
Runnable work = taskGroup.poll();
if(work == null){
//This group is done
continue;
}
//Do work, but never forget to release the group to
// the allocator.
try {
work.run();
} finally {
allocator.release(taskGroup);
}
}//for
}
}
然后,您可以使用最佳线程数来运行DoWork
任务。这是一种循环负载平衡..
您甚至可以通过使用它而不是简单的队列来做更复杂的事情TaskAllocator
(剩余任务更多的任务组往往会被执行)
ConcurrentSkipListSet<MyQueue<Runnable>> sophisticatedQueue =
new ConcurrentSkipListSet(new SophisticatedComparator());
SophisticatedComparator
在哪里
class SophisticatedComparator implements Comparator<MyQueue<Runnable>> {
public int compare(MyQueue<Runnable> o1, MyQueue<Runnable> o2){
int diff = o2.size() - o1.size();
if(diff==0){
//This is crucial. You must assign unique ids to your
//Subqueue and break the equality if they happen to have same size.
//Otherwise your queues will disappear...
return o1.id - o2.id;
}
return diff;
}
}
Actor 也是这种特定类型问题的另一种解决方案。Scala 有演员,还有由 AKKA 提供的 Java。
我遇到了与您类似的问题,我使用了ExecutorCompletionService
与 anExecutor
一起完成任务的集合。这是从 Java7 开始的 java.util.concurrent API 的摘录:
假设您有一组求解某个问题的求解器,每个求解器返回某种类型的值 Result,并希望同时运行它们,处理每个返回非空值的结果,在某些方法中使用(Result r)。你可以这样写:
void solve(Executor e, Collection<Callable<Result>> solvers)
throws InterruptedException, ExecutionException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
for (Callable<Result> s : solvers)
ecs.submit(s);
int n = solvers.size();
for (int i = 0; i < n; ++i) {
Result r = ecs.take().get();
if (r != null)
use(r);
}
}
因此,在您的场景中,每个任务都将是一个Callable<Result>
,并且任务将被分组在一个Collection<Callable<Result>>
.
参考: http ://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html