2

我正在尝试使用流程为生产者消费者编写简单的代码。生产者是一个过程。对于消费者,我正在从池中获取进程。

from multiprocessing import Manager, Process, Pool
from time import sleep
def writer(queue):
   for i in range(10):
     queue.put(i)
     print 'put 1 size now ',queue.qsize()
     sleep(1)

def reader(queue):
   print 'in reader'
   for i in range(10):
     queue.get(1)
     print 'got 1 size now ', queue.qsize()

if __name__ == '__main__':
    q = Manager().Queue()
    p = Process(target=writer, args=(q,))
    p.start()
    pool = Pool()
    c = pool.apply_async(reader,q)

但我得到错误,

Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "pc.py", line 5, in writer
    queue.put(i)
  File "<string>", line 2, in put
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 758, in _callmethod
    conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe

谁能指出我,我哪里错了。

4

1 回答 1

6

嗨,我找到了这个答案,

from multiprocessing import Manager, Process, Pool,Queue
from Queue import Empty

def writer(queue):
   for i in range(10):
     queue.put(i)
     print 'put %i size now %i'%(i, queue.qsize())

def reader(id, queue):
   for i in range(10):
     try:
       cnt = queue.get(1,1)
       print '%i got %i size now %i'%(id, cnt, queue.qsize())
     except Empty:
       pass

class Managerss:
   def __init__(self):
     self.queue= Queue()
     self.NUMBER_OF_PROCESSES = 3

   def start(self):
     self.p = Process(target=writer, args=(self.queue,))
     self.p.start()
     self.workers = [Process(target=reader, args=(i, self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
     for w in self.workers:
       w.start()

   def join(self):
     self.p.join()
     for w in self.workers:
       w.join()

if __name__ == '__main__':
    m= Managerss()
    m.start()
    m.join()

希望能帮助到你

于 2013-04-18T04:52:02.173 回答