5

我有一个更新实体的请求处理程序,将其保存到数据存储区,然后需要在返回之前执行一些额外的工作(比如排队后台任务和 json 序列化一些结果)。我想并行化这段代码,以便在保存实体的同时完成额外的工作。

这是我的处理程序代码归结为:

class FooHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def post(self):
        foo = yield Foo.get_by_id_async(some_id)

        # Do some work with foo

        # Don't yield, as I want to perform the code that follows
        # while foo is being saved to the datastore.
        # I'm in a toplevel, so the handler will not exit as long as
        # this async request is not finished.
        foo.put_async()

        taskqueue.add(...)
        json_result = generate_result()
        self.response.headers["Content-Type"] = "application/json; charset=UTF-8"
        self.response.write(json_result)

但是,Appstats 显示datastore.PutRPC 正在串行完成,之后taskqueue.Add

应用统计截图

稍微挖掘一下,ndb.context.py就会发现put_async()调用最终被添加到 an中,AutoBatcher而不是立即发出 RPC。

所以我认为当等待所有异步调用完成_put_batcher时,最终会被刷新。toplevel

我知道批处理 put 在某些情况下确实有好处,但在我的情况下,我真的希望立即发送 put RPC,这样我就可以在保存实体的同时执行其他工作。

如果我这样做yield foo.put_async()了,那么我会在 Appstats 中得到相同的瀑布,但datastore.Put在其余部分之前完成:

第二个 Appstats 截图

这是意料之中的,因为让我的处理程序在执行其余代码之前yield等待调用完成。put_async()

我也尝试过ndb.get_context().flush()在之后添加一个调用foo.put_async(),但是根据 Appstats 仍然没有并行进行datastore.Putand调用。taskqueue.BulkAdd

所以我的问题是:如何强制调用put_async()绕过自动批处理器并立即发出 RPC?

4

2 回答 2

6

没有支持的方式来做到这一点。也许应该有。如果这行得通,你可以试试吗?

loop - ndb.eventloop.get_event_loop()
while loop.run_idle():
    pass

您可能需要查看 ndb/eventloop.py 的源代码以了解您还可以尝试什么——基本上您想尝试 run0() 的大部分功能,除了等待 RPC。特别是,您可能必须这样做:

while loop.current:
    loop.run0()
while loop.run_idle():
    pass

(这仍然不受支持,因为您可能还必须处理其他条件,但在您的示例中似乎没有出现这些情况。)

于 2013-02-22T15:58:57.850 回答
-2

试试这个,我不是 100% 肯定它会有所帮助:

foo = yield Foo.get_by_id_async(some_id)
future = foo.put_async()
future.done()

ndb 请求被放入自动批处理器,当您需要结果时,批处理被发送到 RPC。由于您不需要 foo.put_async() 的结果,因此在您进行另一个 ndb 调用(您不需要)或直到 @ndb.toplevel 结束之前,它不会被发送。

调用 future.done() 不会阻塞,但我猜它可能会触发请求。

尝试强制操作的另一件事是:

ndb.get_context().flush()
于 2013-02-21T17:24:10.920 回答