13

我的应用程序遇到了瓶颈,并且很难找到解决方案。一点背景:

  • 我的应用程序 ping API 以收集有关数十万个项目的信息并将它们存储到数据存储区
  • 我们需要对这些项目的混合维度执行简单的聚合,我们在存储项目期间尝试和计算

当前实施:

  • 我们会根据需要手动启动这些项目的下载,这会在专门用于下载这些项目的后端创建任务。每个任务将根据分页和获取每个项目所需的 API 调用数启动更多任务。
  • 每个任务都将下载、解析和批量存储项目,同时通过使用字典将我们想要的聚合保存在内存中。
  • 在每个任务执行结束时,我们将聚合字典写入拉取队列。
  • 一旦我们检测到我们即将结束 API 调用,我们就会启动一个聚合任务到第二个后端配置
  • 这个“聚合任务”从拉队列中拉出(一次 20 个),并在尝试存储每个聚合之前合并每个任务中找到的字典(进一步在内存聚合中进行)。该任务还将启动其他任务以对拉取队列中的剩余任务(数百个)执行聚合
  • 我们使用分片计数器方法来帮助缓解存储到数据存储时的任何争用
  • 每个聚合任务可以尝试存储 500-1500 个聚合,它们应该都是相互独立的

那里有额外的检查等,以确保正确处理所有拉队列任务并下载所有项目。

问题:

我们希望尽快下载和存储所有项目和聚合。我为每个描述的后端配置启用了 20 个实例(我将它们称为“聚合器”后端和“下载器”后端)。下载器后端似乎相当快地通过 API 调用。我大量使用 NDB 库和异步 URL Fetches/Datastore 调用来获得它。我还启用了 threadsafe:true 以便在开始下一个任务之前没有实例会等待 RPC 调用完成(所有任务都可以相互独立地操作并且是幂等的)。

聚合器后端是重要的时间汇发挥作用的地方。通过事务异步存储 500-1500 个这些聚合需要 40 秒或更长时间(我什至不认为所有事务都被正确提交)。我使用 threadsafe:false 保留此后端,因为我使用 300 秒的拉取队列到期期限,但如果我允许在单个实例上执行多个任务,它们可能会级联并在 300 秒内完成一些任务标记,从而允许另一个任务第二次拉同一个任务,并可能重复计算。

日志显示BadRequestError: Nested transactions are not supported.先前的错误(在堆栈跟踪中)为TransactionFailedError: too much contention on these datastore entities. please try again.. 我经常看到的另一个错误是BadRequestError(The referenced transaction has expired or is no longer valid.)

据我了解,有时这些错误意味着事务仍然可以在没有进一步交互的情况下提交。我怎么知道这是否已正确提交?我是以一种合乎逻辑/有效的方式来做这件事,还是有更多的并发空间而不会有把一切搞砸的风险?

相关代码:

class GeneralShardConfig(ndb.Model):
    """Tracks the number of shards for each named counter."""
    name = ndb.StringProperty(required=True)
    num_shards = ndb.IntegerProperty(default=4)

class GeneralAggregateShard(ndb.Model):
    """Shards for each named counter"""
    name = ndb.StringProperty(name='n', required=True)
    count = ndb.FloatProperty(name='c', default=0.00) #acts as a total now

@ndb.tasklet
def increment_batch(data_set):
    def run_txn(name, value):
        @ndb.tasklet
        def txn():
            to_put = []
            dbkey = ndb.Key(GeneralShardConfig, name)
            config = yield dbkey.get_async(use_memcache=False)
            if not config:
                config = GeneralShardConfig(key=dbkey,name=name)
                to_put.append(config)
            index = random.randint(0, config.num_shards-1)
            shard_name =  name + str(index)
            dbkey = ndb.Key(GeneralAggregateShard, shard_name)
            counter = yield dbkey.get_async()
            if not counter:
                counter = GeneralAggregateShard(key=dbkey, name=name)
            counter.count += value
            to_put.append(counter)
            yield ndb.put_multi_async(to_put)
        return ndb.transaction_async(txn, use_memcache=False, xg=True)
    res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
    raise ndb.Return(res)

鉴于实现,我看到的“争用”的唯一空间是 2 个或更多聚合任务是否需要更新相同的聚合名称,这不应该太频繁地发生,并且对于分片计数器,我希望这种重叠很少,如果有的话, 发生。我假设 BadRequestError(The referenced transaction has expired or is no longer valid.)当事件循环正在检查所有小任务的状态并点击对已完成事务的引用时会出现错误。这里的问题是它出错了,这是否意味着所有交易都被过早地切断了,还是我可以假设所有交易都通过了?我进一步假设这条线res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]需要被分解成一个 try/except 以检测这些错误。

在我为此发疯之前,我将不胜感激有关如何优化此过程并以可靠方式进行的任何指导/帮助。

编辑1: 我修改了聚合器任务行为如下:

  • 如果从队列中租用了多个任务,则将这些任务聚合到内存中,然后将结果存储到拉入队列中的另一个任务中,并立即启动另一个“聚合器任务”
  • 否则,如果租用了 1 个任务,请尝试保存结果

这有助于减少我所看到的争用错误,但它仍然不是很可靠。最近,我遇到BadRequestError: Nested transactions are not supported.了一个堆栈跟踪,表明RuntimeError: Deadlock waiting for <Future fbf0db50 created by transaction_async(model.py:3345) for tasklet transaction(context.py:806) suspended generator transaction(context.py:876); pending>

我相信这种修改应该通过允许聚合过程中所有可能的重叠在单个实例中组合和尝试所有可能的重叠来优化过程,而不是多个实例都执行可能发生冲突的事务。我仍然无法以可靠的方式保存结果。

4

2 回答 2

5

通过减少数据存储 I/O(将工作留给自动批处理程序并禁用索引),您可以更加确定数据存储写入完成(更少争用)并且应该更快。

配置(重命名的计数器)获取在事务之外,并且可以在遍历事务的同时并发运行。

方法和总属性被添加到 Counter 以(希望)使其更容易在将来修改。

为十进制支持创建了一个新的 ndb 属性(假设这就是您指定 0.00 而不是 0.0 的原因)。

编辑:

消除了对交易的需求并更改了分片系统以提高可靠性。

import webapp2

import copy
import decimal
import logging
import random
import string

from google.appengine.api import datastore_errors
from google.appengine.datastore import entity_pb
from google.appengine.ext import deferred
from google.appengine.ext import ndb


TEST_BATCH_SIZE = 250
TEST_NAME_LEN = 12


class DecimalProperty(ndb.Property):
    """A Property whose value is a decimal.Decimal object."""

    def _datastore_type(self, value):
      return str(value)

    def _validate(self, value):
      if not isinstance(value, decimal.Decimal):
        raise datastore_errors.BadValueError('Expected decimal.Decimal, got %r'
                                             % (value,))
      return value

    def _db_set_value(self, v, p, value):
        value = str(value)
        v.set_stringvalue(value)
        if not self._indexed:
            p.set_meaning(entity_pb.Property.TEXT)

    def _db_get_value(self, v, _):
        if not v.has_stringvalue():
            return None
        value = v.stringvalue()
        return decimal.Decimal(value)

class BatchInProgress(ndb.Model):
    """Use a scheduler to delete batches in progress after a certain time"""

    started = ndb.DateTimeProperty(auto_now=True)

    def clean_up(self):
        qry = Shard.query().filter(Shard.batch_key == self.key)
        keys = qry.fetch(keys_only=True)
        while keys:
            ndb.delete_multi(keys)
            keys = qry.fetch(keys_only=True)

def cleanup_failed_batch(batch_key):
    batch = batch_key.get()

    if batch:
        batch.clean_up()
        batch.delete()

class Shard(ndb.Model):
    """Shards for each named counter"""

    counter_key = ndb.KeyProperty(name='c')
    batch_key = ndb.KeyProperty(name='b')
    count = DecimalProperty(name='v', default=decimal.Decimal('0.00'),
                            indexed=False)

class Counter(ndb.Model):
    """Tracks the number of shards for each named counter"""

    @property
    def shards(self):
        qry = Shard.query().filter(Shard.counter_key == self.key)
        results = qry.fetch(use_cache=False, use_memcache=False)
        return filter(None, results)

    @property
    def total(self):
        count = decimal.Decimal('0.00') # Use initial value if no shards

        for shard in self.shards:
            count += shard.count

        return count

    @ndb.tasklet
    def incr_async(self, value, batch_key):
        index = batch_key.id()
        name = self.key.id() + str(index)

        shard = Shard(id=name, count=value,
                      counter_key=self.key, batch_key=batch_key)

        yield shard.put_async(use_cache=False, use_memcache=False)

    def incr(self, *args, **kwargs):
        return self.incr_async(*args, **kwargs).get_result()

@ndb.tasklet
def increment_batch(data_set):
    batch_key = yield BatchInProgress().put_async()
    deferred.defer(cleanup_failed_batch, batch_key, _countdown=3600)

    # NOTE: mapping is modified in place, hence copying
    mapping = copy.copy(data_set)

    # (1/3) filter and fire off counter gets
    #       so the futures can autobatch
    counters = {}
    ctr_futs = {}
    ctr_put_futs = []
    zero_values = set()
    for name, value in mapping.iteritems():
        if value != decimal.Decimal('0.00'):
            ctr_fut = Counter.get_by_id_async(name) # Use cache(s)
            ctr_futs[name] = ctr_fut
        else:
            # Skip zero values because...
            zero_values.add(name)
            continue

    for name in zero_values:
        del mapping[name] # Remove all zero values from the mapping
    del zero_values

    while mapping: # Repeat until all transactions succeed

        # (2/3) wait on counter gets and fire off increment transactions
        #       this way autobatchers should fill time
        incr_futs = {}
        for name, value in mapping.iteritems():
            counter = counters.get(name)
            if not counter:
                counter = counters[name] = yield ctr_futs.pop(name)
            if not counter:
                logging.info('Creating new counter %s', name)
                counter = counters[name] = Counter(id=name)
                ctr_put_futs.append(counter.put_async())
            else:
                logging.debug('Reusing counter %s', name)
            incr_fut = counter.incr_async(value, batch_key)
            incr_futs[(name, value)] = incr_fut

        # (3/3) wait on increments and handle errors
        #       by using a tuple key for variable access
        for (name, value), incr_fut in incr_futs.iteritems():
            counter = counters[name]
            try:
                yield incr_fut
            except:
                pass
            else:
                del mapping[name]

        if mapping:
            logging.warning('%i increments failed this batch.' % len(mapping))

    yield batch_key.delete_async(), ctr_put_futs

    raise ndb.Return(counters.values())

class ShardTestHandler(webapp2.RequestHandler):

    @ndb.synctasklet
    def get(self):
        if self.request.GET.get('delete'):
            ndb.delete_multi_async(Shard.query().fetch(keys_only=True))
            ndb.delete_multi_async(Counter.query().fetch(keys_only=True))
            ndb.delete_multi_async(BatchInProgress.query().fetch(keys_only=True))
        else:
            data_set_test = {}
            for _ in xrange(TEST_BATCH_SIZE):
                name = ''
                for _ in xrange(TEST_NAME_LEN):
                    name += random.choice(string.letters)
                value = decimal.Decimal('{0:.2f}'.format(random.random() * 100))
                data_set_test[name] = value
            yield increment_batch(data_set_test)
        self.response.out.write("Done!")

app = webapp2.WSGIApplication([('/shard_test/', ShardTestHandler)], debug=True)
app = ndb.toplevel(app.__call__)
于 2012-07-02T20:07:21.033 回答
5

特别是关于“引用的事务已过期或不再有效”BadRequestError 的主题,这是一个鲜为人知的事实,即事务将比请求更快地超时。从创建开始,您将免费获得 15 秒的生命,之后如果事务连续闲置 15 秒(因此有效的最小生命周期为 30 秒)将被杀死,并且在 60 秒后无论如何都会被硬杀死。这使得并行运行大量事务变得困难,因为 CPU 争用和不公平的 tasklet 调度算法可能会导致一些事务空闲时间过长。

ndb 的事务方法的以下猴子补丁通过重试过期事务来提供一些帮助,但最终您必须调整批处理以将争用减少到可管理的水平。

_ndb_context_transaction = ndb.Context.transaction

@ndb.tasklet
def _patched_transaction(self, callback, **ctx_options):
  if (self.in_transaction() and
      ctx_options.get('propagation') != ndb.TransactionOptions.INDEPENDENT):
    raise ndb.Return((yield _ndb_context_transaction(self, callback, **ctx_options)))

  attempts = 1
  start_time = time.time()
  me = random.getrandbits(16)
  logging.debug('Transaction started <%04x>', me)
  while True:
    try:
      result = yield _ndb_context_transaction(self, callback, **ctx_options)
    except datastore_errors.BadRequestError as e:
      if not ('expired' in str(e) and
              attempts < _MAX_BAD_REQUEST_RECOVERY_ATTEMPTS):
        raise
      logging.warning(
          'Transaction retrying <%04x> (attempt #%d, %.1f seconds) on BadRequestError: %s',
          me, attempts, time.time() - start_time, e)
      attempts += 1
    else:
      logging.debug(
          'Transaction finished <%04x> (attempt #%d, %.1f seconds)',
           me, attempts, time.time() - start_time)
      raise ndb.Return(result)

ndb.Context.transaction = _patched_transaction
于 2013-01-10T22:15:53.700 回答