0

我在 python3 中运行多个进程时遇到问题。

我的程序执行以下操作: 1. 从 sqllite 数据库中获取条目并将它们传递给 input_queue 2. 创建多个进程,从 input_queue 中取出项目,通过函数运行它并将结果输出到输出队列。3. 创建一个线程,将项目从 output_queue 中取出并打印出来(这个线程显然是在前 2 步之前启动的)

我的问题是,目前第 2 步中的“函数”仅运行与设置的进程数一样多的次数,例如,如果您将进程数设置为 8,它仅运行 8 次然后停止。我假设它会继续运行,直到它从 input_queue 中取出所有项目。

我是否需要重写将条目从数据库(步骤 1)中取出到另一个进程中的函数,然后将其输出队列作为步骤 2 的输入队列传递?

编辑:这是代码示例,我使用数字列表代替数据库条目,因为它仍然以相同的方式执行。我的列表中有 300 个项目,我希望它处理所有 300 个项目,但目前它只处理 10 个(我分配的进程数)

#!/usr/bin/python3
from multiprocessing import Process,Queue
import multiprocessing
from threading import Thread


## This is the class that would be passed to the multi_processing function
class Processor:
    def __init__(self,out_queue):
        self.out_queue = out_queue
    def __call__(self,in_queue):
        data_entry = in_queue.get()
        result = data_entry*2
        self.out_queue.put(result)



#Performs the multiprocessing
def perform_distributed_processing(dbList,threads,processor_factory,output_queue):
    input_queue = Queue()


    # Create the Data processors.
    for i in range(threads):
        processor  = processor_factory(output_queue)
        data_proc = Process(target = processor,
                            args   = (input_queue,))

        data_proc.start()

    # Push entries to the queue.

    for entry in dbList:
        input_queue.put(entry)


    # Push stop markers to the queue, one for each thread.

    for i in range(threads):
        input_queue.put(None)

    data_proc.join()
    output_queue.put(None)


if __name__ == '__main__':
    output_results   = Queue()

    def output_results_reader(queue):
        while True:
            item = queue.get()
            if item is None:
                break
            print(item)


    # Establish results collecting thread.
    results_process = Thread(target = output_results_reader,args   = (output_results,))
    results_process.start()

    # Use this as a substitute for the database in the example
    dbList = [i for i in range(300)]

    # Perform multi processing
    perform_distributed_processing(dbList,10,Processor,output_results)

    # Wait for it all to finish.
    results_process.join()
4

2 回答 2

2

为输入队列提供服务并写入输出队列的进程集合几乎就是进程池的定义。

如果您想知道如何从头开始构建一个,最好的学习方法是查看源代码multiprocessing.Pool,它是非常简单的 Python,并且编写得非常好。但是,正如您所料,您可以只使用 multiprocessing.Pool而不是重新实现它。文档中的示例非常好。

但实际上,您可以通过使用执行程序而不是池来使这更加简单。很难解释差异(再次阅读两个模块的文档),但基本上,未来是一个“智能”结果对象,这意味着您不是一个具有各种不同方式来运行作业和获得结果的池,而是只需要一个愚蠢的东西,除了返回期货之外什么都不知道。(当然,在最简单的情况下,代码看起来几乎相同......)

from concurrent.futures import ProcessPoolExecutor

def Processor(data_entry):
    return data_entry*2

def perform_distributed_processing(dbList, threads, processor_factory):
    with ProcessPoolExecutor(processes=threads) as executor:
        yield from executor.map(processor_factory, dbList)

if __name__ == '__main__':
    # Use this as a substitute for the database in the example
    dbList = [i for i in range(300)]
    for result in perform_distributed_processing(dbList, 8, Processor):
        print(result)

或者,如果您想按顺序而不是按顺序处理它们:

def perform_distributed_processing(dbList, threads, processor_factory):
    with ProcessPoolExecutor(processes=threads) as executor:
        fs = (executor.submit(processor_factory, db) for db in dbList)
        yield from map(Future.result, as_completed(fs))

请注意,我还替换了您的进程内队列和线程,因为它没有做任何事情,只是提供了一种交错“等待下一个结果”和“处理最新结果”的方法,并且yield(或者yield from,在这种情况下)这样做没有所有的复杂性、开销和出错的可能性。

于 2013-08-21T21:01:22.277 回答
2

不要尝试再次重写整个多处理库。我认为您可以根据需要使用任何multiprocessing.Pool方法 - 如果这是一个批处理作业,您甚至可以使用同步multiprocessing.Pool.map()- 只需编写一个生成器来生成线程输入,而不是推送到输入队列。

于 2013-08-21T01:51:37.340 回答