我确信标准库中有这样的东西,但似乎我错了。
我有一堆我想要urlopen
并行的网址。我想要类似内置map
函数的东西,除了工作是由一堆线程并行完成的。
有没有一个好的模块可以做到这一点?
我确信标准库中有这样的东西,但似乎我错了。
我有一堆我想要urlopen
并行的网址。我想要类似内置map
函数的东西,除了工作是由一堆线程并行完成的。
有没有一个好的模块可以做到这一点?
multiprocessing.Pool中有一个map
方法。这会执行多个过程。
如果多个进程不是你的菜,你可以使用multiprocessing.dummy它使用线程。
import urllib
import multiprocessing.dummy
p = multiprocessing.dummy.Pool(5)
def f(post):
return urllib.urlopen('http://stackoverflow.com/questions/%u' % post)
print p.map(f, range(3329361, 3329361 + 5))
有人建议我为此使用该futures
软件包。我试过了,它似乎工作。
http://pypi.python.org/pypi/futures
这是一个例子:
"Download many URLs in parallel."
import functools
import urllib.request
import futures
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
return urllib.request.urlopen(url, timeout=timeout).read()
with futures.ThreadPoolExecutor(50) as executor:
future_list = executor.run_to_futures(
[functools.partial(load_url, url, 30) for url in URLS])
这是我对线程映射的实现:
from threading import Thread
from queue import Queue
def thread_map(f, iterable, pool=None):
"""
Just like [f(x) for x in iterable] but each f(x) in a separate thread.
:param f: f
:param iterable: iterable
:param pool: thread pool, infinite by default
:return: list if results
"""
res = {}
if pool is None:
def target(arg, num):
try:
res[num] = f(arg)
except:
res[num] = sys.exc_info()
threads = [Thread(target=target, args=[arg, i]) for i, arg in enumerate(iterable)]
else:
class WorkerThread(Thread):
def run(self):
while True:
try:
num, arg = queue.get(block=False)
try:
res[num] = f(arg)
except:
res[num] = sys.exc_info()
except Empty:
break
queue = Queue()
for i, arg in enumerate(iterable):
queue.put((i, arg))
threads = [WorkerThread() for _ in range(pool)]
[t.start() for t in threads]
[t.join() for t in threads]
return [res[i] for i in range(len(res))]
Python 模块Queue
可能会对您有所帮助。使用一个线程Queue.put()
将所有 url 推送到队列中,而工作线程只是get()
将 url 一个一个地推入队列。
我将它包装在一个函数中(未经测试):
import itertools
import threading
import urllib2
import Queue
def openurl(url, queue):
def starter():
try:
result = urllib2.urlopen(url)
except Ecxeption, exc:
def raiser():
raise exc
queue.put((url, raiser))
else:
queue.put((url, lambda:result))
threadind.Thread(target=starter).start()
myurls = ... # the list of urls
myqueue = Queue.Queue()
map(openurl, myurls, itertools.repeat(myqueue))
for each in myurls:
url, getresult = queue.get()
try:
result = getresult()
except Exception, exc:
print 'exception raised:' + str(exc)
else:
# do stuff with result