2

我在伪代码中具有以下功能:

Result calc(Data data) {
  if (data.isFinal()) {
    return new Result(data); // This is the actual lengthy calculation
  } else {
    List<Result> results = new ArrayList<Result>();
    for (int i=0; i<data.numOfSubTasks(); ++i) {
      results.add(calc(data.subTask(i));
    }
    return new Result(results); // merge all results in to a single result
  }
}

我想使用固定数量的线程并行化它。

我的第一次尝试是:

ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);

Result calc(Data data) {
  if (data.isFinal()) {
    return new Result(data); // This is the actual lengthy calculation
  } else {
    List<Result> results = new ArrayList<Result>();
    List<Callable<Void>> callables = new ArrayList<Callable<Void>>();
    for (int i=0; i<data.numOfSubTasks(); ++i) {
      callables.add(new Callable<Void>() {
        public Void call() {
         results.add(calc(data.subTask(i));
        }
      });
    }
    executorService.invokeAll(callables);  // wait for all sub-tasks to complete
    return new Result(results); // merge all results in to a single result
  }
}

然而,这很快就陷入了死锁,因为在顶级递归级别等待所有线程完成的同时,内部级别也在等待线程变得可用......

如何有效地并行化我的程序而不会出现死锁?

4

2 回答 2

5

您的问题是使用 ThreadPoolExecutor 处理具有依赖关系的任务时的一般设计问题。

我看到两个选项:

1)确保以自下而上的顺序提交任务,这样您就永远不会有一个正在运行的任务依赖于尚未开始的任务。

2)使用“直接切换”策略(参见ThreadPoolExecutor文档):

ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
executor.setRejectedExecutionHandler(new CallerRunsPolicy());

这个想法是使用同步队列,这样任务就不会在真正的队列中等待。拒绝处理程序负责处理没有可用线程运行的任务。使用这个特定的处理程序,提交者线程运行被拒绝的任务。

此执行器配置保证任务永远不会被拒绝,并且您永远不会因为任务间的依赖关系而出现死锁。

于 2013-02-26T08:51:35.957 回答
0

你应该把你的方法分成两个阶段:

  1. 创建所有树直到 data.isFinal() == true
  2. 递归收集结果(只有在合并不产生其他操作/调用时才有可能)

为此,您可以使用[Futures][1]使结果异步。表示 calc 的所有结果都是 Future[Result] 类型。

立即返回 Future 将释放当前线程并为其他线程的处理提供空间。收集结果(new Result(results))后,您应该等待所有结果准备好(ScatterGather-Pattern,您可以使用信号量等待所有结果)。集合本身将遍历一棵树并在单个线程中进行检查(或等待结果到达)。

总体而言,您构建了一个 Futures 树,用于收集结果并仅在线程池中执行“昂贵”的操作。

于 2013-02-26T08:42:03.073 回答