我的应用程序遇到了瓶颈,并且很难找到解决方案。一点背景:
- 我的应用程序 ping API 以收集有关数十万个项目的信息并将它们存储到数据存储区
- 我们需要对这些项目的混合维度执行简单的聚合,我们在存储项目期间尝试和计算
当前实施:
- 我们会根据需要手动启动这些项目的下载,这会在专门用于下载这些项目的后端创建任务。每个任务将根据分页和获取每个项目所需的 API 调用数启动更多任务。
- 每个任务都将下载、解析和批量存储项目,同时通过使用字典将我们想要的聚合保存在内存中。
- 在每个任务执行结束时,我们将聚合字典写入拉取队列。
- 一旦我们检测到我们即将结束 API 调用,我们就会启动一个聚合任务到第二个后端配置
- 这个“聚合任务”从拉队列中拉出(一次 20 个),并在尝试存储每个聚合之前合并每个任务中找到的字典(进一步在内存聚合中进行)。该任务还将启动其他任务以对拉取队列中的剩余任务(数百个)执行聚合
- 我们使用分片计数器方法来帮助缓解存储到数据存储时的任何争用
- 每个聚合任务可以尝试存储 500-1500 个聚合,它们应该都是相互独立的
那里有额外的检查等,以确保正确处理所有拉队列任务并下载所有项目。
问题:
我们希望尽快下载和存储所有项目和聚合。我为每个描述的后端配置启用了 20 个实例(我将它们称为“聚合器”后端和“下载器”后端)。下载器后端似乎相当快地通过 API 调用。我大量使用 NDB 库和异步 URL Fetches/Datastore 调用来获得它。我还启用了 threadsafe:true 以便在开始下一个任务之前没有实例会等待 RPC 调用完成(所有任务都可以相互独立地操作并且是幂等的)。
聚合器后端是重要的时间汇发挥作用的地方。通过事务异步存储 500-1500 个这些聚合需要 40 秒或更长时间(我什至不认为所有事务都被正确提交)。我使用 threadsafe:false 保留此后端,因为我使用 300 秒的拉取队列到期期限,但如果我允许在单个实例上执行多个任务,它们可能会级联并在 300 秒内完成一些任务标记,从而允许另一个任务第二次拉同一个任务,并可能重复计算。
日志显示BadRequestError: Nested transactions are not supported.
先前的错误(在堆栈跟踪中)为TransactionFailedError: too much contention on these datastore entities. please try again.
. 我经常看到的另一个错误是BadRequestError(The referenced transaction has expired or is no longer valid.)
据我了解,有时这些错误意味着事务仍然可以在没有进一步交互的情况下提交。我怎么知道这是否已正确提交?我是以一种合乎逻辑/有效的方式来做这件事,还是有更多的并发空间而不会有把一切搞砸的风险?
相关代码:
class GeneralShardConfig(ndb.Model):
"""Tracks the number of shards for each named counter."""
name = ndb.StringProperty(required=True)
num_shards = ndb.IntegerProperty(default=4)
class GeneralAggregateShard(ndb.Model):
"""Shards for each named counter"""
name = ndb.StringProperty(name='n', required=True)
count = ndb.FloatProperty(name='c', default=0.00) #acts as a total now
@ndb.tasklet
def increment_batch(data_set):
def run_txn(name, value):
@ndb.tasklet
def txn():
to_put = []
dbkey = ndb.Key(GeneralShardConfig, name)
config = yield dbkey.get_async(use_memcache=False)
if not config:
config = GeneralShardConfig(key=dbkey,name=name)
to_put.append(config)
index = random.randint(0, config.num_shards-1)
shard_name = name + str(index)
dbkey = ndb.Key(GeneralAggregateShard, shard_name)
counter = yield dbkey.get_async()
if not counter:
counter = GeneralAggregateShard(key=dbkey, name=name)
counter.count += value
to_put.append(counter)
yield ndb.put_multi_async(to_put)
return ndb.transaction_async(txn, use_memcache=False, xg=True)
res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
raise ndb.Return(res)
鉴于实现,我看到的“争用”的唯一空间是 2 个或更多聚合任务是否需要更新相同的聚合名称,这不应该太频繁地发生,并且对于分片计数器,我希望这种重叠很少,如果有的话, 发生。我假设
BadRequestError(The referenced transaction has expired or is no longer valid.)
当事件循环正在检查所有小任务的状态并点击对已完成事务的引用时会出现错误。这里的问题是它出错了,这是否意味着所有交易都被过早地切断了,还是我可以假设所有交易都通过了?我进一步假设这条线res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
需要被分解成一个 try/except 以检测这些错误。
在我为此发疯之前,我将不胜感激有关如何优化此过程并以可靠方式进行的任何指导/帮助。
编辑1: 我修改了聚合器任务行为如下:
- 如果从队列中租用了多个任务,则将这些任务聚合到内存中,然后将结果存储到拉入队列中的另一个任务中,并立即启动另一个“聚合器任务”
- 否则,如果租用了 1 个任务,请尝试保存结果
这有助于减少我所看到的争用错误,但它仍然不是很可靠。最近,我遇到BadRequestError: Nested transactions are not supported.
了一个堆栈跟踪,表明RuntimeError: Deadlock waiting for <Future fbf0db50 created by transaction_async(model.py:3345) for tasklet transaction(context.py:806) suspended generator transaction(context.py:876); pending>
我相信这种修改应该通过允许聚合过程中所有可能的重叠在单个实例中组合和尝试所有可能的重叠来优化过程,而不是多个实例都执行可能发生冲突的事务。我仍然无法以可靠的方式保存结果。