
I want to download and process a lot of files from a website. The site's terms of service restrict the number of files you're allowed to download per second.

The time needed to process the files is actually the bottleneck, so I would like to be able to process multiple files in parallel. But I don't want the separate processes to combine to violate the download limit. So I need something that caps the overall request rate. I was thinking of something like the following, but I'm not exactly an expert on the multiprocessing module.

import multiprocessing
from multiprocessing.managers import BaseManager
import time

class DownloadLimiter(object):

    def __init__(self, time):
        self.time = time
        self.lock = multiprocessing.Lock()

    def get(self, url):
        # Serialize downloads: only one process may proceed at a time, and
        # each call holds the lock for self.time seconds before returning.
        self.lock.acquire()
        time.sleep(self.time)
        self.lock.release()
        return url


class DownloadManager(BaseManager):
    pass

DownloadManager.register('downloader', DownloadLimiter)


class Worker(multiprocessing.Process):

    def __init__(self, downloader, queue, file_name):
        super().__init__()
        self.downloader = downloader
        self.file_name = file_name
        self.queue = queue

    def run(self):
        while not self.queue.empty():
            url = self.queue.get()
            content = self.downloader.get(url)
            with open(self.file_name, "a+") as fh:
                fh.write(str(content) + "\n")

Then elsewhere I run the downloads with

manager = DownloadManager()
manager.start()
downloader = manager.downloader(0.5)
queue = multiprocessing.Queue()

urls = range(50)
for url in urls:
    queue.put(url)

job1 = Worker(downloader, queue, r"foo.txt")
job2 = Worker(downloader, queue, r"bar.txt")
jobs = [job1, job2]

for job in jobs:
    job.start()

for job in jobs:
    job.join()

This seems to do the job on a small scale, but I'm a little worried about whether the locking is really being done correctly.
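
For example, would it be better to use the lock as a context manager, so it gets released even if something goes wrong inside the critical section? A minimal variant of the same idea (I also renamed the constructor argument so it doesn't shadow the time module):

class DownloadLimiter(object):

    def __init__(self, delay):
        self.delay = delay
        self.lock = multiprocessing.Lock()

    def get(self, url):
        # The context manager releases the lock even if sleep() raises.
        with self.lock:
            time.sleep(self.delay)
        return url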

Also, if there's a better pattern for achieving the same goal, I'd love to hear it.


5 Answers


This can be done cleanly with Ray, which is a library for parallel and distributed Python.

Ray Resources

When you start Ray, you can tell it what resources are available on that machine. Ray will automatically try to determine the number of CPU cores and the number of GPUs, but these can be specified explicitly, and in fact arbitrary user-defined resources can be passed in as well, e.g., by calling

ray.init(num_cpus=4, resources={'Network': 2})

This tells Ray that the machine has 4 CPU cores and 2 units of a user-defined resource called Network.

Each Ray "task", which is a schedulable unit of work, has certain resource requirements. By default, a task requires 1 CPU core and nothing else. However, arbitrary resource requirements can be specified by declaring the corresponding function with

@ray.remote(resources={'Network': 1})
def f():
    pass

This tells Ray that for f to execute on a "worker" process, there must be 1 CPU core (the default) and 1 Network resource available.

Since the machine has 2 Network resources and 4 CPU cores, at most 2 copies of f can execute concurrently. On the other hand, if there is another function g declared with

@ray.remote
def g():
    pass

then four copies of g can execute concurrently, or two copies of f and two copies of g can execute concurrently.
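
To see the effect of the Network resource directly, here is a small self-contained check (a sketch; it assumes Ray is installed and reuses the same resource setup as above). Four one-second f tasks should take roughly two seconds in total, since at most two of them can hold a Network resource at the same time.

import time
import ray

ray.init(num_cpus=4, resources={'Network': 2})

@ray.remote(resources={'Network': 1})
def f():
    # Stand-in for a one-second download.
    time.sleep(1)

start = time.time()
ray.get([f.remote() for _ in range(4)])   # only two can run at once
print('elapsed:', time.time() - start)    # roughly 2 seconds, not 1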

Example

Here is an example with placeholders for the actual functions that download and process the content.

import ray
import time

max_concurrent_downloads = 2

ray.init(num_cpus=4, resources={'Network': max_concurrent_downloads})

@ray.remote(resources={'Network': 1})
def download_content(url):
    # Download the file.
    time.sleep(1)
    return 'result from ' + url

@ray.remote
def process_result(result):
    # Process the result.
    time.sleep(1)
    return 'processed ' + result

urls = ['url1', 'url2', 'url3', 'url4']

result_ids = [download_content.remote(url) for url in urls]

processed_ids = [process_result.remote(result_id) for result_id in result_ids]

# Wait until the tasks have finished and retrieve the results.
processed_results = ray.get(processed_ids)

Here is a timeline depiction (which you can generate by running ray timeline from the command line and opening the resulting JSON file in chrome://tracing in the Chrome web browser).

In the script above, we submit 4 download_content tasks. These are the ones we rate-limit by specifying that they require the Network resource (in addition to the default 1 CPU resource). Then we submit 4 process_result tasks, each of which requires just the default 1 CPU resource. The tasks execute in three phases (just look at the blue boxes).

  1. We start by executing 2 download_content tasks, which is as many as can run at once (because of the rate limiting). We can't execute any process_result tasks yet because they depend on the output of the download_content tasks.
  2. Those finish, so we start executing the remaining two download_content tasks as well as two process_result tasks, since we are not rate-limiting the process_result tasks.
  3. We execute the remaining process_result tasks.

Each "row" is a worker process. Time goes from left to right.

[timeline screenshot]

You can see more about how to do this in the Ray documentation.

answered 2019-03-25T22:41:30.140

There is a library that does exactly what you need, called ratelimit.

The example from their homepage:

This function will not be able to make more than 15 API calls within a 15 minute time period.

from ratelimit import limits

import requests

FIFTEEN_MINUTES = 900

@limits(calls=15, period=FIFTEEN_MINUTES)
def call_api(url):
    response = requests.get(url)

    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response
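
Note that with the limits decorator alone, a call that exceeds the limit raises an exception rather than waiting. If you would rather have callers block until the next call is allowed, the library also provides a sleep_and_retry decorator that can be stacked on top (a sketch along the lines of the project's documentation):

from ratelimit import limits, sleep_and_retry

import requests

FIFTEEN_MINUTES = 900

@sleep_and_retry                           # block until a call slot is free
@limits(calls=15, period=FIFTEEN_MINUTES)  # at most 15 calls per 15 minutes
def call_api(url):
    response = requests.get(url)

    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response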

By the way, for I/O-bound tasks (such as web crawling) you can use multithreading instead of multiprocessing. With multiprocessing you have to create another process for control and coordinate everything you do. With a multithreaded approach, all threads inherently have access to the main process's memory, so signaling becomes much easier (since e is shared between the threads):

import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    event_is_set = e.wait()
    logging.debug('event set: %s', event_is_set)

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.is_set():
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')


e = threading.Event()
t1 = threading.Thread(name='block', 
                      target=wait_for_event,
                      args=(e,))
t1.start()

t2 = threading.Thread(name='non-block', 
                      target=wait_for_event_timeout, 
                      args=(e, 2))
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(3)
e.set()
logging.debug('Event is set')
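
And to tie this back to the original question, here is a rough sketch (my own, not taken from any library) of the same limiter idea written with threads: because all threads share the main process's memory, a plain threading.Lock is enough and no Manager or proxy objects are needed.

import threading
import time
from concurrent.futures import ThreadPoolExecutor

class DownloadLimiter:

    def __init__(self, delay):
        self.delay = delay
        self.lock = threading.Lock()

    def get(self, url):
        with self.lock:              # at most one download start per `delay` seconds
            time.sleep(self.delay)
        return url                   # placeholder: fetch `url` here

limiter = DownloadLimiter(0.5)

def fetch_and_process(url):
    content = limiter.get(url)
    return '{} processed'.format(content)   # placeholder for the real processing

with ThreadPoolExecutor(max_workers=4) as pool:
    for result in pool.map(fetch_and_process, range(10)):
        print(result)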
answered 2019-03-26T06:56:27.780

The simplest approach is to download on the main thread and feed the documents to a pool of workers.

In my own implementation, I went the route of using celery for processing documents and gevent for downloading. It does the same thing, just with more complexity.

Here is a simple example.

import multiprocessing
from multiprocessing import Pool
import time
import typing

def work(doc: str) -> str:
    # do some processing here....
    return doc + " processed"

def download(url: str) -> str:
    return url  # a hack for demo, use e.g. `requests.get()`

def run_pipeline(
    urls: typing.List[str],
    session_request_limit: int = 10,
    session_length: int = 60,
) -> None:
    """
    Download and process each url in `urls` at a max. rate limit
    given by `session_request_limit / session_length`
    """
    workers = Pool(multiprocessing.cpu_count())
    results = []

    n_requests = 0
    session_start = time.time()

    for url in urls:
        doc = download(url)
        results.append(
            workers.apply_async(work, (doc,))
        )
        n_requests += 1

        if n_requests >= session_request_limit:
            # Sleep for the remainder of the current session window.
            time_to_next_session = session_length - (time.time() - session_start)
            time.sleep(time_to_next_session)

        if time.time() - session_start >= session_length:
            session_start = time.time()
            n_requests = 0

    # Collect results
    for result in results:
        print(result.get())

if __name__ == "__main__":
    urls = ["www.google.com", "www.stackoverflow.com"]
    run_pipeline(urls)
answered 2019-03-26T07:43:34.000

It isn't entirely clear what you mean by "rate-limiting the downloads". If it means the number of concurrent downloads, which is a common use case, then I think the simple solution is to use a semaphore with a process pool:

#!/usr/bin/env python3
import os
import time
import random
from functools import partial
from multiprocessing import Pool, Manager


CPU_NUM = 4
CONCURRENT_DOWNLOADS = 2


def download(url, semaphore):
    pid = os.getpid()

    with semaphore:
        print('Process {p} is downloading from {u}'.format(p=pid, u=url))
        time.sleep(random.randint(1, 5))

    # Process the obtained resource:
    time.sleep(random.randint(1, 5))

    return 'Successfully processed {}'.format(url)


def main():
    manager = Manager()

    semaphore = manager.Semaphore(CONCURRENT_DOWNLOADS)
    target = partial(download, semaphore=semaphore)

    urls = ['https://link/to/resource/{i}'.format(i=i) for i in range(10)]

    with Pool(processes=CPU_NUM) as pool:
        results = pool.map(target, urls)

    print(results)


if __name__ == '__main__':
    main()

As you can see, only CONCURRENT_DOWNLOADS processes are downloading at any one time, while the others are busy processing the resources they have obtained.
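
A note on the design: the semaphore is created through Manager() because a plain multiprocessing.Semaphore cannot be passed to the pool workers as a map argument (it can only be shared by inheritance). If you prefer to avoid the manager process, an alternative sketch of the same idea hands a regular semaphore to each worker through the pool's initializer:

#!/usr/bin/env python3
import os
import time
import random
from multiprocessing import Pool, Semaphore


CPU_NUM = 4
CONCURRENT_DOWNLOADS = 2

_semaphore = None


def init_worker(semaphore):
    # Runs once in every pool process and stores the inherited semaphore.
    global _semaphore
    _semaphore = semaphore


def download(url):
    pid = os.getpid()

    with _semaphore:
        print('Process {p} is downloading from {u}'.format(p=pid, u=url))
        time.sleep(random.randint(1, 5))

    # Process the obtained resource:
    time.sleep(random.randint(1, 5))

    return 'Successfully processed {}'.format(url)


def main():
    semaphore = Semaphore(CONCURRENT_DOWNLOADS)
    urls = ['https://link/to/resource/{i}'.format(i=i) for i in range(10)]

    with Pool(processes=CPU_NUM, initializer=init_worker,
              initargs=(semaphore,)) as pool:
        results = pool.map(download, urls)

    print(results)


if __name__ == '__main__':
    main()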

answered 2019-03-27T02:45:46.803

OK, after the following clarification from the OP:

By "downloads per second" I mean that, globally, no more downloads are started each second than the allowed per-second limit.

I've decided to post another answer, since I think my first one may still be of interest to anyone who wants to limit the number of concurrently running processes.

I don't think it's necessary to use an extra framework to solve this problem. The idea is to use download threads spawned for each resource link, a queue of resources, and a fixed number of processing workers (which are processes, not threads):

#!/usr/bin/env python3
import os
import time
import random
from threading import Thread
from multiprocessing import Process, JoinableQueue


WORKERS = 4
DOWNLOADS_PER_SECOND = 2


def download_resource(url, resource_queue):
    pid = os.getpid()

    t = time.strftime('%H:%M:%S')
    print('Thread {p} is downloading from {u} ({t})'.format(p=pid, u=url, t=t),
          flush=True)
    time.sleep(random.randint(1, 10))

    results = '[resource {}]'.format(url)
    resource_queue.put(results)


def process_resource(resource_queue):
    pid = os.getpid()

    while True:
        res = resource_queue.get()

        print('Process {p} is processing {r}'.format(p=pid, r=res),
              flush=True)
        time.sleep(random.randint(1, 10))

        resource_queue.task_done()


def main():
    resource_queue = JoinableQueue()

    # Start process workers:
    for _ in range(WORKERS):
        worker = Process(target=process_resource,
                         args=(resource_queue,),
                         daemon=True)
        worker.start()

    urls = ['https://link/to/resource/{i}'.format(i=i) for i in range(10)]

    while urls:
        target_urls = urls[:DOWNLOADS_PER_SECOND]
        urls = urls[DOWNLOADS_PER_SECOND:]

        # Start downloader threads:
        for url in target_urls:
            downloader = Thread(target=download_resource,
                                args=(url, resource_queue),
                                daemon=True)
            downloader.start()

        time.sleep(1)

    resource_queue.join()


if __name__ == '__main__':
    main()

The output looks like this:

$ ./limit_download_rate.py
Thread 32482 is downloading from https://link/to/resource/0 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/1 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/2 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/3 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/4 (10:14:10)
Thread 32482 is downloading from https://link/to/resource/5 (10:14:10)
Process 32483 is processing [resource https://link/to/resource/2]
Process 32484 is processing [resource https://link/to/resource/0]
Thread 32482 is downloading from https://link/to/resource/6 (10:14:11)
Thread 32482 is downloading from https://link/to/resource/7 (10:14:11)
Process 32485 is processing [resource https://link/to/resource/1]
Process 32486 is processing [resource https://link/to/resource/3]
Thread 32482 is downloading from https://link/to/resource/8 (10:14:12)
Thread 32482 is downloading from https://link/to/resource/9 (10:14:12)
Process 32484 is processing [resource https://link/to/resource/6]
Process 32485 is processing [resource https://link/to/resource/9]
Process 32483 is processing [resource https://link/to/resource/8]
Process 32486 is processing [resource https://link/to/resource/4]
Process 32485 is processing [resource https://link/to/resource/7]
Process 32483 is processing [resource https://link/to/resource/5]

Here, DOWNLOADS_PER_SECOND threads are started every second, two in this example, which then download a resource and put it on the queue. WORKERS is the number of processes that take resources from the queue for further processing. With this setup, you can limit the number of downloads started per second and have the workers process the obtained resources in parallel.

answered 2019-03-29T02:54:45.653