1

我是新手,httpx并且async无法将调用外部 API 的函数转换为requests使用httpx.

这是原始功能:

import requests


def get_api_data(url: str) -> Optional[List[str]]:
    """Get the data from an API url.
    Args:
        url (str):  The API url.
    Returns:
        list: The data.
    """
    try:
        resp = requests.get(url)
        return resp.json()
    except requests.exceptions.HTTPError as errh:
        print("Http Error:", errh)
    except requests.exceptions.ConnectionError as errc:
        print("Error Connecting:", errc)
    except requests.exceptions.Timeout as errt:
        print("Timeout Error:", errt)
    except requests.exceptions.RequestException as err:
        print("OOps: Something Else", err)

这就是我尝试过的:

import httpx


async def get_api_data(url: str) -> Optional[List[Any]]:
    """Get the data from an API url.
    Args:
        url (str):  The API url.
    Returns:
        list: The data.
    """
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(url)
            return resp.json()
    except httpx.HTTPError as errh:
        print("Http Error:", errh)

但我不确定这是一个有效的代码以及如何测试它......

requests更改功能(版本)之前的测试示例;它用于patch模拟:

import pytest
import requests

from unittest.mock import patch


def test_api_http_error():
    with patch('app.internal.world_clock.requests.get', side_effect=requests.exceptions.HTTPError):
        assert not get_api_data(TIMEZONES_BASE_URL)

我花了很多时间试图弄清楚这一点,所以如果您对如何正确进行此转换以及如何测试它有任何建议 - 我将不胜感激。

4

1 回答 1

0

你的功能看起来不错。你在运行你的异步功能吗?当我第一次尝试使用它来替换项目中的请求时,我遇到了这个问题。而不是像这样调用它:results = async_funtion(parameter) 它需要是: results = asyncio.run(async_funtion(parameter)) 在第一个示例中,您只是创建一个协程对象,第二个实际运行它。

于 2022-01-27T19:29:27.663 回答