我是 celery 新手,正在使用 Celery 运行异步任务。
- 我想将我的任务结果保存到 MongoDB。
- 我想使用 AMQP 代理。
芹菜项目示例对我没有多大帮助。谁能指出一些工作示例?
要使用 MongoDB 作为后端存储,您必须显式配置 Celery 以使用 MongoDB 作为后端。
http://docs.celeryproject.org/en/latest/getting-started/brokers/mongodb.html#broker-mongodb
正如您所说,文档没有显示完整的工作示例。我刚开始玩 Celery,但一直在使用 MongoDB。我使用 MongoDB 和 Celery 创建了一个简短的工作教程 http://skillachie.com/?p=953
然而,这些片段应该包含所有你需要让 Celery 和 MongoDB 打个招呼的世界
芹菜配置文件
from celery.schedules import crontab
CELERY_RESULT_BACKEND = "mongodb"
CELERY_MONGODB_BACKEND_SETTINGS = {
"host": "127.0.0.1",
"port": 27017,
"database": "jobs",
"taskmeta_collection": "stock_taskmeta_collection",
}
#used to schedule tasks periodically and passing optional arguments
#Can be very useful. Celery does not seem to support scheduled task but only periodic
CELERYBEAT_SCHEDULE = {
'every-minute': {
'task': 'tasks.add',
'schedule': crontab(minute='*/1'),
'args': (1,2),
},
}
任务.py
from celery import Celery
import time
#Specify mongodb host and datababse to connect to
BROKER_URL = 'mongodb://localhost:27017/jobs'
celery = Celery('EOD_TASKS',broker=BROKER_URL)
#Loads settings for Backend to store results of jobs
celery.config_from_object('celeryconfig')
@celery.task
def add(x, y):
time.sleep(30)
return x + y
我一直在测试 RabbitMQ 作为代理,MongoDB 作为后端,MongoDB 作为代理和后端。这些是我的发现。我希望他们能帮助那里的人。
假设:您在默认设置下运行 MongoDB(本地主机:21017)使用 conda 设置环境(您可以使用任何包管理器)
conda update -n base conda -c anaconda
conda create -n apps python=3.6 pymongo
conda install -n apps -c conda-forge celery
conda activate apps
更新我的 conda,创建一个名为 apps 的环境并安装 pymongo 和 celery。
RabbitMQ 作为代理,MongoDB 作为后端
sudo apt install rabbitmq-server
sudo service rabbitmq-server restart
sudo rabbitmqctl status
如果没有错误,则 rabbitmq 正在运行。让我们在 executor.py 中创建任务并在 runner.py 中调用它们
# executor.py
import time
from celery import Celery
BROKER_URL = 'amqp://localhost//'
BACKEND_URL = 'mongodb://localhost:27017/from_celery'
app = Celery('executor', broker=BROKER_URL, backend=BACKEND_URL)
@app.task
def pizza_bot(string:str, snooze=10):
'''return a dictionary with bot and
lower case string input
'''
print(f'Pretending to be working {snooze} seconds')
time.sleep(snooze)
return {'bot':string.lower()}
我们在 runner.py 中调用它们
# runner.py
import time
from datetime import datetime
from executor import pizza_bot
def run_pizza(msg:str, use_celery:bool=True):
start_time = datetime.now()
if use_celery: # Using celery
response = pizza_bot.delay(msg)
else: # Not using celery
response = pizza_bot(msg)
print(f'It took {datetime.now()-start_time}!'
' to run')
print(f'response: {response}')
return response
if __name__ == '__main__':
# Call using celery
response = run_pizza('This finishes extra fast')
while not response.ready():
print(f'[Waiting] It is {response.ready()} that we have results')
time.sleep(2) # sleep to second
print('\n We got results:')
print(response.result)
在终端 A 上运行 celery:
cd path_to_our_python_files
celery -A executor.app worker --loglevel=info
这仅在开发中完成。我想看看在后台发生了什么。在生产中,在 daemonize 中运行它。
在终端 B 上运行 runner.py:
cd path_to_our_python_files
conda activate apps
python runner.py
在终端 A 中,您将看到任务已收到,并在贪睡的几秒钟内完成。在您的 MongoDB 上,您将看到一个名为 from_celery 的新集合,其中包含消息和结果。
MongoDB 作为代理和后端
需要一个简单的修改来设置它。如前所述,我必须创建一个配置文件来设置 MongoDB 后端设置。
#mongo_config.py
#Backend Settings
CELERY_RESULT_BACKEND = "mongodb"
CELERY_MONGODB_BACKEND_SETTINGS = {
"host": "localhost",
"port": 27017,
"database": "celery",
"taskmeta_collection": "pizza_collection",
}
让我们创建 executor_updated.py,它与 executor.py 几乎相同,但代理现在是 MongoDB,后端是通过config_from_object添加的
# executor_updated.py
import time
from celery import Celery
BROKER_URL = 'mongodb://localhost:27017/celery'
app = Celery('executor_updated',broker=BROKER_URL)
#Load Backend Settings
app.config_from_object('mongo_config')
@app.task
def pizza_bot(string:str, snooze=10):
'''return a dictionary with bot and
lower case string input
'''
print(f'Pretending to be working {snooze} seconds')
time.sleep(snooze)
return {'bot':string.lower()}
在终端 C 上运行 celery:
cd path_to_our_python_files
celery -A executor_updated.app worker --loglevel=info
在终端 D 上运行 runner.py:
cd path_to_our_python_files
conda activate apps
python runner.py
现在我们将 MongoDB 作为代理和后端。在 MongoDB 中,您将看到一个名为celery的集合和一个表Pizza_collection
希望这有助于您开始使用这些很棒的工具。