3

我想用ManagedBlockerwithCompletableFuture来防止ForkJoinPool.commonPool疲惫,即:

@Test
public void testCompletableFutureWithManagedBlocker() throws ExecutionException, InterruptedException {
    final long startingTime = System.currentTimeMillis();
    final int numberOfFuture = 32;
    final CountDownLatch countDownLatch = new CountDownLatch(numberOfFuture);

    final List<CompletableFuture<Void>> futures = Stream
            .generate(() -> CompletableFuture.runAsync(() -> {
                countDownLatch.countDown();
                BlockingTasks.callInManagedBlock((() -> {sleep(); return null; }));
            }))
            .limit(numberOfFuture)
            .collect(Collectors.toList());

    futures.forEach((future) -> {
        try { countDownLatch.await(); } catch (InterruptedException ignored) {}
        future.join();
    });

    System.out.println("Time taken roughly: [" + (System.currentTimeMillis() - startingTime) + "]ms");
}

public class BlockingTasks {

    public static<T> T callInManagedBlock(final Supplier<T> supplier) {
        final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier);
        try {
            ForkJoinPool.managedBlock(managedBlock);
        } catch (InterruptedException e) {
            throw new Error(e);
        }
        return managedBlock.getResult();
    }

    private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker {
        private final Supplier<T> supplier;
        private T result;
        private boolean done = false;

        private SupplierManagedBlock(final Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override
        public boolean block() {
            result = supplier.get();
            done = true;
            return true;
        }

        @Override
        public boolean isReleasable() {
            return done;
        }

        public T getResult() {
            return result;
        }
    }
}

我应该注意使用ManagedBlockerwith的任何注意事项吗?CompletableFuture

4

0 回答 0