我有一个通过上传视图获取 csv 文件的应用程序,它将文件中的数据读取到列表中,然后将列表传递给 celery 任务,然后该任务使用生成器函数将数据拆分为 200 个块(进行一些预处理以确保没有重复)。然后将这些块传递给任务集,以处理块并将行写入数据库,然后子任务将写入的行的结果等传递给主任务以记录完整任务集的日志。
我在加载大量数据时遇到问题,其中 1 个将 200 行的块写入数据库的任务随机冻结,如果我终止此任务,则队列继续处理。这在使用本地运行的 db 和 celery 进行测试时从未发生过,所以我假设这是某种 mysql 连接问题,celery.log 和 mysql 日志没有显示任何内容,由于我缺乏经验,我不完全确定如何对其进行故障排除。
我正在为任务队列运行 django、mysql、rabbitmq,并使用 1 个服务器用于 db、webserver 和另一个用于任务处理(2 x AWS EC2 运行 ubuntu 服务器 12.04)。
我的代码:
#views.py
reader = csv.reader(request.FILES['f'], delimiter=',', quotechar='"')
#Skip the header
reader.next()
uploaddata = [row for row in reader]
LegacyUploadManager.delay(uploaddata, log.pk)
#tasks.py
class LegacyUploadManager(Task):
"""
Task to take uploads, split into smaller tasks and manage
"""
def run(self, f, logpk):
#Create chunk generator to split chunks into sets of 200
ChunkGen = UploadChunkGen(f, 200)
#Generate chunks and create a taskset of chunk tasks
self.tasks = []
for chunk in ChunkGen:
self.tasks.append(LegacyUploader.subtask([chunk]))
#create taskset and start processing
job = TaskSet(self.tasks)
result = job.apply_async()
#Wait for tasks to complete
while result.waiting():
time.sleep(10)
#Write results to database via ImportLogger task
ImporterLogger.delay(logpk, result.join())
class LegacyUploader(Task):
"""
Task for uploading products imported from ESP via a full product layout.
"""
def run(self, f):
for row in f:
Product.objects.create(row)
我想我只需要在我的 'job.apply_async()' 中添加 'timeout=....' 参数,但我不确定这样做的后果。即我会丢失所有未在超时任务上处理的行,还是会在任务超时之前再次写入数据库中的行?