即使Future.cancel(..)
被调用,等待的调用仍然会阻塞。正如其他人所提到的,CompletableFuture
不会使用中断来取消任务。
根据javadoc CompletableFuture.cancel(..)
:
mayInterruptIfRunning这个值在这个实现中没有影响,因为中断不是用来控制处理的。
即使实现会导致中断,您仍然需要阻塞操作才能取消任务或通过Thread.interrupted()
.
您可以在操作中设置检查点,而不是中断Thread
(这可能并不总是那么容易),您可以在其中优雅地终止当前任务。这可以在将要处理的某些元素的循环中完成,或者您在操作的每个步骤之前检查取消状态并CancellationException
自己抛出一个。
棘手的部分是获取CompletableFuture
任务内的引用以便调用Future.isCancelled()
. 这是如何完成的示例:
public abstract class CancelableTask<T> {
private CompletableFuture<T> task;
private T run() {
try {
return compute();
} catch (Throwable e) {
task.completeExceptionally(e);
}
return null;
}
protected abstract T compute() throws Exception;
protected boolean isCancelled() {
Future<T> future = task;
return future != null && future.isCancelled();
}
public Future<T> start() {
synchronized (this) {
if (task != null) throw new IllegalStateException("Task already started.");
task = new CompletableFuture<>();
}
return task.completeAsync(this::run);
}
}
编辑:这里将改进CancelableTask
版本作为静态工厂:
public static <T> CompletableFuture<T> supplyAsync(Function<Future<T>, T> operation) {
CompletableFuture<T> future = new CompletableFuture<>();
return future.completeAsync(() -> operation.apply(future));
}
这是测试方法:
@Test
void testFuture() throws InterruptedException {
CountDownLatch started = new CountDownLatch(1);
CountDownLatch done = new CountDownLatch(1);
AtomicInteger counter = new AtomicInteger();
Future<Object> future = supplyAsync(task -> {
started.countDown();
while (!task.isCancelled()) {
System.out.println("Count: " + counter.getAndIncrement());
}
System.out.println("Task cancelled");
done.countDown();
return null;
});
// wait until the task is started
assertTrue(started.await(5, TimeUnit.SECONDS));
future.cancel(true);
System.out.println("Cancel called");
assertTrue(future.isCancelled());
assertTrue(future.isDone());
assertTrue(done.await(5, TimeUnit.SECONDS));
}
如果您真的想使用除 之外的中断CompletableFuture
,那么您可以将自定义传递Executor
到CompletableFuture.completeAsync(..)
您创建自己的位置Thread
,覆盖cancel(..)
并CompletableFuture
中断您的Thread
.