背景和问题
我正在使用 Django 1.5.1 和 django-celery 3.0.17。我想编写一个自定义装饰器,以确保一次只运行一个函数实例,与此类似,但没有所有重复的 try/finally。
我怎样才能编写一个装饰器,以便 Celery 方法喜欢apply
并且delay
可以用参数调用?
其他人创造了这样的装饰器并取得了明显的成功。我错过了什么?
实现
我已经尝试将装饰器编写为一个函数和一个类,但是对于任何一种实现,当尝试用我的装饰器和 来装饰一个函数时@celery.task
,参数都不会传递给被装饰的函数,从而导致错误消息:
TypeError: foo() takes exactly 1 argument (0 given)
,其中foo
是修饰函数的名称。
功能实现
# util.py
from functools import wraps
from django.core.cache import get_cache
cache = get_cache('filesystem')
def cache_lock(lock_id, timeout=cache.get('TIMEOUT', 720)):
def _decorator(func):
try:
timeout_secs = timeout.total_seconds()
except AttributeError:
# Timeout is None (forever) or number of seconds.
timeout_secs = timeout
acquire_lock = lambda: cache.add(lock_id, 'true', timeout_secs) if timeout_secs else cache.add(lock_id, 'true')
release_lock = lambda: cache.delete(lock_id)
@wraps(func)
def _apply_lock(*args, **kwargs):
if acquire_lock():
try:
return func(*args, **kwargs)
finally:
release_lock()
else:
return False
return _apply_lock
return _decorator
基于类的实现
# util.py
from functools import wraps
from django.core.cache import get_cache
cache = get_cache('filesystem')
class cache_lock(object):
def __init__(self, lock_id, timeout=cache.get('TIMEOUT', 720)):
self.lock_id = lock_id
self.timeout = timeout
def __call__(self, func):
try:
timeout_secs = self.timeout.total_seconds()
except AttributeError:
# Timeout is None (forever) or number of seconds.
timeout_secs = self.timeout
acquire_lock = lambda: cache.add(self.lock_id, 'true', timeout_secs) if timeout_secs else cache.add(self.lock_id, 'true')
release_lock = lambda: cache.delete(self.lock_id)
@wraps(func)
def _apply_lock(*args, **kwargs):
if acquire_lock():
try:
return func(*args, **kwargs)
finally:
release_lock()
else:
return False
return _apply_lock
测试用例
对于这两种实现,第一个测试方法成功,第二个失败。
# tests.py
from datetime import timedelta
from celery import task # using celery.task.task does not help
from django.test import TestCase
from django.test.utils import override_settings
from .util import cache_lock
class UtilTests(TestCase):
def test_cache_lock_without_celery(self):
@cache_lock('recursive', timedelta(seconds=1))
def call_count(i):
self.assertFalse(call_count(i + 1))
return i + 1
self.assertEqual(call_count(0), 1) # succeeds
celery_settings = {
'CELERY_ALWAYS_EAGER': True,
'CELERY_EAGER_PROPAGATES_EXCEPTIONS': True,
'DEBUG': True,
}
@override_settings(**celery_settings)
def test_cache_lock_with_celery(self):
@task(name='test_cache_lock_with_celery')
@cache_lock('recursive', timedelta(seconds=600))
def call_count(i):
self.assertFalse(call_count.apply(i + 1).result)
return i + 1
self.assertEqual(call_count.apply(0).result, 1) # fails!