我需要一个 Lock 对象,类似于multiprocessing.Manager().Lock()
它只允许从实际获取它的进程中释放。
我的手动实现将类似于以下内容:
class KeyLock:
def __init__(self):
self._lock = Lock()
self._key: Optional[str] = None
def acquire(self, key: 'str', blocking: bool = True, timeout: float = 10.0) -> bool:
if self._lock.acquire(blocking=blocking, timeout=timeout):
self._key = key
return True
return False
def release(self, key, raise_error: bool = False) -> bool:
if self._key == key:
self._lock.release()
return True
if raise_error:
raise RuntimeError(
'KeyLock.released called with a non matchin key!'
)
return False
def locked(self):
return self._lock.locked()
要创建此锁的实例并从多个进程中使用它,我将使用自定义管理器类:
class KeyLockManager(BaseManager):
pass
KeyLockManager.register('KeyLock', KeyLock)
manager = KeyLockManager()
manager.start()
lock = manager.KeyLock()
然后我可以从不同的过程中执行以下操作:
lock.acquire(os.getpid())
# use shared ressource
...
lock.release(os.getpid())
这按预期工作,但对于一个相对简单的任务来说似乎是一项相当大的努力。 所以我想知道是否有更简单的方法来做到这一点?