Here's a nice pattern I use (I tend to tweak it a little each time). Typically I make a module async_utils.py and just import the top-level fetching function (e.g. here fetch_things), and then my code is free to forget about the internals (other than error handling). You can do it in other ways, but I like the 'functional' style of aiostream, and I often find that the repeated calls to the process function take certain defaults, which I set using functools.partial.
You can pass a tqdm.tqdm progress bar in as pbar (initialised with the known size, total=len(things)) and it will be updated as each async response is processed.
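For instance, a minimal sketch of wiring one up (assuming tqdm is installed, and using the fetch_things function and things list shown further down):

from tqdm import tqdm

pbar = tqdm(total=len(things))
urls = (t["url"] for t in things)
fetch_things(urls=urls, things=things, pbar=pbar)
pbar.close()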
import asyncio
import httpx
from aiostream import stream
from functools import partial
__all__ = ["fetch", "process", "async_fetch_urlset", "fetch_things"]
async def fetch(session, url, raise_for_status=False):
    response = await session.get(str(url))
    if raise_for_status:
        response.raise_for_status()
    return response

async def process_thing(data, things, pbar=None, verbose=False):
    # Map the response back to the thing it came from in the things list
    source_url = data.history[0].url if data.history else data.url
    thing = next(t for t in things if source_url == t.get("url"))
    # Handle `data.content` here, where `data` is the `httpx.Response`
    if verbose:
        print(f"Processing {source_url=}")
    thing.update({"computed_value": "result goes here"})
    if pbar:
        pbar.update()
async def async_fetch_urlset(urls, things, pbar=None, verbose=False, timeout_s=10.0):
    timeout = httpx.Timeout(timeout=timeout_s)
    async with httpx.AsyncClient(timeout=timeout) as session:
        ws = stream.repeat(session)
        xs = stream.zip(ws, stream.iterate(urls))
        ys = stream.starmap(xs, fetch, ordered=False, task_limit=20)
        process = partial(process_thing, things=things, pbar=pbar, verbose=verbose)
        zs = stream.map(ys, process)
        return await zs

def fetch_things(urls, things, pbar=None, verbose=False):
    return asyncio.run(async_fetch_urlset(urls, things, pbar, verbose))
In this example, the input is a list of dicts (with string keys and values), things: list[dict[str,str]], and the "url" key is accessed to retrieve each URL. You want a dict or an object rather than just the URL string when you want to 'map' the result back onto the object it came from. The process_thing function is able to modify the input things list in-place (i.e. any changes it makes are not scoped within the function: they change the list back in the scope that called it).
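For instance, after a successful run the dicts have been updated in the caller's scope (a small sketch, assuming process_thing stores its result under "computed_value" as above):

things = [{"url": "https://python.org", "name": "Python"}]
fetch_things(urls=(t["url"] for t in things), things=things)
# each dict now also carries the computed key, e.g.
# [{'url': 'https://python.org', 'name': 'Python', 'computed_value': 'result goes here'}]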
You'll often find that errors show up when running asynchronously which you wouldn't get when running synchronously, so you'll need to catch them and retry. A common gotcha is retrying at the wrong level (e.g. around the entire loop).

In particular, you'll want to import and catch httpcore.ConnectTimeout, httpx.ConnectTimeout, httpx.RemoteProtocolError and httpx.ReadTimeout.

Increasing the timeout_s parameter will reduce how often the timeout errors occur by letting the AsyncClient 'wait' for longer, but doing so may in fact slow your program down (it won't 'fail fast' as quickly).
Here's an example of how to use the async_utils module given above:
from async_utils import fetch_things
import httpx
import httpcore
# UNCOMMENT THIS TO SEE ALL THE HTTPX INTERNAL LOGGING
#import logging
#log = logging.getLogger()
#log.setLevel(logging.DEBUG)
#log_format = logging.Formatter('[%(asctime)s] [%(levelname)s] - %(message)s')
#console = logging.StreamHandler()
#console.setLevel(logging.DEBUG)
#console.setFormatter(log_format)
#log.addHandler(console)
things = [
{"url": "https://python.org", "name": "Python"},
{"url": "https://www.python-httpx.org/", "name": "HTTPX"},
]
#log.debug("URLSET:" + str(list(t.get("url") for t in things)))
def make_urlset(things):
    """Make a URL generator (empty if all have been fetched)"""
    urlset = (t.get("url") for t in things if "computed_value" not in t)
    return urlset

retryable_errors = (
    httpcore.ConnectTimeout,
    httpx.ConnectTimeout, httpx.RemoteProtocolError, httpx.ReadTimeout,
)
# ASYNCHRONOUS:
max_retries = 100
for i in range(max_retries):
print(f"Retry {i}")
try:
urlset = make_urlset(things)
foo = fetch_things(urls=urlset, things=things, verbose=True)
except retryable_errors as exc:
print(f"Caught {exc!r}")
if i == max_retries - 1:
raise
except Exception:
raise
# SYNCHRONOUS:
#for t in things:
# resp = httpx.get(t["url"])
In this example I set a "computed_value" key on the dict once the async response has been successfully processed, which then prevents that URL from being fed into the generator on the next pass (when make_urlset is called again). In this way the generator gets progressively smaller. You can also do it with a list, but I find a generator of the URLs still to be fetched works reliably. For an object, you'd change the dict key assignment/access (update / in) to attribute assignment/access (setattr / hasattr).
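For instance, the object-based variant might look something like this (a sketch; the Thing class here is hypothetical):

class Thing:
    def __init__(self, url, name):
        self.url = url
        self.name = name

def make_urlset(things):
    """Make a URL generator (empty if all have been fetched)"""
    return (t.url for t in things if not hasattr(t, "computed_value"))

# ...and inside process_thing, match and store via attributes instead:
#     thing = next(t for t in things if source_url == t.url)
#     setattr(thing, "computed_value", "result goes here")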