3

Here is an example. I have one producer and several consumers.

#!/usr/bin/env python2

from multiprocessing import Process, Queue
import time

def counter(low, high):
    current = low 
    while current <= high:
        yield current
        current += 1

def put_tasks(q):
    for c in counter(0, 9):
        q.put(c)
        time.sleep(.1)
    print('put_tasks: no more tasks') 

def work(id, q): 
    while True:
        task = q.get()
        print('process %d: %s' % (id, task))
        time.sleep(.3)
    print('process %d: done' % id) 

if __name__ == '__main__':
    q = Queue(2)
    task_gen = Process(target=put_tasks, args=(q,))
    processes = [Process(target=work, args=(id, q)) for id in range(0, 3)] 

    task_gen.start()
    for p in processes:
        p.start()
    for p in processes:
        p.join()

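counter is just a number generator for put_tasks. Typically I would have several thousand tasks rather than just 10 as in this example. The point of this code is to feed the queue with tasks incrementally.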

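The problem is that the consumers cannot know in advance how many tasks they will have to process, but the put_tasks function does know when it is finished (it then prints no more tasks).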

Sample output:

process 2: 0
process 0: 1
process 1: 2
process 2: 3
process 0: 4
process 1: 5
process 2: 6
process 0: 7
process 1: 8
process 2: 9
put_tasks: no more tasks

All tasks get processed, but the program then hangs (each process is stuck in q.get()).
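The blocking behaviour can be observed in isolation. The following is a minimal sketch, assuming Python 3; the helper name get_or_give_up and the one-second timeout are made up for illustration. A plain q.get() on an empty queue waits indefinitely, which is what every consumer above ends up doing once the producer has stopped.

```python
from multiprocessing import Queue
from queue import Empty  # multiprocessing.Queue raises queue.Empty on timeout


def get_or_give_up(q, timeout=1.0):
    """Return the next item, or the string 'empty' if nothing arrives in time."""
    try:
        return q.get(timeout=timeout)
    except Empty:
        return 'empty'


def demo():
    q = Queue()
    q.put('only task')
    first = get_or_give_up(q)   # the item that was put
    second = get_or_give_up(q)  # nothing left; a plain q.get() would block here
    return first, second


if __name__ == '__main__':
    print(demo())
```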

Any ideas?

3 Answers

4

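The simplest way is to add something to the queue that tells the consumers that all work is done. Put one such marker per consumer, because each consumer stops after reading its own.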

number_of_consumers = 3

def put_tasks(q):
    for c in counter(0, 9):
        q.put(c)
        time.sleep(.1)
    print('put_tasks: no more tasks')
    for i in range(number_of_consumers):
        q.put(None)

def work(id, q): 
    while True:
        task = q.get()
        if task is None:
            break
        print('process %d: %s' % (id, task))
        time.sleep(.3)
    print('process %d: done' % id) 
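
For reference, here is a self-contained sketch of the same idea, assuming Python 3. The results queue, the run function and the extra parameters are additions for illustration and are not part of the original program; they only make it possible to check that every task was handled and that every consumer exited.

```python
from multiprocessing import Process, Queue


def put_tasks(q, num_tasks, num_consumers):
    for task in range(num_tasks):
        q.put(task)
    # One None per consumer, so every consumer receives its own stop signal.
    for _ in range(num_consumers):
        q.put(None)


def work(worker_id, q, results):
    while True:
        task = q.get()
        if task is None:
            break
        results.put((worker_id, task))
    # Tell the parent that this consumer has left its loop.
    results.put((worker_id, None))


def run(num_tasks=10, num_consumers=3):
    q = Queue(2)
    results = Queue()
    producer = Process(target=put_tasks, args=(q, num_tasks, num_consumers))
    consumers = [Process(target=work, args=(i, q, results))
                 for i in range(num_consumers)]
    producer.start()
    for p in consumers:
        p.start()

    # Drain the results before joining, so no child waits on a full pipe.
    handled, finished = [], 0
    while finished < num_consumers:
        worker_id, task = results.get()
        if task is None:
            finished += 1
        else:
            handled.append(task)

    producer.join()
    for p in consumers:
        p.join()
    return sorted(handled)


if __name__ == '__main__':
    print(run())
```

Because the joins now return, the program exits instead of hanging.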
answered 2013-09-22T13:20:14.367
4

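I suggest putting a sentinel value at the end of the queue. Each consumer that reads the sentinel puts it back before exiting, so a single sentinel stops all of them.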

def put_tasks(q):
    ...

    print('put_tasks: no more tasks')
    q.put(end_of_queue)

def work(id, q):
    while True:
        task = q.get()

        if task == end_of_queue:
            # Put the sentinel back so the other consumers see it too.
            q.put(task)
            break

        print('process %d: %s' % (id, task))
        time.sleep(.1)
    print('process %d: done' % id)

class Sentinel:
    def __init__(self, id):
        self.id = id

    def __eq__(self, other):
        if isinstance(other, Sentinel):
            return self.id == other.id

        return NotImplemented

if __name__ == '__main__':
    q = Queue(2)
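    # Relies on the fork start method: the children inherit this global.
    # Under spawn (the default on Windows) define end_of_queue at module level.
    end_of_queue = Sentinel("end of queue")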
    task_gen = Process(target=put_tasks, args=(q,))
    processes = [Process(target=work, args=(id, q)) for id in range(0, 3)]
    ...

I could not seem to use object() as the sentinel, because the processes seem to end up with different instances, so they do not compare equal.
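
A likely explanation is that items put on a multiprocessing queue are pickled, so the receiving side gets a copy rather than the original object. The sketch below, assuming Python 3, imitates that round trip with the pickle module directly; the variable names are made up for illustration.

```python
import pickle


class Sentinel:
    def __init__(self, id):
        self.id = id

    def __eq__(self, other):
        if isinstance(other, Sentinel):
            return self.id == other.id

        return NotImplemented


# A bare object() compares by identity, and the copy is a different object.
original = object()
copy_of_original = pickle.loads(pickle.dumps(original))
object_survives = (copy_of_original == original)

# None is a singleton, so it is still None after the round trip.
none_survives = pickle.loads(pickle.dumps(None)) is None

# Two distinct Sentinel instances with the same id compare equal by value.
distinct_but_equal = (Sentinel("end of queue") == Sentinel("end of queue"))

print(object_survives, none_survives, distinct_but_equal)
```

This is why a sentinel needs either value-based equality, as in the Sentinel class, or a singleton such as None.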

If you wish to generate a random sentinel, you can use the uuid module to generate a random id:

import uuid

class Sentinel:
    def __init__(self):
        self.id = uuid.uuid4()

    def __eq__(self, other):
        if isinstance(other, Sentinel):
            return self.id == other.id

        return NotImplemented

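Finally, zch uses None as the sentinel, which is sufficient as long as None can never be a legitimate item in the queue. The Sentinel approach works for almost arbitrary task values.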

answered 2013-09-22T13:20:33.130
-2

I recently looked into the same problem in the Python documentation and found an alternative to the answers above.

It looks like the "correct" way is to use the Queue.task_done() method, i.e.:

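from threading import Thread
from Queue import Queue  # Python 2 name; on Python 3 use "from queue import Queue"

# do_work, source and num_worker_threads are placeholders left undefined in this excerpt.

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()  # mark this item as processed

q = Queue()
for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True  # daemon threads do not keep the program alive
    t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done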
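
Note that the excerpt above uses threads and the thread-safe Queue class. multiprocessing.Queue has no task_done() or join(); the multiprocessing counterpart is JoinableQueue. The following is a rough sketch of how the same pattern might look with processes, assuming Python 3. The run function and the results queue are additions for illustration, and the producer runs in the parent process here rather than in its own process.

```python
from multiprocessing import JoinableQueue, Process, Queue


def work(worker_id, q, results):
    while True:
        task = q.get()
        results.put(task)
        q.task_done()  # must be called once for every item taken from q


def run(num_tasks=10, num_consumers=3):
    q = JoinableQueue(2)
    results = Queue()
    consumers = [Process(target=work, args=(i, q, results))
                 for i in range(num_consumers)]
    for p in consumers:
        p.daemon = True  # daemonic children are terminated when the parent exits
        p.start()

    for task in range(num_tasks):
        q.put(task)

    q.join()  # blocks until task_done() has been called for every item put

    # Every task has been handled; collect one result per task.
    return sorted(results.get() for _ in range(num_tasks))


if __name__ == '__main__':
    print(run())
```

The consumers never leave their loop in this variant. They stay blocked in q.get() and only go away because they are daemonic, so this suits programs that end soon after q.join() returns. If the consumers must shut down cleanly, a sentinel is still needed.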
answered 2014-01-31T10:17:16.690