cyclops-react(我是这个库的作者),提供了一个StreamUtils类来处理 Streams。它提供的功能之一是 futureOperations,它提供了对标准 Stream 终端操作(然后是一些)的访问,但有一点不同——Stream 是异步执行的,结果在 CompletableFuture 中返回。。例如
Stream<Integer> stream = Stream.of(1,2,3,4,5,6)
.map(i->i+2);
CompletableFuture<List<Integer>> asyncResult = StreamUtils.futureOperations(stream,
Executors.newFixedThreadPool(1))
.collect(Collectors.toList());
还有一个方便的类 ReactiveSeq,它包装了 Stream 并提供了相同的功能,并带有一个很好的流式 API
CompletableFuture<List<Integer>> asyncResult = ReactiveSeq.of(1,2,3,4,5,6)
.map(i->i+2)
.futureOperations(
Executors.newFixedThreadPool(1))
.collect(Collectors.toList());
正如 Adam 指出的那样,cyclops -react FutureStreams 旨在异步处理数据(通过将 Futures 和 Streams 混合在一起) - 它特别适合涉及阻塞 I/O 的多线程操作(例如读取文件、进行 db 调用、休息电话等)。