1

我在一个可访问的 mongoDB 集合中有几十万个文档,这些文档遵循名为 MyDoc 的 MongoEngine Schema。有一些东西(我们称之为 my_operation)要在这些文档中的每一个上运行。my_operation 还需要(只是读取,不改变)一个名为 data_dict 的 OrderedDict,它是通过 create_data_dict 函数构造的。我希望能够通过 celery workers 并行运行 my_operation 。

设置包括 django、mongo、mongoengine 和 celery。

选项1:

@celery.task()
def my_operation(my_doc_list):
   data_dict = create_data_dict()
   for doc in my_doc_list:
       do_something_to_doc(data_dict, doc)
       doc.save()

def create_data_dict():
   #blah blah blah
   return data_dict

#So I run everything like this:
batch_size = len(MyDoc.objects)/no_of_celery_workers
for b in xrange(0, len(MyDoc.objects), batch_size):
    my_operation.delay(MyDoc.objects[b:b+batch_size])

选项 2: my_operation 采用 data_dict 和 MyDoc 实例

@celery.task()
def my_operation(data_dict, my_doc):
   do_something_to_doc(data_dict, my_doc)
   my_doc.save()

def create_data_dict():
   #blah blah blah
   return data_dict

#So I run everything like this:
data_dict = create_data_dict()
celery.group([my_operation.s(data_dict, my_doc) for my_doc in MyDoc.objects]).apply_async()

选项 3:

@celery.task()
def my_operation(my_doc):
   data_dict = create_data_dict()
   do_something_to_doc(data_dict, my_doc)
   my_doc.save()

def create_data_dict():
   #blah blah blah
   return data_dict

#So I run everything like this:
celery.group([my_operation.s(my_doc) for my_doc in MyDoc.objects]).apply_async()

选项 4:

@celery.task()
def my_operation(my_doc):
   data_dict = get_data_dict()
   do_something_to_doc(data_dict, my_doc)
   my_doc.save()

def create_data_dict():
   #blah blah blah
   return data_dict

def get_data_dict():
   data_dict = cache.get("data_dict")
   if data_dict is None:
        data_dict = create_data_dict()
        cache.set("data_dict", data_dict)
   return data_dict

#So I run everything like this:
celery.group([my_operation.s(my_doc) for my_doc in MyDoc.objects]).apply_async()

如果 Option1 有效,我可能不会问这个问题,但可惜我不能将查询集响应切片或查询集本身传递给 celery 工人,因为它们不可腌制。回溯似乎主要是这样说的。

使用 Option2,我最终会在每项任务中传递 data_dict,这听起来不太吸引人。如果我在多台机器上运行 celery 工作程序(我确实打算这样做)data_dict,它本质上只需要传递一次就会毫无价值地消耗大量网络。

在 Option3 的情况下,data_dict 会为每个文档重新创建,现在这似乎是对处理能力的巨大浪费。

选项 4:我使用缓存来备份 data_dict 而不是重新计算它或与每个文档一起重新传输它。这听起来像是最好的主意,但有一个问题。下次我想对所有 MyDocs 执行 my_operation 时,我希望重新计算 data_dict,无论它是否在缓存中。有没有办法做到这一点?

问题:最好的方法是什么?

4

1 回答 1

0

从表面上看,选项 2 听起来就像选项 1 - 因为您正在传递对象或其数据。

这里有很多未知数,但鉴于您在选项 4 中提到可能存在一些缓存争用/竞争条件,我可能会选择每次生成数据并仅传递对象 ID,或者如果这过于昂贵,我会实施作为任务的一部分,用于存储缓存数据然后清理缓存(findAndModify 以停止竞争条件)的集合。

于 2013-03-01T10:16:34.143 回答