6

几天前,我问了一个关于 SO 的问题,关于帮助我设计一个用于构建多个 HTTP 请求的范例

这是场景。我想要一个多生产者、多消费者的系统。我的生产者抓取并抓取了一些网站,并将找到的链接添加到队列中。由于我将爬取多个站点,因此我希望有多个生产者/爬虫。

消费者/工作人员从这个队列中获取信息,向这些链接发出 TCP/UDP 请求,并将结果保存到我的 Django DB 中。我还希望有多个工作人员,因为每个队列项目彼此完全独立。

人们建议为此使用协程库,即 Gevent 或 Eventlet。从未使用过协程,我读到尽管编程范式类似于线程范式,但只有一个线程正在积极执行,但是当阻塞调用发生时 - 例如 I/O 调用 - 堆栈在内存中切换,另一个绿色线程接管,直到遇到某种阻塞 I/O 调用。希望我做对了吗?这是我的一篇 SO 帖子中的代码:

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []


def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid


def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)


for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# This doesn't work.
for j in range(2):
    producers.append(gevent.spawn(producer))

# Uncommenting this makes this script work.
# producer()

q.join()

这很有效,因为sleep调用是阻塞调用,当sleep事件发生时,另一个绿色线程接管。这比顺序执行快得多。如您所见,我的程序中没有任何代码故意将一个线程的执行交给另一个线程。我看不出这如何适合上面的场景,因为我希望所有线程同时执行。

一切正常,但我觉得我使用 Gevent/Eventlets 实现的吞吐量高于原始的顺序运行程序,但远低于使用真实线程可以实现的吞吐量。

如果我要使用线程机制重新实现我的程序,我的每个生产者和消费者都可以同时工作,而无需像协程那样交换堆栈。

是否应该使用线程重新实现?我的设计错了吗?我没有看到使用协程的真正好处。

也许我的概念有点混乱,但这就是我所吸收的。对我的范式和概念的任何帮助或澄清都会很棒。

谢谢

4

3 回答 3

5

如您所见,我的程序中没有任何代码故意将一个线程的执行交给另一个线程。我看不出这如何适合上面的场景,因为我希望所有线程同时执行。

有一个 OS 线程,但有几个 greenlet。在您的情况下gevent.sleep(),允许工作人员同时执行。如果您使用patched 来处理(通过调用) ,阻塞 IO 调用也urllib2.urlopen(url).read()会这样做。urllib2geventgevent.monkey.patch_*()

另请参阅A Curious Course on Coroutines and Concurrency以了解代码如何在单线程环境中并发工作。

要比较 gevent、线程、多处理之间的吞吐量差异,您可以编写与所有方法兼容的代码:

#!/usr/bin/env python
concurrency_impl = 'gevent' # single process, single thread
##concurrency_impl = 'threading' # single process, multiple threads
##concurrency_impl = 'multiprocessing' # multiple processes

if concurrency_impl == 'gevent':
    import gevent.monkey; gevent.monkey.patch_all()

import logging
import time
import random
from itertools import count, islice

info = logging.info

if concurrency_impl in ['gevent', 'threading']:
    from Queue import Queue as JoinableQueue
    from threading import Thread
if concurrency_impl == 'multiprocessing':
    from multiprocessing import Process as Thread, JoinableQueue

对于所有并发实现,脚本的其余部分都是相同的:

def do_work(wid, value):
    time.sleep(random.randint(0,2))
    info("%d Task %s done" % (wid, value))

def worker(wid, q):
    while True:
        item = q.get()
        try:
            info("%d Got item %s" % (wid, item))
            do_work(wid, item)
        finally:
            q.task_done()
            info("%d Done item %s" % (wid, item))

def producer(pid, q):
    for item in iter(lambda: random.randint(1, 11), 10):
        time.sleep(.1) # simulate a green blocking call that yields control
        info("%d Added item %s" % (pid, item))
        q.put(item)
    info("%d Signal Received" % (pid,))

不要在模块级别执行代码将其放入main()

def main():
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(process)d %(message)s")

    q = JoinableQueue()
    it = count(1)
    producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)]
    workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)]
    for t in producers+workers:
        t.daemon = True
        t.start()

    for t in producers: t.join() # put items in the queue
    q.join() # wait while it is empty
    # exit main thread (daemon workers die at this point)

if __name__=="__main__":    
   main()
于 2012-02-12T19:37:12.203 回答
1

当您有很多(绿色)线程时,gevent 很棒。我用数千人对其进行了测试,效果非常好。您已确保用于抓取和保存到数据库的所有库都变为绿色。afaik 如果他们使用 python 的套接字,gevent 注入应该可以工作。但是,用 C 语言编写的扩展(例如 mysqldb)会阻塞,您需要使用绿色等效项。

如果你使用 gevent,你基本上可以取消队列,为每个任务生成新的(绿色)线程,线程的代码就像db.save(web.get(address)). 当 db 或 web 中的某些库阻塞时,gevent 将负责抢占。只要您的任务适合记忆,它就会起作用。

于 2012-02-12T16:17:09.130 回答
0

在这种情况下,您的问题不在于程序速度(即选择 gevent 或线程),而在于网络 IO 吞吐量。那是(应该是)决定程序运行速度的瓶颈。

Gevent 是一种很好的方法来确保这瓶颈,而不是你的程序架构。

这是您想要的那种过程:

import gevent
from gevent.queue import Queue, JoinableQueue
from gevent.monkey import patch_all


patch_all()  # Patch urllib2, etc


def worker(work_queue, output_queue):
    for work_unit in work_queue:
        finished = do_work(work_unit)
        output_queue.put(finished)
        work_queue.task_done()


def producer(input_queue, work_queue):
    for url in input_queue:
        url_list = crawl(url)
        for work in url_list:
            work_queue.put(work)
        input_queue.task_done()


def do_work(work):
    gevent.sleep(0)  # Actually proces link here
    return work


def crawl(url):
    gevent.sleep(0)
    return list(url)  # Actually process url here

input = JoinableQueue()
work = JoinableQueue()
output = Queue()

workers = [gevent.spawn(worker, work, output) for i in range(0, 10)]
producers = [gevent.spawn(producer, input, work) for i in range(0, 10)]


list_of_urls = ['foo', 'bar']

for url in list_of_urls:
    input.put(url)

# Wait for input to finish processing
input.join()
print 'finished producing'
# Wait for workers to finish processing work
work.join()
print 'finished working'

# We now have output!
print 'output:'
for message in output:
    print message
# Or if you'd like, you could use the output as it comes!

您无需等待输入和工作队列完成,我刚刚在这里演示了这一点。

于 2012-02-12T19:22:26.153 回答