我尝试了python请求库文档中提供的示例。
使用async.map(rs)
,我得到响应代码,但我想获取请求的每个页面的内容。例如,这不起作用:
out = async.map(rs)
print out[0].content
我尝试了python请求库文档中提供的示例。
使用async.map(rs)
,我得到响应代码,但我想获取请求的每个页面的内容。例如,这不起作用:
out = async.map(rs)
print out[0].content
以下答案不适用于请求 v0.13.0+。编写此问题后,异步功能已移至grequests 。但是,你可以requests
用grequests
下面的替换它应该可以工作。
我留下这个答案是为了反映关于使用 requests < v0.13.0 的原始问题。
要async.map
异步执行多项任务,您必须:
async.map
所有请求/操作的列表例子:
from requests import async
# If using requests > v0.13.0, use
# from grequests import async
urls = [
'http://python-requests.org',
'http://httpbin.org',
'http://python-guide.org',
'http://kennethreitz.com'
]
# A simple task to do to each response object
def do_something(response):
print response.url
# A list to hold our things to do via async
async_list = []
for u in urls:
# The "hooks = {..." part is where you define what you want to do
#
# Note the lack of parentheses following do_something, this is
# because the response will be used as the first argument automatically
action_item = async.get(u, hooks = {'response' : do_something})
# Add the task to our list of things to do via async
async_list.append(action_item)
# Do our list of things to do via async
async.map(async_list)
async
现在是一个独立的模块:grequests
。
见这里:https ://github.com/kennethreitz/grequests
还有:通过 Python 发送多个 HTTP 请求的理想方法?
$ pip install grequests
建立一个堆栈:
import grequests
urls = [
'http://www.heroku.com',
'http://tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://kennethreitz.com'
]
rs = (grequests.get(u) for u in urls)
发送堆栈
grequests.map(rs)
结果看起来像
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
grequests 似乎没有为并发请求设置限制,即当多个请求发送到同一服务器时。
我测试了requests-futures和grequests。Grequests 更快,但会带来猴子补丁和其他依赖问题。requests-futures 比 grequests 慢几倍。我决定编写自己的并将请求简单地包装到ThreadPoolExecutor中,它几乎与 grequests 一样快,但没有外部依赖项。
import requests
import concurrent.futures
def get_urls():
return ["url1","url2"]
def load_url(url, timeout):
return requests.get(url, timeout = timeout)
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
future_to_url = {executor.submit(load_url, url, 10): url for url in get_urls()}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
resp_err = resp_err + 1
else:
resp_ok = resp_ok + 1
也许requests-futures是另一种选择。
from requests_futures.sessions import FuturesSession
session = FuturesSession()
# first request is started in background
future_one = session.get('http://httpbin.org/get')
# second requests is started immediately
future_two = session.get('http://httpbin.org/get?foo=bar')
# wait for the first request to complete, if it hasn't already
response_one = future_one.result()
print('response one status: {0}'.format(response_one.status_code))
print(response_one.content)
# wait for the second request to complete, if it hasn't already
response_two = future_two.result()
print('response two status: {0}'.format(response_two.status_code))
print(response_two.content)
办公文档中也推荐。如果您不想涉及 gevent,这是一个很好的选择。
不幸的是,据我所知,请求库不具备执行异步请求的能力。async/await
您可以将语法包装在 . 周围requests
,但这将使底层请求的同步性不减。如果您想要真正的异步请求,则必须使用提供它的其他工具。一种这样的解决方案是aiohttp
(Python 3.5.3+)。根据我使用 Python 3.7async/await
语法的经验,它运行良好。下面我编写了三个执行 n 个 Web 请求的实现,使用
sync_requests_get_all
使用 Pythonrequests
库的纯同步请求 ( )async_requests_get_all
)和requests
async/await
asyncio
async_aiohttp_get_all
),使用aiohttp
Python 3.7async/await
语法包装的 Python 库和asyncio
"""
Tested in Python 3.5.10
"""
import time
import asyncio
import requests
import aiohttp
from asgiref import sync
def timed(func):
"""
records approximate durations of function calls
"""
def wrapper(*args, **kwargs):
start = time.time()
print('{name:<30} started'.format(name=func.__name__))
result = func(*args, **kwargs)
duration = "{name:<30} finished in {elapsed:.2f} seconds".format(
name=func.__name__, elapsed=time.time() - start
)
print(duration)
timed.durations.append(duration)
return result
return wrapper
timed.durations = []
@timed
def sync_requests_get_all(urls):
"""
performs synchronous get requests
"""
# use session to reduce network overhead
session = requests.Session()
return [session.get(url).json() for url in urls]
@timed
def async_requests_get_all(urls):
"""
asynchronous wrapper around synchronous requests
"""
session = requests.Session()
# wrap requests.get into an async function
def get(url):
return session.get(url).json()
async_get = sync.sync_to_async(get)
async def get_all(urls):
return await asyncio.gather(*[
async_get(url) for url in urls
])
# call get_all as a sync function to be used in a sync context
return sync.async_to_sync(get_all)(urls)
@timed
def async_aiohttp_get_all(urls):
"""
performs asynchronous get requests
"""
async def get_all(urls):
async with aiohttp.ClientSession() as session:
async def fetch(url):
async with session.get(url) as response:
return await response.json()
return await asyncio.gather(*[
fetch(url) for url in urls
])
# call get_all as a sync function to be used in a sync context
return sync.async_to_sync(get_all)(urls)
if __name__ == '__main__':
# this endpoint takes ~3 seconds to respond,
# so a purely synchronous implementation should take
# little more than 30 seconds and a purely asynchronous
# implementation should take little more than 3 seconds.
urls = ['https://postman-echo.com/delay/3']*10
async_aiohttp_get_all(urls)
async_requests_get_all(urls)
sync_requests_get_all(urls)
print('----------------------')
[print(duration) for duration in timed.durations]
在我的机器上,这是输出:
async_aiohttp_get_all started
async_aiohttp_get_all finished in 3.20 seconds
async_requests_get_all started
async_requests_get_all finished in 30.61 seconds
sync_requests_get_all started
sync_requests_get_all finished in 30.59 seconds
----------------------
async_aiohttp_get_all finished in 3.20 seconds
async_requests_get_all finished in 30.61 seconds
sync_requests_get_all finished in 30.59 seconds
我发布的大多数答案都有很多问题 - 它们要么使用已被移植但功能有限的已弃用库,要么提供了一个在执行请求时具有太多魔力的解决方案,从而难以处理错误。如果它们不属于上述类别之一,则它们是第 3 方库或已弃用。
一些解决方案纯粹在 http 请求中可以正常工作,但是对于任何其他类型的请求,这些解决方案都达不到要求,这很可笑。这里不需要高度定制的解决方案。
只需使用 python 内置库asyncio
就足以执行任何类型的异步请求,并为复杂和特定于用例的错误处理提供足够的流动性。
import asyncio
loop = asyncio.get_event_loop()
def do_thing(params):
async def get_rpc_info_and_do_chores(id):
# do things
response = perform_grpc_call(id)
do_chores(response)
async def get_httpapi_info_and_do_chores(id):
# do things
response = requests.get(URL)
do_chores(response)
async_tasks = []
for element in list(params.list_of_things):
async_tasks.append(loop.create_task(get_chan_info_and_do_chores(id)))
async_tasks.append(loop.create_task(get_httpapi_info_and_do_chores(ch_id)))
loop.run_until_complete(asyncio.gather(*async_tasks))
它的工作原理很简单。您正在创建一系列您希望异步执行的任务,然后要求循环执行这些任务并在完成后退出。没有因缺乏维护而导致的额外库,也没有缺乏所需的功能。
你可以使用httpx
它。
import httpx
async def get_async(url):
async with httpx.AsyncClient() as client:
return await client.get(url)
urls = ["http://google.com", "http://wikipedia.org"]
# Note that you need an async context to use `await`.
await asyncio.gather(*map(get_async, urls))
如果你想要一个函数式语法,gamla lib 将它包装成get_async
.
然后你可以做
await gamla.map(gamla.get_async(10))(["http://google.com", "http://wikipedia.org"])
10
是以秒为单位的超时。
(免责声明:我是它的作者)
我知道这已经关闭了一段时间,但我认为推广另一个基于 requests 库的异步解决方案可能会很有用。
list_of_requests = ['http://moop.com', 'http://doop.com', ...]
from simple_requests import Requests
for response in Requests().swarm(list_of_requests):
print response.content
文档在这里: http: //pythonhosted.org/simple-requests/
如果您想使用 asyncio,则为- https://github.com/encode/requests-asyncrequests-async
提供 async/await 功能requests
免责声明:Following code creates different threads for each function
。
这可能对某些情况有用,因为它更易于使用。但是要知道它不是异步的,而是使用多个线程产生异步的错觉,即使装饰器建议这样做。
您可以使用以下装饰器在函数执行完成后给出回调,回调必须处理函数返回的数据。
请注意,函数被修饰后会返回一个Future
对象。
import asyncio
## Decorator implementation of async runner !!
def run_async(callback, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
def inner(func):
def wrapper(*args, **kwargs):
def __exec():
out = func(*args, **kwargs)
callback(out)
return out
return loop.run_in_executor(None, __exec)
return wrapper
return inner
实施示例:
urls = ["https://google.com", "https://facebook.com", "https://apple.com", "https://netflix.com"]
loaded_urls = [] # OPTIONAL, used for showing realtime, which urls are loaded !!
def _callback(resp):
print(resp.url)
print(resp)
loaded_urls.append((resp.url, resp)) # OPTIONAL, used for showing realtime, which urls are loaded !!
# Must provide a callback function, callback func will be executed after the func completes execution
# Callback function will accept the value returned by the function.
@run_async(_callback)
def get(url):
return requests.get(url)
for url in urls:
get(url)
如果您希望查看实时加载的 url,您也可以在末尾添加以下代码:
while True:
print(loaded_urls)
if len(loaded_urls) == len(urls):
break
from threading import Thread
threads=list()
for requestURI in requests:
t = Thread(target=self.openURL, args=(requestURI,))
t.start()
threads.append(t)
for thread in threads:
thread.join()
...
def openURL(self, requestURI):
o = urllib2.urlopen(requestURI, timeout = 600)
o...
一段时间以来,我一直在使用 python 请求对 github 的 gist API 进行异步调用。
例如,请参阅此处的代码:
https://github.com/davidthewatson/flasgist/blob/master/views.py#L60-72
这种风格的 python 可能不是最清楚的例子,但我可以向你保证,代码是有效的。如果这让您感到困惑,请告诉我,我会记录下来。
我还尝试了一些使用 python 中的异步方法的东西,但是我在使用 twisted 进行异步编程时有更好的运气。它的问题更少,并且有据可查。这是与您尝试扭曲的内容类似的链接。
http://pythonquirks.blogspot.com/2011/04/twisted-asynchronous-http-request.html
我赞同上面使用HTTPX的建议,但我经常以不同的方式使用它,所以我添加了我的答案。
我个人使用asyncio.run
(在 Python 3.7 中引入)而不是asyncio.gather
也更喜欢这种aiostream
方法,它可以与 asyncio 和 httpx 结合使用。
正如我刚刚发布的这个示例一样,这种风格有助于异步处理一组 URL,即使(常见)错误发生也是如此。我特别喜欢这种风格如何阐明响应处理发生的位置以及便于错误处理(我发现异步调用往往会提供更多)。
发布一个异步触发一堆请求的简单示例更容易,但通常您还想处理响应内容(用它计算一些东西,也许参考您请求的 URL 的原始对象) .
该方法的核心如下所示:
async with httpx.AsyncClient(timeout=timeout) as session:
ws = stream.repeat(session)
xs = stream.zip(ws, stream.iterate(urls))
ys = stream.starmap(xs, fetch, ordered=False, task_limit=20)
process = partial(process_thing, things=things, pbar=pbar, verbose=verbose)
zs = stream.map(ys, process)
return await zs
在哪里:
process_thing
是一个异步响应内容处理函数things
是输入列表(urls
URL 字符串的生成器来自),例如对象/字典列表pbar
是一个进度条(例如tqdm.tqdm
)[可选但有用]所有这些都放在一个异步函数async_fetch_urlset
中,然后通过调用名为 eg 的同步“顶级”函数fetch_things
运行该函数,该函数运行协程 [这是异步函数返回的内容]并管理事件循环:
def fetch_things(urls, things, pbar=None, verbose=False):
return asyncio.run(async_fetch_urlset(urls, things, pbar, verbose))
由于作为输入传递的列表(这里是things
)可以就地修改,因此您可以有效地获取输出(就像我们习惯于从同步函数调用中一样)