我想出了一个想法,我正在尝试实现一个不依赖引用计数来解决 ABA 问题的无锁堆栈,并且还可以正确处理内存回收。它在概念上类似于 RCU,并且依赖于两个功能:将列表条目标记为已删除,以及跟踪遍历列表的读者。前者很简单,它只是使用指针的 LSB。后者是我在实现无限制无锁堆栈的方法上的“聪明”尝试。
基本上,当任何线程尝试遍历列表时,一个原子计数器 (list.entries) 会递增。当遍历完成时,第二个计数器 (list.exits) 会增加。
节点分配由 push 处理,释放由 pop 处理。
push 和 pop 操作与朴素的无锁堆栈实现非常相似,但必须遍历标记为要删除的节点才能到达未标记的条目。因此,推送基本上很像链表插入。
pop 操作类似地遍历列表,但它使用 atomic_fetch_or 在遍历时将节点标记为已删除,直到到达未标记的节点。
在遍历了 0 个或多个已标记的节点之后,执行弹出的线程会尝试对堆栈的头部进行 CAS。并发弹出的线程中至少有一个会成功,此后所有新进入堆栈的读者都不会再看到之前被标记的节点。
成功更新列表的线程随后加载原子计数器 list.entries,并自旋地反复加载 list.exits,直到该计数器最终达到或超过之前读到的 list.entries。这应该意味着列表“旧”版本的所有读者都已完成。然后该线程只需释放它从列表顶部换下来的那一串已标记节点。
所以 pop 操作的含义应该是(我认为)不存在 ABA 问题,因为在所有使用它们的并发读者完成之前,被释放的节点不会返回到可用的指针池中,显然内存回收问题出于同样的原因,也会被处理。
总之,以上是理论部分,但我在实现上仍然摸不着头脑,因为它目前(在多线程情况下)无法正常工作。看起来出现了一些释放后写入(write-after-free)的问题,但我很难定位问题所在;也可能是我的假设本身有缺陷,这个方案根本行不通。
任何关于概念和调试代码方法的见解都将不胜感激。
这是我当前的(损坏的)代码(使用 gcc -D_GNU_SOURCE -std=c11 -Wall -O0 -g -pthread -o list list.c 编译):
#include <inttypes.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/resource.h>
#include <unistd.h>
/* Number of worker threads started by main(). */
#define NUM_THREADS 8
/* Number of add/remove operations each worker performs (2^20). */
#define NUM_OPS (1 << 20)
/* Payload type carried by every list node. */
typedef uint64_t list_data_t;

/* Singly linked list node. */
typedef struct list_node_t list_node_t;
struct list_node_t {
    /* Successor link, accessed atomically; its low bits may carry NODE_* flags. */
    _Atomic(list_node_t *) next;
    /* Value stored in this node. */
    list_data_t data;
};
/* Shared list state: the head link plus bookkeeping counters. */
typedef struct {
    /* Link to the first node (NULL when there is none). */
    _Atomic(list_node_t *) head;
    /* Running total: +1 per completed add, -1 per successful remove. */
    _Atomic(int64_t) size;
    /* Number of list operations that have announced they are starting. */
    _Atomic(uint64_t) entries;
    /* Number of list operations that have announced they are done traversing. */
    _Atomic(uint64_t) exits;
} list_t;
/* Flag bits kept in the low bits of a node pointer. */
enum {
    NODE_IDLE    = 0,                         /* no flag set */
    NODE_REMOVED = 1 << 0,                    /* link/node has been marked as removed */
    NODE_FREED   = 1 << 1,                    /* second flag bit; unused by the code visible in this file */
    NODE_FLAGS   = NODE_REMOVED | NODE_FREED, /* mask covering every flag bit */
};
/*
 * Per-thread operation counters, printed by each worker when it finishes.
 * Uses the C11 `_Thread_local` storage-class specifier instead of the
 * compiler-specific `__thread` keyword.
 */
static _Thread_local struct {
    uint64_t add_count;    /* steps (attempts/iterations) taken inside list_add */
    uint64_t remove_count; /* steps (nodes looked at) taken inside list_remove */
    uint64_t added;        /* completed list_add calls */
    uint64_t removed;      /* list_remove calls that returned a value */
    uint64_t mallocd;      /* nodes allocated by list_node_new */
    uint64_t freed;        /* nodes released by list_node_free */
} stats;
/*
 * Helpers for the flag bits kept in the low bits of node pointers.
 * Every macro argument is parenthesised and converted to uintptr_t, so that
 * compound arguments such as `A | B` or `cond ? x : y` expand correctly.
 * Note that NODE_IS_SET evaluates `f` twice.
 */
/* True when every bit of mask f is set in pointer p. */
#define NODE_IS_SET(p, f)   (((uintptr_t)(p) & (uintptr_t)(f)) == (uintptr_t)(f))
/* Pointer p with the bits of mask f set. */
#define NODE_SET_FLAG(p, f) ((void *)((uintptr_t)(p) | (uintptr_t)(f)))
/* Pointer p with the bits of mask f cleared. */
#define NODE_CLR_FLAG(p, f) ((void *)((uintptr_t)(p) & ~(uintptr_t)(f)))
/* Pointer p with every NODE_FLAGS bit cleared, i.e. the plain node address. */
#define NODE_POINTER(p)     ((void *)((uintptr_t)(p) & ~(uintptr_t)(NODE_FLAGS)))
/*
 * Allocate a node that stores `data`.
 *
 * The `next` link is initialised to NULL and the calling thread's
 * `stats.mallocd` counter is incremented.
 *
 * Never returns NULL: callers dereference the result unconditionally, so an
 * allocation failure is reported with perror() and the process is aborted
 * instead of continuing into a NULL dereference.
 */
list_node_t * list_node_new(list_data_t data)
{
    list_node_t * node = malloc(sizeof(*node));
    if (node == NULL)
    {
        perror("malloc");
        abort();
    }
    atomic_init(&node->next, NULL);
    node->data = data;
    stats.mallocd++;
    return node;
}
/*
 * Release `node` with free() and record the release in the calling
 * thread's `stats.freed` counter.
 */
void list_node_free(list_node_t * node)
{
    stats.freed += 1;
    free(node);
}
/*
 * Push `data` onto the list.
 *
 * list - list to push onto.
 * data - value stored in the freshly allocated node.
 *
 * The new node is always linked in at list->head with a CAS retry loop.
 * The head value is copied into node->next exactly as it was read, flag
 * bits included, so a mark that a remover has placed on the head link is
 * carried along with the link instead of being dropped.
 *
 * Nodes are deliberately never spliced in behind links that are marked
 * NODE_REMOVED: a remover that has already read the `next` field of such a
 * node and then swings the head past it would drop the spliced-in node from
 * the list and free it together with the removed nodes.
 *
 * This function only reads and writes list->head and its own, not yet
 * published node; it never dereferences a node that other threads can see.
 * It therefore needs no protection against concurrent reclamation and does
 * not touch the entries/exits counters.  A stale head value that happens to
 * be current again (ABA) is harmless here, because the link stored in
 * node->next is then correct by value.
 */
static void list_add(list_t * list, list_data_t data)
{
    list_node_t * node = list_node_new(data);
    list_node_t * head = atomic_load_explicit(&list->head, memory_order_seq_cst);
    do
    {
        /* One count per publication attempt. */
        stats.add_count++;
        atomic_store_explicit(&node->next, head, memory_order_seq_cst);
        /* On failure the CAS refreshes `head` with the current value. */
    }
    while (!atomic_compare_exchange_weak_explicit(
               &list->head, &head, node,
               memory_order_seq_cst, memory_order_seq_cst));
    atomic_fetch_add_explicit(&list->size, 1, memory_order_seq_cst);
    stats.added++;
}
static bool list_remove(list_t * list, list_data_t * pData)
{
uint64_t entries = atomic_fetch_add_explicit(
&list->entries, 1, memory_order_seq_cst);
list_node_t * start = atomic_fetch_or_explicit(
&list->head, NODE_REMOVED, memory_order_seq_cst);
list_node_t * current = start;
stats.remove_count++;
while ((NODE_POINTER(current) != NULL) &&
NODE_IS_SET(current, NODE_REMOVED))
{
stats.remove_count++;
current = NODE_POINTER(current);
current = atomic_fetch_or_explicit(¤t->next,
NODE_REMOVED, memory_order_seq_cst);
}
uint64_t exits = atomic_fetch_add_explicit(
&list->exits, 1, memory_order_seq_cst) + 1;
bool result = false;
current = NODE_POINTER(current);
if (current != NULL)
{
result = true;
*pData = current->data;
current = atomic_load_explicit(
¤t->next, memory_order_seq_cst);
atomic_fetch_add_explicit(&list->size,
-1, memory_order_seq_cst);
stats.removed++;
}
start = NODE_SET_FLAG(start, NODE_REMOVED);
if (atomic_compare_exchange_strong_explicit(
&list->head, &start, current,
memory_order_seq_cst, memory_order_seq_cst))
{
entries = atomic_load_explicit(&list->entries, memory_order_seq_cst);
while ((int64_t)(entries - exits) > 0)
{
pthread_yield();
exits = atomic_load_explicit(&list->exits, memory_order_seq_cst);
}
list_node_t * end = NODE_POINTER(current);
list_node_t * current = NODE_POINTER(start);
while (current != end)
{
list_node_t * tmp = current;
current = atomic_load_explicit(¤t->next, memory_order_seq_cst);
list_node_free(tmp);
current = NODE_POINTER(current);
}
}
return result;
}
static list_t list;
pthread_mutex_t ioLock = PTHREAD_MUTEX_INITIALIZER;
/*
 * Worker thread body.
 *
 * Performs NUM_OPS operations on the global list, choosing between
 * list_add() and list_remove() at random, then prints this thread's
 * counters and CPU times while holding ioLock.
 *
 * arg - points to an int holding the thread's id; read once at start-up.
 * Always returns NULL.
 */
void * thread_entry(void * arg)
{
    /* NOTE(review): presumably gives every worker time to start before the
     * list is hammered - confirm intent. */
    sleep(2);
    int id = *(int *)arg;
    for (int i = 0; i < NUM_OPS; i++)
    {
        bool insert = random() % 2;
        if (insert)
        {
            list_add(&list, i);
        }
        else
        {
            /* The popped value (if any) is intentionally discarded. */
            list_data_t data;
            (void)list_remove(&list, &data);
        }
    }
    /* Zero-initialised so that defined values are printed even if
     * getrusage() fails. */
    struct rusage u = {0};
    if (getrusage(RUSAGE_THREAD, &u) != 0)
    {
        perror("getrusage");
    }
    pthread_mutex_lock(&ioLock);
    printf("Thread %d stats:\n", id);
    /* Fixed-width counters are printed with the <inttypes.h> macros; plain
     * %lu / %ld only match uint64_t / int64_t on some platforms. */
    printf("\tadded = %" PRIu64 "\n", stats.added);
    printf("\tremoved = %" PRIu64 "\n", stats.removed);
    printf("\ttotal added = %" PRId64 "\n", (int64_t)(stats.added - stats.removed));
    printf("\tadded count = %" PRIu64 "\n", stats.add_count);
    printf("\tremoved count = %" PRIu64 "\n", stats.remove_count);
    printf("\tadd average = %f\n", (float)stats.add_count / stats.added);
    printf("\tremove average = %f\n", (float)stats.remove_count / stats.removed);
    printf("\tmallocd = %" PRIu64 "\n", stats.mallocd);
    printf("\tfreed = %" PRIu64 "\n", stats.freed);
    printf("\ttotal mallocd = %" PRId64 "\n", (int64_t)(stats.mallocd - stats.freed));
    printf("\tutime = %f\n", u.ru_utime.tv_sec
        + u.ru_utime.tv_usec / 1000000.0f);
    printf("\tstime = %f\n", u.ru_stime.tv_sec
        + u.ru_stime.tv_usec / 1000000.0f);
    pthread_mutex_unlock(&ioLock);
    return NULL;
}
/*
 * Test driver: starts NUM_THREADS workers running thread_entry(), waits for
 * all of them, prints the list's size counter, then drains the list with
 * list_remove() and prints how many values were still in it.
 *
 * Returns 0 on success, EXIT_FAILURE when a worker thread cannot be created.
 */
int main(int argc, char ** argv)
{
    (void)argc;
    (void)argv;
    struct
    {
        pthread_t thread;
        int id;
    } threads[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; i++)
    {
        threads[i].id = i;
        int rc = pthread_create(&threads[i].thread, NULL, thread_entry, &threads[i].id);
        if (rc != 0)
        {
            /* Joining a thread that was never created is undefined, so stop here. */
            fprintf(stderr, "pthread_create failed for thread %d (error %d)\n", i, rc);
            return EXIT_FAILURE;
        }
    }
    for (int i = 0; i < NUM_THREADS; i++)
    {
        pthread_join(threads[i].thread, NULL);
    }
    printf("Size = %" PRId64 "\n", (int64_t)atomic_load(&list.size));
    uint32_t count = 0;
    list_data_t data;
    while (list_remove(&list, &data))
    {
        count++;
    }
    printf("Removed %" PRIu32 "\n", count);
    return 0;
}