69

我花了一整天的时间在 Python 中寻找最简单的多线程 URL 获取器,但我发现的大多数脚本都使用队列或多处理或复杂的库。

最后我自己写了一个,我将其作为答案报告。请随时提出任何改进建议。

我想其他人可能一直在寻找类似的东西。

4

5 回答 5

54

尽可能简化您的原始版本:

import threading
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

def fetch_url(url):
    urlHandler = urllib2.urlopen(url)
    html = urlHandler.read()
    print "'%s\' fetched in %ss" % (url, (time.time() - start))

threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print "Elapsed Time: %s" % (time.time() - start)

这里唯一的新技巧是:

  • 跟踪您创建的线程。
  • 如果您只想知道线程何时完成,请不要打扰线程计数器;join已经告诉你了。
  • 如果您不需要任何状态或外部 API,则不需要Thread子类,只需一个target函数。
于 2013-04-24T01:50:02.610 回答
40

multiprocessing有一个不启动其他进程的线程池:

#!/usr/bin/env python
from multiprocessing.pool import ThreadPool
from time import time as timer
from urllib2 import urlopen

urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

def fetch_url(url):
    try:
        response = urlopen(url)
        return url, response.read(), None
    except Exception as e:
        return url, None, e

start = timer()
results = ThreadPool(20).imap_unordered(fetch_url, urls)
for url, html, error in results:
    if error is None:
        print("%r fetched in %ss" % (url, timer() - start))
    else:
        print("error fetching %r: %s" % (url, error))
print("Elapsed Time: %s" % (timer() - start,))

与基于 - 的解决方案相比的优势Thread

  • ThreadPool允许限制最大并发连接数(20在代码示例中)
  • 输出没有乱码,因为所有输出都在主线程中
  • 记录错误
  • 该代码无需更改即可在 Python 2 和 3 上运行(假设from urllib.request import urlopen在 Python 3 上)。
于 2015-01-16T14:45:56.897 回答
21

中的主要示例concurrent.futures可以满足您的所有需求,而且要简单得多。另外,它一次只做 5 个就可以处理大量的 URL,而且它可以更好地处理错误。

当然,这个模块只内置于 Python 3.2 或更高版本……但如果你使用的是 2.5-3.1,你可以只在futuresPyPI 上安装 backport。您需要从示例代码中更改的所有内容是搜索并替换concurrent.futuresfutures,并且对于 2.x,urllib.request使用urllib2.

这是向后移植到 2.x 的示例,修改为使用您的 URL 列表并添加时间:

import concurrent.futures
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

# Retrieve a single page and report the url and contents
def load_url(url, timeout):
    conn = urllib2.urlopen(url, timeout=timeout)
    return conn.readall()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print '%r generated an exception: %s' % (url, exc)
        else:
            print '"%s" fetched in %ss' % (url,(time.time() - start))
print "Elapsed Time: %ss" % (time.time() - start)

但是你可以让这更简单。真的,您只需要:

def load_url(url):
    conn = urllib2.urlopen(url, timeout)
    data = conn.readall()
    print '"%s" fetched in %ss' % (url,(time.time() - start))
    return data
    
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    pages = executor.map(load_url, urls)

print "Elapsed Time: %ss" % (time.time() - start)
于 2013-04-24T00:10:28.337 回答
2

我现在发布了一个不同的解决方案,通过让工作线程不是守护进程并将它们加入主线程(这意味着阻塞主线程直到所有工作线程都完成)而不是通知每个工作线程的执行结束回调到全局函数(正如我在上一个答案中所做的那样),正如在一些评论中指出的那样,这种方式不是线程安全的。

import threading
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

class FetchUrl(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.url = url

    def run(self):
        urlHandler = urllib2.urlopen(self.url)
        html = urlHandler.read()
        print "'%s\' fetched in %ss" % (self.url,(time.time() - start))

for url in urls:
    FetchUrl(url).start()

#Join all existing threads to main thread.
for thread in threading.enumerate():
    if thread is not threading.currentThread():
        thread.join()

print "Elapsed Time: %s" % (time.time() - start)
于 2013-04-24T01:02:40.257 回答
-1

此脚本从数组中定义的一组 URL 中获取内容。它为每个要获取的 URL 生成一个线程,因此它旨在用于一组有限的 URL。

每个线程不使用队列对象,而是通过对全局函数的回调来通知其结束,该函数保持对正在运行的线程数的计数。

import threading
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
left_to_fetch = len(urls)

class FetchUrl(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.setDaemon = True
        self.url = url

    def run(self):
        urlHandler = urllib2.urlopen(self.url)
        html = urlHandler.read()
        finished_fetch_url(self.url)


def finished_fetch_url(url):
    "callback function called when a FetchUrl thread ends"
    print "\"%s\" fetched in %ss" % (url,(time.time() - start))
    global left_to_fetch
    left_to_fetch-=1
    if left_to_fetch==0:
        "all urls have been fetched"
        print "Elapsed Time: %ss" % (time.time() - start)


for url in urls:
    "spawning a FetchUrl thread for each url to fetch"
    FetchUrl(url).start()
于 2013-04-23T23:58:42.790 回答