1

我正在尝试编写一个装饰器,它采用一个与 mongodb 交互的函数,如果发生异常,它会重试交互。我有以下代码:

def handle_failover(f):
    def wrapper(*args):
        for i in range(40):
            try:
                yield f(*args)
                break
            except pymongo.errors.AutoReconnect:
                loop = IOLoop.instance()
                yield gen.Task(loop.add_timeout, time.time() + 0.25)
    return wrapper


class CreateHandler(DatabaseHandler):
    @handle_failover
    def create_counter(self, collection):
        object_id = yield self.db[collection].insert({'n': 0})
        return object_id

    @gen.coroutine
    def post(self, collection):
        object_id = yield self.create_counter(collection)
        self.finish({'id': object_id})

但这不起作用。它给出了一个错误,即 create_counter 产生了一个生成器。我已经尝试制作所有功能 @gen.coroutines 并没有帮助。

如何使 handle_failover 装饰器工作?

编辑:暂时没有装饰器。这应该可靠地创建一个计数器并将 object_id 返回给用户。如果引发异常,则会显示 500 页。

class CreateHandler(DatabaseHandler):
    @gen.coroutine
    def create_counter(self, collection, data):
        for i in range(FAILOVER_TRIES):
            try:
                yield self.db[collection].insert(data)
                break
            except pymongo.errors.AutoReconnect:
                loop = IOLoop.instance()
                yield gen.Task(loop.add_timeout, time.time() + FAILOVER_SLEEP)
            except pymongo.errors.DuplicateKeyError:
                break
        else:
            raise Exception("Can't create new counter.")

    @gen.coroutine
    def post(self, collection):
        object_id = bson.objectid.ObjectId()
        data = {
            '_id': object_id,
            'n': 0
        }
        yield self.create_counter(collection, data)
        self.set_status(201)
        self.set_header('Location', '/%s/%s' % (collection, str(object_id)))
        self.finish({})

虽然我仍然不知道如何使计数器幂等的增量,因为 DuplicateKeyError 的技巧在这里不适用:

class CounterHandler(CounterIDHandler):
    def increment(self, collection, object_id, n):
        result = yield self.db[collection].update({'_id': object_id}, {'$inc': {'n': int(n)}})
        return result

    @gen.coroutine
    def post(self, collection, counter_id, n):
        object_id = self.get_object_id(counter_id)
        if not n or not int(n):
            n = 1
        result = yield self.increment(collection, object_id, n)
        self.finish({'resp': result['updatedExisting']})
4

1 回答 1

3

你很可能不想这样做。向用户显示错误比重试操作要好。

盲目地重试任何引发 AutoReconnect 的插入是一个坏主意,因为您不知道 MongoDB 在失去连接之前是否执行了插入。在这种情况下,您不知道最终是否会得到一个或两个带有{'n': 0}. 因此,您应该确保以这种方式重试的任何操作都是幂等的。有关详细信息,请参阅我的“拯救猴子”文章

如果您确实想制作这样的包装器,则需要确保fwrapper都是协程。此外,如果f抛出错误 40 次,您必须重新引发最终错误。如果f成功,您必须返回其返回值:

def handle_failover(f):
    @gen.coroutine
    def wrapper(*args):
        retries = 40
        i = 0
        while True:
            try:
                ret = yield gen.coroutine(f)(*args)
                raise gen.Return(ret)
            except pymongo.errors.AutoReconnect:
                if i < retries:
                    i += 1
                    loop = IOLoop.instance()
                    yield gen.Task(loop.add_timeout, time.time() + 0.25)
                else:
                    raise
    return wrapper

但仅对幂等操作执行此操作!

于 2014-04-19T21:35:59.800 回答