34

我希望将数据库事务的逻辑封装到一个with块中;将代码包装在事务中并处理各种异常(锁定问题)。这很简单,但是我还想让块封装在某些异常之后重试代码块。我看不到将其整齐地打包到上下文管理器中的方法。

是否可以在with语句中重复代码?

我想这么简单地使用它,这真的很整洁。

def do_work():
    ...
    # This is ideal!
    with transaction(retries=3):
        # Atomic DB statements
        ...
    ...

我目前正在使用装饰器处理此问题,但我更愿意提供上下文管理器(或实际上两者都提供),因此我可以选择在with块中包装几行代码,而不是包装在装饰器中的内联函数,这就是我目前所做的:

def do_work():
    ...
    # This is not ideal!
    @transaction(retries=3)
    def _perform_in_transaction():
        # Atomic DB statements
        ...
    _perform_in_transaction()
    ...
4

5 回答 5

13

是否可以在with语句中重复代码?

不。

正如前面在邮件列表线程中指出的那样,您可以通过使装饰器调用传递的函数来减少一些重复:

def do_work():
    ...
    # This is not ideal!
    @transaction(retries=3)
    def _perform_in_transaction():
        # Atomic DB statements
        ...
    # called implicitly
    ...
于 2013-06-04T13:57:06.133 回答
6

我想到的方法只是实现一个标准的数据库事务上下文管理器,但允许它retries在构造函数中接受一个参数。然后我将把它包装在你的方法实现中。像这样的东西:

class transaction(object):
    def __init__(self, retries=0):
        self.retries = retries
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, traceback):
        pass

    # Implementation...
    def execute(self, query):
        err = None
        for _ in range(self.retries):
            try:
                return self._cursor.execute(query)
            except Exception as e:
                err = e # probably ought to save all errors, but hey
        raise err

with transaction(retries=3) as cursor:
    cursor.execute('BLAH')
于 2013-06-04T14:00:49.950 回答
4

由于装饰器本身就是函数,您可以执行以下操作:

with transaction(_perform_in_transaction, retries=3) as _perf:
    _perf()

有关详细信息,您需要实现transaction()一个工厂方法,该方法返回一个带有__callable__()集合的对象以调用原始方法并retries在失败时重复它最多次数;__enter__()并将__exit__()被定义为数据库事务上下文管理器的正常值。

您也可以设置transaction()为它自己执行传递的方法最多retries多次,这可能需要与实现上下文管理器大致相同的工作量,但意味着实际使用量将减少到只是transaction(_perform_in_transaction, retries=3)(实际上,相当于 delnan 提供的装饰器示例)。

于 2013-06-04T13:52:47.853 回答
2

虽然我同意它不能用上下文管理器来完成......它可以用两个上下文管理器来完成!

结果有点尴尬,我不确定我是否批准我自己的代码,但这是客户端的样子:

with RetryManager(retries=3) as rm:
    while rm:
        with rm.protect:
            print("Attempt #%d of %d" % (rm.attempt_count, rm.max_retries))
             # Atomic DB statements

仍然有一个明确的while循环,而不是一个,而是两个with语句,这给我留下了太多错误的机会。

这是代码:

class RetryManager(object):
    """ Context manager that counts attempts to run statements without
        exceptions being raised.
        - returns True when there should be more attempts
    """

    class _RetryProtector(object):
        """ Context manager that only raises exceptions if its parent
            RetryManager has given up."""
        def __init__(self, retry_manager):
            self._retry_manager = retry_manager

        def __enter__(self):
            self._retry_manager._note_try()
            return self

        def __exit__(self, exc_type, exc_val, traceback):
            if exc_type is None:
                self._retry_manager._note_success()
            else:
                # This would be a good place to implement sleep between
                # retries.
                pass

            # Suppress exception if the retry manager is still alive.
            return self._retry_manager.is_still_trying()

    def __init__(self, retries=1):

        self.max_retries = retries
        self.attempt_count = 0 # Note: 1-based.
        self._success = False

        self.protect = RetryManager._RetryProtector(self)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, traceback):
        pass

    def _note_try(self):
        self.attempt_count += 1

    def _note_success(self):
        self._success = True

    def is_still_trying(self):
        return not self._success and self.attempt_count < self.max_retries

    def __bool__(self):
        return self.is_still_trying()

奖励:我知道你不想将你的工作分成用装饰器包装的单独函数......但如果你对此感到满意,Mozilla 的重做包提供了装饰器来做到这一点,所以你不必滚动你自己的。甚至还有一个上下文管理器可以有效地充当函数的临时装饰器,但它仍然依赖于将可检索代码分解为单个函数。

于 2018-09-17T05:27:33.317 回答
0

这个问题已经有几年了,但在阅读了答案后,我决定试一试。

该解决方案需要使用“帮助器”类,但我认为它确实提供了一个接口,其中包含通过上下文管理器配置的重试。

class Client:
    def _request(self):
        # do request stuff
        print("tried")
        raise Exception()

    def request(self):
        retry = getattr(self, "_retry", None)
        if not retry:
            return self._request()
        else:
            for n in range(retry.tries):
                try:
                    return self._request()
                except Exception:
                    retry.attempts += 1


class Retry:
    def __init__(self, client, tries=1):
        self.client = client
        self.tries = tries
        self.attempts = 0

    def __enter__(self):
        self.client._retry = self

    def __exit__(self, *exc):
        print(f"Tried {self.attempts} times")
        del self.client._retry


>>> client = Client()
>>> with Retry(client, tries=3):
    ... # will try 3 times
    ... response = client.request()

tried once
tried once
tried once
Tried 3 times
于 2020-08-16T06:41:10.523 回答