
My task is to download 1M+ images from a given list of urls. What is the recommended way to do that?

After reading Greenlet Vs. Threads I looked into gevent, but I cannot get it to run reliably. I played around with a test set of 100 urls; sometimes it finishes in 1.5 s, but sometimes it takes over 30 s, which is strange because the timeout* per request is 0.1 s, so it should never take more than 10 s.

*see the code below

I also looked into grequests, but it seems to have issues with exception handling.
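For reference, grequests routes per-request errors through the exception_handler callback of grequests.map; a minimal sketch of that mechanism (the handler name is illustrative, not from the original post):

import grequests

def handle_exception(request, exception):
    # called once for each request that raised (timeouts, connection errors, ...)
    print('failed: %s' % exception)

reqs = (grequests.get(url, timeout=0.1) for url in test_urls)
responses = grequests.map(reqs, size=100, exception_handler=handle_exception)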

My "requirements" are that I can

  • check for errors that come up while downloading (timeouts, corrupt images, ...),
  • monitor the progress of the number of processed images, and
  • be as fast as possible.
from gevent import monkey; monkey.patch_all()
from time import time
import requests
from PIL import Image
import cStringIO
import gevent.hub
POOL_SIZE = 300


def download_image_wrapper(task):
    return download_image(task[0], task[1])

def download_image(image_url, download_path):
    raw_binary_request = requests.get(image_url, timeout=0.1).content
    image = Image.open(cStringIO.StringIO(raw_binary_request))
    image.save(download_path)

def download_images_gevent_spawn(list_of_image_urls, base_folder):
    download_paths = ['/'.join([base_folder, url.split('/')[-1]])
                      for url in list_of_image_urls]
    parameters = [[image_url, download_path] for image_url, download_path in
                  zip(list_of_image_urls, download_paths)]
    tasks = [gevent.spawn(download_image_wrapper, parameter_tuple) for parameter_tuple in parameters]
    for task in tasks:
        try:
            task.get()
        except Exception:
            print 'x',
            continue
        print '.',

test_urls = # list of 100 urls

t1 = time()
download_images_gevent_spawn(test_urls, 'download_temp')
print time() - t1

3 Answers


I think it will be better to stick with urllib2, as in this example: https://github.com/gevent/gevent/blob/master/examples/concurrent_download.py#L1

Try this code, I suppose it is what you're asking for.

import gevent
from gevent import monkey

# patches stdlib (including socket and ssl modules) to cooperate with other greenlets
monkey.patch_all()

import sys

if sys.version_info[0] == 3:
    from urllib.request import urlopen
else:
    from urllib2 import urlopen

# chloya_files is the answerer's own list of image urls; substitute your url list here
urls = sorted(chloya_files)


def download_file(url):
    data = urlopen(url).read()
    img_name = url.split('/')[-1]
    with open('c:/temp/img/'+img_name, 'wb') as f:
        f.write(data)
    return True


from time import time

t1 = time()
tasks = [gevent.spawn(download_file, url) for url in urls]
gevent.joinall(tasks, timeout = 12.0)
print "Sucessful: %s from %s" % (sum(1 if task.value else 0 for task in tasks), len(tasks))
print time() - t1
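To cover the question's error-checking requirement with this approach, the finished greenlets can be inspected after joinall; a minimal sketch using gevent's Greenlet.successful() and Greenlet.exception (not part of the original answer):

failed = [task for task in tasks if not task.successful()]
for task in failed:
    # task.exception is the raised exception, or None if the greenlet
    # simply did not finish before the joinall timeout
    print("failed: %r" % task.exception)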
Answered 2015-11-16T00:21:04.690
1

There is a simple solution using gevent and Requests.

Use a Requests Session for HTTP persistent connections. Since gevent makes Requests asynchronous, I don't think a timeout on the HTTP requests is necessary.

By default, requests.Session caches connection pools for 10 hosts (pool_connections) and keeps at most 10 connections per host (pool_maxsize). These defaults should be adjusted to the workload by mounting an explicitly configured HTTP adapter:

session = requests.Session()
http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
session.mount('http://', http_adapter)

Split the work into producer-consumer tasks: downloading the images is the producer task, processing the images is the consumer task.

If the image-processing library (PIL) is not asynchronous, it may block the producer coroutines. In that case, the consumer pool can be a gevent.threadpool.ThreadPool, e.g.:

from gevent.threadpool import ThreadPool
consumer = ThreadPool(POOL_SIZE)  

Here is an outline of how it can be done. I have not tested the code.

from gevent import monkey; monkey.patch_all()
from time import time
import requests
from PIL import Image
from io import BytesIO
import os
from urlparse import urlparse
from gevent.pool import Pool

def download(url):
    try:
        response = session.get(url)
        # raise_for_status() turns HTTP error codes into exceptions so they are
        # reported here instead of escaping into the pool iterator
        response.raise_for_status()
    except Exception as e:
        print(e)
    else:
        file_name = urlparse(url).path.rsplit('/', 1)[-1]
        return (response.content, file_name)

def process(img):
    if img is None:
        return None
    img, name = img
    img = Image.open(BytesIO(img))
    path = os.path.join(base_folder, name)
    try:
        img.save(path)
    except Exception as e:
        print(e)
    else:
        return True

def run(urls):
    consumer.map(process, producer.imap_unordered(download, urls))

if __name__ == '__main__':
    POOL_SIZE = 300
    producer = Pool(POOL_SIZE)
    consumer = Pool(POOL_SIZE)

    session = requests.Session()
    http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
    session.mount('http://', http_adapter)

    test_urls = # list of 100 urls
    base_folder = 'download_temp'
    t1 = time()
    run(test_urls)
    print(time() - t1)
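One possible way to add the progress monitoring the question asks for (a sketch only, not part of the original answer): gevent's Pool.imap_unordered yields results as they complete, so run() can be rewritten to count them as they arrive.

def run_with_progress(urls):
    total = len(urls)
    done = 0
    # iterate over consumer results in completion order and report periodically
    for result in consumer.imap_unordered(process, producer.imap_unordered(download, urls)):
        done += 1
        if done % 100 == 0 or done == total:
            print('%d/%d images processed' % (done, total))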
Answered 2015-11-17T07:16:09.957

I would suggest taking a look at Grablib: http://grablib.org/

It is an asynchronous parser based on pycurl and multicurl, and it tries to handle network errors (such as retrying on timeout) automatically.

I believe the Grab:Spider module will solve 99% of your problems: http://docs.grablib.org/en/latest/index.html#spider-toc
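For illustration only, a rough sketch of what a Grab:Spider based downloader might look like (this is not from the original answer; names such as initial_urls, task_initial and grab.response.body follow the Grab documentation of that era and may differ between Grab versions):

from grab.spider import Spider

class ImageSpider(Spider):
    # urls listed in initial_urls become the spider's first batch of tasks
    initial_urls = image_urls  # assumed: the list of image urls to download

    def task_initial(self, grab, task):
        # grab.response.body holds the raw bytes of the downloaded resource
        file_name = task.url.split('/')[-1]
        with open('download_temp/' + file_name, 'wb') as f:
            f.write(grab.response.body)

bot = ImageSpider(thread_number=100)
bot.run()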

Answered 2015-11-11T05:45:38.867