3

我的问题与Scala 中的 List[List[T]] 中每个元素的出现次数非常相似,只是我希望有一个涉及并行集合的有效解决方案。

具体来说,我有一个大 (~10^7)vec的短 (~10) 个 Int 列表向量,我想为每个 Int 获取出现x的次数x,例如作为Map[Int,Int]. 不同整数的数量为 10^6。

由于需要在机器上完成此操作具有相当数量的内存 (150GB) 和内核数 (>100),因此并行集合似乎是一个不错的选择。下面的代码是一个好方法吗?

val flatpvec = vec.par.flatten
val flatvec = flatpvec.seq
val unique = flatpvec.distinct
val counts = unique map (x => (x -> flatvec.count(_ == x)))
counts.toMap

还是有更好的解决方案?如果您想知道 .seq 转换:由于某种原因,以下代码似乎没有终止,即使对于小示例也是如此:

val flatpvec = vec.par.flatten
val unique = flatpvec.distinct
val counts = unique map (x => (x -> flatpvec.count(_ == x)))
counts.toMap
4

2 回答 2

3

这有一些作用。 aggregate就像fold除了你还结合了顺序折叠的结果。

更新: 中存在开销并不奇怪.par.groupBy,但我对常数因素感到惊讶。根据这些数字,你永远不会这样计算。此外,我不得不提高记忆力。

本文描述了用于构建结果映射的有趣技术,链接自概述。(它巧妙地保存了中间结果,然后在最后将它们并行合并。)

groupBy但是,如果您真正想要的只是计数,那么复制中间结果的成本会很高。

这些数字是比较顺序的groupBy、并行的和最后aggregate的。

apm@mara:~/tmp$ scalacm countints.scala ; scalam -J-Xms8g -J-Xmx8g -J-Xss1m countints.Test
GroupBy: Starting...
Finished in 12695
GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Par GroupBy: Starting...
Finished in 51481
Par GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Aggregate: Starting...
Finished in 2672
Aggregate: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))

测试代码中没有什么神奇的。

import collection.GenTraversableOnce
import collection.concurrent.TrieMap
import collection.mutable

import concurrent.duration._

trait Timed {
  def now = System.nanoTime
  def timed[A](op: =>A): A =  {
    val start = now
    val res = op
    val end = now
    val lapsed = (end - start).nanos.toMillis
    Console println s"Finished in $lapsed"
    res
  }
  def showtime(title: String, op: =>GenTraversableOnce[(Int,Int)]): Unit = {
    Console println s"$title: Starting..."
    val res = timed(op)
    //val showable = res.toIterator.min   //(res.toIterator take 10).toList
    val showable = res.toList.sorted take 10
    Console println s"$title: $showable"
  }
}

它会生成一些感兴趣的随机数据。

object Test extends App with Timed {

  val upto = math.pow(10,6).toInt
  val ran = new java.util.Random
  val ten = (1 to 10).toList
  val maxSamples = 1000
  // samples of ten random numbers in the desired range
  val samples = (1 to maxSamples).toList map (_ => ten map (_ => ran nextInt upto))
  // pick a sample at random
  def anyten = samples(ran nextInt maxSamples)
  def mag = 7
  val data: Vector[List[Int]] = Vector.fill(math.pow(10,mag).toInt)(anyten)

从任务中调用顺序操作和组合操作aggregate,并将结果分配给 volatile var。

  def z: mutable.Map[Int,Int] = mutable.Map.empty[Int,Int]
  def so(m: mutable.Map[Int,Int], is: List[Int]) = {
    for (i <- is) {
      val v = m.getOrElse(i, 0)
      m(i) = v + 1
    }
    m
  }
  def co(m: mutable.Map[Int,Int], n: mutable.Map[Int,Int]) = {
    for ((i, count) <- n) {
      val v = m.getOrElse(i, 0)
      m(i) = v + count
    }
    m
  }
  showtime("GroupBy", data.flatten groupBy identity map { case (k, vs) => (k, vs.size) })
  showtime("Par GroupBy", data.flatten.par groupBy identity map { case (k, vs) => (k, vs.size) })
  showtime("Aggregate", data.par.aggregate(z)(so, co))
}
于 2013-08-20T17:40:10.607 回答
2

如果你想使用并行集合和 Scala 标准工具,你可以这样做。按标识对您的集合进行分组,然后将其映射到 (Value, Count):

scala> val longList = List(1, 5, 2, 3, 7, 4, 2, 3, 7, 3, 2, 1, 7)
longList: List[Int] = List(1, 5, 2, 3, 7, 4, 2, 3, 7, 3, 2, 1, 7)                                                                                            

scala> longList.par.groupBy(x => x)
res0: scala.collection.parallel.immutable.ParMap[Int,scala.collection.parallel.immutable.ParSeq[Int]] = ParMap(5 -> ParVector(5), 1 -> ParVector(1, 1), 2 -> ParVector(2, 2, 2), 7 -> ParVector(7, 7, 7), 3 -> ParVector(3, 3, 3), 4 -> ParVector(4))                                                                     

scala> longList.par.groupBy(x => x).map(x => (x._1, x._2.size))
res1: scala.collection.parallel.immutable.ParMap[Int,Int] = ParMap(5 -> 1, 1 -> 2, 2 -> 3, 7 -> 3, 3 -> 3, 4 -> 1)                                           

甚至更好,如评论中建议的 pagoda_5b:

scala> longList.par.groupBy(identity).mapValues(_.size)
res1: scala.collection.parallel.ParMap[Int,Int] = ParMap(5 -> 1, 1 -> 2, 2 -> 3, 7 -> 3, 3 -> 3, 4 -> 1)
于 2013-08-20T15:49:52.500 回答