0

我需要运行大约 500 个并发循环。每个循环将依次获取一个分页的 REST 端点,直到它到达 500 个端点中每个端点的最后一页。其中一些循环只有 5 到 10 页,因此会很快完成,但其他循环有 100 页。

问题是我需要将此 URL 获取放在一个顺序的阻塞循环中,因为由于 API 限制,每个页面都必须按顺序获取(如果我获取第 7 页,然后获取第 5 页, API 将抛出错误)。因此,这里的并行单位是每个循环,而不是循环内的每个 URL 获取。

任何地方都没有进行繁重的计算。只需获取一个页面,然后将原始内容放入 kafka 主题中。除了依赖于许多内核的多进程之外,我对任何建议持开放态度。AsyncIO、Gevent、多线程...

编辑1:

实际的问题是,如果我使用aiohttp在每个循环中异步获取每个页面,我无法保证页面 2 会在页面 1 之后被获取。请求将以正确的顺序发起,但绝对不能保证请求将以正确的顺序到达并在端点处被处理。

编辑2:

正如 user4815162342 指出的那样,aiohttp 应该可以工作

谢谢!

4

1 回答 1

2

在 asyncio 中,您可以并行启动与端点一样多的循环,并等待所有循环完成。每个循环将使用 aiohttp 顺序获取端点页面。例如:

async def download_loop(session, endpoint):
    for i in itertools.count(1):
        try:
            async with session.get(endpoint, params={'page': str(i)}) as resp:
                content = await resp.read()
        except aiohttp.ClientResponseError:
            break   # no more pages
        # do something with the response content

async def download(endpoints):
    loop = asyncio.get_event_loop()
    async with aiohttp.ClientSession() as session:
        # Start all loops in parallel and wait for them to finish.
        # This will start as many loops as there are endpoints.
        await asyncio.wait([download_loop(session, endpoint)
                            for endpoint in endpoints])

# for testing:
loop = asyncio.get_event_loop()
loop.run_until_complete(download(['http://endpoint1', 'http://endpoint2', ...]))

生产代码可能还会捕获aiohttp.ClientConnectionError并重试该 URL。

于 2018-02-13T14:16:09.393 回答