
The docs say to reuse the ClientSession:

Don't create a session per request. Most likely you need a session per application which performs all requests altogether.

A session contains a connection pool inside. Connection reusage and keep-alives (both are on by default) may speed up total performance.

But there doesn't seem to be any explanation in the docs of how to actually do this. There is one example that might be relevant, but it doesn't show how to reuse the pool elsewhere: http://aiohttp.readthedocs.io/en/stable/client.html#keep-alive-connection-pooling-and-cookie-sharing

Is something like this the right approach?

@app.listener('before_server_start')
async def before_server_start(app, loop):
    app.pg_pool = await asyncpg.create_pool(**DB_CONFIG, loop=loop, max_size=100)
    app.http_session_pool = aiohttp.ClientSession()


@app.listener('after_server_stop')
async def after_server_stop(app, loop):
    app.http_session_pool.close()
    app.pg_pool.close()


@app.post("/api/register")
async def register(request):
    # json validation
    async with app.pg_pool.acquire() as pg:
        await pg.execute()  # create unactivated user in db
        async with app.http_session_pool as session:
            # TODO send activation email using SES API
            async with session.post('http://httpbin.org/post', data=b'data') as resp:
                print(resp.status)
                print(await resp.text())
        return HTTPResponse(status=204)

3 Answers


There are a few things I think could be improved:

1)

An instance of ClientSession is a session object. This session contains a connection pool, but it is not itself a "session_pool". I suggest renaming http_session_pool to http_session, or maybe client_session.

2)

The session's close() method is a coroutine. You should await it:

await app.client_session.close()

Even better (IMHO), instead of thinking about how to open/close the session correctly, use the standard async context manager protocol and await __aenter__/__aexit__ directly:

@app.listener('before_server_start')
async def before_server_start(app, loop):
    # ...
    app.client_session = await aiohttp.ClientSession().__aenter__()


@app.listener('after_server_stop')
async def after_server_stop(app, loop):
    await app.client_session.__aexit__(None, None, None)
    # ...

3)

Note this information:

However, a ResourceWarning: unclosed transport warning is emitted (when warnings are enabled) if the event loop is stopped before the underlying connections are closed.

To avoid this, a small delay must be added before closing the event loop to allow any open underlying connections to close.

I'm not sure it's mandatory in your case, but there's nothing wrong with adding await asyncio.sleep(0) inside after_server_stop, as the docs suggest:

@app.listener('after_server_stop')
async def after_server_stop(app, loop):
    # ...
    await asyncio.sleep(0)  # http://aiohttp.readthedocs.io/en/stable/client.html#graceful-shutdown

Update:

A class that implements __aenter__/__aexit__ can be used as an async context manager (i.e. in an async with statement). It lets you perform some actions before and after the inner block is executed. This is very similar to a regular context manager, but asyncio-related. And just like a regular context manager, an async one can be used directly (without async with) by manually awaiting __aenter__/__aexit__.

Why do I think it's better to create/free the session with __aenter__/__aexit__ manually rather than, for example, with close()? Because we shouldn't have to worry about what actually happens inside __aenter__/__aexit__. Imagine that in some future version of aiohttp the creation of a session changes and, say, requires awaiting open(). If you use __aenter__/__aexit__, you won't need to change your code at all.
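
To illustrate, here is a minimal sketch (the MyResource class is made up purely for this example) showing that using async with and awaiting __aenter__/__aexit__ manually do the same thing:

import asyncio


class MyResource:
    # Hypothetical async context manager, only for illustration.
    async def __aenter__(self):
        print("acquired")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("released")


async def use_with_statement():
    # The usual form.
    async with MyResource() as res:
        print("using", res)


async def use_manually():
    # The same thing, awaiting the magic methods directly.
    res = await MyResource().__aenter__()
    try:
        print("using", res)
    finally:
        await res.__aexit__(None, None, None)


loop = asyncio.get_event_loop()
loop.run_until_complete(use_with_statement())
loop.run_until_complete(use_manually())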

answered 2017-10-28T20:27:41.743

I found this question after googling how to reuse an aiohttp ClientSession instance, because my code had triggered this warning message: UserWarning: Creating a client session outside of coroutine is a very dangerous idea

This code may not solve the problem above, though it is related. I'm new to asyncio and aiohttp, so this may not be best practice. It's the best I could come up with after reading a lot of seemingly contradictory information.

I created a ResourceManager class, taken from the Python docs, that opens a context.

The ResourceManager instance handles the opening and closing of the aiohttp ClientSession instance via the magic methods __aenter__ and __aexit__, together with the BaseScraper.set_session and BaseScraper.close_session wrapper methods.

I was able to reuse the ClientSession instance with the following code.

The BaseScraper class also has methods for authentication. It depends on the lxml third-party package.

import asyncio
from time import time
from contextlib import contextmanager, AbstractContextManager, ExitStack

import aiohttp
import lxml.html


class ResourceManager(AbstractContextManager):
    # Code taken from Python docs: 29.6.2.4. of https://docs.python.org/3.6/library/contextlib.html

    def __init__(self, scraper, check_resource_ok=None):
        self.acquire_resource = scraper.acquire_resource
        self.release_resource = scraper.release_resource
        if check_resource_ok is None:

            def check_resource_ok(resource):
                return True

        self.check_resource_ok = check_resource_ok

    @contextmanager
    def _cleanup_on_error(self):
        with ExitStack() as stack:
            stack.push(self)
            yield
            # The validation check passed and didn't raise an exception
            # Accordingly, we want to keep the resource, and pass it
            # back to our caller
            stack.pop_all()

    def __enter__(self):
        resource = self.acquire_resource()
        with self._cleanup_on_error():
            if not self.check_resource_ok(resource):
                msg = "Failed validation for {!r}"
                raise RuntimeError(msg.format(resource))
        return resource

    def __exit__(self, *exc_details):
        # We don't need to duplicate any of our resource release logic
        self.release_resource()


class BaseScraper:
    login_url = ""
    login_data = dict()  # dict of key, value pairs to fill the login form
    loop = asyncio.get_event_loop()

    def __init__(self, urls):
        self.urls = urls
        self.acquire_resource = self.set_session
        self.release_resource = self.close_session

    async def _set_session(self):
        self.session = await aiohttp.ClientSession().__aenter__()

    def set_session(self):
        set_session_attr = self.loop.create_task(self._set_session())
        self.loop.run_until_complete(set_session_attr)
        return self  # variable after "as" becomes instance of BaseScraper

    async def _close_session(self):
        await self.session.__aexit__(None, None, None)

    def close_session(self):
        close_session = self.loop.create_task(self._close_session())
        self.loop.run_until_complete(close_session)

    def __call__(self):
        fetch_urls = self.loop.create_task(self._fetch())
        return self.loop.run_until_complete(fetch_urls)

    async def _get(self, url):
        async with self.session.get(url) as response:
            result = await response.read()
        return url, result

    async def _fetch(self):
        tasks = (self.loop.create_task(self._get(url)) for url in self.urls)
        start = time()
        results = await asyncio.gather(*tasks)
        print(
            "time elapsed: {} seconds \nurls count: {}".format(
                time() - start, len(self.urls)
            )
        )
        return results

    @property
    def form(self):
        """Create and return form for authentication."""
        form = aiohttp.FormData(self.login_data)
        get_login_page = self.loop.create_task(self._get(self.login_url))
        url, login_page = self.loop.run_until_complete(get_login_page)

        login_html = lxml.html.fromstring(login_page)
        hidden_inputs = login_html.xpath(r'//form//input[@type="hidden"]')
        login_form = {x.attrib["name"]: x.attrib["value"] for x in hidden_inputs}
        for key, value in login_form.items():
            form.add_field(key, value)
        return form

    async def _login(self, form):
        async with self.session.post(self.login_url, data=form) as response:
            if response.status != 200:
                response.raise_for_status()
            print("logged into {}".format(url))
            await response.release()

    def login(self):
        post_login_form = self.loop.create_task(self._login(self.form))
        self.loop.run_until_complete(post_login_form)


if __name__ == "__main__":
    urls = ("http://example.com",) * 10
    base_scraper = BaseScraper(urls)
    with ResourceManager(base_scraper) as scraper:
        for url, html in scraper():
            print(url, len(html))
answered 2018-10-27T17:59:24.217

There doesn't seem to be a session pool in aiohttp.
// Just posting some of the official docs here.


Persistent session

Here is the persistent-session usage demo from the official docs:
https://docs.aiohttp.org/en/latest/client_advanced.html#persistent-session

app.cleanup_ctx.append(persistent_session)

async def persistent_session(app):
    app['PERSISTENT_SESSION'] = session = aiohttp.ClientSession()
    yield
    await session.close()

async def my_request_handler(request):
    session = request.app['PERSISTENT_SESSION']
    async with session.get("http://python.org") as resp:
        print(resp.status)

// TODO: a complete runnable demo
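
A rough sketch of what such a demo could look like (my own assumption, not taken from the docs; the route and target URL are arbitrary):

import aiohttp
from aiohttp import web


async def persistent_session(app):
    # Create one ClientSession for the whole application lifetime,
    # and close it when the app shuts down.
    app['PERSISTENT_SESSION'] = session = aiohttp.ClientSession()
    yield
    await session.close()


async def my_request_handler(request):
    # Reuse the application-wide session for outgoing requests.
    session = request.app['PERSISTENT_SESSION']
    async with session.get("http://python.org") as resp:
        return web.Response(text="upstream returned {}".format(resp.status))


app = web.Application()
app.cleanup_ctx.append(persistent_session)
app.add_routes([web.get('/', my_request_handler)])

if __name__ == '__main__':
    web.run_app(app)  # serves on http://0.0.0.0:8080 by default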

Connection pool

It does have a connection pool:
https://docs.aiohttp.org/en/latest/client_advanced.html#connectors

conn = aiohttp.TCPConnector()
#conn = aiohttp.TCPConnector(limit=30)
#conn = aiohttp.TCPConnector(limit=0)   # no limit; the default is 100
#conn = aiohttp.TCPConnector(limit_per_host=30)  # default is 0 (no per-host limit)

session = aiohttp.ClientSession(connector=conn)
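
For context, a small sketch (placeholder URLs, my own example) showing that every request made through such a session goes through the connector's pool, so at most limit connections are open at any one time:

import asyncio

import aiohttp


async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.text()


async def main():
    conn = aiohttp.TCPConnector(limit=30)  # at most 30 connections in the pool
    async with aiohttp.ClientSession(connector=conn) as session:
        urls = ["http://httpbin.org/get"] * 5  # placeholder URLs
        pages = await asyncio.gather(*(fetch(session, u) for u in urls))
        print(len(pages), "responses fetched through one pooled session")


asyncio.run(main())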

answered 2022-01-01T09:57:21.603