0

我构造了一个简单的示例脚本,它定义了multiprocessing在 python 中使用的三个单独的进程。我的目标是让一个父线程产生两个较小的线程来收集和处理数据。

目前,我的实现如下所示:

from Queue import Queue,Empty
from multiprocessing import Process
import time
import hashlib


class FillQueue(Process):
    def __init__(self,q):
        Process.__init__(self)
        self.q = q

    def run(self):
        i = 0
        while i is not 5:
            print 'putting'
            self.q.put('foo')
            i+=1
        self.q.put('|STOP|')

class ConsumeQueue(Process):
    def __init__(self,q):
        Process.__init__(self)
        self.q = q

    def run(self):
        print 'Consume'
        while True:
            try:
                value =  self.q.get(False)
                print value
                if value == '|STOP|':
                    print 'done'
                    break;
            except Empty:
                print 'Nothing to process atm'

class Ripper(Process):

    q = Queue()

    def __init__(self):
        self.fq = FillQueue(self.q)
        self.cq = ConsumeQueue(self.q)
        self.fq.daemon = True
        self.cq.daemon = True

    def run(self):
        try:
            self.fq.start()
            self.cq.start()
        except KeyboardInterrupt:
            print 'exit'

if __name__ == '__main__':
    r = Ripper()
    r.start()

当前运行时,CLI 上脚本的输出如下所示:

putting
putting
putting
putting
putting
Consume
foo
foo
foo
foo
foo
|STOP|
done

显然,我开始我的两个线程的方式是阻塞的,因为在填充程序完成添加项目之前,消费者甚至不会开始处理队列中的项目。

我应该如何重写它以使两个线程立即开始而不是阻塞,因此消费者将在Empty没有工作要处理的情况下简单地传递到 except 块,但在收到停止消息时会完全退出?

编辑:错字,startrun方法混淆了

4

3 回答 3

4

您似乎正在使用 multiprocessing.Process 启动多个进程。

但是,您使用的是 Queue.Queue,它只是线程安全的,而不是为多个进程使用而设计的。

shevek 的回答也是有效的,但首先,您应该将 Queue.Queue 替换为 multiprocessing.Queue。

于 2012-10-02T19:57:24.340 回答
3

我认为您的程序运行良好。CPU 一次只处理一件事,时间很短。但是,将所有东西放入队列所需的时间非常短。因此,没有理由填充器不能在一个时间片内完成此操作。

如果您在填充程序中添加一些延迟,我认为您应该会看到它实际上按预期工作。

于 2012-10-02T19:40:47.920 回答
3

试试这个:

from Queue import Empty
from multiprocessing import Process, Queue
import time
import hashlib


class FillQueue(object):
    def __init__(self, q): 
        self.q = q 

    def run(self):
        i = 0 
        while i < 5:
            print 'putting'
            self.q.put('foo %d' % i ) 
            i+=1
            time.sleep(.5)
        self.q.put('|STOP|')

class ConsumeQueue(object):
    def __init__(self, q): 
        self.q = q 

    def run(self):
        while True:
            try:
                value =  self.q.get(False)
                print value
                if value == '|STOP|':
                    print 'done'
                    break;
            except Empty:
                print 'Nothing to process atm'
                time.sleep(.2)


if __name__ == '__main__':
    q = Queue()
    f = FillQueue(q)
    c = ConsumeQueue(q)

    p1 = Process(target=f.run)
    p1.start()

    p2 = Process(target=c.run)
    p2.start()

    p1.join()
    p2.join()
于 2012-10-02T20:01:51.203 回答