我有一个 Java 迭代器,它列出了来自远程位置的项目。项目列表以“页面”形式出现,“获取下一页”操作相当缓慢。(具体来说,我的迭代器被调用S3Find
并列出了来自 Amazon S3 的对象)。
所以,为了加快速度,我想预取一个列表页面。为此,我使用了一个ExecutorService
和一个Callable
/Future
模式来预取项目的“页面”。问题是,该迭代器的调用者可能随时放弃操作,而不通知我的班级。例如,考虑以下循环:
for (S3URL f : new S3Find(topdir).withRecurse(true)) {
// do something with f
if (some_condition) break;
}
结果,a 有资源泄漏,因为即使没有更多对包含的引用(并且即使下一次预取已经完成)ExecutorService
,我用来提交的那个仍然保持活动状态并运行。Callable
S3Find
处理这个问题的正确方法是什么?我使用了错误的方法吗?我是否应该ExecutorService
为每个预取放弃并使用一个新的裸线程(并在预取完成时终止线程)?请注意,每次获取页面大约需要 500 毫秒,因此相比之下,每次创建一个新线程可能可以忽略不计。我不想要的一件事是要求调用者明确告知S3Find
他们已经完成迭代(因为它肯定会被某些人忘记)。
这是当前的预取代码(内部S3Find
):
/**
* This class holds one ObjectListing (one "page"), and also pre-fetches
* the next page using a {@link S3Find#NextPageGetter} Callable on a
* separate thread.
*/
private static class Pager {
private final AmazonS3 s3;
private ObjectListing currentList;
private Future<ObjectListing> future;
private final ExecutorService exec;
public Pager(AmazonS3 s3, ListObjectsRequest request) {
this.s3 = s3;
currentList = s3.listObjects(request);
exec = Executors.newSingleThreadExecutor();
future = submitPrefetch();
}
public ObjectListing getCurrentPage() {
return currentList;
}
/**
* Move currentList to the next page, and returns it.
*/
public ObjectListing getNextPage() {
if (future == null) return null;
try {
currentList = future.get();
future = submitPrefetch();
} catch (InterruptedException|ExecutionException e) {
e.printStackTrace();
}
return currentList;
}
private Future<ObjectListing> submitPrefetch() {
if (currentList == null || !currentList.isTruncated()) {
exec.shutdown();
return null;
} else {
NextPageGetter worker = new NextPageGetter(s3, currentList);
return exec.submit(worker);
}
}
}
/**
* This class retrieves the "next page" of a truncated ObjectListing.
* It is meant to be called in a Callable/Future pattern.
*/
private static class NextPageGetter implements Callable<ObjectListing> {
private final ObjectListing currentList;
private final AmazonS3 s3;
public NextPageGetter(AmazonS3 s3, ObjectListing currentList) {
super();
this.s3 = s3;
this.currentList = currentList;
if (currentList == null || !currentList.isTruncated()) {
throw new IllegalArgumentException(currentList==null ?
"null List" : "List is not truncated");
}
}
@Override
public ObjectListing call() throws Exception {
ObjectListing nextList = s3.listNextBatchOfObjects(currentList);
return nextList;
}
}