2

最近我一直在使用 python 3.x 在 Ubuntu 中使用 celery 和 Flower(用于单台机器上的仪表板和任务可视化)。首先我安装了rabbitmq-server、radis、celery 和flower。然后我创建了一个名为的脚本,tasks.py其中包含以下内容:

from celery import Celery

# py-advanced-message-queuing-protocol
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://localhost//')

@app.task
def intensive_sum1(num):
    val = sum(x**4 for x in range(num))
    return val


@app.task
def intensive_sum2(num):
    val = sum(x**4 for x in range(num))
    return val

@app.task
def intensive_sum3(num):
    val = sum(x**4 for x in range(num))
    return val

然后我创建了一个run.py包含

from tasks import intensive_sum1, intensive_sum2, intensive_sum3
import time

start = time.time()
result1 = intensive_sum1.delay(100000000)
result2 = intensive_sum2.delay(100000000)
result3 = intensive_sum3.delay(100000000)
print(result1.get(), result2.get(), result3.get())
end = time.time()
print('time: ', end - start)

start = time.time()
result1 = sum(x**4 for x in range(100000000))
result2 = sum(x**4 for x in range(100000000))
result3 = sum(x**4 for x in range(100000000))
print(result1, result2, result3)
end = time.time()
print('time: ', end - start)

在运行后者之前,我启动了两个不同的终端并将目录更改为两个脚本的位置。sudo celery -A tasks flower然后我在一个终端和另一个终端跑celery -A tasks worker --loglevel=info。事实证明(意外惊喜) celery 可以将每个任务分配到一个单独的核心,从而节省大量时间。当然,这种节省时间只适用于大型函数,因为较小的函数会产生线程生成开销,这不会带来任何好处。

这让我想到了另一个问题。假设我有 3 台机器连接到同一个 WIFI 路由器,而不是一台机器。我可以使用ifconfig命令计算出每台 Ubuntu 机器的 IP 地址。让我们说其中一台机器是一台主机,其中包含一个main.py使用 Opencv-Python 捕获对象捕获实时图像的脚本。然后它获取每个图像,将其序列化并将其作为消息发送到两台工作机器。两台工作机器独立工作,并且都对相同的图像进行反序列化。一台工作机器进行猫分类并返回猫的概率,另一台机器进行狗分类并返回狗的概率。一台工作机器可能需要比另一台更长的时间才能得出结论。但是,对于该特定帧,主机需要等待两个分类结果,然后再将一些结果叠加在该特定帧的顶部。本能地,我被引导相信主机在继续之前需要检查两个工作是否都准备好了(e.g. result_worker_one.ready() == result_worker_two.ready() == True)。我怎样才能实现这种行为?如何在主机中序列化一个 RGB 图像并在工作机器中反序列化?backend每台broker机器需要什么?如何将其设置为客户端服务器架构?

4

1 回答 1

2

您对在多台机器上分配作业是正确的。事实上,它是芹菜的主要用途之一。

  1. 要检查两个异步作业是否已完成,您可以使用 celery 中的 Groups 和 Chords 选项。假设您的两个 celery 任务如下:

    @app.task
    def check_dog():
        #dog_classification code
    
    @app.task
    def check_cat():
        #cat classification code
    

    您可以将这些任务组合在一起,然后使用一个和弦(一个和弦是一个只有在一个组中的所有任务都执行完后才执行的任务)在这两个函数都执行后进入下一步。在下面显示的回调函数中包含两个任务之后所需的任何内容。相关文档可以在这里找到:http: //docs.celeryproject.org/en/master/userguide/canvas.html#groups

    chord([check_dog(),check_cat()])(callback)
    
  2. 看看这个图像序列化部分:将图像传递给 celery 任务

  3. 为了回答问题的第 3 部分,Celery 固有地遵循客户端服务器架构来支持并行计算。每当您调用 celery 任务时,它都会在您设置的消息代理上放置一条消息(在您的情况下,您使用的是 rabbitMQ)。此消息将包含有关要运行的任务以及所有必需参数的信息。消息队列将跨不同机器将消息传递给 Celery 工作人员。一旦一个worker得到消息,worker就会执行消息描述的任务。因此,如果你想在多台计算机之间分配你的任务,你所要做的就是在每台机器上启动一个 celery worker,它在你的主机上监听你的消息队列。您可以按如下方式配置工作人员

    app = Celery('tasks', backend='redis://localhost', broker='pyamqp://<username>:<password>@<ip of task queue host>')
    

    确保为每个 celery worker 提供一个任务文件,因为传递给 worker 的消息不包含源代码,而只包含任务名称本身。

于 2017-12-21T19:36:01.740 回答