2

我正在开发一个系统,该系统将为 AI 项目构建一个巨大的 n-gram 模型。
我的管道如下:
资源输入-->获取数据-->解析器-->培训师
资源输入(基本上是必须解析的 URL)不是恒定的,这意味着我可以一次引入大量数千个资源,后来又一大堆几十个等等。

我的想法是将管道的每一步都实现为 Celery 任务并将其部署在云上(例如,使用 Heroku 的 worker dynos)。但是我是 Celery 的新手,我对如何将这些任务排队以使我的工人 100% 工作并同时保持系统的完整性有疑问。
直接的方法是在前一个任务完成后立即开始排队任务,例如,如果我得到 1000 个项目的资源输入,我会安排 1000 个“获取数据”任务,每个任务都会排队一个“解析”任务完成后等等。但这会导致一个巨大的队列,因为在这些任务完成之前会有更多的资源进入,而且我知道构建模型需要几个月的时间(如果它完成的话!)。

所以我不确定 Celery 是否可以在不陷入内存问题(Heroku 有其限制)或我现在无法想象的任何其他问题的情况下处理所有这些问题。或者,也许我应该使用更复杂的技术,例如每 X 分钟调度一次任务块,将部分结果存储在数据库中等。这可能会避免其中一些问题,但我不会得到 100% 的工作时间。

有什么想法吗?
谢谢!


编辑

我的问题的答案实际上在接受答案的评论中

4

1 回答 1

2

通过为每个任务设置单独的队列并为每个队列运行一个专用的工作程序,您可以确保您的系统将利用 100% 的系统资源,同时关注每个任务。此外,您可以添加工作人员以根据任务运行时平衡任务处理。

例如,定义任务

@celery.task
def fetch(url):
    # fetch url
    return html

@celery.task
def parse(html):
    pass

并配置自动路由

CELERY_ROUTES = {'tasks.fetch': {'queue': 'fetch_queue'}, 'tasks.parse': {'queue': 'parse_queue'}}

和运行工人:

$ celery worker -Q fetch_queue

$ celery worker -Q parse_queue

每种任务类型都有一个单独的工作人员。

使用回调,您可以在获取后轻松解析:

fetch.apply_async((url), link=parse.subtask())

PS 对于获取工作人员,您可以使用Eventlet 池来利用异步 IO。

于 2012-07-28T09:12:04.950 回答