这是在为用户刷新访问令牌的上下文中。假设我们有refresh_user_token
一个将CustomUser
对象作为user
.
def refresh_user_token(user):
...
...
return result
我想为这个函数的每次执行指定一个特定CustomUser
的时间安排在 9 天内重复。
def refresh_user_token(user):
...
...
next_refresh = datetime.now() + timedelta(days=9)
schedule_refresh(user, scheduled_time=next_refresh)
return result
我看到的 Celery 的大多数用例都与执行批处理操作有关,但是对于这种用途,我需要能够使用 Celery 似乎不可行的参数来执行函数。
有人确实建议设置一个 cron 作业来检查任何需要刷新 x 秒的令牌。
所以在CustomUser
对象上,我们有一个DateTimeField
被调用的last_token_refresh
.
@Celery.task
def refresh_auth_tokens():
users = CustomUser.objects.all()
for user in users:
last_refresh_delta = datetime.now(timezone.utc) - user.last_token_refresh
if last_refresh_delta.days >= 9:
refresh_user_token(user)
return True
else:
return False
这可以工作,但是当消息代理可以用于只安排所需的任务时,我觉得它非常费力。