10

给定一个非常大的collection.parallel.mutable.ParHashMap实例(或任何其他并行集合),一旦找到给定的(例如 50 个)匹配数,如何中止过滤并行扫描?

尝试在线程安全的“外部”数据结构中累积中间匹配项或保留具有结果计数的外部 AtomicInteger 在 4 个内核上似乎比使用常规collection.mutable.HashMap并将单个内核固定为 100慢 2 到 3 倍%。

我知道Par* 集合上的find存在确实会“在内部”中止。有没有一种方法可以概括这一点以找到多个结果?

这里的代码在 ParHashMap 上似乎仍然慢了 2 到 3 倍,大约有 79,000 个条目,并且还存在将超过 maxResults 结果填充到结果 CHM 中的问题可能是由于线程在incrementAndGet之后但在break之前被抢占允许其他线程添加更多元素)。更新:似乎速度变慢是由于工作线程在 counter.incrementAndGet() 上竞争,这当然违背了整个并行扫描的目的:-(

def find(filter: Node => Boolean, maxResults: Int): Iterable[Node] =
{
  val counter = new AtomicInteger(0)
  val results = new ConcurrentHashMap[Key,  Node](maxResults)

  import util.control.Breaks._

  breakable
  {
    for ((key, node) <- parHashMap if filter(node))
    {
      results.put(key, node)
      val total = counter.incrementAndGet()
      if (total > maxResults) break
    }
  }

  results.values.toArray(new Array[Node](results.size))
}
4

3 回答 3

2

我将首先进行并行扫描,其中变量 maxResults 将是线程本地的。这将找到最多 (maxResults * numberOfThreads) 个结果。

然后我会进行单线程扫描以将其减少到 maxResults。

于 2011-11-24T11:17:14.147 回答
1

我对你的案子进行了一次有趣的调查。

调查推理

我怀疑问题出在输入 Map 的可变性上,我将尝试向您解释原因: HashMap 实现将数据组织在不同的存储桶中,正如在 Wikipedia 上所见。

维基百科哈希图

Java 中的第一个线程安全集合,同步集合基于同步底层实现的所有方法,导致性能不佳。进一步的研究和思考带来了性能更高的并发集合,例如 ConcurrentHashMap 哪种方法更智能:为什么我们不用特定的锁来保护每个存储桶?

根据我的感觉,出现性能问题是因为:

  • 当您并行运行过滤器时,一些线程会在一次访问同一个存储桶时发生冲突,并且会遇到同一个锁,因为您的地图是mutable
  • 您拿着一个计数器来查看您有多少结果,同时您可以实际检查结果的大小。如果您有一种线程安全的方式来构建集合,那么您也不需要线程安全的计数器。

调查结果

我开发了一个测试用例,我发现我错了。问题在于输出映射的并发性。事实上,这就是发生碰撞的地方,当您将元素放入地图时,而不是在您对其进行迭代时。此外,由于您只需要值的结果,因此不需要键和散列以及所有地图功能。如果您删除AtomicCounter并且仅使用result地图来检查您是否收集了足够的元素,那么您的版本执行情况可能会很有趣。

请注意 Scala 2.9.2 中的以下代码。我在另一篇文章中解释了为什么并行版本和非并行版本需要两个不同的函数:Calling map on a parallel collection via a reference to an ancient type

object MapPerformance {

  val size = 100000
  val items = Seq.tabulate(size)( x => (x,x*2))


  val concurrentParallelMap = ImmutableParHashMap(items:_*)

  val concurrentMutableParallelMap = MutableParHashMap(items:_*)

  val unparallelMap = Map(items:_*)


  class ThreadSafeIndexedSeqBuilder[T](maxSize:Int) {
    val underlyingBuilder = new VectorBuilder[T]()
    var counter = 0
    def sizeHint(hint:Int) { underlyingBuilder.sizeHint(hint) }
    def +=(item:T):Boolean ={
      synchronized{
        if(counter>=maxSize)
          false
        else{
          underlyingBuilder+=item
          counter+=1
          true
        }
      }
    }
    def result():Vector[T] = underlyingBuilder.result()

  }

  def find(map:ParMap[Int,Int],filter: Int => Boolean, maxResults: Int): Iterable[Int] =
  {

    // we already know the maximum size
    val resultsBuilder = new ThreadSafeIndexedSeqBuilder[Int](maxResults)
    resultsBuilder.sizeHint(maxResults)

    import util.control.Breaks._
    breakable
    {
      for ((key, node) <- map if filter(node))
      {
        val newItemAdded = resultsBuilder+=node
        if (!newItemAdded)
          break()

      }
    }
    resultsBuilder.result().seq

  }

  def findUnParallel(map:Map[Int,Int],filter: Int => Boolean, maxResults: Int): Iterable[Int] =
  {

    // we already know the maximum size
    val resultsBuilder = Array.newBuilder[Int]
    resultsBuilder.sizeHint(maxResults)

    var counter = 0
      for {
        (key, node) <- map if filter(node)
        if counter < maxResults
      }{
        resultsBuilder+=node
        counter+=1
      }

    resultsBuilder.result()

  }

  def measureTime[K](f: => K):(Long,K) = {
    val startMutable = System.currentTimeMillis()
    val result = f
    val endMutable = System.currentTimeMillis()
    (endMutable-startMutable,result)
  }

  def main(args:Array[String]) = {
    val maxResultSetting=10
    (1 to 10).foreach{
      tryNumber =>
        println("Try number " +tryNumber)
        val (mutableTime, mutableResult) = measureTime(find(concurrentMutableParallelMap,_%2==0,maxResultSetting))
        val (immutableTime, immutableResult) = measureTime(find(concurrentMutableParallelMap,_%2==0,maxResultSetting))
        val (unparallelTime, unparallelResult) = measureTime(findUnParallel(unparallelMap,_%2==0,maxResultSetting))
        assert(mutableResult.size==maxResultSetting)
        assert(immutableResult.size==maxResultSetting)
        assert(unparallelResult.size==maxResultSetting)
        println(" The mutable version has taken " + mutableTime + " milliseconds")
        println(" The immutable version has taken " + immutableTime + " milliseconds")
        println(" The unparallel version has taken " + unparallelTime + " milliseconds")
     }
  }

}

使用此代码,我系统地并行(输入映射的可变和不可变版本)比我的机器上的无与伦比的速度快大约 3.5 倍。

于 2013-01-18T18:05:50.153 回答
0

您可以尝试获取一个迭代器,然后创建一个惰性列表(一个 Stream),您可以在其中过滤(使用您的谓词)并获取您想要的元素数量。因为它是非严格的,所以不会评估元素的这种“获取”。之后,您可以通过将“.par”添加到整个事物来强制执行并实现并行化。

示例代码:

具有随机值的并行映射(模拟您的并行哈希映射):

scala> myMap
res14: scala.collection.parallel.immutable.ParMap[Int,Int] = ParMap(66978401 -> -1331298976, 256964068 -> 126442706, 1698061835 -> 1622679396, -1556333580 -> -1737927220, 791194343 -> -591951714, -1907806173 -> 365922424, 1970481797 -> 162004380, -475841243 -> -445098544, -33856724 -> -1418863050, 1851826878 -> 64176692, 1797820893 -> 405915272, -1838192182 -> 1152824098, 1028423518 -> -2124589278, -670924872 -> 1056679706, 1530917115 -> 1265988738, -808655189 -> -1742792788, 873935965 -> 733748120, -1026980400 -> -163182914, 576661388 -> 900607992, -1950678599 -> -731236098)

获取一个迭代器并从迭代器创建一个 Stream 并过滤它。在这种情况下,我的谓词只接受(映射的值成员的)对。我想得到 10 个偶数元素,所以我取 10 个元素,只有在我强制它时才会评估:

scala> val mapIterator = myMap.toIterator
mapIterator: Iterator[(Int, Int)] = HashTrieIterator(20)


scala> val r = Stream.continually(mapIterator.next()).filter(_._2 % 2 == 0).take(10)
r: scala.collection.immutable.Stream[(Int, Int)] = Stream((66978401,-1331298976), ?)

最后,我强制按计划只获得 10 个元素的评估

scala> r.force
res16: scala.collection.immutable.Stream[(Int, Int)] = Stream((66978401,-1331298976), (256964068,126442706), (1698061835,1622679396), (-1556333580,-1737927220), (791194343,-591951714), (-1907806173,365922424), (1970481797,162004380), (-475841243,-445098544), (-33856724,-1418863050), (1851826878,64176692))

这样,您只会获得所需的元素数量(无需处理剩余的元素),并且您可以在没有锁、原子或中断的情况下并行化该过程。

请将此与您的解决方案进行比较,看看它是否有任何好处。

于 2011-11-10T00:45:55.747 回答