我正在使用 testbed 对我的 google app engine 应用程序进行单元测试,并且我的应用程序使用任务队列。
当我在单元测试期间向任务队列提交任务时,似乎该任务在队列中,但该任务没有执行。
如何在单元测试期间执行任务?
我正在使用 testbed 对我的 google app engine 应用程序进行单元测试,并且我的应用程序使用任务队列。
当我在单元测试期间向任务队列提交任务时,似乎该任务在队列中,但该任务没有执行。
如何在单元测试期间执行任务?
使用撒克逊人的出色答案,我能够使用 testbed 而不是 gaetestbed 来做同样的事情。这就是我所做的。
将此添加到我的setUp()
:
self.taskqueue_stub = apiproxy_stub_map.apiproxy.GetStub('taskqueue')
然后,在我的测试中,我使用了以下内容:
# Execute the task in the taskqueue
tasks = self.taskqueue_stub.GetTasks("default")
self.assertEqual(len(tasks), 1)
task = tasks[0]
params = base64.b64decode(task["body"])
response = self.app.post(task["url"], params)
在某个地方,POST 参数被 base64 编码,因此必须撤消它才能使其工作。
我比撒克逊人的回答更喜欢这个,因为我可以使用官方的 testbed 包,而且我可以在我自己的测试代码中完成这一切。
编辑:我后来想对使用延迟库提交的任务做同样的事情,并且花了一些时间来弄清楚它,所以我在这里分享以减轻其他人的痛苦。
如果您的任务队列仅包含延迟提交的任务,那么这将运行所有任务以及由这些任务排队的任何任务:
def submit_deferred(taskq):
tasks = taskq.GetTasks("default")
taskq.FlushQueue("default")
while tasks:
for task in tasks:
(func, args, opts) = pickle.loads(base64.b64decode(task["body"]))
func(*args)
tasks = taskq.GetTasks("default")
taskq.FlushQueue("default")
实现此目的的另一个(更清洁)选项是使用测试平台中的任务队列存根。为此,您首先必须通过将以下内容添加到您的setUp()
方法来初始化任务队列存根:
self.testbed = init_testbed()
self.testbed.init_taskqueue_stub()
可以使用以下代码访问任务调度程序:
taskq = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)
使用队列存根的界面如下:
GetQueues() #returns a list of dictionaries with information about the available queues
#returns a list of dictionaries with information about the tasks in a given queue
GetTasks(queue_name)
DeleteTask(queue_name, task_name) #removes the task task_name from the given queue
FlushQueue(queue_name) #removes all the tasks from the queue
#returns tasks filtered by name & url pointed to by the task from the given queues
get_filtered_tasks(url, name, queue_names)
StartBackgroundExecution() #Executes the queued tasks
Shutdown() #Requests the task scheduler to shutdown.
此外,由于它使用 App Engine SDK 自己的设施 - 它与延迟库一起工作得很好。
开发应用服务器是单线程的,因此它不能在前台线程运行测试时在后台运行任务。
我在 gaetestbed 的 taskqueue.py 中修改了 TaskQueueTestCase 以添加以下函数:
def execute_tasks(self, application):
"""
Executes all currently queued tasks, and also removes them from the
queue.
The tasks are execute against the provided web application.
"""
# Set up the application for webtest to use (resetting _app in case a
# different one has been used before).
self._app = None
self.APPLICATION = application
# Get all of the tasks, and then clear them.
tasks = self.get_tasks()
self.clear_task_queue()
# Run each of the tasks, checking that they succeeded.
for task in tasks:
response = self.post(task['url'], task['params'])
self.assertOK(response)
为此,我还必须将 TaskQueueTestCase 的基类从 BaseTestCase 更改为 WebTestCase。
我的测试然后做这样的事情:
# Do something which enqueues a task.
# Check that a task was enqueued, then execute it.
self.assertTrue(len(self.get_tasks()), 1)
self.execute_tasks(some_module.application)
# Now test that the task did what was expected.
因此,这直接从前台单元测试执行任务。这与生产环境不太一样(即,任务将在“一段时间后”根据单独的请求执行),但它对我来说已经足够好了。
您可能想尝试以下代码。完整的解释在这里:http ://www.geewax.org/task-queue-support-in-app-engines-ext-testbed/
import unittest
from google.appengine.api import taskqueue
from google.appengine.ext import testbed
class TaskQueueTestCase(unittest.TestCase):
def setUp(self):
self.testbed = testbed.Testbed()
self.testbed.activate()
self.testbed.init_taskqueue_stub()
self.task_queue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)
def tearDown(self):
self.testbed.deactivate()
def testTaskAdded(self):
taskqueue.add(url='/path/to/task')
tasks = self.taskqueue_stub.get_filtered_tasks(url='/path/to/task')
self.assertEqual(1, len(tasks))
self.assertEqual('/path/to/task', tasks[0].url)
unittest.main()