18

我正在使用 testbed 对我的 google app engine 应用程序进行单元测试,并且我的应用程序使用任务队列。

当我在单元测试期间向任务队列提交任务时,似乎该任务在队列中,但该任务没有执行。

如何在单元测试期间执行任务?

4

4 回答 4

24

使用撒克逊人的出色答案,我能够使用 testbed 而不是 gaetestbed 来做同样的事情。这就是我所做的。

将此添加到我的setUp()

    self.taskqueue_stub = apiproxy_stub_map.apiproxy.GetStub('taskqueue')

然后,在我的测试中,我使用了以下内容:

    # Execute the task in the taskqueue
    tasks = self.taskqueue_stub.GetTasks("default")
    self.assertEqual(len(tasks), 1)
    task = tasks[0]
    params = base64.b64decode(task["body"])
    response = self.app.post(task["url"], params)

在某个地方,POST 参数被 base64 编码,因此必须撤消它才能使其工作。

我比撒克逊人的回答更喜欢这个,因为我可以使用官方的 testbed 包,而且我可以在我自己的测试代码中完成这一切。

编辑:我后来想对使用延迟库提交的任务做同样的事情,并且花了一些时间来弄清楚它,所以我在这里分享以减轻其他人的痛苦。

如果您的任务队列仅包含延迟提交的任务,那么这将运行所有任务以及由这些任务排队的任何任务:

def submit_deferred(taskq):
    tasks = taskq.GetTasks("default")
    taskq.FlushQueue("default")
    while tasks:
        for task in tasks:
            (func, args, opts) = pickle.loads(base64.b64decode(task["body"]))
            func(*args)
        tasks = taskq.GetTasks("default")
        taskq.FlushQueue("default")
于 2011-07-09T16:12:34.827 回答
14

实现此目的的另一个(更清洁)选项是使用测试平台中的任务队列存根。为此,您首先必须通过将以下内容添加到您的setUp()方法来初始化任务队列存根:

self.testbed = init_testbed()
self.testbed.init_taskqueue_stub()

可以使用以下代码访问任务调度程序:

taskq = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)

使用队列存根的界面如下:

GetQueues() #returns a list of dictionaries with information about the available queues

#returns a list of dictionaries with information about the tasks in a given queue
GetTasks(queue_name)

DeleteTask(queue_name, task_name) #removes the task task_name from the given queue

FlushQueue(queue_name) #removes all the tasks from the queue

#returns tasks filtered by name & url pointed to by the task from the given queues
get_filtered_tasks(url, name, queue_names)

StartBackgroundExecution() #Executes the queued tasks

Shutdown() #Requests the task scheduler to shutdown.

此外,由于它使用 App Engine SDK 自己的设施 - 它与延迟库一起工作得很好。

于 2011-09-07T12:23:32.127 回答
8

开发应用服务器是单线程的,因此它不能在前台线程运行测试时在后台运行任务。

我在 gaetestbed 的 taskqueue.py 中修改了 TaskQueueTestCase 以添加以下函数:

def execute_tasks(self, application):
    """
    Executes all currently queued tasks, and also removes them from the 
    queue.
    The tasks are execute against the provided web application.
    """

    # Set up the application for webtest to use (resetting _app in case a
    # different one has been used before). 
    self._app = None
    self.APPLICATION = application

    # Get all of the tasks, and then clear them.
    tasks = self.get_tasks()
    self.clear_task_queue()

    # Run each of the tasks, checking that they succeeded.
    for task in tasks:
        response = self.post(task['url'], task['params'])
        self.assertOK(response)

为此,我还必须将 TaskQueueTestCase 的基类从 BaseTestCase 更改为 WebTestCase。

我的测试然后做这样的事情:

# Do something which enqueues a task.

# Check that a task was enqueued, then execute it.
self.assertTrue(len(self.get_tasks()), 1)
self.execute_tasks(some_module.application)

# Now test that the task did what was expected.

因此,这直接从前台单元测试执行任务。这与生产环境不太一样(即,任务将在“一段时间后”根据单独的请求执行),但它对我来说已经足够好了。

于 2011-07-09T13:32:05.470 回答
4

您可能想尝试以下代码。完整的解释在这里:http ://www.geewax.org/task-queue-support-in-app-engines-ext-testbed/

import unittest
from google.appengine.api import taskqueue
from google.appengine.ext import testbed


class TaskQueueTestCase(unittest.TestCase):

  def setUp(self):
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_taskqueue_stub()
    self.task_queue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)

  def tearDown(self):
    self.testbed.deactivate()

  def testTaskAdded(self):
    taskqueue.add(url='/path/to/task')

    tasks = self.taskqueue_stub.get_filtered_tasks(url='/path/to/task')
    self.assertEqual(1, len(tasks))
    self.assertEqual('/path/to/task', tasks[0].url)

unittest.main()
于 2013-07-11T17:58:47.277 回答