6

我是 celery 新手,正在使用 Celery 运行异步任务。

  1. 我想将我的任务结果保存到 MongoDB。
  2. 我想使用 AMQP 代理。

芹菜项目示例对我没有多大帮助。谁能指出一些工作示例?

4

2 回答 2

6

要使用 MongoDB 作为后端存储,您必须显式配置 Celery 以使用 MongoDB 作为后端。

http://docs.celeryproject.org/en/latest/getting-started/brokers/mongodb.html#broker-mongodb

正如您所说,文档没有显示完整的工作示例。我刚开始玩 Celery,但一直在使用 MongoDB。我使用 MongoDB 和 Celery 创建了一个简短的工作教程 http://skillachie.com/?p=953

然而,这些片段应该包含所有你需要让 Celery 和 MongoDB 打个招呼的世界

芹菜配置文件

 from celery.schedules import crontab

CELERY_RESULT_BACKEND = "mongodb"
CELERY_MONGODB_BACKEND_SETTINGS = {
    "host": "127.0.0.1",
    "port": 27017,
    "database": "jobs", 
    "taskmeta_collection": "stock_taskmeta_collection",
}

#used to schedule tasks periodically and passing optional arguments 
#Can be very useful. Celery does not seem to support scheduled task but only periodic
CELERYBEAT_SCHEDULE = {
    'every-minute': {
        'task': 'tasks.add',
        'schedule': crontab(minute='*/1'),
        'args': (1,2),
    },
}

任务.py

from celery import Celery
import time 

#Specify mongodb host and datababse to connect to
BROKER_URL = 'mongodb://localhost:27017/jobs'

celery = Celery('EOD_TASKS',broker=BROKER_URL)

#Loads settings for Backend to store results of jobs 
celery.config_from_object('celeryconfig')

@celery.task
def add(x, y):
    time.sleep(30)
    return x + y
于 2013-06-15T19:23:53.340 回答
4

我一直在测试 RabbitMQ 作为代理,MongoDB 作为后端,MongoDB 作为代理和后端。这些是我的发现。我希望他们能帮助那里的人。

假设:您在默认设置下运行 MongoDB(本地主机:21017)使用 conda 设置环境(您可以使用任何包管理器)

conda update -n base conda -c anaconda
conda create -n apps python=3.6 pymongo
conda install -n apps -c conda-forge celery
conda activate apps

更新我的 conda,创建一个名为 apps 的环境并安装 pymongo 和 celery。

RabbitMQ 作为代理,MongoDB 作为后端

sudo apt install rabbitmq-server
sudo service rabbitmq-server restart 
sudo rabbitmqctl status

如果没有错误,则 rabbitmq 正在运行。让我们在 executor.py 中创建任务并在 runner.py 中调用它们

# executor.py
import time
from celery import Celery

BROKER_URL = 'amqp://localhost//'
BACKEND_URL = 'mongodb://localhost:27017/from_celery'
app = Celery('executor', broker=BROKER_URL, backend=BACKEND_URL)

@app.task
def pizza_bot(string:str, snooze=10):
    '''return a dictionary with bot and
    lower case string input
    '''
    print(f'Pretending to be working {snooze} seconds')
    time.sleep(snooze)
    return {'bot':string.lower()}

我们在 runner.py 中调用它们

# runner.py
import time
from datetime import datetime

from executor import pizza_bot


def run_pizza(msg:str, use_celery:bool=True):
      
      
      start_time = datetime.now()
      if use_celery: # Using celery
            response = pizza_bot.delay(msg)
      else: # Not using celery
            response = pizza_bot(msg)

      print(f'It took {datetime.now()-start_time}!'
            ' to run')
      print(f'response: {response}')

      return response

if __name__ == '__main__':
   
      # Call using celery
      response = run_pizza('This finishes extra fast')
      while not response.ready():
            print(f'[Waiting] It is {response.ready()} that we have results')
            time.sleep(2) # sleep to second
      print('\n We got results:')
      print(response.result)

在终端 A 上运行 celery:

cd path_to_our_python_files
celery -A executor.app worker --loglevel=info

这仅在开发中完成。我想看看在后台发生了什么。在生产中,在 daemonize 中运行它。

在终端 B 上运行 runner.py:

cd path_to_our_python_files
conda activate apps
python runner.py

在终端 A 中,您将看到任务已收到,并在贪睡的几秒钟内完成。在您的 MongoDB 上,您将看到一个名为 from_celery 的新集合,其中包含消息和结果。

MongoDB 作为代理和后端

需要一个简单的修改来设置它。如前所述,我必须创建一个配置文件来设置 MongoDB 后端设置。

#mongo_config.py
#Backend Settings
CELERY_RESULT_BACKEND = "mongodb"
CELERY_MONGODB_BACKEND_SETTINGS = {
    "host": "localhost",
    "port": 27017,
    "database": "celery", 
    "taskmeta_collection": "pizza_collection",
}

让我们创建 executor_updated.py,它与 executor.py 几乎相同,但代理现在是 MongoDB,后端是通过config_from_object添加的

# executor_updated.py
import time
from celery import Celery

BROKER_URL = 'mongodb://localhost:27017/celery'
app = Celery('executor_updated',broker=BROKER_URL)

#Load Backend Settings
app.config_from_object('mongo_config')

@app.task
def pizza_bot(string:str, snooze=10):
    '''return a dictionary with bot and
    lower case string input
    '''
    print(f'Pretending to be working {snooze} seconds')
    time.sleep(snooze)
    return {'bot':string.lower()}

在终端 C 上运行 celery:

cd path_to_our_python_files
celery -A executor_updated.app worker --loglevel=info

在终端 D 上运行 runner.py:

cd path_to_our_python_files
conda activate apps
python runner.py

现在我们将 MongoDB 作为代理和后端。在 MongoDB 中,您将看到一个名为celery的集合和一个表Pizza_collection

希望这有助于您开始使用这些很棒的工具。

于 2019-04-17T06:50:44.037 回答