2

我有一个在 Tomcat 容器内运行的简单 Web 服务,它本质上是多线程的。在进入服务的每个请求中,我想对外部服务进行并发调用。java.util.concurrent 中的 ExecutorCompletionService 让我部分获得了成功。我可以为它提供一个线程池,它会负责执行我的并发调用,并且当任何结果准备好时我会收到通知。

处理特定传入请求的代码可能如下所示:

void handleRequest(Integer[] input) {
    // Submit tasks
    CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(Executors.newCachedThreadPool());
    for (final Integer i : input) {
        completionService.submit(new Callable<Integer>() {
            public Integer call() {
                return -1 * i;
            }
        });
    }

    // Do other stuff...

    // Get task results
    try {
        for (int i = 0; i < input.size; i++) {
            Future<Integer> future = completionService.take();
            Integer result = future.get();
            // Do something with the result...
        }
    } catch (Exception e) {
        // Handle exception
    }
}

这应该可以正常工作,但效率很低,因为正在为每个传入的请求分配一个新的线程池。如果我将 CompletionService 作为共享实例移出,我将遇到多个请求共享同一个 CompletionService 和线程池的线程安全问题。当请求提交任务并获得结果时,他们得到的结果并不是他们提交的结果。

因此,我需要的是一个线程安全的 CompletionService,它允许我在所有传入请求之间共享一个公共线程池。当每个线程完成一项任务时,应通知传入请求的相应线程,以便它可以收集结果。

实现这种功能最直接的方法是什么?我确信这种模式已经应用了很多次;我只是不确定这是否是 Java 并发库提供的东西,或者是否可以使用一些 Java 并发构建块轻松构建。

更新:我忘记提及的一个警告是,我希望在我提交的任何任务完成后立即收到通知。这是使用 CompletionService 的主要优势,因为它将任务和结果的生产和消费分离。我实际上并不关心我得到结果的顺序,我想避免在等待结果按顺序返回时不必要的阻塞。

4

4 回答 4

2

你分享Executor但不分享CompletionService

我们有一个AsyncCompleter可以做到这一点并处理所有簿记,允许您:

Iterable<Callable<A>> jobs = jobs();
Iterable<A> results async.invokeAll(jobs);

results按返回顺序迭代并阻塞,直到有结果可用

于 2011-02-16T10:47:44.123 回答
2

java.util.concurrent 提供你需要的一切。如果我正确理解您的问题,您有以下要求:

您要提交请求,并立即(在合理范围内)处理请求结果(响应)。好吧,我相信您已经看到了问题的解决方案:java.util.concurrent.CompletionService。

该服务很简单,结合了 Executor 和 BlockingQueue 来处理 Runnable 和/或 Callable 任务。BlockingQueue 用于保存已完成的任务,您可以让另一个线程等待,直到完成的任务在 CompletionService 对象上排队(通过调用 take() 来完成)。

正如之前的海报所提到的,共享 Executor,并为每个请求创建一个 CompletionService。这似乎是一件昂贵的事情,但再次考虑一下 CS 只是与 Executor 和 BlockingQueue 协作。由于您共享要实例化的最昂贵的对象,即 Executor,我想您会发现这是一个非常合理的成本。

然而......说了这么多,你似乎仍然有一个问题,这个问题似乎是请求处理与响应处理的分离。这可以通过创建一个单独的服务来解决,该服务专门处理所有请求或特定类型请求的响应。

这是一个示例:(注意:这意味着 Request 对象实现了 Callable 接口,它应该返回一个 Response 类型......我在这个简单的示例中省略了详细信息)。

class RequestHandler {

  RequestHandler(ExecutorService responseExecutor, ResponseHandler responseHandler) {      
    this.responseQueue = ...
    this.executor = ...
  }  

  public void acceptRequest(List<Request> requestList) {

    for(Request req : requestList) {

      Response response = executor.submit(req);
      responseHandler.handleResponse(response);

    }  
  }  
}

class ResponseHandler {
  ReentrantLock lock;
  ResponseHandler(ExecutorService responseExecutor) {
    ...
  }

  public void handleResponse(Response res) {
    lock.lock() {
    try {
      responseExecutor.submit( new ResponseWorker(res) );
    } finally {
      lock.unlock();
    }    
  }

  private static class ResponseWorker implements Runnable {

    ResponseWorker(Response response) {
      response = ...
    }

    void processResponse() {         
      // process this response 
    }

    public void run() {      
      processResponse();      
    }  
  }
}

有几件事要记住:第一,ExecutorService 从阻塞队列中执行 Callables 或 Runnables;您的 RequestHandler 接收任务,这些任务在 Executor 上排队,并尽快处理。同样的事情发生在你的 ResponseHandler 中;收到响应,并且只要该 SEPRATE 执行器可以,它将处理该响应。简而言之,您有两个执行器同时工作:一个在 Request 对象上,另一个在 Response 对象上。

于 2011-02-16T20:51:38.700 回答
1

您可以只使用普通的共享 ExecutorService。每当您提交任务时,您都会为刚刚提交的任务获得 Future。您可以将它们全部存储在一个列表中并稍后查询它们。

例子:

private final ExecutorService service = ...//a single, shared instance

void handleRequest(Integer[] input) {
    // Submit tasks
    List<Future<Integer>> futures = new ArrayList<Future<Integer>>(input.length);
    for (final Integer i : input) {
        Future<Integer> future = service.submit(new Callable<Integer>() {
            public Integer call() {
                return -1 * i;
            }
        });
        futures.add(future);
    }

    // Do other stuff...

    // Get task results
    for(Future<Integer> f : futures){
        try {
            Integer result = f.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
于 2011-02-16T08:23:00.927 回答
0

为什么需要一个CompletionService

每个线程都可以简单地提交或调用Callables一个“常规”和共享的ExecutorService. 然后每个线程都保留自己的私有Future引用。

此外,Executor它的后代在设计上是线程安全的。您真正想要的是每个线程都可以创建自己的任务并检查它们的结果。

中的 Javadocjava.util.concurrent非常好;它包括使用模式和示例。阅读ExecutorService和其他类型的文档以更好地了解如何使用它们。

于 2011-02-16T08:22:04.277 回答