I'm not used to posting questions on the internet, so please tell me if I'm doing something wrong.

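In short

  1. How do I properly prevent false sharing on a 64-bit architecture whose CPU cache line size is 64 bytes?

  2. How does using the C++ 'alignas' keyword versus a plain byte array (e.g. char[64]) affect multithreading efficiency?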

Context

While working on a very efficient implementation of a Single Producer Single Consumer (SPSC) queue, I ran into illogical behavior from the GCC compiler when benchmarking my code.

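Full story

I hope someone has the necessary knowledge to explain what is going on.

I'm currently using GCC 10.2.0 and its C++20 implementation on Arch Linux. My laptop is a Lenovo T470S with an i7-7500U processor.

Let me start with the data structure: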

class SPSCQueue
{
public:
    ...

private:
    alignas(64) std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
    std::size_t _headCache { 0 }; // Head cache for the producer
    char _pad0[64 - sizeof(Buffer) - sizeof(std::size_t)]; // 64 bytes alignment padding

    alignas(64) std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer
    std::size_t _tailCache { 0 }; // Tail cache for the consumer
    char _pad1[64 - sizeof(Buffer) - sizeof(std::size_t)]; // 64 bytes alignment padding
};

The data structure above gets a fast and stable 20 ns per push/pop on my system.
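To see where each member actually lands, the layout above can be mirrored in a small standalone sketch. `LayoutA`, the simplified `Buffer`, and `lineOf` are hypothetical names introduced only for this illustration, and the numbers assume a typical 64-bit target where `sizeof(std::size_t)` and `sizeof(void *)` are both 8. Under that assumption, `tail` and the producer's cached fields appear to end up on the same cache line, because the padding does not account for the 8 bytes `tail` already occupies:

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical stand-in for the queue's Buffer (pointer + capacity).
struct Buffer {
    void *data{nullptr};
    std::size_t capacity{0};
};

// Hypothetical public mirror of the first member layout, so offsetof can be used.
struct LayoutA {
    alignas(64) std::atomic<std::size_t> tail{0};
    Buffer buffer{};
    std::size_t headCache{0};
    char pad0[64 - sizeof(Buffer) - sizeof(std::size_t)];

    alignas(64) std::atomic<std::size_t> head{0};
    Buffer buffer2{};
    std::size_t tailCache{0};
    char pad1[64 - sizeof(Buffer) - sizeof(std::size_t)];
};

// Index of the 64-byte line an offset falls into, relative to the object start.
constexpr std::size_t lineOf(std::size_t offset) { return offset / 64; }

// tail and the producer-side cache share line 0 (assumes a 64-bit target).
static_assert(offsetof(LayoutA, buffer) == 8, "buffer directly follows tail");
static_assert(lineOf(offsetof(LayoutA, tail)) == lineOf(offsetof(LayoutA, headCache)),
              "tail and headCache are on the same line");
// pad0 spills past the first line, so the aligned head skips to offset 128.
static_assert(offsetof(LayoutA, head) == 128, "head starts the third line");
static_assert(sizeof(LayoutA) == 256, "four cache lines in total");
```

If the asserts compile, the producer-side group spans bytes 0 to 71, and `alignas(64)` on `head` then pushes the consumer-side group to offset 128.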

However, changing the alignment by using the following members makes the benchmark unstable, giving times between 20 and 30 ns.

    alignas(64) std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    struct alignas(64) {
        Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
        std::size_t _headCache { 0 }; // Head cache for the producer
    };

    alignas(64) std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    struct alignas(64) {
        Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer
        std::size_t _tailCache { 0 }; // Tail cache for the consumer
    };
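A similar sketch for this variant, again with hypothetical names (`LayoutB`, `ProducerCache`, `ConsumerCache`) and assuming a 64-bit target, suggests that each side now spans two cache lines: the atomic index sits alone on one line and the cached fields on the next. The type as a whole is still only 64-byte aligned:

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical stand-in for the queue's Buffer (pointer + capacity).
struct Buffer {
    void *data{nullptr};
    std::size_t capacity{0};
};

// Named versions of the two anonymous aligned structs (hypothetical names).
struct alignas(64) ProducerCache {
    Buffer buffer{};
    std::size_t headCache{0};
};

struct alignas(64) ConsumerCache {
    Buffer buffer{};
    std::size_t tailCache{0};
};

// Hypothetical public mirror of the second member layout.
struct LayoutB {
    alignas(64) std::atomic<std::size_t> tail{0};
    ProducerCache producer{};
    alignas(64) std::atomic<std::size_t> head{0};
    ConsumerCache consumer{};
};

// Every group starts on its own 64-byte line.
static_assert(offsetof(LayoutB, tail) == 0, "tail on line 0");
static_assert(offsetof(LayoutB, producer) == 64, "producer cache on line 1");
static_assert(offsetof(LayoutB, head) == 128, "head on line 2");
static_assert(offsetof(LayoutB, consumer) == 192, "consumer cache on line 3");
// The object itself is guaranteed 64-byte alignment, not 128-byte alignment.
static_assert(alignof(LayoutB) == 64, "only cache-line aligned");
```

The total size is the same as before (256 bytes), so the difference between the two variants lies in which fields share a line, not in how much memory the queue header uses.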

Finally, I got even more lost when I tried this configuration, which gave results between 40 and 55 ns.


    std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    char _pad0[64 - sizeof(std::atomic<size_t>)];
    Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
    std::size_t _headCache { 0 }; // Head cache for the producer
    char _pad1[64 - sizeof(Buffer) - sizeof(std::size_t)];

    std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    char _pad2[64 - sizeof(std::atomic<size_t>)];
    Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer
    std::size_t _tailCache { 0 }; // Tail cache for the consumer
    char _pad3[64 - sizeof(Buffer) - sizeof(std::size_t)];

This time, both queue push and pop oscillated between 40 and 55 ns.
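One difference worth noting in this variant is that padding arrays fix the distance between members but not the alignment of the object itself. The sketch below (hypothetical `LayoutC`, `Buffer`, and `lineOf`, with a 64-bit target assumed) shows that the type is not cache-line aligned, and that for a start address such as 48, which would be a legal placement, the producer's `headCache` and the consumer-written `head` fall on the same cache line:

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical stand-in for the queue's Buffer (pointer + capacity).
struct Buffer {
    void *data{nullptr};
    std::size_t capacity{0};
};

// Hypothetical public mirror of the padding-only layout (no alignas anywhere).
struct LayoutC {
    std::atomic<std::size_t> tail{0};
    char pad0[64 - sizeof(std::atomic<std::size_t>)];
    Buffer buffer{};
    std::size_t headCache{0};
    char pad1[64 - sizeof(Buffer) - sizeof(std::size_t)];

    std::atomic<std::size_t> head{0};
    char pad2[64 - sizeof(std::atomic<std::size_t>)];
    Buffer buffer2{};
    std::size_t tailCache{0};
    char pad3[64 - sizeof(Buffer) - sizeof(std::size_t)];
};

// Index of the 64-byte cache line that an absolute address falls into.
constexpr std::size_t lineOf(std::size_t address) { return address / 64; }

// Members are spaced 64 bytes apart, but the object may start mid-line.
static_assert(sizeof(LayoutC) == 256, "same size as the aligned variants");
static_assert(alignof(LayoutC) < 64, "not guaranteed to start on a cache line");

// Start address 0: headCache and head are on different lines.
static_assert(lineOf(0 + offsetof(LayoutC, headCache)) != lineOf(0 + offsetof(LayoutC, head)),
              "no sharing when the object happens to be line-aligned");
// Start address 48: headCache (128) and head (176) both land on line 2.
static_assert(lineOf(48 + offsetof(LayoutC, headCache)) == lineOf(48 + offsetof(LayoutC, head)),
              "false sharing becomes possible when the object is misaligned");
```

Whether this happens in a given run depends on where the queue object is placed, which may be one reason the timings vary.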

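I'm very lost at this point because I don't know where I should look for answers. Until now, C++ memory layout has been very intuitive to me, but I realize I'm still missing very important knowledge needed to handle high-frequency multithreading better.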

Minimal code example

If you want to compile the whole code to test it yourself, here are the few files needed:

SPSCQueue.hpp:


#pragma once

#include <atomic>
#include <cinttypes>
#include <cstddef>
#include <cstdlib>
#include <new>
#include <stdexcept>
#include <type_traits>
#include <utility>

#define KF_ALIGN_CACHELINE alignas(kF::Core::Utils::CacheLineSize)

namespace kF::Core
{
    template<typename Type>
    class SPSCQueue;

    namespace Utils
    {
        /** @brief Helper used to perfect forward move / copy constructor */
        template<typename Type, bool ForceCopy = false>
        void ForwardConstruct(Type *dest, Type *source) {
            if constexpr (!ForceCopy && std::is_move_constructible_v<Type>)
                new (dest) Type(std::move(*source));
            else
                new (dest) Type(*source);
        }

        /** @brief Helper used to perfect forward move / copy assignment */
        template<typename Type, bool ForceCopy = false>
        void ForwardAssign(Type *dest, Type *source) {
            if constexpr (!ForceCopy && std::is_move_assignable_v<Type>)
                *dest = std::move(*source);
            else
                *dest = *source;
        }

        /** @brief Theoretical cacheline size */
        constexpr std::size_t CacheLineSize = 64ul;
    }
}

/**
 * @brief The SPSC queue is a lock-free queue that only supports a Single Producer and a Single Consumer
 * The queue is really fast compared to other more flexible implementations: because only two threads can
 * read / write simultaneously, less synchronization is needed for each operation.
 * The queue supports ranged push / pop to insert multiple elements without performance impact
 *
 * @tparam Type to be inserted
 */
template<typename Type>
class kF::Core::SPSCQueue
{
public:
    /** @brief Buffer structure containing all cells */
    struct Buffer
    {
        Type *data { nullptr };
        std::size_t capacity { 0 };
    };

    /** @brief Local thread cache */
    struct Cache
    {
        Buffer buffer {};
        std::size_t value { 0 };
    };

    /** @brief Default constructor initialize the queue */
    SPSCQueue(const std::size_t capacity);

    /** @brief Destruct and release all memory (unsafe) */
    ~SPSCQueue(void) { clear(); std::free(_buffer.data); }

    /** @brief Push a single element into the queue
     *  @return true if the element has been inserted */
    template<typename ...Args>
    [[nodiscard]] inline bool push(Args &&...args);

    /** @brief Pop a single element from the queue
     *  @return true if an element has been extracted */
    [[nodiscard]] inline bool pop(Type &value);

    /** @brief Clear all elements of the queue (unsafe) */
    void clear(void);

private:
    KF_ALIGN_CACHELINE std::atomic<size_t> _tail { 0 }; // Tail accessed by both producer and consumer
    struct {
        Buffer _buffer {}; // Buffer cache for the producer, equivalent to _buffer2
        std::size_t _headCache { 0 }; // Head cache for the producer
        char _pad0[Utils::CacheLineSize - sizeof(Buffer) - sizeof(std::size_t)];
    };

    KF_ALIGN_CACHELINE std::atomic<size_t> _head { 0 }; // Head accessed by both producer and consumer
    struct{
        Buffer _buffer2 {}; // Buffer cache for the consumer, equivalent to _buffer
        std::size_t _tailCache { 0 }; // Tail cache for the consumer
        char _pad1[Utils::CacheLineSize - sizeof(Buffer) - sizeof(std::size_t)];
    };

    /** @brief Copy and move constructors disabled */
    SPSCQueue(const SPSCQueue &other) = delete;
    SPSCQueue(SPSCQueue &&other) = delete;
};

static_assert(sizeof(kF::Core::SPSCQueue<int>) == 4 * kF::Core::Utils::CacheLineSize);

template<typename Type>
kF::Core::SPSCQueue<Type>::SPSCQueue(const std::size_t capacity)
{
    _buffer.capacity = capacity;
    if (_buffer.data = reinterpret_cast<Type *>(std::malloc(sizeof(Type) * capacity)); !_buffer.data)
        throw std::runtime_error("Core::SPSCQueue: Malloc failed");
    _buffer2 = _buffer;
}

template<typename Type>
template<typename ...Args>
bool kF::Core::SPSCQueue<Type>::push(Args &&...args)
{
    static_assert(std::is_constructible<Type, Args...>::value, "Type must be constructible from Args...");

    const auto tail = _tail.load(std::memory_order_relaxed);
    auto next = tail + 1;

    if (next == _buffer.capacity) [[unlikely]]
        next = 0;
    if (auto head = _headCache; next == head) [[unlikely]] {
        head = _headCache = _head.load(std::memory_order_acquire);
        if (next == head) [[unlikely]]
            return false;
    }
    new (_buffer.data + tail) Type{ std::forward<Args>(args)... };
    _tail.store(next, std::memory_order_release);
    return true;
}

template<typename Type>
bool kF::Core::SPSCQueue<Type>::pop(Type &value)
{
    const auto head = _head.load(std::memory_order_relaxed);

    if (auto tail = _tailCache; head == tail) [[unlikely]] {
        tail = _tailCache = _tail.load(std::memory_order_acquire);
        if (head == tail) [[unlikely]]
            return false;
    }
    auto *elem = reinterpret_cast<Type *>(_buffer2.data + head);
    auto next = head + 1;
    if (next == _buffer2.capacity) [[unlikely]]
        next = 0;
    value = std::move(*elem);
    elem->~Type();
    _head.store(next, std::memory_order_release);
    return true;
}

template<typename Type>
void kF::Core::SPSCQueue<Type>::clear(void)
{
    for (Type type; pop(type););
}

The benchmark, using Google Benchmark. bench_SPSCQueue.cpp:

#include <atomic>
#include <chrono>
#include <cstddef>
#include <thread>

#include <benchmark/benchmark.h>

#include "SPSCQueue.hpp"

using namespace kF;

using Queue = Core::SPSCQueue<std::size_t>;

constexpr std::size_t Capacity = 4096;

static void SPSCQueue_NoisyPush(benchmark::State &state)
{
    Queue queue(Capacity);
    std::atomic<bool> running = true;
    std::size_t i = 0ul;
    std::thread thd([&queue, &running] { for (std::size_t tmp; running; benchmark::DoNotOptimize(queue.pop(tmp))); });
    for (auto _ : state) {
        decltype(std::chrono::high_resolution_clock::now()) start;
        do {
            start = std::chrono::high_resolution_clock::now();
        } while (!queue.push(42ul));
        auto end = std::chrono::high_resolution_clock::now();
        auto elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
        auto iterationTime = elapsed.count();
        state.SetIterationTime(iterationTime);
    }
    running = false;
    if (thd.joinable())
        thd.join();
}
BENCHMARK(SPSCQueue_NoisyPush)->UseManualTime();

static void SPSCQueue_NoisyPop(benchmark::State &state)
{
    Queue queue(Capacity);
    std::atomic<bool> running = true;
    std::size_t i = 0ul;
    std::thread thd([&queue, &running] { while (running) benchmark::DoNotOptimize(queue.push(42ul)); });
    for (auto _ : state) {
        std::size_t tmp;
        decltype(std::chrono::high_resolution_clock::now()) start;
        do {
            start = std::chrono::high_resolution_clock::now();
        } while (!queue.pop(tmp));
        auto end = std::chrono::high_resolution_clock::now();
        auto elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
        auto iterationTime = elapsed.count();
        state.SetIterationTime(iterationTime);
    }
    running = false;
    if (thd.joinable())
        thd.join();
}
BENCHMARK(SPSCQueue_NoisyPop)->UseManualTime();

1 Answer

Thanks to your helpful comments (mostly thanks to Peter Cordes), it seems the issue comes from the L2 data prefetcher.

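Because of my SPSC queue design, each thread must access two consecutive cache lines to push to or pop from the queue. If the structure itself is not aligned to 128 bytes, those two lines are not guaranteed to form one 128-byte-aligned pair. As far as I understand, the L2 prefetcher works on such aligned pairs, so it cannot fetch a thread's two lines together and may instead pull in a line used by the other thread.

So the simple fix is: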

template<typename Type>
class alignas(128) SPSCQueue { ... };
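The effect of the 128-byte alignment can be illustrated with plain address arithmetic. In the sketch below, `pairOf`, `linesSharePair`, and `TwoLineGroup` are hypothetical names, and the 128-byte pairing granularity is an assumption taken from Intel's description of the L2 prefetcher; it is not something the C++ standard guarantees:

```cpp
#include <cstddef>
#include <cstdint>

constexpr std::size_t CacheLineSize = 64;
// Assumption: the prefetcher completes lines into 128-byte aligned pairs.
constexpr std::size_t PrefetchPairSize = 128;

// Index of the 128-byte aligned block that an address belongs to.
constexpr std::uintptr_t pairOf(std::uintptr_t address) { return address / PrefetchPairSize; }

// True when the two consecutive cache lines starting at `base`
// (which must itself be 64-byte aligned) sit in the same aligned block.
constexpr bool linesSharePair(std::uintptr_t base) {
    return pairOf(base) == pairOf(base + CacheLineSize);
}

// Hypothetical two-line group, standing in for the lines one thread touches.
struct alignas(128) TwoLineGroup {
    alignas(64) unsigned char first[64];
    alignas(64) unsigned char second[64];
};

static_assert(alignof(TwoLineGroup) == 128, "object starts on a 128-byte boundary");
static_assert(sizeof(TwoLineGroup) == 128, "exactly two cache lines");
// 128-byte aligned start: both lines belong to one pair.
static_assert(linesSharePair(128), "lines at 128 and 192 share a pair");
// 64-byte aligned only: the two lines belong to different pairs.
static_assert(!linesSharePair(192), "lines at 192 and 256 straddle two pairs");
```

Objects with automatic or static storage honor `alignas(128)` directly. For heap allocation, `new` expressions in C++17 and later also respect over-aligned types, whereas plain `std::malloc` does not.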

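Here (section 2.5.5.4, Data Prefetching) is an interesting paper from Intel explaining the optimizations for their architectures and how prefetching is done at the different cache levels.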

answered 2020-09-23T13:39:17.837