42

我们发现在我们的代码中有几个地方,对受互斥体保护的数据的并发读取相当普遍,而写入很少见。我们的测量似乎表明,使用简单的互斥锁会严重影响读取该数据的代码的性能。所以我们需要的是一个多读/单写互斥锁。我知道这可以建立在更简单的原语之上,但在我尝试这样做之前,我宁愿询问现有的知识:

用更简单的同步原语构建多读/单写锁的认可方法是什么?

我确实知道如何做到这一点,但我宁愿得到不受我(可能是错误地)提出的答案的偏见。(注意:我期望的是如何做到这一点的解释,可能是伪代码,而不是完整的实现。我当然可以自己编写代码。)

注意事项:

  • 这需要有合理的表现。(我的想法是每次访问都需要两次锁定/解锁操作。现在这可能还不够好,但需要其中很多似乎是不合理的。)

  • 通常,读取次数更多,但写入比读取更重要且对性能更敏感。读者不能饿死作家。

  • 我们被困在一个相当老的嵌入式平台(VxWorks 5.5 的专有变体)上,有一个相当老的编译器(GCC 4.1.2)和 boost 1.52——除了大部分 boost 的部分依赖于 POSIX,因为 POSIX 没有完全实现在那个平台上。可用的锁定原语基本上是几种信号量(二进制、计数等),在它们之上我们已经创建了互斥锁、条件变量和监视器。

  • 这是 IA32,单核。

4

8 回答 8

22

乍一看,我以为我认为这个答案与 Alexander Terekhov 介绍的算法相同。但是在研究它之后,我认为它是有缺陷的。两个写者同时等待是可能的m_exclusive_cond。当其中一个写入器唤醒并获得排他锁时,它将设置exclusive_waiting_blocked = false为 on unlock,从而将互斥锁设置为不一致的状态。在那之后,互斥体可能会被冲洗掉。

首次提出的N2406std::shared_mutex包含一个部分实现,下面用更新的语法重复该实现。

class shared_mutex
{
    mutex    mut_;
    condition_variable gate1_;
    condition_variable gate2_;
    unsigned state_;

    static const unsigned write_entered_ = 1U << (sizeof(unsigned)*CHAR_BIT - 1);
    static const unsigned n_readers_ = ~write_entered_;

public:

    shared_mutex() : state_(0) {}

// Exclusive ownership

    void lock();
    bool try_lock();
    void unlock();

// Shared ownership

    void lock_shared();
    bool try_lock_shared();
    void unlock_shared();
};

// Exclusive ownership

void
shared_mutex::lock()
{
    unique_lock<mutex> lk(mut_);
    while (state_ & write_entered_)
        gate1_.wait(lk);
    state_ |= write_entered_;
    while (state_ & n_readers_)
        gate2_.wait(lk);
}

bool
shared_mutex::try_lock()
{
    unique_lock<mutex> lk(mut_, try_to_lock);
    if (lk.owns_lock() && state_ == 0)
    {
        state_ = write_entered_;
        return true;
    }
    return false;
}

void
shared_mutex::unlock()
{
    {
    lock_guard<mutex> _(mut_);
    state_ = 0;
    }
    gate1_.notify_all();
}

// Shared ownership

void
shared_mutex::lock_shared()
{
    unique_lock<mutex> lk(mut_);
    while ((state_ & write_entered_) || (state_ & n_readers_) == n_readers_)
        gate1_.wait(lk);
    unsigned num_readers = (state_ & n_readers_) + 1;
    state_ &= ~n_readers_;
    state_ |= num_readers;
}

bool
shared_mutex::try_lock_shared()
{
    unique_lock<mutex> lk(mut_, try_to_lock);
    unsigned num_readers = state_ & n_readers_;
    if (lk.owns_lock() && !(state_ & write_entered_) && num_readers != n_readers_)
    {
        ++num_readers;
        state_ &= ~n_readers_;
        state_ |= num_readers;
        return true;
    }
    return false;
}

void
shared_mutex::unlock_shared()
{
    lock_guard<mutex> _(mut_);
    unsigned num_readers = (state_ & n_readers_) - 1;
    state_ &= ~n_readers_;
    state_ |= num_readers;
    if (state_ & write_entered_)
    {
        if (num_readers == 0)
            gate2_.notify_one();
    }
    else
    {
        if (num_readers == n_readers_ - 1)
            gate1_.notify_one();
    }
}

该算法源自 Alexander Terekhov 的旧新闻组帖子。它既不会饿死读者,也不会饿死作者。

有两个“门”,gate1_gate2_。读者和作者必须通过gate1_,并且在尝试这样做时可能会被阻止。一旦读者过去了gate1_,它就读锁定了互斥锁。gate1_只要拥有所有权的读者没有达到最大数量,并且只要作者没有通过,读者就可以通过gate1_

一次只有一个作家可以通过gate1_gate1_即使读者拥有所有权,作家也可以过去。但是一旦过去gate1_,作家仍然没有所有权。它必须先过去gate2_gate2_在所有拥有所有权的读者都放弃之前,作家无法超越。gate1_回想一下,当作家在等待时,新读者无法通过gate2_gate1_当一个作家在等待时,一个新作家也不能过去gate2_

读者和作者都被gate1_(几乎)相同的要求阻止的特征是什么让这个算法对读者和作者都公平,两者都不挨饿。

互斥体“状态”被有意保留在一个单词中,以暗示对某些状态更改部分使用原子(作为优化)是可能的(即,对于无争议的“快速路径”)。然而,这里没有展示这种优化。一个示例是,如果编写器线程可以原子地state_从 0 更改为write_entered然后他获得锁而无需阻塞甚至 lock/unlock mut_。并且unlock()可以用原子存储来实现。等等。这些优化在这里没有显示,因为它们比这个简单的描述听起来更难正确实现。

于 2015-01-25T19:53:03.937 回答
19

似乎您只有 mutex 和 condition_variable 作为同步原语。所以,我在这里写了一个读写锁,让读者饿死。它使用一个互斥体、两个条件变量和三个整数。

readers - readers in the cv readerQ plus the reading reader
writers - writers in cv writerQ plus the writing writer
active_writers - the writer currently writing. can only be 1 or 0.

它以这种方式饿死读者。如果有几个作家要写,读者将永远没有机会阅读,直到所有作家都完成写作。这是因为以后的读者需要检查writers变量。同时,该active_writers变量将保证一次只能写一个作者。

class RWLock {
public:
    RWLock()
    : shared()
    , readerQ(), writerQ()
    , active_readers(0), waiting_writers(0), active_writers(0)
    {}

    void ReadLock() {
        std::unique_lock<std::mutex> lk(shared);
        while( waiting_writers != 0 )
            readerQ.wait(lk);
        ++active_readers;
        lk.unlock();
    }

    void ReadUnlock() {
        std::unique_lock<std::mutex> lk(shared);
        --active_readers;
        lk.unlock();
        writerQ.notify_one();
    }

    void WriteLock() {
        std::unique_lock<std::mutex> lk(shared);
        ++waiting_writers;
        while( active_readers != 0 || active_writers != 0 )
            writerQ.wait(lk);
        ++active_writers;
        lk.unlock();
    }

    void WriteUnlock() {
        std::unique_lock<std::mutex> lk(shared);
        --waiting_writers;
        --active_writers;
        if(waiting_writers > 0)
            writerQ.notify_one();
        else
            readerQ.notify_all();
        lk.unlock();
    }

private:
    std::mutex              shared;
    std::condition_variable readerQ;
    std::condition_variable writerQ;
    int                     active_readers;
    int                     waiting_writers;
    int                     active_writers;
};
于 2015-01-24T01:59:57.883 回答
5

由互斥锁保护的数据的并发读取相当普遍,而写入很少见

这听起来像是用户空间 RCU的理想场景:

URCU 类似于它的 Linux 内核对应物,提供了读写器锁定的替代品,以及其他用途。这种相似性继续存在于阅读器不直接与 RCU 更新器同步,从而使 RCU 读取端代码路径非常快,同时还允许 RCU 阅读器即使在与 RCU 更新器同时运行时也能进行有用的向前推进,反之亦然。

于 2015-01-23T11:01:35.040 回答
4

您可以采取一些好的技巧来提供帮助。

一是性能好。VxWorks 以其非常好的上下文切换时间而著称。无论您使用哪种锁定解决方案,它都可能涉及信号量。我不会害怕为此使用信号量(复数),它们在 VxWorks 中得到了很好的优化,并且快速的上下文切换时间有助于减少评估许多信号量状态等导致的性能下降。

此外,我会忘记使用 POSIX 信号量,它们将简单地叠加在 VxWork 自己的信号量之上。VxWorks 提供本地计数、二进制和互斥信号量;使用适合的那一款会让这一切变得更快。二进制有时非常有用。多次发布,永远不会超过1的值。

其次,写比读更重要。当我在 VxWorks 中有这种需求并且一直使用信号量来控制访问时,我使用任务优先级来指示哪个任务更重要并且应该首先访问资源。这工作得很好;从字面上看,VxWorks 中的所有东西都是一个任务(嗯,线程),就像其他任何东西一样,包括所有设备驱动程序等。

VxWorks 还解决了优先级倒置(Linus Torvalds 讨厌的那种事情)。因此,如果您使用信号量实现锁定,则可以依靠 OS 调度程序在较低优先级的读取器阻塞较高优先级的写入器时分配它们。它可以导致更简单的代码,并且您也可以充分利用操作系统。

因此,一个潜在的解决方案是使用单个 VxWorks 计数信号量来保护资源,并将其初始化为等于读取器数量的值。每次读者想要阅读时,它都会使用信号量(将计数减少 1。每次读取完成时,它都会发布信号量,将计数增加 1。每次写入者想要写入时,它都会使用信号量 n(n =读者数)次,完成后发布n次。最后使写任务的优先级高于任何读者,并依靠操作系统快速的上下文切换时间和优先级倒置。

请记住,您是在硬实时操作系统上编程,而不是在 Linux 上。获取/发布本机 VxWorks 信号量并不涉及与 Linux 上的类似行为相同的运行时间,尽管如今即使 Linux 也相当不错(我现在正在使用 PREEMPT_RT)。可以依赖 VxWorks 调度程序和所有设备驱动程序来运行。如果您愿意,您甚至可以将您的编写器任务设置为整个系统中的最高优先级,甚至高于所有设备驱动程序!

为了帮助解决问题,还要考虑每个线程在做什么。VxWorks 允许您指示任务是否正在使用 FPU。如果您使用本地 VxWorks TaskSpawn 例程而不是 pthread_create,那么您有机会指定它。这意味着如果您的线程/任务没有进行任何浮点数学运算,并且您在调用 TaskSpawn 时已经说过,上下文切换时间会更快,因为调度程序不会费心保留 /恢复 FPU 状态。

这很有可能成为您正在开发的平台上的最佳解决方案。它发挥了操作系统的优势(快速信号量、快速上下文切换时间),而无需引入大量额外代码来重新创建其他平台上常见的替代(可能更优雅)解决方案。

第三,坚持使用旧的 GCC 和旧的 Boost。基本上,除了打电话给 WindRiver 和讨论购买升级的低价值建议外,我无能为力。就我个人而言,当我为 VxWorks 编程时,我使用的是 VxWork 的原生 API 而不是 POSIX。好的,所以代码不是很便携,但它最终很快;无论如何,POSIX 只是原生 API 之上的一层,这总是会减慢速度。

也就是说,POSIX 计数和互斥信号量与 VxWork 的本机计数和互斥信号量非常相似。这可能意味着 POSIX 分层不是很厚。

关于 VxWorks 编程的一般说明

调试我一直试图使用可用于 Solaris 的开发工具 (Tornado)。这是迄今为止我遇到过的最好的多线程调试环境。为什么?它允许您启动多个调试会话,一个用于系统中的每个线程/任务。您最终会为每个线程提供一个调试窗口,并且您可以单独且独立地调试每个线程。跳过阻塞操作,该调试窗口被阻塞。将鼠标焦点移动到另一个调试窗口,跳过将释放块的操作并观察第一个窗口完成它的步骤。

你最终会得到很多调试窗口,但它是迄今为止调试多线程东西的最佳方式。它使编写非常复杂的东西并发现问题变得非常容易。您可以轻松地探索应用程序中的不同动态交互,因为您可以随时简单而强大地控制每个线程正在执行的操作。

具有讽刺意味的是,Windows 版本的 Tornado 不允许您这样做。每个系统只有一个可悲的单个调试窗口,就像任何其他无聊的旧 IDE(如 Visual Studio 等)一样。我从未见过甚至现代 IDE 在多线程调试方面与 Solaris 上的 Tornado 一样好。

硬盘如果您的读者和作者正在使用磁盘上的文件,请考虑 VxWorks 5.5 已经相当老了。不会支持像 NCQ 这样的东西。在这种情况下,我提出的解决方案(如上所述)可能会更好地使用单个互斥信号量来完成,以阻止多个读取器在读取磁盘的不同部分时相互绊倒。这取决于您的读者到底在做什么,但如果他们正在从文件中读取连续数据,这将避免读/写头在磁盘表面来回颠簸(非常慢)。

在我的例子中,我使用这个技巧来调整网络接口上的流量;每个任务都在发送不同类型的数据,任务优先级反映了网络上数据的优先级。它非常优雅,没有消息被分割,但重要的消息占据了可用带宽的最大份额。

于 2015-01-27T07:07:06.190 回答
2

一如既往,最好的解决方案将取决于细节。读写自旋锁可能是您正在寻找的,但其他方法(例如上面建议的 read-copy-update)可能是一种解决方案 - 尽管在旧的嵌入式平台上使用的额外内存可能是一个问题。对于罕见的写入,我经常使用任务系统来安排工作,这样写入只能在没有从该数据结构读取的情况下发生,但这取决于算法。

于 2015-01-23T11:19:01.077 回答
1

在Concurrent Control with Readers and Writers中描述了一种基于信号量和互斥锁的算法 PJ Courtois、F. Heymans 和 DL Parnas;MBLE研究实验室;比利时布鲁塞尔

于 2015-01-23T18:44:26.600 回答
0

这是基于我的 Boost 标头的简化答案(我将 Boost 称为批准的方式)。它只需要条件变量和互斥锁。我使用 Windows 原语重写了它,因为我发现它们具有描述性且非常简单,但将其视为伪代码。

这是一个非常简单的解决方案,它不支持互斥锁升级或 try_lock() 操作。如果你愿意,我可以添加这些。我还去掉了一些装饰,比如禁用并非绝对必要的中断。

此外,值得一试boost\thread\pthread\shared_mutex.hpp(这是基于此)。它是人类可读的。

class SharedMutex {
  CRITICAL_SECTION m_state_mutex;
  CONDITION_VARIABLE m_shared_cond;
  CONDITION_VARIABLE m_exclusive_cond;

  size_t shared_count;
  bool exclusive;

  // This causes write blocks to prevent further read blocks
  bool exclusive_wait_blocked;

  SharedMutex() : shared_count(0), exclusive(false)
  {
    InitializeConditionVariable (m_shared_cond);
    InitializeConditionVariable (m_exclusive_cond);
    InitializeCriticalSection (m_state_mutex);
  }

  ~SharedMutex() 
  {
    DeleteCriticalSection (&m_state_mutex);
    DeleteConditionVariable (&m_exclusive_cond);
    DeleteConditionVariable (&m_shared_cond);
  }

  // Write lock
  void lock(void)
  {
    EnterCriticalSection (&m_state_mutex);
    while (shared_count > 0 || exclusive)
    {
      exclusive_waiting_blocked = true;
      SleepConditionVariableCS (&m_exclusive_cond, &m_state_mutex, INFINITE)
    }
    // This thread now 'owns' the mutex
    exclusive = true;

    LeaveCriticalSection (&m_state_mutex);
  }

  void unlock(void)
  {
    EnterCriticalSection (&m_state_mutex);
    exclusive = false;
    exclusive_waiting_blocked = false;
    LeaveCriticalSection (&m_state_mutex);
    WakeConditionVariable (&m_exclusive_cond);
    WakeAllConditionVariable (&m_shared_cond);
  }

  // Read lock
  void lock_shared(void)
  {
    EnterCriticalSection (&m_state_mutex);
    while (exclusive || exclusive_waiting_blocked)
    {
      SleepConditionVariableCS (&m_shared_cond, m_state_mutex, INFINITE);
    }
    ++shared_count;
    LeaveCriticalSection (&m_state_mutex);
  }

  void unlock_shared(void)
  {
    EnterCriticalSection (&m_state_mutex);
    --shared_count;

    if (shared_count == 0)
    {
      exclusive_waiting_blocked = false;
      LeaveCriticalSection (&m_state_mutex);
      WakeConditionVariable (&m_exclusive_cond);
      WakeAllConditionVariable (&m_shared_cond);
    }
    else
    {
      LeaveCriticalSection (&m_state_mutex);
    }
  }
};

行为

好的,这个算法的行为有些混乱,所以这里是它的工作原理。

在写入锁定期间- 读取器和写入器都被阻止。

在写锁结束时——读者线程和一个写者线程将竞相看哪一个开始。

在读取锁定期间- 写入器被阻止。当且仅当Writer 被阻止时, Reader 也会被阻止。

在最终的读取锁释放时——读取器线程和一个写入器线程将竞相看哪一个启动。

如果处理器在通知期间频繁地将上下文切换到线程,这可能会导致读者饿死作者,但我怀疑这个问题是理论上的而不是实际的,因为它是 Boost 的算法。m_shared_condm_exclusive_cond

于 2015-01-23T19:42:13.320 回答
0

现在微软已经开放了 .NET 源代码,你可以看看他们的ReaderWRiterLockSlim实现。

我不确定他们使用的更基本的原语是否可供您使用,其中一些也是 .NET 库的一部分,并且它们的代码也可用。

Microsoft 已经花费了大量时间来改进其锁定机制的性能,因此这可能是一个很好的起点。

于 2015-01-30T08:18:19.573 回答