I want to download and process a lot of files from a website. The site's terms of service limit the number of files you are allowed to download per second.
The time needed to process each file is actually the bottleneck, so I'd like to process several files in parallel. However, I don't want the separate processes, taken together, to violate the download limit, so I need something that caps the combined request rate. I had something like the following in mind, but I'm not exactly an expert on the multiprocessing module.
import multiprocessing
from multiprocessing.managers import BaseManager
import time


class DownloadLimiter(object):

    def __init__(self, delay):
        # Minimum spacing between downloads, in seconds.
        self.delay = delay
        self.lock = multiprocessing.Lock()

    def get(self, url):
        # Hold the shared lock while sleeping, so that downloads from
        # all workers are spaced at least `delay` seconds apart.
        with self.lock:
            time.sleep(self.delay)
        return url  # stand-in for the actual download


class DownloadManager(BaseManager):
    pass


DownloadManager.register('downloader', DownloadLimiter)


class Worker(multiprocessing.Process):

    def __init__(self, downloader, queue, file_name):
        super().__init__()
        self.downloader = downloader
        self.file_name = file_name
        self.queue = queue

    def run(self):
        while not self.queue.empty():
            url = self.queue.get()
            content = self.downloader.get(url)
            with open(self.file_name, "a+") as fh:
                fh.write(str(content) + "\n")
Then, elsewhere, the downloads are run with:
if __name__ == '__main__':
    manager = DownloadManager()
    manager.start()
    downloader = manager.downloader(0.5)  # proxy to the one shared limiter

    queue = multiprocessing.Queue()
    urls = range(50)  # stand-ins for the real URLs
    for url in urls:
        queue.put(url)

    job1 = Worker(downloader, queue, r"foo.txt")
    job2 = Worker(downloader, queue, r"bar.txt")
    jobs = [job1, job2]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()
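To convince myself the limiter actually spaces calls out, I ran a quick throwaway check from the parent process (right after creating downloader, before starting any workers):

# Four rate-limited calls through the proxy should take about
# 4 * 0.5 = 2 seconds in total.
start = time.monotonic()
for url in range(4):
    downloader.get(url)
print(time.monotonic() - start)  # prints something close to 2.0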
This seems to do the job on a small scale, but I'm a little worried about whether the locking is really done correctly.
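In particular, I'm not sure the `while not self.queue.empty()` loop is safe: as far as I can tell, another worker could take the last item between my `empty()` check and the `get()`, leaving a process blocked on `get()` forever. A sentinel-based loop is the variation I'd try instead; this is an untested sketch with the same `__init__` as above, where `STOP` is my own marker value and one `STOP` would be put on the queue per worker:

STOP = None  # sentinel meaning "no more work"

class Worker(multiprocessing.Process):
    # __init__ unchanged from the version above

    def run(self):
        while True:
            url = self.queue.get()  # blocks until an item is available
            if url is STOP:         # sentinel reached: exit cleanly
                break
            content = self.downloader.get(url)
            with open(self.file_name, "a+") as fh:
                fh.write(str(content) + "\n")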
Also, if there is a better pattern for achieving the same goal, I'd love to hear about it.
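For what it's worth, one alternative I half-considered is having the limiter remember when the last download happened and only sleep for whatever is left of the interval, instead of always sleeping while holding the lock. Untested sketch (min_interval is my own name for the spacing):

class IntervalLimiter(object):

    def __init__(self, min_interval):
        self.min_interval = min_interval  # required gap between downloads, seconds
        self.lock = multiprocessing.Lock()
        self.last = 0.0  # monotonic timestamp of the previous download

    def get(self, url):
        with self.lock:
            # Only sleep for the remainder of the interval, if any.
            wait = self.min_interval - (time.monotonic() - self.last)
            if wait > 0:
                time.sleep(wait)
            self.last = time.monotonic()
        return url  # stand-in for the actual download

It would be registered with the manager in exactly the same way, but I don't know whether it's actually any better, which is part of why I'm asking.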