59

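I need to use a priority queue in my Python code, and:

  • I am looking for any fast implementation of a priority queue
  • Ideally, I would like the queue to be generic (i.e. to work for any object with a specified comparison operator).

Looking around for something efficient, I came upon heapq, but:

  • I am looking for something faster than heapq, which is implemented in native Python, so it's not fast.
  • It looks good, but seems to be specified only for integers. I suppose it works with any object that has comparison operators, but it doesn't specify which comparison operators it needs.
  • Update: Regarding comparison in heapq, I can either use a (priority, object) tuple as Charlie Martin suggests, or just implement __cmp__ for my object.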
4

13 Answers

50

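You can use Queue.PriorityQueue (named queue.PriorityQueue in Python 3).

Recall that Python is dynamically typed, so you can store anything you like: just make a tuple of (priority, thing) and you're set.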
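A minimal sketch of the tuple approach, assuming Python 3 (where the module is named queue). The job dictionaries and the put/get helper names are made up for illustration. One caveat: when two priorities tie, tuple comparison falls through to the payloads, so they would have to be comparable too. The insertion counter used here is one common way around that, not something this answer prescribes.

```python
import itertools
from queue import PriorityQueue

pq = PriorityQueue()
counter = itertools.count()  # tie-breaker so payloads are never compared

def put(priority, thing):
    # Entries compare by priority first, then by insertion order.
    pq.put((priority, next(counter), thing))

def get():
    priority, _, thing = pq.get()
    return priority, thing

put(2, {"name": "write report"})  # dicts are not orderable, hence the counter
put(1, {"name": "fix bug"})
put(2, {"name": "send email"})

results = [get() for _ in range(3)]
print(results)
```

Lower numbers come out first, and entries with equal priority come out in the order they were inserted.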

answered 2009-01-02T19:11:31.367
21

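When using a priority queue, decrease-key is a must-have operation for many algorithms (Dijkstra's Algorithm, A*, OPTICS), and I wonder why Python's built-in priority queue does not support it. None of the other answers supplies a solution that supports this functionality.

This implementation by Daniel Stutzbach, a priority queue that also supports the decrease-key operation, worked perfectly for me with Python 3.5.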

from heapdict import heapdict

hd = heapdict()
hd["two"] = 2
hd["one"] = 1
obj = hd.popitem()
print("object:",obj[0])
print("priority:",obj[1])

# object: one
# priority: 1
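If installing a third-party package is not an option, a common workaround for the missing decrease-key is to push a fresh entry with the better priority and skip stale entries when they are popped. The sketch below applies that idea to Dijkstra's algorithm using only heapq. It is a different technique from heapdict, and the graph and the function name are hypothetical.

```python
import heapq

def dijkstra(graph, source):
    """Shortest distances from source; graph maps node -> {neighbor: weight}."""
    dist = {source: 0}
    heap = [(0, source)]
    while heap:
        d, node = heapq.heappop(heap)
        if d > dist.get(node, float("inf")):
            continue  # stale entry: a shorter path was already recorded
        for neighbor, weight in graph.get(node, {}).items():
            new_d = d + weight
            if new_d < dist.get(neighbor, float("inf")):
                dist[neighbor] = new_d
                # emulate decrease-key by pushing a duplicate with lower priority
                heapq.heappush(heap, (new_d, neighbor))
    return dist

graph = {
    "a": {"b": 4, "c": 1},
    "c": {"b": 2, "d": 7},
    "b": {"d": 1},
    "d": {},
}
distances = dijkstra(graph, "a")
print(distances)
```

The heap may temporarily hold several entries for the same node, which costs some memory but keeps every operation at O(log n).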
answered 2016-05-03T15:18:23.523
18

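I ended up implementing a wrapper for heapq, adding a dict to keep the queue's elements unique. The result should be quite efficient for all operations: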

import heapq


class PriorityQueueSet(object):

    """
    Combined priority queue and set data structure.

    Acts like a priority queue, except that its items are guaranteed to be
    unique. Provides O(1) membership test, O(log N) insertion and O(log N)
    removal of the smallest item.

    Important: the items of this data structure must be both comparable and
    hashable (i.e. must implement __hash__ and a comparison method: __lt__
    on Python 3, __cmp__ on Python 2). This is true of many of Python's
    built-in objects, but you should implement those methods if you want to
    use the data structure for custom objects.
    """

    def __init__(self, items=()):
        """
        Create a new PriorityQueueSet.

        Arguments:
            items (list): An initial item list - it can be unsorted and
                non-unique. The data structure will be created in O(N).
        """
        self.set = dict((item, True) for item in items)
        # Copy the keys into a real list; on Python 3, keys() returns a view
        # that heapq cannot heapify.
        self.heap = list(self.set)
        heapq.heapify(self.heap)

    def has_item(self, item):
        """Check if ``item`` exists in the queue."""
        return item in self.set

    def pop_smallest(self):
        """Remove and return the smallest item from the queue."""
        smallest = heapq.heappop(self.heap)
        del self.set[smallest]
        return smallest

    def add(self, item):
        """Add ``item`` to the queue if it doesn't already exist."""
        if item not in self.set:
            self.set[item] = True
            heapq.heappush(self.heap, item)
answered 2009-01-02T20:19:36.113
12

You can use heapq for non-integer elements (tuples) as well:

import heapq

heap = []
data = [(10,"ten"), (3,"three"), (5,"five"), (7,"seven"), (9, "nine"), (2,"two")]
for item in data:
    heapq.heappush(heap, item)
sorted_data = []
while heap:
    sorted_data.append(heapq.heappop(heap))
print(sorted_data)
data.sort()
print(data == sorted_data)

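This will be significantly faster than the queue.PriorityQueue option recommended in the top answer, and unlike queue.PriorityQueue, heapq won't hang forever if you try to pop from an empty heap; it raises IndexError instead.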
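The difference in empty-container behavior can be seen in a short sketch, assuming Python 3. The variable names are illustrative. The blocking call is deliberately avoided by using get_nowait(), since a plain pq.get() on an empty queue would wait indefinitely.

```python
import heapq
import queue

heap = []
try:
    heapq.heappop(heap)          # popping an empty list fails immediately
except IndexError as exc:
    heap_error = type(exc).__name__

pq = queue.PriorityQueue()
try:
    pq.get_nowait()              # pq.get() with no timeout would block here
except queue.Empty as exc:
    queue_error = type(exc).__name__

print(heap_error, queue_error)
```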

answered 2011-10-12T07:11:32.850
7

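I've not used it, but you could try PyHeap. It's written in C, so hopefully it is fast enough for you.

Are you positive heapq/PriorityQueue won't be fast enough? It might be worth starting with one of them and then profiling to see whether it really is your performance bottleneck.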
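As a starting point for that kind of measurement, here is a rough timing sketch using the standard timeit module. The workload size and the helper name push_pop_all are arbitrary assumptions, so treat the number it prints as a ballpark for your own machine only. Note also that CPython uses a C implementation of the core heapq functions when it is available, so reading the pure-Python source may understate its speed.

```python
import heapq
import random
import timeit

def push_pop_all(values):
    """Push every value onto a heap, then pop them all back in sorted order."""
    heap = []
    for v in values:
        heapq.heappush(heap, v)
    return [heapq.heappop(heap) for _ in range(len(heap))]

random.seed(0)  # fixed seed so repeated runs use the same data
values = [random.random() for _ in range(10_000)]

elapsed = timeit.timeit(lambda: push_pop_all(values), number=10)
print("10 rounds of 10,000 push/pop pairs: {:.3f}s".format(elapsed))
```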

answered 2009-01-02T19:57:30.650
6

Did you look at the "Show Source" link on the heapq page? There's an example of using a heap with a list of (int, char) tuples as a priority queue.

answered 2009-01-02T19:13:33.967
3

I am implementing a priority queue in Python 3 using queue.PriorityQueue, like this:

from queue import PriorityQueue

class PqElement(object):
    def __init__(self, value: int):
        self.val = value

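    #Custom compare function (less than). Note that the comparison is
    #inverted, so the largest value is retrieved first.
    def __lt__(self, other):
        """self < other (inverted: larger values sort first)."""
        return self.val > other.val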

    #Print each element function
    def __repr__(self):
        return f'PQE:{self.val}'

#Usage-
pq = PriorityQueue()
for v in (3, 7, 5):             #Example values
    pq.put(PqElement(v))        #Add item          - O(log n)
topValue = pq.queue[0].val      #Peek at top value - O(1)
topItem = pq.get()              #Pop top item      - O(log n)
answered 2020-12-14T15:33:52.410
2

This is efficient and works for strings or any other type of input as well :)

import itertools
from heapq import heappush, heappop

pq = []                         # list of entries arranged in a heap
entry_finder = {}               # mapping of tasks to entries
REMOVED = '<removed-task>'      # placeholder for a removed task
counter = itertools.count()     # unique sequence count

def add_task(task, priority=0):
    'Add a new task or update the priority of an existing task'
    if task in entry_finder:
        remove_task(task)
    count = next(counter)
    entry = [priority, count, task]
    entry_finder[task] = entry
    heappush(pq, entry)

def remove_task(task):
    'Mark an existing task as REMOVED.  Raise KeyError if not found.'
    entry = entry_finder.pop(task)
    entry[-1] = REMOVED

def pop_task():
    'Remove and return the lowest priority task. Raise KeyError if empty.'
    while pq:
        priority, count, task = heappop(pq)
        if task is not REMOVED:
            del entry_finder[task]
            return task
    raise KeyError('pop from an empty priority queue')

Reference: http://docs.python.org/library/heapq.html

answered 2012-04-01T19:57:11.263
1

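I've got a priority queue / Fibonacci heap at https://pypi.python.org/pypi/fibonacci-heap-mod

It's not fast (there is a large constant c on delete-min, which is O(c*logn)). But find-min, insert, decrease-key and merge are all O(1). In other words, it's lazy.

If it's too slow on CPython, you might try Pypy, Nuitka or even CPython+Numba :)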

answered 2014-11-06T18:01:32.220
0

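I can either use a (priority, object) tuple as Charlie Martin suggests, or just implement __cmp__ for my object.

If you want inserted objects to be prioritized by a specific rule, I found it very helpful to write a simple subclass of PriorityQueue that accepts a key function. You won't have to insert (priority, object) tuples manually, and the handling feels more natural.

Demo of the desired behavior: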

>>> h = KeyHeap(sum)
>>> h.put([-1,1])
>>> h.put((-1,-2,-3))
>>> h.put({100})
>>> h.put([1,2,3])
>>> h.get()
(-1, -2, -3)
>>> h.get()
[-1, 1]
>>> h.get()
[1, 2, 3]
>>> h.get()
set([100])
>>> h.empty()
True
>>>
>>> k = KeyHeap(len)
>>> k.put('hello')
>>> k.put('stackoverflow')
>>> k.put('!')
>>> k.get()
'!'
>>> k.get()
'hello'
>>> k.get()
'stackoverflow'

Python 2 code

from Queue import PriorityQueue

class KeyHeap(PriorityQueue):
    def __init__(self, key, maxsize=0):            
        PriorityQueue.__init__(self, maxsize)
        self.key = key

    def put(self, x):
        PriorityQueue.put(self, (self.key(x), x))

    def get(self):
        return PriorityQueue.get(self)[1]

Python 3 code

from queue import PriorityQueue

class KeyHeap(PriorityQueue):
    def __init__(self, key, maxsize=0):            
        super().__init__(maxsize)
        self.key = key

    def put(self, x):
        super().put((self.key(x), x))

    def get(self):
        return super().get()[1]

Obviously, calling put will (and should!) raise an error if you try to insert an object that your key function cannot process.
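One thing to watch for with this pattern on Python 3: when two objects produce the same key, the (key, object) tuples fall back to comparing the objects themselves, which raises TypeError for types that cannot be ordered. A possible variation, sketched below under the hypothetical name StableKeyHeap, adds an insertion counter as a tie-breaker. This is an assumption about how one might extend the class, not part of the original answer.

```python
import itertools
from queue import PriorityQueue

class StableKeyHeap(PriorityQueue):
    """Hypothetical KeyHeap variant; equal keys come out in insertion order."""

    def __init__(self, key, maxsize=0):
        super().__init__(maxsize)
        self.key = key
        self._counter = itertools.count()

    def put(self, x, block=True, timeout=None):
        # The counter sits between key and payload, so payloads never compare.
        super().put((self.key(x), next(self._counter), x), block, timeout)

    def get(self, block=True, timeout=None):
        return super().get(block, timeout)[-1]

h = StableKeyHeap(len)
h.put({"a": 1})      # dicts cannot be ordered with <
h.put({"b": 2})      # same key (length 1) as the first dict
h.put([])
out = [h.get() for _ in range(3)]
print(out)
```

Keeping the block and timeout parameters in the overridden methods also lets put_nowait and get_nowait continue to work.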

answered 2016-01-18T20:57:50.090
0

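If you want to keep an entire list ordered, not just the top value, I've used some variation of this code in multiple projects. It's a drop-in replacement for the standard list class with a similar API: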

import bisect

class OrderedList(list):
    """Keep a list sorted as you append or extend it

    An ordered list, this sorts items from smallest to largest using key, so
    if you want MaxQueue like functionality use negative values: .pop(-1) and
    if you want MinQueue like functionality use positive values: .pop(0)
    """
    def __init__(self, iterable=None, key=None):
        if key:
            self.key = key
        self._keys = []
        super(OrderedList, self).__init__()
        if iterable:
            for x in iterable:
                self.append(x)

    def key(self, x):
        return x

    def append(self, x):
        k = self.key(x)
        # https://docs.python.org/3/library/bisect.html#bisect.bisect_right
        i = bisect.bisect_right(self._keys, k)
        if i is None:
            super(OrderedList, self).append((self.key(x), x))
            self._keys.append(k)
        else:
            super(OrderedList, self).insert(i, (self.key(x), x))
            self._keys.insert(i, k)

    def extend(self, iterable):
        for x in iterable:
            self.append(x)

    def remove(self, x):
        k = self.key(x)
        self._keys.remove(k)
        super(OrderedList, self).remove((k, x))

    def pop(self, i=-1):
        self._keys.pop(i)
        return super(OrderedList, self).pop(i)[-1]

    def clear(self):
        super(OrderedList, self).clear()
        self._keys.clear()

    def __iter__(self):
        for x in super(OrderedList, self).__iter__():
            yield x[-1]

    def __getitem__(self, i):
        return super(OrderedList, self).__getitem__(i)[-1]

    def insert(self, i, x):
        raise NotImplementedError()
    def __setitem__(self, i, x):
        raise NotImplementedError()
    def reverse(self):
        raise NotImplementedError()
    def sort(self):
        raise NotImplementedError()

It can handle tuples like (priority, value) by default, but you can also customize it like this:

class Val(object):
    def __init__(self, priority, val):
        self.priority = priority
        self.val = val

h = OrderedList(key=lambda x: x.priority)

h.append(Val(100, "foo"))
h.append(Val(10, "bar"))
h.append(Val(200, "che"))

print(h[0].val) # "bar"
print(h[-1].val) # "che"
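For plain (priority, value) tuples, the standard library's bisect.insort gives the same keep-it-sorted behavior without a custom class. A minimal sketch follows; the variable names are illustrative. Each insertion shifts list elements, so it costs O(n) even though the search is O(log n), which is the trade-off against a heap.

```python
import bisect

jobs = []
for entry in [(100, "foo"), (10, "bar"), (200, "che")]:
    bisect.insort(jobs, entry)   # binary search, then insert in place

lowest = jobs[0]     # smallest priority sits at the front
highest = jobs[-1]   # largest priority sits at the back
print(jobs)
```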
answered 2019-09-27T00:58:48.757
0

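If you only have a single "higher priority" level rather than the arbitrarily many supported by queue.PriorityQueue, you can efficiently use a collections.deque for this by inserting normal jobs at the left with .appendleft() and inserting your higher-priority entries at the right with .append()

Both queue and deque instances have thread-safe push/pop methods

Misc advantages of deques

  • allows peeking at arbitrary elements (indexable and iterable without popping, while queue instances can only be popped)
  • significantly faster than queue.PriorityQueue (see the sketchy testing below)

Cautions about length limitations

  • setting a length will let it push elements out of either end, not just off the left, unlike queue instances, which block or raise queue.Full
  • any unbounded collection will eventually run your system out of memory if the input rate exceeds consumption
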
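The first caution can be illustrated with a tiny sketch. The job names are made up. With maxlen set, a full deque silently discards an element from the end opposite to the one being appended to, so a low-priority insert can evict a waiting high-priority job.

```python
from collections import deque

d = deque(maxlen=3)
for job in ("low 0", "low 1", "low 2"):
    d.appendleft(job)            # deque is now full

d.append("high 0")               # appending on the right evicts from the left
after_append = list(d)

d.appendleft("low 3")            # appending on the left evicts from the right
after_appendleft = list(d)

print(after_append)
print(after_appendleft)
```

After the last call, the high-priority job is gone without any exception being raised.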
import threading
from collections import deque as Deque

Q = Deque()  # don't set a maximum length

def worker_queue_creator(q):
    sleepE = threading.Event()  # use wait method for sleeping thread
    sleepE.wait(timeout=1)

    for index in range(3):  # start with a few jobs
        q.appendleft("low job {}".format(index))

    q.append("high job 1")  # add an important job

    for index in range(3, 3+3):  # add a few more jobs
        q.appendleft("low job {}".format(index))

    # one more important job before ending worker
    sleepE.wait(timeout=2)
    q.append("high job 2")

    # wait while the consumer worker processes these before exiting
    sleepE.wait(timeout=5)

def worker_queue_consumer(q):
    """ daemon thread which consumes queue forever """
    sleepE = threading.Event()  # use wait method for sleeping thread
    sleepE.wait(timeout=1)  # wait a moment to mock startup
    while True:
        try:
            pre_q_str = str(q)  # see what the Deque looks like before pop
            job = q.pop()
        except IndexError:  # Deque is empty
            pass            # keep trying forever
        else:  # successfully popped job
            print("{}: {}".format(job, pre_q_str))
        sleepE.wait(timeout=0.4)  # quickly consume jobs


# create threads to consume and display the queue
T = [
    threading.Thread(target=worker_queue_creator, args=(Q,)),
    threading.Thread(target=worker_queue_consumer, args=(Q,), daemon=True),
]

for t in T:
    t.start()

T[0].join()  # wait on sleep in worker_queue_creator to quit

% python3 deque_as_priorityqueue.py
high job 1: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2', 'low job 1', 'low job 0', 'high job 1'])
low job 0: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2', 'low job 1', 'low job 0'])
low job 1: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2', 'low job 1'])
low job 2: deque(['low job 5', 'low job 4', 'low job 3', 'low job 2'])
low job 3: deque(['low job 5', 'low job 4', 'low job 3'])
high job 2: deque(['low job 5', 'low job 4', 'high job 2'])
low job 4: deque(['low job 5', 'low job 4'])
low job 5: deque(['low job 5'])

Comparison

import timeit

NUMBER = 1000

values_builder = """
low_priority_values  = [(1, "low-{}".format(index)) for index in range(5000)]
high_priority_values = [(0, "high-{}".format(index)) for index in range(1000)]
"""

deque_setup = """
from collections import deque as Deque
Q = Deque()
"""
deque_logic_input = """
for item in low_priority_values:
    Q.appendleft(item[1])  # index into tuples to remove priority
for item in high_priority_values:
    Q.append(item[1])
"""
deque_logic_output = """
while True:
    try:
        v = Q.pop()
    except IndexError:
        break
"""

queue_setup = """
from queue import PriorityQueue
from queue import Empty
Q = PriorityQueue()
"""
queue_logic_input = """
for item in low_priority_values:
    Q.put(item)
for item in high_priority_values:
    Q.put(item)
"""

queue_logic_output = """
while True:
    try:
        v = Q.get_nowait()
    except Empty:
        break
"""

# abuse string catenation to build the setup blocks
results_dict = {
    "deque input":  timeit.timeit(deque_logic_input, setup=deque_setup+values_builder, number=NUMBER),
    "queue input":  timeit.timeit(queue_logic_input, setup=queue_setup+values_builder, number=NUMBER),
    "deque output": timeit.timeit(deque_logic_output, setup=deque_setup+values_builder+deque_logic_input, number=NUMBER),
    "queue output": timeit.timeit(queue_logic_output, setup=queue_setup+values_builder+queue_logic_input, number=NUMBER),
}

for k, v in results_dict.items():
    print("{}: {}".format(k, v))

Results (pushing and popping 6000 elements, timeit number=1000)

% python3 deque_priorityqueue_compare.py
deque input: 0.853059
queue input: 24.504084000000002
deque output: 0.0013576999999997952
queue output: 0.02025689999999969

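While this is a fabricated example to show off deque's performance, PriorityQueue's insert time is some significant function of its length, O(log n) or worse, while a Deque is O(1), so it should be fairly representative of a real use case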
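One detail worth keeping in mind when reading timings like these: timeit runs the setup code once and then repeats the statement number times, so a statement that only fills a container keeps growing it across repetitions, and a statement that only drains it finds it empty after the first pass. A sketch of an alternative measurement, where every repetition builds and drains a fresh container, might look like the following. The function names and the repetition count are assumptions, and no particular timings are implied.

```python
import timeit
from collections import deque
from queue import PriorityQueue

LOW = ["low-{}".format(i) for i in range(5000)]
HIGH = ["high-{}".format(i) for i in range(1000)]

def deque_round_trip():
    """Fill a fresh deque, then pop everything from the right."""
    q = deque()
    for item in LOW:
        q.appendleft(item)
    for item in HIGH:
        q.append(item)
    popped = 0
    while q:
        q.pop()
        popped += 1
    return popped

def priority_queue_round_trip():
    """Fill a fresh PriorityQueue, then drain it without blocking."""
    q = PriorityQueue()
    for item in LOW:
        q.put((1, item))
    for item in HIGH:
        q.put((0, item))
    popped = 0
    while not q.empty():
        q.get_nowait()
        popped += 1
    return popped

deque_time = timeit.timeit(deque_round_trip, number=20)
queue_time = timeit.timeit(priority_queue_round_trip, number=20)
print("deque: {:.4f}s  PriorityQueue: {:.4f}s".format(deque_time, queue_time))
```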

answered 2020-05-08T17:25:57.157
0

A simple implementation:

Note that PriorityQueue retrieves the lowest value first.

from queue import PriorityQueue


class PriorityQueueWithKey(PriorityQueue):
    def __init__(self, key=None, maxsize=0):
        super().__init__(maxsize)
        self.key = key

    def put(self, item):
        if self.key is None:
            super().put((item, item))
        else:
            super().put((self.key(item), item))

    def get(self):
        # Return only the item, dropping the key it was stored with.
        return super().get()[1]


a = PriorityQueueWithKey(abs)
a.put(-4)
a.put(-3)
print(*a.queue)
answered 2020-11-26T03:57:33.433