我的芹菜任务设置如下:
@celeryapp.task
def heavy_task(x, y):
# some stuff
for _ in range(10000):
heavy_task_2.apply_async(args=(x,y),
countdown=random.randint(60,120))
return x+y
@celeryapp.task
def heavy_task_2(x, y):
# some stuff
return x+y
我有 5 个工人(prefork),每个工人有 30 个并发。所有运行 prefetch_multiplier=1 和 -Ofair 参数。我正在使用带有 CELERY_ACKS_LATE=True 的 redis 代理
现在,我从 celery 节拍计划中调用 heavy_task(1,2).delay() ,任务转到任何 1 个工作人员,然后它创建的所有 10,000 个任务仅与该工作人员一起驻留,而不发布给经纪人,以便其他工作人员可以从事这些任务。worker 的 prefetch_count 不断增加到 10,000
只有在原始工作人员的内存开始消耗接近 90% 之后,这些任务才会发布到代理,从而转移到其他工作人员。有时工作人员也会被操作系统杀死,所以我的任务永远丢失了,因为它们在 redis 代理中没有“未被确认”。
我应该怎么做才能让这些次要任务立即交给经纪人,而不会让一名工人承担所有任务?