7

我正在开发一个小型但计算密集型的 Python 应用程序。计算密集型工作可以分成几个可以同时执行的部分。我正在尝试确定一个合适的堆栈来实现这一点。

目前我计划在 Apache2+WSGI 上使用 Flask 应用程序和 Celery 作为任务队列。

在下面,如果有 3 个或更多工作人员可用,will 和 execute 会同时执行a_long_process()another_long_process()yet_another_long_process()进程执行时 Flask 应用程序会被阻塞吗?

从 Flask 应用程序:

@myapp.route('/foo')
def bar():
    task_1 = a_long_process.delay(x, y)
    task_1_result = task_1.get(timeout=1)
    task_2 = another_long_process.delay(x, y)
    task_2_result = task_2.get(timeout=1)
    task_3 = yet_another_long_process.delay(x, y)
    task_3_result = task_3.get(timeout=1)
    return task_1 + task_2 + task_3

任务.py:

from celery import Celery
celery = Celery('tasks', broker="amqp://guest@localhost//", backend="amqp://")
@celery.task
def a_long_process(x, y):
    return something
@celery.task
def another_long_process(x, y):
    return something_else
@celery.task
def yet_another_long_process(x, y):
    return a_third_thing
4

3 回答 3

7

您应该更改代码,以便工作人员可以并行工作:

@myapp.route('/foo')
def bar():
    # start tasks
    task_1 = a_long_process.delay(x, y)
    task_2 = another_long_process.delay(x, y)
    task_3 = yet_another_long_process.delay(x, y)
    # fetch results
    try:
        task_1_result = task_1.get(timeout=1)
        task_2_result = task_2.get(timeout=1)
        task_3_result = task_3.get(timeout=1)
    except TimeoutError:
        # Handle this or don't specify a timeout.
        raise
    # combine results
    return task_1 + task_2 + task_3

此代码将阻塞,直到所有结果都可用(或达到超时)。

进程执行时 Flask 应用程序会被阻塞吗?

此代码只会阻止您的 WSGI 容器的一个工作人员。整个站点是否无响应取决于您使用的 WSGI 容器。(例如 Apache + mod_wsgi、uWSGI、gunicorn 等)大多数 WSGI 容器会产生多个工作人员,因此在您的代码等待任务结果时只有一个工作人员会被阻塞。

对于这种应用程序,我建议使用gevent,它为每个请求生成一个单独的 greenlet,并且非常轻量级。

于 2013-01-29T17:28:04.403 回答
1

根据文档result.get(),它会等到结果准备好后再返回,所以通常它实际上是阻塞的。但是,由于您有,如果任务完成时间超过 1 秒timeout=1,调用将引发 TimeoutError。get()

默认情况下,Celery 工作线程的并发级别设置为等于可用 CPU 的数量。并发级别似乎决定了可用于处理任务的线程数。因此,在并发级别 >= 3 的情况下,Celery 工作人员似乎应该能够同时处理那么多任务(也许具有更高 Celery 专业知识的人可以验证这一点?)。

于 2013-01-29T17:19:32.117 回答
0

使用 celery canvas 的Group功能:

group 原语是一个签名,它采用应该并行应用的任务列表。

这是文档中提供的示例:

from celery import group
from proj.tasks import add

g = group(add.s(2, 2), add.s(4, 4))
res = g()
res.get()

哪个输出[4, 8]

于 2014-07-17T12:40:04.213 回答