29

I was sure there was something like this in the standard library, but it seems I was wrong.

I have a bunch of URLs that I want to urlopen in parallel. I want something like the built-in map function, except with the work done in parallel by a pool of threads.

Is there a good module that does this?


5 Answers

53

There is a map method in multiprocessing.Pool. That runs the work in multiple processes.

And if multiple processes aren't your cup of tea, you can use multiprocessing.dummy, which exposes the same interface but uses threads.

import urllib
import multiprocessing.dummy

# Same interface as multiprocessing.Pool, but backed by 5 threads.
p = multiprocessing.dummy.Pool(5)

def f(post):
    return urllib.urlopen('http://stackoverflow.com/questions/%d' % post)

print p.map(f, range(3329361, 3329361 + 5))
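For the process-based Pool mentioned above, the call looks the same; a minimal sketch in Python 3 (note that process pools generally want the worker defined at module level and a `__main__` guard, since workers may be spawned as fresh interpreters):

import urllib.request
from multiprocessing import Pool

def fetch(post):
    # urlopen blocks on the network, so parallel workers help here.
    return urllib.request.urlopen(
        'http://stackoverflow.com/questions/%d' % post).read()

if __name__ == '__main__':
    with Pool(5) as p:
        pages = p.map(fetch, range(3329361, 3329361 + 5))
    print([len(page) for page in pages])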
answered 2010-07-26T07:11:16.993
16

Someone suggested I use the futures package for this. I tried it, and it seems to work.

http://pypi.python.org/pypi/futures

Here is an example:

"Download many URLs in parallel."

import functools
import urllib.request
import futures

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

with futures.ThreadPoolExecutor(50) as executor:
    future_list = executor.run_to_futures(
        [functools.partial(load_url, url, 30) for url in URLS])
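The run_to_futures call above appears to come from an early release of the backport; the interface that was eventually standardized as concurrent.futures (in the standard library since Python 3.2) spells the same thing with executor.map. A sketch of that variant:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout=30):
    return urllib.request.urlopen(url, timeout=timeout).read()

# map yields results in input order; a worker's exception is re-raised
# when the corresponding result is consumed.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    pages = list(executor.map(load_url, URLS))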
answered 2010-07-27T11:51:21.493
5

Here is my implementation of a threaded map:

import sys
from threading import Thread
from queue import Queue, Empty

def thread_map(f, iterable, pool=None):
    """
    Just like [f(x) for x in iterable] but each f(x) runs in a separate thread.
    :param f: f
    :param iterable: iterable
    :param pool: thread pool size; unbounded by default (one thread per item)
    :return: list of results
    """
    res = {}
    if pool is None:
        # One thread per item; exceptions are captured rather than lost.
        def target(arg, num):
            try:
                res[num] = f(arg)
            except:
                res[num] = sys.exc_info()

        threads = [Thread(target=target, args=[arg, i]) for i, arg in enumerate(iterable)]
    else:
        # Fixed pool of workers pulling numbered jobs off a shared queue.
        class WorkerThread(Thread):
            def run(self):
                while True:
                    try:
                        num, arg = queue.get(block=False)
                        try:
                            res[num] = f(arg)
                        except:
                            res[num] = sys.exc_info()
                    except Empty:
                        break

        queue = Queue()
        for i, arg in enumerate(iterable):
            queue.put((i, arg))

        threads = [WorkerThread() for _ in range(pool)]

    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return [res[i] for i in range(len(res))]
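A quick usage sketch for the question's URL-fetching case (the URLs here are placeholders):

import urllib.request

urls = ['http://www.example.com/', 'http://www.python.org/']
# Fetch both pages with a pool of 2 worker threads.
pages = thread_map(lambda url: urllib.request.urlopen(url).read(), urls, pool=2)
print([len(page) for page in pages])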
answered 2016-03-24T11:08:16.143
3

The Python Queue module may help you here. Use one thread to push all the URLs onto the queue with Queue.put(), and have the worker threads simply get() the URLs off the queue one by one.

Python Docs: queue — A synchronized queue class
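A minimal sketch of that pattern, assuming a Python 3 queue module and placeholder URLs (the sentinel-per-worker shutdown is one common convention, not the only one):

import threading
import queue
import urllib.request

url_queue = queue.Queue()
results = {}

def worker():
    while True:
        url = url_queue.get()
        if url is None:  # sentinel: no more work
            break
        results[url] = urllib.request.urlopen(url).read()
        url_queue.task_done()

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()

for url in ['http://www.example.com/', 'http://www.python.org/']:
    url_queue.put(url)

for _ in threads:
    url_queue.put(None)  # one sentinel per worker
for t in threads:
    t.join()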

answered 2010-07-25T13:50:19.930
-1

I would wrap it up in a function (untested):

import itertools
import threading
import urllib2
import Queue

def openurl(url, queue):
    def starter():
        try:
            result = urllib2.urlopen(url)
        except Exception, exc:
            # Deliver the exception to the consumer instead of raising
            # it in this thread, where it would be lost.
            def raiser():
                raise exc
            queue.put((url, raiser))
        else:
            queue.put((url, lambda: result))
    threading.Thread(target=starter).start()

myurls = ... # the list of urls
myqueue = Queue.Queue()

map(openurl, myurls, itertools.repeat(myqueue))

for each in myurls:
    url, getresult = myqueue.get()
    try:
        result = getresult()
    except Exception, exc:
        print 'exception raised:' + str(exc)
    else:
        pass # do stuff with result
answered 2010-07-25T14:53:31.423