
I recently read Paul McKenney's 2010 white paper, "Memory Barriers: a Hardware View for Software Hackers".

I would very much appreciate feedback/comments/guidance on the short piece of C code given below, which implements the M&S queue enqueue function, in particular with regard to memory and compiler barriers.

The code uses pointer/counter pairs to deal with ABA and, for the purposes of this post, should be considered to be written for x86/x64 and only for x86/x64.

The inline comments were all written now, for this post and as part of this post, in that they express my current thinking.

For brevity, I have stripped out the assert code, the cache-line padding in the structures, and so on.

At present I think the code is broken, but I want to be sure I think so for the right reasons.

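For reference while reading the code: it refers to several definitions not shown in the post - POINTER, COUNTER, atom_t, ALIGN, ALIGN_DOUBLE_POINTER and the lfds_abstraction_* atomics. The following is only a plausible sketch of stand-ins for x64/GCC, not the actual headers; the types and signatures are assumptions.

#include <stdint.h>

typedef uint64_t atom_t;   /* one half of a pointer/counter pair on x64 (assumed) */

#define POINTER 0          /* index of the pointer in a pair (assumed)            */
#define COUNTER 1          /* index of the ABA counter in a pair (assumed)        */

#define ALIGN_DOUBLE_POINTER  (2 * sizeof(void *))          /* 16 bytes on x64    */
#define ALIGN(alignment)      __attribute__(( aligned(alignment) ))

/* atomically increments *value and returns the incremented value (assumed) */
atom_t lfds_abstraction_atomic_increment( atom_t volatile *value );

/* contiguous double-word CAS; returns 1 on success, 0 on failure (assumed) */
unsigned char lfds_abstraction_atomic_dcas( atom_t volatile *destination,
                                            atom_t *exchange,
                                            atom_t *compare );

/* as above, but applies exponential backoff on failure (assumed) */
unsigned char lfds_abstraction_atomic_dcas_with_backoff( atom_t volatile *destination,
                                                         atom_t *exchange,
                                                         atom_t *compare,
                                                         unsigned int *backoff_iteration );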

#define PAC_SIZE 2

struct lfds_queue_element
{
  struct lfds_queue_element
    *volatile next[PAC_SIZE];

  void
    *user_data;
};

struct lfds_queue_state
{
  struct lfds_queue_element
    *volatile enqueue[PAC_SIZE];

  struct lfds_queue_element
    *volatile dequeue[PAC_SIZE];

  atom_t volatile
    aba_counter;
};

void lfds_queue_internal_dcas_pac_enqueue( struct lfds_queue_state *lqs, struct lfds_queue_element *lqe )
{
  ALIGN(ALIGN_DOUBLE_POINTER) struct lfds_queue_element
    *local_lqe[PAC_SIZE], *enqueue[PAC_SIZE], *next[PAC_SIZE];
  unsigned char cas_result = 0;
  unsigned int backoff_iteration = 1;

  /* TRD : here we have been passed a new element to place
           into the queue; we initialize it and its next
           pointer/counter pair
  */

  local_lqe[POINTER] = lqe;
  local_lqe[COUNTER] = (struct lfds_queue_element *) lfds_abstraction_atomic_increment( &lqs->aba_counter );

  local_lqe[POINTER]->next[POINTER] = NULL;
  local_lqe[POINTER]->next[COUNTER] = (struct lfds_queue_element *) lfds_abstraction_atomic_increment( &lqs->aba_counter );

  /* TRD : now, I think there is an issue here, in that these values
           are by no means yet necessarily visible to other cores

           however, they only need to be visible once
           the element has entered the queue, and for that
           to happen, the contiguous double-word CAS must
           have occurred - and on x86/x64, this carries
           with it an mfence

           however, that mfence will only act to empty our
           local store buffer - it will not cause other cores
           to flush their invalidation queues, so surely
           it can all still go horribly wrong?

           ah, but if all other cores are only accessing
           these variables using atomic operations, they
           too will be issuing mfences and so at that
           point flushing their invalidate queues
  */

  do
  {
    enqueue[COUNTER] = lqs->enqueue[COUNTER];
    enqueue[POINTER] = lqs->enqueue[POINTER];

    next[COUNTER] = enqueue[POINTER]->next[COUNTER];
    next[POINTER] = enqueue[POINTER]->next[POINTER];

    /* TRD : now, this is interesting

             we load the enqueue pointer and its next pointer
             we then (immediately below) check to see they're unchanged

             but this check is totally bogus!  we could be reading
             old values from our cache, where our invalidate queue
             has not been processed, so the initial read contains
             old data *and* we then go ahead and check from our cache
             the same old values a second time

             what's worse is that I think we could also read the correct
             values for enqueue but an incorrect (old) value for its
             next pointer...!

             so, in either case, we can easily and mistakenly pass the if()
             and then enter into code which does things to the queue

             now, in both cases, the CAS will mfence, which will
             cause us to see from the data structure the true
             values, but how much will that help us - we need
             to look to see what is actually being done

             the if() checks next[POINTER] is NULL

             if we have read a NULL for next, then we think
             the enqueue pointer is correctly placed (it's not
             lagging behind) so we don't need to help; we then
             try to add our element to the end of the queue

             now, it may be we have read enqueue properly but
             next improperly and so we now try to add our element
             where it will in fact truncate the queue!

             the CAS however will mfence and so at this point
             we will actually see the correct value for enqueue-next,
             and this will prevent that occurring

             if we look now at the else clause, here we have seen
             that enqueue->next is not NULL, so the enqueue pointer
             is out of place and we need to help, which we do by
             moving it down the queue

             here though we could have read enqueue correctly
             but next incorrectly; the CAS will mfence, which will
             update the cache, but since we're only comparing
             the enqueue pointer with our copy of the enqueue
             pointer, the fact our next pointer is wrong won't
             change!  so here, we move the enqueue pointer to
             some old element - which although it might be in the
             queue (so this would be an inefficiency, you'd have
             to do a bunch more queue walking to get the enqueue
             pointer to the final element) it might not be, too!
             it could in the meantime have been dequeued and
             that of course would be death
    */

    if( lqs->enqueue[POINTER] == enqueue[POINTER] and lqs->enqueue[COUNTER] == enqueue[COUNTER] )
    {
      if( next[POINTER] == NULL )
      {
        local_lqe[COUNTER] = next[COUNTER] + 1;
        cas_result = lfds_abstraction_atomic_dcas_with_backoff( (atom_t volatile *) enqueue[POINTER]->next, (atom_t *) local_lqe, (atom_t *) next, &backoff_iteration );
      }
      else
      {
        next[COUNTER] = enqueue[COUNTER] + 1;
        lfds_abstraction_atomic_dcas( (atom_t volatile *) lqs->enqueue, (atom_t *) next, (atom_t *) enqueue );
      }
    }
  }
  while( cas_result == 0 );

  local_lqe[COUNTER] = enqueue[COUNTER] + 1;

  lfds_abstraction_atomic_dcas( (atom_t volatile *) lqs->enqueue, (atom_t *) local_lqe, (atom_t *) enqueue );

  return;
}
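
Regarding the "the CAS will mfence" remarks in the comments: the DCAS abstraction isn't shown in the post, but on x86/x64 a contiguous double-word CAS is lock cmpxchg16b, and locked instructions act as full memory barriers. Below is only a plausible sketch of such an abstraction under GCC, not the actual implementation; it assumes the atom_t/ALIGN definitions sketched above and a 16-byte-aligned destination.

static inline unsigned char dcas_x64_sketch( atom_t volatile *destination,
                                             atom_t *exchange,
                                             atom_t *compare )
{
  unsigned char cas_result;

  /* lock cmpxchg16b compares the 16 bytes at destination with rdx:rax;
     on a match it stores rcx:rbx, otherwise it loads the current memory
     contents back into rdx:rax - and, being a locked instruction, it is
     a full memory barrier on x86/x64 */

  __asm__ __volatile__
  (
    "lock cmpxchg16b %1\n\t"
    "setz %0"
    : "=q" (cas_result), "+m" (*destination),
      "+a" (compare[0]), "+d" (compare[1])
    : "b" (exchange[0]), "c" (exchange[1])
    : "cc", "memory"
  );

  return cas_result;
}

Whether that single implicit barrier is enough for the plain loads in the retry loop is precisely what the inline comments are questioning; the sketch only shows where the barrier comes from.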

1 Answer


The CAS is atomic, so if one thread succeeds while another thread is also attempting it, the other thread will fail and retry.

This only works if all the threads accessing that memory do so using the same mechanism, that is, they all access it with CAS. If they do not, the guarantees which come with the CAS (in this case, the memory fence) go away.
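
To illustrate with plain GCC builtins rather than the abstraction layer in the question (the names here are illustrative only): the atomicity and the fence both come from the locked instruction, so a thread that bypasses the CAS bypasses both.

#include <stdint.h>

static volatile uint64_t shared_word;

void updater_using_cas( uint64_t expected, uint64_t desired )
{
  /* a locked read-modify-write: atomic with respect to every other
     CAS on shared_word, and a full fence on x86/x64 */
  __sync_bool_compare_and_swap( &shared_word, expected, desired );
}

void updater_using_plain_store( uint64_t desired )
{
  /* a plain store: it neither participates in the CAS protocol nor
     carries the fence, so the guarantees the other threads rely on
     are gone */
  shared_word = desired;
}

In the enqueue function above, lqs->enqueue and the next pair of an element that is already in the queue are only ever modified through the DCAS; the plain stores go to the new element before it becomes reachable by other threads.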

Answered 2012-09-17T00:24:41.650