我正在尝试利用 python 多处理池从共享队列中获取值并在工作进程中使用。
import multiprocessing
def myTask(queue):
while not queue.empty():
value = queue.get()
print "Process {} Popped {} from the shared Queue".format(multiprocessing.current_process().pid, value)
queue.task_done()
def main():
m = multiprocessing.Manager()
sharedQueue = m.Queue()
sharedQueue.put(2)
sharedQueue.put(3)
sharedQueue.put(4)
sharedQueue.put(5)
sharedQueue.put(6)
pool = multiprocessing.Pool(processes=3)
pool.apply_async(myTask, args=(sharedQueue,))
pool.close()
pool.join()
if __name__ == '__main__':
main()
从我得到的输出中,我看到只启动了一个进程,将所有值从队列中取出。如何并行生成多个进程,不断从队列中获取值。我对可能大于队列大小的进程数有最大限制。请在这方面指导我。谢谢。
PS:这只是一个例子。实际任务是将数据从一种形式迁移到另一种形式,并对数据进行一些繁重的操作。
更新:我做了以下修改,它们似乎工作,除了 pool.join() 阻止主进程退出,即使所有子进程都退出。
pool = multiprocessing.Pool(processes=4)
while not sharedQueue.empty():
pool.apply_async(myTask, args=(sharedQueue,))
pool.close()
#pool.join()
def myTask(queue):
value = queue.get()
print "Process {} Popped {} from the shared Queue".format(multiprocessing.current_process().pid, value)
queue.task_done()