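Use case

Suppose we run an execution with CompletableFuture.runAsync(..), and inside the runnable we have a try-with-resources block (we are using some resource that should be closed whatever happens). At some point, while the execution inside the try block has not yet finished, we cancel the completable future... Although the execution is stopped, the resource that should be closed is not closed: the close() of the AutoClosable is never called...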


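Question

Is this a Java issue, or is there a way to do this properly, without workarounds such as using plain futures (which support interruption etc.)? And if this is the expected behaviour, how should a similar situation be handled when a non-interruptible CompletableFuture is cancelled...?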


Code

import java.util.concurrent.CompletableFuture;

import org.junit.Test; // JUnit 4 assumed

public class AutoClosableResourceTest {

    public static class SomeService{
        public void connect(){
            System.out.println("connect");
        }

        public Integer disconnect(){
            System.out.println("disconnect");
            return null;
        }
    }

    public static class AutoClosableResource<T> implements AutoCloseable {

        private final T resource;
        private final Runnable closeFunction;

        private AutoClosableResource(T resource, Runnable closeFunction){
            this.resource = resource;
            this.closeFunction = closeFunction;
        }

        public T get(){
            return resource;
        }

        @Override
        public void close() throws Exception {
            closeFunction.run();
        }
    }

    @Test
    public void testTryWithResource() throws InterruptedException {
        SomeService service  = new SomeService();

        CompletableFuture<Void> async = CompletableFuture.runAsync(() -> {
            try (AutoClosableResource<SomeService> resource = new AutoClosableResource<>(service, service::disconnect)) {
                resource.get().connect();
                while (true) {
                    Thread.sleep(1000);
                    System.out.println("working...");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        Thread.sleep(2500);
        async.cancel(true);
        Thread.sleep(2500);

    }
}

This produces

connect
working...
working...
working...
working...

As you can see, close() is never called ("disconnect" is never printed) and the resource is left open...

6 Answers

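You seem to have difficulty understanding what the purpose of CompletableFuture is. Have a look at the first sentence of its class documentation:

A Future that may be explicitly completed (setting its value and status), ...

So, unlike FutureTask, which is completed by the thread executing its run method, a CompletableFuture can be completed by any thread, which sets its value/status at an arbitrary point in time. The CompletableFuture does not know which thread is going to complete it, nor even whether any thread is currently working on its completion.

Therefore, a CompletableFuture cannot interrupt the right thread when it is cancelled. This is a fundamental part of its design.

If you want a worker thread that you can interrupt, you are better off using FutureTask/ThreadPoolExecutor. A task scheduled that way can still complete a CompletableFuture when it ends.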
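A minimal sketch of that suggestion, under a few assumptions: the class and method names are hypothetical, the "resource" is just a lambda that records that close() ran, and the task reports its end through a separately created CompletableFuture. The task is submitted to an ExecutorService, so the returned Future is a FutureTask and cancel(true) interrupts the thread that is running it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch class; none of these names come from the thread above.
final class InterruptibleTaskSketch {

    /** Cancels a running pool task with interruption; returns true if its resource was closed. */
    static boolean cancelClosesResource() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicBoolean closed = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        // Completed by the task itself when it ends.
        CompletableFuture<String> outcome = new CompletableFuture<>();

        Runnable body = () -> {
            try (AutoCloseable resource = () -> closed.set(true)) {
                started.countDown();
                while (true) {
                    Thread.sleep(100); // throws InterruptedException once the thread is interrupted
                }
            } catch (InterruptedException e) {
                outcome.complete("interrupted"); // the resource is already closed at this point
            } catch (Exception e) {
                outcome.completeExceptionally(e);
            } finally {
                finished.countDown();
            }
        };
        Future<?> task = pool.submit(body);

        try {
            started.await();
            task.cancel(true); // interrupts the thread running the task
            boolean done = finished.await(5, TimeUnit.SECONDS);
            return done && closed.get() && "interrupted".equals(outcome.getNow(null));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("resource closed after cancel: " + cancelClosesResource());
    }
}
```

The design point is that cancellation goes through the Future returned by the executor, which knows its runner thread, while the CompletableFuture is only used to publish the outcome.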

answered 2014-09-25T15:24:19.230

The following code is stuck in an infinite loop. Calling async.cancel does not communicate with the loop that you want to stop.

while (true) {
    Thread.sleep(1000);
    System.out.println("working...");
}

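The test case still exits because the thread stuck in this loop is normally a daemon thread: runAsync without an explicit executor uses the ForkJoinPool common pool, whose workers are daemon threads.

Replace the while loop condition with the following, which checks the isCancelled flag on every iteration. Calling CompletableFuture.cancel() marks the future as cancelled, but it does not interrupt the thread started via runAsync.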

while (!isCancelled()) { // keep working only while the future has not been cancelled
    Thread.sleep(1000);
    System.out.println("working...");
}
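Since the lambda passed to runAsync cannot refer to the very future it is being assigned to, one way to make the check above concrete is to poll a separately created future that the caller holds. This is only a sketch under that assumption; the names (PollingCancelSketch, handle) are hypothetical and the "resource" is a lambda that records that close() ran.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch class; none of these names come from the thread above.
final class PollingCancelSketch {

    /** Cancels the handle future and reports whether the worker closed its resource. */
    static boolean cancelClosesResource() {
        AtomicBoolean closed = new AtomicBoolean(false);
        CountDownLatch finished = new CountDownLatch(1);
        // Handle given to callers; the worker polls it instead of being interrupted.
        CompletableFuture<Void> handle = new CompletableFuture<>();

        CompletableFuture.runAsync(() -> {
            try (AutoCloseable resource = () -> closed.set(true)) {
                while (!handle.isCancelled()) {
                    Thread.sleep(50); // one unit of "work"
                }
            } catch (Exception e) {
                handle.completeExceptionally(e);
            } finally {
                finished.countDown(); // runs after the resource has been closed
            }
        });

        try {
            Thread.sleep(200);
            handle.cancel(true); // only flips the state; the loop notices it on its next check
            return finished.await(5, TimeUnit.SECONDS) && closed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("resource closed: " + cancelClosesResource());
    }
}
```

Because the loop leaves the try block normally, try-with-resources closes the resource as usual. The cost is latency: cancellation is only noticed at the next check.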
answered 2014-09-25T14:29:59.547

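You can use the complete method of CompletableFuture to finish the future from another thread. Note that this only completes the future: as the output below shows, the looping thread itself keeps running.

Here is some simple code that shows the behaviour: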

package com.ardevco;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureTest3 {
  public static void main(String[] args) throws Exception {

     ExecutorService pool = Executors.newFixedThreadPool(5);

     CompletableFuture<Integer> longRunningcompletableFuture = CompletableFuture.supplyAsync(() -> {
        for (int i = 0; i < 1; i--) {
           System.out.println("i " + i);
           sleep();
        }
        return 1; // we will never reach this line, so the thread stays stuck in the loop
     });

     CompletableFuture<Integer> completor = CompletableFuture.supplyAsync(() -> {
        System.out.println("completing the longRunningcompletableFuture");
        longRunningcompletableFuture.complete(1000);
        System.out.println("completed the longRunningcompletableFuture");
        return 10;
     });

     Thread.sleep(10000);

     System.out.println("completor...");
     int i = completor.get();
     System.out.println("completor i:" + i);
     System.out.println("completor...");

     System.out.println("completableFutureToBeCompleted2...");
     int i2 = longRunningcompletableFuture.get();
     System.out.println("completableFutureToBeCompleted2: " + i2);
     System.out.println("completableFutureToBeCompleted2...");

  }

  private static void sleep() {
     try {Thread.sleep(1000);}catch (Exception e) {}
  }

}

Output:

i 0
completing the longRunningcompletableFuture
completed the longRunningcompletableFuture
i -1
i -2
i -3
i -4
i -5
i -6
i -7
i -8
i -9
i -10
completor...
completor i:10
completor...
completableFutureToBeCompleted2...
completableFutureToBeCompleted2: 1000
completableFutureToBeCompleted2...

answered 2015-12-16T15:56:34.010

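Although there is an answer marked as correct, the real reason is quite different. See the documentation of the CompletableFuture.cancel(mayInterruptIfRunning) method and read the article "CompletableFuture can't be interrupted" to understand the problem better.

This problem is addressed in my Tascalate Concurrent library. The change to your code should be, from

CompletableFuture<Void> async = CompletableFuture.runAsync(() -> { ... });

to

Promise<Void> async = CompletableTask.runAsync(() -> { ... }, someExplicitExecutor);

...and you will get the expected behaviour (the executor thread is interrupted, the AutoClosable is closed, and async is completed with CancellationException).

You can read more about the library in my blog.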

answered 2017-07-30T12:21:39.297

I ran into this problem in Java 8 SE as well. For me, it was important not to use third-party libraries.

cancel(mayInterruptIfRunning): this value has no effect in this implementation because interrupts are not used to control processing.
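The documented behaviour can be observed with a small sketch like the one below. The class and method names are hypothetical; the worker simply records whether it ever saw an InterruptedException while sleeping.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch class; none of these names come from the thread above.
final class CancelDoesNotInterruptSketch {

    /** Returns true if cancel(true) cancelled the future without interrupting the worker. */
    static boolean cancelLeavesWorkerUninterrupted() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            started.countDown();
            try {
                Thread.sleep(500); // would throw if the thread were interrupted
            } catch (InterruptedException e) {
                interrupted.set(true);
            } finally {
                finished.countDown();
            }
        });

        try {
            started.await();                          // make sure the task is already running
            boolean cancelled = future.cancel(true);  // the flag is accepted but not used to interrupt
            boolean done = finished.await(5, TimeUnit.SECONDS);
            return cancelled && done && future.isCancelled() && !interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("cancelled without interrupt: " + cancelLeavesWorkerUninterrupted());
    }
}
```

The future reports isCancelled() straight away, while the worker sleeps through to the end undisturbed, which is why the resource in the question is never closed by cancel alone.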

The idea is to use Thread.interrupt() when cancel() is called, but only while the Runnable is running.

/** Enable and disable the interrupt */
private static class Interruptor {

    volatile boolean interrupted;
    volatile Runnable interrupt;

    /** Enable interrupt support */
    synchronized boolean start() {
        if (interrupted) {
            return false;
        }
        Thread runThread = Thread.currentThread();
        interrupt = () -> {
            if (runThread != Thread.currentThread()) {
                runThread.interrupt();
            }
        };
        return true;
    }

    /** Interrupt Runnable */
    synchronized void interrupt() {
        if (interrupted) {
            return;
        }
        interrupted = true;
        if (interrupt != null) {
            interrupt.run();
            interrupt = null;
        }
    }

    /** Disable interrupt support */
    synchronized void finish() {
        interrupt = null;
    }
}


/** CompletableFuture with interrupt support */
public static CompletableFuture<Void> runAsyncInterrupted(Runnable run) {

    final Interruptor interruptor = new Interruptor();

    Runnable wrap = () -> {
        if (!interruptor.start()) { // allow interruption
            return; // was canceled before the thread started
        }
        try {
            run.run(); // can be interrupted
        } finally {
            interruptor.finish(); // can no longer be interrupted
        }
    };

    CompletableFuture<Void> cfRun = CompletableFuture.runAsync(wrap);

    // here is caught "CompletableFuture.cancel()"
    cfRun.whenComplete((r, t) -> {
        if (t instanceof CancellationException) {
            interruptor.interrupt();
        }
    });

    return cfRun;
}

Usage example

Runnable mySlowIoRun = () -> {
    try {
        InputStream is = openSomeResource(); // open resource
        try {
            // there may be problem (#1) with reading,
            // such as loss of network connection
            int bt = is.read();
            // ..
            // .. some code
        } finally {
            is.close(); // problem (#2): releases any system resources associated with the stream
        }
    } catch (Throwable th) {
        throw new RuntimeException(th);
    }
};

CompletableFuture<Void> cf = runAsyncInterrupted(mySlowIoRun);

try {
    cf.get(5, TimeUnit.SECONDS); // 5 sec timeout
} catch (Throwable th) {
    cf.cancel(true); // cancel with interrupt mySlowIoRun
    throw th;
}
answered 2018-09-11T11:42:17.967

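So here is a generalization of how I usually handle this problem. Pass in a cancellable state, and close the resource in the stage immediately after the one that opens it.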

private static BufferedReader openFile(String fn) {
    try {
        return Files.newBufferedReader(Paths.get(fn));
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

static class Util {
    static void closeQuietly(AutoCloseable c) {
        if (c == null) return;
        try {
            c.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    static <T extends AutoCloseable, R> R runThenCloseQuietly(T c, Function<T,R> cb) {
        try {
            return cb.apply(c);
        } finally {
            closeQuietly(c);
        }
    }

    static <T extends AutoCloseable, R> Optional<R> runThenCloseQuietlyCancellable(BooleanSupplier cancelled
        , T c, Function<T,Optional<R>> cb) {
        if (c == null) return Optional.empty(); // safe doesn't throw
        try {
            if (cancelled.getAsBoolean()) return Optional.empty(); // might throw, wrap for safety
            return cb.apply(c); // might throw
        } finally {
            closeQuietly(c); // might throw, but at least we're closed
        }
    }

    private static Optional<String> emptyString() {
        return Optional.empty();
    }
}

interface Cancellable {
    boolean isCancelled();
    void cancel();
}

static class CancellableAB implements Cancellable {
    private final AtomicBoolean cancelled;

    CancellableAB(AtomicBoolean cancelled) {
        this.cancelled = cancelled;
    }

    @Override
    public boolean isCancelled() {
        return cancelled.get();
    }

    @Override
    public void cancel() {
        cancelled.set(true);
    }
}
static class CancellableArray implements Cancellable {
    private final boolean[] cancelled;
    private final int idx;
    CancellableArray(boolean[] cancelled) {
        this(cancelled, 0);
    }
    CancellableArray(boolean[] cancelled, int idx) {
        this.cancelled = cancelled;
        this.idx = idx;
    }

    @Override
    public boolean isCancelled() {
        return cancelled[idx];
    }

    @Override
    public void cancel() {
        cancelled[idx]=true;
    }
}

static class CancellableV implements Cancellable {
    volatile boolean cancelled;

    @Override
    public boolean isCancelled() {
        return cancelled;
    }

    @Override
    public void cancel() {
        this.cancelled = true;
    }
}

/**
 * The only reason this is a class is because we need SOME external object for the lambda to check for mutated
 * cancelled state.
 * This gives the added benefit that we can directly call cancel on the resource.
 * We allow a cancellable to be passed in to CHAIN-IN cancellable state.  e.g. if cancellation should affect MULTIPLE
 * CompletableFuture states, we don't want other promises to tie references to this task.. So the cancellable
 * object can be externalized.
 * 
 * Normally you don't need this much genericism, you can directly implement a volatile 'cancel boolean'.
 * But this allows you to create a C.F. task as a 3rd party library call - gives maximum flexibility to invoker.
 *
 */
static class FooTask {
    volatile Cancellable cancelled;
    String fileName;

    public FooTask(String fileName) {
        this.fileName = fileName;
        this.cancelled = new CancellableV();
    }

    public FooTask(String fileName, Cancellable cancelled) {
        this.fileName = fileName; // was never assigned in this constructor
        this.cancelled = cancelled;
    }


    public boolean isCancelled() {
        return cancelled.isCancelled();
    }

    public void cancel() {
        cancelled.cancel();
    }

    /**
     * asynchronously opens file, scans for first valid line (closes file), then processes the line.
     * Note if an exception happens, it's the same as not finding any lines. Don't need to special case.
     * Use of utility functions is mostly for generic-mapping
     * (avoiding annoying double-type-casting plus editor warnings)
     */
    CompletableFuture<Optional<Long>> run1() {
        return
            CompletableFuture.supplyAsync(() -> openFile(fileName))
                .thenApplyAsync(c ->  { // this stage MUST close the prior stage
                        if (c == null) return Util.emptyString(); // shouldn't throw
                        try {
                            if (cancelled.isCancelled()) return Util.emptyString();
                            return c
                                .lines()
                                .filter(line -> !cancelled.isCancelled())
                                .filter(line -> !line.startsWith("#"))
                                .findFirst();
                        } finally {
                            Util.closeQuietly(c); // close on success, failure, and cancellation
                        }
                    }
                )
                .thenApplyAsync(oLine -> // this stage doesn't need closing
                    oLine
                        .map(line -> line.split(":"))
                        .map(cols -> cols[2])
                        .map(Long::valueOf)
                        )
            ;
    }


    /**
     * Same as run1 but avoids messy brackets + try-finally
     */
    CompletableFuture<Optional<Long>> run2() {
        return
            CompletableFuture.supplyAsync(() -> openFile(fileName))
                .thenApplyAsync(c ->  // this stage MUST close the prior stage
                    Util.runThenCloseQuietly(
                        c
                        , r -> cancelled.isCancelled() ? Util.emptyString() // shouldn't throw
                            : r
                            .lines()
                            .filter(line -> !cancelled.isCancelled())
                            .filter(line -> !line.startsWith("#"))
                            .findFirst()
                    ))
                .thenApplyAsync(oLine -> // this stage doesn't need closing
                    oLine
                        .map(line -> line.split(":"))
                        .map(cols -> cols[2])
                        .map(Long::valueOf)
                        )
            ;
    }

    /**
     * Same as run2 but avoids needing the ternary operator - says Cancellable in func-name so is more readable
     */
    CompletableFuture<Optional<Long>> run3() {
        return
            CompletableFuture.supplyAsync(() -> openFile(fileName))
                .thenApplyAsync(c ->  // this stage MUST close the prior stage
                    Util.runThenCloseQuietlyCancellable(
                    cancelled::isCancelled // lambda here is slightly easier to read than explicit if-statement
                    , c
                    , r ->  r
                            .lines()
                            .filter(line -> !cancelled.isCancelled())
                            .filter(line -> !line.startsWith("#"))
                            .findFirst()
                ))
                .thenApplyAsync(oLine -> // this stage doesn't need closing
                    oLine
                        .map(line -> line.split(":"))
                        .map(cols -> cols[2])
                        .map(Long::valueOf)
                        )
        ;
    }

}

@Test
public void testFooGood() {
    var task = new FooTask("/etc/passwd");
    var cf = task.run3();

    var oVal = cf.join();
    assertTrue(oVal.isPresent());
    System.out.println(oVal.get()); // should not throw
}

@Test
public void testFooCancel() {
    var task = new FooTask("/etc/passwd");
    var cf = task.run3();
    task.cancel();

    var oVal = cf.join();
    assertTrue(oVal.isEmpty());
}
answered 2019-07-15T15:41:58.733