
How do I set a maximum number of requests per second on the client side with aiohttp (i.e., throttle them)?


4 Answers


While it is not strictly a limit on requests per second, note that since v2.0, aiohttp automatically limits the number of simultaneous connections to 100 when using a ClientSession.

You can override this limit by creating your own TCPConnector and passing it into the ClientSession. For instance, to create a client limited to 50 simultaneous requests:

import aiohttp

connector = aiohttp.TCPConnector(limit=50)
client = aiohttp.ClientSession(connector=connector)

If it better suits your use case, there is also a limit_per_host parameter (off by default) that you can pass to limit the number of simultaneous connections to the same "endpoint". Per the docs:

limit_per_host (int) – limit for simultaneous connections to the same endpoint. Endpoints are the same if they have an equal (host, port, is_ssl) triple.

Example usage:

import aiohttp

connector = aiohttp.TCPConnector(limit_per_host=50)
client = aiohttp.ClientSession(connector=connector)
Answered 2017-05-08T21:25:41.197

I found one possible solution here: http://compiletoi.net/fast-scraping-in-python-with-asyncio.html

Doing 3 requests at the same time is cool, doing 5000, however, is not so nice. If you try to do too many requests at the same time, connections might start to get closed, or you might even get banned from the website.

To avoid this, you can use a semaphore. It is a synchronization tool that can be used to limit the number of coroutines that do something at some point. We'll just create the semaphore before creating the loop, passing as an argument the number of simultaneous requests we want to allow:

sem = asyncio.Semaphore(5)

Then, we just replace:

page = yield from get(url, compress=True)

by the same thing, but protected by a semaphore:

with (yield from sem):
    page = yield from get(url, compress=True)

This will ensure that at most 5 requests can be done at the same time.
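The `yield from` syntax above predates Python 3.5; with modern `async`/`await` the same semaphore pattern looks like the sketch below. It is self-contained so it runs without aiohttp: the network call is replaced by an `asyncio.sleep` stub, the URL list is made up, and the `stats` dict exists only to observe peak concurrency.

```python
import asyncio

async def fetch(url, sem, stats):
    # The semaphore gates entry; 'stats' records peak concurrency so we
    # can observe the limit in action.
    async with sem:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stand-in for the real aiohttp request
        stats["active"] -= 1
        return url

async def main():
    sem = asyncio.Semaphore(5)  # at most 5 "requests" in flight
    stats = {"active": 0, "peak": 0}
    urls = [f"https://example.com/{i}" for i in range(20)]
    results = await asyncio.gather(*(fetch(u, sem, stats) for u in urls))
    return results, stats["peak"]

results, peak = asyncio.run(main())
print(peak)  # never exceeds 5
```

With aiohttp, the body of `fetch` would instead do `async with session.get(url)` inside the `async with sem:` block; the semaphore usage is identical.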

Answered 2016-02-04T10:22:00.840

You can either set a delay per request, or group the URLs into batches and throttle the batches to meet the desired frequency.

1. Delay per request

Force the script to wait between requests using asyncio.sleep:

import asyncio
import aiohttp

delay_per_request = 0.5
urls = [
   # put some URLs here...
]

async def app():
    tasks = []
    for url in urls:
        tasks.append(asyncio.ensure_future(make_request(url)))
        await asyncio.sleep(delay_per_request)

    results = await asyncio.gather(*tasks)
    return results

async def make_request(url):
    print('$$$ making request')
    async with aiohttp.ClientSession() as sess:
        async with sess.get(url) as resp:
            status = resp.status
            text = await resp.text()
            print('### got page data')
            return url, status, text

This can be run with, e.g., results = asyncio.run(app()).
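To see that the `asyncio.sleep` pacing really does space out the task launches, here is a minimal self-contained check: the real request is replaced by a stub that records its start time, and the delay is shortened from 0.5 s to 0.1 s so the demo finishes quickly (both the stub and the timing list are made up for illustration).

```python
import asyncio
import time

delay_per_request = 0.1  # shortened from 0.5 so the demo finishes quickly

async def fake_request(url, started):
    started.append(time.monotonic())  # record when this "request" begins
    return url

async def app():
    started = []
    tasks = []
    for i in range(3):
        tasks.append(asyncio.ensure_future(fake_request(f"url-{i}", started)))
        await asyncio.sleep(delay_per_request)  # each task starts during this pause
    results = await asyncio.gather(*tasks)
    return results, started

results, started = asyncio.run(app())
# Consecutive start times should be roughly delay_per_request apart.
gaps = [b - a for a, b in zip(started, started[1:])]
print(len(results), all(g >= 0.05 for g in gaps))
```

Each task is scheduled with `ensure_future` and then actually starts while the loop sits in the following `sleep`, so the launch times are paced even though the responses are gathered together at the end.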

2. Batch throttle

Using make_request from above, you can request and throttle batches of URLs like this:

import asyncio
import aiohttp
import time

max_requests_per_second = 0.5
urls = [[
   # put a few URLs here...
],[
   # put a few more URLs here...
]]

async def app():
    results = []
    for i, batch in enumerate(urls):
        t_0 = time.time()
        print(f'batch {i}')
        tasks = [asyncio.ensure_future(make_request(url)) for url in batch]
        for t in tasks:
            d = await t
            results.append(d)
        t_1 = time.time()

        # Throttle requests
        batch_time = (t_1 - t_0)
        batch_size = len(batch)
        wait_time = (batch_size / max_requests_per_second) - batch_time
        if wait_time > 0:
            print(f'Too fast! Waiting {wait_time} seconds')
            # asyncio.sleep instead of time.sleep, so the event loop
            # is not blocked while we wait out the batch window
            await asyncio.sleep(wait_time)

    return results

Again, this can be run with asyncio.run(app()).
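The throttling arithmetic above is worth spelling out: at `max_requests_per_second = 0.5`, a batch of `n` URLs is allotted `n / 0.5 = 2n` seconds, and the loop sleeps off whatever is left after the batch finishes. A tiny sketch of just that calculation (the function name is made up for illustration):

```python
max_requests_per_second = 0.5  # same target rate as the example above

def throttle_wait(batch_size, batch_time):
    # Seconds the batch must occupy at the target rate, minus the time
    # it actually took; a negative result means no extra waiting is needed.
    return (batch_size / max_requests_per_second) - batch_time

print(throttle_wait(3, 1.5))   # 3 URLs need 6 s total; 1.5 s used, so wait 4.5
print(throttle_wait(2, 10.0))  # batch already overran its 4 s window: -6.0
```

Note that this throttles the average rate per batch, not the instantaneous rate: all requests inside a batch still fire as fast as they can.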

Answered 2019-01-25T18:28:07.697

Here is an example without aiohttp, but you can wrap any async method (or aiohttp.request) with the Limit decorator:

import asyncio
import time


class Limit(object):
    """Allow at most `calls` invocations of the wrapped coroutine per `period` seconds."""

    def __init__(self, calls=5, period=1):
        self.calls = calls
        self.period = period
        self.clock = time.monotonic
        self.last_reset = 0
        self.num_calls = 0

    def __call__(self, func):
        async def wrapper(*args, **kwargs):
            # Quota for this period is used up: sleep until the period ends.
            if self.num_calls >= self.calls:
                await asyncio.sleep(self.__period_remaining())

            # Start a fresh period once the previous one has elapsed.
            period_remaining = self.__period_remaining()
            if period_remaining <= 0:
                self.num_calls = 0
                self.last_reset = self.clock()

            self.num_calls += 1

            return await func(*args, **kwargs)

        return wrapper

    def __period_remaining(self):
        # Seconds left in the current period (negative once it has passed).
        elapsed = self.clock() - self.last_reset
        return self.period - elapsed


@Limit(calls=5, period=2)
async def test_call(x):
    print(x)


async def worker():
    for x in range(100):
        await test_call(x + 1)


asyncio.run(worker())
Answered 2020-06-21T18:55:41.073