ExecutorService
如果您曾经启动线程,则通常不能从外部代码中断ExecutorService::execute(Runnable)
线程,因为外部代码没有Thread
对每个正在运行的线程的对象的引用(尽管请参阅此答案的结尾以获取解决方案,如果你需要ExecutorService::execute
)。但是,如果您改为使用ExecutorService::submit(Callable<T>)
提交作业,则会返回 a ,它在开始执行Future<T>
后在内部保留对正在运行的线程的引用。Callable::call()
这个线程可以通过调用来中断Future::cancel(true)
。因此,任何检查当前线程中断状态的代码(或由其调用)Callable
都可以通过Future
引用中断。这包括BlockingQueue::take()
,即使被阻塞,也会响应线程中断。(如果 JRE 阻塞方法在阻塞时被中断,通常会唤醒,意识到它们已被中断,并抛出一个InterruptedException
.)
总结一下:既取消未来的工作Future::cancel()
,同时也中断正在进行的工作(只要正在进行的工作响应线程中断)。这两个调用都不会影响已经成功完成的工作。Future::cancel(true)
Future::cancel(true)
cancel
请注意,一旦线程被取消中断,InterruptException
就会在线程内抛出一个(例如,BlockingQueue::take()
在这种情况下)。但是,下次您调用成功取消(即在完成之前被取消的 a)时,您的 aCancellationException
将被扔回主线程。这与您通常期望的不同:如果未取消的throws ,下一次调用将 throw ,但如果取消的throws ,下一次调用将通过。Future::get()
Future
Future
Callable
InterruptedException
Future::get()
InterruptedException
Callable
InterruptedException
Future::get()
CancellationException
这是一个说明这一点的例子:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
public class Test {
public static void main(String[] args) throws Exception {
// Start Executor with 4 threads
int numThreads = 4;
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
try {
// Set up BlockingQueue for inputs, and List<Future> for outputs
BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
List<Future<String>> futures = new ArrayList<>(numThreads);
for (int i = 0; i < numThreads; i++) {
int threadIdx = i;
futures.add(executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
try {
// Get an input from the queue (blocking)
int val = queue.take();
return "Thread " + threadIdx + " got value " + val;
} catch (InterruptedException e) {
// Thrown once Future::cancel(true) is called
System.out.println("Thread " + threadIdx + " got interrupted");
// This value is returned to the Future, but can never
// be read, since the caller will get a CancellationException
return "Thread " + threadIdx + " got no value";
}
}
}));
}
// Enqueue (numThreads - 1) values into the queue, so that one thread blocks
for (int i = 0; i < numThreads - 1; i++) {
queue.add(100 + i);
}
// Cancel all futures
for (int i = 0; i < futures.size(); i++) {
Future<String> future = futures.get(i);
// Cancel the Future -- this doesn't throw an exception until
// the get() method is called
future.cancel(/* mayInterruptIfRunning = */ true);
try {
System.out.println(future.get());
} catch (CancellationException e) {
System.out.println("Future " + i + " was cancelled");
}
}
} finally {
// Terminate main after all threads have shut down (this call does not block,
// so main will exit before the threads stop running)
executor.shutdown();
}
}
}
每次运行时,输出都会有所不同,但这里有一次运行:
Future 1 was cancelled
Future 0 was cancelled
Thread 2 got value 100
Thread 3 got value 101
Thread 1 got interrupted
这表明Future::cancel()
调用了之前完成的线程 2 和线程 3。线程 1 被取消,所以内部InterruptedException
被抛出,外部CancellationException
被抛出。线程 0 在开始运行之前被取消。(请注意,线程索引通常与索引不相关Future
,因此Future 0 was cancelled
可能对应于线程 0 或线程 1 被取消,对于Future 1 was cancelled
.)
高级:实现相同效果的一种方法 with Executor::execute
(不返回Future
引用)而不是使用 customExecutor::submit
创建 a ,并为创建的每个线程在并发集合(例如并发队列)中记录一个引用。然后要取消所有线程,您可以简单地调用所有先前创建的线程。但是,您将需要处理在中断现有线程时可能会创建新线程的竞争条件。要处理此问题,请设置一个对可见的标志,告诉它不要再创建任何线程,然后一旦设置,取消现有线程。ThreadPoolExecutor
ThreadFactory
ThreadFactory
Thread::interrupt()
AtomicBoolean
ThreadFactory