0

看,人们。我们有一个关于 gevent.pool 类和 pool.wait_available() 方法的问题,两个代码片段

1.

def fetch(url):
    print 'start fetching...', url
    data = urllib2.urlopen(url)
    print url,':',data.code

urls = ['http://www.google.ru', 'http://www.s-str.ru', 'http://www.vk.com', 'http://www.yandex.ru', 'http://www.xxx.com']

pool = Pool(2)

def producer():
    for url in urls:
        pool.spawn(fetch, url)
    pool.join()

p = gevent.spawn(producer)
p.join()

2.

def fetch(url):
    print 'start fetching...', url
    data = urllib2.urlopen(url)
    print url,':',data.code

urls = ['http://www.google.ru', 'http://www.s-str.ru', 'http://www.vk.com', 'http://www.yandex.ru', 'http://www.xxx.com']

pool = Pool(2)

def producer():
    for url in urls:
        pool.wait_available()
        pool.spawn(fetch, url)
    pool.join()

p = gevent.spawn(producer)
p.join()

给我们类似的结果:

start fetching... http://www.google.ru
start fetching... http://www.s-str.ru
http://www.google.ru : 200
start fetching... http://www.vk.com
http://www.s-str.ru : 200
start fetching... http://www.yandex.ru
http://www.yandex.ru : 200
start fetching... http://www.xxx.com
http://www.vk.com : 200
http://www.xxx.com : 200

谁能解释 wait_available() 方法的含义?以及可能的使用情况。

=======update======== 我已经对它进行了修改,它可以正常工作,我只想知道这两个代码片段之间的区别。

4

2 回答 2

2

TL;DRwait_available如果您使用 spawn 则没有必要,因为在两种方法中都运行相同的检查。但是,如果您正在使用apply_async并且不想提交超过池上限的线程,那么您应该wait_available首先调用。


对于可能稍微更清楚的解释.. 有几种方法可以用 gevent 的Pool类来实现同样的事情。在池上使用spawn将阻塞,直到有空间可Pool用于运行新的 greenlet。这是一个简单的例子:

import gevent.monkey
gevent.monkey.patch_all()
import gevent.pool
import time

def my_slow_function():
    time.sleep(5)

def log(text):
    print '%d : %s' % (int(time.time()), text)

if __name__ == '__main__':
    thread_pool = gevent.pool.Pool(5)
    for i in xrange(20):
        log('Submitting slow func %d' % i)
        thread_pool.spawn(my_slow_function)
    thread_pool.join()
    log('Exiting')

其输出表明它将以 5 个为一组生成这些,因为池中包含 5 个 greenlets 的空间:

1403037287:提交慢FUNC 0
1403037287:提交慢FUNC 1
1403037287:提交慢FUNC 2
1403037287:提交慢FUNC 3
1403037287:提交慢FUNC 4
1403037292:提交慢FUNC 5
1403037292:提交慢FUNC 6
1403037292:提交慢FUNC 7
1403037292:提交慢FUNC 8
1403037292:提交慢FUNC 9
1403037297:提交慢FUNC 10
1403037297:提交慢FUNC 11
1403037297:提交慢FUNC 12
1403037297:提交慢FUNC 13
1403037297:提交慢FUNC 14
1403037302:提交慢FUNC 15
1403037302:提交慢功能 16
1403037302:提交慢功能 17
1403037302:提交慢功能 18
1403037302:提交慢功能 19
1403037307:退出

如您所见,它们以 5 组的形式生成,间隔大约 5 秒。如果您深入研究 gevent 代码并查看 Pool 对象,您会看到调用spawn将要求锁定Pools 内部信号量,该信号量用于跟踪正在运行的 greenlets。

apply_async相反,如果您使用而不是尝试相同的代码spawn,它将强制所有调用同时运行:

1403037313:提交慢FUNC 0
1403037313:提交慢FUNC 1
1403037313:提交慢FUNC 2
1403037313:提交慢FUNC 3
1403037313:提交慢FUNC 4
1403037313:提交慢FUNC 5
1403037313:提交慢FUNC 6
1403037313:提交慢FUNC 7
1403037313:提交缓慢的功能8
1403037313:提交缓慢的功能9
1403037313:提交慢速功能10
1403037313:提交慢速功能11
1403037313:提交慢速Func 12
1403037313:提交慢速Func 13
1403037313:提交慢速
速合
1403037313:提交慢功能 16
1403037313:提交慢功能 17
1403037313:提交慢功能 18
1403037313:提交慢功能 19
1403037318:退出

您可以在这里看到没有阻塞或等待,它们都是同时被推入的。但是,如果你在wait_available()for 循环的开头加入 a,你会回到与spawn.

1403038292:提交慢FUNC 0
1403038292:提交慢FUNC 1
1403038292:提交慢FUNC 2
1403038292:提交慢FUNC 3
1403038292:提交慢FUNC 4
1403038297:提交慢FUNC 5
1403038297:提交慢FUNC 6
1403038297:提交慢FUNC 7
1403038297:提交慢FUNC 8
1403038297:提交慢FUNC 9
1403038302:提交慢FUNC 10
1403038302:提交慢FUNC 11
1403038302:提交慢FUNC 12
1403038302:提交慢FUNC 13
1403038302:提交慢FUNC 14
1403038307:提交慢FUNC 15
1403038307:提交慢功能 16
1403038307:提交慢功能 17
1403038307:提交慢功能 18
1403038307:提交慢功能 19
1403038312:退出

再一次,查看 gevent 中的源代码,wait_available执行与调用结果相同的检查,spawn即检查信号量以查看池中是否真的有空间。

于 2014-06-17T21:00:42.100 回答
0

gevent您合作之前需要修补标准模块。

>>> import gevent.monkey
>>> gevent.monkey.patch_all()
>>> ...
>>> p = gevent.spawn(producer)
>>> p.join()
start fetching... http://www.google.ru
start fetching... http://www.s-str.ru
http://www.google.ru : 200
start fetching... http://www.vk.com
http://www.vk.com : 200
start fetching... http://www.yandex.ru
http://www.yandex.ru : 200
start fetching... http://www.xxx.com
http://www.xxx.com : 200
http://www.s-str.ru : 200

你可以看到,这是pool.wait_available()可以预测的。

更新

Pool仅对spawn功能以相同的方式工作(它将等待池中可用的“插槽”)。如果您需要提供基于Pool状态的其他功能(日志记录、跟踪、监控) - 您肯定会使用 等功能wait_availablefree_count如果您只需要spawn新的绿色线程 - 您可以依赖Pool实现。

于 2012-12-17T09:47:18.690 回答