这是基于@dalore's answer的完整工作示例。
首先tasks.py
。
import time
from celery import Celery, group
app = Celery('tasks', broker='pyamqp://guest@127.0.0.1//', backend='redis://localhost')
@app.task(trail=True)
def add(x, y):
time.sleep(1)
return x + y
@app.task(trail=True)
def group_add(l1, l2):
return group(add.s(x1, x2) for x1, x2 in zip(l1, l2))()
使用 Docker 启动 redis 服务器:docker run --name my-redis -p 6379:6379 -d redis
.
使用 Docker 启动 RabbitMQ docker run -d --hostname my-rabbit --name my-rabbit -p 5672:5672 rabbitmq:alpine
:.
在单独的 shell 中启动单个进程 celery worker celery -A tasks worker --loglevel=info -c 1
:.
然后运行下面的测试脚本。
from tasks import group_add
from tqdm import tqdm
total = 10
l1 = range(total)
l2 = range(total)
delayed_results = group_add.delay(l1, l2)
delayed_results.get() # Wait for parent task to be ready.
results = []
for result in tqdm(delayed_results.children[0], total=total):
results.append(result.get())
print(results)
您应该会看到类似下面的内容,其中进度条每秒增加 10%。
50%|##### | 5/10 [00:05<00:05, 1.01s/it
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
最后,清理你的 redis 和 rabbitmq 容器。
docker stop my-rabbit my-redis
docker rm my-rabbit my-redis