1

我有一个 Java 迭代器,它列出了来自远程位置的项目。项目列表以“页面”形式出现,“获取下一页”操作相当缓慢。(具体来说,我的迭代器被调用S3Find并列出了来自 Amazon S3 的对象)。

所以,为了加快速度,我想预取一个列表页面。为此,我使用了一个ExecutorService和一个Callable/Future模式来预取项目的“页面”。问题是,该迭代器的调用者可能随时放弃操作,而不通知我的班级。例如,考虑以下循环:

for (S3URL f : new S3Find(topdir).withRecurse(true)) {
    // do something with f
    if (some_condition) break;
}

结果,a 有资源泄漏,因为即使没有更多对包含的引用(并且即使下一次预取已经完成)ExecutorService,我用来提交的那个仍然保持活动状态并运行。CallableS3Find

处理这个问题的正确方法是什么?我使用了错误的方法吗?我是否应该ExecutorService为每个预取放弃并使用一个新的裸线程(并在预取完成时终止线程)?请注意,每次获取页面大约需要 500 毫秒,因此相比之下,每次创建一个新线程可能可以忽略不计。我不想要的一件事是要求调用者明确告知S3Find他们已经完成迭代(因为它肯定会被某些人忘记)。

这是当前的预取代码(内部S3Find):

/**
 * This class holds one ObjectListing (one "page"), and also pre-fetches
 * the next page using a {@link S3Find#NextPageGetter} Callable on a
 * separate thread.
 */
private static class Pager {
    private final AmazonS3 s3;
    private ObjectListing currentList;
    private Future<ObjectListing> future;
    private final ExecutorService exec;
    public Pager(AmazonS3 s3, ListObjectsRequest request) {
        this.s3 = s3;
        currentList = s3.listObjects(request);
        exec = Executors.newSingleThreadExecutor();
        future = submitPrefetch();
    }
    public ObjectListing getCurrentPage() {
        return currentList;
    }
    /**
     * Move currentList to the next page, and returns it.
     */
    public ObjectListing getNextPage() {
        if (future == null) return null;
        try {
            currentList = future.get();
            future = submitPrefetch();
        } catch (InterruptedException|ExecutionException e) {
            e.printStackTrace();
        }
        return currentList;
    }
    private Future<ObjectListing> submitPrefetch() {
        if (currentList == null || !currentList.isTruncated()) {
            exec.shutdown();
            return null;
        } else {
            NextPageGetter worker = new NextPageGetter(s3, currentList);
            return exec.submit(worker);
        }
    }
}

/**
 * This class retrieves the "next page" of a truncated ObjectListing.
 * It is meant to be called in a Callable/Future pattern.
 */
private static class NextPageGetter implements Callable<ObjectListing> {
    private final ObjectListing currentList;
    private final AmazonS3 s3;

    public NextPageGetter(AmazonS3 s3, ObjectListing currentList) {
        super();
        this.s3 = s3;
        this.currentList = currentList;
        if (currentList == null || !currentList.isTruncated()) {
            throw new IllegalArgumentException(currentList==null ?
                        "null List" : "List is not truncated");
        }
    }

    @Override
    public ObjectListing call() throws Exception {
        ObjectListing nextList = s3.listNextBatchOfObjects(currentList);
        return nextList;
    }
}
4

2 回答 2

1

这是我遇到过几次的经典问题。数据库连接发生在我身上。

我是否应该放弃 ExecutorService 并为每个预取使用一个新的裸线程(并在预取完成时终止线程)?

我想这是你唯一的选择。我不会费心杀死线程。让它完成它的工作并在后台死去。为下一页创建一个新线程。您需要加入线程并使用某种通用(或其他东西)在调用者和线程AtomicReference之间共享结果列表。S3Find

我不想要的一件事是要求调用者明确通知 S3Find 他们已经完成迭代(因为它肯定会被某些人忘记)。

如果调用者在 try/finally 中调用某种方法,我看不出有任何简单的方法可以“正确”地做到这一点close()您不能以某种方式在 Javadocs 中明确说明这一点吗?这就是我在我的ORMLite 数据库迭代器中所做的。

S3Find s3Find = new S3Find(topdir).withRecurse(true);
try {
    for (S3URL f : s3Find) {
        ...
    }
} finally {
    s3Find.close();
}

然后在S3Find.close()

public void close() {
    exec.shutdown();
}

在 Java 7 中,他们添加了try with resources 构造,该语言会自动关闭任何Closeable资源。这是一个很大的胜利。

于 2012-10-18T22:51:36.443 回答
0

我想我现在有一个解决方案,在使用上面讨论的裸线程时,它非常简单并且非常接近初始版本。它仍然利用了 niceCallable模式,但是使用 aFutureTask而不是 a Future,并且根本没有ExecutorService

我之前错过的关键是FutureTask扩展Runnable,您实际上可以通过new Thread(task). 换句话说:

NextPageGetter worker = new NextPageGetter(s3, currentList);
FutureTask<ObjectListing> future = new FutureTask<>(worker);
new Thread(future).start();

然后:

currentList = future.get();

现在,无论迭代器是否耗尽,所有资源都被愉快地处理掉了。事实上,线程一FutureTask完成就消失了。

为了完整起见,这里是修改后的代码(仅class Pager已更改):

/**
 * This class holds one ObjectListing (one "page"), and also pre-fetches the next page
 * using a {@link S3Find#NextPageGetter} Callable on a separate thread.
 */
private static class Pager {
    private final AmazonS3 s3;
    private ObjectListing currentList;
    private FutureTask<ObjectListing> future;
    public Pager(AmazonS3 s3, ListObjectsRequest request) {
        this.s3 = s3;
        currentList = s3.listObjects(request);
        future = submitPrefetch();
    }
    public ObjectListing getCurrentPage() {
        return currentList;
    }
    /**
     * Move currentList to the next page, and returns it.
     */
    public ObjectListing getNextPage() {
        if (future == null) return null;
        try {
            currentList = future.get();
            future = submitPrefetch();
        } catch (InterruptedException|ExecutionException e) {
            e.printStackTrace();
        }
        return currentList;
    }
    private FutureTask<ObjectListing> submitPrefetch() {
        if (currentList == null || !currentList.isTruncated()) {
            return null;
        } else {
            NextPageGetter worker = new NextPageGetter(s3, currentList);
            FutureTask<ObjectListing> f = new FutureTask<>(worker);
            new Thread(f).start();
            return f;
        }
    }
}
于 2012-10-19T01:06:50.200 回答