2

当然,应用引擎数据存储有停机时间。但是,我想要一个“故障安全”放置,它在面对数据存储错误时更加健壮(参见下面的动机)。当数据存储不可用时,任务队列似乎是一个明显的延迟写入的地方。不过,我不知道任何其他解决方案(除了通过 urlfetch 将数据发送给第三方)。

动机:我有一个真正需要放入数据存储区的实体 - 仅向用户显示错误消息是行不通的。例如,可能发生了一些无法轻易撤消的副作用(可能是与第三方站点的一些交互)。

我想出了一个简单的包装器(我认为)它提供了一个合理的“故障安全”放置(见下文)。您对此是否有任何问题,或者有一个更强大的实现的想法?(注意:感谢 Nick Johnson 和 Saxon Druce 在答案中发布的建议,这篇文章已经过编辑,对代码进行了一些改进。)

import logging
from google.appengine.api.labs.taskqueue import taskqueue
from google.appengine.datastore import entity_pb
from google.appengine.ext import db
from google.appengine.runtime.apiproxy_errors import CapabilityDisabledError

def put_failsafe(e, db_put_deadline=20, retry_countdown=60, queue_name='default'):
    """Tries to e.put().  On success, 1 is returned.  If this raises a db.Error
    or CapabilityDisabledError, then a task will be enqueued to try to put the
    entity (the task will execute after retry_countdown seconds) and 2 will be
    returned.  If the task cannot be enqueued, then 0 will be returned.  Thus a
    falsey value is only returned on complete failure.

    Note that since the taskqueue payloads are limited to 10kB, if the protobuf
    representing e is larger than 10kB then the put will be unable to be
    deferred to the taskqueue.

    If a put is deferred to the taskqueue, then it won't necessarily be
    completed as soon as the datastore is back up.  Thus it is possible that
    e.put() will occur *after* other, later puts when 1 is returned.

    Ensure e's model is imported in the code which defines the task which tries
    to re-put e (so that e can be deserialized).
    """
    try:
        e.put(rpc=db.create_rpc(deadline=db_put_deadline))
        return 1
    except (db.Error, CapabilityDisabledError), ex1:
        try:
            taskqueue.add(queue_name=queue_name,
                          countdown=retry_countdown,
                          url='/task/retry_put',
                          payload=db.model_to_protobuf(e).Encode())
            logging.info('failed to put to db now, but deferred put to the taskqueue e=%s ex=%s' % (e, ex1))
            return 2
        except (taskqueue.Error, CapabilityDisabledError), ex2:
            return 0

任务的请求处理程序:

from google.appengine.ext import db, webapp

# IMPORTANT: This task deserializes entity protobufs.  To ensure that this is
#            successful, you must import any db.Model that may need to be
#            deserialized here (otherwise this task may raise a KindError).

class RetryPut(webapp.RequestHandler):
    def post(self):
        e = db.model_from_protobuf(entity_pb.EntityProto(self.request.body))
        e.put() # failure will raise an exception => the task to be retried

希望每次put 都使用它——大多数时候,显示错误消息就可以了。每次 put 都使用它很诱人,但我认为有时如果我告诉他们他们的更改将在稍后出现(并继续向他们显示旧数据,直到数据存储区备份延迟执行)。

4

2 回答 2

2

您的方法是合理的,但有几个警告:

  • 默认情况下,put 操作将重试,直到时间用完。由于您有备份策略,您可能希望尽快放弃 - 在这种情况下,您应该为 put 方法调用提供一个 rpc 参数,指定自定义截止日期。
  • 无需设置明确的倒计时 - 任务队列会以越来越长的间隔为您重试失败的操作。
  • 您不需要使用 pickle - Protocol Buffers 具有更有效的自然字符串编码。有关如何使用它的演示,请参阅这篇文章。
  • 正如 Saxon 指出的那样,任务队列有效负载限制为 10 KB,因此您可能会遇到大型实体的问题。
  • 最重要的是,这将数据存储一致性模型从“强一致”更改为“最终一致”。也就是说,您排入任务队列的 put 可以在将来的任何时间应用,覆盖在此期间所做的任何更改。任何数量的竞争条件都是可能的,如果任务队列上有待处理的放置,则基本上会使事务无用。
于 2010-09-28T10:09:16.117 回答
1

一个潜在的问题是任务限制为 10kb 的数据,因此如果您的实体大于曾经腌制的实体,这将不起作用。

于 2010-09-28T06:01:55.350 回答