1

您知道如何强制 GAE SDK 同步执行任务以进行单元测试以检查任务结果吗?

您知道如何在单元测试中等待队列中的所有任务执行以检查任务结果吗?

任务的默认异步执行不允许简单的测试,因此不适合例如在使用任务队列更新后检查计数器。

考虑测试这样简单的代码(完成测试后检查结果)。

  payload = pickle.dumps(args, protocol = pickle.HIGHEST_PROTOCOL)
  task = taskqueue.Task(url = TASK_ADD_ORDER_REVIEW_COUNTER,
                        payload = payload)
  task.add(queue_name = COUNTERS_QUEQUE)

可以跳过 task.add 并直接从 TASK_ADD_ORDER_REVIEW_COUNTER 调用代码,但这是非常丑陋的解决方法,我认为它应该更简单。

4

1 回答 1

1

我建议您将测试分为两部分:

  1. 检查是否添加了任务
  2. 检查处理任务的处理程序是否按预期工作

对于 1.,我会像这样使用 ext.testbed

from google.appengine.ext import testbed

def setUp():
  self.testbed = testbed.Testbed()
  self.testbed.activate()
  self.testbed.init_taskqueue_stub()

def test():
  [do whatever triggers tasks]
  stub = self.testbed.get_stub('taskqueue')
  tasks = stub.get_filtered_tasks() # see (taskqueue_stub.py for details[2])
  [verify tasks is what you expected]

对于 2.,我会按照处理程序测试文章向您的任务处理程序发送请求。毕竟,任务是作为普通的 POST 请求(编码为请求参数的参数)调用的。

1 http://code.google.com/p/googleappengine/source/browse/trunk/python/google/appengine/api/taskqueue/taskqueue_stub.py#2453

于 2012-11-05T18:55:36.920 回答