28

我知道CompletableFuture设计不会通过中断来控制其执行,但我想你们中的一些人可能会遇到这个问题。CompletableFutures 是组成异步执行的非常好的方法,但是如果您希望在取消未来时中断或停止底层执行,我们该怎么做呢?或者我们必须接受任何取消或手动完成CompletableFuture都不会影响在那里完成它的线程?

也就是说,在我看来,这显然是一项需要 executor worker 时间的无用工作。我想知道在这种情况下哪种方法或设计可能会有所帮助?

更新

这是一个简单的测试

public class SimpleTest {

  @Test
  public void testCompletableFuture() throws Exception {
    CompletableFuture<Void> cf = CompletableFuture.runAsync(()->longOperation());

    bearSleep(1);

    //cf.cancel(true);
    cf.complete(null);

    System.out.println("it should die now already");
    bearSleep(7);
  }

  public static void longOperation(){
    System.out.println("started");
    bearSleep(5);
    System.out.println("completed");
  }

  private static void bearSleep(long seconds){
    try {
      TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
      System.out.println("OMG!!! Interrupt!!!");
    }
  }
}
4

6 回答 6

18

A CompletableFuture is not related to the asynchronous action that may eventually complete it.

Since (unlike FutureTask) this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion. Method cancel has the same effect as completeExceptionally(new CancellationException()).

There may not even be a separate thread working on completing it (there may even be many threads working on it). Even if there is, there's no link from a CompletableFuture to any thread that has a reference to it.

As such, there's nothing you can do through CompletableFuture to interrupt any thread that may be running some task that will complete it. You'll have to write your own logic which tracks any Thread instances which acquire a reference to the CompletableFuture with the intention to complete it.


Here's an example of the type of execution I think you could get away with.

public static void main(String[] args) throws Exception {
    ExecutorService service = Executors.newFixedThreadPool(1);
    CompletableFuture<String> completable = new CompletableFuture<>();
    Future<?> future = service.submit(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                if (Thread.interrupted()) {
                    return; // remains uncompleted
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    return; // remains uncompleted
                }
            }
            completable.complete("done");
        }
    });

    Thread.sleep(2000);

    // not atomic across the two
    boolean cancelled = future.cancel(true);
    if (cancelled)
        completable.cancel(true); // may not have been cancelled if execution has already completed
    if (completable.isCancelled()) {
        System.out.println("cancelled");
    } else if (completable.isCompletedExceptionally()) {
        System.out.println("exception");
    } else {
        System.out.println("success");
    }
    service.shutdown();
}

This assumes that the task being executed is setup to handle interruptions correctly.

于 2015-03-12T15:41:53.773 回答
3

那这个呢?

public static <T> CompletableFuture<T> supplyAsync(final Supplier<T> supplier) {

    final ExecutorService executorService = Executors.newFixedThreadPool(1);

    final CompletableFuture<T> cf = new CompletableFuture<T>() {
        @Override
        public boolean complete(T value) {
            if (isDone()) {
                return false;
            }
            executorService.shutdownNow();
            return super.complete(value);
        }

        @Override
        public boolean completeExceptionally(Throwable ex) {
            if (isDone()) {
                return false;
            }
            executorService.shutdownNow();
            return super.completeExceptionally(ex);
        }
    };

    // submit task
    executorService.submit(() -> {
        try {
            cf.complete(supplier.get());
        } catch (Throwable ex) {
            cf.completeExceptionally(ex);
        }
    });

    return cf;
}

简单测试:

    CompletableFuture<String> cf = supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (Exception e) {
            System.out.println("got interrupted");
            return "got interrupted";
        }
        System.out.println("normal complete");
        return "normal complete";
    });

    cf.complete("manual complete");
    System.out.println(cf.get());

我不喜欢每次都必须创建一个 Executor 服务的想法,但也许你可以找到一种方法来重用 ForkJoinPool。

于 2015-03-13T20:35:25.150 回答
1

我有类似的问题,我需要模拟一个 InterruptedException。

我嘲笑了应该返回 CompletetableFuture 的方法调用,并且我在返回值上放置了一个间谍,这样CompletableFuture#get会抛出异常。

它按我的预期工作,并且我能够测试该代码是否正确处理了异常。

        CompletableFuture spiedFuture = spy(CompletableFuture.completedFuture(null));
        when(spiedFuture .get()).thenThrow(new InterruptedException());

        when(servuce.getById(anyString())).thenReturn(spiedFuture );
于 2021-01-20T15:52:53.087 回答
1

Future这是创建可以取消的任务的超短版本:

public static <T> Future<T> supplyAsync(Function<Future<T>, T> operation) {
    CompletableFuture<T> future = new CompletableFuture<>();
    return future.completeAsync(() -> operation.apply(future));
}

CompletableFuture传递给操作Function以检查 的取消状态Future

Future<Result> future = supplyAsync(task -> {
   while (!task.isCancelled()) {
       // computation
   }
   return result;
});
// later you may cancel
future.cancel(false);
// or retrieve the result
Result result = future.get(5, TimeUnit.SECONDS);

然而,这不会中断Thread正在运行的操作。如果您还希望能够中断Thread,那么您必须存储对它的引用并覆盖Future.cancel(..)以中断它。

public static <T> Future<T> supplyAsync(Function<Future<T>, T> action) {
    return supplyAsync(action, r -> new Thread(r).start());
}

public static <T> Future<T> supplyAsync(Function<Future<T>, T> action, Executor executor) {

    AtomicReference<Thread> interruptThread = new AtomicReference<>();
    CompletableFuture<T> future = new CompletableFuture<>() {

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (!interruptThread.compareAndSet(null, Thread.currentThread()) 
                   && mayInterruptIfRunning) {
                interruptThread.get().interrupt();
            }
            return super.cancel(mayInterruptIfRunning);
        }
    };

    executor.execute(() -> {
        if (interruptThread.compareAndSet(null, Thread.currentThread())) try {
            future.complete(action.apply(future));
        } catch (Throwable e) {
            future.completeExceptionally(e);
        }
    });

    return future;
}

以下测试检查Thread执行我们是否Function被中断:

@Test
void supplyAsyncWithCancelOnInterrupt() throws Exception {
    Object lock = new Object();
    CountDownLatch done = new CountDownLatch(1);
    CountDownLatch started = new CountDownLatch(1);

    Future<Object> future = supplyAsync(m -> {
        started.countDown();
        synchronized (lock) {
            try {
                lock.wait(); // let's get interrupted
            } catch (InterruptedException e) {
                done.countDown();
            }
        }
        return null;
    });

    assertFalse(future.isCancelled());
    assertFalse(future.isDone());

    assertTrue(started.await(5, TimeUnit.SECONDS));
    assertTrue(future.cancel(true));

    assertTrue(future.isCancelled());
    assertTrue(future.isDone());
    assertThrows(CancellationException.class, () -> future.get());
    assertTrue(done.await(5, TimeUnit.SECONDS));
}
于 2021-03-27T21:03:14.780 回答
1

如果你使用

cf.get();

代替

cf.join();

等待完成的线程可以被中断。这让我陷入了**,所以我只是把它放在那里。然后,您需要进一步传播此中断/使用 cf.cancel(...) 来真正完成执行。

于 2019-03-19T13:00:43.313 回答
0

关于什么?

/** @return {@link CompletableFuture} which when cancelled will interrupt the supplier
 */
public static <T> CompletableFuture<T> supplyAsyncInterruptibly(Supplier<T> supplier, Executor executor) {
    return produceInterruptibleCompletableFuture((s) -> CompletableFuture.supplyAsync(s, executor), supplier);
}

// in case we want to do the same for similar methods later
private static <T> CompletableFuture<T> produceInterruptibleCompletableFuture(
        Function<Supplier<T>,CompletableFuture<T>> completableFutureAsyncSupplier, Supplier<T> action) {
    FutureTask<T> task = new FutureTask<>(action::get);
    return addCancellationAction(completableFutureAsyncSupplier.apply(asSupplier(task)), () ->
            task.cancel(true));
}

/** Ensures the specified action is executed if the given {@link CompletableFuture} is cancelled.
 */
public static <T> CompletableFuture<T> addCancellationAction(CompletableFuture<T> completableFuture,
                                                             @NonNull Runnable onCancellationAction) {
    completableFuture.whenComplete((result, throwable) -> {
        if (completableFuture.isCancelled()) {
            onCancellationAction.run();
        }
    });
    return completableFuture;  // return original CompletableFuture
}

/** @return {@link Supplier} wrapper for the given {@link RunnableFuture} which calls {@link RunnableFuture#run()}
 *          followed by {@link RunnableFuture#get()}.
 */
public static <T> Supplier<T> asSupplier(RunnableFuture<T> futureTask) throws CompletionException {
    return () -> {
        try {
            futureTask.run();
            try {
                return futureTask.get();
            } catch (ExecutionException e) {  // unwrap ExecutionExceptions
                final Throwable cause = e.getCause();
                throw (cause != null) ? cause : e;
            }
        } catch (CompletionException e) {
            throw e;
        } catch (Throwable t) {
            throw new CompletionException(t);
        }
    };
}
于 2018-11-09T22:50:19.617 回答