3

我有 32 个机器线程和一个ConcurrentHashMap<Key,Value> map,其中包含很多键。Key定义了一个公共方法visit()。我想visit()使用我可用的处理能力和可能的某种线程池来精确地映射每个元素。

我可以尝试的事情:

  • 我可以使用该方法map.keys()。结果Enumeration<Key>可以通过 using 进行迭代nextElement(),但由于调用key.visit()非常简短,我无法让线程保持忙碌。枚举本质上是单线程的。
  • 我可以使用 synchronizedHashSet<Key>代替,调用一个方法toArray()并将数组上的工作拆分为所有 32 个线程。我严重怀疑此解决方案,因为该方法toArray()可能是单线程瓶颈。
  • 我可以尝试继承 from ConcurrentHashMap,获取其内部实例Segment<K,V>,尝试将它们分成 32 个组并分别处理每个组。不过,这听起来像是一种硬核方法。
  • 或类似的魔法Enumeration<Key>

理想情况下:

  • 理想情况下,aConcurrentHashMap<Key, Value>会定义一个方法keysEnumerator(int approximatePosition),这可能会让我失去一个缺少大约前 1/32 个元素的枚举器,即map.keysEnumerator(map.size()/32)

我错过了什么明显的东西吗?有没有人遇到过类似的问题?

编辑

我已经进行了分析,看看这个问题是否真的会影响实践中的性能。由于目前我无法访问集群,因此我使用笔记本电脑并尝试将结果外推到更大的数据集。visit()在我的机器上,我可以创建一个 200 万个键的 ConcurrentHashMap,并且在每个键上调用该方法需要大约 1 秒的时间来迭代它。该程序应该扩展到 8500 万键(及以上)。集群的处理器稍微快一些,但它仍然需要大约 40 秒来迭代整个地图。现在谈谈程序的逻辑流程。呈现的逻辑是顺序的,即在上一步中的所有线程完成之前,不允许任何线程进行下一步:

  1. 创建哈希映射,创建键并填充哈希映射
  2. 遍历访问所有键的整个哈希映射。
  3. 做一些数据混洗,即并行插入和删除。
  4. 重复步骤 2 和 3 数百次。

该逻辑流程意味着 40 秒的迭代将重复数百次,例如 100 次。这让我们仅在访问节点上花费了一个多小时。使用一组 32 个并行迭代器,它可以缩短到几分钟,这是一个显着的性能改进。

现在谈谈如何ConcurrentHashMap工作(或者我相信它是如何工作的)。每个ConcurrentHashMap由段组成(默认为 16)。对哈希映射的每次写入都会在相关段上同步。假设我们正在尝试将两个新键 k1 和 k2 写入哈希映射,并且它们将被解析为属于同一段,例如 s1。如果尝试同时写入它们,则其中一个将首先获取锁,然后再添加另一个。两个元素被解析为属于同一段的机会是多少?如果我们有一个好的散列函数和 16 个段,那么它就是 1/16。

我相信ConcurrentHashMap应该有一个方法concurrentKeys(),它将返回一个枚举数组,每个段一个。我有一些想法如何ConcurrentHashMap通过继承添加它,如果我成功了,我会告诉你的。就目前而言,解决方案似乎是创建一个 ConcurrentHashMaps 数组并预​​先散列每个键以解析为此类数组的一个成员。一旦准备好,我也会分享该代码。

编辑

这是不同语言的相同问题:

并行迭代器

4

3 回答 3

3

我可以尝试从 ConcurrentHashMap 继承,掌握其内部 Segment 的实例,尝试将它们分成 32 个组并分别处理每个组。不过,这听起来像是一种硬核方法。

确实是铁杆,但关于我唯一能看到的东西。 toArray()通过进行枚举来构建数组,因此在那里没有胜利。除非运行与其他地图操作的比率非常高,否则我无法相信同步HashSet会更好。visit()

使用Segments 的问题是你必须非常小心你的代码是有弹性的,因为我假设其他线程可能在你访问节点的同时改变表,你需要避免不可避免的比赛条件。细腻肯定。

我心中最大的问题是这是否有必要?探查器或计时运行是否向您显示这visit()对一个线程中的每个键都花费了太长时间?您是否尝试过为每个visit()调用创建一个线程池,并让一个线程进行枚举,而池线程则执行visit()

于 2013-11-08T13:57:23.060 回答
2

如果我是你,我会尝试迭代第一个键集ConcurrentHashMap。您可以尝试将键的处理传递给线程池(如果任务太轻,则以捆绑的形式),甚至传递给 ForkJoin 任务,但只有在确实有必要时才应该这样做。

话虽如此,您可以使用 a ConcurrentSkipListMap,在其中您可以获得 a NavigableSetof 键。然后,您可以使用该subSet方法从中取出分区。但是,对于,操作ConcurrentHashMap会有更好的性能(还要注意它会使用而不是)。这种情况更好的情况似乎不太可能。putgetcompareTohashCode

于 2013-11-08T14:11:05.823 回答
0

我最终会采用的解决方案是一个数组ConcurrentHashMaps而不是一个ConcurrentHashMap。这是临时的,但似乎与我的用例有关。我不在乎第二步的速度很慢,因为它不会影响我的代码的性能。解决方案是:

对象创建:

  1. 创建一个大小为 t 的 ConcurrentHashMaps 数组,其中 t 是线程数。
  2. 创建一个大小为 t 的 Runnables 数组。

数组填充(单线程,不是问题):

  1. 创建键并应用 pre-hash 函数,它将返回 0 ... t-1 范围内的 int。在我的情况下,只需模 t。
  2. 通过访问数组中的适当条目,将键放入哈希图中。例如,如果预哈希导致索引为 4,则使用 hashArray[4].put(key)

数组迭代(很好的多线程,性能提升):

  1. 为 Runnables 数组中的每个线程分配一个使用相应索引遍历 hashmap 的作业。与单线程相比,这应该会给予更短的迭代时间。

要查看概念验证代码(因为它有一些来自项目的依赖项,我无法在此处发布)前往 我在 github 上的项目

编辑

实际上,为我的系统实施上述概念证明已被证明是耗时、容易出错且令人非常失望的。此外,我发现我会错过标准库 ConcurrentHashMap 的许多功能。我最近一直在探索的解决方案是使用 Scala,它看起来不那么特别而且更有希望,它产生的字节码可以与 Java 完全互操作。概念证明依赖于本文中描述的令人惊叹的库和 AFAIK,鉴于标准库和相应第三方库的当前状态,目前不可能在不编写数千行代码的情况下在 vanilla Java 中实现相应的解决方案。

import scala.collection.parallel.mutable.ParHashMap

class Node(value: Int, id: Int){
    var v = value
    var i = id
    override def toString(): String = v toString
}

object testParHashMap{
    def visit(entry: Tuple2[Int, Node]){
        entry._2.v += 1
    }
    def main(args: Array[String]){
        val hm = new ParHashMap[Int, Node]()
        for (i <- 1 to 10){
            var node = new Node(0, i)
            hm.put(node.i, node)
        }

        println("========== BEFORE ==========")
        hm.foreach{println}

        hm.foreach{visit}

        println("========== AFTER ==========")
        hm.foreach{println}

    }
}
于 2013-11-09T16:58:24.807 回答