等待(不旋转)直到两个(多处理)队列中的任何一个可用的最佳方法是什么,它们都驻留在同一系统上?
9 回答
实际上,您可以在 select.select 中使用 multiprocessing.Queue 对象。IE
que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])
只有当它准备好被读取时才会选择 que。
虽然没有关于它的文档。我正在阅读 multiprocessing.queue 库的源代码(在 linux 上,它通常类似于 /usr/lib/python2.6/multiprocessing/queue.py)来找出它。
使用 Queue.Queue,我没有找到任何聪明的方法来做到这一点(我真的很想这样做)。
看起来还没有官方的方法来处理这个问题。或者至少,不是基于此:
您可以尝试类似这篇文章正在做的事情——访问底层管道文件句柄:
然后使用选择。
不确定多处理队列上的选择在 Windows 上的效果如何。由于 Windows 上的 select 侦听套接字而不是文件句柄,我怀疑可能存在问题。
我的答案是让一个线程以阻塞的方式监听每个队列,并将结果全部放入主线程监听的单个队列中,本质上是将各个队列多路复用到一个队列中。
我这样做的代码是:
"""
Allow multiple queues to be waited upon.
queue,value = multiq.select(list_of_queues)
"""
import queue
import threading
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
while True:
data = self.inq.get()
print ("thread reads data=",data)
result = (self.inq,data)
self.sharedq.put(result)
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
以下测试例程显示了如何使用它:
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
res=0
while not res==4:
while not q3.empty():
res = q3.get()[1]
print ("returning result =",res)
希望这可以帮助。
托尼·华莱士
当以独立于平台的方式使用多处理时,似乎使用将传入项目转发到单个队列然后等待的线程是一个实际的选择。
避免线程需要处理低级管道/FD,这既是特定于平台的,又不容易与高级 API 一致地处理。
或者您需要能够设置回调的队列,我认为这是合适的更高级别的接口。即你会写这样的东西:
单队列 = 队列() incoming_queue1.setcallback(singlequeue.put) incoming_queue2.setcallback(singlequeue.put) ... 单队列.get()
也许多处理包可以增加这个 API,但它还没有。该概念适用于使用术语“通道”而不是“队列”的 py.execnet,请参见此处http://tinyurl.com/nmtr4w
您可以使用类似于观察者模式的东西,其中队列订阅者会收到状态更改的通知。
在这种情况下,您可以将您的工作线程指定为每个队列上的侦听器,并且每当它接收到就绪信号时,它就可以处理新项目,否则就休眠。
上述代码的新版本...
不确定多处理队列上的选择在 Windows 上的效果如何。由于 Windows 上的 select 侦听套接字而不是文件句柄,我怀疑可能存在问题。
我的答案是让一个线程以阻塞的方式监听每个队列,并将结果全部放入主线程监听的单个队列中,本质上是将各个队列多路复用到一个队列中。
我这样做的代码是:
"""
Allow multiple queues to be waited upon.
An EndOfQueueMarker marks a queue as
"all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.
"""
import queue
import threading
class EndOfQueueMarker:
def __str___(self):
return "End of data marker"
pass
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
q_run = True
while q_run:
data = self.inq.get()
result = (self.inq,data)
self.sharedq.put(result)
if data is EndOfQueueMarker:
q_run = False
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
self.qList = list_of_queues
self.qrList = []
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
self.qrList.append(qr)
def get(self,blocking=True,timeout=None):
res = []
while len(res)==0:
if len(self.qList)==0:
res = (self,EndOfQueueMarker)
else:
res = queue.Queue.get(self,blocking,timeout)
if res[1] is EndOfQueueMarker:
self.qList.remove(res[0])
res = []
return res
def join(self):
for qr in self.qrList:
qr.join()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
以下代码是我的测试例程,以显示它是如何工作的:
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
res = q3.get()[1]
print ("returning result =",res)
have_data = not(res==multiq.EndOfQueueMarker)
从 Python 3.3 开始,您可以使用multiprocessing.connection.wait一次等待多个Queue._reader
对象。
我通常很想多路复用多个队列的一种情况是,每个队列对应于需要不同处理程序的不同类型的消息。您不能只从一个队列中提取,因为如果它不是您想要的消息类型,您需要将其放回。
但是,在这种情况下,每个处理程序本质上都是一个单独的消费者,这使其成为一个多生产者、多消费者的问题。幸运的是,即使在这种情况下,您仍然不需要阻塞多个队列。您可以为每个处理程序创建不同的线程/进程,每个处理程序都有自己的队列。基本上,您可以将其分解为多生产者、单消费者问题的多个实例。
我能想到的唯一需要在多个队列上等待的情况是,如果您被迫将多个处理程序放在同一个线程/进程中。在这种情况下,我会通过为我的主线程创建一个队列,为每个处理程序生成一个线程,并让处理程序使用主队列与主线程通信来重组它。然后,每个处理程序可以为其独特的消息类型创建一个单独的队列。
不要这样做。
在消息上放置一个标题并将它们发送到一个公共队列。这简化了代码,整体上会更干净。