在我的项目中,我经常使用 Java Futures 处理并发任务。在一个应用程序中,每个并发任务在完成期间都需要相当大的内存。由于其他一些设计选择,该内存是在线程外部创建的对象中创建和引用的(请参阅下面的更详细示例)。
令我惊讶的是,即使在未来任务(即它的计算线程)已经完成之后,future 仍然持有对这个对象的引用。也就是说:如果在其他地方没有对该对象的其他引用,则该对象将不会被释放,除非未来被释放 - 即使任务已经完成。
我天真的想法是限制并发线程的数量会自动限制任务持有的资源(内存)的数量。这不是真的!
考虑下面的代码。在这个例子中,我创建了一些任务。在他们的计算过程中,一个 ArrayList(它是一个外部变量)的大小会增长。该方法返回一个Vector<Future>
. 即使任务已经完成,即使 ArrayList 的范围已经离开,Future 仍然持有对 ArrayList 的引用(通过FutureTask.sync.callable
)。
总结一下:
- FutureTask 持有对 Callable 的引用,即使 Callable 已经完成。
- Callable 保存对计算期间使用的最终外部变量的引用,即使计算已完成。
问题:释放通过 Future 持有的资源的最佳方式是什么?(当然,我知道可调用的局部变量是在线程完成时释放的——这不是我想要的)。
/*
* (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: email@christianfries.com.
*
* Created on 17.08.2013
*/
package net.finmath.experiments.concurrency;
import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* @author Christian Fries
*
*/
public class ConcurrencyTest {
private ExecutorService executor = Executors.newFixedThreadPool(10);
private int numberOfDoubles = 1024*1024/8; // 1 MB
private int numberOfModels = 100; // 100 * 1 MB
/**
* @param args
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException, ExecutionException {
ConcurrencyTest ct = new ConcurrencyTest();
ct.concurrencyTest();
}
/**
* @throws ExecutionException
* @throws InterruptedException
*/
public void concurrencyTest() throws InterruptedException, ExecutionException {
Vector<Double> results = getResults();
Runtime.getRuntime().gc();
System.out.println("Allocated memory (only results): " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));
}
private Vector<Double> getResults() throws InterruptedException, ExecutionException {
Vector<Future<Double>> resultsFutures = getResultsConcurrently();
executor.shutdown();
executor.awaitTermination(1, TimeUnit.HOURS);
/*
* At this point, we expect that no reference to the models is held
* and the memory is freed.
* However, the Future still reference each "model".
*/
Runtime.getRuntime().gc();
System.out.println("Allocated memory (only futures): " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));
Vector<Double> results = new Vector<Double>(resultsFutures.size());
for(int i=0; i<resultsFutures.size(); i++) {
results.add(i, resultsFutures.get(i).get());
}
return results;
}
private Vector<Future<Double>> getResultsConcurrently() {
/*
* At this point we allocate some model, which represents
* something our workers work on.
*/
Vector<ArrayList<Double>> models = new Vector<ArrayList<Double>>(numberOfModels);
for(int i=0; i<numberOfModels; i++) {
models.add(i, new ArrayList<Double>());
}
/*
* Work on the models concurrently
*/
Vector<Future<Double>> results = calculateResults(models);
/*
* Return the futures.
* Note: We expect that no more reference is held to a model
* once we are running out scope of this function AND the respective worker
* has completed.
*/
return results;
}
private Vector<Future<Double>> calculateResults(Vector<ArrayList<Double>> models) {
Vector<Future<Double>> results = new Vector<Future<Double>>(models.size());
for(int i=0; i<models.size(); i++) {
final ArrayList<Double> model = models.get(i);
final int modelNumber = i;
Callable<Double> worker = new Callable<Double>() {
public Double call() throws InterruptedException {
/*
* The models will perform some thread safe lazy init,
* which we simulate here, via the following line
*/
for(int j=0; j<numberOfDoubles; j++) model.add(Math.random());
/*
* Now the worker starts working on the model
*/
double sum = 0.0;
for(Double value : model) sum += value.doubleValue();
Thread.sleep(1000);
Runtime.getRuntime().gc();
System.out.println("Model " + modelNumber + " completed. Allocated memory: " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));
return sum;
}
};
// The following line will add the future result of the calculation to the vector results
results.add(i, executor.submit(worker));
}
return results;
}
}
这是调试器/分析器的屏幕截图(这是在另一个示例中完成的)。FutureTask 已经完成(从结果中可以明显看出)。然而,FutureTask 持有对 Callable 的引用。在这种情况下,Callable 持有对包含一些“大”对象的外部最终变量参数的引用。
(这个例子更真实。这里Obba Server使用并发创建和处理数据处理电子表格 - 取自我的一个项目)。
更新:
鉴于 allprog 和 sbat 的答案,我想添加一些评论:
我接受了 allprog 的答案,因为它是对原始问题如何在未来释放资源的答案。我不喜欢这个解决方案中对外部库的依赖,但在这种情况下,这是一个很好的提示。
也就是说,我首选的解决方案是 sbat 并且在下面我自己的答案中:避免在 call() 完成后在可调用对象中引用“更大”的对象。实际上,我首选的解决方案是避免匿名类实现 Callable。相反,我定义了一个实现 Callable 的内部类,具有一个构造函数,通过其构造函数接收对其他对象的所有引用,并在 call() 实现结束时释放它们。