我将开始在我的项目中使用 django-rq。
Django 与 RQ 集成,这是一个基于 Redis 的 Python 队列库。
测试使用 RQ 的 django 应用程序的最佳实践是什么?
例如,如果我想将我的应用程序作为黑盒进行测试,在用户执行一些操作后,我想执行当前队列中的所有作业,然后检查我的数据库中的所有结果。我怎样才能在我的 django-tests 中做到这一点?
我刚刚发现django-rq
,它允许您在测试环境中启动一个工作人员,该工作人员执行队列中的任何任务然后退出。
from django.test impor TestCase
from django_rq import get_worker
class MyTest(TestCase):
def test_something_that_creates_jobs(self):
... # Stuff that init jobs.
get_worker().work(burst=True) # Processes all jobs then stop.
... # Asserts that the job stuff is done.
我把我的rq
测试分成几部分。
rq
的测试套件应该涵盖这一点)。正在测试的代码:
def handle(self, *args, **options):
uid = options.get('user_id')
# @@@ Need to exclude out users who have gotten an email within $window
# days.
if uid is None:
uids = User.objects.filter(is_active=True, userprofile__waitlisted=False).values_list('id', flat=True)
else:
uids = [uid]
q = rq.Queue(connection=redis.Redis())
for user_id in uids:
q.enqueue(mail_user, user_id)
我的测试:
class DjangoMailUsersTest(DjangoTestCase):
def setUp(self):
self.cmd = MailUserCommand()
@patch('redis.Redis')
@patch('rq.Queue')
def test_no_userid_queues_all_userids(self, queue, _):
u1 = UserF.create(userprofile__waitlisted=False)
u2 = UserF.create(userprofile__waitlisted=False)
self.cmd.handle()
self.assertItemsEqual(queue.return_value.enqueue.mock_calls,
[call(ANY, u1.pk), call(ANY, u2.pk)])
@patch('redis.Redis')
@patch('rq.Queue')
def test_waitlisted_people_excluded(self, queue, _):
u1 = UserF.create(userprofile__waitlisted=False)
UserF.create(userprofile__waitlisted=True)
self.cmd.handle()
self.assertItemsEqual(queue.return_value.enqueue.mock_calls, [call(ANY, u1.pk)])
我提交了一个补丁,可以让你这样做:
from django.test impor TestCase
from django_rq import get_queue
class MyTest(TestCase):
def test_something_that_creates_jobs(self):
queue = get_queue(async=False)
queue.enqueue(func) # func will be executed right away
# Test for job completion
这应该使测试 RQ 作业更容易。希望有帮助!
我为这个案例所做的是检测我是否正在测试,并在测试期间使用 fakeredis。最后,在测试本身中,我将 redis worker 任务排入同步模式:
首先,定义一个检测您是否正在测试的函数:
TESTING = len(sys.argv) > 1 and sys.argv[1] == 'test'
def am_testing():
return TESTING
然后在使用redis排队任务的文件中,以这种方式管理队列。如果需要,您可以扩展 get_queue 以指定队列名称:
if am_testing():
from fakeredis import FakeStrictRedis
from rq import Queue
def get_queue():
return Queue(connection=FakeStrictRedis())
else:
import django_rq
def get_queue():
return django_rq.get_queue()
然后,像这样将您的任务排入队列:
queue = get_queue()
queue.enqueue(task_mytask, arg1, arg2)
最后,在您的测试程序中,以同步模式运行您正在测试的任务,以便它与您的测试在同一进程中运行。作为实践,我首先清除 fakeredis 队列,但我认为没有必要,因为没有工人:
from rq import Queue
from fakeredis import FakeStrictRedis
FakeStrictRedis().flushall()
queue = Queue(async=False, connection=FakeStrictRedis())
queue.enqueue(task_mytask, arg1, arg2)
我的 settings.py 有正常的 django_redis 设置,所以 django_rq.getqueue() 在部署时使用这些:
RQ_QUEUES = {
'default': {
'HOST': env_var('REDIS_HOST'),
'PORT': 6379,
'DB': 0,
# 'PASSWORD': 'some-password',
'DEFAULT_TIMEOUT': 360,
},
'high': {
'HOST': env_var('REDIS_HOST'),
'PORT': 6379,
'DB': 0,
'DEFAULT_TIMEOUT': 500,
},
'low': {
'HOST': env_var('REDIS_HOST'),
'PORT': 6379,
'DB': 0,
}
}
当队列中仍有作业时,您需要暂停测试。为此,您可以检查Queue.is_empty()
,如果队列中仍有作业,则暂停执行:
import time
from django.utils.unittest import TestCase
import django_rq
class TestQueue(TestCase):
def test_something(self):
# simulate some User actions which will queue up some tasks
# Wait for the queued tasks to run
queue = django_rq.get_queue('default')
while not queue.is_empty():
time.sleep(5) # adjust this depending on how long your tasks take to execute
# queued tasks are done, check state of the DB
self.assert(.....)
以防万一这对任何人都有帮助。我使用带有自定义模拟对象的补丁来执行将立即运行的入队
#patch django_rq.get_queue
with patch('django_rq.get_queue', return_value=MockBulkJobGetQueue()) as mock_django_rq_get_queue:
#Perform web operation that starts job. In my case a post to a url
然后模拟对象只有一种方法:
class MockBulkJobGetQueue(object):
def enqueue(self, f, *args, **kwargs):
# Call the function
f(
**kwargs.pop('kwargs', None)
)
我遇到了同样的问题。此外,我在我的作业中执行了一些邮件功能,然后想检查 Django 测试邮箱是否有任何电子邮件。但是,由于使用 Django RQ,作业不会在与 Django 测试相同的上下文中执行,因此发送的电子邮件不会最终进入测试邮箱。
因此我需要在相同的上下文中执行作业。这可以通过以下方式实现:
from django_rq import get_queue
queue = get_queue('default')
queue.enqueue(some_job_callable)
# execute input watcher
jobs = queue.get_jobs()
# execute in the same context as test
while jobs:
for job in jobs:
queue.remove(job)
job.perform()
jobs = queue.get_jobs()
# check no jobs left in queue
assert not jobs
在这里,您只需从队列中获取所有作业并直接在测试中执行它们。可以在 TestCase 类中很好地实现这一点并重用此功能。