
I'm very new to asynchronous programming and I'm playing with httpx. I have the code below and I'm sure I'm doing something wrong - I just don't know what it is. There are two methods, one synchronous and one asynchronous, and both pull from Google Finance. On my system the times I see are:

Async: 5.015218734741211
Sync: 5.173618316650391

Here is the code:


import httpx
import asyncio
import time



#
#--------------------------------------------------------------------
# Synchronous pull
#--------------------------------------------------------------------
#
def sync_pull(url):
  r = httpx.get(url)
  print(r.status_code)


#
#--------------------------------------------------------------------
# Asynchronous pull
#--------------------------------------------------------------------
#
async def async_pull(url):
  async with httpx.AsyncClient() as client:
    r = await client.get(url)
    print(r.status_code)


#
#--------------------------------------------------------------------
# Main: time both approaches
#--------------------------------------------------------------------
#
if __name__ == "__main__":

  goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
  tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL', 
             'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
             ]  

  print("Running asynchronously...")
  async_start = time.time()
  for ticker in tickers:
    url = goog_fin_nyse_url + ticker + ':NYSE'
    asyncio.run(async_pull(url))
  async_end = time.time()
  print(f"Time lapsed is: {async_end - async_start}")


  print("Running synchronously...")
  sync_start = time.time()
  for ticker in tickers:
    url = goog_fin_nyse_url + ticker + ':NYSE'
    sync_pull(url)
  sync_end = time.time()
  print(f"Time lapsed is: {sync_end - sync_start}")

I had hoped the async method would take only a fraction of the time the sync method needs. What am I doing wrong?


3 Answers


When you write asyncio.run(async_pull(url)) you are saying: run async_pull and wait for the result to come back. Since you do this once for each ticker in the loop, you are effectively using asyncio to run things synchronously, and you won't see any performance benefit.
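A minimal sketch of the difference, using asyncio.sleep as a stand-in for the network call (the 1-second delay and the three calls are arbitrary):

import asyncio
import time

async def fake_request():
    await asyncio.sleep(1)  # stands in for client.get(url)

# One asyncio.run() per call: each blocks until its coroutine finishes
start = time.time()
for _ in range(3):
    asyncio.run(fake_request())
print(f"run() in a loop:  {time.time() - start:.1f}s")  # ~3s

# One event loop, coroutines gathered: they wait concurrently
async def main():
    await asyncio.gather(*[fake_request() for _ in range(3)])

start = time.time()
asyncio.run(main())
print(f"run() + gather(): {time.time() - start:.1f}s")  # ~1s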

What you need to do is create several of the async calls and run them concurrently. There are several ways to do this; the easiest is to use asyncio.gather (see https://docs.python.org/3/library/asyncio-task.html#asyncio.gather), which takes a sequence of coroutines and runs them concurrently. Adapting your code is fairly straightforward: you create an async function that takes a list of urls, calls async_pull on each of them, and passes those to asyncio.gather and awaits the results. Adapted this way, your code looks like the following:

import httpx
import asyncio
import time

def sync_pull(url):
    r = httpx.get(url)
    print(r.status_code)

async def async_pull(url):
    async with httpx.AsyncClient() as client:
        r = await client.get(url)
        print(r.status_code)


async def async_pull_all(urls):
    return await asyncio.gather(*[async_pull(url) for url in urls])

if __name__ == "__main__":

    goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
    tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL',
               'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
               ]

    print("Running asynchronously...")
    async_start = time.time()
    results = asyncio.run(async_pull_all([goog_fin_nyse_url + ticker + ':NYSE' for ticker in tickers]))
    async_end = time.time()
    print(f"Time lapsed is: {async_end - async_start}")


    print("Running synchronously...")
    sync_start = time.time()
    for ticker in tickers:
        url = goog_fin_nyse_url + ticker + ':NYSE'
        sync_pull(url)
    sync_end = time.time()
    print(f"Time lapsed is: {sync_end - sync_start}")

Run this way, the asynchronous version takes about a second for me, versus seven seconds for the synchronous one.
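One further tweak, not needed for the speedup but worth knowing: the code above opens a fresh AsyncClient per URL, so every request pays connection setup again. The httpx documentation recommends reusing a client so connections are pooled; a sketch of that variant (same function names as above):

import asyncio
import httpx

async def async_pull(client, url):
    # The shared client pools connections across requests
    r = await client.get(url)
    print(r.status_code)

async def async_pull_all(urls):
    async with httpx.AsyncClient() as client:
        return await asyncio.gather(*[async_pull(client, url) for url in urls])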

Answered 2021-05-26T22:41:40.477

Here's a good pattern I use (I tend to change it a little each time). In general, I make a module async_utils.py and just import the top-level fetching function (e.g. fetch_things here); my code is then free to forget about the internals (other than error handling). You can do it other ways, but I like the 'functional' style of aiostream, and I often find the repeated calls to the process function take certain defaults that I set up with functools.partial.

You can pass a tqdm.tqdm progress bar in as pbar (initialised with the known size, total=len(things)) and it will be updated as each async response is processed.

import asyncio
import httpx
from aiostream import stream
from functools import partial

__all__ = ["fetch", "process", "async_fetch_urlset", "fetch_things"]

async def fetch(session, url, raise_for_status=False):
    response = await session.get(str(url))
    if raise_for_status:
        response.raise_for_status()
    return response


async def process_thing(data, things, pbar=None, verbose=False):
    # Map the response back to the thing it came from in the things list
    source_url = data.history[0].url if data.history else data.url
    thing = next(t for t in things if source_url == t.get("url"))
    # Handle `data.content` here, where `data` is the `httpx.Response`
    if verbose:
        print(f"Processing {source_url=}")
    thing.update({"computed_value": "result goes here"})
    if pbar:
        pbar.update()


async def async_fetch_urlset(urls, things, pbar=None, verbose=False, timeout_s=10.0):
    timeout = httpx.Timeout(timeout=timeout_s)
    async with httpx.AsyncClient(timeout=timeout) as session:
        ws = stream.repeat(session)
        xs = stream.zip(ws, stream.iterate(urls))
        ys = stream.starmap(xs, fetch, ordered=False, task_limit=20)
        process = partial(process_thing, things=things, pbar=pbar, verbose=verbose)
        zs = stream.map(ys, process)
        return await zs

def fetch_things(urls, things, pbar=None, verbose=False):
    return asyncio.run(async_fetch_urlset(urls, things, pbar, verbose))

In this example, the input is a list of dicts (with string keys and values), things: list[dict[str,str]], and the "url" key is accessed on each one to retrieve the URL. Having a dict or object rather than just the URL string is necessary when you want to 'map' the result back to the object it came from. The process_thing function is able to modify the input list things in-place (i.e. its changes are not confined to the function's scope: they are visible back in the scope that called it).
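A quick sketch of that in-place point (plain Python, nothing httpx-specific):

things = [{"url": "https://example.com"}]

def mark_done(ts):
    ts[0].update({"computed_value": "done"})  # mutates the caller's dict

mark_done(things)
print("computed_value" in things[0])  # True - the change is visible here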

You'll often find that errors show up when running asynchronously that don't appear when running synchronously, so you'll need to catch them and retry. One common gotcha is retrying at the wrong level (e.g. around the entire loop).

In particular, you'll want to import and catch httpcore.ConnectTimeout, httpx.ConnectTimeout, httpx.RemoteProtocolError, and httpx.ReadTimeout.

Increasing the timeout_s parameter will reduce the frequency of timeout errors by letting the AsyncClient 'wait' longer, but doing so may in fact slow your program down (it won't 'fail fast' as quickly).
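As an aside, httpx also lets you tune the individual timeout phases rather than one blanket value, which can help if most of your failures are connect timeouts specifically; a minimal sketch (the values are arbitrary):

import httpx

# 5s to establish a connection; read/write/pool fall back to the 10s default
timeout = httpx.Timeout(10.0, connect=5.0)
# then, as in async_fetch_urlset above:
#   async with httpx.AsyncClient(timeout=timeout) as session: ...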

Here's an example of how to use the async_utils module given above:

from async_utils import fetch_things
import httpx
import httpcore

# UNCOMMENT THIS TO SEE ALL THE HTTPX INTERNAL LOGGING
#import logging
#log = logging.getLogger()
#log.setLevel(logging.DEBUG)
#log_format = logging.Formatter('[%(asctime)s] [%(levelname)s] - %(message)s')
#console = logging.StreamHandler()
#console.setLevel(logging.DEBUG)
#console.setFormatter(log_format)
#log.addHandler(console)

things = [
    {"url": "https://python.org", "name": "Python"},
    {"url": "https://www.python-httpx.org/", "name": "HTTPX"},
]
#log.debug("URLSET:" + str(list(t.get("url") for t in things)))

def make_urlset(things):
    """Make a URL generator (empty if all have been fetched)"""
    urlset = (t.get("url") for t in things if "computed_value" not in t)
    return urlset

retryable_errors = (
    httpcore.ConnectTimeout,
    httpx.ConnectTimeout, httpx.RemoteProtocolError, httpx.ReadTimeout,
)

# ASYNCHRONOUS:
max_retries = 100
for i in range(max_retries):
    print(f"Retry {i}")
    try:
        urlset = make_urlset(things)
        foo = fetch_things(urls=urlset, things=things, verbose=True)
    except retryable_errors as exc:
        print(f"Caught {exc!r}")
        if i == max_retries - 1:
            raise
    except Exception:
        raise

# SYNCHRONOUS:
#for t in things:
#    resp = httpx.get(t["url"])

In this example I set a "computed_value" key on the dictionary once the async response has been successfully processed, which then prevents that URL from entering the generator on the next round (when make_urlset is called again). In this way the generator gets progressively smaller. You can also do it with lists, but I find a generator of the URLs to be pulled works reliably. For an object, you'd change the dict key assignment/access (update/in) to attribute assignment/access (setattr/hasattr).
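For completeness, here's a sketch of wiring in the tqdm progress bar mentioned earlier (this assumes tqdm is installed; fetch_things and make_urlset are as defined above):

from tqdm import tqdm
from async_utils import fetch_things

urlset = make_urlset(things)
with tqdm(total=len(things)) as pbar:
    fetch_things(urls=urlset, things=things, pbar=pbar, verbose=False)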

Answered 2021-08-09T16:10:41.063

I wanted to post the working version of my code using futures - virtually the same run time:


import httpx
import asyncio
import time

#
#--------------------------------------------------------------------
# Synchronous pull
#--------------------------------------------------------------------
#
def sync_pull(url):
  r = httpx.get(url)
  print(r.status_code)

#
#--------------------------------------------------------------------
# Asynchronous Pull
#--------------------------------------------------------------------
#
async def async_pull(url):
  async with httpx.AsyncClient() as client:
    r = await client.get(url)
    print(r.status_code)

#
#--------------------------------------------------------------------
# Build tasks queue & execute coroutines
#--------------------------------------------------------------------
#
async def build_task() -> None:
  goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
  tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL', 
             'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
             ]  
  tasks= []

  #
  ## The following block of code schedules each coroutine
  ## as a task on the event loop
  for ticker in tickers:
    url = goog_fin_nyse_url + ticker + ':NYSE'
    tasks.append(asyncio.ensure_future(async_pull(url)))

  start_time = time.time()

  #
  ## Awaiting gather() drives all of the scheduled tasks,
  ## letting them run concurrently
  await asyncio.gather(*tasks)

  #
  ## Calculate time lapsed
  finish_time = time.time()
  elapsed_time = finish_time - start_time
  print(f"\n Time spent processing: {elapsed_time} ")


# Start from here
if __name__ == "__main__":
  asyncio.run(build_task())
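One caveat with this version: ensure_future schedules every request at once, which is fine for 14 tickers but can exhaust connections or trip rate limits with larger lists. A sketch of capping the number of requests in flight with asyncio.Semaphore (the limit of 5 is arbitrary), reusing one client as well:

import asyncio
import httpx

async def bounded_pull(client, sem, url):
    async with sem:  # at most the semaphore's limit run at once
        r = await client.get(url)
        print(r.status_code)

async def build_task(urls):
    sem = asyncio.Semaphore(5)
    async with httpx.AsyncClient() as client:
        await asyncio.gather(*[bounded_pull(client, sem, url) for url in urls])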

Answered 2021-05-27T20:39:18.750