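I want to cancel a FutureTask that I obtained from a ThreadPoolExecutor, but I also want to be sure that the Callable running on the thread pool has stopped its work.
If I call FutureTask#cancel(false) and then get() (which blocks until completion), I get a CancellationException. Is this exception thrown immediately, or only after the task has stopped executing?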
Yes, the CancellationException is thrown immediately. You can extend FutureTask to add a version of get() that waits until the Callable's thread has finished.
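import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;

public class ThreadWaitingFutureTask<T> extends FutureTask<T> {
    // Held by the worker thread for as long as the wrapped Callable is running
    private final Semaphore semaphore;

    public ThreadWaitingFutureTask(Callable<T> callable) {
        this(callable, new Semaphore(1));
    }

    public T getWithJoin() throws InterruptedException, ExecutionException {
        try {
            return super.get();
        }
        catch (CancellationException e) {
            // Cancelled: wait until the worker has released the permit, then rethrow
            semaphore.acquire();
            semaphore.release();
            throw e;
        }
    }

    private ThreadWaitingFutureTask(final Callable<T> callable,
                final Semaphore semaphore) {
        super(new Callable<T>() {
            public T call() throws Exception {
                semaphore.acquire();
                try {
                    return callable.call();
                }
                finally {
                    semaphore.release();
                }
            }
        });
        this.semaphore = semaphore;
    }
}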
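To see the "thrown immediately" behaviour in isolation, here is a minimal sketch using a plain Future from an executor. The class and method names (CancelTimingDemo, getThrowsBeforeTaskFinishes) are made up for illustration, and the latch simply stands in for long-running work that does not react to cancel(false).

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of any answer in this thread
final class CancelTimingDemo {

    /** Returns true if get() threw CancellationException while the task body was still running. */
    static boolean getThrowsBeforeTaskFinishes() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            Future<String> future = pool.submit(() -> {
                started.countDown();
                release.await();        // stands in for work that ignores cancel(false)
                finished.set(true);
                return "done";
            });

            started.await();            // make sure the task body is really running
            future.cancel(false);       // no interrupt: the body keeps going

            try {
                future.get();
                return false;           // not expected: get() should not return normally
            } catch (CancellationException e) {
                // The body is still parked on the latch, so it cannot have finished yet
                return !finished.get();
            }
        } finally {
            release.countDown();        // let the worker finish so the pool can shut down
            pool.shutdown();
        }
    }
}
```

Because the worker can only finish once the calling thread opens the latch, a CancellationException caught before that point shows that get() did not wait for the task body.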
Aleksey's example works well. I wrote a variant whose constructor takes a Runnable (the task then returns null) and that shows how to block (join) directly in cancel():
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;

public class FutureTaskCancelWaits<T> extends FutureTask<T> {
    private final Semaphore semaphore;

    public FutureTaskCancelWaits(Runnable runnable) {
        this(Executors.callable(runnable, (T) null));
    }

    public FutureTaskCancelWaits(Callable<T> callable) {
        this(callable, new Semaphore(1));
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // If the task was successfully cancelled, block here until call() returns
        if (super.cancel(mayInterruptIfRunning)) {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                // Interrupted while waiting: restore the flag and report failure
                Thread.currentThread().interrupt();
                return false;
            }
            // Release only after a successful acquire, so the permit count stays at 1
            semaphore.release();
            // All is well
            return true;
        }
        return false;
    }

    private FutureTaskCancelWaits(final Callable<T> callable, final Semaphore semaphore) {
        super(new Callable<T>() {
            public T call() throws Exception {
                semaphore.acquire();
                try {
                    return callable.call();
                } finally {
                    semaphore.release();
                }
            }
        });
        this.semaphore = semaphore;
    }
}
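It is thrown immediately after the cancel.
There is no simple way to know whether the task has started and finished. You can create a wrapper for your runnable that lets you check its state.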
// requires: import java.util.concurrent.atomic.AtomicInteger;
final AtomicInteger state = new AtomicInteger();

// in the runnable
state.incrementAndGet();
try {
    // do work
} finally {
    state.decrementAndGet();
}
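Spelled out as a complete class, the wrapper idea could look roughly like this. TrackedRunnable, isRunning() and completedRuns() are hypothetical names chosen for the sketch. It only reports state; it does not make cancel() wait.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper that records whether the wrapped Runnable is currently executing
final class TrackedRunnable implements Runnable {
    private final Runnable delegate;
    private final AtomicInteger active = new AtomicInteger();     // runs in progress
    private final AtomicInteger completed = new AtomicInteger();  // runs that have ended

    TrackedRunnable(Runnable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run() {
        active.incrementAndGet();
        try {
            delegate.run();
        } finally {
            // Runs even if the delegate throws, so the counters stay consistent
            active.decrementAndGet();
            completed.incrementAndGet();
        }
    }

    /** True while at least one thread is inside the delegate's run(). */
    boolean isRunning() {
        return active.get() > 0;
    }

    /** Number of times run() has ended, normally or with an exception. */
    int completedRuns() {
        return completed.get();
    }
}
```

A caller would submit the TrackedRunnable to the pool instead of the raw task and poll isRunning() after cancelling.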
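This answer fixes a race condition in Aleksey's and FooJBar's code by checking, inside the callable, whether the task has been cancelled. (There is a window between FutureTask.run checking the state and running the callable, during which both cancel and getWithJoin can complete successfully. The callable will nevertheless still run.)
I also decided not to override the original cancel, because the new cancel needs to declare InterruptedException. The new cancel drops the useless return value (since true can mean any of "the task has not started yet", "the task has started and has already done most of its damage", or "the task has started and will eventually finish"). The check of super.cancel's return value is gone as well, so if the new cancel is called several times from different threads, all of the calls wait for the task to finish.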
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Based on: http://stackoverflow.com/questions/6040962/wait-for-cancel-on-futuretask
 *
 * @author Aleksandr Dubinsky
 */
public class FixedFutureTask<T> extends FutureTask<T> {

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the given {@code Runnable},
     * and arrange that {@code get} will return the given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion.
     *        If you don't need a particular result, consider using constructions of the form:
     *        {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public
    FixedFutureTask (Runnable runnable, T result) {
        this (Executors.callable (runnable, result));
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the given {@code Callable}.
     *
     * @param callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public
    FixedFutureTask (Callable<T> callable) {
        this (new MyCallable<T> (callable));
    }

    /** Some ugly code to work around the compiler's limitations on constructors */
    private
    FixedFutureTask (MyCallable<T> myCallable) {
        super (myCallable);
        myCallable.task = this;
    }

    private final Semaphore semaphore = new Semaphore(1);

    private static class MyCallable<T> implements Callable<T>
    {
        MyCallable (Callable<T> callable) {
            this.callable = callable;
        }

        final Callable<T> callable;
        FixedFutureTask<T> task;

        @Override public T
        call() throws Exception {
            task.semaphore.acquire();
            try
            {
                // Closes the race: skip the work if cancel() won before we got here
                if (task.isCancelled())
                    return null;
                return callable.call();
            }
            finally
            {
                task.semaphore.release();
            }
        }
    }

    /**
     * Waits if necessary for the computation to complete or finish cancelling, and then retrieves its result, if available.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an exception
     * @throws InterruptedException if the current thread was interrupted while waiting
     */
    @Override public T
    get() throws InterruptedException, ExecutionException, CancellationException {
        try
        {
            return super.get();
        }
        catch (CancellationException e)
        {
            semaphore.acquire();
            semaphore.release();
            throw e;
        }
    }

    /**
     * Waits if necessary for at most the given time for the computation to complete or finish cancelling, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an exception
     * @throws InterruptedException if the current thread was interrupted while waiting
     * @throws TimeoutException if the wait timed out
     */
    @Override public T
    get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException {
        try
        {
            return super.get (timeout, unit);
        }
        catch (CancellationException e)
        {
            semaphore.acquire();
            semaphore.release();
            throw e;
        }
    }

    /**
     * Attempts to cancel execution of this task and waits for the task to complete if it has been started.
     * If the task has not started when {@code cancelAndWait} is called, this task should never run.
     * If the task has already started, then the {@code mayInterruptIfRunning} parameter determines
     * whether the thread executing this task should be interrupted in an attempt to stop the task.
     *
     * <p>After this method returns, subsequent calls to {@link #isDone} will
     * always return {@code true}. Subsequent calls to {@link #isCancelled}
     * will return {@code true} if the task was cancelled before it completed normally.
     *
     * @param mayInterruptIfRunning {@code true} if the thread executing this task should be interrupted;
     *        otherwise, in-progress tasks are allowed to complete
     * @throws InterruptedException if the thread is interrupted
     */
    public void
    cancelAndWait (boolean mayInterruptIfRunning) throws InterruptedException {
        super.cancel (mayInterruptIfRunning);
        semaphore.acquire();
        semaphore.release();
    }
}
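The same race can also be closed without subclassing FutureTask. The sketch below is an alternative, not the code from this answer: it swaps the semaphore for an atomic state flag plus a CountDownLatch, so that either the worker or the canceller claims the task first. JoinableCallable, its state constants and demo() are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical wrapper: a compare-and-set decides whether the worker or the canceller wins. */
final class JoinableCallable<T> implements Callable<T> {
    private static final int NEW = 0, RUNNING = 1, SKIPPED = 2;

    private final Callable<T> delegate;
    private final AtomicInteger state = new AtomicInteger(NEW);
    private final CountDownLatch finished = new CountDownLatch(1);

    JoinableCallable(Callable<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T call() throws Exception {
        // If the canceller already claimed the task, never start the real work
        if (!state.compareAndSet(NEW, RUNNING)) {
            return null;
        }
        try {
            return delegate.call();
        } finally {
            finished.countDown();
        }
    }

    /** Cancels the future, then blocks until the delegate is guaranteed not to be running. */
    void cancelAndWait(Future<?> future, boolean mayInterruptIfRunning) throws InterruptedException {
        future.cancel(mayInterruptIfRunning);
        if (state.compareAndSet(NEW, SKIPPED)) {
            finished.countDown();   // the delegate will never run
        }
        finished.await();           // returns at once if the work has already ended
    }

    /** Small self-check: true if the work had ended by the time cancelAndWait returned. */
    static boolean demo() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean workEnded = new AtomicBoolean(false);
        try {
            JoinableCallable<String> task = new JoinableCallable<String>(() -> {
                started.countDown();
                Thread.sleep(200);  // stands in for work that ignores cancel(false)
                workEnded.set(true);
                return "done";
            });
            Future<String> future = pool.submit(task);
            started.await();
            task.cancelAndWait(future, false);
            return workEnded.get() && future.isCancelled();
        } finally {
            pool.shutdown();
        }
    }
}
```

As with cancelAndWait in FixedFutureTask, several threads may call the method concurrently and all of them block until the delegate has ended or has been skipped.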
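CompletionService is more powerful than a bare FutureTask, and in many cases it is the better fit. I took some ideas from it to solve the problem. In addition, its implementation ExecutorCompletionService is simpler than FutureTask: it contains only a few lines of code and is easy to read. So I modified that class to obtain partially computed results. For me it is a satisfying solution; after all, it looks simple and clear.
CompletionService can guarantee that a FutureTask obtained from the take or poll methods has already completed. Why? Because of the QueueingFuture class: only its run method is ever called, and other methods such as cancel are never called on it. In other words, it always completes normally.
Demo code: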
CompletionService<List<DeviceInfo>> completionService =
        new MyCompletionService<>(Executors.newCachedThreadPool());
Future<List<DeviceInfo>> task = completionService.submit(yourTask);
try {
    LogHelper.i(TAG, "result 111: " );
    // take() blocks until a submitted task has finished running
    Future<List<DeviceInfo>> result = completionService.take();
    LogHelper.i(TAG, "result: " + result.get());
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
Here is the class code:
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

/**
 * This is a CompletionService like java.util.concurrent.ExecutorCompletionService, but we can get a partly
 * computed result from the FutureTask returned by submit, even if we cancel or interrupt it.
 * Besides, CompletionService can ensure that the FutureTask is done when we get it from the take or poll method.
 */
public class MyCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask extension to enqueue upon completion.
     */
    private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        protected void done() { completionQueue.add(task); }
    }

    private static class DoneFutureTask<V> extends FutureTask<V> {
        // Last value passed to set(); kept even if the task was cancelled in the meantime
        private volatile V outcome;

        DoneFutureTask(Callable<V> task) {
            super(task);
        }

        DoneFutureTask(Runnable task, V result) {
            super(task, result);
        }

        @Override
        protected void set(V v) {
            super.set(v);
            outcome = v;
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            try {
                return super.get();
            } catch (CancellationException e) {
                // Cancelled: fall back to whatever the task managed to return
                return outcome;
            }
        }
    }

    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        return new DoneFutureTask<V>(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        return new DoneFutureTask<V>(task, result);
    }

    /**
     * Creates a MyCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is {@code null}
     */
    public MyCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * Creates a MyCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue
     *        normally one dedicated for use by this service. This
     *        queue is treated as unbounded -- failed attempted
     *        {@code Queue.add} operations for completed tasks cause
     *        them not to be retrievable.
     * @throws NullPointerException if executor or completionQueue are {@code null}
     */
    public MyCompletionService(Executor executor,
                               BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }
}
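The guarantee that take() only hands back finished futures can be checked with the standard java.util.concurrent.ExecutorCompletionService as well. The following is a small sketch under that assumption. CompletionOrderDemo and its run() method are hypothetical names, and the latch is only there to keep one task deliberately unfinished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class using the JDK's own ExecutorCompletionService
final class CompletionOrderDemo {

    /** Returns the task results in the order in which take() delivered them. */
    static List<String> run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        CountDownLatch gate = new CountDownLatch(1);
        List<String> order = new ArrayList<>();
        try {
            // Submitted first, but it cannot finish until the gate is opened
            service.submit(() -> { gate.await(); return "slow"; });
            service.submit(() -> "fast");

            // take() blocks until some task has completed; only "fast" can have done so
            Future<String> first = service.take();
            if (!first.isDone()) {
                throw new IllegalStateException("take() returned an unfinished future");
            }
            order.add(first.get());

            gate.countDown();   // now let the slow task finish
            Future<String> second = service.take();
            if (!second.isDone()) {
                throw new IllegalStateException("take() returned an unfinished future");
            }
            order.add(second.get());
            return order;
        } finally {
            gate.countDown();   // harmless if already open; avoids a stuck worker on errors
            pool.shutdown();
        }
    }
}
```

Futures come out of the queue in completion order rather than submission order, and each one is already done when it is returned.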