所以这是我通常如何处理问题的概括。传入可取消状态,并在打开状态后立即关闭资源。
private static BufferedReader openFile(String fn) {
try {
return Files.newBufferedReader(Paths.get(fn));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
static class Util {
static void closeQuietly(AutoCloseable c) {
if (c == null) return;
try {
c.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
static <T extends AutoCloseable, R> R runThenCloseQuietly(T c, Function<T,R> cb) {
try {
return cb.apply(c);
} finally {
closeQuietly(c);
}
}
static <T extends AutoCloseable, R> Optional<R> runThenCloseQuietlyCancellable(BooleanSupplier cancelled
, T c, Function<T,Optional<R>> cb) {
if (c == null) return Optional.empty(); // safe doesn't throw
try {
if (cancelled.getAsBoolean()) return Optional.empty(); // might throw, wrap for safety
return cb.apply(c); // might throw
} finally {
closeQuietly(c); // might throw, but at least we're closed
}
}
private static Optional<String> emptyString() {
return Optional.empty();
}
}
interface Cancellable {
boolean isCancelled();
void cancel();
}
static class CancellableAB implements Cancellable {
private final AtomicBoolean cancelled;
CancellableAB(AtomicBoolean cancelled) {
this.cancelled = cancelled;
}
@Override
public boolean isCancelled() {
return cancelled.get();
}
@Override
public void cancel() {
cancelled.set(true);
}
}
static class CancellableArray implements Cancellable {
private final boolean[] cancelled;
private final int idx;
CancellableArray(boolean[] cancelled) {
this(cancelled, 0);
}
CancellableArray(boolean[] cancelled, int idx) {
this.cancelled = cancelled;
this.idx = idx;
}
@Override
public boolean isCancelled() {
return cancelled[idx];
}
@Override
public void cancel() {
cancelled[idx]=true;
}
}
static class CancellableV implements Cancellable {
volatile boolean cancelled;
@Override
public boolean isCancelled() {
return cancelled;
}
@Override
public void cancel() {
this.cancelled = true;
}
}
/**
* The only reason this is a class is because we need SOME external object for the lambda to check for mutated
* cancelled state.
* This gives the added benefit that we can directly call cancel on the resource.
* We allow a cancellable to be passed in to CHAIN-IN cancellable state. e.g. if cancellation should affect MULTIPLE
* CompletableFuture states, we don't want other promises to tie references to this task.. So the cancellable
* object can be externalized.
*
* Normally you don't need this much genericism, you can directly implement a volatile 'cancel boolean'.
* But this allows you to create a C.F. task as a 3rd party library call - gives maximum flexibility to invoker.
*
*/
static class FooTask {
volatile Cancellable cancelled;
String fileName;
public FooTask(String fileName) {
this.fileName = fileName;
this.cancelled = new CancellableV();
}
public FooTask(String fileName, Cancellable cancelled) {
this.cancelled = cancelled;
}
public boolean isCancelled() {
return cancelled.isCancelled();
}
public void cancel() {
cancelled.cancel();
}
/**
* asynchronously opens file, scans for first valid line (closes file), then processes the line.
* Note if an exception happens, it's the same as not finding any lines. Don't need to special case.
* Use of utility functions is mostly for generic-mapping
* (avoiding annoying double-type-casting plus editor warnings)
*/
CompletableFuture<Optional<Long>> run1() {
return
CompletableFuture.supplyAsync(() -> openFile(fileName))
.thenApplyAsync(c -> { // this stage MUST close the prior stage
if(cancelled.isCancelled() || c == null) return Util.emptyString(); // shouldn't throw
try {
return c
.lines()
.filter(line -> !cancelled.isCancelled())
.filter(line -> !line.startsWith("#"))
.findFirst();
} catch (RuntimeException e) {
Util.closeQuietly(c);
throw new RuntimeException(e);
}
}
)
.thenApplyAsync(oLine -> // this stage doesn't need closing
oLine
.map(line -> line.split(":"))
.map(cols -> cols[2])
.map(Long::valueOf)
)
;
}
/**
* Same as run1 but avoids messy brackets + try-finally
*/
CompletableFuture<Optional<Long>> run2() {
return
CompletableFuture.supplyAsync(() -> openFile(fileName))
.thenApplyAsync(c -> // this stage MUST close the prior stage
Util.runThenCloseQuietly(
c
, r -> cancelled.isCancelled() ? Util.emptyString() // shouldn't throw
: r
.lines()
.filter(line -> !cancelled.isCancelled())
.filter(line -> !line.startsWith("#"))
.findFirst()
))
.thenApplyAsync(oLine -> // this stage doesn't need closing
oLine
.map(line -> line.split(":"))
.map(cols -> cols[2])
.map(Long::valueOf)
)
;
}
/**
* Same as run2 but avoids needing the teneary operator - says Cancellable in func-name so is more readable
*/
CompletableFuture<Optional<Long>> run3() {
return
CompletableFuture.supplyAsync(() -> openFile(fileName))
.thenApplyAsync(c -> // this stage MUST close the prior stage
Util.runThenCloseQuietlyCancellable(
cancelled::isCancelled // lambda here is slightly easier to read than explicit if-statement
, c
, r -> r
.lines()
.filter(line -> !cancelled.isCancelled())
.filter(line -> !line.startsWith("#"))
.findFirst()
))
.thenApplyAsync(oLine -> // this stage doesn't need closing
oLine
.map(line -> line.split(":"))
.map(cols -> cols[2])
.map(Long::valueOf)
)
;
}
}
@Test
public void testFooGood() {
var task = new FooTask("/etc/passwd");
var cf = task.run3();
var oVal = cf.join();
assertTrue(oVal.isPresent());
System.out.println(oVal.get()); // should not throw
}
@Test
public void testFooCancel() {
var task = new FooTask("/etc/passwd");
var cf = task.run3();
task.cancel();
var oVal = cf.join();
assertTrue(oVal.isEmpty());
}