2

我目前正在研究 Celery 用于视频处理后端。基本上我的问题如下:

  1. 我有一个前端 Web 服务器,它同时处理大量视频流(大约数千个)。
  2. 每个流必须独立并行处理。
  3. 流处理可以分为两类操作:
    1. 逐帧操作(不需要有关前一帧或后一帧的信息的计算)
    2. 流级操作(在有序、相邻帧的子集上工作的计算)

鉴于第 3 点,我需要在整个过程中维护和更新帧的有序结构,并将该结构的子部分的农场计算提供给 Celery 工人。最初,我考虑按如下方式组织事情:

[frontend server]  -stream-> [celery worker 1 (greenlet)] --> [celery worker 2 (prefork)]

这个想法是celery worker 1执行主要是 I/O-bound的长时间运行的任务。本质上,这些任务只会做以下事情:

  1. 从前端服务器读取帧
  2. 从它的 base64 表示中解码帧
  3. 将其排入上述有序数据结构(collections.deque当前的对象)中。

任何受 CPU 限制的操作(图像分析)都被运送到celery worker 2.

我的问题如下:

我想将协程作为任务执行,这样我就有一个长时间运行的任务,我可以yield从中不阻塞celery worker 1的操作。换句话说,我希望能够做类似的事情:

def coroutine(func):
    @wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        cr.next()
        return cr
    return start

@coroutine
def my_taks():
    stream = deque()  # collections.deque
    source = MyAsynchronousInputThingy()  # something i'll make myself, probably using select

    while source.open:
        if source.has_data:
            stream.append(Frame(source.readline()))  # read data, build frame and enqueue to persistent structure
        yield  # cooperatively interrupt so that other tasks can execute

有没有办法让基于协程的任务无限期地运行,理想情况下在它们被yield编辑时产生结果?

4

1 回答 1

4

Eventlet 背后的主要思想是,您要编写同步代码,就像使用线程一样,socket.recv()应该阻塞当前线程,直到下一条语句。这种风格在调试时非常易于阅读、维护和推理。为了使事情变得有效和可扩展,在幕后,Eventlet 神奇地用绿色线程和 epoll/kqueue/etc 机制替换看似阻塞的代码,以在适当的时间唤醒这些绿色线程。

因此,您需要eventlet.monkey_patch()尽快执行(例如模块中的第二行)并确保在MyInputThingy. 忘记异步,只需像编写线程一样编写普通的阻塞代码。

Eventlet 让同步代码再次变得美好。

于 2014-11-02T09:15:30.083 回答