有谁知道 C 语言中Peterson 的 Lock 算法的良好/正确实现?我似乎找不到这个。谢谢。
2 回答
Peterson 的算法无法在 C99 中正确实现,正如 whoordered memory fences on an x86中所解释的那样。
彼得森算法如下:
LOCK:
interested[id] = 1 interested[other] = 1
turn = other turn = id
while turn == other while turn == id
and interested[other] == 1 and interested[id] == 1
UNLOCK:
interested[id] = 0 interested[other] = 0
这里有一些隐藏的假设。首先,每个线程必须在放弃轮到之前注意它对获取锁的兴趣。放弃轮次必须使我们有兴趣获取锁的其他线程可见。
此外,与每个锁一样,临界区中的内存访问不能超过 lock() 调用,也不能超过 unlock()。即:lock()至少要有acquire语义,unlock()至少要有release语义。
在 C11 中,实现这一点的最简单方法是使用顺序一致的内存顺序,这使得代码运行起来就好像它是按程序顺序运行的线程的简单交错一样(警告:完全未经测试的代码,但它类似于示例在 Dmitriy V'jukov 的Relacy Race Detector中):
lock(int id)
{
atomic_store(&interested[id], 1);
atomic_store(&turn, 1 - id);
while (atomic_load(&turn) == 1 - id
&& atomic_load(&interested[1 - id]) == 1);
}
unlock(int id)
{
atomic_store(&interested[id], 0);
}
这确保编译器不会进行破坏算法的优化(通过在原子操作中提升/下沉加载/存储),并发出适当的 CPU 指令以确保 CPU 也不会破坏算法。未明确选择内存模型的 C11/C++11 原子操作的默认内存模型是顺序一致的内存模型。
C11/C++11 还支持较弱的内存模型,允许尽可能多的优化。以下是Anthony Williams将 Dmitriy V'jukov 在他自己的 Relacy Race Detector [petersons_lock_with_C++0x_atomics] [the-inscrutable-c-memory -模型]。如果这个算法不正确,那是我的错(警告:也是未经测试的代码,但基于 Dmitriy V'jukov 和 Anthony Williams 的优秀代码):
lock(int id)
{
atomic_store_explicit(&interested[id], 1, memory_order_relaxed);
atomic_exchange_explicit(&turn, 1 - id, memory_order_acq_rel);
while (atomic_load_explicit(&interested[1 - id], memory_order_acquire) == 1
&& atomic_load_explicit(&turn, memory_order_relaxed) == 1 - id);
}
unlock(int id)
{
atomic_store_explicit(&interested[id], 0, memory_order_release);
}
注意带有获取和释放语义的交换。交换是原子 RMW 操作。原子 RMW 操作始终读取在 RMW 操作中写入之前存储的最后一个值。此外,从同一原子对象的释放中读取写入的原子对象上的获取(或从执行释放的线程中对该对象的任何后续写入,或从任何原子 RMW 操作的任何后续写入)会创建一个 synchronizes-with释放和获取之间的关系。
所以,这个操作是线程之间的一个同步点,一个线程中的交换和任何线程执行的最后一个交换(或轮流的初始化,对于第一个交换)之间总是存在同步关系。
所以我们在 store tointerested[id]
和 exchange from/to之间有一个先序关系turn
,在两个连续交换 from/to 之间有一个 synchronizes-with 关系,turn
在交换 from/toturn
和 load之间有一个先序关系interested[1 - id]
。这相当于interested[x]
不同线程中的访问之间的先发生关系,并turn
提供线程之间的同步。这会强制执行使算法工作所需的所有排序。
那么在 C11 之前这些事情是怎么做的呢?它涉及使用编译器和特定于 CPU 的魔法。举个例子,让我们看看强排序的 x86。IIRC,所有 x86 负载都具有获取语义,并且所有存储都具有释放语义(在 SSE 中保存非临时移动,精确用于以偶尔需要发出 CPU 栅栏以实现 CPU 之间的一致性为代价来获得更高的性能)。但这对于 Peterson 的算法来说是不够的,正如 Bartosz Milewsky 在
who-ordered-memory-fences-on-an-x86中解释的那样,为了让 Peterson 的算法起作用,我们需要在访问turn
和之间建立一个排序interested
,如果不这样做可能会导致在看到interested[1 - id]
之前写入的负载时interested[id]
,这是一件坏事。
因此,在 GCC/x86 中执行此操作的一种方法是(警告:虽然我测试了类似于以下内容的内容,但实际上是错误实现的彼得森算法的代码的修改版本,但测试远不能保证多线程的正确性代码):
lock(int id)
{
interested[id] = 1;
turn = 1 - id;
__asm__ __volatile__("mfence");
do {
__asm__ __volatile__("":::"memory");
} while (turn == 1 - id
&& interested[1 - id] == 1);
}
unlock(int id)
{
interested[id] = 0;
}
这MFENCE
可以防止存储和加载到不同的内存地址被重新排序。否则,在继续加载时,写入interested[id]
可能会在存储缓冲区中排队interested[1 - id]
。在许多当前的微架构上,aSFENCE
可能就足够了,因为它可以实现为存储缓冲区消耗,但 IIUCSFENCE
不需要以这种方式实现,并且可以简单地防止存储之间的重新排序。所以SFENCE
可能到处都不够,我们需要一个完整的MFENCE
.
编译器屏障 ( __asm__ __volatile__("":::"memory")
) 阻止编译器确定它已经知道 的值turn
。我们告诉编译器我们已经破坏了内存,所以缓存在寄存器中的所有值都必须从内存中重新加载。
PS:我觉得这需要一个结束段落,但我的大脑已经筋疲力尽了。
我不会对实现的好坏做出任何断言,但它已经过测试(简要地)。这是维基百科上描述的算法的直接翻译。
struct petersonslock_t {
volatile unsigned flag[2];
volatile unsigned turn;
};
typedef struct petersonslock_t petersonslock_t;
petersonslock_t petersonslock () {
petersonslock_t l = { { 0U, 0U }, ~0U };
return l;
}
void petersonslock_lock (petersonslock_t *l, int p) {
assert(p == 0 || p == 1);
l->flag[p] = 1;
l->turn = !p;
while (l->flag[!p] && (l->turn == !p)) {}
};
void petersonslock_unlock (petersonslock_t *l, int p) {
assert(p == 0 || p == 1);
l->flag[p] = 0;
};
Greg 指出,在内存一致性稍微宽松的 SMP 架构(例如 x86)上,虽然加载到相同内存位置是有序的,但加载到一个处理器上不同位置的加载对于另一个处理器来说可能会出现乱序。
Jens Gustedt 和 ninjalj 建议修改原始算法以使用该atomic_flag
类型。这意味着设置标志和转弯将使用C11atomic_flag_test_and_set
并清除它们将使用atomic_flag_clear
。或者,可以在对 的更新之间施加内存屏障flag
。
编辑:我最初试图通过写入所有状态的相同内存位置来纠正这一点。ninjalj 指出,按位操作将状态操作变成了 RMW,而不是原始算法的加载和存储。因此,需要原子按位操作。C11 提供了这样的操作符,GCC 也提供了内置的操作符。下面的算法使用 GCC 内建函数,但封装在宏中,以便可以轻松地将其更改为其他一些实现。但是,修改上面的原始算法是首选的解决方案。
struct petersonslock_t {
volatile unsigned state;
};
typedef struct petersonslock_t petersonslock_t;
#define ATOMIC_OR(x,v) __sync_or_and_fetch(&x, v)
#define ATOMIC_AND(x,v) __sync_and_and_fetch(&x, v)
petersonslock_t petersonslock () {
petersonslock_t l = { 0x000000U };
return l;
}
void petersonslock_lock (petersonslock_t *l, int p) {
assert(p == 0 || p == 1);
unsigned mask = (p == 0) ? 0xFF0000 : 0x00FF00;
ATOMIC_OR(l->state, (p == 0) ? 0x000100 : 0x010000);
(p == 0) ? ATOMIC_OR(l->state, 0x000001) : ATOMIC_AND(l->state, 0xFFFF00);
while ((l->state & mask) && (l->state & 0x0000FF) == !p) {}
};
void petersonslock_unlock (petersonslock_t *l, int p) {
assert(p == 0 || p == 1);
ATOMIC_AND(l->state, (p == 0) ? 0xFF00FF : 0x00FFFF);
};