3

我们遇到了一点问题。:)

我们要确保任何时候只有 N 个线程在做后台任务。为此,我们使用了一个固定的线程池执行器。它似乎工作正常。

然后我们发现了一个问题。假设你有一个类,它使用 executor 来做一些并行工作,然后它在 executor 线程中调用其他一些类,它也做一些并行工作,打算等待它。这是发生的事情:

  • 主线程调用第一级方法。
  • 这种方法认为它可以并行化为 16 个任务并将其工作拆分。
  • 16 个任务提交给执行者。
  • 主线程开始等待其任务完成。
  • 假设有四个线程可用,前四个任务每个都被拾取并运行。所以队列中还有 12 个任务。
  • 现在,其中一项任务调用了其他方法。
  • 这种新方法认为它可以并行化为 2 个任务。假设这是并行合并排序的第一步或类似的事情。
  • 2个任务提交给执行者。
  • 该线程现在开始等待其任务完成。

哦哦。所以在这一点上,所有四个线程现在都在等待任务完成,但它们正在协作阻止实际运行这些任务的执行程序。

此问题的解决方案 1 如下:在向执行程序提交新任务时,如果我们已经在运行所有线程,并且我们已经在其中一个执行程序线程上运行,则内联运行任务。这工作了 10 个月,但现在我们遇到了问题。如果它正在提交的新任务仍然相对较大,那么您可能会遇到新任务阻止该方法将其他任务添加到队列中的情况,否则这些任务将能够被其他工作线程拾取。因此,当线程在线处理工作时,您会遇到巨大的延迟。

对于执行潜在无界的后台任务树的核心问题,是否有更好的解决方案?我知道 .NET 相当于 executor 服务具有某种从队列中窃取的内置能力,可以防止发生原始死锁问题,据我所知,这是一个理想的解决方案。但是在 Java 的土地上呢?

4

3 回答 3

3

Java 7 有一个概念,ForkJoinPool它允许一个任务通过将另一个任务提交给同一个 Executor 来“分叉”另一个任务。然后,它可以选择稍后尝试“帮助加入”该任务,如果它尚未运行,则尝试运行它。

我相信在 Java 6 中可以通过简单地结合Executorwith来完成同样的事情FutureTask。像这样:

public class Fib implements Callable<Integer> {
    int n;
    Executor exec;

    Fib(final int n, final Executor exec) {
        this.n = n;
        this.exec = exec;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Integer call() throws Exception {
        if (n == 0 || n == 1) {
            return n;
        }

        //Divide the problem
        final Fib n1 = new Fib(n - 1, exec);
        final Fib n2 = new Fib(n - 2, exec);

        //FutureTask only allows run to complete once
        final FutureTask<Integer> n2Task = new FutureTask<Integer>(n2);
        //Ask the Executor for help
        exec.execute(n2Task);

        //Do half the work ourselves
        final int partialResult = n1.call();

        //Do the other half of the work if the Executor hasn't
        n2Task.run();

        //Return the combined result
        return partialResult + n2Task.get();
    }

}        
于 2011-03-10T01:34:15.353 回答
1

您可以使用回调而不是让您的线程等待任务完成。您的任务本身需要回调,因为它们提交了更多任务。

例如:

public class ParallelTask implements Runnable, Callback {
  private final Callback mCB;
  private final int mNumChildTasks;
  private int mTimesCalledBack = 0;
  private final Object mLock = new Object();
  private boolean mCompleted = false;
  public ParallelTask(Callback cb) {
    mCB = cb;
    mNumChildTasks = N; // the number of direct child tasks you know this task will spawn
    // only going down 1 generation
    // of course you could figure this number out in the run method (will need to be volatile if so)
   // just as long as it is set before submitting any child tasks for execution
  }

  @Override
  public void run() {
    // do your stuff
    // and submit your child tasks, but don't wait on them to complete
    synchronized(mLock) {
      mCompleted = true;
      if (mNumChildTasks == mTimesCalledBack) {
        mCB.taskCompleted();
      }
    }
  }

  // Callback interface
  // taskCompleted is being called from the threads that this task's children are running in
  @Override
  public void taskCompleted() {
    synchronized(mLock) {
      mTimesCalledBack++;
      // only call our parent back if our direct children have all called us back
      // and our own task is done
      if (mCompleted && mTimesCalledBack == mNumChildTasks) {
        mCB.taskCompleted();
      }
    }
  }
}

在您的主线程中,您提交根任务并注册一些要执行的回调。

由于所有子任务在其子任务报告完成之前都不会报告完成,因此在一切完成之前不应调用您的根回调。

我是即时写的,没有测试或编译它,所以可能会有一些错误。

于 2011-03-10T02:01:50.113 回答
0

问题似乎是任务也试图并行化自己,这使得难以避免资源限制。为什么需要这样做?为什么不总是内联运行子任务?

如果您已经通过并行化充分利用了 CPU,那么就通过将工作再次划分为较小的任务所完成的整体工作而言,您不会购买太多。

于 2011-03-10T01:34:54.227 回答