最近我一直在使用 python 3.x 在 Ubuntu 中使用 celery 和 Flower(用于单台机器上的仪表板和任务可视化)。首先我安装了rabbitmq-server、radis、celery 和flower。然后我创建了一个名为的脚本,tasks.py
其中包含以下内容:
from celery import Celery
# py-advanced-message-queuing-protocol
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://localhost//')
@app.task
def intensive_sum1(num):
val = sum(x**4 for x in range(num))
return val
@app.task
def intensive_sum2(num):
val = sum(x**4 for x in range(num))
return val
@app.task
def intensive_sum3(num):
val = sum(x**4 for x in range(num))
return val
然后我创建了一个run.py
包含
from tasks import intensive_sum1, intensive_sum2, intensive_sum3
import time
start = time.time()
result1 = intensive_sum1.delay(100000000)
result2 = intensive_sum2.delay(100000000)
result3 = intensive_sum3.delay(100000000)
print(result1.get(), result2.get(), result3.get())
end = time.time()
print('time: ', end - start)
start = time.time()
result1 = sum(x**4 for x in range(100000000))
result2 = sum(x**4 for x in range(100000000))
result3 = sum(x**4 for x in range(100000000))
print(result1, result2, result3)
end = time.time()
print('time: ', end - start)
在运行后者之前,我启动了两个不同的终端并将目录更改为两个脚本的位置。sudo celery -A tasks flower
然后我在一个终端和另一个终端跑celery -A tasks worker --loglevel=info
。事实证明(意外惊喜) celery 可以将每个任务分配到一个单独的核心,从而节省大量时间。当然,这种节省时间只适用于大型函数,因为较小的函数会产生线程生成开销,这不会带来任何好处。
这让我想到了另一个问题。假设我有 3 台机器连接到同一个 WIFI 路由器,而不是一台机器。我可以使用ifconfig
命令计算出每台 Ubuntu 机器的 IP 地址。让我们说其中一台机器是一台主机,其中包含一个main.py
使用 Opencv-Python 捕获对象捕获实时图像的脚本。然后它获取每个图像,将其序列化并将其作为消息发送到两台工作机器。两台工作机器独立工作,并且都对相同的图像进行反序列化。一台工作机器进行猫分类并返回猫的概率,另一台机器进行狗分类并返回狗的概率。一台工作机器可能需要比另一台更长的时间才能得出结论。但是,对于该特定帧,主机需要等待两个分类结果,然后再将一些结果叠加在该特定帧的顶部。本能地,我被引导相信主机在继续之前需要检查两个工作是否都准备好了(e.g. result_worker_one.ready() == result_worker_two.ready() == True
)。我怎样才能实现这种行为?如何在主机中序列化一个 RGB 图像并在工作机器中反序列化?backend
每台broker
机器需要什么?如何将其设置为客户端服务器架构?