我想将django_rq
andrq-scheduler
用于离线任务,但我不确定在哪里调用rq-scheduler
' 安排重复任务的能力。现在,我已将我的日程安排添加到tasks.py
我的应用程序中的一个模块中,并将其导入到__init__
.py 中。不过,必须有更好的方法来做到这一点,对吧?
提前致谢。
我想将django_rq
andrq-scheduler
用于离线任务,但我不确定在哪里调用rq-scheduler
' 安排重复任务的能力。现在,我已将我的日程安排添加到tasks.py
我的应用程序中的一个模块中,并将其导入到__init__
.py 中。不过,必须有更好的方法来做到这一点,对吧?
提前致谢。
我发现运行它的最佳位置是在您AppConfig
的apps.py
.
def ready(self):
scheduler = django_rq.get_scheduler('default')
# Delete any existing jobs in the scheduler when the app starts up
for job in scheduler.get_jobs():
job.delete()
# Have 'mytask' run every 5 minutes
scheduler.schedule(datetime.utcnow(), 'mytask', interval=60*5)
我创建了一个自定义管理命令,它修改和rqscheduler
替换django_rq
. 此处提供了一个示例:https ://github.com/rq/rq-scheduler/issues/51#issuecomment-362352497
我已经__init__
在我的一个项目应用程序中的一个模块中添加了调度(就 Django 而言),但使用了一个小功能,可以防止两次或更多排队作业。调度策略可能取决于您的特定需求(即您可能需要额外检查工作参数)。
适合我并满足我需求的代码:
import django_rq
from collections import defaultdict
import tasks
scheduler = django_rq.get_scheduler('default')
jobs = scheduler.get_jobs()
functions = defaultdict(lambda: list())
map(lambda x: functions[x.func].append(x.meta.get('interval')), jobs)
now = datetime.datetime.now()
def schedule_once(func, interval):
"""
Schedule job once or reschedule when interval changes
"""
if not func in functions or not interval in functions[func]\
or len(functions[func])>1:
# clear all scheduled jobs for this function
map(scheduler.cancel, filter(lambda x: x.func==func, jobs))
# schedule with new interval
scheduler.schedule(now+datetime.timedelta(seconds=interval), func,
interval=interval)
schedule_once(tasks.some_task_a, interval=60*5)
schedule_once(tasks.some_task_b, interval=120)
我还包装了这个片段以避免在包级别导入:
def init_scheduler():
# paste here initialization code
init_scheduler()
您应该使用 django 命令来运行计划作业https://docs.djangoproject.com/en/3.2/howto/custom-management-commands/
像这样
class Command(BaseCommand):
def handle(self, *args, **options):
scheduler = django_rq.get_scheduler('crontab_job')
for job in scheduler.get_jobs():
scheduler.cancel(job)
# 定时任务例子1
scheduler.cron(
"*/3 * * * *", # 每周一零点零时零分执行 0 0 * * 0 测试可以使用 */3 * * * * 每3分钟执行一次
func=gong_an_job, # Function to be queued
kwargs={'msg': '我是王龙飞1,我喜欢修仙', 'number': 1}, # Keyword arguments passed into function when executed
repeat=None, # Repeat this number of times (None means repeat forever)
queue_name='crontab_job', # In which queue the job should be put in
use_local_timezone=False # Interpret hours in the local timezone
)
# 定时任务例子2
scheduler.cron(
"*/5 * * * *", # 每周一零点零时零分执行 0 0 * * 0 测试可以使用 */3 * * * * 每3分钟执行一次
func=gong_an_job, # Function to be queued
kwargs={'msg': '我是王龙飞222222,我喜欢修仙', 'number': 22222}, # Keyword arguments passed into function when executed
repeat=None, # Repeat this number of times (None means repeat forever)
queue_name='crontab_job', # In which queue the job should be put in
use_local_timezone=False # Interpret hours in the local timezone
)
#创建crontab作业
python manage.py rq_crontab_job
#检查 crontab 作业并将 crontab 作业放入队列
python manage.py rqscheduler --queue crontab_job
#运行crontab作业
python manage.py rqworker crontab_job
我认为第一个答案很好,但是在多进程环境中可能会有一些问题,你应该只运行一次来创建 crontab 作业!