
I'm trying to use Python asynchronously to speed up my requests to a server. The server has a slow response time (often several seconds, but sometimes faster than a second), but handles parallel requests well. I have no access to this server and can't change anything about it. So, I have a big list of URLs (pages in the code below) which I know in advance, and want to speed up their loading by making NO_TASKS = 5 requests at a time. On the other hand, I don't want to overload the server, so I want a minimum pause of 1 second between each request (i.e. a limit of 1 request per second).

So far, I have successfully implemented the semaphore part (five requests at a time) using a Trio queue:

import asks
import time
import trio

NO_TASKS = 5


asks.init('trio')
asks_session = asks.Session()
queue = trio.Queue(NO_TASKS)
next_request_at = 0
results = []


pages = [
    'https://www.yahoo.com/',
    'http://www.cnn.com',
    'http://www.python.org',
    'http://www.jython.org',
    'http://www.pypy.org',
    'http://www.perl.org',
    'http://www.cisco.com',
    'http://www.facebook.com',
    'http://www.twitter.com',
    'http://www.macrumors.com/',
    'http://arstechnica.com/',
    'http://www.reuters.com/',
    'http://abcnews.go.com/',
    'http://www.cnbc.com/',
]


async def async_load_page(url):
    global next_request_at
    sleep = next_request_at
    next_request_at = max(trio.current_time() + 1, next_request_at)
    await trio.sleep_until(sleep)
    next_request_at = max(trio.current_time() + 1, next_request_at)
    print('start loading page {} at {} seconds'.format(url, trio.current_time()))
    req = await asks_session.get(url)
    results.append(req.text)


async def producer(url):
    await queue.put(url)  


async def consumer():
    while True:
        if queue.empty():
            print('queue empty')
            return
        url = await queue.get()
        await async_load_page(url)


async def main():
    async with trio.open_nursery() as nursery:
        for page in pages:
            nursery.start_soon(producer, page)
        await trio.sleep(0.2)
        for _ in range(NO_TASKS):
            nursery.start_soon(consumer)


start = time.time()
trio.run(main)

However, I'm missing the implementation of the throttling part, i.e. max. 1 request per second. You can see my attempt to do so above (the first five lines of async_load_page), but as you can see when you execute the code, this does not work:

start loading page http://www.reuters.com/ at 58097.12261669573 seconds
start loading page http://www.python.org at 58098.12367392373 seconds
start loading page http://www.pypy.org at 58098.12380622773 seconds
start loading page http://www.macrumors.com/ at 58098.12389389973 seconds
start loading page http://www.cisco.com at 58098.12397854373 seconds
start loading page http://arstechnica.com/ at 58098.12405119873 seconds
start loading page http://www.facebook.com at 58099.12458010273 seconds
start loading page http://www.twitter.com at 58099.37738939873 seconds
start loading page http://www.perl.org at 58100.37830828273 seconds
start loading page http://www.cnbc.com/ at 58100.91712723473 seconds
start loading page http://abcnews.go.com/ at 58101.91770178373 seconds
start loading page http://www.jython.org at 58102.91875295573 seconds
start loading page https://www.yahoo.com/ at 58103.91993155273 seconds
start loading page http://www.cnn.com at 58104.48031027673 seconds
queue empty
queue empty
queue empty
queue empty
queue empty

I've spent some time searching for an answer, but couldn't find any.

4 Answers

One way to achieve your goal would be to use a mutex that a worker acquires before sending a request and that is released in a separate task after some interval:

async def fetch_urls(urls: Iterator, responses, n_workers, throttle):
    # Using binary `trio.Semaphore` to be able
    # to release it from a separate task.
    mutex = trio.Semaphore(1)

    async def tick():
        await trio.sleep(throttle)
        mutex.release()

    async def worker():
        for url in urls:
            await mutex.acquire()
            nursery.start_soon(tick)
            response = await asks.get(url)
            responses.append(response)

    async with trio.open_nursery() as nursery:
        for _ in range(n_workers):
            nursery.start_soon(worker)

If a worker gets a response faster than throttle seconds, it will block on await mutex.acquire(). Otherwise, the mutex will be released by tick and another worker will be able to acquire it.

This is similar to how a leaky bucket algorithm works:

  • Workers waiting for the mutex are like water in the bucket.
  • Each tick is like the bucket leaking at a constant rate.
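The mutex-plus-tick pattern described above can be sketched with only the standard library's asyncio (a rough, library-agnostic translation of the idea, not the answer's trio code; fetch is a hypothetical coroutine supplied by the caller):

```python
import asyncio

async def fetch_urls(urls, fetch, n_workers, throttle):
    # Binary semaphore released from a separate `tick` task, so a
    # request slot opens at most once per `throttle` seconds.
    mutex = asyncio.Semaphore(1)
    url_iter = iter(urls)   # shared by all workers
    responses = []
    ticks = []

    async def tick():
        await asyncio.sleep(throttle)
        mutex.release()

    async def worker():
        for url in url_iter:
            await mutex.acquire()
            ticks.append(asyncio.create_task(tick()))
            responses.append(await fetch(url))

    await asyncio.gather(*(worker() for _ in range(n_workers)))
    await asyncio.gather(*ticks)  # let pending releases finish cleanly
    return responses
```

The shared iterator plays the role of the answer's `for url in urls` loop: each worker pulls the next unclaimed url, and the semaphore alone spaces out the request starts.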

If you add some logging before sending a request, you should get output similar to this:

   0.00169 started
  0.001821 n_workers: 5
  0.001833 throttle: 1
  0.002152 fetching https://httpbin.org/delay/4
     1.012 fetching https://httpbin.org/delay/2
     2.014 fetching https://httpbin.org/delay/2
     3.017 fetching https://httpbin.org/delay/3
      4.02 fetching https://httpbin.org/delay/0
     5.022 fetching https://httpbin.org/delay/2
     6.024 fetching https://httpbin.org/delay/2
     7.026 fetching https://httpbin.org/delay/3
     8.029 fetching https://httpbin.org/delay/0
     9.031 fetching https://httpbin.org/delay/0
     10.61 finished
Answered 2018-10-03T20:19:55.213

IMHO, using trio.current_time() for this is much too complicated.

The simplest way to do rate limiting is a rate limiter, i.e. a separate task that basically does this:

async def ratelimit(queue, tick, task_status=trio.TASK_STATUS_IGNORED):
    with trio.open_cancel_scope() as scope:
        task_status.started(scope)
        while True:
            await queue.get()
            await trio.sleep(tick)

Example use:

async with trio.open_nursery() as nursery:
    q = trio.Queue(0)
    limiter = await nursery.start(ratelimit, q, 1)
    while whatever:
        await q.put(None) # will return at most once per second
        do_whatever()
    limiter.cancel()

In other words, you start that task with

q = trio.Queue(0)
limiter = await nursery.start(ratelimit, q, 1)

and then you can be sure that at most one call of

await q.put(None)

will return per second, because a zero-length queue acts as a rendezvous point. When you're done, call

 limiter.cancel()

to stop the rate-limiting task; otherwise your nursery won't exit.

If your use case includes starting sub-tasks that need to finish before the limiter is cancelled, the easiest way is to run them in another nursery, i.e. instead of

while whatever:
    await q.put(None) # will return at most once per second
    do_whatever()
limiter.cancel()

you'd use something like

async with trio.open_nursery() as inner_nursery:
    await start_tasks(inner_nursery, q)
limiter.cancel()

This would wait for the tasks to finish before touching the limiter.

NB: you can easily adapt this to "burst" mode, i.e. allow a certain number of requests before the rate limiting kicks in, simply by increasing the length of the queue.
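A stdlib sketch of that burst idea (asyncio rather than trio, since in asyncio a Queue with maxsize=0 is unbounded, not a rendezvous; the names and the pre-fill step are my own assumptions): the issuer fills a bounded token queue up front, which is the burst allowance, and then tops it up at a fixed rate, while callers await a token before each request.

```python
import asyncio

async def ratelimit(tokens: asyncio.Queue, tick: float):
    # Fill the burst allowance up front, then top up at a fixed rate.
    # A full queue blocks the issuer until a token is consumed.
    for _ in range(tokens.maxsize):
        tokens.put_nowait(None)
    while True:
        await asyncio.sleep(tick)
        await tokens.put(None)
```

A caller would then create `tokens = asyncio.Queue(maxsize=3)`, start `ratelimit(tokens, 1.0)` as a task, and `await tokens.get()` before each request: the first three get() calls return immediately, later ones at most once per second. Cancel the task when done, just as with the trio version.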

Answered 2018-07-09T20:05:51.553

Motivation and origin of the solution

Some months have passed since I asked this question. Python has improved since then, and so has trio (and my knowledge of both). So I thought it was time for a little update, using Python 3.6 with type annotations and trio-0.10 memory channels.

I developed my own improvement of the original version, but after reading @Roman Novatorov's great solution, adapted it again, and this is the result. Kudos to him for the main structure of the function (and the idea to use httpbin.org for illustration). I chose to use memory channels instead of a mutex to be able to take any token re-release logic out of the worker.

Explanation of the solution

I can rephrase the original problem like this:

  • I want to have a number of workers that start requests independently of each other (thus, they will be realized as asynchronous functions).
  • There is zero or one token released at any point; any worker starting a request to the server consumes a token, and the next token will not be issued until the minimum time has passed. In my solution, I use trio's memory channels to coordinate between the token issuer and the token consumers (the workers).

If you're not familiar with memory channels and their syntax, you can read about them in the trio doc. I think the logic of async with memory_channel and memory_channel.clone() can be confusing at first.

from typing import List, Iterator

import asks
import trio

asks.init('trio')

links: List[str] = [
    'https://httpbin.org/delay/7',
    'https://httpbin.org/delay/6',
    'https://httpbin.org/delay/4'
] * 3


async def fetch_urls(urls: List[str], number_workers: int, throttle_rate: float):

    async def token_issuer(token_sender: trio.abc.SendChannel, number_tokens: int):
        async with token_sender:
            for _ in range(number_tokens):
                await token_sender.send(None)
                await trio.sleep(1 / throttle_rate)

    async def worker(url_iterator: Iterator, token_receiver: trio.abc.ReceiveChannel):
        async with token_receiver:
            for url in url_iterator:
                await token_receiver.receive()

                print(f'[{round(trio.current_time(), 2)}] Start loading link: {url}')
                response = await asks.get(url)
                # print(f'[{round(trio.current_time(), 2)}] Loaded link: {url}')
                responses.append(response)

    responses = []
    url_iterator = iter(urls)
    token_send_channel, token_receive_channel = trio.open_memory_channel(0)

    async with trio.open_nursery() as nursery:
        async with token_receive_channel:
            nursery.start_soon(token_issuer, token_send_channel.clone(), len(urls))
            for _ in range(number_workers):
                nursery.start_soon(worker, url_iterator, token_receive_channel.clone())

    return responses

responses = trio.run(fetch_urls, links, 5, 1.)

Example log output:

As you can see, the minimum time between all page requests is one second:

[177878.99] Start loading link: https://httpbin.org/delay/7
[177879.99] Start loading link: https://httpbin.org/delay/6
[177880.99] Start loading link: https://httpbin.org/delay/4
[177881.99] Start loading link: https://httpbin.org/delay/7
[177882.99] Start loading link: https://httpbin.org/delay/6
[177886.20] Start loading link: https://httpbin.org/delay/4
[177887.20] Start loading link: https://httpbin.org/delay/7
[177888.20] Start loading link: https://httpbin.org/delay/6
[177889.44] Start loading link: https://httpbin.org/delay/4

Comments on the solution

As is not untypical for asynchronous code, this solution does not maintain the original order of the requested urls. One way to solve this is to associate an id with the original url, e.g. with a tuple structure, put the responses into a response dictionary, and later grab the responses one after the other to put them into a response list (this saves sorting and has linear complexity).
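That id-and-dictionary approach might look roughly like this (a stdlib asyncio sketch with hypothetical names, not the answer's trio code; fetch is a coroutine supplied by the caller):

```python
import asyncio

async def fetch_all_ordered(urls, fetch, number_workers):
    # Tag each url with its index, collect responses into a dict,
    # then rebuild a list in the original order (linear, no sorting).
    tagged = iter(enumerate(urls))   # shared by all workers
    by_index = {}

    async def worker():
        for i, url in tagged:
            by_index[i] = await fetch(url)

    await asyncio.gather(*(worker() for _ in range(number_workers)))
    return [by_index[i] for i in range(len(urls))]
```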

Answered 2019-02-01T20:14:27.263

You need to increment next_request_at by 1 every time async_load_page is entered. Try using next_request_at = max(trio.current_time() + 1, next_request_at + 1). Also, I think you only need to set it once. You may get into trouble if you set it around the awaits, where you give other tasks a chance to change it before you check it again.
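The core of that suggestion is to reserve the next slot before any await, in one step, so concurrent tasks cannot race on the shared variable. A library-agnostic sketch using stdlib asyncio (the interval is made a parameter and the names are mine, not the question's trio code):

```python
import asyncio

next_request_at = 0.0

async def throttled(fetch, url, interval=1.0):
    # Claim the next slot *before* any await, so each concurrent task
    # reserves a distinct slot and cannot race on the global.
    global next_request_at
    now = asyncio.get_running_loop().time()
    slot = max(now, next_request_at)
    next_request_at = slot + interval
    await asyncio.sleep(slot - now)
    return await fetch(url)
```

Because the read-and-update of next_request_at happens with no await in between, it is effectively atomic under cooperative scheduling, which is exactly what the original code (two updates around a sleep) was missing.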

Answered 2018-07-09T18:40:57.450