2

我有一个任务管道(管道中的每个任务都有不同的并行度要求),每个任务都在不同的 ExecutorService 中工作。任务处理数据包,因此如果我们有 10 个数据包,那么将提交 10 个任务service1,每个数据包一个任务。一旦提交的任务service1实际调用,它可能会提交一个新任务以进一步处理数据包到service2service3不。

以下代码工作正常,即:

  • shutdown()service1在所有内容提交后调用service1
  • 然后 awaitTermination() 不会返回,直到在 shutdown() 之前提交的所有任务都实际完成运行。--shutdown()然后调用 onservice2但是因为所有提交的任务service1都已完成,并且所有任务都提交到service2from tasks 上的service1所有任务都已提交过service2shutdown()调用 on service2。-- 以此类推service3

    ExecutorService[] services = {
            service1,
            service2,
            service3};
    
    int count = 0;
    for(ExecutorService service: services)
    {   
        service.shutdown();
        service.awaitTermination(1, TimeUnit.HOURS);
    }
    

但是,我现在添加了一个案例,service2可以将数据包分解为较小的数据包并提交额外的任务service2,但代码现在失败了。问题是一旦所有任务完成后shutdown()就会调用它,但现在我们想从正在运行的任务提交额外的任务service2service1service2service2

我的问题:

  1. shutdown()是在所有提交的任务完成运行后重新运行,还是立即返回但不停止已经提交的任务运行?更新:在下面回答
  2. 如何解决我的新问题?
4

5 回答 5

2

“关闭”只是告诉池不要再接受任何工作。它什么也不做。所有现有提交的工作将正常执行。当队列耗尽时,池实际上会销毁它的所有线程并终止。

这里的问题是你说 service2 中的任务会将额外的任务提交给 service2 进行处理。似乎没有办法知道什么时候应该真正调用关机。但是,唉,还有另一种选择,假设这些较小的数据包不会进一步分解为服务。

List<Future<Void>> service2Futures = new ArrayList<Future<Void>>();

service2Futures.add(service2.submit(new Callable<Void>() {
  public Void call() throws Exception {
    // do your work, submit more stuff to service2
    // if you submit Callables, you could use Future.get() to wait on those
    // results.
    return null;
  }
}));

for (Future<Void> future : service2Futures) {
  future.get();
}

service2.shutdown();
...

这里发生的事情是您正在为顶级提交的任务存储 Future 对象(您必须使用 Callable 而不是 Runnable)。您只需收集 Future 对象,而不是在提交后立即关闭池。然后,您通过循环遍历它们,并在每个对象上调用 get() 等到它们全部运行完毕。“get()”方法会阻塞,直到运行该任务的线程完成。

此时,所有顶级任务都已完成,他们将提交二级任务。您现在可以发出关机命令。这假设二级任务不会向 service2 提交更多内容。

综上所述,如果您使用的是 java 7,则应该考虑改用 ForkJoinPool 和 RecursiveTask。它可能对你正在做的事情更有意义。

ForkJoinPool forkJoinPool = new ForkJoinPool();
RecursiveAction action = new RecursiveAction() {
    protected void compute() {
         // break down here and build actions
         RecursiveAction smallerActions[] = ...; 
         invokeAll(smallerActions);
    }
};

Future<Void> future = forkJoinPool.submit(action);
于 2012-06-08T13:40:05.253 回答
1

ExecutorService#shutdown让已经提交的任务完成他们正在做的任何事情 - javadoc 提取

启动有序关闭,其中执行先前提交的任务,但不会接受新任务。如果已经关闭,调用没有额外的效果。
此方法不等待先前提交的任务完成执行。使用 awaitTermination 来做到这一点。

在实践中,您可以认为调用 toshutdown做了几件事:

  1. ExecutorService不能再接受新工作了
  2. 现有线程一旦完成运行就会终止

所以回答你的问题:

  • service1如果您在调用之前已经提交了所有任务service1.shutdown(如果您在该调用之后提交任何内容,无论如何都会收到异常),那么您很好(即,如果这些任务向 service2 提交了某些内容并且 service2 没有关闭,它们将被执行) .
  • shutdown 立即返回,并且不保证已经提交的任务会停止(它们可以永远运行)。
  • 您遇到的问题可能与您如何将任务从一项服务提交到另一项服务有关,仅凭您提供的信息似乎很难解决。

最好的方法是在您的问题中包含一个SSCCE,以复制您所看到的行为。

于 2012-06-08T12:05:24.853 回答
1

您应该跟踪任务本身,而不是关闭 ExecutorService。他们可以传递一个“工作状态”对象,用于跟踪未完成的工作,例如:

public class JobState {
  private int _runningJobs;

  public synchronized void start() {
    ++_runningJobs;
  }

  public synchronized void finish() {
    --_runningJobs;
    if(_runningJobs == 0) { notifyAll(); }
  }

  public synchronized void awaitTermination() {
    while(_runningJobs > 0) { wait() }
  }
}

public class SomeJob implements Runnable {
  private final JobState _jobState;

  public void run() {
    try {
      // ... do work here, possibly submitting new jobs, and pass along _jobState
    } finally {
      _jobState.finish();
    }
  }
}

// utility method to start a new job
public static void submitJob(ExecutorService executor, Runnable runnable, JobState jobState) {
  // call start _before_ submitting
  jobState.start();
  executor.submit(runnable);
}

// start main work
JobState state = new JobState();
Runnable firstJob = new SomeJob(state);
submitJob(executor, firstJob, state);
state.awaitTermination();
于 2012-06-08T12:59:12.660 回答
0

当您调用关机时,它不会等待所有任务完成。像使用 awaitTermination 一样进行操作。

但是一旦调用了关机 - 新任务就会被阻止。您的执行器服务拒绝所有新任务。对于 RejectedExecutionHandler 中的 ThreadPoolExecutor 拒绝任务句柄。如果您指定自定义处理程序,您可以在关闭任务后处理拒绝。这是解决方法之一。

于 2012-06-08T12:26:26.760 回答
0

Matts 的问题看起来可能很有效,但我担心它可能会导致新问题。

我想出了一个解决方案,它不需要对我的场景进行很多代码更改,虽然它看起来有点笨拙

我引入了一项新服务 (service2a),它运行与 service2 相同的任务。当 service2 中的任务想要提交一个小数据包时,它会将其提交给 service2a 而不是 service2,因此所有子数据包都会在服务 2 关闭之前提交给 service2a。这对我有用,因为较小的数据包不需要分解为更多的子包,并且子包的想法仅适用于 service2(a) 而不是任何其他服务。

于 2012-06-08T14:10:13.193 回答