我有一个更新实体的请求处理程序,将其保存到数据存储区,然后需要在返回之前执行一些额外的工作(比如排队后台任务和 json 序列化一些结果)。我想并行化这段代码,以便在保存实体的同时完成额外的工作。
这是我的处理程序代码归结为:
class FooHandler(webapp2.RequestHandler):
@ndb.toplevel
def post(self):
foo = yield Foo.get_by_id_async(some_id)
# Do some work with foo
# Don't yield, as I want to perform the code that follows
# while foo is being saved to the datastore.
# I'm in a toplevel, so the handler will not exit as long as
# this async request is not finished.
foo.put_async()
taskqueue.add(...)
json_result = generate_result()
self.response.headers["Content-Type"] = "application/json; charset=UTF-8"
self.response.write(json_result)
但是,Appstats 显示datastore.Put
RPC 正在串行完成,之后taskqueue.Add
:
稍微挖掘一下,ndb.context.py
就会发现put_async()
调用最终被添加到 an中,AutoBatcher
而不是立即发出 RPC。
所以我认为当等待所有异步调用完成_put_batcher
时,最终会被刷新。toplevel
我知道批处理 put 在某些情况下确实有好处,但在我的情况下,我真的希望立即发送 put RPC,这样我就可以在保存实体的同时执行其他工作。
如果我这样做yield foo.put_async()
了,那么我会在 Appstats 中得到相同的瀑布,但datastore.Put
在其余部分之前完成:
这是意料之中的,因为让我的处理程序在执行其余代码之前yield
等待调用完成。put_async()
我也尝试过ndb.get_context().flush()
在之后添加一个调用foo.put_async()
,但是根据 Appstats 仍然没有并行进行datastore.Put
and调用。taskqueue.BulkAdd
所以我的问题是:如何强制调用put_async()
绕过自动批处理器并立即发出 RPC?