32

我一直在使用 Java 8 中的 CompletionStage/CompletableFuture 来进行异步处理,效果很好。但是,有时我想要一个阶段来执行迭代器/项目流的异步处理,但似乎没有办法做到这一点。

具体来说, Stream.forEach() 具有在调用后所有项目都已处理的语义。我想要同样的东西,但是用 CompletionStage 代替,例如:

CompletionStage<Void> done = stream.forEach(...);
done.thenRun(...);

如果 Stream 由异步流结果支持,这将比在上述代码本身中等待它完成要好得多。

是否可以用当前的 Java 8 API 以某种方式构造它?解决方法?

4

5 回答 5

23

据我所知,流 API 不支持异步事件处理。听起来您想要类似 Reactive Extensions for .NET 之类的东西,并且有一个名为RxJava的 Java 端口,由 Netflix 创建。

RxJava 支持许多与 Java 8 流相同的高级操作(例如 map 和 filter)并且是异步的。

更新:现在有一个响应式流计划正在进行中,看起来 JDK 9 将通过Flow类包括对它的至少一部分的支持。

于 2013-09-16T11:16:32.163 回答
7

正如@KarolKrol 所暗示的那样,您可以使用CompletableFuture.

有一个构建在 JDK8 流之上的库,以方便使用CompletableFuture称为cyclops-react的流。

要编写你的流,你可以使用 cyclops-react 的 fluent promise ike API 或者你可以使用 simple-react 的Stages

于 2015-03-05T15:57:25.393 回答
3

cyclops-react(我是这个库的作者),提供了一个StreamUtils类来处理 Streams。它提供的功能之一是 futureOperations,它提供了对标准 Stream 终端操作(然后是一些)的访问,但有一点不同——Stream 是异步执行的,结果在 CompletableFuture 中返回。。例如

 Stream<Integer> stream = Stream.of(1,2,3,4,5,6)
                                       .map(i->i+2);
 CompletableFuture<List<Integer>> asyncResult =  StreamUtils.futureOperations(stream,
                                             Executors.newFixedThreadPool(1))
                                       .collect(Collectors.toList());

还有一个方便的类 ReactiveSeq,它包装了 Stream 并提供了相同的功能,并带有一个很好的流式 API

 CompletableFuture<List<Integer>> asyncResult = ReactiveSeq.of(1,2,3,4,5,6)
                                       .map(i->i+2)
                                       .futureOperations(
                                             Executors.newFixedThreadPool(1))
                                       .collect(Collectors.toList());

正如 Adam 指出的那样,cyclops -react FutureStreams 旨在异步处理数据(通过将 Futures 和 Streams 混合在一起) - 它特别适合涉及阻塞 I/O 的多线程操作(例如读取文件、进行 db 调用、休息电话等)。

于 2015-09-21T11:13:14.520 回答
1

可以生成一个流,将每个元素映射到CompletionStage,并使用 收集结果CompletionStage.thenCombine(),但是生成的代码不会比使用这样的简单 for 更具可读性。

    CompletionStage<Collection<Result>> collectionStage =
        CompletableFuture.completedFuture(
            new LinkedList<>()
        );

    for (Request request : requests) {
        CompletionStage<Result> resultStage = performRequest(request);
        collectionStage = collectionStage.thenCombine(
            resultStage,
            (collection, result) -> {
                collection.add(result);
                return collection;
            }
        );
    }

    return collectionStage;

这个例子可以很容易地转换为函数式 forEach 而不会失去可读性。但是使用流的reduceorcollect需要额外的不太好的代码。

更新:CompletableFuture.allOfCompletableFuture.join提供另一种更易读的方式将未来结果集合转换为未来结果集合。

于 2017-03-28T21:10:47.813 回答
0

如前所述,使用反应式编程:

Maven:
groupId: org.projectreactor
artifactId: reactor-spring

示例:

Flux.fromIterable(Arrays.asList("een", "twee"))
    .flatMap(string -> translateDutchToEnglish(string))
    .parallel()
    .runOn(Schedulers.parallel())
    .sequential()
    .collectList()
    .block();

它的作用是创建一个 Flux,它是一组可解析的项目,您可以在其上执行一个操作:带有回调的 flatMap。
然后,您使用 parallel() 方法使其异步。runOn() 用于指示在哪个线程上运行它。Schedulers.parallel() 表示使用与 cpu 内核一样多的线程。
sequence() 将其恢复为同步状态,让 collectList() 再次将所有结果收集到一个列表中。
要等待所有线程完成其业务,请使用 block()。结果是带有结果的列表。

于 2019-10-24T16:32:22.213 回答