0

我想创建一个具有固定线程池大小的单例执行器服务。另一个线程将为 ExecutorService 提供 Callables,我想在执行完成后立即解析 Callables 的结果(最佳)。

我真的不确定如何正确实施。我最初的想法是单例-ES中的一个方法,它通过“submit(callable)”向ExecutorService添加一个Callable,并将生成的Future存储在单例中的HashMap或ArrayList中。另一个线程将在给定的时间间隔内检查 Futures 的结果。

但不知何故,这个解决方案“感觉不对”,而且我没有在其他地方找到这个用例的解决方案,所以我在编写一些我以后后悔的东西之前问你们。你会如何处理这个问题?

我期待着您的回复!

4

4 回答 4

1

您可以使用MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_NUMBER));创建服务并使用 guava ListenableFuture 立即解析结果,也可以使用您的自行车来聆听未来的结果。

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});
于 2013-10-23T12:05:33.823 回答
1
import java.util.concurrent.*;

public class PostProcExecutor extends ThreadPoolExecutor {

  // adjust the constructor to your desired threading policy
  public PostProcExecutor(int corePoolSize, int maximumPoolSize,
      long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
  }

  @Override
  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable) {
      @Override
      protected void done()
      {
        if(!isCancelled()) try {
          processResult(get());
        } catch(InterruptedException ex) {
          throw new AssertionError("on complete task", ex);
        } catch(ExecutionException ex) {
          // no result available
        }
      }
    };
  }

  protected void processResult(Object o)
  {
    System.out.println("Result "+o);// do your post-processing here
  }
}
于 2013-10-23T12:07:19.173 回答
1

使用ExecutorCompletionService。这样,您可以在Callable准备好后立即获得 (s) 的结果。完成服务的take方法会阻塞等待每个任务完成。

以下是来自 java 文档的示例:

void solve(Executor e,
            Collection<Callable<Result>> solvers)
     throws InterruptedException, ExecutionException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     for (Callable<Result> s : solvers)
         ecs.submit(s);
     int n = solvers.size();
     for (int i = 0; i < n; ++i) {
         Result r = ecs.take().get();
         if (r != null)
             use(r);
     }
 }
于 2013-10-23T12:10:54.613 回答
1

您可以使用ExecutorCompletionService来实现它。

以下步骤可以为您提供一些帮助。

  1. 使用Runtime.getRuntime().availableProcessors()填充可用处理器的数量。让我们将值保留在变量 availableProcessors 中。

  2. 初始化ExecutorService,比如service = Executors.newFixedThreadPool(availableProcessors)

  3. 初始化ExecutorCompletionService,假设Callable的结果是一个整数数组Integer[],ExecutorCompletionService completionService = new ExecutorCompletionService(service)

  4. 使用completionService.submit提交任务。

  5. 使用completionService.take().get()获取任务的每个结果(可调用)。

基于以上步骤你可以得到所有callable的结果,做一些你想做的事情。

于 2013-10-23T12:25:17.937 回答