打算回答我自己的问题,因为我必须自己解决这个问题,而且关于这件事的信息似乎很少。
思路如下。与 GRequests 一起使用的每个请求对象在创建时都可以将会话对象作为参数。另一方面,会话对象可以安装在发出请求时使用的 HTTP 适配器。通过创建我们自己的适配器,我们可以拦截请求并以我们认为最适合我们的应用程序的方式限制它们。就我而言,我最终得到了下面的代码。
用于节流的对象:
DEFAULT_BURST_WINDOW = datetime.timedelta(seconds=5)
DEFAULT_WAIT_WINDOW = datetime.timedelta(seconds=15)
class BurstThrottle(object):
max_hits = None
hits = None
burst_window = None
total_window = None
timestamp = None
def __init__(self, max_hits, burst_window, wait_window):
self.max_hits = max_hits
self.hits = 0
self.burst_window = burst_window
self.total_window = burst_window + wait_window
self.timestamp = datetime.datetime.min
def throttle(self):
now = datetime.datetime.utcnow()
if now < self.timestamp + self.total_window:
if (now < self.timestamp + self.burst_window) and (self.hits < self.max_hits):
self.hits += 1
return datetime.timedelta(0)
else:
return self.timestamp + self.total_window - now
else:
self.timestamp = now
self.hits = 1
return datetime.timedelta(0)
HTTP 适配器:
class MyHttpAdapter(requests.adapters.HTTPAdapter):
throttle = None
def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
pool_maxsize=requests.adapters.DEFAULT_POOLSIZE, max_retries=requests.adapters.DEFAULT_RETRIES,
pool_block=requests.adapters.DEFAULT_POOLBLOCK, burst_window=DEFAULT_BURST_WINDOW,
wait_window=DEFAULT_WAIT_WINDOW):
self.throttle = BurstThrottle(pool_maxsize, burst_window, wait_window)
super(MyHttpAdapter, self).__init__(pool_connections=pool_connections, pool_maxsize=pool_maxsize,
max_retries=max_retries, pool_block=pool_block)
def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
request_successful = False
response = None
while not request_successful:
wait_time = self.throttle.throttle()
while wait_time > datetime.timedelta(0):
gevent.sleep(wait_time.total_seconds(), ref=True)
wait_time = self.throttle.throttle()
response = super(MyHttpAdapter, self).send(request, stream=stream, timeout=timeout,
verify=verify, cert=cert, proxies=proxies)
if response.status_code != 429:
request_successful = True
return response
设置:
requests_adapter = adapter.MyHttpAdapter(
pool_connections=__CONCURRENT_LIMIT__,
pool_maxsize=__CONCURRENT_LIMIT__,
max_retries=0,
pool_block=False,
burst_window=datetime.timedelta(seconds=5),
wait_window=datetime.timedelta(seconds=20))
requests_session = requests.session()
requests_session.mount('http://', requests_adapter)
requests_session.mount('https://', requests_adapter)
unsent_requests = (grequests.get(url,
hooks={'response': handle_response},
session=requests_session) for url in urls)
grequests.map(unsent_requests, size=__CONCURRENT_LIMIT__)