1

我对 python 多线程队列有问题。我有这个脚本,其中生产者从输入队列中获取元素,生成一些元素并将它们放入输出队列,消费者从输出队列中获取元素并打印它们:

import threading
import Queue

class Producer(threading.Thread):
    def __init__(self, iq, oq):
        threading.Thread.__init__(self)
        self.iq = iq
        self.oq = oq

    def produce(self, e):
        self.oq.put(e*2)
        self.oq.task_done()
        print "Producer %s produced %d and put it to output Queue"%(self.getName(), e*2)

    def run(self):
        while 1:
            e = self.iq.get()
            self.iq.task_done()
            print "Get %d from input Queue"%(e)
            self.produce(e)


class Consumer(threading.Thread):
    def __init__(self, oq):
        threading.Thread.__init__(self)
        self.oq = oq

    def run(self):
        while 1:
            e = self.oq.get()
            self.oq.task_done()
            print "Consumer get %d from output queue and consumed"%e

iq = Queue.Queue()
oq = Queue.Queue()

for i in xrange(2):
    iq.put((i+1)*10)

for i in xrange(2):
    t1 = Producer(iq, oq)
    t1.setDaemon(True)
    t1.start()

    t2 = Consumer(oq)
    t2.setDaemon(True)
    t2.start()

iq.join()
oq.join()

但是,每次我运行它时,它的工作方式都不同(给出异常,或者消费者不做任何工作)。我认为问题出在 task_done() 命令中,谁能解释我的错误在哪里?

我修改了消费者类:

class Consumer(threading.Thread):
    def __init__(self, oq):
        threading.Thread.__init__(self)
        self.oq = oq

    def run(self):
        while 1:
            e = self.oq.get()
            self.oq.task_done()
            print "Consumer get %d from output queue and consumed"%e
            page = urllib2.urlopen("http://www.ifconfig.me/ip")
            print page

现在每个 task_done() 命令之后的消费者应该连接到网站(这需要一些时间),但它没有,相反,如果 task_done() 之后的代码执行时间很短,它会运行,但如果它很长,它不会运行!为什么?谁能解释我这个问题?如果我将所有内容都放在 task_done() 命令之前,那么我将阻止来自其他线程的队列,这很愚蠢。或者我对python中的多线程有什么遗漏吗?

4

1 回答 1

3

Queue 文档

Queue.task_done() 表示之前入队的任务已经完成。由队列消费者线程使用。对于用于获取任务的每个 get(),对 task_done() 的后续调用会告诉队列该任务的处理已完成。

如果 join() 当前处于阻塞状态,它将在处理完所有项目后恢复(这意味着对于已将 put() 放入队列的每个项目都收到了 task_done() 调用)

例如,在您的代码中,您在Producer课堂上执行以下操作:

def produce(self, e):
    self.oq.put(e*2)
    self.oq.task_done()
    print "Producer %s produced %d and put it to output Queue"%(self.getName(), e*2)

你不应该在self.oq.task_done()这里做,因为你没有使用oq.get().

我不确定这是唯一的问题。

编辑:

对于您的另一个问题,您正在使用iq.join()andoq.join()最后,这会导致您的主线程在其他线程打印检索到的页面之前退出,并且由于您将线程创建为Daemons,因此您的 Python 应用程序退出而不等待它们完成执行。(记住Queue.join()取决于Queue.task_done()

现在您说“如果我将所有内容都放在 task_done() 命令之前,那么我将阻止其他线程的队列”。我看不出你的意思,这只会阻塞你的Consumer线程,但你总是可以创建更多Consumer不会被彼此阻塞的线程。

于 2012-06-24T16:47:19.913 回答