我在一个可访问的 mongoDB 集合中有几十万个文档,这些文档遵循名为 MyDoc 的 MongoEngine Schema。有一些东西(我们称之为 my_operation)要在这些文档中的每一个上运行。my_operation 还需要(只是读取,不改变)一个名为 data_dict 的 OrderedDict,它是通过 create_data_dict 函数构造的。我希望能够通过 celery workers 并行运行 my_operation 。
设置包括 django、mongo、mongoengine 和 celery。
选项1:
@celery.task()
def my_operation(my_doc_list):
data_dict = create_data_dict()
for doc in my_doc_list:
do_something_to_doc(data_dict, doc)
doc.save()
def create_data_dict():
#blah blah blah
return data_dict
#So I run everything like this:
batch_size = len(MyDoc.objects)/no_of_celery_workers
for b in xrange(0, len(MyDoc.objects), batch_size):
my_operation.delay(MyDoc.objects[b:b+batch_size])
选项 2: my_operation 采用 data_dict 和 MyDoc 实例
@celery.task()
def my_operation(data_dict, my_doc):
do_something_to_doc(data_dict, my_doc)
my_doc.save()
def create_data_dict():
#blah blah blah
return data_dict
#So I run everything like this:
data_dict = create_data_dict()
celery.group([my_operation.s(data_dict, my_doc) for my_doc in MyDoc.objects]).apply_async()
选项 3:
@celery.task()
def my_operation(my_doc):
data_dict = create_data_dict()
do_something_to_doc(data_dict, my_doc)
my_doc.save()
def create_data_dict():
#blah blah blah
return data_dict
#So I run everything like this:
celery.group([my_operation.s(my_doc) for my_doc in MyDoc.objects]).apply_async()
选项 4:
@celery.task()
def my_operation(my_doc):
data_dict = get_data_dict()
do_something_to_doc(data_dict, my_doc)
my_doc.save()
def create_data_dict():
#blah blah blah
return data_dict
def get_data_dict():
data_dict = cache.get("data_dict")
if data_dict is None:
data_dict = create_data_dict()
cache.set("data_dict", data_dict)
return data_dict
#So I run everything like this:
celery.group([my_operation.s(my_doc) for my_doc in MyDoc.objects]).apply_async()
如果 Option1 有效,我可能不会问这个问题,但可惜我不能将查询集响应切片或查询集本身传递给 celery 工人,因为它们不可腌制。回溯似乎主要是这样说的。
使用 Option2,我最终会在每项任务中传递 data_dict,这听起来不太吸引人。如果我在多台机器上运行 celery 工作程序(我确实打算这样做)data_dict,它本质上只需要传递一次就会毫无价值地消耗大量网络。
在 Option3 的情况下,data_dict 会为每个文档重新创建,现在这似乎是对处理能力的巨大浪费。
选项 4:我使用缓存来备份 data_dict 而不是重新计算它或与每个文档一起重新传输它。这听起来像是最好的主意,但有一个问题。下次我想对所有 MyDocs 执行 my_operation 时,我希望重新计算 data_dict,无论它是否在缓存中。有没有办法做到这一点?
问题:最好的方法是什么?