0

我需要一个 Lock 对象,类似于multiprocessing.Manager().Lock() 它只允许从实际获取它的进程中释放。

我的手动实现将类似于以下内容:

class KeyLock:
    def __init__(self):
        self._lock = Lock()
        self._key: Optional[str] = None

    def acquire(self, key: 'str', blocking: bool = True, timeout: float = 10.0) -> bool:
        if self._lock.acquire(blocking=blocking, timeout=timeout):
            self._key = key
            return True
        return False

    def release(self, key, raise_error: bool = False) -> bool:
        if self._key == key:
            self._lock.release()
            return True
        if raise_error:
            raise RuntimeError(
                'KeyLock.released called with a non matchin key!'
            )
        return False

    def locked(self):
        return self._lock.locked()

要创建此锁的实例并从多个进程中使用它,我将使用自定义管理器类:

class KeyLockManager(BaseManager):
    pass


KeyLockManager.register('KeyLock', KeyLock)

manager = KeyLockManager()
manager.start()
lock = manager.KeyLock()

然后我可以从不同的过程中执行以下操作:

lock.acquire(os.getpid())
# use shared ressource
...
lock.release(os.getpid())

这按预期工作,但对于一个相对简单的任务来说似乎是一项相当大的努力。 所以我想知道是否有更简单的方法来做到这一点?

4

1 回答 1

0

根据multiprocessing.RLock定义,它只能由获得它的进程释放。或者您可能会考虑类似以下的情况,其中Lock实例被封装并且仅用作上下文管理器,除非您已获取它,否则无法释放它,除非您违反封装。Lock当然,可以为类添加额外的保护,以保护违反封装并访问实例本身的尝试。当然,在您的实现中,也可能总是违反封装,因为可以获取其他进程的 pid。所以我们假设所有用户都遵守规则。

from multiprocessing import Lock

class KeyLock:
    def __init__(self):
        self.__lock = Lock()

    def __enter__(self):
        self.__lock.acquire()
        return None

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.__lock.release()
        return False

# Usage:

key_lock = KeyLock()
with key_lock:
    # do something
    ...
于 2021-08-28T18:34:30.487 回答