作为工作项目的一部分,我必须使用 Win32 api 在 C++ 中实现读/写锁。所有现有的解决方案都使用在执行期间需要上下文切换的内核对象(信号量和互斥体)。这对我的应用程序来说太慢了。
如果可能的话,我想只使用关键部分来实现一个。锁不必是进程安全的,只有线程安全的。关于如何解决这个问题的任何想法?
作为工作项目的一部分,我必须使用 Win32 api 在 C++ 中实现读/写锁。所有现有的解决方案都使用在执行期间需要上下文切换的内核对象(信号量和互斥体)。这对我的应用程序来说太慢了。
如果可能的话,我想只使用关键部分来实现一个。锁不必是进程安全的,只有线程安全的。关于如何解决这个问题的任何想法?
我认为如果不使用至少一个内核级对象(Mutex 或 Semaphore)就无法做到这一点,因为您需要内核的帮助才能使调用进程阻塞,直到锁可用。
关键部分确实提供了阻塞,但 API 太有限了。例如,您无法获取 CS,发现读锁可用但没有写锁,然后等待其他进程完成读取(因为如果其他进程具有临界区,它将阻塞其他错误的读取器,如果它不会那么你的进程不会阻塞,而是旋转,燃烧 CPU 周期。)
但是,您可以做的是使用自旋锁并在出现争用时回退到互斥锁。临界区本身就是以这种方式实现的。我将采用现有的关键部分实现,并用单独的读写器计数替换 PID 字段。
老问题,但这是应该工作的。它不依赖于争论。如果读者很少或没有争用,他们会产生有限的额外成本,因为SetEvent
被称为惰性(查看编辑历史以获取没有此优化的更重量级版本)。
#include <windows.h>
typedef struct _RW_LOCK {
CRITICAL_SECTION countsLock;
CRITICAL_SECTION writerLock;
HANDLE noReaders;
int readerCount;
BOOL waitingWriter;
} RW_LOCK, *PRW_LOCK;
void rwlock_init(PRW_LOCK rwlock)
{
InitializeCriticalSection(&rwlock->writerLock);
InitializeCriticalSection(&rwlock->countsLock);
/*
* Could use a semaphore as well. There can only be one waiter ever,
* so I'm showing an auto-reset event here.
*/
rwlock->noReaders = CreateEvent (NULL, FALSE, FALSE, NULL);
}
void rwlock_rdlock(PRW_LOCK rwlock)
{
/*
* We need to lock the writerLock too, otherwise a writer could
* do the whole of rwlock_wrlock after the readerCount changed
* from 0 to 1, but before the event was reset.
*/
EnterCriticalSection(&rwlock->writerLock);
EnterCriticalSection(&rwlock->countsLock);
++rwlock->readerCount;
LeaveCriticalSection(&rwlock->countsLock);
LeaveCriticalSection(&rwlock->writerLock);
}
int rwlock_wrlock(PRW_LOCK rwlock)
{
EnterCriticalSection(&rwlock->writerLock);
/*
* readerCount cannot become non-zero within the writerLock CS,
* but it can become zero...
*/
if (rwlock->readerCount > 0) {
EnterCriticalSection(&rwlock->countsLock);
/* ... so test it again. */
if (rwlock->readerCount > 0) {
rwlock->waitingWriter = TRUE;
LeaveCriticalSection(&rwlock->countsLock);
WaitForSingleObject(rwlock->noReaders, INFINITE);
} else {
/* How lucky, no need to wait. */
LeaveCriticalSection(&rwlock->countsLock);
}
}
/* writerLock remains locked. */
}
void rwlock_rdunlock(PRW_LOCK rwlock)
{
EnterCriticalSection(&rwlock->countsLock);
assert (rwlock->readerCount > 0);
if (--rwlock->readerCount == 0) {
if (rwlock->waitingWriter) {
/*
* Clear waitingWriter here to avoid taking countsLock
* again in wrlock.
*/
rwlock->waitingWriter = FALSE;
SetEvent(rwlock->noReaders);
}
}
LeaveCriticalSection(&rwlock->countsLock);
}
void rwlock_wrunlock(PRW_LOCK rwlock)
{
LeaveCriticalSection(&rwlock->writerLock);
}
您可以通过使用单个来降低读者的成本CRITICAL_SECTION
:
countsLock
替换为writerLock
rdlock 和 rdunlock
rwlock->waitingWriter = FALSE
在 wrunlock 中删除
wrlock的身体变成了
EnterCriticalSection(&rwlock->writerLock);
rwlock->waitingWriter = TRUE;
while (rwlock->readerCount > 0) {
LeaveCriticalSection(&rwlock->writerLock);
WaitForSingleObject(rwlock->noReaders, INFINITE);
EnterCriticalSection(&rwlock->writerLock);
}
rwlock->waitingWriter = FALSE;
/* writerLock remains locked. */
然而,这不公平,所以我更喜欢上述解决方案。
看看“ Windows 上的并发编程”一书,它有很多不同的读写器锁参考示例。
查看英特尔线程构建块中的spin_rw_mutex ...
spin_rw_mutex
严格在用户领域并使用自旋等待进行阻塞
这是一个老问题,但也许有人会发现这很有用。我们为 Windows开发了一个高性能的开源RWLock
软件,如果可用,它会自动使用SRWLock
提到的 Vista+ Michael,否则会退回到用户空间实现。
作为额外的奖励,它有四种不同的“口味”(尽管您可以坚持基本的,这也是最快的),每种都提供更多的同步选项。它从RWLock()
不可重入的基础开始,仅限于单进程同步,并且不会将读/写锁交换为具有重入支持和读/写降级的成熟的跨进程 IPC RWLock。
如前所述,它们会在可能的情况下动态切换到 Vista+ slim 读写锁以获得最佳性能,但您根本不必担心这一点,因为它会退回到 Windows XP 及其上的完全兼容实现。同类。
如果您已经知道仅使用互斥锁的解决方案,您应该能够将其修改为使用临界区。
我们使用两个关键部分和一些计数器滚动我们自己的。它符合我们的需求——我们的作者数量非常少,作者优先于读者,等等。我不能自由发表我们的文章,但可以说没有互斥锁和信号量是可能的。
这是我能想到的最小的解决方案:
http://www.baboonz.org/rwlock.php
并逐字粘贴:
/** A simple Reader/Writer Lock.
This RWL has no events - we rely solely on spinlocks and sleep() to yield control to other threads.
I don't know what the exact penalty is for using sleep vs events, but at least when there is no contention, we are basically
as fast as a critical section. This code is written for Windows, but it should be trivial to find the appropriate
equivalents on another OS.
**/
class TinyReaderWriterLock
{
public:
volatile uint32 Main;
static const uint32 WriteDesireBit = 0x80000000;
void Noop( uint32 tick )
{
if ( ((tick + 1) & 0xfff) == 0 ) // Sleep after 4k cycles. Crude, but usually better than spinning indefinitely.
Sleep(0);
}
TinyReaderWriterLock() { Main = 0; }
~TinyReaderWriterLock() { ASSERT( Main == 0 ); }
void EnterRead()
{
for ( uint32 tick = 0 ;; tick++ )
{
uint32 oldVal = Main;
if ( (oldVal & WriteDesireBit) == 0 )
{
if ( InterlockedCompareExchange( (LONG*) &Main, oldVal + 1, oldVal ) == oldVal )
break;
}
Noop(tick);
}
}
void EnterWrite()
{
for ( uint32 tick = 0 ;; tick++ )
{
if ( (tick & 0xfff) == 0 ) // Set the write-desire bit every 4k cycles (including cycle 0).
_InterlockedOr( (LONG*) &Main, WriteDesireBit );
uint32 oldVal = Main;
if ( oldVal == WriteDesireBit )
{
if ( InterlockedCompareExchange( (LONG*) &Main, -1, WriteDesireBit ) == WriteDesireBit )
break;
}
Noop(tick);
}
}
void LeaveRead()
{
ASSERT( Main != -1 );
InterlockedDecrement( (LONG*) &Main );
}
void LeaveWrite()
{
ASSERT( Main == -1 );
InterlockedIncrement( (LONG*) &Main );
}
};
我只使用关键部分编写了以下代码。
class ReadWriteLock {
volatile LONG writelockcount;
volatile LONG readlockcount;
CRITICAL_SECTION cs;
public:
ReadWriteLock() {
InitializeCriticalSection(&cs);
writelockcount = 0;
readlockcount = 0;
}
~ReadWriteLock() {
DeleteCriticalSection(&cs);
}
void AcquireReaderLock() {
retry:
while (writelockcount) {
Sleep(0);
}
EnterCriticalSection(&cs);
if (!writelockcount) {
readlockcount++;
}
else {
LeaveCriticalSection(&cs);
goto retry;
}
LeaveCriticalSection(&cs);
}
void ReleaseReaderLock() {
EnterCriticalSection(&cs);
readlockcount--;
LeaveCriticalSection(&cs);
}
void AcquireWriterLock() {
retry:
while (writelockcount||readlockcount) {
Sleep(0);
}
EnterCriticalSection(&cs);
if (!writelockcount&&!readlockcount) {
writelockcount++;
}
else {
LeaveCriticalSection(&cs);
goto retry;
}
LeaveCriticalSection(&cs);
}
void ReleaseWriterLock() {
EnterCriticalSection(&cs);
writelockcount--;
LeaveCriticalSection(&cs);
}
};
要执行旋转等待,请使用 Sleep(0) 注释这些行。
在这里查看我的实现:
https://github.com/coolsoftware/LockLib
VRWLock 是一个 C++ 类,它实现了单写 - 多读逻辑。
还可以查看测试项目 TestLock.sln。
UPD。下面是读写器的简单代码:
LONG gCounter = 0;
// reader
for (;;) //loop
{
LONG n = InterlockedIncrement(&gCounter);
// n = value of gCounter after increment
if (n <= MAX_READERS) break; // writer does not write anything - we can read
InterlockedDecrement(&gCounter);
}
// read data here
InterlockedDecrement(&gCounter); // release reader
// writer
for (;;) //loop
{
LONG n = InterlockedCompareExchange(&gCounter, (MAX_READERS+1), 0);
// n = value of gCounter before attempt to replace it by MAX_READERS+1 in InterlockedCompareExchange
// if gCounter was 0 - no readers/writers and in gCounter will be MAX_READERS+1
// if gCounter was not 0 - gCounter stays unchanged
if (n == 0) break;
}
// write data here
InterlockedExchangeAdd(&gCounter, -(MAX_READERS+1)); // release writer
VRWLock 类支持自旋计数和线程特定的引用计数,允许释放终止线程的锁。