4

我需要发出 100k 头请求,并且我在请求之上使用 gevent。我的代码运行了一段时间,但最终挂起。我不确定它为什么挂起,或者它是否挂在请求或 gevent 中。我在请求和 gevent 中都使用了 timeout 参数。

请看看我下面的代码片段,让我知道我应该改变什么。

import gevent
from gevent import monkey, pool
monkey.patch_all()
import requests

def get_head(url, timeout=3):
    try:
        return requests.head(url, allow_redirects=True, timeout=timeout)
    except:
        return None

def expand_short_urls(short_urls, chunk_size=100, timeout=60*5):
    chunk_list = lambda l, n: ( l[i:i+n] for i in range(0, len(l), n) )
    p = pool.Pool(chunk_size)
    print 'Expanding %d short_urls' % len(short_urls)
    results = {}
    for i, _short_urls_chunked in enumerate(chunk_list(short_urls, chunk_size)):
        print '\t%d. processing %d urls @ %s' % (i, chunk_size, str(datetime.datetime.now()))
        jobs = [p.spawn(get_head, _short_url) for _short_url in _short_urls_chunked]
        gevent.joinall(jobs, timeout=timeout)
        results.update({_short_url:job.get().url for _short_url, job in zip(_short_urls_chunked, jobs) if job.get() is not None and job.get().status_code==200})
    return results 

我已经尝试过 grequests,但它已被放弃,并且我已经完成了 github 拉取请求,但它们也都有问题。

4

2 回答 2

9

您观察到的 RAM 使用主要源于存储 100.000 个响应对象时堆积的所有数据以及所有底层开销。我已经复制了您的应用案例,并针对 Alexa 排名靠前的 15000 个 URL 发出了 HEAD 请求。没关系

  • 我是使用 gevent 池(即每个连接一个 greenlet)还是一组固定的 greenlet,都请求多个 URL
  • 我设置的池大小有多大

最后,RAM 使用量随着时间的推移而增长,达到相当大的数量。但是,我注意到从更改requestsurllib2已经导致 RAM 使用量减少了大约两倍。也就是说,我换了

result = requests.head(url)

request = urllib2.Request(url)
request.get_method = lambda : 'HEAD'
result = urllib2.urlopen(request)

其他一些建议:不要使用两种超时机制。Gevent 的超时方法非常可靠,您可以像这样轻松使用它:

def gethead(url):
    result = None
    try:
        with Timeout(5, False):
            result = requests.head(url)
    except Exception as e:
        result = e
    return result

可能看起来很棘手,但要么返回None(在相当精确的 5 秒后,并指示超时)、表示通信错误的任何异常对象,要么返回响应。效果很好!

虽然这可能不是问题的一部分,但在这种情况下,我建议让工人活着,让他们每个人处理多个项目!确实,产卵小绿叶的开销很小。尽管如此,这将是一个非常简单的解决方案,其中包含一组长寿的 greenlets:

def qworker(qin, qout):
    while True:
        try:
            qout.put(gethead(qin.get(block=False)))
        except Empty:
            break

qin = Queue()
qout = Queue()

for url in urls:
    qin.put(url)

workers = [spawn(qworker, qin, qout) for i in xrange(POOLSIZE)]
joinall(workers)
returnvalues = [qout.get() for _ in xrange(len(urls))]

此外,您确实需要意识到这是您正在解决的大规模问题,会产生非标准问题。当我以 20 秒的超时和 100 个工作人员和 15000 个要请求的 URL 重现您的场景时,我很容易得到大量的套接字:

# netstat -tpn | wc -l
10074

也就是说,操作系统有超过 10000 个套接字要管理,其中大部分处于 TIME_WAIT 状态。我还观察到“打开的文件太多”错误,并通过 sysctl 调整了限制。当您请求 100.000 个 URL 时,您也可能会达到这样的限制,您需要采取措施来防止系统饥饿。

还要注意您使用请求的方式,它会自动遵循从 HTTP 到 HTTPS 的重定向,并自动验证证书,所有这些肯定会消耗 RAM。

在我的测量中,当我将请求的 URL 数量除以程序的运行时间时,我几乎从未超过 100 响应/秒,这是与世界各地的外国服务器的高延迟连接的结果。我想你也受到这样的限制。将架构的其余部分调整到此限制,您可能能够生成从 Internet 到磁盘(或数据库)的数据流,而两者之间的 RAM 使用量不会太大。

我应该解决你的两个主要问题,具体来说:

我认为 gevent/你使用它的方式不是你的问题。我认为您只是低估了任务的复杂性。它伴随着令人讨厌的问题,并将您的系统推向极限。

  • 您的 RAM 使用问题:urllib2如果可以,请使用 开始。然后,如果事情积累得太高,你需要反对积累。尝试产生一个稳定的状态:您可能希望开始将数据写入磁盘,并且通常针对对象可能被垃圾收集的情况工作。

  • 您的代码“最终挂起”:这可能是您的 RAM 问题。如果不是,则不要生成这么多的greenlets,而是按照指示重复使用它们。此外,进一步降低并发性,监控打开的套接字数量,必要时增加系统限制,并尝试找出您的软件挂起的确切位置。

于 2015-02-07T02:10:18.247 回答
2

我不确定这是否能解决您的问题,但您没有正确使用 pool.Pool() 。

尝试这个:

def expand_short_urls(short_urls, chunk_size=100):
    # Pool() automatically limits your process to chunk_size greenlets running concurrently
    # thus you don't need to do all that chunking business you were doing in your for loop
    p = pool.Pool(chunk_size)
    print 'Expanding %d short_urls' % len(short_urls)

    # spawn() (both gevent.spawn() and Pool.spawn()) returns a gevent.Greenlet object
    # NOT the value your function, get_head, will return
    threads = [p.spawn(get_head, short_url) for short_url in short_urls]
    p.join()

    # to access the returned value of your function, access the Greenlet.value property
    results = {short_url: thread.value.url for short_url, thread in zip(short_urls, threads) 

如果 thread.value 不是 None 并且 thread.value.status_code == 200} 返回结果

于 2015-02-05T23:16:58.123 回答