3

最近我正在开发一个小型爬虫,用于在 url 上下载图像。

我在 urllib2 中使用 openurl() 和 f.open()/f.write():

这是代码片段:

# the list for the images' urls
imglist = re.findall(regImg,pageHtml)

# iterate to download images
for index in xrange(1,len(imglist)+1):
    img = urllib2.urlopen(imglist[index-1])
    f = open(r'E:\OK\%s.jpg' % str(index), 'wb')
    print('To Read...')

    # potential timeout, may block for a long time
    # so I wonder whether there is any mechanism to enable retry when time exceeds a certain threshold
    f.write(img.read())
    f.close()
    print('Image %d is ready !' % index)

在上面的代码中,img.read()可能会阻塞很长时间,我希望在这个问题下做一些重试/重新打开图片url的操作。

我还关心上面代码的效率方面,如果要下载的图像数量有点大,使用线程池下载它们似乎更好。

有什么建议么?提前致谢。

ps 我发现img对象上的read()方法可能会导致阻塞,所以单独给urlopen()加个超时参数似乎没什么用。但我发现文件对象没有 read() 的超时版本。对此有什么建议吗?非常感谢 。

4

4 回答 4

2

urllib2.urlopen有一个timeout用于所有阻塞操作(连接建立等)的参数

这个片段取自我的一个项目。我使用线程池一次下载多个文件。它使用urllib.urlretrieve但逻辑是相同的。url_and_path_list(url, path)元组列表,是num_concurrent要生成的线程数,skip_existing如果文件系统中已经存在文件,则跳过文件下载。

def download_urls(url_and_path_list, num_concurrent, skip_existing):
    # prepare the queue
    queue = Queue.Queue()
    for url_and_path in url_and_path_list:
        queue.put(url_and_path)

    # start the requested number of download threads to download the files
    threads = []
    for _ in range(num_concurrent):
        t = DownloadThread(queue, skip_existing)
        t.daemon = True
        t.start()

    queue.join()

class DownloadThread(threading.Thread):
    def __init__(self, queue, skip_existing):
        super(DownloadThread, self).__init__()
        self.queue = queue
        self.skip_existing = skip_existing

    def run(self):
        while True:
            #grabs url from queue
            url, path = self.queue.get()

            if self.skip_existing and exists(path):
                # skip if requested
                self.queue.task_done()
                continue

            try:
                urllib.urlretrieve(url, path)
            except IOError:
                print "Error downloading url '%s'." % url

            #signals to queue job is done
            self.queue.task_done()
于 2012-12-06T16:28:10.237 回答
1

当您使用 urllib2.urlopen ()创建 tje 连接时,您可以提供超时参数。

如文档中所述:

可选的 timeout 参数以秒为单位指定连接尝试等阻塞操作的超时时间(如果未指定,将使用全局默认超时设置)。这实际上只适用于 HTTP、HTTPS 和 FTP 连接。

有了这个,您将能够管理最大等待时间并捕获引发的异常。

于 2012-12-06T16:29:12.347 回答
1

我抓取大量文档的方式是使用批处理器来抓取和转储恒定大小的块。

假设您要抓取一组预先知道的 100K 文档。您可以有一些逻辑来生成固定大小的 1000 个文档块,这些文档将由线程池下载。抓取整个块后,您可以在数据库中进行批量插入。然后继续处理另外的 1000 个文档,依此类推。

遵循这种方法可以获得的优势:

  • 您可以利用线程池加快爬网速度。

  • 它在某种意义上是容错的,您可以从上次失败的块继续。

  • 您可以根据优先级生成块,即首先要抓取的重要文档。因此,如果您无法完成整个批次。重要的文件会被处理,不太重要的文件可以在下次运行时提取。

于 2012-12-06T16:36:12.183 回答
0

一个看起来有效的丑陋黑客。

import os, socket, threading, errno

def timeout_http_body_read(response, timeout = 60):
    def murha(resp):
        os.close(resp.fileno())
        resp.close()

    # set a timer to yank the carpet underneath the blocking read() by closing the os file descriptor
    t = threading.Timer(timeout, murha, (response,))
    try:
        t.start()
        body = response.read()
        t.cancel()
    except socket.error as se:
        if se.errno == errno.EBADF: # murha happened
            return (False, None)
        raise
    return (True, body)
于 2013-08-27T14:49:14.823 回答