虽然我同意它不能用上下文管理器来完成......它可以用两个上下文管理器来完成!
结果有点尴尬,我不确定我是否批准我自己的代码,但这是客户端的样子:
with RetryManager(retries=3) as rm:
while rm:
with rm.protect:
print("Attempt #%d of %d" % (rm.attempt_count, rm.max_retries))
# Atomic DB statements
仍然有一个明确的while
循环,而不是一个,而是两个with
语句,这给我留下了太多错误的机会。
这是代码:
class RetryManager(object):
""" Context manager that counts attempts to run statements without
exceptions being raised.
- returns True when there should be more attempts
"""
class _RetryProtector(object):
""" Context manager that only raises exceptions if its parent
RetryManager has given up."""
def __init__(self, retry_manager):
self._retry_manager = retry_manager
def __enter__(self):
self._retry_manager._note_try()
return self
def __exit__(self, exc_type, exc_val, traceback):
if exc_type is None:
self._retry_manager._note_success()
else:
# This would be a good place to implement sleep between
# retries.
pass
# Suppress exception if the retry manager is still alive.
return self._retry_manager.is_still_trying()
def __init__(self, retries=1):
self.max_retries = retries
self.attempt_count = 0 # Note: 1-based.
self._success = False
self.protect = RetryManager._RetryProtector(self)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, traceback):
pass
def _note_try(self):
self.attempt_count += 1
def _note_success(self):
self._success = True
def is_still_trying(self):
return not self._success and self.attempt_count < self.max_retries
def __bool__(self):
return self.is_still_trying()
奖励:我知道你不想将你的工作分成用装饰器包装的单独函数......但如果你对此感到满意,Mozilla 的重做包提供了装饰器来做到这一点,所以你不必滚动你自己的。甚至还有一个上下文管理器可以有效地充当函数的临时装饰器,但它仍然依赖于将可检索代码分解为单个函数。