5

我有一个无锁队列的实现,我认为它是正确的(或者至少是无数据竞争的):

#include <atomic>
#include <iostream>
#include <optional>
#include <thread>

struct Job {
  int id;
  int data;
};

class JobQueue {
  using stdmo = std::memory_order;

  struct Node {
    std::atomic<Node *> next = QUEUE_END;
    Job job;
  };

  static inline Node *const QUEUE_END = nullptr;
  static inline Node *const STACK_END = QUEUE_END + 1;

  struct GenNodePtr {
    Node *node;
    std::uintptr_t gen;
  };

  alignas(64) std::atomic<Node *> jobs_back;
  alignas(64) std::atomic<GenNodePtr> jobs_front;
  alignas(64) std::atomic<GenNodePtr> stack_top;

 public:
  JobQueue()
      : jobs_back{new Node{}},
        jobs_front{GenNodePtr{jobs_back.load(stdmo::relaxed), 1}},
        stack_top{GenNodePtr{STACK_END, 1}} {}

  ~JobQueue() {
    Node *cur_queue = jobs_front.load(stdmo::relaxed).node;
    while (cur_queue != QUEUE_END) {
      Node *next = cur_queue->next;
      delete cur_queue;
      cur_queue = next;
    }

    Node *cur_stack = stack_top.load(stdmo::relaxed).node;
    while (cur_stack != STACK_END) {
      Node *next = cur_stack->next;
      delete cur_stack;
      cur_stack = next;
    }
  }

  Node *allocate_node() {
    GenNodePtr cur_stack = stack_top.load(stdmo::acquire);
    while (true) {
      if (cur_stack.node == STACK_END) {
        return new Node{};
      }
      Node *cur_stack_next = cur_stack.node->next.load(stdmo::relaxed);
      GenNodePtr new_stack{cur_stack_next, cur_stack.gen + 1};
      if (stack_top.compare_exchange_weak(cur_stack, new_stack,
                                          stdmo::acq_rel)) {
        return cur_stack.node;
      }
    }
  }

  void deallocate_node(Node *node) {
    GenNodePtr cur_stack = stack_top.load(stdmo::acquire);
    while (true) {
      node->next.store(cur_stack.node, stdmo::relaxed);
      GenNodePtr new_stack{node, cur_stack.gen + 1};
      if (stack_top.compare_exchange_weak(cur_stack, new_stack,
                                          stdmo::acq_rel)) {
        break;
      }
    }
  }

 public:
  void enqueue(Job job) {
    Node *new_node = allocate_node();
    new_node->next.store(QUEUE_END, stdmo::relaxed);
    Node *old_dummy = jobs_back.exchange(new_node, stdmo::acq_rel);
    old_dummy->job = job;
    old_dummy->next.store(new_node, stdmo::release);
  }

  std::optional<Job> try_dequeue() {
    GenNodePtr old_front = jobs_front.load(stdmo::relaxed);
    while (true) {
      Node *old_front_next = old_front.node->next.load(stdmo::acquire);
      if (old_front_next == QUEUE_END) {
        return std::nullopt;
      }

      GenNodePtr new_front{old_front_next, old_front.gen + 1};
      if (jobs_front.compare_exchange_weak(old_front, new_front,
                                           stdmo::relaxed)) {
        break;
      }
    }

    Job job = old_front.node->job;
    deallocate_node(old_front.node);

    return job;
  }
};

int main() {
  JobQueue queue;

  std::atomic<int> i = 0;

  std::thread consumer{[&queue, &i]() {
    // producer enqueues 1
    while (i.load(std::memory_order_relaxed) != 1) {}
    std::atomic_thread_fence(std::memory_order_acq_rel);

    std::cout << queue.try_dequeue().value_or(Job{-1, -1}).data
              << std::endl;

    std::atomic_thread_fence(std::memory_order_acq_rel);
    i.store(2, std::memory_order_relaxed);

    // producer enqueues 2 and 3
  }};

  std::thread producer{[&queue, &i]() {
    queue.enqueue(Job{1, 1});

    std::atomic_thread_fence(std::memory_order_acq_rel);
    i.store(1, std::memory_order_relaxed);

    // consumer consumes here

    while (i.load(std::memory_order_relaxed) != 2) {}
    std::atomic_thread_fence(std::memory_order_acq_rel);

    queue.enqueue(Job{2, 2});
    queue.enqueue(Job{3, 3});
  }};

  producer.join();
  consumer.join();

  return 0;
}

这个队列被实现为一个单链双端链表。它使用虚拟节点来解耦生产者和消费者,并使用生成计数器和节点回收(使用内部堆栈)来避免 ABA 问题和try_dequeue.

在使用 Clang 13.0.1、Linux x64 编译的 TSan 下运行它,我得到了以下比赛:

WARNING: ThreadSanitizer: data race (pid=223081)
  Write of size 8 at 0x7b0400000008 by thread T2:
    #0 JobQueue::enqueue(Job) .../bug4.cpp:85 (bug4.tsan+0xe3e53)
    #1 operator() .../bug4.cpp:142 (bug4.tsan+0xe39ee)
    ...

  Previous read of size 8 at 0x7b0400000008 by thread T1:
    #0 JobQueue::try_dequeue() .../bug4.cpp:104 (bug4.tsan+0xe3c07)
    #1 operator() .../bug4.cpp:121 (bug4.tsan+0xe381c)
    ...

在 Godbolt 上运行(注意,由于 Godbolt 运行程序的方式,TSan 不显示行号信息)

这场比赛是在从线程try_dequeue调用的先前读入之间进行的:consumer

    Job job = old_front.node->job;

这稍后会写入,这是线程enqueue的第三次调用:enqueueproducer

    old_dummy->job = job;

我相信这场比赛是不可能的,因为线程应该通过获取-释放比较交换producer与线程同步到in和.consumerstack_topallocate_nodedeallocate_node

现在,奇怪的是,制造GenNodePointer alignas(32)消除了比赛。

在神螺栓上运行

问题:

  1. 这场比赛真的可能吗?
  2. 为什么增加 GenNodePointer 的对齐会使 TSan 不再注册比赛?
4

0 回答 0