0

我的问题与 Kazoo/Zookeeper 有关,但更一般地说,它是关于分配资源,然后让该资源的调用者不应该暂停。来自 Kazoo 文档:

强烈建议使用 add_listener() 添加状态侦听器,并注意 LOST 和 SUSPENDED 状态更改并适当地重新采取行动。如果发生 LOST 状态,则可以确定锁定和/或租约已丢失。

我正在使用上下文管理器来分配锁:

@contextmanager
def lock(self, path, identifier):
  lock = zk.Lock(path, identifier)
  lock.acquire()
  yield lock      <--- how to communicate connection loss from client?
  finally: 
    lock.release()

像这样使用:

with lock('/some_op/') as lck:
  # do something with lock, but pause in case of connection loss? 

如果连接丢失,您如何分配锁但撤销访问?

以下是 Kazoo 建议您实现连接状态侦听器的方式:

def my_listener(state):
    if state == KazooState.LOST:
        # Register somewhere that the session was lost
    elif state == KazooState.SUSPENDED:
        # Handle being disconnected from Zookeeper
    else:
        # Handle being connected/reconnected to Zookeeper
4

1 回答 1

1

这可能不是最好的设计,但您可以提供对with语句体的锁的引用:

@contextmanager
def acquire_lock(self, path, identifier):
  lock = zk.Lock(path, identifier)
  try: 
    retry = KazooRetry()
    retry(lock.acquire)
    yield lock
  finally: 
    lock.release()

with acquire_lock('/some_op/') as lock:
    ...
    if necessary:
        lock.release()
    ...

具体到您的问题(我不熟悉 Kazoo),您可以my_listener在上下文管理器中定义它可以关闭锁。

@contextmanager
def acquire_lock(self, path, identifier):
    lock = zk.Lock(path, identifier)
    def my_listener(state):
        if state == KazooState.LOST:
            lock.release()
        elif state == KazooState.SUSPENDED:
            ...

    try:
        retry = KazooRetry()
    # etc

您的上下文可能需要产生侦听器,或者注册它(或您对侦听器所做的任何事情),但无论您对它做什么,它都可以访问您在上下文管理器中创建的锁。

于 2019-05-29T19:18:20.193 回答