5

我有一些依赖 I/O 重阻塞调用的生产者函数和一些也依赖 I/O 重阻塞调用的消费者函数。为了加快速度,我使用了 Gevent 微线程库作为粘合剂。

这是我的范例:

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []

def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid

def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)



for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

#This doesnt work.
for j in range(2):
    producers.append(gevent.spawn(producer))

#Uncommenting this makes this script work.
#producer()

q.join()

我有四个消费者,希望有两个生产者。生产者在发出信号时退出,即 10。消费者继续从这个队列中获取食物,当生产者和消费者结束时,整个任务完成。

但是,这不起作用。如果我注释掉for产生多个生产者的循环并仅使用一个生产者,则脚本运行良好。

我似乎无法弄清楚我做错了什么。

有任何想法吗?

谢谢

4

4 回答 4

6

You don't actually want to quit when the queue has no unfinished work, because conceptually that's not when the application should finish.

You want to quit when the producers have finished, and then when there is no unfinished work.

# Wait for all producers to finish producing
gevent.joinall(producers)
# *Now* we want to make sure there's no unfinished work
q.join()
# We don't care about workers. We weren't paying them anything, anyways
gevent.killall(workers)
# And, we're done.
于 2012-02-09T14:25:12.437 回答
3

我认为它会q.join()在任何东西被放入队列并立即退出之前完成。在加入队列之前尝试加入所有生产者。

于 2012-02-09T12:30:16.573 回答
0

您要做的是在生产者和工人进行通信时阻止主程序。阻塞队列将等到队列为空,然后屈服,这可能是立即的。把它放在程序的末尾而不是q.join()

gevent.joinall(producers)
于 2012-02-16T12:03:14.867 回答
0

我遇到了和你一样的问题。您的代码的主要问题是您的生产者已经在 gevent 线程中产生,这使得工作人员无法立即获得任务。

我建议您应该producer()在主进程中运行,而不是在 gevent 线程中产生当进程运行遇到可以立即推送任务的生产者时。

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []

def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid

def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)


producer()

for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

上面的代码很有意义.. :)

于 2015-12-08T13:55:54.807 回答