108

一段时间以来,我一直对ThreadPoolExecutor支持ExecutorService我们这么多人使用的线程池的默认行为感到沮丧。引用 Javadocs:

如果运行的线程数多于 corePoolSize 但少于 maximumPoolSize,则仅当队列已满时才会创建新线程。

这意味着如果您使用以下代码定义线程池,它将永远不会启动第二个线程,因为它LinkedBlockingQueue是无界的。

ExecutorService threadPool =
   new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

只有当您有一个有界队列并且队列已满时,才会启动超过核心数的任何线程。我怀疑大量初级 Java 多线程程序员没有意识到ThreadPoolExecutor.

现在我有特定的用例,这不是最佳的。我正在寻找方法来解决它,而无需编写自己的 TPE 课程。

我的要求是对可能不可靠的第 3 方进行回调的 Web 服务。

  • 我不想让回调与网络请求同步,所以我想使用线程池。
  • 我通常每分钟得到几个这样的,所以我不想拥有newFixedThreadPool(...)大量大部分处于休眠状态的线程。
  • 每隔一段时间,我就会收到大量流量,我想将线程数扩大到某个最大值(比如说 50)。
  • 我需要尽最大努力完成所有回调,因此我想将任何超过 50 个的其他回调排队。我不想通过使用newCachedThreadPool().

在启动更多线程之前ThreadPoolExecutor,队列需要有界和满的位置,我该如何解决这个限制?如何让它在排队任务之前启动更多线程?

编辑:

@Flavio 提出了一个很好的观点,即使用 让ThreadPoolExecutor.allowCoreThreadTimeOut(true)核心线程超时并退出。我考虑过这一点,但我仍然想要核心线程功能。如果可能,我不希望池中的线程数低于核心大小。

4

10 回答 10

58

ThreadPoolExecutor在启动更多线程之前,队列需要有界和满的位置,我该如何解决这个限制。

我相信我终于找到了一个有点优雅(也许有点老套)的解决方案来解决这个限制ThreadPoolExecutor。它涉及扩展LinkedBlockingQueue以使其在已经有一些任务排队时返回falsequeue.offer(...)如果当前线程跟不上排队的任务,TPE 将添加额外的线程。如果池已经处于最大线程,则将RejectedExecutionHandler调用put(...)进入队列的线程。

offer(...)编写一个可以返回false并且从不阻塞的队列当然很奇怪,put()所以这就是黑客部分。但这适用于 TPE 对队列的使用,所以我认为这样做没有任何问题。

这是代码:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        // Offer it to the queue if there is 0 items already queued, else
        // return false so the TPE will add another thread. If we return false
        // and max threads have been reached then the RejectedExecutionHandler
        // will be called which will do the put into the queue.
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // This does the actual put into the queue. Once the max threads
            //  have been reached, the tasks will then queue up.
            executor.getQueue().put(r);
            // we do this after the put() to stop race conditions
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + e);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

有了这个机制,当我向队列提交任务时,ThreadPoolExecutor会:

  1. 最初将线程数扩展到核心大小(此处为 1)。
  2. 将其提供给队列。如果队列为空,它将排队等待由现有线程处理。
  3. 如果队列已经有 1 个或多个元素,offer(...)则将返回 false。
  4. 如果返回 false,则增加池中的线程数,直到它们达到最大数量(此处为 50)。
  5. 如果在最大值,那么它调用RejectedExecutionHandler
  6. 然后RejectedExecutionHandler将任务放入队列中,由第一个可用线程按 FIFO 顺序处理。

尽管在我上面的示例代码中,队列是无界的,但您也可以将其定义为有界队列。例如,如果您向 1000 添加容量,LinkedBlockingQueue那么它将:

  1. 将线程扩展到最大
  2. 然后排队,直到它满了 1000 个任务
  3. 然后阻塞调用者,直到队列有可用空间。

此外,如果您需要offer(...)RejectedExecutionHandlerthen 中使用,您可以使用该offer(E, long, TimeUnit)方法而不是 withLong.MAX_VALUE作为超时。

警告:

如果您希望在执行程序关闭后将任务添加到执行程序,那么您可能希望更聪明地在执行程序服务关闭时抛出RejectedExecutionException我们的自定义。RejectedExecutionHandler感谢@RaduToader 指出这一点。

编辑:

对这个答案的另一个调整可能是询问 TPE 是否有空闲线程,并且只有在有空闲线程时才将项目入队。您必须为此创建一个真正的类并ourQueue.setThreadPoolExecutor(tpe);在其上添加方法。

那么您的offer(...)方法可能类似于:

  1. 检查是否tpe.getPoolSize() == tpe.getMaximumPoolSize()在这种情况下只需调用super.offer(...).
  2. Else if tpe.getPoolSize() > tpe.getActiveCount()then 调用super.offer(...),因为似乎有空闲线程。
  3. 否则返回falsefork 另一个线程。

也许是这样:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

请注意,TPE 上的 get 方法很昂贵,因为它们访问volatile字段或(在 的情况下getActiveCount())锁定 TPE 并遍历线程列表。此外,这里的竞争条件可能会导致任务不正确地入队或在有空闲线程时分叉另一个线程。

于 2013-10-22T21:02:43.070 回答
34

关于这个问题,我已经得到了另外两个答案,但我怀疑这个是最好的。

它基于当前接受的答案的技术,即:

  1. 覆盖队列的offer()方法以(有时)返回false,
  2. 这会导致ThreadPoolExecutor生成新线程或拒绝任务,并且
  3. 将 设置为RejectedExecutionHandler在拒绝时实际排队任务。

问题是什么时候offer()应该返回false。当队列上有几个任务时,当前接受的答案返回 false,但正如我在评论中指出的那样,这会导致不良影响。或者,如果您总是返回 false,即使有线程在队列中等待,您也会继续产生新线程。

解决方案是使用 Java 7LinkedTransferQueueoffer()调用tryTransfer(). 当有一个等待的消费者线程时,任务将被传递给该线程。否则,offer()将返回 false 并且ThreadPoolExecutor将产生一个新线程。

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
于 2014-06-30T15:37:07.533 回答
30

将 core size 和 max size 设置为相同的值,并允许使用allowCoreThreadTimeOut(true).

于 2013-10-22T21:22:34.133 回答
7

注意:我现在更喜欢并推荐我的其他答案

这是一个对我来说感觉更直接的版本:每当执行新任务时增加 corePoolSize(直到 maxPoolSize 的限制),然后每当执行一个新任务时减少 corePoolSize(降低到用户指定的“核心池大小”的限制)任务完成。

换句话说,跟踪正在运行或入队的任务数,并确保 corePoolSize 等于任务数,只要它在用户指定的“核心池大小”和 maximumPoolSize 之间。

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}

正如所写的那样,该类不支持在构造后更改用户指定的 corePoolSize 或 maximumPoolSize,也不支持直接或通过remove()or操作工作队列purge()

于 2013-11-22T19:43:55.383 回答
6

我们有一个ThreadPoolExecutor需要附加creationThreshold和覆盖的子类execute

public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}

也许这也有帮助,但你的当然看起来更有艺术气息……</p>

于 2013-10-23T10:15:50.267 回答
4

推荐的答案仅解决了 JDK 线程池的一 (1) 个问题:

  1. JDK 线程池偏向于排队。因此,他们不会产生新线程,而是将任务排队。只有当队列达到其限制时,线程池才会产生一个新线程。

  2. 负载减轻时不会发生线程退休。例如,如果我们有大量作业进入池,导致池达到最大值,然后是一次最多 2 个任务的轻负载,则池将使用所有线程来为轻负载提供服务,从而防止线程退休。(只需要 2 个线程……)

对上述行为不满意,我继续实施了一个池来克服上述缺陷。

解决 2) 使用 Lifo 调度解决了该问题。这个想法是由 Ben Maurer 在 ACM applicative 2015 会议上提出的: Systems @ Facebook scale

于是一个新的实现诞生了:

LifoThreadPoolExecutorSQP

到目前为止,此实现提高了ZEL的异步执行性能。

该实现具有自旋能力,可以减少上下文切换开销,从而为某些用例提供卓越的性能。

希望能帮助到你...

PS:JDK Fork Join Pool 实现 ExecutorService 并作为“普通”线程池工作,实现是高性能的,它使用 LIFO 线程调度,但是无法控制内部队列大小,退休超时......,最重要的任务不能取消它们时被打断

于 2015-06-20T14:02:16.197 回答
1

注意:我现在更喜欢并推荐我的其他答案

我有另一个建议,遵循更改队列以返回 false 的原始想法。在这个任务中,所有任务都可以进入队列,但是每当一个任务在 之后入队时execute(),我们都会跟随一个哨兵无操作任务,队列拒绝该任务,从而产生一个新线程,该线程将立即执行无操作队列中的一些东西。

因为工作线程可能正在轮询LinkedBlockingQueue新任务,所以即使有可用线程,任务也可能被排队。为了避免在有可用线程的情况下产生新线程,我们需要跟踪队列中有多少线程在等待新任务,并且只有在队列中的任务多于等待线程时才产生新线程。

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };

final AtomicInteger waitingThreads = new AtomicInteger(0);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        // offer returning false will cause the executor to spawn a new thread
        if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
        else return super.offer(e);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.poll(timeout, unit);
        } finally {
            waitingThreads.decrementAndGet();
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.take();
        } finally {
            waitingThreads.decrementAndGet();
        }
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
    @Override
    public void execute(Runnable command) {
        super.execute(command);
        if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
    }
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r == SENTINEL_NO_OP) return;
        else throw new RejectedExecutionException();            
    }
});
于 2013-11-27T19:15:06.193 回答
0

我能想到的最好的解决方案是扩展。

ThreadPoolExecutor提供了一些钩子方法: beforeExecuteafterExecute. 在您的扩展中,您可以维护使用有界队列来提供任务,并使用第二个无界队列来处理溢出。当有人来电submit时,您可以尝试将请求放入有界队列。如果遇到异常,只需将任务放入溢出队列即可。然后,您可以在完成任务后利用afterExecute钩子查看溢出队列中是否有任何内容。这样,executor 将首先处理它的有界队列中的内容,并在时间允许的情况下自动从这个无界队列中拉取。

它似乎比您的解决方案工作更多,但至少它不涉及给队列提供意外行为。我还想象有一种更好的方法来检查队列和线程的状态,而不是依赖于抛出相当慢的异常。

于 2013-10-22T21:52:53.357 回答
0

下面是使用两个线程池的解决方案,它们的核心和最大池大小相同。当第一个池忙时使用第二个池。

import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyExecutor {
    ThreadPoolExecutor tex1, tex2;
    public MyExecutor() {
        tex1 = new ThreadPoolExecutor(15, 15, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        tex1.allowCoreThreadTimeOut(true);
        tex2 = new ThreadPoolExecutor(45, 45, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        tex2.allowCoreThreadTimeOut(true);
    }

    public Future<?> submit(Runnable task) {
        ThreadPoolExecutor ex = tex1;
        int excessTasks1 = tex1.getQueue().size() + tex1.getActiveCount() - tex1.getCorePoolSize();
        if (excessTasks1 >= 0) {
            int excessTasks2 = tex2.getQueue().size() + tex2.getActiveCount() - tex2.getCorePoolSize();;
            if (excessTasks2 <= 0 || excessTasks2 / (double) tex2.getCorePoolSize() < excessTasks1 / (double) tex1.getCorePoolSize()) {
                ex = tex2;
            }
        }
        return ex.submit(task);
    }
}
于 2021-07-05T10:24:40.253 回答
0

注意:对于 JDK ThreadPoolExecutor,当您有一个有界队列时,您只会在 offer 返回 false 时创建新线程。您可能会使用CallerRunsPolicy获得一些有用的东西,它会创建一些 BackPressure 并直接在调用者线程中调用 run。

我需要从池创建的线程执行任务,并有一个用于调度的无界队列,而池中的线程数可能会在corePoolSizemaximumPoolSize之间增加减少,所以......

我最终从ThreadPoolExecutor进行了完整的复制粘贴更改了一些执行方法,因为 不幸的是这不能通过扩展来完成(它调用私有方法)。

我不想在新请求到达并且所有线程都忙时立即产生新线程(因为我通常有短暂的任务)。我添加了一个阈值,但可以根据您的需要随意更改(也许大多数情况下 IO 最好删除这个阈值)

private final AtomicInteger activeWorkers = new AtomicInteger(0);
private volatile double threshold = 0.7d;

protected void beforeExecute(Thread t, Runnable r) {
    activeWorkers.incrementAndGet();
}
protected void afterExecute(Runnable r, Throwable t) {
    activeWorkers.decrementAndGet();
}
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        if (isRunning(c) && this.workQueue.offer(command)) {
            int recheck = this.ctl.get();
            if (!isRunning(recheck) && this.remove(command)) {
                this.reject(command);
            } else if (workerCountOf(recheck) == 0) {
                this.addWorker((Runnable) null, false);
            }
            //>>change start
            else if (workerCountOf(recheck) < maximumPoolSize //
                && (activeWorkers.get() > workerCountOf(recheck) * threshold
                    || workQueue.size() > workerCountOf(recheck) * threshold)) {
                this.addWorker((Runnable) null, false);
            }
            //<<change end
        } else if (!this.addWorker(command, false)) {
            this.reject(command);
        }
    }
于 2019-12-06T15:40:34.223 回答