您绝对不想创建每个处理 1 个 URL 的 25000 个子进程。但是您也不想要 1 个处理 25000 的进程(这是您构建的)。例如,您可能想要 8 个进程,每个进程处理大约 25000 的 1/8。
您可以通过创建一个Queue
完整的 URL、创建 8 个进程来循环通过拉下一个 URL 并完成工作来为该队列提供服务,然后加入所有 8 个进程来做到这一点。
但是你要做的是建立一个进程池。并且已经内置了一个multiprocessing
。除了已经构建和调试之外,它还具有您可能不会想到的功能。它还可以以各种不同的方式将结果传回。它可以让你一次分块成批的几个 URL(如果你给每个进程一个完整的列表的 1/8,那么就没有负载平衡;如果你一次给每个进程 1 个,你就是在浪费时间- 可以用于实际工作的进程通信)。等等。
所以,让我们使用它:
def do_it(url, headers):
print "adding header to: \n" + i
requests.post(i, headers=headers)
print "done!"
pool = multiprocessing.Pool(max_workers=8)
results = pool.map(lambda url: do_it(url, headers), urls)
pool.join()
这段代码唯一真正的问题是你在等待它建立一个包含 25000 个结果的列表,这些结果都是None
. 还有其他方法可以等待它而不建立返回结果,但实际上,这个列表的成本不值得处理它的额外复杂性。
我在那里使用 a 的原因lambda
是你需要一个只接受 each 的函数url
,而你只有一个接受 eachurl
加上一个headers
参数的函数。lambda
您可以通过使用or定义新包装器来创建该函数def
,或者通过调用为您喜欢的高阶函数来创建该函数partial
。这些基本上是等价的:
results = pool.map(lambda url: do_it(url, headers), urls)
def wrapper(url):
return do_it(url, headers)
results = pool.map(wrapper, urls)
results = pool.map(partial(do_it, headers=headers), urls)
您可能还需要考虑使用 anExecutor
而不是普通池。AnExecutor
返回更智能的结果对象,称为Future
s,在许多情况下更容易处理。(例如,不必在 中的四个不同map
变体之间做出决定Pool
,您只需在简单submit
方法上创建一个理解,然后在结果s 上调用as_completed
or 。)由于这直到 3.2 才添加到 Python 中,对于 2 .x 您必须安装 backport 库才能使用它。无论如何,对于像这样的微不足道的情况,它不会有太大的不同:wait
Future
futures
with futures.ProcessPoolExecutor(max_workers=8) as executor:
results = executor.map(lambda url: do_it(url, headers), urls)
… 或者:
with futures.ProcessPoolExecutor(max_workers=8) as executor:
fs = [executor.submit(do_it, url, headers) for url in urls]
futures.wait(fs)