我在 python3 中运行多个进程时遇到问题。
我的程序执行以下操作: 1. 从 sqllite 数据库中获取条目并将它们传递给 input_queue 2. 创建多个进程,从 input_queue 中取出项目,通过函数运行它并将结果输出到输出队列。3. 创建一个线程,将项目从 output_queue 中取出并打印出来(这个线程显然是在前 2 步之前启动的)
我的问题是,目前第 2 步中的“函数”仅运行与设置的进程数一样多的次数,例如,如果您将进程数设置为 8,它仅运行 8 次然后停止。我假设它会继续运行,直到它从 input_queue 中取出所有项目。
我是否需要重写将条目从数据库(步骤 1)中取出到另一个进程中的函数,然后将其输出队列作为步骤 2 的输入队列传递?
编辑:这是代码示例,我使用数字列表代替数据库条目,因为它仍然以相同的方式执行。我的列表中有 300 个项目,我希望它处理所有 300 个项目,但目前它只处理 10 个(我分配的进程数)
#!/usr/bin/python3
from multiprocessing import Process,Queue
import multiprocessing
from threading import Thread
## This is the class that would be passed to the multi_processing function
class Processor:
def __init__(self,out_queue):
self.out_queue = out_queue
def __call__(self,in_queue):
data_entry = in_queue.get()
result = data_entry*2
self.out_queue.put(result)
#Performs the multiprocessing
def perform_distributed_processing(dbList,threads,processor_factory,output_queue):
input_queue = Queue()
# Create the Data processors.
for i in range(threads):
processor = processor_factory(output_queue)
data_proc = Process(target = processor,
args = (input_queue,))
data_proc.start()
# Push entries to the queue.
for entry in dbList:
input_queue.put(entry)
# Push stop markers to the queue, one for each thread.
for i in range(threads):
input_queue.put(None)
data_proc.join()
output_queue.put(None)
if __name__ == '__main__':
output_results = Queue()
def output_results_reader(queue):
while True:
item = queue.get()
if item is None:
break
print(item)
# Establish results collecting thread.
results_process = Thread(target = output_results_reader,args = (output_results,))
results_process.start()
# Use this as a substitute for the database in the example
dbList = [i for i in range(300)]
# Perform multi processing
perform_distributed_processing(dbList,10,Processor,output_results)
# Wait for it all to finish.
results_process.join()