8

BoundedExecutorJava Concurrency in Practice 一书中的实现有些奇怪。

当 Executor 中有足够多的线程排队或运行时,它应该通过阻塞提交线程来限制向 Executor 提交的任务。

这是实现(在 catch 子句中添加缺少的重新抛出之后):

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();

        try {
            exec.execute(new Runnable() {
                @Override public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }

当我BoundedExecutor用 anExecutors.newCachedThreadPool()和 4 来实例化 时,我希望缓存线程池实例化的线程数永远不会超过 4。但实际上,确实如此。我得到了这个小测试程序来创建多达 11 个线程:

public static void main(String[] args) throws Exception {
    class CountingThreadFactory implements ThreadFactory {
        int count;

        @Override public Thread newThread(Runnable r) {
            ++count;
            return new Thread(r);
        }           
    }

    List<Integer> counts = new ArrayList<Integer>();

    for (int n = 0; n < 100; ++n) {
        CountingThreadFactory countingThreadFactory = new CountingThreadFactory();
        ExecutorService exec = Executors.newCachedThreadPool(countingThreadFactory);

        try {
            BoundedExecutor be = new BoundedExecutor(exec, 4);

            for (int i = 0; i < 20000; ++i) {
                be.submitTask(new Runnable() {
                    @Override public void run() {}
                });
            }
        } finally {
            exec.shutdown();
        }

        counts.add(countingThreadFactory.count);
    }

    System.out.println(Collections.max(counts));
}

我认为在信号量的释放和任务结束之间有一个很小的时间框架,另一个线程可以在释放线程尚未完成时获得许可并提交任务。换句话说,它有一个竞争条件。

有人可以证实这一点吗?

4

3 回答 3

10

BoundedExecutor 确实旨在说明如何限制任务提交,而不是作为限制线程池大小的一种方式。正如至少有一条评论指出的那样,有更直接的方法可以实现后者。

但是其他答案没有提到书中说使用无界队列和

将信号量的界限设置为等于池大小加上您希望允许的排队任务数,因为信号量限制了当前正在执行和等待执行的任务数。[JCiP,第 8.3.3 节结束]

通过提到无界队列和池大小,我们暗示(显然不是很清楚)使用有界大小的线程池。

然而,BoundedExecutor 一直困扰着我的是它没有实现 ExecutorService 接口。实现类似功能并仍然实现标准接口的现代方法是使用 Guava 的listenerDecorator方法和ForwardingListeningExecutorService类。

于 2012-04-11T15:51:41.580 回答
5

您对比赛条件的分析是正确的。ExecutorService 和 Semaphore 之间没有同步保证。

但是,我不知道限制线程数是否是 BoundedExecutor 的用途。我认为更多的是限制提交给服务的任务数量。想象一下,如果您有 500 万个任务需要提交,并且如果您提交的任务超过 10,000 个,那么您的内存就会用完。

好吧,在任何给定时间你只会有 4 个线程在运行,你为什么要尝试将所有 500 万个任务排队?您可以使用与此类似的构造来限制在任何给定时间排队的任务数量。您应该从中得到的是,在任何给定时间只有 4 个任务在运行。

显然,解决方案是使用Executors.newFixedThreadPool(4).

于 2012-04-10T18:18:04.597 回答
2

我看到一次创建了多达 9 个线程。我怀疑存在竞争条件导致线程数量超过所需数量。

这可能是因为在运行任务之前和之后有工作要做。这意味着即使您的代码块中只有 4 个线程,也有许多线程停止前一个任务或准备开始一个新任务。

即线程在它仍在运行时执行释放()。即使它是你做的最后一件事,它也不是在获得新任务之前做的最后一件事。

于 2012-04-10T18:22:07.630 回答