3

我正在使用 Python GAE SDK。

我有一些处理需要在 6000 多个MyKind. 在单个请求中完成太慢了,所以我正在使用任务队列。如果我让一个任务只处理一个实体,那么它应该只需要几秒钟。

文档说只能在“批次”中添加 100 个任务。(这是什么意思?在一个请求中?在一项任务中?)

因此,假设“批处理”意味着“请求”,我试图找出为数据存储中的每个实体创建任务的最佳方法。你怎么看?

如果我可以假设的顺序MyKind永远不会改变,那就更容易了。(处理过程永远不会真正改变MyKind实例——它只会创建其他类型的新实例。)我可以做一堆任务,给每个任务一个从哪里开始的偏移量,间隔小于 100。然后,每个任务都可以创建执行实际处理的单独任务。

但是如果实体太多以至于原始请求无法添加所有必要的调度任务怎么办?这让我觉得我需要一个递归解决方案——每个任务都会查看它给定的范围。如果该范围内仅存在一个元素,则会对其进行处理。否则,它将范围进一步细分为后续任务。

如果我不能指望使用偏移量和限制来识别实体(因为不能确保它们的顺序是恒定的),也许我可以只使用它们的键?但随后我可能会发送 1000 多个密钥,这看起来很笨拙。

我在这里走的是正确的道路,还是我应该考虑另一种设计?

4

2 回答 2

9

当您运行代码时就像taskqueue.add(url='/worker', params={'cursor': cursor})在排队执行任务一样;使用您提供的参数安排在带外执行的请求。您显然可以在一次操作中安排多达 100 个这样的操作。

不过,我不认为你愿意。任务链将使这更简单:

你的工作任务会做这样的事情:

  • 运行查询以获取一些记录以进行处理。如果任务参数中提供了游标,请使用它。将查询限制为 10 条记录,或者您认为可以在 30 秒内完成的任何内容。

  • 处理您的 10 条记录

  • 如果您的查询返回 10 条记录,则将另一个任务排入队列并将查询中更新的游标传递给它,以便它可以从您离开的地方继续。

  • 如果您获得的记录少于 10 条,那么您就完成了。万岁!关闭电子邮件或其他内容并退出。

有了这条路线,你只需要启动第一个任务,其余的自己添加。

请注意,如果任务失败,App Engine 将重试它,直到它成功,因此您无需担心数据存储中断会导致一项任务超时并中断链。

编辑:

上述步骤不保证一个实体只会被处理一次。任务通常应该只运行一次,但 Google 确实建议您为幂等性进行设计。如果这是一个主要问题,这是处理它的一种方法:

  • 在每个要处理的实体上放置一个状态标志,或创建一个补充实体来保存该标志。它应该具有类似于 Pending、Processing 和 Processed 的状态。

  • 当您获取要处理的新实体时,以事务方式锁定并增加处理标志。仅当实体处于待处理状态时才运行该实体。处理完成后,再次增加标志。

请注意,在开始之前将处理标志添加到每个实体并不是绝对必要的。您的“待定”状态可能只是意味着该属性或相应的实体尚不存在。

于 2010-07-12T02:36:45.983 回答
0

另外根据您的设计,您可以执行我所做的,即对所有需要处理的记录进行编号。我处理了大约 3500 个项目,每个项目需要 3 秒左右的时间来处理。为了避免重叠、超时并考虑将来的扩展,我的第一个任务是从数据库中获取所有此类唯一项目的列表。然后它将它分成每个项目标识符 500 个的列表,循环直到它考虑到我的数据库中的所有唯一项目,并将每个 500 个标识符的块发布到第二层处理程序任务。第二个处理程序任务中的每一个,目前是七个或八个不同的任务,然后都有一个包含 500 个项目的唯一列表,每个处理程序任务添加 500 个任务,每个唯一标识符一个。

由于这一切都是通过循环管理并根据我的数据库中唯一项目的数量进行计数,因此我可以添加任意数量的唯一项目,并且任务数量将扩大以适应它们而绝对没有重复。我每天都用它来跟踪游戏中的价格,所以它都是使用 cron 作业启动的,根本不需要我的干预。

于 2011-01-29T19:01:57.040 回答