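I've recently read Paul McKenney's 2010 white paper, "Memory Barriers: a Hardware View for Software Hackers".
I would very much appreciate feedback, comments or guidance on the small piece of C code given below, which implements the Michael and Scott (M&S) queue enqueue function, in particular with regard to memory barriers and compiler barriers.
The code uses pointer-counter pairs to handle ABA and, for the sake of this post, should be considered as written for x86/x64 and only for x86/x64.
The inline comments were all written for this post and are part of it, in that they express my current thinking.
For brevity, I have stripped out the asserts, the cache-line padding in the structures, and so on.
Currently I think the code is quite broken, but I want to make sure I think so for the right reasons.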
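To pin down the two terms before the listing, here is a minimal sketch of a compiler barrier versus a full memory barrier. It assumes GCC or Clang; the macro and function names (`BARRIER_COMPILER`, `BARRIER_FULL`, `publish`, `try_consume`, `barrier_demo`) are hypothetical and are not taken from the code below. It only shows where such barriers would sit in a simple publish/consume pattern; which of them is actually required on x86/x64 is exactly what this question is about.

```c
#include <assert.h>

/* Hypothetical macro names, GCC/Clang-specific (an assumption). */

/* Compiler barrier: emits no instruction, but the compiler may not move
   memory accesses across it or keep memory values cached in registers. */
#define BARRIER_COMPILER()  __asm__ __volatile__ ( "" : : : "memory" )

/* Full memory barrier: GCC's legacy builtin, documented as issuing a
   full hardware memory barrier (it also acts as a compiler barrier). */
#define BARRIER_FULL()      __sync_synchronize()

int payload;          /* the data being handed over          */
volatile int ready;   /* flag telling the reader it is there */

/* Writer side: store the payload, then raise the flag. */
void publish( int value )
{
  payload = value;
  BARRIER_FULL();     /* payload store is ordered before the flag store */
  ready = 1;
}

/* Reader side: returns 1 and fills *out if the payload was published. */
int try_consume( int *out )
{
  if( ready == 0 )
    return 0;

  BARRIER_FULL();     /* flag load is ordered before the payload load */
  *out = payload;
  return 1;
}

/* Single-threaded walk-through of the pattern; returns the value read,
   or -1 if something was observed out of order. */
int barrier_demo( void )
{
  int value = 0;

  if( try_consume(&value) != 0 )  /* nothing published yet */
    return -1;

  publish( 42 );
  BARRIER_COMPILER();             /* purely illustrative here */

  if( try_consume(&value) != 1 )
    return -1;

  return value;
}
```

Replacing `BARRIER_FULL()` with `BARRIER_COMPILER()` would keep the compiler from reordering the accesses while leaving the hardware ordering to the processor's own memory model.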
#define PAC_SIZE 2

struct lfds_queue_element
{
  struct lfds_queue_element
    *volatile next[PAC_SIZE];

  void
    *user_data;
};

struct lfds_queue_state
{
  struct lfds_queue_element
    *volatile enqueue[PAC_SIZE];

  struct lfds_queue_element
    *volatile dequeue[PAC_SIZE];

  atom_t volatile
    aba_counter;
};

void lfds_queue_internal_dcas_pac_enqueue( struct lfds_queue_state *lqs, struct lfds_queue_element *lqe )
{
  ALIGN(ALIGN_DOUBLE_POINTER) struct lfds_queue_element
    *local_lqe[PAC_SIZE], *enqueue[PAC_SIZE], *next[PAC_SIZE];

  unsigned char cas_result = 0;

  unsigned int backoff_iteration = 1;

  /* TRD : here we have been passed a new element to place
           into the queue; we initialize it and its next
           pointer/counter pair
  */

  local_lqe[POINTER] = lqe;
  local_lqe[COUNTER] = (struct lfds_queue_element *) lfds_abstraction_atomic_increment( &lqs->aba_counter );

  local_lqe[POINTER]->next[POINTER] = NULL;
  local_lqe[POINTER]->next[COUNTER] = (struct lfds_queue_element *) lfds_abstraction_atomic_increment( &lqs->aba_counter );

  /* TRD : now, I think there is an issue here, in that these values
           are by no means yet necessarily visible to other cores

           however, they only need to be visible once
           the element has entered the queue, and for that
           to happen, the contiguous double-word CAS must
           have occurred - and on x86/x64, this carries
           with it an mfence

           however, that mfence will only act to empty our
           local store buffer - it will not cause other cores
           to flush their invalidation queues, so surely
           it can all still go horribly wrong?

           ah, but if all other cores are only accessing
           these variables using atomic operations, they
           too will be issuing mfences and so at that
           point flushing their invalidate queues
  */

  do
  {
    enqueue[COUNTER] = lqs->enqueue[COUNTER];
    enqueue[POINTER] = lqs->enqueue[POINTER];

    next[COUNTER] = enqueue[POINTER]->next[COUNTER];
    next[POINTER] = enqueue[POINTER]->next[POINTER];

    /* TRD : now, this is interesting

             we load the enqueue pointer and its next pointer
             we then (immediately below) check to see they're unchanged

             but this check is totally bogus! we could be reading
             old values from our cache, where our invalidate queue
             has not been processed, so the initial read contains
             old data *and* we then go ahead and check from our cache
             the same old values a second time

             what's worse is that I think we could also read the correct
             values for enqueue but an incorrect (old) value for its
             next pointer...!

             so, in either case, we can easily mistakenly pass the if()
             and then enter into code which does things to the queue

             now, in both cases, the CAS will mfence, which will
             cause us to see from the data structure the true
             values, but how much will that help us - we need
             to look to see what is actually being done

             the if() checks next[POINTER] is NULL

             if we have read a NULL for next, then we think
             the enqueue pointer is correctly placed (it's not
             lagging behind) so we don't need to help; we then
             try to add our element to the end of the queue

             now, it may be we have read enqueue properly but
             next improperly and so we now try to add our element
             where it will in fact truncate the queue!

             the CAS however will mfence and so at this point
             we will actually see the correct value for enqueue-next,
             and this will prevent that occurring

             if we look now at the else clause, here we have seen
             that enqueue->next is not NULL, so the enqueue pointer
             is out of place and we need to help, which we do by
             moving it down the queue

             here though we could have read enqueue correctly
             but next incorrectly; the CAS will mfence, which will
             update the cache, but since we're only comparing
             the enqueue pointer with our copy of the enqueue
             pointer, the fact our next pointer is wrong won't
             change! so here, we move the enqueue pointer to
             some old element - which although it might be in the
             queue (so this would be an inefficiency, you'd have
             to do a bunch more queue walking to get the enqueue
             pointer to the final element) it might not be, too!
             it could in the meantime have been dequeued and
             that of course would be death
    */

    if( lqs->enqueue[POINTER] == enqueue[POINTER] && lqs->enqueue[COUNTER] == enqueue[COUNTER] )
    {
      if( next[POINTER] == NULL )
      {
        local_lqe[COUNTER] = next[COUNTER] + 1;
        cas_result = lfds_abstraction_atomic_dcas_with_backoff( (atom_t volatile *) enqueue[POINTER]->next, (atom_t *) local_lqe, (atom_t *) next, &backoff_iteration );
      }
      else
      {
        next[COUNTER] = enqueue[COUNTER] + 1;
        lfds_abstraction_atomic_dcas( (atom_t volatile *) lqs->enqueue, (atom_t *) next, (atom_t *) enqueue );
      }
    }
  }
  while( cas_result == 0 );

  local_lqe[COUNTER] = enqueue[COUNTER] + 1;
  lfds_abstraction_atomic_dcas( (atom_t volatile *) lqs->enqueue, (atom_t *) local_lqe, (atom_t *) enqueue );

  return;
}
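
The pointer-counter pair that the listing relies on to handle ABA can be looked at in isolation. What follows is only a minimal sketch, assuming a C11 compiler with `<stdatomic.h>`. Instead of the contiguous double-word CAS it packs a 32-bit index and a 32-bit counter into a single 64-bit word, and every name in it (`tagged_ref`, `tagged_pack`, `tagged_swing`, `tagged_aba_demo`) is hypothetical rather than taken from the code above.

```c
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical type: one atomic word holding an (index, counter) pair.
   Low 32 bits are the index, high 32 bits are the ABA counter. */
typedef struct
{
  _Atomic uint64_t word;
} tagged_ref;

uint64_t tagged_pack( uint32_t index, uint32_t counter )
{
  return ((uint64_t) counter << 32) | index;
}

uint32_t tagged_index( uint64_t w )
{
  return (uint32_t) w;
}

uint32_t tagged_counter( uint64_t w )
{
  return (uint32_t) (w >> 32);
}

/* Try to swing the reference from the snapshot 'seen' to 'new_index',
   bumping the counter. Returns 1 on success, 0 if the pair has changed
   in any way since the snapshot was taken. */
int tagged_swing( tagged_ref *ref, uint64_t seen, uint32_t new_index )
{
  uint64_t desired = tagged_pack( new_index, tagged_counter(seen) + 1 );

  return atomic_compare_exchange_strong( &ref->word, &seen, desired );
}

/* Replays an ABA history on one thread: a slow actor snapshots index 7,
   a fast actor moves 7 -> 9 -> 7, then the slow actor attempts its CAS.
   Returns the result of that final, stale CAS. */
int tagged_aba_demo( void )
{
  tagged_ref ref;
  uint64_t stale;
  int first, second;

  atomic_init( &ref.word, tagged_pack(7, 0) );

  stale = atomic_load( &ref.word );                           /* (7, 0) */

  first  = tagged_swing( &ref, atomic_load(&ref.word), 9 );   /* (9, 1) */
  second = tagged_swing( &ref, atomic_load(&ref.word), 7 );   /* (7, 2) */

  if( first == 0 || second == 0 )
    return -1;

  /* the index is 7 again, but the counter differs from the snapshot,
     so the compare fails even though the "pointer" half matches */
  return tagged_swing( &ref, stale, 3 );
}
```

A plain single-word CAS on the index alone would have succeeded in the last step, which is the ABA failure the counter half is there to prevent.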