你有几个问题。首先,当您进行多处理时,启动进程的代码必须位于由if __name__ == '__main__':
测试控制的块内。否则,当启动子流程时,将重新执行流程启动代码,并且您将有子流程在无限递归循环中启动更多子流程。
其次,与线程不同,每个进程都在自己的内存空间中运行,因此它修改的任何全局变量都不会反映在其他进程中。例如,这里尝试通过将列表作为参数传递来纠正您的两个问题(尽管只是使用简单的列表而不是数据框来演示):
import multiprocessing
def myfunction(l, i):
l[i] += 1
def main():
l = [i for i in range(3)]
processes = []
for i in range(3):
#launch multiprocessing
pro = multiprocessing.Process(target = myfunction, args = (l, i))
pro.start()
processes.append(pro)
for process in processes:
process.join()
print(l)
if __name__ == '__main__':
main()
印刷:
[0, 1, 2]
原始列表没有被修改,因为传递给子流程的是列表的副本。必须修改程序以使子进程返回修改后的值,并让主进程本身对列表进行实际更新。但是从 using 中获取返回值Process
并不是那么简单。要获取返回值,最好使用模块中的Pool
类multiprocessing.pool
或模块中的ProcessPoolExecutor
类concurrent.futures
。
但是还有另一种方法可以在不传递返回值的情况下完成您需要做的事情,方法是允许多个进程通过代理对对象的单个副本进行操作。这是由模块中的Manager
类提供的multiprocessing
。
import multiprocessing
def myfunction(l, i):
l[i] += 1
def main():
with multiprocessing.Manager() as manager:
l = manager.list()
for i in range(3):
l.append(i)
processes = []
for i in range(3):
#launch multiprocessing
pro = multiprocessing.Process(target = myfunction, args = (l, i))
pro.start()
processes.append(pro)
for process in processes:
process.join()
print(l)
if __name__ == '__main__':
main()
印刷:
[1, 2, 3]
你显然需要阅读这Manager
门课,看看它如何适应你的具体问题。该Manager
课程为您提供选择,但不是无限的。如果您想继续使用 Dataframe,您可能需要发挥创造力。
以下代码不使用Manager
该类,并且每个子流程都在处理自己的输入数据副本。相反,每个进程将其修改后的值返回给重新组合最终结果的主进程:
from concurrent.futures import ProcessPoolExecutor
l = [i for i in range(3)]
def myfunction(i):
return l[i] + 1
def main():
with ProcessPoolExecutor(max_workers=3) as executor:
results = executor.map(myfunction, range(3))
for i, result in enumerate(results):
l[i] = result
print(l)
if __name__ == '__main__':
main()