有两种方法可以同时做事。或者,真的,2-3/4 左右:
- 多线程
- 或多个进程,特别是如果“事物”占用大量 CPU 资源
- 或协程或greenlets,特别是如果有成千上万的“事物”
- 或以上之一的池
- 事件循环(手动编码)
- 或混合 greenlet/事件循环系统,如
gevent
.
如果您有 1000 个 URL,您可能不想同时执行 1000 个请求。例如,Web 浏览器通常一次只执行 8 个请求。池是一次只做 8 件事的好方法,所以让我们这样做。
而且,由于您一次只做 8 件事情,而且这些事情主要受 I/O 限制,因此线程是完美的。
我会用futures
. (如果您使用的是 Python 2.x 或 3.0-3.1,则需要安装 backport,. futures
)
import concurrent.futures
urls = ['http://example.com/foo',
'http://example.com/bar']
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
result = b''.join(executor.map(download, urls))
with open('output_file', 'wb') as f:
f.write(result)
当然,您需要编写download
函数,但这与您一次编写这些函数时编写的函数完全相同。
例如,使用urlopen
(如果您使用的是 Python 2.x,请使用urllib2
代替urllib.request
):
def download(url):
with urllib.request.urlopen(url) as f:
return f.read()
如果您想学习如何自己构建线程池执行器,源代码实际上非常简单,并且multiprocessing.pool
是 stdlib 中的另一个很好的示例。
然而,这两者都有很多多余的代码(处理弱引用以提高内存使用率、干净地关闭、提供不同的等待结果的方式、正确传播异常等)可能会妨碍您。
如果您环顾 PyPI 和 ActiveState,您会发现更简单的设计threadpool
,您可能会发现更容易理解。
但这是最简单的可连接线程池:
class ThreadPool(object):
def __init__(self, max_workers):
self.queue = queue.Queue()
self.workers = [threading.Thread(target=self._worker) for _ in range(max_workers)]
def start(self):
for worker in self.workers:
worker.start()
def stop(self):
for _ in range(self.workers):
self.queue.put(None)
for worker in self.workers:
worker.join()
def submit(self, job):
self.queue.put(job)
def _worker(self):
while True:
job = self.queue.get()
if job is None:
break
job()
当然,一个非常简单的实现的缺点是它的使用不友好concurrent.futures.ThreadPoolExecutor
:
urls = ['http://example.com/foo',
'http://example.com/bar']
results = [list() for _ in urls]
results_lock = threading.Lock()
def download(url, i):
with urllib.request.urlopen(url) as f:
result = f.read()
with results_lock:
results[i] = url
pool = ThreadPool(max_workers=8)
pool.start()
for i, url in enumerate(urls):
pool.submit(functools.partial(download, url, i))
pool.stop()
result = b''.join(results)
with open('output_file', 'wb') as f:
f.write(result)