让我们实现一个同步屏障。它必须预先知道它将处理的线程数n。在前n - 1次调用sync
屏障期间,将导致调用线程等待。调用号n将唤醒所有线程。
class Barrier
def initialize(count)
@mutex = Mutex.new
@cond = ConditionVariable.new
@count = count
end
def sync
@mutex.synchronize do
@count -= 1
if @count > 0
@cond.wait @mutex
else
@cond.broadcast
end
end
end
end
整体sync
是一个临界区,即不能由两个线程同时执行。因此调用Mutex#synchronize
.
当 的减小值为@count
正时,线程被冻结。将互斥锁作为参数传递给调用ConditionVariable#wait
对于防止死锁至关重要。它导致互斥锁在冻结线程之前被解锁。
一个简单的实验启动 1k 个线程并让它们将元素添加到数组中。首先,它们添加零,然后它们同步并添加一。预期的结果是一个有 2k 个元素的排序数组,其中 1k 个是 0,1k 个是 1。
mtx = Mutex.new
arr = []
num = 1000
barrier = Barrier.new num
num.times.map do
Thread.start do
mtx.synchronize { arr << 0 }
barrier.sync
mtx.synchronize { arr << 1 }
end
end .map &:join;
# Prints true. See it break by deleting `barrier.sync`.
puts [
arr.sort == arr,
arr.count == 2 * num,
arr.count(&:zero?) == num,
arr.uniq == [0, 1],
].all?
事实上,有一个名为 barrier 的 gem和我上面描述的完全一样。
最后一点,在这种情况下不要使用 sleep 来等待。这称为忙等待,被认为是一种不好的做法。