我在 App Engine 数据存储区遇到了一个有趣的限制。我正在创建一个处理程序来帮助我们分析我们的一个生产服务器上的一些使用数据。要执行分析,我需要查询和汇总从数据存储中提取的 10,000 多个实体。计算并不难,它只是通过使用样本的特定过滤器的项目的直方图。我遇到的问题是,在达到查询截止日期之前,我无法足够快地从数据存储中取回数据以进行任何处理。
我已经尝试了所有我能想到的将查询分块为并行 RPC 调用以提高性能,但根据 appstats,我似乎无法让查询实际并行执行。无论我尝试哪种方法(见下文),RPC 似乎总是会退回到顺序下一个查询的瀑布。
注意:查询和分析代码确实有效,只是运行缓慢,因为我无法从数据存储区快速获取数据。
背景
我没有可以分享的实时版本,但这里是我正在谈论的系统部分的基本模型:
class Session(ndb.Model):
""" A tracked user session. (customer account (company), version, OS, etc) """
data = ndb.JsonProperty(required = False, indexed = False)
class Sample(ndb.Model):
name = ndb.StringProperty (required = True, indexed = True)
session = ndb.KeyProperty (required = True, kind = Session)
timestamp = ndb.DateTimeProperty(required = True, indexed = True)
tags = ndb.StringProperty (repeated = True, indexed = True)
您可以将样本视为用户使用给定名称的功能的时间。(例如:'systemA.feature_x')。标签基于客户详细信息、系统信息和功能。例如:['winxp'、'2.5.1'、'systemA'、'feature_x'、'premium_account'])。因此,这些标签形成了一组非规范化的标记,可用于查找感兴趣的样本。
我正在尝试进行的分析包括获取日期范围并询问每个客户帐户(公司,而不是每个用户)每天(或每小时)使用的一组功能(可能是所有功能)的次数。
所以处理程序的输入类似于:
- 开始日期
- 结束日期
- 标签
输出将是:
[{
'company_account': <string>,
'counts': [
{'timeperiod': <iso8601 date>, 'count': <int>}, ...
]
}, ...
]
查询通用代码
这是所有查询的一些共同代码。处理程序的一般结构是一个使用 webapp2 的简单 get 处理程序,它设置查询参数、运行查询、处理结果、创建要返回的数据。
# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500 # Bring in large groups of entities
q = Sample.query()
q = q.order(Sample.timestamp)
# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))
def handle_sample(sample):
session_obj = sample.session.get() # Usually found in local or memcache thanks to ndb
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
尝试的方法
我尝试了多种方法来尝试尽快并行地从数据存储中提取数据。到目前为止我尝试过的方法包括:
A. 单次迭代
这更像是与其他方法进行比较的简单基本情况。我只是构建查询并遍历所有项目,让 ndb 做它做的事情来一个接一个地拉它们。
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)
for sample in q_iter:
handle_sample(sample)
B. 大取数
这里的想法是看看我是否可以进行一次非常大的提取。
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)
for sample in samples:
handle_sample(sample)
C. 跨时间范围的异步获取
这里的想法是认识到样本在时间上的间隔相当好,因此我可以创建一组独立的查询,将整个时间区域分成块,并尝试使用异步并行运行每个查询:
# split up timestamp space into 20 equal parts and async query each of them
ts_delta = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []
for x in range(ts_intervals):
cur_end_time = (cur_start_time + ts_delta)
if x == (ts_intervals-1): # Last one has to cover full range
cur_end_time = end_time
f = q.filter(Sample.timestamp >= cur_start_time,
Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
q_futures.append(f)
cur_start_time = cur_end_time
# Now loop through and collect results
for f in q_futures:
samples = f.get_result()
for sample in samples:
handle_sample(sample)
D. 异步映射
我尝试了这种方法,因为文档听起来像是 ndb 在使用 Query.map_async 方法时可能会自动利用一些并行性。
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
@ndb.tasklet
def process_sample(sample):
period_ts = getPeriodTimestamp(sample.timestamp)
session_obj = yield sample.session.get_async() # Lookup the session object from cache
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
raise ndb.Return(None)
q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()
结果
我测试了一个示例查询来收集总体响应时间和 appstats 跟踪。结果是:
A. 单次迭代
真实:15.645s
这个顺序通过一个接一个地获取批次,然后从 memcache 中检索每个会话。
B. 大取数
真实:12.12s
实际上与选项 A 相同,但由于某种原因要快一些。
C. 跨时间范围的异步获取
真实:15.251s
似乎在开始时提供了更多的并行性,但似乎在结果迭代期间因一系列对 next 的调用而减慢了速度。似乎也无法将会话 memcache 查找与挂起的查询重叠。
D. 异步映射
真实:13.752s
这个是我最难理解的。看起来它有很多重叠,但一切似乎都在瀑布中延伸而不是平行。
建议
基于这一切,我错过了什么?我只是达到了 App Engine 的限制,还是有更好的方法来并行拉下大量实体?
我不知道接下来要尝试什么。我考虑过重写客户端以并行向应用程序引擎发出多个请求,但这似乎很暴力。我真的希望应用引擎应该能够处理这个用例,所以我猜我缺少一些东西。
更新
最后我发现选项 C 最适合我的情况。我能够优化它以在 6.1 秒内完成。仍然不完美,但要好得多。
在听取了几个人的建议后,我发现以下几点是理解和牢记的关键:
- 多个查询可以并行运行
- 一次只能有 10 个 RPC 飞行
- 尝试去规范化到没有辅助查询的程度
- 这种类型的任务最好留给映射reduce和任务队列,而不是实时查询
所以我做了什么让它更快:
- 我从一开始就根据时间对查询空间进行了分区。(注意:分区在返回的实体方面越相等越好)
- 我进一步对数据进行非规范化以消除对辅助会话查询的需要
- 我利用 ndb 异步操作和 wait_any() 将查询与处理重叠
我仍然没有得到我期望或喜欢的性能,但它现在是可行的。我只是希望它们是一种更好的方法,可以在处理程序中快速将大量顺序实体拉入内存。