1

我在进行多处理时正在努力修改我的数据框。这是我的代码的简化版本:

data_attrib.loc[:, 'Id'] = ['' for _ in range(len(data_attrib))]

def myfunction(i):
     data_attrib.at[i, 'Id'] = i
     print(data_attrib.at[i, 'Id'])

import multiprocessing
processes = []
for i in data_attrib.index:
    #launch multiprocessing
    pro = multiprocessing.Process(target = myfunction, args = [i])
    pro.start()
    processes.append(pro)
for process in processes:
    process.join()
    

如果没有 Multiprocessing,该功能可以完美运行。但是,通过多处理,它可以正确执行函数,但 data_attrib["Id"] 列仍然为空。

谁能告诉我为什么以及如何解决它。谢谢!

4

1 回答 1

1

你有几个问题。首先,当您进行多处理时,启动进程的代码必须位于由if __name__ == '__main__':测试控制的块内。否则,当启动子流程时,将重新执行流程启动代码,并且您将有子流程在无限递归循环中启动更多子流程。

其次,与线程不同,每个进程都在自己的内存空间中运行,因此它修改的任何全局变量都不会反映在其他进程中。例如,这里尝试通过将列表作为参数传递来纠正您的两个问题(尽管只是使用简单的列表而不是数据框来演示):

import multiprocessing


def myfunction(l, i):
    l[i] += 1


def main():
    l = [i for i in range(3)]
    processes = []
    for i in range(3):
        #launch multiprocessing
        pro = multiprocessing.Process(target = myfunction, args = (l, i))
        pro.start()
        processes.append(pro)
    for process in processes:
        process.join()
    print(l)

if __name__ == '__main__':
    main()

印刷:

[0, 1, 2]

原始列表没有被修改,因为传递给子流程的是列表的副本。必须修改程序以使子进程返回修改后的值,并让主进程本身对列表进行实际更新。但是从 using 中获取返回值Process并不是那么简单。要获取返回值,最好使用模块中的Poolmultiprocessing.pool或模块中的ProcessPoolExecutorconcurrent.futures

但是还有另一种方法可以在不传递返回值的情况下完成您需要做的事情,方法是允许多个进程通过代理对对象的单个副本进行操作。这是由模块中的Manager类提供的multiprocessing

import multiprocessing


def myfunction(l, i):
    l[i] += 1


def main():
    with multiprocessing.Manager() as manager:
        l = manager.list()
        for i in range(3):
            l.append(i)
        processes = []
        for i in range(3):
            #launch multiprocessing
            pro = multiprocessing.Process(target = myfunction, args = (l, i))
            pro.start()
            processes.append(pro)
        for process in processes:
            process.join()
        print(l)

if __name__ == '__main__':
    main()

印刷:

[1, 2, 3]

你显然需要阅读这Manager门课,看看它如何适应你的具体问题。该Manager课程为您提供选择,但不是无限的。如果您想继续使用 Dataframe,您可能需要发挥创造力。

以下代码不使用Manager该类,并且每个子流程都在处理自己的输入数据副本。相反,每个进程将其修改后的值返回给重新组合最终结果的主进程:

from concurrent.futures import ProcessPoolExecutor

l = [i for i in range(3)]

def myfunction(i):
    return l[i] + 1


def main():
    with ProcessPoolExecutor(max_workers=3) as executor:
        results = executor.map(myfunction, range(3))
        for i, result in enumerate(results):
            l[i] = result
        print(l)

if __name__ == '__main__':
    main()
于 2020-07-30T11:16:40.927 回答