6

我想为一项任务使用并行数组,在开始编码之前,我想知道这个小片段是否是线程安全的:

import collection.mutable._

var listBuffer = ListBuffer[String]("one","two","three","four","five","six","seven","eight","nine")
var jSyncList  = java.util.Collections.synchronizedList(new java.util.ArrayList[String]())
listBuffer.par.foreach { e =>
    println("processed :"+e)
    // using sleep here to simulate a random delay
    Thread.sleep((scala.math.random * 1000).toLong)
    jSyncList.add(e)
}
jSyncList.toArray.foreach(println)

有没有更好的方法来处理并行集合,并在其他地方累积结果?

4

5 回答 5

6

您发布的代码非常安全;不过,我不确定前提:为什么需要将并行集合的结果累积到非并行集合中?并行集合的全部要点之一是它们看起来像其他集合。

我认为并行集合也将提供一种seq切换到顺序集合的方法。所以你可能应该使用这个!

于 2011-05-07T12:20:32.113 回答
3

为了使这种模式安全:

listBuffer.par.foreach { e => f(e) }

f必须能够以安全的方式同时运行。我认为应用安全多线程所需的相同规则(访问共享状态需要是线程安全的,f不同的调用顺序e不是确定性的,当您开始同步语句时可能会遇到死锁f)。

此外,我不清楚并行集合为您提供的关于在处理时修改基础集合的保证是什么,因此可以添加/删除元素的可变列表缓冲区可能是一个糟糕的选择。你永远不知道下一个编码器何时会调用foo(listBuffer)你之前的东西foreach并将该引用传递给另一个线程,该线程可能会在处理列表时对其进行变异。

除此之外,我认为对于任何f需要很长时间,可以同时调用并且e可以乱序处理的地方,这是一个很好的模式。

immutCol.par.foreach { e => threadSafeOutOfOrderProcessingOf(e) }

免责声明:我自己没有尝试过 // colls,但我期待有 SO 问题/答案向我们展示什么是有效的。

于 2011-05-07T14:27:34.777 回答
2

synchronisedList应该是安全的,尽管可能println会产生意想不到的结果——您不能保证打印项目的顺序,甚至不能保证您的 printlns 不会在字符中间交错。

同步列表也不太可能是您执行此操作的最快方式,更安全的解决方案是map覆盖不可变集合(Vector可能是您最好的选择),然后打印所有行(按顺序):

val input = Vector("one","two","three","four","five","six","seven","eight","nine")
val output  = input.par.map { e =>
  val msg = "processed :" + e
  // using sleep here to simulate a random delay
  Thread.sleep((math.random * 1000).toLong)
  msg
}
println(output mkString "\n")

您还会注意到,此代码与您的示例一样实用:)

于 2011-05-07T12:28:41.360 回答
2

这段代码很奇怪——为什么要在需要同步的东西上并行添加东西?您将增加争用并且绝对没有任何回报。

事情的原理fold- 累积并行处理的结果,最好用,reduceaggregate.

于 2011-05-07T17:19:55.860 回答
2

您发布的代码是安全的 - 由于数组列表的状态不一致,不会出现错误,因为对它的访问是同步的。

但是,并行集合同时(同时)和无序地处理项目。乱序意味着 54. 元素可能在 2. 元素之前处理 - 您的同步数组列表将包含非预定义顺序的项目。

一般来说,最好使用map,和其他功能组合器将集合转换为另一个集合 - 如果集合有一些(就像s 一样)filter,这些将确保保留排序保证。Seq例如:

ParArray(1, 2, 3, 4).map(_ + 1)

总是返回ParArray(2, 3, 4, 5)

但是,如果您需要将特定的线程安全集合类型(例如 aConcurrentSkipListMap或同步集合)传递给某些 API 中的某个方法,则从并行 foreach 修改它是安全的。

最后,注意 - 并行集合提供对数据的并行批量操作。可变并行集合不是线程安全的,因为您可以从不同的线程向它们添加元素。可变操作(例如插入地图或附加缓冲区)仍然必须同步。

于 2011-05-08T08:47:14.703 回答