62

我在 App Engine 数据存储区遇到了一个有趣的限制。我正在创建一个处理程序来帮助我们分析我们的一个生产服务器上的一些使用数据。要执行分析,我需要查询和汇总从数据存储中提取的 10,000 多个实体。计算并不难,它只是通过使用样本的特定过滤器的项目的直方图。我遇到的问题是,在达到查询截止日期之前,我无法足够快地从数据存储中取回数据以进行任何处理。

我已经尝试了所有我能想到的将查询分块为并行 RPC 调用以提高性能,但根据 appstats,我似乎无法让查询实际并行执行。无论我尝试哪种方法(见下文),RPC 似乎总是会退回到顺序下一个查询的瀑布。

注意:查询和分析代码确实有效,只是运行缓慢,因为我无法从数据存储区快速获取数据。

背景

我没有可以分享的实时版本,但这里是我正在谈论的系统部分的基本模型:

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

您可以将样本视为用户使用给定名称的功能的时间。(例如:'systemA.feature_x')。标签基于客户详细信息、系统信息和功能。例如:['winxp'、'2.5.1'、'systemA'、'feature_x'、'premium_account'])。因此,这些标签形成了一组非规范化的标记,可用于查找感兴趣的样本。

我正在尝试进行的分析包括获取日期范围并询问每个客户帐户(公司,而不是每个用户)每天(或每小时)使用的一组功能(可能是所有功能)的次数。

所以处理程序的输入类似于:

  • 开始日期
  • 结束日期
  • 标签

输出将是:

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]

查询通用代码

这是所有查询的一些共同代码。处理程序的一般结构是一个使用 webapp2 的简单 get 处理程序,它设置查询参数、运行查询、处理结果、创建要返回的数据。

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)

尝试的方法

我尝试了多种方法来尝试尽快并行地从数据存储中提取数据。到目前为止我尝试过的方法包括:

A. 单次迭代

这更像是与其他方法进行比较的简单基本情况。我只是构建查询并遍历所有项目,让 ndb 做它做的事情来一个接一个地拉它们。

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

B. 大取数

这里的想法是看看我是否可以进行一次非常大的提取。

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

C. 跨时间范围的异步获取

这里的想法是认识到样本在时间上的间隔相当好,因此我可以创建一组独立的查询,将整个时间区域分成块,并尝试使用异步并行运行每个查询:

# split up timestamp space into 20 equal parts and async query each of them
ts_delta       = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)

D. 异步映射

我尝试了这种方法,因为文档听起来像是 ndb 在使用 Query.map_async 方法时可能会自动利用一些并行性。

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

结果

我测试了一个示例查询来收集总体响应时间和 appstats 跟踪。结果是:

A. 单次迭代

真实:15.645s

这个顺序通过一个接一个地获取批次,然后从 memcache 中检索每个会话。

方法 A appstats

B. 大取数

真实:12.12s

实际上与选项 A 相同,但由于某种原因要快一些。

方法 B 应用统计

C. 跨时间范围的异步获取

真实:15.251s

似乎在开始时提供了更多的并行性,但似乎在结果迭代期间因一系列对 next 的调用而减慢了速度。似乎也无法将会话 memcache 查找与挂起的查询重叠。

方法 C 应用程序统计

D. 异步映射

真实:13.752s

这个是我最难理解的。看起来它有很多重叠,但一切似乎都在瀑布中延伸而不是平行。

方法 D appstats

建议

基于这一切,我错过了什么?我只是达到了 App Engine 的限制,还是有更好的方法来并行拉下大量实体?

我不知道接下来要尝试什么。我考虑过重写客户端以并行向应用程序引擎发出多个请求,但这似乎很暴力。我真的希望应用引擎应该能够处理这个用例,所以我猜我缺少一些东西。

更新

最后我发现选项 C 最适合我的情况。我能够优化它以在 6.1 秒内完成。仍然不完美,但要好得多。

在听取了几个人的建议后,我发现以下几点是理解和牢记的关键:

  • 多个查询可以并行运行
  • 一次只能有 10 个 RPC 飞行
  • 尝试去规范化到没有辅助查询的程度
  • 这种类型的任务最好留给映射reduce和任务队列,而不是实时查询

所以我做了什么让它更快:

  • 我从一开始就根据时间对查询空间进行了分区。(注意:分区在返回的实体方面越相等越好)
  • 我进一步对数据进行非规范化以消除对辅助会话查询的需要
  • 我利用 ndb 异步操作和 wait_any() 将查询与处理重叠

我仍然没有得到我期望或喜欢的性能,但它现在是可行的。我只是希望它们是一种更好的方法,可以在处理程序中快速将大量顺序实体拉入内存。

4

4 回答 4

8

像这样的大型处理不应该在有 60 秒时间限制的用户请求中完成。相反,它应该在支持长时间运行请求的上下文中完成。任务队列最多支持 10 分钟的请求,并且(我相信)正常的内存限制(F1 实例,默认情况下,有128MB 的内存)。对于更高的限制(无请求超时,1GB+ 内存),请使用backends

这里有一些可以尝试的方法:设置一个 URL,当访问该 URL 时,它会触发一个任务队列任务。如果任务队列任务尚未完成,它会返回一个网页,该网页每隔约 5 秒轮询另一个以 true/false 响应的 URL。任务队列处理数据,这可能需要大约 10 秒,并将结果作为计算数据或呈现的网页保存到数据存储区。一旦初始页面检测到它已经完成,用户就会被重定向到该页面,该页面从数据存储中获取现在计算的结果。

于 2012-07-17T07:50:46.403 回答
2

新的实验性数据处理功能(MapReduce 的 AppEngine API)看起来非常适合解决这个问题。它执行自动分片以执行多个并行工作进程。

于 2014-02-22T05:12:00.100 回答
1

我有一个类似的问题,在与谷歌支持合作几周后,我可以确认至少在 2017 年 12 月没有神奇的解决方案。

tl;dr:对于运行在 B1 实例上的标准 SDK,可以预期吞吐量为220个实体/秒,而对于在 B8 实例上运行的已修补 SDK,吞吐量可以达到900个实体/秒。

限制与 CPU 相关,更改实例类型直接影响性能。在 B4 和 B4_1G 实例上获得的类似结果证实了这一点

对于具有大约 30 个字段的 Expando 实体,我获得的最佳吞吐量是:

标准 GAE SDK

  • B1 实例:~220 个实体/秒
  • B2 实例:~250 个实体/秒
  • B4 实例:~560 个实体/秒
  • B4_1G 实例:~560 个实体/秒
  • B8 实例:~650 个实体/秒

已修补的 GAE SDK

  • B1 实例:~420 个实体/秒
  • B8 实例:~900 个实体/秒

对于标准 GAE SDK,我尝试了各种方法,包括多线程,但最好的方法是fetch_async使用wait_any. 当前的 NDB 库已经在后台使用异步和期货方面做得很好,所以任何使用线程来推动它的尝试只会让情况变得更糟。

我发现了两种有趣的方法来优化它:

Matt Faus 很好地解释了这个问题:

GAE SDK 提供了一个 API,用于读取和写入从您的类派生的对象到数据存储区。这为您省去了验证从数据存储返回的原始数据并将其重新打包成易于使用的对象的枯燥工作。特别是,GAE 使用协议缓冲区将原始数据从存储传输到需要它的前端机器。然后 SDK 负责解码此格式并将干净的对象返回给您的代码。这个实用程序很棒,但有时它做的工作比你想要的要多。[...] 使用我们的分析工具,我发现在 protobuf-to-python-object 解码阶段花费了整整 50% 的时间来获取这些实体。这意味着前端服务器上的 CPU 是这些数据存储读取的瓶颈!

GAE 数据访问网络请求

这两种方法都试图通过减少解码字段的数量来减少对 Python 解码执行 protobuf 所花费的时间。

我尝试了这两种方法,但我只用马特的方法成功了。自 Evan 发布他的解决方案以来,SDK 内部结构发生了变化。我不得不稍微修改一下 Matt在这里发布的代码,但这很容易——如果有兴趣,我可以发布最终代码。

对于具有大约 30 个字段的常规 Expando 实体,我使用 Matt 的解决方案仅解码几个字段并获得了显着改进。

总之,需要相应地进行计划,并且不要期望能够在“实时”GAE 请求中处理超过数百个实体。

于 2017-12-07T12:35:12.690 回答
0

App Engine 上的大数据操作最好使用某种 mapreduce 操作来实现。

这是描述该过程的视频,但包括 BigQuery https://developers.google.com/events/io/sessions/gooio2012/307/

听起来您不需要 BigQuery,但您可能希望同时使用管道的 Map 和 Reduce 部分。

您正在做的事情与 mapreduce 情况之间的主要区别在于,您正在启动一个实例并遍历查询,而在 mapreduce 上,您将为每个查询并行运行一个单独的实例。您将需要一个 reduce 操作来“总结”所有数据,并将结果写入某处。

您遇到的另一个问题是您应该使用游标进行迭代。 https://developers.google.com/appengine/docs/java/datastore/queries#Query_Cursors

如果迭代器使用查询偏移量,它会效率低下,因为偏移量发出相同的查询,跳过一些结果,并给你下一组,而光标直接跳转到下一组。

于 2012-07-16T17:43:52.227 回答