3

Per the Javadoc comment on the method public static ExecutorService newCachedThreadPool() in the Executors class:

Threads that have not been used for sixty seconds are terminated and 
removed from the **cache**.

I was wondering where this cache is and how it works. I didn't see any likely static Collection variable in ThreadPoolExecutor or its superclass.

4

3 Answers

5

Technically Worker is a Runnable containing a reference to a Thread and not a Thread by itself.

Let us dig deeper into the mechanics of this class.

Executors.newCachedThreadPool uses this constructor from ThreadPoolExecutor:

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());

where 60 seconds is the keepAliveTime.
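As a quick check of these settings, here is a minimal sketch that builds the same configuration by hand and prints it next to what the factory method produces. The class name CachedPoolSettings and the helper handBuilt are made up for illustration, and the cast of the factory result to ThreadPoolExecutor is an assumption that holds for OpenJDK but is not promised by the Javadoc.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CachedPoolSettings {
    // Hypothetical helper: the same constructor call as quoted above.
    // No core threads, an effectively unbounded maximum, a 60 s keep-alive,
    // and a direct hand-off queue.
    public static ThreadPoolExecutor handBuilt() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    public static void main(String[] args) {
        // Assumption: the factory returns a plain ThreadPoolExecutor (true for OpenJDK).
        ThreadPoolExecutor fromFactory = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        ThreadPoolExecutor manual = handBuilt();
        for (ThreadPoolExecutor pool : new ThreadPoolExecutor[] {fromFactory, manual}) {
            System.out.println("core=" + pool.getCorePoolSize()
                    + " max=" + pool.getMaximumPoolSize()
                    + " keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s"
                    + " queue=" + pool.getQueue().getClass().getSimpleName());
            pool.shutdown();
        }
    }
}
```

Both pools should report the same four values, which is all that "cached" means at the configuration level.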

Worker Addition / Task submission

A RunnableFuture is created out of the submitted Callable or Runnable.
This is passed down to the execute() method.

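The execute method tries to insert the task into the workQueue, which in our case is the SynchronousQueue. With no idle worker waiting on the queue, this fails and offer returns false, because a SynchronousQueue has no capacity and accepts an element only when another thread is already waiting to receive it.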
(Just hold on to this thought, we will revisit this when we talk about caching aspect)
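The SynchronousQueue behaviour can be tried in isolation. The sketch below is not code from ThreadPoolExecutor; the class HandOffDemo and its two helpers are invented for illustration. It calls the non-blocking offer once with nobody waiting, and once while a second thread sits in a timed poll, the way an idle worker would.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class HandOffDemo {
    // offer() with no thread waiting in poll()/take(): nothing can receive the item.
    public static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("task");
    }

    // offer() while another thread is blocked in a timed poll().
    public static boolean offerWithWaitingConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();
        Thread consumer = new Thread(() -> {
            try {
                queue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        // Retry the non-blocking offer until the consumer has reached poll(),
        // giving up after roughly five seconds.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (System.nanoTime() < deadline) {
            if (queue.offer("task")) {
                return true;
            }
            Thread.yield();
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("no consumer: " + offerWithoutConsumer());
        System.out.println("waiting consumer: " + offerWithWaitingConsumer());
    }
}
```

The first call should report false and the second true, which is the difference between "create a new thread" and "hand the task to an idle one".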

Execution then moves on to the addIfUnderMaximumPoolSize method within execute, which creates a java.util.concurrent.ThreadPoolExecutor.Worker runnable, creates a Thread for it, and adds the Worker to the workers HashSet (the one mentioned in the other answers).

It then calls thread.start().

The run method of Worker is very important and should be noted.

public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
        }
    } finally {
        workerDone(this);
    }
}

At this point you have submitted a task, and a thread has been created and is running it.

Worker Removal

As you may have noticed, the run method contains a while loop. It is an incredibly interesting piece of code.

If the task is not null, the condition short-circuits and the second part is not evaluated.

Once the task has been run by runTask and the task reference has been set to null, the loop evaluates the second part of the condition, which takes it into the getTask method.

This is the part that decides whether a worker should be purged.

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);

The workQueue is polled for up to a minute in this case, waiting for a new task to arrive on the queue. If none arrives, poll returns null and getTask checks whether the worker can exit.

Returning null means we will break out of the while and come to the finally block.

Here the worker is removed from the HashSet and the referenced Thread is also gone.
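The loop-then-expire behaviour can be modelled without the pool at all. What follows is a simplified sketch, not the JDK code: runWorker is a made-up stand-in for Worker.run with getTask inlined as a timed poll, and a LinkedBlockingQueue is used only so the demo can run on a single thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkerLoopSketch {
    // Runs firstTask, then keeps taking tasks from the queue until a timed
    // poll returns null. Returns how many tasks ran before the "worker" gave up.
    public static int runWorker(Runnable firstTask, BlockingQueue<Runnable> queue,
                                long keepAliveMillis) {
        int completed = 0;
        Runnable task = firstTask;
        try {
            while (task != null
                    || (task = queue.poll(keepAliveMillis, TimeUnit.MILLISECONDS)) != null) {
                task.run();
                completed++;
                task = null; // forces the next iteration to poll the queue
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // In the real pool, this is where the worker is removed from the workers set.
        return completed;
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicInteger counter = new AtomicInteger();
        queue.add(counter::incrementAndGet);
        queue.add(counter::incrementAndGet);
        int completed = runWorker(counter::incrementAndGet, queue, 100);
        System.out.println(completed + " tasks ran before the worker timed out");
    }
}
```

With one first task and two queued tasks the loop runs three tasks, then waits 100 ms on the empty queue and exits.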

Caching aspect

Let us come back to the SynchronousQueue we discussed under task submission.

If I submit a task at a moment when workQueue.offer and workQueue.poll can work in tandem, i.e. a task arrives within those 60 seconds while an idle worker is polling, the existing thread is reused.

This can be seen in action by sleeping for 59s versus 61s between task submissions.

With 59s I can see the thread being reused. With 61s I can see a new thread being created in the pool.

N.B. The actual timings could vary from machine to machine, and my run() just prints Thread.currentThread().getName().
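For anyone who wants to repeat the experiment without waiting a minute, here is a rough sketch with the keep-alive scaled down to milliseconds. ReuseExperiment and runTwice are hypothetical names, the pool is built by hand with the same shape as the cached pool, and the timings are assumptions with generous margins rather than guarantees.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ReuseExperiment {
    // Submits two tasks separated by pauseMillis and returns the names of
    // the threads that ran them.
    public static String[] runTwice(long keepAliveMillis, long pauseMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                keepAliveMillis, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            String first = pool.submit(whoAmI).get();
            Thread.sleep(pauseMillis); // the idle worker is polling during this pause
            String second = pool.submit(whoAmI).get();
            return new String[] {first, second};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] shortPause = runTwice(2000, 100);  // pause well inside the keep-alive
        String[] longPause = runTwice(100, 1500);   // pause well beyond the keep-alive
        System.out.println("short pause: " + shortPause[0] + " then " + shortPause[1]);
        System.out.println("long pause:  " + longPause[0] + " then " + longPause[1]);
    }
}
```

On an unloaded machine the short pause should print the same thread name twice, and the long pause should print two different names, since the first worker has already timed out and been removed.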

Please let me know in comments if I have missed something or misinterpreted the code.

Answered 2013-04-22T11:18:25.300
1

The word "cache" is only an abstraction. Internally, the pool uses a HashSet to hold its worker threads. As per the code:

/**
 * Set containing all worker threads in pool. Accessed only when
 * holding mainLock.
 */
private final HashSet<Worker> workers = new HashSet<Worker>();

If you are also interested in what happens to the runnables you submit or execute, newCachedThreadPool uses a SynchronousQueue<Runnable> to hand them off.
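The workers set is private, but its size can be watched from outside through getPoolSize(). The sketch below is only an illustration: PoolSizeProbe and observe are invented names, and the keep-alive is shortened to milliseconds so the shrinkage is visible quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizeProbe {
    // Returns {pool size while all tasks are blocked, pool size after idle workers expired}.
    public static int[] observe(int tasks, long keepAliveMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                keepAliveMillis, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(tasks);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                // No worker is idle, so every hand-off fails and a new thread is created.
                pool.execute(() -> {
                    started.countDown();
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await();
            int busy = pool.getPoolSize();
            release.countDown();
            // Wait, for at most about five seconds, until the idle workers time out.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return new int[] {busy, pool.getPoolSize()};
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = observe(3, 100);
        System.out.println("while busy: " + sizes[0] + ", after keep-alive: " + sizes[1]);
    }
}
```

With three blocked tasks the pool should hold three workers, and once they have been idle longer than the keep-alive the count should drop back to zero.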

Answered 2013-04-22T08:53:57.930
0

If you go through the code of ThreadPoolExecutor, you will see this:

/**
 * Set containing all worker threads in pool. Accessed only when
 * holding mainLock.
 */
private final HashSet<Worker> workers = new HashSet<Worker>();

and this:

/**
 * The queue used for holding tasks and handing off to worker
 * threads.  We do not require that workQueue.poll() returning
 * null necessarily means that workQueue.isEmpty(), so rely
 * solely on isEmpty to see if the queue is empty (which we must
 * do for example when deciding whether to transition from
 * SHUTDOWN to TIDYING).  This accommodates special-purpose
 * queues such as DelayQueues for which poll() is allowed to
 * return null even if it may later return non-null when delays
 * expire.
 */
private final BlockingQueue<Runnable> workQueue;

And this:

try {
    Runnable r = timed ?
        // here keepAliveTime is the sixty seconds passed from
        // Executors#newCachedThreadPool()
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();
    if (r != null)
        return r;
    timedOut = true;
} catch (InterruptedException retry) {
    timedOut = false;
}

I sincerely recommend walking through the actual implementation code; keeping these pointers in mind will help you understand it more clearly.

Answered 2013-04-22T08:54:23.350