像任何 ExecutorServices 一样,线程池,我们定义了一个大小为 3 的 newFixedPool。现在我有一个大约 10000 个可运行任务的队列。对于执行上述过程,我有这些疑问 -
执行上面的流程,executor是不是只让任务队列中的3个线程一次性运行?
池将承载 3 个线程,这 3 个线程将只负责执行所有 10000 个任务。如果它是正确的,单个线程如何运行不同的可运行任务,因为最终这些任务也是线程本身,并且在任何作业/任务运行的中间,您可以将新的职责分配给池线程。
像任何 ExecutorServices 一样,线程池,我们定义了一个大小为 3 的 newFixedPool。现在我有一个大约 10000 个可运行任务的队列。对于执行上述过程,我有这些疑问 -
执行上面的流程,executor是不是只让任务队列中的3个线程一次性运行?
池将承载 3 个线程,这 3 个线程将只负责执行所有 10000 个任务。如果它是正确的,单个线程如何运行不同的可运行任务,因为最终这些任务也是线程本身,并且在任何作业/任务运行的中间,您可以将新的职责分配给池线程。
是的,如果实际上您正在使用,最多一次只有 3 个线程在池中Executors.newFixedThreadPool(3)
这10,000个任务并不Threads
简单Runnables
。Thread
必须通过启动AThread#start
才能实际创建系统线程。任务(的实例Runnable
)被放置在一个BlockingQueue
. 线程池中的线程将轮询 BlockingQueue 以获取要运行的任务。当他们完成任务时,他们会返回队列去获取另一个任务。BlockingQueue
如果添加了更多任务,则根据该队列的执行规则将它们插入到队列中。对于大多数队列,这是先进先出的,但PriorityQueue
实际上使用 aComparator
或自然排序来对插入的任务进行排序。
下面是接受 noofThreads 和 MaxConcurrentTask 的 java 中的 customThreadPool。它也有 stop() 来停止完整的 ThreadPool
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@SuppressWarnings("all")
public class ThreadPool {
private BlockingQueue taskQueue = null;
private List<PoolThread> threads = new ArrayList<PoolThread>();
private boolean isStopped = false;
public ThreadPool(int noOfThreads, int maxNoOfTasks) {
taskQueue = new LinkedBlockingQueue(maxNoOfTasks);
for(int i=0; i<noOfThreads; i++) {
threads.add(new PoolThread(taskQueue));
}
for(PoolThread thread : threads) {
thread.start();
}
}
public synchronized void execute(Runnable task) {
if(this.isStopped)
throw new IllegalStateException("ThreadPool is stopped");
this.taskQueue.offer(task);
}
public synchronized void stop() {
this.isStopped = true;
for(PoolThread thread : threads) {
thread.stopMe();
}
}
}
@SuppressWarnings("all")
class PoolThread extends Thread {
private BlockingQueue taskQueue = null;
private boolean isStopped = false;
public PoolThread(BlockingQueue queue) {
taskQueue = queue;
}
public void run() {
while(!isStopped()) {
try {
Runnable runnable = (Runnable) taskQueue.poll();
runnable.run();
} catch(Exception e) {
//log or otherwise report exception, //but keep pool thread alive.
}
}
}
public synchronized void stopMe() {
isStopped = true;
this.interrupt();
//break pool thread out of dequeue() call.
}
public synchronized boolean isStopped() {
return isStopped;
}
}