我很好奇是否有办法multiprocessing.Queue
手动锁定对象。
我设置了一个非常标准的生产者/消费者模式,其中我的主线程不断产生一系列值,并且一组multiprocessing.Process
工作人员正在对产生的值起作用。
这一切都是通过一个鞋底来控制的multiprocessing.Queue()
。
import time
import multiprocessing
class Reader(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
while True:
item = self.queue.get()
if isinstance(item, str):
break
if __name__ == '__main__':
queue = multiprocessing.Queue()
reader = Reader(queue)
reader.start()
start_time = time.time()
while time.time() - start_time < 10:
queue.put(1)
queue.put('bla bla bla sentinal')
queue.join()
我遇到的问题是我的工作池不能queue
像主线程向其中插入值一样快地消耗和处理。所以经过一段时间后,Queue 变得非常笨拙,以至于它会弹出一个 MemoryError。
一个明显的解决方案是简单地在生产者中添加一个等待检查,以阻止其将更多值放入队列。类似于以下内容:
while time.time() - start_time < 10:
queue.put(1)
while queue.qsize() > some_size:
time.sleep(.1)
queue.put('bla bla bla sentinal')
queue.join()
但是,由于程序的时髦性质,我想将队列中的所有内容转储到文件中以供以后处理。但!由于无法暂时锁定队列,工作人员无法消耗其中的所有内容,因为生产者不断用垃圾填充它 - 无论如何从概念上讲。经过无数次测试后,似乎在某些时候其中一个锁获胜(但通常是添加到队列中的那个)。
编辑:另外,我意识到可以简单地停止生产者并从该线程中消费它......但这让我心中的单一责任人感到难过,因为生产者是生产者,而不是消费者。
编辑:
在查看了 的来源后Queue
,我想出了这个:
def dump_queue(q):
q._rlock.acquire()
try:
res = []
while not q.empty():
res.append(q._recv())
q._sem.release()
return res
finally:
q._rlock.release()
但是,我太害怕使用它了!我不知道这是否“正确”。我没有足够的把握来知道这是否会在不炸毁任何Queue
内部结构的情况下保持下去。
有谁知道这会不会坏?:)