0

我有这个 HashMap 数组定义如下

var distinctElementsDefinitionMap: scala.collection.mutable.ArrayBuffer[HashMap[String, Int]] = new scala.collection.mutable.ArrayBuffer[HashMap[String, Int]](300) with scala.collection.mutable.SynchronizedBuffer[HashMap[String, Int]]

现在,我有 300 个元素的并行集合

val max_length = 300
val columnArray = (0 until max_length).toParArray
import scala.collection.parallel.ForkJoinTaskSupport
columnArray.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(100))
columnArray foreach(i => {
    // Do Some Computation and get a HashMap
    var distinctElementsMap: HashMap[String, Int] = //Some Value
    //This line might result in Concurrent Access Exception
    distinctElementsDefinitionMap.update(i, distinctElementsMap)
})

我现在在上面定义的foreach循环中运行计算密集型任务。columnArray计算完成后,我希望每个线程更新distinctElementsDefinitionMap数组的特定条目。每个线程只会更新特定的索引值,对执行它的线程来说是唯一的。我想知道这个数组条目的更新是否安全,多个线程可能同时写入它?如果没有,有没有synchronized办法做到这一点,所以它是线程安全的?谢谢你!

更新:看来这确实不是安全的方法。我得到了java.util.ConcurrentModificationException 关于如何在使用并行集合时避免这种情况的任何提示。

4

1 回答 1

0

使用.groupBy操作,据我判断是并行的(不像其他一些方法,比如.sorted

case class Row(a: String, b: String, c: String)
val data = Vector(
  Row("foo", "", ""), 
  Row("bar", "", ""), 
  Row("foo", "", "")
)

data.par.groupBy(x => x.a).seq
// Map(bar -> ParVector(Row(bar,,)), foo -> ParVector(Row(foo,,), Row(foo,,)))

希望你明白了。

或者,如果您的 RAM 允许您在每列而不是行上并行处理,它必须比您当前的方法更有效(更少争用)。

val columnsCount = 3 // 300 in your case
Vector.range(0, columnsCount).par.map { column => 
  data.groupBy(row => row(column))
}.seq 

尽管即使使用单列也可能会出现内存问题(8M 行可能很多)。

于 2014-07-11T21:50:04.120 回答