如以下链接所述:-
如何让 ThreadPoolExecutor 在排队之前将线程增加到最大值?
我将队列实现更改为在输入元素后返回 false。因此,每当将新任务插入队列时,都会为其创建一个新线程。
但是,当我使用放置的记录器大规模运行以下实现(Bis 系统测试)时,会产生一个新问题。
当一个任务来执行时,它会被插入到队列中,当队列返回 false 时,会创建一个新线程来执行它。池中当前存在的空闲线程不会被拾取。getTask()
原因是从队列中选择任务的方法将任务分配给空闲线程。所以我的问题是如何改变这种行为,以便如果线程空闲,如何确保为空闲线程分配任务以执行而不是创建新线程?
下面的输出会让它更清楚:-
Task 46 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 47 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 48 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Active Count: 1 Pool Size : 4 Idle Count: 3 Queue Size: 0
Task 49 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0
Task 50 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0
代码文件如下:-
ThreadPoolExecutor 是 java 1.5 版本,因为我们在服务器机器上使用 1.5 并且无法升级它。
线程池执行器:-
public void execute(Runnable command) {
System.out.println("Active Count: " + getActiveCount()
+ " Pool Size : " + getPoolSize() + " Idle Count: "
+ (getPoolSize() - getActiveCount())+" Queue Size: "+getQueue().size());
if (command == null)
throw new NullPointerException();
for (;;) {
if (runState != RUNNING) {
reject(command);
return;
}
if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
return;
if (workQueue.offer(command))
return;
int status = addIfUnderMaximumPoolSize(command);
if (status > 0) // created new thread
return;
if (status == 0) { // failed to create thread
reject(command);
return;
}
// Retry if created a new thread but it is busy with another task
}
}
LinkedBlockingQueue:-
public class CustomBlockingQueue<E> extends LinkedBlockingQueue<E>
{
/**
*
*/
private static final long serialVersionUID = 1L;
public CustomBlockingQueue() {
super(Integer.MAX_VALUE);
}
public boolean offer(E e) {
return false;
}
}
在拒绝处理程序中,我们正在调用我们尚未覆盖的队列的 put 方法
调用执行器
final CustomThreadPoolExecutor tpe = new CustomThreadPoolExecutor(3, 8, 0L, TimeUnit.MILLISECONDS, new MediationBlockingQueue<Runnable>(), new MediationRejectionHandler());
private static final int TASK_COUNT = 100;
for (int i = 0; i < TASK_COUNT; i++) {
......
tpe.execute(new Task(i));
.....
}
我们调用核心池大小为 3 的执行器,最大池大小为 8,并为任务使用无界链接阻塞队列。