2

我正在尝试掌握 NDB 引入的异步操作,我想使用@ndb.tasklet来异步我的一些工作。

简单的例子是在被覆盖的 string_id 生成get_or_insert_async

这是做事的正确方法吗?这里有什么可以改进的?

@classmethod
@ndb.tasklet
def get_or_insert_async(cls, *args):
    id = cls.make_string_id(*args) 
    model = yield super(MyModel, cls).get_or_insert_async(id)
    raise ndb.Return(model)

另一个例子是以扇出的方式在循环中做事。这个对吗?

@classmethod
@ndb.tasklet
def do_stuff(cls, some_collection):

    @ndb.tasklet
    def internal_tasklet(data):
        do_some_long_taking_stuff(data)
        id = make_stuff_needed_for_id(data)
        model = yield cls.get_or_insert_async(id)
        model.long_processing(data)
        yield model.put_async()
        raise ndb.Return(None)

    for data in some_collection:
        # will it parallelise internal_tasklet execution? 
        yield internal_tasklet(data)

    raise ndb.Return(None)

编辑:

正如对整个概念的理解,yields这里提供一个Future对象,然后并行收集(在可能的情况下)并异步执行。我对么?

在尼克的提示之后(这是你的意思吗?):

@classmethod
@ndb.tasklet
def do_stuff(cls, some_collection):

    @ndb.tasklet
    def internal_tasklet(data):
        do_some_long_taking_stuff(data)
        id = make_stuff_needed_for_id(data)
        model = yield cls.get_or_insert_async(id)
        model.long_processing(data)
        raise ndb.Return(model)                # change here

    models = []
    for data in some_collection:
        # will it parallelise internal_tasklet execution? 
        m = yield internal_tasklet(data)       # change here
        models.appedn(m)                       # change here

    keys = yield ndb.put_multi_async(models)   # change here
    raise ndb.Return(keys)                     # change here

编辑:

新修订版……</p>

@classmethod
@ndb.tasklet
def do_stuff(cls, some_collection):

    @ndb.tasklet
    def internal_tasklet(data):
        do_some_long_taking_stuff(data)
        id = make_stuff_needed_for_id(data)
        model = yield cls.get_or_insert_async(id)
        model.long_processing(data)
        raise ndb.Return(model)                

    futures = []
    for data in some_collection:
        # tasklets won't run in parallel but while 
        # one is waiting on a yield (and RPC underneath)  
        # the other will advance it's execution 
        # up to a next yield or return
        fut = internal_tasklet(data))          # change here
        futures.append(fut)                    # change here

    Future.wait_all(futures)                   # change here

    models = [fut.get_result() for fut in futures]
    keys = yield ndb.put_multi_async(models)   # change here
    raise ndb.Return(keys)                     # change here
4

1 回答 1

1

如果您只想使用不同的参数调用异步操作,则不需要使用 tasklet - 只需返回包装函数的返回值,如下所示:

def get_or_insert_async(cls, *args):
  id = cls.make_string_id(*args)
  return super(MyModel, cls).get_or_insert_async(id)

不过,出于以下几个原因,我会对此保持谨慎:您正在更改内置函数的含义,这通常是一个坏主意,您正在更改签名(位置参数但没有关键字参数),并且您不会将额外的参数传递给原始函数。

对于你的第二个例子,一次让出一个东西将迫使 NDB 等待它们的完成——“yield”是“wait”的同义词。相反,为集合中的每个元素执行 tasklet 函数,然后同时等待它们(通过调用列表上的 yield)。

于 2012-04-05T04:24:49.093 回答