好的,所以我试图在 Promela 中模拟一个CLH-RW 锁。
锁的工作方式很简单,真的:
队列由 . 组成,tail
读取者和写入者都将一个包含单个节点的节点排入队列,bool succ_must_wait
他们通过创建一个新节点并使用tail
.
尾部因此成为节点的前任,pred
。
然后他们旋转等待pred.succ_must_wait
直到它是false
。
读者首先增加一个读者计数器ncritR
,然后将自己的标志设置为false
,允许多个读者同时在临界区。释放一个读锁仅仅意味着ncritR
再次递减。
作家等到ncritR
达到零,然后进入临界区。false
在释放锁之前,它们不会将其标志设置为。
不过,我有点努力在 promela 中建模。
我当前的尝试(见下文)尝试使用数组,其中每个节点基本上由许多数组条目组成。
这失败了,因为让我们说A
自己入队,然后B
自己入队。然后队列将如下所示:
S <- A <- B
S
哨兵节点在哪里。
现在的问题是,当A
运行到完整性并重新入队时,队列看起来像
S <- A <- B <- A'
在实际执行中,这绝对没问题,因为A
和A'
是不同的节点对象。并且由于A.succ_must_wait
将已设置为首次释放false
时A
的锁,B
最终会取得进展,因此A'
最终也会取得进展。
但是,在下面的基于数组的 promela 模型中发生的情况是A
和A'
占据相同的数组位置,导致错过已释放锁B
的事实,从而在(错误地)等待而不是等待(正确)对于。A
B
A'
A
A'
B
对此的一个可能的“解决方案”可能是A
等到B
确认发布。但这并不适用于锁的工作方式。
另一个“解决方案”是等待一个 CHANGE in pred.succ_must_wait
,其中一个 release 会增加succ_must_wait
,而不是重置它到0
.
但是我打算对锁的一个版本进行建模,其中pred
可能会发生变化(即,一个节点可能被允许忽略它的一些前辈),而且我并不完全相信像增加版本这样的东西不会引起问题有了这个变化。
那么在 promela 中对像这样的隐式队列进行建模的“最智能”方法是什么?
/* CLH-RW Lock */
/*pid: 0 = init, 1-2 = reader, 3-4 = writer*/
ltl liveness{
([]<> reader[1]@progress_reader)
&& ([]<> reader[2]@progress_reader)
&& ([]<> writer[3]@progress_writer)
&& ([]<> writer[4]@progress_writer)
}
bool initialised = 0;
byte ncritR;
byte ncritW;
byte tail;
bool succ_must_wait[5]
byte pred[5]
init{
assert(_pid == 0);
ncritR = 0;
ncritW = 0;
/*sentinel node*/
tail =0;
pred[0] = 0;
succ_must_wait[0] = 0;
initialised = 1;
}
active [2] proctype reader()
{
assert(_pid >= 1);
(initialised == 1)
do
:: else ->
succ_must_wait[_pid] = 1;
atomic {
pred[_pid] = tail;
tail = _pid;
}
(succ_must_wait[pred[_pid]] == 0)
ncritR++;
succ_must_wait[_pid] = 0;
atomic {
/*freeing previous node for garbage collection*/
pred[_pid] = 0;
}
/*CRITICAL SECTION*/
progress_reader:
assert(ncritR >= 1);
assert(ncritW == 0);
ncritR--;
atomic {
/*necessary to model the fact that the next access creates a new queue node*/
if
:: tail == _pid -> tail = 0;
:: else ->
fi
}
od
}
active [2] proctype writer()
{
assert(_pid >= 1);
(initialised == 1)
do
:: else ->
succ_must_wait[_pid] = 1;
atomic {
pred[_pid] = tail;
tail = _pid;
}
(succ_must_wait[pred[_pid]] == 0)
(ncritR == 0)
atomic {
/*freeing previous node for garbage collection*/
pred[_pid] = 0;
}
ncritW++;
/* CRITICAL SECTION */
progress_writer:
assert(ncritR == 0);
assert(ncritW == 1);
ncritW--;
succ_must_wait[_pid] = 0;
atomic {
/*necessary to model the fact that the next access creates a new queue node*/
if
:: tail == _pid -> tail = 0;
:: else ->
fi
}
od
}