5

有什么方法可以创建至少有 5 个线程、最多 20 个线程以及任务的无界队列(意味着没有任务被拒绝)的 Executor

我尝试ThreadPoolExecutor(5, 20, 60L, TimeUnit.SECONDS, queue) 了所有我为队列想到的可能性:

new LinkedBlockingQueue() // never runs more than 5 threads
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new SynchronousQueue() // no tasks can wait, after 20, they are rejected

没有一个按要求工作。

4

3 回答 3

5

也许这样的东西对你有用?我只是鞭打它,所以请戳它。基本上,它实现了一个溢出线程池,用于提供底层ThreadPoolExecutor

我看到它有两个主要缺点:

  • 缺少返回的 Future 对象submit()。但也许这对你来说不是问题。
  • 辅助队列只会ThreadPoolExecutor在提交作业时清空。必须有一个优雅的解决方案,但我还没有看到它。如果您知道将有稳定的任务流进入,StusMagicExecutor那么这可能不是问题。(“可能”是关键词。)一个选项可能是让您提交的任务在StusMagicExecutor完成后戳一下?

斯图的魔法执行者:

public class StusMagicExecutor extends ThreadPoolExecutor {
    private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>();  //capacity is Integer.MAX_VALUE.

    public StusMagicExecutor() {
        super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler());  
    }
    public void queueRejectedTask(Runnable task) {
        try {
            secondaryQueue.put(task);
        } catch (InterruptedException e) {
            // do something
        }
    }
    public Future submit(Runnable newTask) {
        //drain secondary queue as rejection handler populates it
        Collection<Runnable> tasks = new ArrayList<Runnable>();
        secondaryQueue.drainTo(tasks);

        tasks.add(newTask);

        for (Runnable task : tasks)
             super.submit(task);

        return null; //does not return a future!
    }
}

class RejectionHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        ((StusMagicExecutor)executor).queueRejectedTask(runnable);
    }
}
于 2009-09-17T08:05:51.240 回答
1

的javadocsThreadPoolExecutor非常清楚,一旦corePoolSize创建了线程,只有在队列已满时才会创建新线程。所以如果你设置core为 5 和max20,你永远不会得到你想要的行为。

但是,如果将core和都设置max为 20,则只有在所有 20 个线程都忙时,任务才会被添加到队列中。当然,这会使您的“最少 5 个线程”要求变得毫无意义,因为所有 20 个线程都将保持活动状态(无论如何,直到它们空闲为止)。

于 2009-09-16T23:15:15.080 回答
1

我认为这个问题是类的一个缺点,并且给定构造函数参数组合非常误导。这是从 SwingWorker 的内部 ThreadPoolExecutor 中提取的一个解决方案,我将其制成顶级类。它没有最小值,但至少使用了上限。我唯一不知道的是锁定执行对性能有何影响。

public class BoundedThreadPoolExecutor extends ThreadPoolExecutor {
    private final ReentrantLock pauseLock = new ReentrantLock();
    private final Condition unpaused = pauseLock.newCondition();
    private boolean isPaused = false;
    private final ReentrantLock executeLock = new ReentrantLock();

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                handler);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory, handler);
    }

    @Override
    public void execute(Runnable command) {
        executeLock.lock();
        try {
            pauseLock.lock();
            try {
                isPaused = true;
            } finally {
                pauseLock.unlock();
            }
            setCorePoolSize(getMaximumPoolSize());
            super.execute(command);
            setCorePoolSize(0);
            pauseLock.lock();
            try {
                isPaused = false;
                unpaused.signalAll();
            } finally {
                pauseLock.unlock();
            }
        } finally {
            executeLock.unlock();
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        pauseLock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException ignore) {

        } finally {
            pauseLock.unlock();
        }
    }
}
于 2010-03-31T16:01:36.507 回答