这是一个例子。我有一个生产者和几个消费者。
#!/usr/bin/env python2
from multiprocessing import Process, Queue
import time
def counter(low, high):
current = low
while current <= high:
yield current
current += 1
def put_tasks(q):
for c in counter(0, 9):
q.put(c)
time.sleep(.1)
print('put_tasks: no more tasks')
def work(id, q):
while True:
task = q.get()
print('process %d: %s' % (id, task))
time.sleep(.3)
print('process %d: done' % id)
if __name__ == '__main__':
q = Queue(2)
task_gen = Process(target=put_tasks, args=(q,))
processes = [Process(target=work, args=(id, q)) for id in range(0, 3)]
task_gen.start()
for p in processes:
p.start()
for p in processes:
p.join()
counter
只是一个数字生成器put_tasks
。通常,我会有数千个任务,而不是像本例中那样只有 10 个。这段代码的重点是逐步向队列提供任务。
问题是消费者无法事先知道他们将要处理多少任务,但该put_tasks
函数确实知道它何时完成(然后打印no more tasks
)。
样本输出:
process 2: 0
process 0: 1
process 1: 2
process 2: 3
process 0: 4
process 1: 5
process 2: 6
process 0: 7
process 1: 8
process 2: 9
put_tasks: no more tasks
所有任务都得到处理,但程序随后挂起(每个进程都卡在q.get()
.
有任何想法吗?