13

我想将django_rqandrq-scheduler用于离线任务,但我不确定在哪里调用rq-scheduler' 安排重复任务的能力。现在,我已将我的日程安排添加到tasks.py我的应用程序中的一个模块中,并将其导入到__init__.py 中。不过,必须有更好的方法来做到这一点,对吧?

提前致谢。

4

4 回答 4

14

我发现运行它的最佳位置是在您AppConfigapps.py.

def ready(self):
    scheduler = django_rq.get_scheduler('default')

    # Delete any existing jobs in the scheduler when the app starts up
    for job in scheduler.get_jobs():
        job.delete()

    # Have 'mytask' run every 5 minutes
    scheduler.schedule(datetime.utcnow(), 'mytask', interval=60*5)
于 2015-08-16T09:06:11.540 回答
11

我创建了一个自定义管理命令,它修改和rqscheduler替换django_rq. 此处提供了一个示例:https ://github.com/rq/rq-scheduler/issues/51#issuecomment-362352497

于 2018-02-01T18:09:58.527 回答
9

我已经__init__在我的一个项目应用程序中的一个模块中添加了调度(就 Django 而言),但使用了一个小功能,可以防止两次或更多排队作业。调度策略可能取决于您的特定需求(即您可能需要额外检查工作参数)。

适合我并满足我需求的代码:

import django_rq
from collections import defaultdict
import tasks

scheduler = django_rq.get_scheduler('default')

jobs = scheduler.get_jobs()
functions = defaultdict(lambda: list())

map(lambda x: functions[x.func].append(x.meta.get('interval')), jobs)

now = datetime.datetime.now()

def schedule_once(func, interval):
    """
    Schedule job once or reschedule when interval changes
    """
    if not func in functions or not interval in functions[func]\
            or len(functions[func])>1:

        # clear all scheduled jobs for this function
        map(scheduler.cancel, filter(lambda x: x.func==func, jobs))

        # schedule with new interval
        scheduler.schedule(now+datetime.timedelta(seconds=interval), func,
                interval=interval)

schedule_once(tasks.some_task_a, interval=60*5)
schedule_once(tasks.some_task_b, interval=120)

我还包装了这个片段以避免在包级别导入:

def init_scheduler():
    # paste here initialization code

init_scheduler()
于 2013-09-03T14:53:29.150 回答
0

您应该使用 django 命令来运行计划作业https://docs.djangoproject.com/en/3.2/howto/custom-management-commands/

像这样

在此处输入图像描述

class Command(BaseCommand):

  def handle(self, *args, **options):
    scheduler = django_rq.get_scheduler('crontab_job')
    for job in scheduler.get_jobs():
        scheduler.cancel(job)
    # 定时任务例子1
    scheduler.cron(
        "*/3 * * * *",  # 每周一零点零时零分执行 0 0 * * 0    测试可以使用 */3 * * * * 每3分钟执行一次
        func=gong_an_job,  # Function to be queued
        kwargs={'msg': '我是王龙飞1,我喜欢修仙', 'number': 1},  # Keyword arguments passed into function when executed
        repeat=None,  # Repeat this number of times (None means repeat forever)
        queue_name='crontab_job',  # In which queue the job should be put in
        use_local_timezone=False  # Interpret hours in the local timezone
    )
    # 定时任务例子2
    scheduler.cron(
        "*/5 * * * *",  # 每周一零点零时零分执行 0 0 * * 0    测试可以使用 */3 * * * * 每3分钟执行一次
        func=gong_an_job,  # Function to be queued
        kwargs={'msg': '我是王龙飞222222,我喜欢修仙', 'number': 22222},  # Keyword arguments passed into function when executed
        repeat=None,  # Repeat this number of times (None means repeat forever)
        queue_name='crontab_job',  # In which queue the job should be put in
        use_local_timezone=False  # Interpret hours in the local timezone
    )

#创建crontab作业

python manage.py rq_crontab_job

#检查 crontab 作业并将 crontab 作业放入队列

python manage.py rqscheduler --queue crontab_job

#运行crontab作业

python manage.py rqworker crontab_job

我认为第一个答案很好,但是在多进程环境中可能会有一些问题,你应该只运行一次来​​创建 crontab 作业!

于 2021-07-16T08:04:04.870 回答