我有一个无锁队列的实现,我认为它是正确的(或者至少是无数据竞争的):
#include <atomic>
#include <iostream>
#include <optional>
#include <thread>
struct Job {
int id;
int data;
};
class JobQueue {
using stdmo = std::memory_order;
struct Node {
std::atomic<Node *> next = QUEUE_END;
Job job;
};
static inline Node *const QUEUE_END = nullptr;
static inline Node *const STACK_END = QUEUE_END + 1;
struct GenNodePtr {
Node *node;
std::uintptr_t gen;
};
alignas(64) std::atomic<Node *> jobs_back;
alignas(64) std::atomic<GenNodePtr> jobs_front;
alignas(64) std::atomic<GenNodePtr> stack_top;
public:
JobQueue()
: jobs_back{new Node{}},
jobs_front{GenNodePtr{jobs_back.load(stdmo::relaxed), 1}},
stack_top{GenNodePtr{STACK_END, 1}} {}
~JobQueue() {
Node *cur_queue = jobs_front.load(stdmo::relaxed).node;
while (cur_queue != QUEUE_END) {
Node *next = cur_queue->next;
delete cur_queue;
cur_queue = next;
}
Node *cur_stack = stack_top.load(stdmo::relaxed).node;
while (cur_stack != STACK_END) {
Node *next = cur_stack->next;
delete cur_stack;
cur_stack = next;
}
}
Node *allocate_node() {
GenNodePtr cur_stack = stack_top.load(stdmo::acquire);
while (true) {
if (cur_stack.node == STACK_END) {
return new Node{};
}
Node *cur_stack_next = cur_stack.node->next.load(stdmo::relaxed);
GenNodePtr new_stack{cur_stack_next, cur_stack.gen + 1};
if (stack_top.compare_exchange_weak(cur_stack, new_stack,
stdmo::acq_rel)) {
return cur_stack.node;
}
}
}
void deallocate_node(Node *node) {
GenNodePtr cur_stack = stack_top.load(stdmo::acquire);
while (true) {
node->next.store(cur_stack.node, stdmo::relaxed);
GenNodePtr new_stack{node, cur_stack.gen + 1};
if (stack_top.compare_exchange_weak(cur_stack, new_stack,
stdmo::acq_rel)) {
break;
}
}
}
public:
void enqueue(Job job) {
Node *new_node = allocate_node();
new_node->next.store(QUEUE_END, stdmo::relaxed);
Node *old_dummy = jobs_back.exchange(new_node, stdmo::acq_rel);
old_dummy->job = job;
old_dummy->next.store(new_node, stdmo::release);
}
std::optional<Job> try_dequeue() {
GenNodePtr old_front = jobs_front.load(stdmo::relaxed);
while (true) {
Node *old_front_next = old_front.node->next.load(stdmo::acquire);
if (old_front_next == QUEUE_END) {
return std::nullopt;
}
GenNodePtr new_front{old_front_next, old_front.gen + 1};
if (jobs_front.compare_exchange_weak(old_front, new_front,
stdmo::relaxed)) {
break;
}
}
Job job = old_front.node->job;
deallocate_node(old_front.node);
return job;
}
};
int main() {
JobQueue queue;
std::atomic<int> i = 0;
std::thread consumer{[&queue, &i]() {
// producer enqueues 1
while (i.load(std::memory_order_relaxed) != 1) {}
std::atomic_thread_fence(std::memory_order_acq_rel);
std::cout << queue.try_dequeue().value_or(Job{-1, -1}).data
<< std::endl;
std::atomic_thread_fence(std::memory_order_acq_rel);
i.store(2, std::memory_order_relaxed);
// producer enqueues 2 and 3
}};
std::thread producer{[&queue, &i]() {
queue.enqueue(Job{1, 1});
std::atomic_thread_fence(std::memory_order_acq_rel);
i.store(1, std::memory_order_relaxed);
// consumer consumes here
while (i.load(std::memory_order_relaxed) != 2) {}
std::atomic_thread_fence(std::memory_order_acq_rel);
queue.enqueue(Job{2, 2});
queue.enqueue(Job{3, 3});
}};
producer.join();
consumer.join();
return 0;
}
这个队列被实现为一个单链双端链表。它使用虚拟节点来解耦生产者和消费者,并使用生成计数器和节点回收(使用内部堆栈)来避免 ABA 问题和try_dequeue
.
在使用 Clang 13.0.1、Linux x64 编译的 TSan 下运行它,我得到了以下比赛:
WARNING: ThreadSanitizer: data race (pid=223081)
Write of size 8 at 0x7b0400000008 by thread T2:
#0 JobQueue::enqueue(Job) .../bug4.cpp:85 (bug4.tsan+0xe3e53)
#1 operator() .../bug4.cpp:142 (bug4.tsan+0xe39ee)
...
Previous read of size 8 at 0x7b0400000008 by thread T1:
#0 JobQueue::try_dequeue() .../bug4.cpp:104 (bug4.tsan+0xe3c07)
#1 operator() .../bug4.cpp:121 (bug4.tsan+0xe381c)
...
在 Godbolt 上运行(注意,由于 Godbolt 运行程序的方式,TSan 不显示行号信息)
这场比赛是在从线程try_dequeue
调用的先前读入之间进行的:consumer
Job job = old_front.node->job;
这稍后会写入,这是线程enqueue
的第三次调用:enqueue
producer
old_dummy->job = job;
我相信这场比赛是不可能的,因为线程应该通过获取-释放比较交换producer
与线程同步到in和.consumer
stack_top
allocate_node
deallocate_node
现在,奇怪的是,制造GenNodePointer
alignas(32)
消除了比赛。
问题:
- 这场比赛真的可能吗?
- 为什么增加 GenNodePointer 的对齐会使 TSan 不再注册比赛?