我正在整理一个基于 Celery 的数据摄取管道。我在文档中的任何地方都看不到的一件事是如何构建一个只有在有工作要做时才运行的流程。(老实说,这似乎是 Celery 设计中的一个重大缺陷)
我知道 Celery 本身不会处理实际服务器的自动缩放,这很好,但是当我模拟这个 Flower 时,除非提交任务时工作人员在线,否则看不到提交的工作。为什么?除非有实际工作要做,否则我会喜欢一个不为服务器付费的世界。
工作流程:
想象一个 While 循环,它使用该
celery_app.send_task
方法添加要处理的新数据。我有自定义代码,可以看到队列中有 N 条消息。它启动一个服务器,并为该任务启动一个 Celery 工作者。
Celery worker 上线,开始工作。
但。
Flower 没有该任务的记录,即使我看到经纪人有一条“消息”,并且在观察工人的输出时,我可以看到它做了它的事情。
如果我让工作人员在线,然后提交一个任务,它会监控一切都很好而且花花公子。
有谁知道为什么?