@asksol 的答案是在 Django 应用程序中需要什么。
对于非 django 应用程序,您可以使用类似于Django 的 django-celery-beatcelery-sqlalchemy-scheduler
建模,因为它也使用 database 而不是 file 。celerybeat-schedule
这是一个运行时添加新任务的示例。
任务.py
from celery import Celery
celery = Celery('tasks')
beat_dburi = 'sqlite:///schedule.db'
celery.conf.update(
{'beat_dburi': beat_dburi}
)
@celery.task
def my_task(arg1, arg2, be_careful):
print(f"{arg1} {arg2} be_careful {be_careful}")
日志(生产者)
$ celery --app=tasks beat --scheduler=celery_sqlalchemy_scheduler.schedulers:DatabaseScheduler --loglevel=INFO
celery beat v5.1.2 (sun-harmonics) is starting.
[2021-08-20 15:20:20,927: INFO/MainProcess] beat: Starting...
日志(消费者)
$ celery --app=tasks worker --queues=celery --loglevel=INFO
-------------- celery@ubuntu20 v5.1.2 (sun-harmonics)
[2021-08-20 15:20:02,287: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
数据库计划
$ sqlite3 schedule.db
sqlite> .databases
main: /home/nponcian/Documents/Program/1/db/schedule.db
sqlite> .tables
celery_crontab_schedule celery_periodic_task_changed
celery_interval_schedule celery_solar_schedule
celery_periodic_task
sqlite> select * from celery_periodic_task;
1|celery.backend_cleanup|celery.backend_cleanup||1||[]|{}|||||2021-08-20 19:20:20.955246|0||1||0|2021-08-20 07:20:20|
现在,当这些工作人员已经在运行时,让我们通过添加一个新的计划任务来更新计划。请注意,这是在运行时,无需重新启动工作人员。
$ python3
>>> # Setup the session.
>>> from celery_sqlalchemy_scheduler.models import PeriodicTask, IntervalSchedule
>>> from celery_sqlalchemy_scheduler.session import SessionManager
>>> from tasks import beat_dburi
>>> session_manager = SessionManager()
>>> engine, Session = session_manager.create_session(beat_dburi)
>>> session = Session()
>>>
>>> # Setup the schedule (executes every 10 seconds).
>>> schedule = session.query(IntervalSchedule).filter_by(every=10, period=IntervalSchedule.SECONDS).first()
>>> if not schedule:
... schedule = IntervalSchedule(every=10, period=IntervalSchedule.SECONDS)
... session.add(schedule)
... session.commit()
...
>>>
>>> # Create the periodic task
>>> import json
>>> periodic_task = PeriodicTask(
... interval=schedule, # we created this above.
... name='My task', # simply describes this periodic task.
... task='tasks.my_task', # name of task.
... args=json.dumps(['arg1', 'arg2']),
... kwargs=json.dumps({
... 'be_careful': True,
... }),
... )
>>> session.add(periodic_task)
>>> session.commit()
数据库计划(更新)
- 我们现在可以看到,新添加的时间表已反映到由 celery beat 调度程序持续读取的数据库中。因此,如果 args 或 kwargs 的值有任何更新,我们可以轻松地对数据库执行 SQL 更新,并且它应该与正在运行的工作人员实时反映(无需重新启动)。
sqlite> select * from celery_periodic_task;
1|celery.backend_cleanup|celery.backend_cleanup||1||[]|{}|||||2021-08-20 19:20:20.955246|0||1||0|2021-08-20 07:20:20|
2|My task|tasks.my_task|1|||["arg1", "arg2"]|{"be_careful": true}||||||0||1||0|2021-08-20 07:26:49|
日志(生产者)
[2021-08-20 15:26:51,768: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2021-08-20 15:26:51,768: INFO/MainProcess] Writing entries...
[2021-08-20 15:27:01,789: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
[2021-08-20 15:27:11,776: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
[2021-08-20 15:27:21,791: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
日志(消费者)
[2021-08-20 15:27:01,797: INFO/MainProcess] Task tasks.my_task[04dcb40c-0a77-437b-a129-57eb52850a51] received
[2021-08-20 15:27:01,798: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:01,799: WARNING/ForkPoolWorker-4]
[2021-08-20 15:27:01,799: INFO/ForkPoolWorker-4] Task tasks.my_task[04dcb40c-0a77-437b-a129-57eb52850a51] succeeded in 0.000763321000704309s: None
[2021-08-20 15:27:11,783: INFO/MainProcess] Task tasks.my_task[e8370a6b-085f-4bd5-b7ad-8f85f4b61908] received
[2021-08-20 15:27:11,786: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:11,786: WARNING/ForkPoolWorker-4]
[2021-08-20 15:27:11,787: INFO/ForkPoolWorker-4] Task tasks.my_task[e8370a6b-085f-4bd5-b7ad-8f85f4b61908] succeeded in 0.0006725780003762338s: None
[2021-08-20 15:27:21,797: INFO/MainProcess] Task tasks.my_task[c14d875d-7f6c-45c2-a76b-4e9483273185] received
[2021-08-20 15:27:21,799: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:21,799: WARNING/ForkPoolWorker-4]
[2021-08-20 15:27:21,800: INFO/ForkPoolWorker-4] Task tasks.my_task[c14d875d-7f6c-45c2-a76b-4e9483273185] succeeded in 0.0006371149993356084s: None