6

我的问题是如何使用 Completable Future。

我有一个实现 Callable 的类。

public class Executor implements Callable<Collection>

早先是用来做 -

service.submit(collectorService);

这将返回一个Future<Collection>. 但是我们不想再使用 future 并且需要CompletableFuture. 一个想法是我们不需要使用 CompletableFuture 进行轮询,也不需要等待和阻塞,直到它准备好。

那么我将如何使用可完成的未来并isDone()callable线程完成时调用一个函数。

4

4 回答 4

8

给定 a CompletableFuture<T> f,您可以使用以下方法启动同步或异步任务以在完成后运行:

f.thenApply(result -> isDone(result));      // sync callback
f.thenApplyAsync(result -> isDone(result)); // async callback

...或者,如果您不需要结果:

f.thenRun(() -> isDone());
f.thenRunAsync(() -> isDone());
于 2015-03-09T13:32:26.933 回答
2

您可以创建一个调用现有 collectorService 的 lambda 表达式。CompletableFuture.supplyAsync 将接受的供应商 lambda 表达式看起来像这样

 Supplier<Collection> supplier = () -> collectorService.call();

并且可以与 CompletableFuture 一起使用,如下所示

  CompletableFuture.supplyAsync(() -> collectorService.call(),service)
         .thenApply(collection->isDone(collection);

正如其他人指出的那样, thenApply 将在 collectorService.call() 方法返回结果时执行 - 在执行 Future 任务的同一线程上。使用 thenApplyAsync 会将另一个任务重新提交给执行程序服务(原始性能大约要慢一个数量级,因此除非您也有充分的理由,否则不要这样做!)。

于 2015-09-18T11:22:52.763 回答
1

如果我对您的理解正确,您想知道如何提交返回 CompletableFuture 的“任务”(您以前的“执行者”)。

你通过调用来做到这一点

CompletableFuture.supplyAsync(collectorService)

不同之处在于您的“执行者”现在必须实施供应商而不是可调用

于 2015-03-09T20:05:42.393 回答
0

我们不会在 completableFuture 中传递 runnable 或 callable。它采用供应商类型,这是一个功能接口。只需创建普通方法并将它们与执行器对象一起传递。参考下面的例子。

package completableFuture;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompFuture {
    ExecutorService firstExecService = Executors.newFixedThreadPool(5);

    public static void main(String[] args) {

        CompFuture compFuture = new CompFuture();
        compFuture.testMe("Java");
    }

    public String m1(String param) {

        Random r = new Random();
        int val = r.nextInt(20) * 1000;
        System.out.println(Thread.currentThread().getName() + " " + val);

        try {
            Thread.sleep(val);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return param + " Hello my";
    }

    public void m2(String salutation) {
        System.out.println(Thread.currentThread().getName() + "  ##" + salutation + " Friend!");
    }

    public void testMe(String start) {
        System.out.println("TM: " + Thread.currentThread());

        for (int i = 0; i < 5; i++) {
            CompletableFuture.supplyAsync(() -> m1(start), firstExecService).thenAccept(s -> m2(s));
        }
    }

}

上述程序的输出:: 执行时间最短的线程首先给出它的输出。

TM:线程[主,5,主]

池 1 线程 1 1000

池 1 线程 2 14000

池 1 线程 4 3000

池 1 线程 3 0

池 1 线程 5 9000

pool-1-thread-3 ##Java 你好,我的朋友!

pool-1-thread-1 ##Java 你好,我的朋友!

pool-1-thread-4 ##Java 你好,我的朋友!

pool-1-thread-5 ##Java 你好,我的朋友!

pool-1-thread-2 ##Java 你好,我的朋友!

于 2019-09-05T07:03:32.303 回答