我目前正在研究 Celery 用于视频处理后端。基本上我的问题如下:
- 我有一个前端 Web 服务器,它同时处理大量视频流(大约数千个)。
- 每个流必须独立并并行处理。
- 流处理可以分为两类操作:
- 逐帧操作(不需要有关前一帧或后一帧的信息的计算)
- 流级操作(在有序、相邻帧的子集上工作的计算)
鉴于第 3 点,我需要在整个过程中维护和更新帧的有序结构,并将该结构的子部分的农场计算提供给 Celery 工人。最初,我考虑按如下方式组织事情:
[frontend server] -stream-> [celery worker 1 (greenlet)] --> [celery worker 2 (prefork)]
这个想法是celery worker 1
执行主要是 I/O-bound的长时间运行的任务。本质上,这些任务只会做以下事情:
- 从前端服务器读取帧
- 从它的 base64 表示中解码帧
- 将其排入上述有序数据结构(
collections.deque
当前的对象)中。
任何受 CPU 限制的操作(即图像分析)都被运送到celery worker 2
.
我的问题如下:
我想将协程作为任务执行,这样我就有一个长时间运行的任务,我可以yield
从中不阻塞celery worker 1
的操作。换句话说,我希望能够做类似的事情:
def coroutine(func):
@wraps(func)
def start(*args, **kwargs):
cr = func(*args, **kwargs)
cr.next()
return cr
return start
@coroutine
def my_taks():
stream = deque() # collections.deque
source = MyAsynchronousInputThingy() # something i'll make myself, probably using select
while source.open:
if source.has_data:
stream.append(Frame(source.readline())) # read data, build frame and enqueue to persistent structure
yield # cooperatively interrupt so that other tasks can execute
有没有办法让基于协程的任务无限期地运行,理想情况下在它们被yield
编辑时产生结果?