4

下面是一个简化的 C 程序,它演示了我在使用 GNU 内置比较和交换在英特尔 cpu 上实现的并发堆栈时遇到的问题。我花了一段时间才明白发生了什么,但现在我明白了,它完全在原子比较和交换提供的保证范围内。

当一个节点从堆栈中弹出,修改,然后放回堆栈时,修改后的值可能成为堆栈的新头部,从而破坏它。test_get 中的注释描述了导致这种情况的事件顺序。

有没有办法多次可靠地使用具有相同堆栈的相同节点?这是一个夸张的例子,但即使将未修改的节点返回给 gHead 也可能泄漏其他节点。这种数据结构的初衷是重复使用相同的节点。

typedef struct test_node {
    struct test_node *next;
    void *data;
} *test_node_p;

test_node_p gHead = NULL;
unsigned gThreadsDone = 0;

void test_put( test_node_p inMemory ) {
    test_node_p scratch;

    do {
        scratch = gHead;
        inMemory->next = scratch;
    } while ( !__sync_bool_compare_and_swap( &gHead , scratch , inMemory ) );
}

test_node_p test_get( void ) {
    test_node_p result;
    test_node_p scratch;

    do {
        result = gHead;
        if ( NULL == result ) break;
        //  other thread acquires result, modifies next
        scratch = result->next;     //  scratch is 0xDEFACED...
        //  other thread returns result to gHead
    } while ( !__sync_bool_compare_and_swap( &gHead , result , scratch ) );
    //  this thread corrupts gHead with 0xDEFACED... value

    if ( NULL == result ) {
        result = (test_node_p)malloc( sizeof(struct test_node) );
    }

    return result;
}

void *memory_entry( void *in ) {
    test_node_p node;
    int index , count = 1000;
    for ( index = 0 ; index < count ; ++index ) {
        node = test_get();
        *(uint64_t *)(node) |= 0xDEFACED000000000ULL;
        test_put( node );
    }

    __sync_add_and_fetch(&gThreadsDone,1);

    return NULL;
}

void main() {
    unsigned    index , count = 8;
    pthread_t   thread;

    gThreadsDone = 0;

    for ( index = 0 ; index < count ; ++index ) {
        pthread_create( &thread , NULL , memory_entry , NULL );
        pthread_detach( thread );
    }

    while ( __sync_add_and_fetch(&gThreadsDone,0) < count ) {}
}

我正在使用 8 个逻辑核心运行此测试。当我只使用 4 个线程时,问题很少发生,但使用 8 个线程很容易重现。我有一台配备 Intel Core i7 的 MacBook。

我对使用一些解决了这个问题的库或框架不感兴趣,我想知道它是如何解决的。谢谢你。

编辑:

这里有两个在我的情况下不起作用的解决方案。

一些体系结构提供了 ll/sc 指令对,它们对地址而不是值执行原子测试。对该地址的任何写入,即使是相同的值,也会阻止成功。

一些架构提供大于指针大小的比较和交换。有了这个,单调计数器与指针配对,每次使用时都会以原子方式递增,因此值总是会变化。一些英特尔芯片支持这一点,但没有 GNU 包装器。

这是一个问题。两个线程,A 和 B,到达test_get它们具有相同值的点,result而不是NULL。然后发生以下顺序:

  1. 线程 A 通过比较和交换并resulttest_get
  2. 线程A修改内容result
  3. 线程 B 取消引用result,得到任何线程 A 放在那里
  4. 线程 A 结束result并调用test_put
  5. 线程 A 通过比较和交换test_put将结果放回gHead
  6. 线程 B 到达比较并换入test_get并通过
  7. 线程 B 现在已经被gHead线程 A 的值破坏了

这是一个类似的场景,其中三个线程不需要线程 A 进行修改result

  1. 线程 A 通过比较和交换并resulttest_get
  2. 线程A不修改内容result
  3. 线程 B 取消引用result,在scratch
  4. 线程 C 调用test_put不相关的值并成功
  5. 线程A调用test_put并成功result放回gHead
  6. 线程 B 到达比较并换入test_get并通过
  7. 线程 B 现在已gHead通过泄漏任何线程 C 添加的内容而损坏

在任何一种情况下,问题都是线程 A 通过了比较和交换两次,一次是 get,然后是 put,在线程 B 到达比较和交换以获得 get 之前。线程 B 的任何从头开始的值都应该被丢弃,但这并不是因为 gHead 中的值看起来是正确的。

任何使这种可能性降低而不实际阻止它的解决方案只会使错误更难追踪。

请注意,临时变量只是在原子指令开始之前放置结果的取消引用值的位置的名称。删除名称确实会删除可能被中断的取消引用和比较之间的时间片。

另请注意,原子意味着它作为一个整体成功或失败。指针的任何对齐读取在相关硬件上都是隐式原子的。就其他线程而言,不存在只读取一半指针的可中断点。

4

3 回答 3

4

永远不要通过简单的评估访问原子变量。此外,我认为,对于像你这样的比较和交换循环,__sync_val_compare_and_swap更方便。

/* read the head atomically */
result = __sync_val_compare_and_swap(&gHead, 0, 0);
/* compare and swap until we succeed placing the next field */
while ((oldval = result)) {
   result = __sync_val_compare_and_swap(&gHead, result, result->next);
   if (oldval == result) break;
}
于 2013-06-22T07:25:45.497 回答
2

(我放弃了我之前的答案。)

问题是您没有原子读取gHeadand的机制gHead->next,但这是实现无锁堆栈所必需的。由于您打算忙循环来处理比较和交换冲突,您可以使用相当于自旋锁:

void lock_get () {
    while (!_sync_bool_compare_and_swap(&gGetLock, 0, 1)) {}
}

void unlock_get () {
    unlock_get_success = _sync_bool_compare_and_swap(&gGetLock, 1, 0);
    assert(unlock_get_success);
}

现在,循环 intest_get()可以被lock_get()and包围unlock_get()。的 CAS 循环test_get()只是一个线程与test_put(). Jens 的 CAS 循环实现看起来更简洁。

lock_get();
result = __sync_val_compare_and_swap(&gHead, 0, 0);
while ((oldval = result)) {
   result = __sync_val_compare_and_swap(&gHead, result, result->next);
   if (oldval == result) break;
}
unlock_get();

这实现了意图,即只有一个线程应该从头部弹出。

于 2013-06-22T07:06:39.657 回答
1

如果您有 CAS 变量(在您的情况下为 gHead)。您必须始终使用 CAS 才能访问它。或者用锁保护它。用于阅读和写作。诸如“结果= gHead;”之类的东西 是一个很大的禁忌。

重新阅读您的问题,后进先出是一个堆栈。实现任何基于 CAS 的数据结构都是基于只有一件事要改变。在堆栈顶部的堆栈中。你似乎在做一个链表。我相信有很酷的方法来做一个原子链表。

但是对于堆栈,像其他人一样做一个堆栈指针:)

于 2013-06-22T07:11:57.343 回答