I've implemented a decorator to handle this. It's based on "Ensuring a task is only executed one at a time" from the official Celery documentation.
It creates a lock_id from the function's name and its args and kwargs, and sets/gets it in Django's cache layer (I've only tested this with Memcached, but it should work for Redis as well). If the lock_id is already set in the cache, it puts the task back on the queue and exits.
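The key point is that `cache.add` only succeeds when the key is absent, which makes it an atomic check-and-set on Memcached. Here is a minimal sketch of that lock semantics, using a plain dict in place of Django's cache (the dict and `cache_add` helper are assumptions for illustration only):

```python
# Stand-in for the cache backend; the real code uses Django's cache,
# where cache.add is atomic on Memcached.
store = {}

def cache_add(key, value):
    """Mimic cache.add: set the key only if absent, return success."""
    if key in store:
        return False
    store[key] = value
    return True

# Same key format as the decorator: task name, args, kwargs.
lock_id = '{}-{}-{}'.format('tasks.sometask', (42,), {})

first = cache_add(lock_id, True)   # first worker acquires the lock
second = cache_add(lock_id, True)  # a concurrent worker is rejected
print(first, second)  # True False
```

The second caller sees the key already present and fails to acquire the lock, which is exactly the branch where the decorator requeues the task.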
import functools
from time import monotonic

from django.core.cache import cache

CACHE_LOCK_EXPIRE = 30

def no_simultaneous_execution(f):
    """
    Decorator that prevents a task from being executed with the
    same *args and **kwargs more than once at a time.
    """
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        # Create the lock_id used as the cache key
        lock_id = '{}-{}-{}'.format(self.name, args, kwargs)
        # Timeout with a small diff, so we'll leave the lock delete
        # to the cache if it's close to being auto-removed/expired
        timeout_at = monotonic() + CACHE_LOCK_EXPIRE - 3
        # Try to acquire a lock, or put the task back on the queue
        lock_acquired = cache.add(lock_id, True, CACHE_LOCK_EXPIRE)
        if not lock_acquired:
            self.apply_async(args=args, kwargs=kwargs, countdown=3)
            return
        try:
            f(self, *args, **kwargs)
        finally:
            # Release the lock, unless it's about to expire on its own
            if monotonic() < timeout_at:
                cache.delete(lock_id)
    return wrapper
Then you can apply it to any task as the first decorator:
@shared_task(bind=True, base=MyTask)
@no_simultaneous_execution
def sometask(self, some_arg):
    ...