5

ThreadPoolExecutor文档说

如果 corePoolSize 或更多线程正在运行,Executor 总是更喜欢排队请求而不是添加新线程。


如果运行的线程数多于 corePoolSize 但少于 maximumPoolSize,则仅当队列已满时才会创建新线程。

有没有办法让执行程序更喜欢创建新线程,直到达到最大值,即使有超过核心大小的线程,然后开始排队?如果队列达到最大大小,任务将被拒绝。如果在处理繁忙的突发后超时设置将启动并将线程删除到核心大小,那就太好了。我看到了喜欢排队以进行节流的原因;但是,这种定制还允许队列主要充当尚未运行的任务列表。

4

4 回答 4

5

无法使用ThreadPoolExecutor.

但是,这里有几个解决方案:

  1. 考虑,

    • 如果少于corePoolSize线程正在运行,则将为排队的每个项目创建一个新线程,直到coorPoolSize线程正在运行。

    • 仅当队列已满且运行的线程数少于时,才会创建新线程maximumPoolSize

    因此,将 ThreadPoolExecutor 包装在一个监控项目排队速度的类中。然后,在提交许多项目时将核心池大小更改为更高的值。这将导致每次提交新项目时创建一个新线程。

    提交突发完成后,需要再次手动减少核心池大小,以便线程自然超时。如果您担心忙突发可能会突然结束,导致手动方法失败,请务必使用allowCoreThreadTimeout

  2. 创建固定线程池,并允许CoreThreadTimeout

    不幸的是,这在低提交突发期间使用了更多线程,并且在零流量期间不存储空闲线程。

如果您有时间、需要和倾向,请使用第一种解决方案,因为它将处理更广泛的提交频率,因此在灵活性方面是一个更好的解决方案。

否则使用第二种解决方案。

于 2013-08-14T17:09:47.377 回答
2

只需执行Executors.newFixedThreadPool并设置core和设置max相同的值即可。这是newFixedThreadPoolJava 6 的源代码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

如果你有一个现有的,你可以做什么:

ThreadPoolExecutor tpe = ... ;
tpe.setCorePoolSize(tpe.getMaxPoolSize());

编辑:正如威廉在评论中指出的那样,这意味着所有线程都是核心线程,因此没有一个线程会超时并终止。要更改此行为,只需使用ThreadPoolExecutor.allowCoreThreadTimeout(true). 这将使线程可以在执行程序不使用时超时并被清除。

于 2013-08-14T17:09:39.017 回答
1

您的偏好似乎是在低活动期间的延迟最小。为此,我只需将 corePoolSize 设置为最大值并让额外的线程挂起。在高活动期间,这些线程无论如何都会存在。在低活动时期,它们的存在不会产生太大影响。如果你想让它们死掉,你可以设置核心线程超时。

这样,所有线程将始终可用于尽快执行任务。

于 2013-08-14T17:12:24.407 回答
0

自定义阻塞队列

package com.gunjan;

import java.util.concurrent.BlockingQueue;

public abstract class CustomBlockingQueue<E> implements BlockingQueue<E> {

public BlockingQueue<E> blockingQueue;

public CustomBlockingQueue(BlockingQueue blockingQueue) {
    this.blockingQueue = blockingQueue;
}

@Override
final public boolean offer(E e) {
    return false;
}

final public boolean customOffer(E e) {
    return blockingQueue.offer(e);
}
}

线程池阻塞队列

package com.gunjan;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ThreadPoolBlockingQueue<E> extends CustomBlockingQueue<E> {

    public ThreadPoolBlockingQueue(BlockingQueue blockingQueue) {
        super(blockingQueue);
    }

    @Override
    public E remove() {
        return this.blockingQueue.remove();
    }

    @Override
    public E poll() {
        return this.blockingQueue.poll();
    }

    @Override
    public E element() {
        return this.blockingQueue.element();
    }

    @Override
    public E peek() {
        return this.blockingQueue.peek();
    }

    @Override
    public int size() {
        return this.blockingQueue.size();
    }

    @Override
    public boolean isEmpty() {
        return this.blockingQueue.isEmpty();
    }

    @Override
    public Iterator<E> iterator() {
        return this.blockingQueue.iterator();
    }

    @Override
    public Object[] toArray() {
        return this.blockingQueue.toArray();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        return this.blockingQueue.toArray(a);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return this.blockingQueue.containsAll(c);
    }

    @Override
    public boolean addAll(Collection<? extends E> c) {
        return this.blockingQueue.addAll(c);
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        return this.blockingQueue.removeAll(c);
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        return this.blockingQueue.retainAll(c);
    }

    @Override
    public void clear() {
        this.blockingQueue.clear();
    }

    @Override
    public boolean add(E e) {
        return this.blockingQueue.add(e);
    }

    @Override
    public void put(E e) throws InterruptedException {
        this.blockingQueue.put(e);
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        return this.blockingQueue.offer(e, timeout, unit);
    }

    @Override
    public E take() throws InterruptedException {
        return this.blockingQueue.take();
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return this.blockingQueue.poll(timeout, unit);
    }

    @Override
    public int remainingCapacity() {
        return this.blockingQueue.remainingCapacity();
    }

    @Override
    public boolean remove(Object o) {
        return this.blockingQueue.remove(o);
    }

    @Override
    public boolean contains(Object o) {
        return this.blockingQueue.contains(o);
    }

    @Override
    public int drainTo(Collection<? super E> c) {
        return this.blockingQueue.drainTo(c);
    }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        return this.blockingQueue.drainTo(c, maxElements);
    }
}

RejectedExecutionHandlerImpl

package com.gunjan;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        boolean inserted = ((CustomBlockingQueue) executor.getQueue()).customOffer(r);
        if (!inserted) {
            throw new RejectedExecutionException();
        }
    }
}

CustomThreadPoolExecutorTest

package com.gunjan;

import java.util.concurrent.*;

public class CustomThreadPoolExecutorTest {

public static void main(String[] args) throws InterruptedException {
    LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue<Runnable>(500);
    CustomBlockingQueue customLinkedBlockingQueue = new ThreadPoolBlockingQueue<Runnable>(linkedBlockingQueue);
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 100, 60, TimeUnit.SECONDS,
            customLinkedBlockingQueue, new RejectedExecutionHandlerImpl());


    for (int i = 0; i < 750; i++) {
        try {
            threadPoolExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        System.out.println(threadPoolExecutor);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            e.printStackTrace();
        }

    }

    threadPoolExecutor.shutdown();
    threadPoolExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MINUTES);
    System.out.println(threadPoolExecutor);
}
}
于 2017-11-02T18:13:21.940 回答