如果您只需要一个“更高优先级”级别,而不是 queue.PriorityQueue 所支持的任意多个级别,
您可以高效地使用 collections.deque 来实现:
用 .appendleft() 在左侧插入普通作业,
用 .append() 在右侧插入更高优先级的条目。
queue 和 deque 实例都有线程安全的 push/pop 方法
双端队列的其他优势
- 允许查看任意元素(可索引和可迭代而不会弹出,而队列实例只能弹出)
- 明显快于 queue.PriorityQueue(见下面的粗略测试)
关于长度限制的注意事项
- 设置最大长度(maxlen)后,deque 满时会把元素从另一端挤出(两端都可能发生,而不仅仅是左端),这与会阻塞或引发 queue.Full 的队列实例不同
- 如果输入速率超过消耗,任何无限集合最终都会使您的系统内存不足
import threading
from collections import deque as Deque
# Shared job queue used by both worker threads.  It is left unbounded on
# purpose: a deque created with maxlen silently discards items from the
# opposite end once it is full instead of blocking or raising.
Q = Deque() # don't set a maximum length
def worker_queue_creator(q, *, startup_delay=1, pause_before_last=2, drain_delay=5):
    """Fill ``q`` with a demo mix of low- and high-priority jobs.

    Low-priority jobs are pushed on the left end and high-priority jobs on
    the right end, so a consumer that takes jobs with ``q.pop()`` receives
    pending high-priority jobs first and low-priority jobs in the order
    they were queued.

    Args:
        q: deque-like object providing ``append`` and ``appendleft``.
        startup_delay: seconds to wait before the first job is queued.
        pause_before_last: seconds to wait before queueing "high job 2".
        drain_delay: seconds to wait before returning, giving a consumer
            thread time to work through the queued jobs.
    """
    # The event is never set; wait(timeout=...) is used purely as a sleep.
    pause = threading.Event()
    pause.wait(timeout=startup_delay)

    # Use the queue that was passed in rather than a module-level global,
    # so the function works with whatever deque the caller supplies.
    for index in range(3):  # start with a few jobs
        q.appendleft("low job {}".format(index))
    q.append("high job 1")  # add an important job
    for index in range(3, 3 + 3):  # add a few more jobs
        q.appendleft("low job {}".format(index))

    # one more important job before ending worker
    pause.wait(timeout=pause_before_last)
    q.append("high job 2")

    # wait while the consumer worker processes these before exiting
    pause.wait(timeout=drain_delay)
def worker_queue_consumer(q):
    """Pop jobs from the right end of ``q`` forever and print each one.

    Meant to run in a daemon thread: the loop never terminates on its own.
    Each printed line shows the popped job followed by a rendering of the
    deque taken just before the pop.
    """
    # The event is never set; wait(timeout=...) is used purely as a sleep.
    nap = threading.Event()
    nap.wait(timeout=1)  # wait a moment to mock startup
    while True:
        snapshot = str(q)  # what the deque looks like before the pop
        try:
            job = q.pop()
        except IndexError:
            # Deque is empty: nothing to report, poll again after the nap.
            pass
        else:
            print("{}: {}".format(job, snapshot))
        nap.wait(timeout=0.4)  # quickly consume jobs
# Start one producer thread and one consumer thread sharing the deque Q.
# The consumer is a daemon so its endless loop does not keep the process
# alive once the producer has finished.
producer = threading.Thread(target=worker_queue_creator, args=(Q,))
consumer = threading.Thread(
    target=worker_queue_consumer,
    args=(Q,),
    daemon=True,
)
T = [producer, consumer]
for t in T:
    t.start()
T[0].join()  # block until worker_queue_creator returns
% python3 deque_as_priorityqueue.py
high job 1: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2', 'low job 1', 'low job 0', 'high job 1'])
low job 0: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2', 'low job 1', 'low job 0'])
low job 1: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2', 'low job 1'])
low job 2: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2'])
low job 3: deque(['low job 5', 'low job 4', 'low job 3'])
high job 2: deque(['low job 5', 'low job 4', 'high job 2'])
low job 4: deque(['low job 5', 'low job 4'])
low job 5: deque(['low job 5'])
比较
import timeit
# Number of timed runs per benchmark.
NUMBER = 1000

# The strings below are source snippets handed to timeit.  Loop and try
# bodies must be indented inside the strings, otherwise timeit cannot
# compile them.

# Test data: (priority, label) tuples; priority 0 sorts before priority 1.
values_builder = """
low_priority_values = [(1, "low-{}".format(index)) for index in range(5000)]
high_priority_values = [(0, "high-{}".format(index)) for index in range(1000)]
"""

# --- deque variant ---------------------------------------------------------
deque_setup = """
from collections import deque as Deque
Q = Deque()
"""

# Low-priority items go on the left, high-priority items on the right.
deque_logic_input = """
for item in low_priority_values:
    Q.appendleft(item[1]) # index into tuples to remove priority
for item in high_priority_values:
    Q.append(item[1])
"""

# Drain from the right until the deque reports it is empty.
deque_logic_output = """
while True:
    try:
        v = Q.pop()
    except IndexError:
        break
"""

# --- queue.PriorityQueue variant -------------------------------------------
queue_setup = """
from queue import PriorityQueue
from queue import Empty
Q = PriorityQueue()
"""

# The whole (priority, label) tuple is queued so the heap can order it.
queue_logic_input = """
for item in low_priority_values:
    Q.put(item)
for item in high_priority_values:
    Q.put(item)
"""

# Drain without blocking until the queue reports it is empty.
queue_logic_output = """
while True:
    try:
        v = Q.get_nowait()
    except Empty:
        break
"""
def _total_time(stmt, setup):
    """Return the summed time of NUMBER single runs of ``stmt``.

    timeit executes ``setup`` once per timing call, not once per loop
    iteration.  With ``number=NUMBER`` the container built by ``setup``
    would be shared by all iterations: the "output" snippets would drain it
    on the first iteration and then time only failed pops, and the "input"
    snippets would keep growing one container.  Using ``repeat=NUMBER`` with
    ``number=1`` re-runs ``setup`` (untimed) before every timed run, so each
    run pushes or pops exactly one batch of values.
    """
    return sum(timeit.repeat(stmt, setup=setup, repeat=NUMBER, number=1))


# abuse string catenation to build the setup blocks
results_dict = {
    "deque input": _total_time(
        deque_logic_input,
        deque_setup + values_builder,
    ),
    "queue input": _total_time(
        queue_logic_input,
        queue_setup + values_builder,
    ),
    "deque output": _total_time(
        deque_logic_output,
        deque_setup + values_builder + deque_logic_input,
    ),
    "queue output": _total_time(
        queue_logic_output,
        queue_setup + values_builder + queue_logic_input,
    ),
}
for k, v in results_dict.items():
    print("{}: {}".format(k, v))
结果(推送和弹出 6000 个元素,timeit number=1000)
% python3 deque_priorityqueue_compare.py
deque input: 0.853059
queue input: 24.504084000000002
deque output: 0.0013576999999997952
queue output: 0.02025689999999969
虽然这是一个为展示 deque 性能而刻意构造的示例,但 PriorityQueue 的插入时间明显取决于其长度,
为 O(log n) 甚至更糟,而 deque 是 O(1),
因此它应该能较好地代表真实用例。