6

在我的项目中,我经常使用 Java Futures 处理并发任务。在一个应用程序中,每个并发任务在完成期间都需要相当大的内存。由于其他一些设计选择,该内存是在线程外部创建的对象中创建和引用的(请参阅下面的更详细示例)。

令我惊讶的是,即使在未来任务(即它的计算线程)已经完成之后,future 仍然持有对这个对象的引用。也就是说:如果在其他地方没有对该对象的其他引用,则该对象将不会被释放,除非未来被释放 - 即使任务已经完成。

我天真的想法是限制并发线程的数量会自动限制任务持有的资源(内存)的数量。这不是真的!

考虑下面的代码。在这个例子中,我创建了一些任务。在他们的计算过程中,一个 ArrayList(它是一个外部变量)的大小会增长。该方法返回一个Vector<Future>. 即使任务已经完成,即使 ArrayList 的范围已经离开,Future 仍然持有对 ArrayList 的引用(通过FutureTask.sync.callable)。

总结一下:

  • FutureTask 持有对 Callable 的引用,即使 Callable 已经完成。
  • Callable 保存对计算期间使用的最终外部变量的引用,即使计算已完成。

问题:释放通过 Future 持有的资源的最佳方式是什么?(当然,我知道可调用的局部变量是在线程完成时释放的——这不是我想要的)。

/*
 * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: email@christianfries.com.
 *
 * Created on 17.08.2013
 */

package net.finmath.experiments.concurrency;

import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @author Christian Fries
 *
 */
public class ConcurrencyTest {

    private ExecutorService executor = Executors.newFixedThreadPool(10);
    private int numberOfDoubles = 1024*1024/8;      // 1 MB
    private int numberOfModels  = 100;              // 100 * 1 MB

    /**
     * @param args
     * @throws ExecutionException 
     * @throws InterruptedException 
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ConcurrencyTest ct = new ConcurrencyTest();
        ct.concurrencyTest();
    }

    /**
     * @throws ExecutionException 
     * @throws InterruptedException 
     */
    public void concurrencyTest() throws InterruptedException, ExecutionException {
        Vector<Double> results = getResults();

        Runtime.getRuntime().gc();
        System.out.println("Allocated memory (only results): " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));
    }


    private Vector<Double> getResults() throws InterruptedException, ExecutionException {
        Vector<Future<Double>> resultsFutures = getResultsConcurrently();


        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.HOURS);

        /*
         * At this point, we expect that no reference to the models is held
         * and the memory is freed.
         * However, the Future still reference each "model". 
         */

        Runtime.getRuntime().gc();
        System.out.println("Allocated memory (only futures): " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));

        Vector<Double> results = new Vector<Double>(resultsFutures.size());
        for(int i=0; i<resultsFutures.size(); i++) {
            results.add(i, resultsFutures.get(i).get());
        }       

        return results;
    }

    private Vector<Future<Double>> getResultsConcurrently() {

        /*
         * At this point we allocate some model, which represents
         * something our workers work on.
         */
        Vector<ArrayList<Double>> models = new Vector<ArrayList<Double>>(numberOfModels);
        for(int i=0; i<numberOfModels; i++) {
            models.add(i, new ArrayList<Double>());
        }

        /*
         * Work on the models concurrently
         */
        Vector<Future<Double>> results = calculateResults(models);


        /*
         * Return the futures.
         * Note: We expect that no more reference is held to a model
         * once we are running out scope of this function AND the respective worker
         * has completed.
         */
        return results;
    }

    private Vector<Future<Double>> calculateResults(Vector<ArrayList<Double>> models) {
        Vector<Future<Double>> results = new Vector<Future<Double>>(models.size());
        for(int i=0; i<models.size(); i++) {
            final ArrayList<Double> model       = models.get(i);
            final int               modelNumber = i;

            Callable<Double> worker = new  Callable<Double>() {
                public Double call() throws InterruptedException {

                    /*
                     * The models will perform some thread safe lazy init,
                     * which we simulate here, via the following line
                     */
                    for(int j=0; j<numberOfDoubles; j++) model.add(Math.random());

                    /*
                     * Now the worker starts working on the model
                     */
                    double sum = 0.0;
                    for(Double value : model) sum += value.doubleValue();

                    Thread.sleep(1000);

                    Runtime.getRuntime().gc();
                    System.out.println("Model " + modelNumber + " completed. Allocated memory: " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));

                    return sum;
                }
            };

            // The following line will add the future result of the calculation to the vector results               
            results.add(i, executor.submit(worker));
        }

        return results;
    }
}

这是调试器/分析器的屏幕截图(这是在另一个示例中完成的)。FutureTask 已经完成(从结果中可以明显看出)。然而,FutureTask 持有对 Callable 的引用。在这种情况下,Callable 持有对包含一些“大”对象的外部最终变量参数的引用。

在此处输入图像描述

(这个例子更真实。这里Obba Server使用并发创建和处理数据处理电子表格 - 取自我的一个项目)。

更新:

鉴于 allprog 和 sbat 的答案,我想添加一些评论:

  • 我接受了 allprog 的答案,因为它是对原始问题如何在未来释放资源的答案。我不喜欢这个解决方案中对外部库的依赖,但在这种情况下,这是一个很好的提示。

  • 也就是说,我首选的解决方案是 sbat 并且在下面我自己的答案中:避免在 call() 完成后在可调用对象中引用“更大”的对象。实际上,我首选的解决方案是避免匿名类实现 Callable。相反,我定义了一个实现 Callable 的内部类,具有一个构造函数,通过其构造函数接收对其他对象的所有引用,并在 call() 实现结束时释放它们。

4

3 回答 3

2

如果您担心未来的任务会保留对模型的引用,您可以尝试替换

final ArrayList<Double> model = models.get(i);

final ArrayList<ArrayList<Double>> modelList = 
            new ArrayList<ArrayList<Double>>();
modelList.add(models.get(i));

然后在你的任务调用方法的开始,做

ArrayList<Double> model = modelList.get(0);

最后写

model = null;
modelList.clear();

至少这在没有强行清除用户提供的模型的情况下改进了您的“玩具示例”。这是否可以应用于您的实际工作我不能说。

于 2013-08-17T21:05:17.907 回答
1

如果您使用默认实现,Future它本身就是一个。FutureTask 将保留对您提交的 Runnable 或 Callable 的引用,因此,这些分配的任何资源都将保留,直到 Future 未被释放。FutureTaskExecutionService

解决此类问题的最佳方法是transform将 Future 转换为另一个 Future,只保留结果并将其返回给调用者。这样您就不需要完成服务或任何额外的黑客攻击。

// Decorate the executor service with listening
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

// Submit the task
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
    public Integer call() throws Exception {
        return new Integer(1234);
    }
});

// Transform the future to get rid of the memory consumption
// The transformation must happen asynchronously, thus the 
// Executor parameter is vital
Futures.transform(future, new AsyncFunction<Integer, Integer>() {
    public ListenableFuture<Integer> apply(Integer input) {
        return Futures.immediateFuture(input);
    }
}, service);

这有效地实现了您描述的方案,但将黑客从您的肩膀上解除了,并且以后将允许更高的灵活性。这带来了 Guava 作为依赖项,但我认为它值得付出代价。我总是使用这个方案。

于 2013-08-17T21:07:26.093 回答
0

我想到的一个解决方案是

  • 在完成服务中使用期货(通过take(),它应该释放这些期货,然后在另一个线程上执行此操作,返回新的期货。外部代码仅引用来自完成服务的期货。

除了其他答案:另一种可能性是避免在可调用中保留对模型的引用,就像在 sbat 答案中一样。以下以不同的方式执行相同的操作(受访问匿名类构造函数的答案的启发):

  • 通过设置器将对象“模型”传递给匿名类的字段,完成后将该字段设置为零。

        for(int i=0; i<models.size(); i++) {
    // REMOVED: final ArrayList<Double> model       = models.get(i);
            final int               modelNumber = i;
    
            Callable<Double> worker = new  Callable<Double>() {
    /*ADDED*/   private ArrayList<Double> model;
    /*ADDED*/   public Callable<Double> setModel(ArrayList<Double> model) { this.model = model; return this; };
    
                public Double call() throws InterruptedException {
    
                    /* ... */
    
                    /*
                     * Now the worker starts working on the model
                     */
                    double sum = 0.0;
    
                    /* ... */
    
    /*ADDED*/       model = null;
                    return sum;
                }
    /*ADDED*/   }.setModel(models.get(i));
    
    
            // The following line will add the future result of the calculation to the vector results
            results.add(i, executor.submit(worker));
        }
    
于 2013-08-17T20:31:45.147 回答