4

我最近在 C++ 中实现了一个公平的读写器票据自旋锁。代码相当简单,我认为它工作得很好。我已经将自旋锁集成到一个更大的应用程序中,并且我注意到在极少数情况下,代码运行非常缓慢,而大多数时候,它运行得非常快。我知道这是由于自旋锁,因为如果我立即用简单的读写器自旋锁替换它(不公平且没有票),代码突然运行得更快。它在不同的机器上发生了几次。我知道如果你用比内核更多的线程来运行这些锁,它们会运行得很慢,但我在一台有 48 个内核的机器上用 16 个线程运行它。我无法在具有 4 个线程和 4 个内核的笔记本电脑上重现该问题。这是代码:

    inline size_t rndup(size_t v) {

        v--;
        v |= v >> 1;
        v |= v >> 2;
        v |= v >> 4;
        v |= v >> 8;
        v |= v >> 16;
        v |= v >> 32;
        v++;

        return v;
    }    

    class SpinLockRW_MCS {

        public:

            SpinLockRW_MCS(const size_t nb_readers) :   writer(nullptr), lock_pool(nullptr), it_lock_pool(0),
                                                        load_lock_pool(0), mask_it(rndup(2 * nb_readers + 1) - 1),
                                                        padding1{0}, padding2{0}, padding3{0}, padding4{0} {

                if (nb_readers <= std::thread::hardware_concurrency()){

                    lock_pool = new Lock[mask_it + 1];
                    lock_pool[0].is_locked = false;
                }
            }

            ~SpinLockRW_MCS() {

                clear();
            }

            inline void clear() {

                if (lock_pool != nullptr){

                    delete[] lock_pool;
                    lock_pool = nullptr;
                }

                writer = nullptr;

                it_lock_pool = 0;
                load_lock_pool = 0;
            }

            inline void acquire_reader() {

                uint_fast32_t retry = 0;

                const size_t prev_reader_id = it_lock_pool.fetch_add(1) & mask_it;
                const size_t new_reader_id = (prev_reader_id + 1) & mask_it;

                while (lock_pool[prev_reader_id].is_locked){

                    if (++retry > 100) this_thread::yield();
                }

                ++load_lock_pool;

                lock_pool[prev_reader_id].is_locked = true;
                lock_pool[new_reader_id].is_locked = false;
            }

            inline void release_reader() {

                --load_lock_pool;
            }

            inline void acquire_writer() {

                uint_fast32_t retry = 0;

                const size_t prev_reader_id = it_lock_pool.fetch_add(1) & mask_it;
                const size_t new_reader_id = (prev_reader_id + 1) & mask_it;

                while (lock_pool[prev_reader_id].is_locked){

                    if (++retry > 100) this_thread::yield();
                }

                while (load_lock_pool){

                    if (++retry > 100) this_thread::yield();
                }

                lock_pool[prev_reader_id].is_locked = true;

                writer = &lock_pool[new_reader_id];
            }

            inline void release_writer() {

                writer->is_locked = false;
            }

            inline void release_writer_acquire_reader() {

                ++load_lock_pool;

                writer->is_locked = false;
            }

        private:

            struct Lock {

                std::atomic<bool> is_locked;
                const int padding[15];

                Lock() : is_locked(true), padding{0} {}
            };

            Lock* writer;
            const int padding1[14];
            Lock* lock_pool;
            const int padding2[14];
            const size_t mask_it;
            const int padding3[14];
            std::atomic<size_t> it_lock_pool;
            const int padding4[14];
            std::atomic<size_t> load_lock_pool;
    };

任何建议将不胜感激!谢谢!

4

2 回答 2

4

在没有更多细节的情况下评估问题有点困难,但这是我在黑暗中的镜头:我怀疑在您的场景中,读者需要非常频繁地获取锁(否则,使用传统锁可能会更好)。这是你的问题:

任何一个线程都能够饿死所有其他线程。

这对读者和作者都是如此,而在非公平算法中,它通常只对作者是正确的。当您有多个阅读器排队等待读取访问时,您的情况就会出现问题。每个线程都将等待前面的锁变为可用(while (lock_pool[prev_reader_id].is_locked) ...)。如果他们能得到那个锁,一切都很好,但是一旦一个线程无法得到它,你就会遇到麻烦。所有阅读器线程都排队等待其前任翻转到false. 他们每个人都依赖于他们的直接前任。

现在想象第一个读者无法获得锁。它会继续旋转一段时间,最终yield()。这实际上意味着您的线程现在不再运行。操作系统将其从调度队列中删除,并且它不会运行很长时间(它们的其余时间片,与完成 100 次旋转所需的时间相比要长)。因此,等待线程的完整链很可能会进入 yield。

最终,第一个线程正在等待的标志将翻转为false. 但是您的调度程序现在陷入困境。它周围有一堆线程,但它们只旋转了很短的时间,然后再次进入屈服状态。这里的期望是,除了链条中的第一个线程之外,如果它们被选中,它们几乎肯定会注定要休眠以获得更完整的时间片。因此,如果这种情况发生在等待线程链中的早期线程,您也会谴责链中的所有其他线程至少等待同样长的时间。

您在这里玩的是一种概率游戏,随着队列中读者数量的增加,您获胜的几率会显着降低。这就是为什么当从 4 个线程移动到 16 个线程时问题变得更糟的原因。特别是,一旦达到新读者到达队列所需的平均时间大致与线程在队列中移动所需的时间顺序相同,您将很难返回再次到一个空队列。这并非不可能,因为我们在这里讨论的是多个时间片,这会将您带到几十到几百毫秒的数量级。

这是公平调度算法中的典型权衡。公平是有代价的,在这种情况下,一个读者可以屏蔽所有人。由于如果您设法先进行获取电话,我的读者永远不会超过您的读者,如果您不继续前进,我将不得不永远等待。这个问题的一个解决方案是给调度器额外的信息每个线程正在等待什么,这样它就有更好的机会以正确的顺序唤醒它们。另一种方法是选择更适合您的特定场景的不同算法。

于 2019-04-25T14:35:35.840 回答
0

我敢打赌,您的问题在以下几行附近:

if (++retry > 100) this_thread::yield();

我知道这就是您计划“乐观”的方式,但是像这样的硬编码(任意)值(在本例中为“100”)通常表明在处理此类问题时存在设计缺陷 - 就像您说的那样查看另一个系统上的问题,这可能是该问题的症状(因为使用此值,它似乎适用于您的系统)。这反过来又指出this_thread::yield()了问题的一部分。

于 2019-04-24T11:17:23.920 回答