1

我想出了一个想法,我正在尝试实现一个不依赖引用计数来解决 ABA 问题的无锁堆栈,并且还可以正确处理内存回收。它在概念上类似于 RCU,并且依赖于两个功能:将列表条目标记为已删除,以及跟踪遍历列表的读者。前者很简单,它只是使用指针的 LSB。后者是我在实现无限制无锁堆栈的方法上的“聪明”尝试。

基本上,当任何线程尝试遍历列表时,一个原子计数器 (list.entries) 会递增。当遍历完成时,第二个计数器 (list.exits) 会增加。

节点分配由 push 处理,释放由 pop 处理。

push 和 pop 操作与朴素的无锁堆栈实现非常相似,但必须遍历标记为要删除的节点才能到达未标记的条目。因此,推送基本上很像链表插入。

pop 操作类似地遍历列表,但它使用 atomic_fetch_or 在遍历时将节点标记为已删除,直到到达未标记的节点。

在遍历 0 个或多个标记节点的列表后,正在弹出的线程将尝试对堆栈的头部进行 CAS。至少有一个线程并发弹出会成功,在此之后所有进入堆栈的读取器将不再看到以前标记的节点。

成功更新列表的线程然后加载原子 list.entries,并且基本上自旋加载 atomic.exits,直到该计数器最终超过 list.entries。这应该意味着列表的“旧”版本的所有读者都已完成。然后线程简单地释放它从列表顶部交换的标记节点列表。

所以 pop 操作的含义应该是(我认为)不存在 ABA 问题,因为在所有使用它们的并发读者完成之前,被释放的节点不会返回到可用的指针池中,显然内存回收问题出于同样的原因,也会被处理。

所以无论如何,这是理论上的,但我仍然对实现摸不着头脑,因为它目前不起作用(在多线程情况下)。似乎我在免费问题之后得到了一些写作,但我在发现问题时遇到了麻烦,或者我的假设可能有缺陷并且它不起作用。

任何关于概念和调试代码方法的见解都将不胜感激。

这是我当前的(损坏的)代码(使用 gcc -D_GNU_SOURCE -std=c11 -Wall -O0 -g -pthread -o list list.c 编译):

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

#include <sys/resource.h>

#include <stdio.h>
#include <unistd.h>

#define NUM_THREADS 8
#define NUM_OPS (1024 * 1024)

typedef uint64_t list_data_t;

typedef struct list_node_t {
    struct list_node_t * _Atomic next;
    list_data_t data;
} list_node_t;

typedef struct {
    list_node_t * _Atomic head;
    int64_t _Atomic size;
    uint64_t _Atomic entries;
    uint64_t _Atomic exits;
} list_t;

enum {
    NODE_IDLE    = (0x0),
    NODE_REMOVED = (0x1 << 0),
    NODE_FREED   = (0x1 << 1),
    NODE_FLAGS    = (0x3),
};

static __thread struct {
    uint64_t add_count;
    uint64_t remove_count;
    uint64_t added;
    uint64_t removed;
    uint64_t mallocd;
    uint64_t freed;
} stats;

#define NODE_IS_SET(p, f) (((uintptr_t)p & f) == f)
#define NODE_SET_FLAG(p, f) ((void *)((uintptr_t)p | f))
#define NODE_CLR_FLAG(p, f) ((void *)((uintptr_t)p & ~f))
#define NODE_POINTER(p) ((void *)((uintptr_t)p & ~NODE_FLAGS))

list_node_t * list_node_new(list_data_t data)
{
    list_node_t * new = malloc(sizeof(*new));
    new->data = data;
    stats.mallocd++;

    return new;
}

void list_node_free(list_node_t * node)
{
    free(node);
    stats.freed++;
}

static void list_add(list_t * list, list_data_t data)
{
    atomic_fetch_add_explicit(&list->entries, 1, memory_order_seq_cst);

    list_node_t * new = list_node_new(data);
    list_node_t * _Atomic * next = &list->head;
    list_node_t * current = atomic_load_explicit(next,  memory_order_seq_cst);
    do
    {
        stats.add_count++;
        while ((NODE_POINTER(current) != NULL) &&
                NODE_IS_SET(current, NODE_REMOVED))
        {
                stats.add_count++;
                current = NODE_POINTER(current);
                next = &current->next;
                current = atomic_load_explicit(next, memory_order_seq_cst);
        }
        atomic_store_explicit(&new->next, current, memory_order_seq_cst);
    }
    while(!atomic_compare_exchange_weak_explicit(
            next, &current, new,
            memory_order_seq_cst, memory_order_seq_cst));

    atomic_fetch_add_explicit(&list->exits, 1, memory_order_seq_cst);
    atomic_fetch_add_explicit(&list->size, 1, memory_order_seq_cst);
    stats.added++;
}

static bool list_remove(list_t * list, list_data_t * pData)
{
    uint64_t entries = atomic_fetch_add_explicit(
            &list->entries, 1, memory_order_seq_cst);

    list_node_t * start = atomic_fetch_or_explicit(
            &list->head, NODE_REMOVED, memory_order_seq_cst);
    list_node_t * current = start;

    stats.remove_count++;
    while ((NODE_POINTER(current) != NULL) &&
            NODE_IS_SET(current, NODE_REMOVED))
    {
        stats.remove_count++;
        current = NODE_POINTER(current);
        current = atomic_fetch_or_explicit(&current->next,
                NODE_REMOVED, memory_order_seq_cst);
    }

    uint64_t exits = atomic_fetch_add_explicit(
            &list->exits, 1, memory_order_seq_cst) + 1;

    bool result = false;
    current = NODE_POINTER(current);
    if (current != NULL)
    {
        result = true;
        *pData = current->data;

        current = atomic_load_explicit(
                &current->next, memory_order_seq_cst);

        atomic_fetch_add_explicit(&list->size,
                -1, memory_order_seq_cst);

        stats.removed++;
    }

    start = NODE_SET_FLAG(start, NODE_REMOVED);
    if (atomic_compare_exchange_strong_explicit(
            &list->head, &start, current,
            memory_order_seq_cst, memory_order_seq_cst))
    {
        entries = atomic_load_explicit(&list->entries, memory_order_seq_cst);
        while ((int64_t)(entries - exits) > 0)
        {
            pthread_yield();
            exits = atomic_load_explicit(&list->exits, memory_order_seq_cst);
        }

        list_node_t * end = NODE_POINTER(current);
        list_node_t * current = NODE_POINTER(start);
        while (current != end)
        {
            list_node_t * tmp = current;
            current = atomic_load_explicit(&current->next, memory_order_seq_cst);
            list_node_free(tmp);
            current = NODE_POINTER(current);
        }
    }

    return result;
}

static list_t list;

pthread_mutex_t ioLock = PTHREAD_MUTEX_INITIALIZER;

void * thread_entry(void * arg)
{
    sleep(2);
    int id = *(int *)arg;

    for (int i = 0; i < NUM_OPS; i++)
    {
        bool insert = random() % 2;

        if (insert)
        {
            list_add(&list, i);
        }
        else
        {
            list_data_t data;
            list_remove(&list, &data);
        }
    }

    struct rusage u;
    getrusage(RUSAGE_THREAD, &u);

    pthread_mutex_lock(&ioLock);
    printf("Thread %d stats:\n", id);
    printf("\tadded = %lu\n", stats.added);
    printf("\tremoved = %lu\n", stats.removed);
    printf("\ttotal added = %ld\n", (int64_t)(stats.added - stats.removed));
    printf("\tadded count = %lu\n", stats.add_count);
    printf("\tremoved count = %lu\n", stats.remove_count);
    printf("\tadd average = %f\n", (float)stats.add_count / stats.added);
    printf("\tremove average = %f\n", (float)stats.remove_count / stats.removed);
    printf("\tmallocd = %lu\n", stats.mallocd);
    printf("\tfreed = %lu\n", stats.freed);
    printf("\ttotal mallocd = %ld\n", (int64_t)(stats.mallocd - stats.freed));
    printf("\tutime = %f\n", u.ru_utime.tv_sec
            + u.ru_utime.tv_usec / 1000000.0f);
    printf("\tstime = %f\n", u.ru_stime.tv_sec
                    + u.ru_stime.tv_usec / 1000000.0f);
    pthread_mutex_unlock(&ioLock);

    return NULL;
}

int main(int argc, char ** argv)
{
    struct {
            pthread_t thread;
            int id;
    }
    threads[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; i++)
    {
        threads[i].id = i;
        pthread_create(&threads[i].thread, NULL, thread_entry, &threads[i].id);
    }

    for (int i = 0; i < NUM_THREADS; i++)
    {
        pthread_join(threads[i].thread, NULL);
    }

    printf("Size = %ld\n", atomic_load(&list.size));

    uint32_t count = 0;

    list_data_t data;
    while(list_remove(&list, &data))
    {
        count++;
    }
    printf("Removed %u\n", count);
}
4

1 回答 1

9

您提到您正在尝试解决 ABA 问题,但描述和代码实际上是在尝试解决一个更难的问题:内存回收问题。

这个问题通常出现在用没有垃圾收集的语言实现的无锁收集的“删除”功能中。核心问题是从共享结构中删除节点的线程通常不知道何时可以安全地释放删除的节点,因为其他读取可能仍然引用它。解决这个问题通常,作为一个副作用,解决了 ABA 问题:这特别是关于 CAS 操作成功,即使底层指针(和对象的状态)在此期间至少更改了两次,最终得到原始,但呈现完全不同的状态。

The ABA problem is easier in the sense that there are several straightforward solutions to the ABA problem specifically that don't lead to a solution to the "memory reclamation" problem. It is also easier in the sense that hardware that can detect the modification of the location, e.g., with LL/SC or transactional memory primitives, might not exhibit the problem at all.

So that said, you are hunting for a solution to the memory reclamation problem, and it will also avoid the ABA problem.

The core of your issue is this statement:

The thread that successfully updates the list then loads the atomic list.entries, and basically spin-loads atomic.exits until that counter finally exceeds list.entries. This should imply that all readers of the "old" version of the list have completed. The thread then simply frees the the list of marked nodes that it swapped off the top of the list.

This logic doesn't hold. Waiting for list.exits (you say atomic.exits but I think it's a typo as you only talk about list.exits elsewhere) to be greater than list.entries only tells you there have now been more total exits than there were entries at the point the mutating thread captured the entry count. However, these exits may have been generated by new readers coming and going: it doesn't at all imply that all the old readers have finished as you claim!

Here's a simple example. First a writing thread T1 and a reading thread T2 access the list around the same time, so list.entries is 2 and list.exits is 0. The writing thread pops an node, and saves the current value (2) of list.entries and waits for lists.exits to be greater than 2. Now three more reading threads, T3, T4, T5 arrive and do a quick read of the list and leave. Now lists.exits is 3, and your condition is met and T1 frees the node. T2 hasn't gone anywhere though and blows up since it is reading a freed node!

The basic idea you have can work, but your two counter approach in particular definitely doesn't work.

This is a well-studied problem, so you don't have to invent your own algorithm (see the link above), or even write your own code since things like librcu and concurrencykit already exist.

For Educational Purposes

If you wanted to make this work for educational purposes though, one approach would be to use ensure that threads coming in after a modification have started use a different set of list.entry/exit counters. One way to do this would be a generation counter, and when the writer wants to modify the list, it increments the generation counter, which causes new readers to switch to a different set of list.entry/exit counters.

Now the writer just has to wait for list.entry[old] == list.exists[old], which means all the old readers have left. You could also just get away with a single counter per generation: you don't really two entry/exit counters (although it might help reduce contention).

Of course, you know have a new problem of managing this list of separate counters per generation... which kind of looks like the original problem of building a lock-free list! This problem is a bit easier though because you might put some reasonable bound on the number of generations "in flight" and just allocate them all up-front, or you might implement a limited type of lock-free list that is easier to reason about because additions and deletions only occur at the head or tail.

于 2018-06-11T20:08:02.353 回答