2

介绍

我目前正在开发一个我使用的程序Java.util.Collection.parallelStream(),并想知道是否有可能使其更加多线程。

几个小地图

我想知道使用多个map是否可以Java.util.Collection.parallelStream()更好地分配任务:

List<InsertOneModel<Document>> bulkWrites = puzzles.parallelStream()
        .map(gson::toJson)
        .map(Document::parse)
        .map(InsertOneModel::new)
        .toList();

单张大图

例如,比以下更好的分布:

List<InsertOneModel<Document>> bulkWrites = puzzles.parallelStream()
        .map(puzzle -> new InsertOneModel<>(Document.parse(gson.toJson(puzzle))))
        .toList();

问题

有没有更适合的方案之一Java.util.Collection.parallelStream(),还是两者没有太大区别?

4

4 回答 4

4

我查看了Stream源代码。map 操作的结果只是被输入到下一个操作中。map()所以一个大调用和几个小调用几乎没有区别map()

对于map()操作而言,并行Stream根本没有区别。Thread这意味着在任何情况下,每个输入对象都将被处理到最后。

另请注意:并行Stream仅在操作链允许并且有足够的数据要处理时才拆分工作。因此,对于不允许随机访问的 smallCollection或 a ,parallel 的行为类似于 sequence 。CollectionStreamStream

于 2022-01-30T08:21:21.273 回答
2

如果您将它与多个地图链接起来,我认为它不会做得更好。如果您的代码不是很复杂,我宁愿使用单个大地图
要理解这一点,我们必须检查map函数内部的代码。关联

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

正如你所看到的,很多事情发生在幕后。创建多个对象并调用多个方法。因此,对于每个链接的map函数调用,所有这些都会重复。

现在回过头来ParallelStreams,他们研究并行性的概念。
流文档

在此处输入图像描述
并行流是将其元素拆分为多个块的流,并使用不同的线程处理每个块。因此,您可以在多核处理器的所有内核上自动划分给定操作的工作负载,并保持所有内核同样繁忙。

Parallel流在内部使用 default ForkJoinPool,默认情况下,它的线程数与处理器数一样多,由Runtime.getRuntime().availableProcessors(). 但是您可以使用系统属性更改此池的大小java.util.concurrent.ForkJoinPool.common.parallelism.

ParallelStream在集合对象上调用 spliterator() ,该对象返回一个Spliterator实现,该实现提供了拆分任务的逻辑。每个源或集合都有自己的拆分器实现。使用这些拆分器,并行流尽可能长时间地拆分任务,最后当任务变得太小时,它会顺序执行它并合并来自所有子任务的部分结果。

所以我更喜欢parallelStream什么时候

  • 我一次要处理大量数据
  • 我有多个核心来处理数据
  • 现有实现的性能问题
  • 我已经没有运行多线程进程,因为它会增加复杂性。

性能影响

  • 开销:有时当数据集很小时,将顺序流转换为并行流会导致性能下降。管理threads、来源和结果的开销是比实际工作更昂贵的操作。
  • 拆分Arrays可以便宜且均匀地拆分,但LinkedList没有这些属性。TreeMap并且HashSet拆分比数组好,LinkedList但不如数组好。
  • 合并:合并操作对于一些操作来说真的很便宜,比如归约和加法,但是像分组到集合或映射这样的合并操作可能非常昂贵。

结论:大量数据和每个元素完成的许多计算表明并行性可能是一个不错的选择。

于 2022-01-30T19:06:58.253 回答
2

这三个步骤(toJson/parse/new)必须按顺序执行,因此您实际上所做的只是比较s.map(g.compose(f))s.map(f).map(g). 由于是 monad,Java Streams 是函子,第二函子定律指出,本质上,s.map(g.compose(f)) == s.map(f).map(g)表示计算的两种替代方式将产生相同的结果。从性能的角度来看,两者之间的差异可能很小。

但是,通常您应该小心使用Collection.parallelStream. 它使用通用的 forkJoinPool,本质上是整个 JVM 共享的固定线程池。池的大小由主机上的核心数决定。使用公共池的问题是同一进程中的其他线程也可能与您的代码同时使用它。这可能会导致您的代码随机且莫名其妙地变慢 - 例如,如果代码的另一部分暂时耗尽了公共线程池。

更可取的是通过使用 Executors 上的创建者方法之一创建您自己的 ExecutorService,然后将您的任务提交给它。

private static final ExecutorService EX_SVC = Executors.newFixedThreadPool(16);

public static List<InsertOneModel<Document>> process(Stream<Puzzle> puzzles) throws InterruptedException {
    final Collection<Callable<InsertOneModel<Document>>> callables =
            puzzles.map(puzzle ->
                    (Callable<InsertOneModel<Document>>)
                            () -> new InsertOneModel<>(Document.parse(gson.toJson(puzzle)))
            ).collect(Collectors.toList());

    return EX_SVC.invokeAll(callables).stream()
            .map(fut -> {
                try {
                    return fut.get();
                } catch (ExecutionException|InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            }).collect(Collectors.toList());
}
于 2022-01-31T21:51:24.660 回答
1

我怀疑性能上有很大的不同,但即使你证明它确实有更快的性能,我仍然希望在我必须维护的代码中看到并使用第一种样式。

第一种多地图样式更容易被其他人理解,更容易维护和调试——例如peek为处理链的任何阶段添加阶段。

List<InsertOneModel<Document>> bulkWrites = puzzles.parallelStream()
    .map(gson::toJson)
    // easy to make changes for debug, moving peek up/down
    // .peek(System.out::println)
    .map(Document::parse)
    // easy to filter:
    // .filter(this::somecondition)
    .map(InsertOneModel::new)
    .toList();

如果您的需求发生变化 - 例如需要过滤输出,或通过拆分为 2 个集合来捕获中间数据,则第一种方法每次都胜过第二次。

于 2022-01-30T10:12:04.290 回答