任何熟悉我如何在 python 中实现多处理优先级队列的人?
6 回答
唉,它没有像改变旧的排队规则那么简单Queue.Queue
:后者实际上被设计为根据模板方法模式进行子类化,并且只覆盖钩子方法_put
和/或_get
可以轻松地允许改变排队规则(在提供了 2.6 显式 LIFO 和优先级实现,但即使在早期版本的 Python 中也很容易实现)。
对于多处理,在一般情况下(多个读取器,多个写入器),除了放弃队列的分布式特性外,我没有看到如何实现优先级队列的解决方案;指定一个特殊的辅助进程,它除了处理队列之外什么都不做,向它发送(基本上)RPC 以创建具有指定规则的队列,执行 put 和 get 操作,获取有关它的信息等等。因此,人们会遇到有关确保每个进程都知道辅助 proc 的位置(例如主机和端口)等的常见问题(如果进程总是在主 proc 启动时产生,则更容易)。一个相当大的问题,特别是如果想要以良好的性能做到这一点,可以防止 aux proc 崩溃(需要将数据复制到从属进程,如果主崩溃,则在从属之间分布式“主选举”,&c),等等。从头开始听起来像是一个博士的工作。一个可能从Johnson 的工作,或者搭载一些非常通用的方法,例如ActiveMQ。
一些特殊情况(例如,单读、单写)可能更容易,并且由于其有限的应用领域而变得更快;但是随后应该为该有限区域制定一个非常具体受限的规范 - 结果将不构成(通用)“多处理队列”,但仅适用于给定的受限要求集。
有一个错误会阻止真正的 FIFO。在这里
阅读。
构建优先级队列多处理设置的另一种方法肯定很棒!
虽然这不是答案,但也许它可以帮助您开发多处理队列。
这是我使用 Python 的Array编写的一个简单的优先级队列类:
class PriorityQueue():
"""A basic priority queue that dequeues items with the smallest priority number."""
def __init__(self):
"""Initializes the queue with no items in it."""
self.array = []
self.count = 0
def enqueue(self, item, priority):
"""Adds an item to the queue."""
self.array.append([item, priority])
self.count += 1
def dequeue(self):
"""Removes the highest priority item (smallest priority number) from the queue."""
max = -1
dq = 0
if(self.count > 0):
self.count -= 1
for i in range(len(self.array)):
if self.array[i][1] != None and self.array[i][1] > max:
max = self.array[i][1]
if max == -1:
return self.array.pop(0)
else:
for i in range(len(self.array)):
if self.array[i][1] != None and self.array[i][1] <= max:
max = self.array[i][1]
dq = i
return self.array.pop(dq)
def requeue(self, item, newPrio):
"""Changes specified item's priority."""
for i in range(len(self.array)):
if self.array[i][0] == item:
self.array[i][1] = newPrio
break
def returnArray(self):
"""Returns array representation of the queue."""
return self.array
def __len__(self):
"""Returnes the length of the queue."""
return self.count
我有同样的用例。但是有有限数量的优先级。
我最终要做的是为每个优先级创建一个队列,我的流程工作人员将尝试从这些队列中获取项目,从最重要的队列开始到不太重要的队列(从一个队列移动到另一个队列时完成队列为空)
受@user211505 建议的启发,我整理了一些又快又脏的东西。
请注意,这并不是多处理生产环境中优先级队列难题的完整解决方案。但是,如果您只是在乱搞或需要一些东西来完成一个短期项目,那么这可能符合要求:
from time import sleep
from datetime import datetime
from Queue import Empty
from multiprocessing import Queue as ProcessQueue
class SimplePriorityQueue(object):
'''
Simple priority queue that works with multiprocessing. Only a finite number
of priorities are allowed. Adding many priorities slow things down.
Also: no guarantee that this will pull the highest priority item
out of the queue if many items are being added and removed. Race conditions
exist where you may not get the highest priority queue item. However, if
you tend to keep your queues not empty, this will be relatively rare.
'''
def __init__(self, num_priorities=1, default_sleep=.2):
self.queues = []
self.default_sleep = default_sleep
for i in range(0, num_priorities):
self.queues.append(ProcessQueue())
def __repr__(self):
return "<Queue with %d priorities, sizes: %s>"%(len(self.queues),
", ".join(map(lambda (i, q): "%d:%d"%(i, q.qsize()),
enumerate(self.queues))))
qsize = lambda(self): sum(map(lambda q: q.qsize(), self.queues))
def get(self, block=True, timeout=None):
start = datetime.utcnow()
while True:
for q in self.queues:
try:
return q.get(block=False)
except Empty:
pass
if not block:
raise Empty
if timeout and (datetime.utcnow()-start).total_seconds > timeout:
raise Empty
if timeout:
time_left = (datetime.utcnow()-start).total_seconds - timeout
sleep(time_left/4)
else:
sleep(self.default_sleep)
get_nowait = lambda(self): self.get(block=False)
def put(self, priority, obj, block=False, timeout=None):
if priority < 0 or priority >= len(self.queues):
raise Exception("Priority %d out of range."%priority)
# Block and timeout don't mean much here because we never set maxsize
return self.queues[priority].put(obj, block=block, timeout=timeout)
根据您的要求,您可以通过多种方式使用操作系统和文件系统。队列将增长多大,它必须有多快?如果队列可能很大,但您愿意为每个队列访问打开几个文件,您可以使用 BTree 实现来存储队列和文件锁定以强制执行独占访问。缓慢但稳健。
如果队列将保持相对较小并且您需要它更快,您可能可以在某些操作系统上使用共享内存......
如果队列很小(1000 个条目)并且您不需要它非常快,您可以使用像目录一样简单的东西,其中包含包含文件锁定数据的文件。如果小而慢是可以的,这将是我的偏好。
如果队列可能很大并且您希望它平均快速,那么您可能应该使用 Alex 建议的专用服务器进程。然而,这是一种颈部疼痛。
您的性能和尺寸要求是什么?