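I need to know when a queue has been closed and will not receive any more items, so that I can end the iteration.

I do this by putting a sentinel in the queue: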

from Queue import Queue

class IterableQueue(Queue): 

    _sentinel = object()

    def __iter__(self):
        return self

    def close(self):
        self.put(self._sentinel)

    def next(self):
        item = self.get()
        if item is self._sentinel:
            raise StopIteration
        else:
            return item

Given that this is a very common use of queues, isn't there a built-in implementation?

3 Answers

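A sentinel is a reasonable way for a producer to signal that no more queue tasks are coming.

FWIW, your code can be simplified quite a bit with the two-argument form of iter():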

from Queue import Queue

class IterableQueue(Queue): 

    _sentinel = object()

    def __iter__(self):
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)
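
For readers on Python 3, here is a minimal sketch of the same class in use. The `produce` helper and the item count are assumptions made up for illustration; the class itself is the one above, ported to the Python 3 `queue` module.

```python
import queue
import threading


class IterableQueue(queue.Queue):
    # Unique object that can never collide with a real item.
    _sentinel = object()

    def __iter__(self):
        # iter(callable, sentinel) calls self.get() repeatedly and
        # stops as soon as the returned value equals the sentinel.
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)


def produce(q, n):
    # Hypothetical producer: put n integers, then close the queue.
    for i in range(n):
        q.put(i)
    q.close()


q = IterableQueue()
producer = threading.Thread(target=produce, args=(q, 5))
producer.start()
received = list(q)  # blocks until the sentinel arrives
producer.join()
print(received)  # [0, 1, 2, 3, 4]
```

Note that a single sentinel ends iteration for only one consumer. With several consumers, each needs its own sentinel, or the sentinel has to be put back on the queue.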
answered 2012-07-02T06:08:34.047

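The multiprocessing module has its own version of Queue, which does include a close method. I'm not sure how it behaves with threads, but it's worth a try. I don't see why it shouldn't work the same way: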

from multiprocessing import Queue

q = Queue()
q.put(1)
q.get_nowait()
# 1
q.close()
q.get_nowait()
# ...
# IOError: handle out of range in select()

You can treat the IOError as the close signal.

Test

from multiprocessing import Queue
from threading import Thread

def worker(q):
    while True:
        try:
            item = q.get(timeout=.5)
        except IOError:
            print "Queue closed. Exiting thread."
            return
        except:
            continue
        print "Got item:", item

q = Queue()
for i in xrange(3):
    q.put(i)
t = Thread(target=worker, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.close()
# Queue closed. Exiting thread.

Honestly, though, it isn't much different from setting a flag on a Queue.Queue. multiprocessing.Queue just uses the closed file descriptor as the flag:

from Queue import Queue
from threading import Thread

def worker2(q):
    while True:
        if q.closed:
            print "Queue closed. Exiting thread."
            return
        try:
            item = q.get(timeout=.5)
        except:
            continue
        print "Got item:", item

q = Queue()
q.closed = False
for i in xrange(3):
    q.put(i)
t = Thread(target=worker2, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.closed = True
# Queue closed. Exiting thread.
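
A rough Python 3 sketch of the same flag idea, assuming a `threading.Event` stands in for the ad hoc `q.closed` attribute. The `closed` and `results` names are made up here. Unlike `worker2` above, this variant checks the flag only when the queue is empty, so items already queued are drained before the thread exits.

```python
import queue
import threading


def worker(q, closed, results):
    while True:
        try:
            item = q.get(timeout=0.05)
        except queue.Empty:
            # Nothing to read right now: exit only if the producer
            # has announced that no more items are coming.
            if closed.is_set():
                return
            continue
        results.append(item)


q = queue.Queue()
closed = threading.Event()  # plays the role of the q.closed flag
results = []

for i in range(3):
    q.put(i)

t = threading.Thread(target=worker, args=(q, closed, results))
t.start()
closed.set()  # signal "closed"; queued items are still processed
t.join()
print(results)  # [0, 1, 2]
```

The cost of this approach is the polling timeout: the worker wakes up periodically even when idle, which the sentinel approach avoids.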
answered 2012-07-02T05:41:46.893

An old question, and variations on self._sentinel = object() will work. Revisiting this in 2021, I would instead suggest using concurrent.futures, with None as your sentinel:

# Note: this is Python 3.8+ code                                                                                                                                                   

import queue
import time
import functools
import random
from concurrent.futures import ThreadPoolExecutor

def worker(tup):
    (q,i) = tup
    print(f"Starting thread {i}")
    partial_sum = 0
    numbers_added = 0
    while True:
        try:
            item = q.get()
            if item is None:
                # 'propagate' this 'sentinel' to anybody else
                q.put(None)
                break
            numbers_added += 1
            partial_sum += item
            # need to pretend that we're doing something asynchronous
            time.sleep(random.random()/100)

        except Exception as e:
            print(f"(warning) Thread {i} got an exception {e}, that shouldn't happen.")
            break

    print(f"Thread {i} is done, saw a total of {numbers_added} numbers to add up")
    return partial_sum

MAX_RANGE = 1024
MAX_THREADS = 12

with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:

    # create a queue with numbers to add up                                                                                                                                        
    (q := queue.Queue()).queue = queue.deque(range(MAX_RANGE))

    # kick off the threads                                                                                                                                                         
    future_partials = executor.map(worker, [(q,i) for i in range(MAX_THREADS)])

    # they'll be done more or less instantly, but we'll make them wait                                                                                                             
    print("Threads launched with first batch ... sleeping 2 seconds")
    time.sleep(2)

    # threads are still available for more work!                                                                                                                                   
    for i in range(MAX_RANGE):
        q.put(i)

    print("Finished giving them another batch, this time we're not sleeping")

    # now we tell them all to wrap it up                                                                                                                                           
    q.put(None)
    # this will nicely catch the outputs                                                                                                                                           
    sum = functools.reduce(lambda x, y: x+y, future_partials)
    print(f"Got total sum {sum} (correct answer is {(MAX_RANGE-1)*MAX_RANGE}")

# Starting thread 0                                                                                                                                                                
# Starting thread 1                                                                                                                                                                
# Starting thread 2                                                                                                                                                                
# Starting thread 3                                                                                                                                                                
# Starting thread 4                                                                                                                                                                
# Starting thread 5                                                                                                                                                                
# Starting thread 6                                                                                                                                                                
# Starting thread 7                                                                                                                                                                
# Starting thread 8                                                                                                                                                                
# Starting thread 9                                                                                                                                                                
# Starting thread 10                                                                                                                                                               
# Starting thread 11                                                                                                                                                               
# Threads launched with first batch ... sleeping 2 seconds                                                                                                                         
# Finished giving them another batch, this time we're not sleeping                                                                                                                 
# Thread 0 is done, saw a total of 175 numbers to add up                                                                                                                           
# Thread 3 is done, saw a total of 178 numbers to add up                                                                                                                           
# Thread 11 is done, saw a total of 173 numbers to add up                                                                                                                          
# Thread 4 is done, saw a total of 177 numbers to add up                                                                                                                           
# Thread 9 is done, saw a total of 169 numbers to add up                                                                                                                           
# Thread 1 is done, saw a total of 172 numbers to add up                                                                                                                           
# Thread 7 is done, saw a total of 162 numbers to add up                                                                                                                           
# Thread 10 is done, saw a total of 161 numbers to add up                                                                                                                          
# Thread 5 is done, saw a total of 169 numbers to add up                                                                                                                           
# Thread 2 is done, saw a total of 157 numbers to add up                                                                                                                           
# Thread 6 is done, saw a total of 169 numbers to add up                                                                                                                           
# Thread 8 is done, saw a total of 186 numbers to add up                                                                                                                           
# Got total sum 1047552 (correct answer is 1047552

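Note that the de facto "main thread" only needs to push None onto the queue, much like a condition variable "signal", which the threads all pick up (and propagate).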
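
Stripped of the executor and the timing, the propagation idea can be sketched as below. This is only an illustration with plain threads; the `consume` function, the `out` list, and the counts are assumptions, not part of the code above.

```python
import queue
import threading


def consume(q, out):
    while True:
        item = q.get()
        if item is None:
            # Put the sentinel back so the next consumer sees it too.
            q.put(None)
            return
        out.append(item)


q = queue.Queue()
for i in range(10):
    q.put(i)
q.put(None)  # a single sentinel shuts down every consumer

out = []
threads = [threading.Thread(target=consume, args=(q, out)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(out))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

One side effect of this design is that a single None is left on the queue after all consumers have exited, so the queue should not be reused without removing it.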

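Also, this does not use the multiprocessing Queue, which is heavier than the standard (thread-safe) queue. The code above has the further benefit of being easy to modify to use ProcessPoolExecutor, or a mix of both (in either case you would need to use multiprocessing.Queue).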

(Side note: in general, if classes are needed to solve a "basic" problem in any given generation of Python, more modern versions often offer new options.)

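(Side note: the only reason the code is Python 3.8+ is that I'm a fan of assignment expressions which, in line with the side note above, solve the historical problem of how to initialize a queue from a list without resorting to non-functional solutions.)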

answered 2021-08-29T19:08:41.870