-1

这是在为用户刷新访问令牌的上下文中。假设我们有refresh_user_token一个将CustomUser对象作为user.

def refresh_user_token(user):
    ...
    ...
    return result

我想为这个函数的每次执行指定一个特定CustomUser的时间安排在 9 天内重复。

def refresh_user_token(user):
    ...
    ...
    next_refresh = datetime.now() + timedelta(days=9)
    schedule_refresh(user, scheduled_time=next_refresh)
    return result

我看到的 Celery 的大多数用例都与执行批处理操作有关,但是对于这种用途,我需要能够使用 Celery 似乎不可行的参数来执行函数。


有人确实建议设置一个 cron 作业来检查任何需要刷新 x 秒的令牌。

所以在CustomUser对象上,我们有一个DateTimeField被调用的last_token_refresh.

@Celery.task
def refresh_auth_tokens():
    users = CustomUser.objects.all()
    for user in users:
        last_refresh_delta = datetime.now(timezone.utc) - user.last_token_refresh
        if last_refresh_delta.days >= 9:
            refresh_user_token(user)
            return True
        else:
            return False

这可以工作,但是当消息代理可以用于只安排所需的任务时,我觉得它非常费力。

4

1 回答 1

-1

您可以使用Celery Beat使用 crontabs 来安排 celery 任务。只需创建您的常规 Celery 任务并使用 Beat 来说明何时运行。

这是我在一个项目中使用的 Celery Beat 设置示例:

CELERY_BEAT_SCHEDULE = {
    'populate_controller': {
        'task': 'common.tasks.populate_controller',
        # Will be executed Mondays, at 08:30 
        'schedule': crontab(day_of_week=2, 
                            hour=8, 
                            minute=30),
    'options': {'queue': 'populate_controller'}
    },
 }
于 2019-03-29T17:25:10.563 回答