1

我有一个关于在多线程应用程序中使用地图的问题。假设我们有这样的场景:

  1. 线程接收List<Map<String, Object>>由 Jackson Json 反序列化的 json 数据。
  2. 该线程修改接收到的地图。
  3. 然后将列表放入阻塞队列以供另一个线程使用。

如您所见, map 仅由单个线程修改,但随后它“变为”只读(没有变化,只是不再修改)并传递给另一个线程。接下来,当我研究HasMap(also TreeMap) 和ConcurrentHashMap的实现时,后者有volatile字段,而前两个没有。那么,在这种情况下我应该使用哪个实现MapConcurrentHashMap是矫枉过正的选择还是由于线程间传输而必须使用?

我的简单测试表明,HashMap/TreeMap当它们被同步修改并且有效时我可以使用,但是我的结论或我的测试代码可能是错误的:

def map = new TreeMap() // or HashMap
def start = new CountDownLatch(1)
def threads = (1..5)
println("Threads: " + threads)
def created = new CountDownLatch(threads.size())
def completed = new CountDownLatch(threads.size())
threads.each {i ->
    new Thread({
        def from = i * 10
        def to = from + 10
        def local = (from..to)
        println(Thread.currentThread().name + " " + local)
        created.countDown()
        start.await()
        println('Mutating by ' + local)
        local.each {number ->
            synchronized (map) {
                map.put(number, ThreadLocalRandom.current().nextInt())
            }
            println(Thread.currentThread().name + ' added ' + number +  ': ' + map.keySet())
        }
        println 'Done: ' + Thread.currentThread().name
        completed.countDown()
    }).start()
}

created.await()
start.countDown()
completed.await()
println('Completed:')
map.each { e ->
    println('' + e.key + ': ' + e.value)
}

主线程产生 5 个同步更新公共地图的子线程,当它们完成主线程成功看到子线程的所有更新时。

4

2 回答 2

2

这些java.util.concurrent类对排序有特殊保证:

内存一致性效果:与其他并发集合一样,线程中的操作在将对象放入BlockingQueue 之前的操作之前发生在BlockingQueue从另一个线程中访问或删除该元素之后的操作。

这意味着您可以自由使用任何类型的可变对象并根据需要对其进行操作,然后将其放入队列中。检索到它时,您应用的所有操作都将可见。

(更一般地请注意,您演示的那种测试只能证明缺乏安全性;在大多数实际情况下,非同步代码在 99% 的情况下都可以正常工作。最后 1% 会咬你。)

于 2020-09-18T09:06:26.997 回答
1

这个问题的范围很广。

你原来的场景

你说 :

[A] map 仅由单线程修改,但随后“变为”只读

棘手的部分是“然后”这个词。当你,程序员说“那么”时,你指的是“时钟时间”,例如我已经做了这个,现在做那个。但是由于各种各样的原因,计算机不会以这种方式“思考”(执行代码)。之前发生的事情和之后发生的事情需要“手动同步”,以便计算机以我们看到的方式看待世界。

这就是 Java 内存模型表达内容的方式:如果您希望您的对象在并发环境中以可预测的方式运行,您必须确保建立“发生在之前”的边界。

在 java 代码中的关系之前,有一些事情会发生。稍微简化一下,仅举几例:

  • 单个线程中的执行顺序(如果语句 1 和 2 由同一线程按该顺序执行,则语句 2 始终可以看到语句 1 所做的任何事情)
  • 当线程 t1 start()s t2 时,t1 在启动 t2 之前所做的一切都对 t2 可见。互惠互利join()
  • synchronized对象监视器也是如此:同步块内的线程所做的每一个动作都可以被在同一实例上同步的另一个线程看到
  • 任何专门的java.util.concurrent类方法也是如此。例如锁和信号量,当然还有集合:如果你把一个元素放在一个同步集合中,把它拉出来的线程在把它放进去的线程上有一个happens-before。
  • 如果 T2 与 T1 发生过,如果 T3 与 T2 发生过一次,那么 T3 也与 T1 发生过。

所以回到你的短语

然后它“变成”只读的

它确实变成只读的。但要让电脑看到,你得给“then”一个意思;也就是说:你必须happen before relationship在你的代码中加入一个。

后来你说:

然后将列表放入阻塞队列

一个java.util.concurrent队列?多么整洁!碰巧的是,从并发队列中拉出对象的线程与将所述对象放入队列的线程具有“发生之前”的关系。

你已经建立了关系。将对象放入队列的线程(之前)所做的所有突变对于将其拉出的线程都是安全可见的。在这种情况下,您不需要 a ConcurrentHashMap(当然,如果没有其他线程改变相同的数据)。

您的示例代码

您的示例代码不使用队列。它会改变由多个线程修改的单个地图(而不是您的场景提到的相反方式)。所以,只是……不一样。但无论哪种方式,你的代码都很好。

访问地图的线程这样做:

synchronized (map) {
    map.put(number, ThreadLocalRandom.current().nextInt())
}

提供 1)线程的synchornize互斥和 2)a 发生在之前。因此,进入同步的每个线程都会在另一个线程中看到所有“之前发生的”,该线程也同步了它(这是所有线程)。

所以这里没有问题。

然后你的主线程会:

completed.await()
println('Completed:')
map.each { e ->
   println('' + e.key + ': ' + e.value)
}

在这里拯救你的是completed.await(). 这为每个调用的线程建立了一个发生之前countDown(),这是所有线程。所以你的主线程可以看到工作线程所做的一切。一切皆好。

除了......我们经常忘记检查线程的引导。工作人员第一次在地图实例上同步时,以前没有人这样做过。我们怎么能确定他们看到了一个完全初始化并准备好的地图实例。

好吧,有两个原因:

  1. 您在调用之前初始化地图实例thread.start(),这会在之前建立一个发生。这样就够了
  2. 在您的工作线程中,您还可以在开始工作之前使用闩锁,然后再次建立关系。

你是双重安全的。

于 2020-09-18T12:13:14.620 回答