1

我处于以下情况(也许我对整个事情进行了过度设计,或者我处于完全的僵局,但无法想到另一种方式):

  • 取一个或多个FutureTask实现异步计算(侦听网络中的多播数据包,收集不同种类的统计信息),我们将其命名为MulticastStatisticsProvider
  • 采取另一个依赖于第一个任务的计算来执行额外的计算(将统计数据与公式结合以公开一些综合信息),这个计算也是异步的,所以在另一个FutureTask我们将调用的定义中FormulaComputing
  • 问题:我们希望它在调用时FormulaComputing.get(timeout, timeUnit)将超时传播到其内部并且找不到实现此目的的方法。CallableMulticastStatisticsProvider.get(timeout, timeUnit)

在此,我到目前为止实现的代码状态:

  • 这是调用者代码。

    // This is the code creating the formula computing code.
    public FormulaComputing getFormulaComputing() {
      // Retrieve from another service a list of FutureTasks already
      // scheduled for execution in their own ThreadPool.
      List<MulticastStatisticsProvider> requiredTasks = service.getRequiredTasks();
      // Create the formulaComputing task and schedule it for execution
      FormulaComputing formulaComputing = new FormulaComputing(requiredTasks);
      threadPool.execute(formulaComputing);
      return formulaComputing;
    }
    
    // And then, from another caller
    getFormulaComputing().get(10, TimeUnit.SECONDS);
    
  • 这是FormulaComputing代码:

    public class FormulaComputing extends FutureTask<Object> {
      private long timeout;
      private TimeUnit timeUnit;
      private Map<String, Future<Map<String, ? extends AbstractSymposiumMessage>>> promises;
      private Closure<Object> callback;
    
      public FormulaComputing(List<MulticastStatisticsProvider> dependentTasks, Closure<Object> callback) {
        super(new Callable<Object>() {
          @Override
          public Object call() throws Exception {
            List<Object> results = new ArrayList<Object>();
            for (MulticastStatisticsProvider task : dependentTasks) {
              // Here is the problem, you cannot access field values in constructor: "Cannot refer to an instance method while explicitly invoking a constructor".
              results.add(task.get(getTimeout(), getTimeUnit()));
            }
            return callback.call(results.toArray());
          }
        });
      }
    
      @Override
      public Object get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        return super.get(timeout, timeUnit);
      }
    }
    

我曾想象过通过自省来改变私人内部sync领域,用我的自定义方法FutureTask手工制作,但内省和反思通常是可以避免的黑客攻击。Callableget

4

1 回答 1

3

如果您使用Guava,它看起来像是ListenableFutures 的一个很好的例子:

List<ListenableFuture<Object>> requiredTasks = ...;

ListenableFuture<List<Object>> requiredTasksResult = Futures.allAsList(requiredTasks);

ListenableFuture<Object> resultFuture = Futures.transform(requiredTasksResult, new Function<List<Object>, Object>() {
    public Object apply(List<Object> results) {
        // Apply computing formula
    }
}, threadPool); // Function will be executed in threadPool

Object result = resultFuture.get(10, TimeUnit.SECONDS);

您可以通过将其ListenableFuture提交FutureTaskListeningExecutorService或改为使用ListenableFutureTask来获得。

于 2014-01-21T11:16:56.793 回答