12

我有一个设置,其中 Tornado 被用作工人的一种传递。Tornado 收到请求,将请求发送给 N 个工作人员,汇总结果并将其发送回客户端。哪个工作正常,除非由于某种原因发生超时 - 然后我有内存泄漏。

我有一个类似于这个伪代码的设置:

workers = ["http://worker1.example.com:1234/",
           "http://worker2.example.com:1234/", 
           "http://worker3.example.com:1234/" ...]

class MyHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def post(self):
        responses = []

        def __callback(response):
            responses.append(response)
            if len(responses) == len(workers):
                self._finish_req(responses)

        for url in workers:
            async_client = tornado.httpclient.AsyncHTTPClient()
            request = tornado.httpclient.HTTPRequest(url, method=self.request.method, body=body)
            async_client.fetch(request, __callback) 

    def _finish_req(self, responses):
        good_responses = [r for r in responses if not r.error]
        if not good_responses:
            raise tornado.web.HTTPError(500, "\n".join(str(r.error) for r in responses))
        results = aggregate_results(good_responses)
        self.set_header("Content-Type", "application/json")
        self.write(json.dumps(results))
        self.finish()

application = tornado.web.Application([
    (r"/", MyHandler),
])

if __name__ == "__main__":
    ##.. some locking code 
    application.listen()
    tornado.ioloop.IOLoop.instance().start()

我究竟做错了什么?内存泄漏从何而来?

4

2 回答 2

5

我不知道问题的根源,似乎 gc 应该可以解决它,但是您可以尝试两件事。

第一种方法是简化一些引用(看起来可能还有responses对 RequestHandler 完成时的引用):

class MyHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def post(self):
        self.responses = []

        for url in workers:
            async_client = tornado.httpclient.AsyncHTTPClient()
            request = tornado.httpclient.HTTPRequest(url, method=self.request.method, body=body)
            async_client.fetch(request, self._handle_worker_response) 

    def _handle_worker_response(self, response):
        self.responses.append(response)
        if len(self.responses) == len(workers):
            self._finish_req()

    def _finish_req(self):
        ....

如果这不起作用,您始终可以手动调用垃圾收集:

import gc
class MyHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def post(self):
        ....

    def _finish_req(self):
        ....

    def on_connection_close(self):
        gc.collect()
于 2012-10-05T07:39:36.993 回答
1

代码看起来不错。泄漏可能在 Tornado 内部。

我只是偶然发现了这一行:

async_client = tornado.httpclient.AsyncHTTPClient()

你知道这个构造函数中的实例化魔法吗?从文档:

"""
The constructor for this class is magic in several respects:  It actually
creates an instance of an implementation-specific subclass, and instances
are reused as a kind of pseudo-singleton (one per IOLoop).  The keyword
argument force_instance=True can be used to suppress this singleton
behavior.  Constructor arguments other than io_loop and force_instance
are deprecated.  The implementation subclass as well as arguments to
its constructor can be set with the static method configure()
"""

所以实际上,您不需要在循环内执行此操作。(另一方面,它不应该造成任何伤害。)但是您使用的是哪个实现 CurlAsyncHTTPClient 或 SimpleAsyncHTTPClient?

如果是 SimpleAsyncHTTPClient,请注意代码中的这条注释:

"""
This class has not been tested extensively in production and
should be considered somewhat experimental as of the release of
tornado 1.2. 
"""

您可以尝试切换到 CurlAsyncHTTPClient。或者按照 Nikolay Fominyh 的建议跟踪对 __callback() 的调用。

于 2012-10-09T07:33:48.840 回答