48

请参阅下面的简单示例,该示例计算列表中每个单词的出现次数:

Stream<String> words = Stream.of("a", "b", "a", "c");
Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

最后,wordsCount{a=2, b=1, c=1}

但是我的流非常大,我想并行化这项工作,所以我写:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

但是我注意到这wordsCount很简单HashMap,所以我想知道是否需要明确要求并发映射以确保线程安全:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toConcurrentMap(s -> s, s -> 1,
                                                                (i, j) -> i + j));

非并发收集器可以安全地与并行流一起使用,还是应该在从并行流收集时只使用并发版本?

4

3 回答 3

48

非并发收集器可以安全地与并行流一起使用,还是应该在从并行流收集时只使用并发版本?

collect在并行流的操作中使用非并发收集器是安全的。

在接口的规范Collector,在有六个要点的部分中,是这样的:

对于非并发收集器,从结果提供者、累加器或组合器函数返回的任何结果都必须是串行线程限制的。这使得收集可以并行发生,而收集器不需要实现任何额外的同步。归约实现必须管理输入被正确分区,分区被隔离处理,并且只有在累积完成后才发生组合。

这意味着Collectors该类提供的各种实现可以与并行流一起使用,即使其中一些实现可能不是并发收集器。这也适用于您可能实现的任何您自己的非并发收集器。它们可以安全地与并行流一起使用,前提是您的收集器不干扰流源、无副作用、与顺序无关等。

我还建议阅读 java.util.stream 包文档的Mutable Reduction部分。本节中间是一个示例,该示例被声明为可并行化的,但它将结果收集到一个ArrayList中,这不是线程安全的。

其工作方式是在非并发收集器中结束的并行流确保不同的线程始终在中间结果集合的不同实例上运行。这就是为什么收集器有一个Supplier功能,用于创建与线程一样多的中间集合,因此每个线程都可以累积成自己的。当要合并中间结果时,它们会在线程之间安全地传递,并且在任何给定时间,只有一个线程正在合并任何一对中间结果。

于 2014-03-12T20:17:24.910 回答
25

所有收集器,如果它们遵循规范中的规则,则可以安全地并行或顺序运行。并行准备是这里设计的关键部分。

并发和非并发收集器之间的区别与并行化方法有关。

普通(非并发)收集器通过合并子结果进行操作。所以源被分割成一堆块,每个块被收集到一个结果容器中(如列表或地图),然后将子结果合并到一个更大的结果容器中。这是安全的并且可以保持顺序,但是对于某些类型的容器——尤其是地图——可能会很昂贵,因为通过键合并两个地图通常很昂贵。

相反,并发收集器创建一个结果容器,其插入操作被保证是线程安全的,并从多个线程将元素爆炸到其中。使用像 ConcurrentHashMap 这样的高并发结果容器,这种方法可能比合并普通的 HashMap 表现得更好。

因此,并发收集器是对其普通对应物的严格优化。他们不是没有代价的。因为元素是从许多线程中被爆破的,并发收集器通常不能保持遇到顺序。(但是,通常你并不关心——在创建字数直方图时,你并不关心你首先计算了哪个“foo”实例。)

于 2014-04-29T16:36:32.743 回答
12

将非并发集合和非原子计数器与并行流一起使用是安全的。

如果您查看Stream::collect的文档,您会发现以下段落:

就像reduce(Object, BinaryOperator),收集操作可以并行化,而不需要额外的同步。

对于Stream::reduce方法:

虽然与简单地在循环中改变运行总数相比,这似乎是一种更迂回的方式来执行聚合,但归约操作更优雅地并行化,不需要额外的同步,并且大大降低了数据竞争的风险。

这可能有点令人惊讶。但是,请注意并行流基于fork-join 模型。这意味着并发执行的工作方式如下:

  • 将序列分成大小大致相同的两部分
  • 单独处理每个部分
  • 收集两个部分的结果并将它们组合成一个结果

在第二步中,这三个步骤递归地应用于子序列。

一个例子应该清楚地说明这一点。这

IntStream.range(0, 4)
    .parallel()
    .collect(Trace::new, Trace::accumulate, Trace::combine);

Trace类的唯一目的是记录构造函数和方法调用。如果执行此语句,它将打印以下行:

thread:  9  /  operation: new
thread: 10  /  operation: new
thread: 10  /  operation: accumulate
thread:  1  /  operation: new
thread:  1  /  operation: accumulate
thread:  1  /  operation: combine
thread: 11  /  operation: new
thread: 11  /  operation: accumulate
thread:  9  /  operation: accumulate
thread:  9  /  operation: combine
thread:  9  /  operation: combine

可以看到,已经创建了四个Trace对象,对每个对象调用了一次accumulate ,并使用了combine三次将四个对象合并为一个。每个对象一次只能被一个线程访问。这使得代码是线程安全的,同样适用于方法Collectors::toMap

于 2014-03-12T20:22:18.550 回答