假设我有一个包含键值的字典,其中的值是一个带锁的对象。每个进程都需要获取锁来修改具体的对象,但是字典中的键是稳定的。
现在,正如我之前所说,如果要为新对象添加新键(不经常),有没有办法使用锁,当我将键添加到字典时,将排除其他进程访问对象?
这样做的原因是:各个进程不太可能同时操作同一个对象,所以我看不出让某个进程独占整个字典有什么意义——真正需要等待的只有对象本身。只有在我修改字典结构(添加新键)时,我才想阻止所有进程访问它;其余时间,我希望它们能够并行访问字典。
如果我没有说清楚,请提问。
假设我有一个包含键值的字典,其中的值是一个带锁的对象。每个进程都需要获取锁来修改具体的对象,但是字典中的键是稳定的。
现在,正如我之前所说,如果要为新对象添加新键(不经常),有没有办法使用锁,当我将键添加到字典时,将排除其他进程访问对象?
这样做的原因是:各个进程不太可能同时操作同一个对象,所以我看不出让某个进程独占整个字典有什么意义——真正需要等待的只有对象本身。只有在我修改字典结构(添加新键)时,我才想阻止所有进程访问它;其余时间,我希望它们能够并行访问字典。
如果我没有说清楚,请提问。
这只是我为自己使用而写的又一个 rwlock。虽然我没有在高负载的生产代码中使用过它,但它适用于我的用例;它是照着一个 C++ 类的代码移植过来的,而那个 C++ 类已经在 Linux 服务器上使用/测试了大约一年。它没有提供从读锁升级为写锁的方法......
import logging
import threading
class RWLock:
    """Non-reentrant write-preferring rwlock.

    Any number of readers may hold the lock at once; writers are exclusive.
    As soon as a writer is registered (active or still waiting), newly
    arriving readers block until every registered writer has finished.

    Use ``acquire_read``/``release_read`` and ``acquire_write``/
    ``release_write`` directly, or the ``read_access`` / ``write_access``
    attributes with a ``with`` statement (both yield the lock itself).
    There is no read-to-write upgrade.
    """

    # When true at construction time and at call time, the lock tracks which
    # threads hold it and asserts on misuse (double acquire, foreign release).
    DEBUG = 0

    class _ReadAccess:
        """``with`` helper that holds read access for the block's duration."""

        def __init__(self, rwlock):
            self.rwlock = rwlock

        def __enter__(self):
            self.rwlock.acquire_read()
            return self.rwlock

        def __exit__(self, exc_type, exc_value, tb):
            self.rwlock.release_read()

    class _WriteAccess:
        """``with`` helper that holds write access for the block's duration."""

        def __init__(self, rwlock):
            self.rwlock = rwlock

        def __enter__(self):
            self.rwlock.acquire_write()
            return self.rwlock

        def __exit__(self, exc_type, exc_value, tb):
            self.rwlock.release_write()

    def __init__(self):
        # Guards every counter below; both conditions share it.
        self.lock = threading.Lock()

        # Serializes writers among themselves once the readers have drained.
        self.active_writer_lock = threading.Lock()
        # The total number of writers including the active writer and
        # those blocking on active_writer_lock or readers_finished_cond.
        self.writer_count = 0

        # Number of readers that are blocking on writers_finished_cond.
        self.waiting_reader_count = 0

        # Number of readers currently using the resource.
        self.active_reader_count = 0

        self.readers_finished_cond = threading.Condition(self.lock)
        self.writers_finished_cond = threading.Condition(self.lock)

        # support for the with statement
        self.read_access = self._ReadAccess(self)
        self.write_access = self._WriteAccess(self)

        if self.DEBUG:
            self.active_readers = set()
            self.active_writer = None

    def acquire_read(self):
        """Block until no writer is registered, then take read access."""
        with self.lock:
            if self.DEBUG:
                me = threading.current_thread()
                assert me not in self.active_readers, 'This thread has already acquired read access and this lock isn\'t reader-reentrant!'
                assert me != self.active_writer, 'This thread already has write access, release that before acquiring read access!'
                self.active_readers.add(me)
            if self.writer_count:
                self.waiting_reader_count += 1
                # Even if the last writer thread notifies us it can happen
                # that a new incoming writer thread acquires the lock earlier
                # than this reader thread, so the writer_count is re-tested
                # after each wait(). The loop also covers spurious wakeups.
                while self.writer_count:
                    self.writers_finished_cond.wait()
                self.waiting_reader_count -= 1
            self.active_reader_count += 1

    def release_read(self):
        """Give up read access; the last reader out wakes waiting writers."""
        with self.lock:
            if self.DEBUG:
                me = threading.current_thread()
                assert me in self.active_readers, 'Trying to release read access when it hasn\'t been acquired by this thread!'
                self.active_readers.remove(me)
            assert self.active_reader_count > 0
            self.active_reader_count -= 1
            if not self.active_reader_count and self.writer_count:
                self.readers_finished_cond.notify_all()

    def acquire_write(self):
        """Register as a writer, wait for readers to drain, take write access."""
        with self.lock:
            if self.DEBUG:
                me = threading.current_thread()
                assert me not in self.active_readers, 'This thread already has read access - release that before acquiring write access!'
                assert me != self.active_writer, 'This thread already has write access and this lock isn\'t writer-reentrant!'
            # Registering first is what stops new readers from entering.
            self.writer_count += 1
            while self.active_reader_count:
                self.readers_finished_cond.wait()

        # Taken outside self.lock so readers/writers can keep registering
        # while this writer queues behind the currently active writer.
        self.active_writer_lock.acquire()
        if self.DEBUG:
            self.active_writer = me

    def release_write(self):
        """Give up write access; the last writer out wakes waiting readers."""
        if not self.DEBUG:
            self.active_writer_lock.release()

        with self.lock:
            if self.DEBUG:
                # In debug mode ownership is verified before the release.
                me = threading.current_thread()
                assert me == self.active_writer, 'Trying to release write access when it hasn\'t been acquired by this thread!'
                self.active_writer = None
                self.active_writer_lock.release()

            assert self.writer_count > 0
            self.writer_count -= 1
            if not self.writer_count and self.waiting_reader_count:
                self.writers_finished_cond.notify_all()

    def get_state(self):
        """Return ``(writer_count, waiting_reader_count, active_reader_count)``."""
        with self.lock:
            return (self.writer_count, self.waiting_reader_count, self.active_reader_count)
if __name__ == '__main__':
    # Manual demo: starts reader/writer threads on a fixed schedule and prints
    # a timestamped trace of the lock state so the ordering can be inspected.
    import time

    lock = RWLock()
    start_time = time.time()
    # Keeps lines from different threads from interleaving on stdout.
    print_lock = threading.Lock()

    def p(msg):
        """Print *msg* with the elapsed time and the calling thread's id."""
        with print_lock:
            print('%5.2f [%2s] %-15s' % (
                time.time() - start_time, threading.current_thread().myid, msg))

    def p_state(msg):
        """Like ``p`` but also appends the lock's current counters."""
        with print_lock:
            print('%5.2f [%2s] %-15s writer_count=%s waiting_reader_count=%s active_reader_count=%s' % (
                (time.time() - start_time, threading.current_thread().myid, msg) + lock.get_state()))

    def w():
        """Writer thread body: hold write access for the thread's timeout."""
        p('write wait...')
        with lock.write_access:
            p_state('write started.')
            time.sleep(threading.current_thread().mytimeout)
            p_state('write ended.')

    def r():
        """Reader thread body: hold read access for the thread's timeout."""
        p('read wait...')
        with lock.read_access:
            p_state('read started.')
            time.sleep(threading.current_thread().mytimeout)
            p_state('read ended.')

    def start_thread(thread_id, func, timeout):
        """Start *func* in a thread tagged with ``myid`` and ``mytimeout``."""
        thread = threading.Thread(target=func)
        thread.myid = thread_id
        thread.mytimeout = timeout
        thread.start()
        return thread

    # A negative id is not a thread: it only prints a separator line.
    TEST_LOCKS = [
        # (id, start_time, duration, r/w)
        # Testing the branches of acquire_read() and release_read()
        (1, 0, 1, r),
        (2, 0.1, 0.5, r),
        (-1, 2, 0, 0),
        (3, 2, 0.5, w),
        (4, 2.1, 0.5, w),
        (5, 2.1, 1, r),
        (6, 2.1, 1, r),
        (7, 2.2, 0.1, w),
        (-1, 5, 0, 0),
        (8, 5, 0.5, r),
        (9, 5.1, 0.5, w),
        (10, 5.1, 0.5, w),
        # Testing the branches of acquire_write() and release_write()
        (-1, 8, 0, 0),
        (11, 8, 1, w),
        (12, 8.1, 0.5, w),
        (-1, 10, 0, 0),
        (13, 10, 0.5, r),
        (14, 10.1, 0.5, w),
        (15, 10.1, 0.5, r),
        (16, 10.2, 0.5, r),
        (17, 10.3, 0.5, w),
    ]

    # The main thread prints separators, so it needs an id as well.
    threading.current_thread().myid = 0

    t = 0
    for thread_id, start, duration, rw in sorted(TEST_LOCKS, key=lambda x: x[1]):
        # Sleep only the gap since the previous scheduled start.
        time.sleep(start - t)
        t = start
        if thread_id < 0:
            p('-----------------------------')
        else:
            start_thread(thread_id, rw, duration)
ReadWriteLock 的另一种实现,负责解决写入器饥饿问题,并支持将读取锁提升为写入锁(如果在构建期间请求)。它只使用一个锁和一个条件。
# From O'Reilly Python Cookbook by David Ascher, Alex Martelli
# With changes to cover the starvation situation where a continuous
# stream of readers may starve a writer, Lock Promotion and Context Managers
class ReadWriteLock:
    """A lock object that allows many simultaneous "read locks", but
    only one "write lock".

    A pending writer blocks newly arriving readers, so a continuous stream
    of readers cannot starve it. When constructed with
    ``withPromotion=True``, a thread holding a read lock may call
    ``acquire_write`` to promote it; the promotion proceeds once every
    remaining reader is itself asking for the write lock.

    The writer keeps the underlying re-entrant lock held from
    ``acquire_write`` until ``release_write``.
    """

    def __init__(self, withPromotion=False):
        self._read_ready = threading.Condition(threading.RLock())
        self._readers = 0
        self._writers = 0
        self._promote = withPromotion
        self._readerList = []  # List of Reader thread IDs
        self._writerList = []  # List of Writer thread IDs

    def acquire_read(self):
        """Acquire a read lock. Blocks while any writer holds or is waiting
        for the write lock."""
        logging.debug("RWL : acquire_read()")
        with self._read_ready:
            while self._writers > 0:
                self._read_ready.wait()
            # Only record the thread once the read lock is really held.
            self._readers += 1
            self._readerList.append(threading.get_ident())

    def release_read(self):
        """Release a read lock.

        Raises ValueError if the calling thread holds no read lock.
        """
        logging.debug("RWL : release_read()")
        with self._read_ready:
            # Remove first so a bogus release fails before any count changes.
            self._readerList.remove(threading.get_ident())
            self._readers -= 1
            # With promotion enabled a waiting writer may become eligible as
            # soon as *this* reader leaves, even if other readers remain, so
            # it has to be woken to re-check its condition.
            if not self._readers or self._promote:
                self._read_ready.notify_all()

    def acquire_write(self):
        """Acquire a write lock. Blocks until there are no acquired read or
        write locks (or, with promotion, until all readers are promoting)."""
        logging.debug("RWL : acquire_write()")
        ident = threading.get_ident()
        # A re-entrant lock lets a thread re-acquire the lock
        self._read_ready.acquire()
        self._writers += 1
        self._writerList.append(ident)
        try:
            while self._readers > 0:
                # promote to write lock, only if all the readers are trying
                # to promote to writer. If there are other reader threads,
                # then wait till they complete reading
                if (self._promote
                        and ident in self._readerList
                        and set(self._readerList).issubset(self._writerList)):
                    break
                self._read_ready.wait()
        except BaseException:
            # Interrupted while waiting: undo the registration and free the
            # lock, otherwise every later reader and writer would hang.
            self._writerList.remove(ident)
            self._writers -= 1
            self._read_ready.notify_all()
            self._read_ready.release()
            raise

    def release_write(self):
        """Release a write lock.

        Raises ValueError if the calling thread is not a registered writer.
        """
        logging.debug("RWL : release_write()")
        # Remove first so a bogus release fails before any count changes.
        self._writerList.remove(threading.get_ident())
        self._writers -= 1
        self._read_ready.notify_all()
        self._read_ready.release()
#----------------------------------------------------------------------------------------------------------
class ReadRWLock:
    """Context manager that holds a read lock on the given lock object.

    ``rwLock`` must provide ``acquire_read()`` and ``release_read()``.
    """

    def __init__(self, rwLock):
        self.rwLock = rwLock

    def __enter__(self):
        self.rwLock.acquire_read()
        return self  # Not mandatory, but returning to be safe

    def __exit__(self, exc_type, exc_value, traceback):
        self.rwLock.release_read()
        return False  # Raise the exception, if exited due to an exception
#----------------------------------------------------------------------------------------------------------
class WriteRWLock:
    """Context manager that holds a write lock on the given lock object.

    ``rwLock`` must provide ``acquire_write()`` and ``release_write()``.
    """

    def __init__(self, rwLock):
        self.rwLock = rwLock

    def __enter__(self):
        self.rwLock.acquire_write()
        return self  # Not mandatory, but returning to be safe

    def __exit__(self, exc_type, exc_value, traceback):
        self.rwLock.release_write()
        return False  # Raise the exception, if exited due to an exception
#----------------------------------------------------------------------------------------------------------
如果你使用 django,你可能想要使用这个读写锁:django.utils.synch.RWLock
供参考:pygolang提供了sync.RWMutex ( 2 , 3 ),它是读写器互斥锁,写入器优先。
字典必须在相应的 rwmutex 写锁定下更新,而对字典的读访问必须在读锁定下完成。