0

我正在与我无法控制的遗留库集成。

它定义了以下接口:

interface Factory {
    Future<Void> subscribe(Context c);
}

这种“订阅”方法被不同的线程频繁调用。我关心“Future.get()”结果的唯一情况是它失败了,所以我可以获取并处理异常。这不必在调用线程中发生。另外,在我的情况下,阻塞“Future.get()”上的调用线程可能会非常昂贵,因为即使成功也可能需要几秒钟才能完成。

所以我的任务是以某种方式“后处理”所有这些期货,过滤失败的期货。基本上,我可以看到两种可能的方法:

方法#1:

在获得 Future 的实例后,将单独的 Runnable 提交给将执行所需操作的外部执行程序:

    executor.submit(
        new Runnable(){
            @Override
            public void run() {
                try {
                    future.get();
                } catch(Exception e){
                    // process the exception
                }
            }
        }
    );

这种方法的缺点是我们仍然可能长时间阻塞线程。正如我所说,这个代码片段将被相当频繁地执行。

方法#2:

在获得 Future 的实例后,将其放置到某个集合中,并指定一个单独的线程,该线程将定期运行该集合的元素进行处理:

    while(true){
        Iterator<Future<Void>> iterator = collection.iterator();
        while(iterator.hasNext()){
            Future<Void> future = iterator.next();
            if(future.isDone()){
                try {
                    future.get();
                } catch(Exception e){
                    // process the exception
                } finally {
                    iterator.remove();
                }
            }
        }

        TimeUnit.MILLISECONDS.sleep(1000); // sleep
    }

你怎么看?有没有更好的方法来解决这个问题?

4

1 回答 1

1

由于您无法创建Future最佳选项,因此请Future使用自定义的Future.

因此,在您的情况下,我推荐一种可能看起来像您的两个选项混合的模式。将Futures 添加到(线程安全)队列中,并将Runnables 提交给执行程序,该执行程序在循环中处理所有项目。所以你可以通过配置来限制线程的数量Executor,即没有Futures那么多的线程但仍然可以有多个并且不必一直保持这些后处理线程处于活动状态。

为了避免重新排队未完成的项目时出现无限循环,请使用本地集合将待处理的项目与重新排队的项目分开:

static BlockingQueue<Future<?>> PENDING = …;
static int MAX_ITEMS_PER_JOB = …;
…
/*scheduling code …*/new Runnable() {
  public void run()
  {
    ArrayList<Future<?>> myLocalItems=new ArrayList<>();
    PENDING.drainTo(myLocalItems, MAX_ITEMS_PER_JOB);
    for(Future<?> f:myLocalItems) {
      if(!f.isDone()) PENDING.offer(f); // re-queue
      try {
        f.get();
      } catch(ExecutionException ex) {
        handleException(ex.getCause());
      }
    }
  }
};

因此,这Runnable将检查并处理有限数量的Futures 然后返回,因此如果有很多项目处于待处理状态,则适合多次提交以进行并行处理,但如果待处理的数量较少,则不会造成任何伤害,因为作业不会挂起如果无事可做。它们甚至适合scheduleWithFixedDelay使用ScheduledExecutorService.

于 2013-10-08T09:21:54.627 回答