0

我正在尝试使用并行计算来加速一个 for 循环(我已经在使用 itertools,但由于这个循环要执行很多次,我需要更快的速度)。我是多进程(multiprocessing)的新手。我已经查阅了 Stack Overflow 上的几个相关问题并尝试解决我的问题,但仍然遇到了一些困难。为了让多进程高效运行,我创建了共享变量(self.A、self.B、self.C)。但是,我认为我的做法有误,因为计算结束后检查这些变量时,发现它们并没有改变。我的实际代码比较复杂,所以下面给出的是一个能复现该问题的示例代码。谢谢你的帮助!

import numpy as np
from multiprocessing import Process, Array, Pool
from ctypes import c_double
import itertools

class F():
    """Demo: fill NumPy arrays ``B`` and ``C`` from several worker processes.

    ``A`` has shape ``A_SHAPE``; ``B`` and ``C`` (created by :meth:`solve`)
    have shape ``BC_SHAPE``.  When ``num_process > 1`` all three are NumPy
    views over ``multiprocessing`` shared memory.

    The raw shared buffers are kept next to the views because only the raw
    buffers may be handed to a child process: a NumPy view is pickled *by
    value* under start methods that pickle ``Process`` arguments, so the
    child would write into a private copy and the parent would never see
    the result.  ``self.A`` must therefore be modified in place (not
    rebound) if workers are expected to see the change.
    """

    A_SHAPE = (5, 10)
    BC_SHAPE = (10, 5, 10)

    def __init__(self, num_process=4):
        """Create ``A`` (zeros) and the list of ``(k, l)`` index pairs.

        Args:
            num_process: number of worker processes used by :meth:`solve`;
                ``1`` selects the serial code path.
        """
        self.num_process = num_process
        rows, cols = self.A_SHAPE
        self.idx = list(itertools.product(range(rows), range(cols)))
        # Raw shared buffers; stay ``None`` on the serial path.
        self._A_raw = self._B_raw = self._C_raw = None
        self.A = np.zeros(self.A_SHAPE)
        if self.num_process > 1:
            self._A_raw, self.A = self._shared_zeros(self.A_SHAPE)

    @staticmethod
    def _shared_zeros(shape):
        """Return ``(raw_buffer, ndarray_view)`` for a zeroed shared array."""
        # An integer size yields a zero-initialised buffer.
        raw = Array(c_double, int(np.prod(shape)), lock=False)
        return raw, np.frombuffer(raw, dtype=np.float64).reshape(shape)

    @staticmethod
    def _as_array(buf, shape):
        """Return *buf* as an ndarray of *shape* without copying.

        ndarrays are returned unchanged; anything else is treated as a
        buffer of doubles (e.g. a shared ctypes array) and wrapped.
        """
        if isinstance(buf, np.ndarray):
            return buf
        return np.frombuffer(buf, dtype=np.float64).reshape(shape)

    def solve(self):
        """Allocate ``B`` and ``C`` and fill them, serially or in parallel.

        The arrays are printed before and after the computation.
        """
        if self.num_process > 1:
            self._B_raw, self.B = self._shared_zeros(self.BC_SHAPE)
            self._C_raw, self.C = self._shared_zeros(self.BC_SHAPE)
        else:
            self.B = np.zeros(self.BC_SHAPE)
            self.C = np.zeros(self.BC_SHAPE)
        print('Before=',self.A,self.B,self.C)
        for i in range(10):
            if self.num_process == 1:
                for (k, l) in self.idx:
                    self.B[i, k, l] = 1
                    self.C[i, k, l] = 1
            else:
                workers = []
                for worker_num in range(self.num_process):
                    # Pass the raw shared buffers, never the ndarray views:
                    # the views would be copied into the child.
                    worker = Process(
                        target=F.update,
                        args=(i, worker_num, self.num_process, self.idx,
                              self._A_raw, self._B_raw, self._C_raw),
                    )
                    workers.append(worker)
                    worker.start()
                for worker in workers:
                    worker.join()
        print('After=',self.A,self.B,self.C)

    @staticmethod
    def update(i, worker_num, num_process, idx, A, B, C):
        """Fill this worker's share of slice ``i`` of ``B`` and ``C``.

        Args:
            i: index along the first axis of ``B``/``C``.
            worker_num: zero-based index of this worker.
            num_process: total number of workers sharing ``idx``.
            idx: sequence of ``(k, l)`` pairs to process.
            A, B, C: either ndarrays or raw shared buffers of doubles
                holding ``A_SHAPE`` / ``BC_SHAPE`` elements.
        """
        A = F._as_array(A, F.A_SHAPE)
        B = F._as_array(B, F.BC_SHAPE)
        C = F._as_array(C, F.BC_SHAPE)
        # Contiguous slice of idx owned by this worker.
        start_num = int(len(idx) * (worker_num/num_process))
        end_num = int(len(idx) * ((worker_num+1)/num_process))
        for j in range(start_num, end_num):
            k, l = idx[j]
            B[i, k, l] = min(2, A[k, l])
            C[i, k, l] = 2

if __name__ == '__main__':
    # Script entry point: build the demo object with its default worker
    # count and run the computation once.
    solver = F()
    solver.solve()

当我在计算后打印我的变量时,我发现它们没有改变。

更新
我已经修正了代码,下面的版本可以正常进行多进程计算。正如 Ricky Kim 所指出的,我之前的错误在于没有真正创建共享变量。下面的代码解决了这个问题,但它仍然比只使用 1 个进程(执行完全相同的操作)慢得多。对于如何让多进程更快、更高效,大家有什么建议吗?谢谢!

import numpy as np
import multiprocessing as mp
from multiprocessing import Process, Array, Pool
from ctypes import c_double
import itertools

class F():
    """Demo: fill shared arrays ``B`` and ``C`` from several worker processes.

    ``B`` and ``C`` live in ``multiprocessing.Array`` shared memory and are
    exposed as NumPy views of shape ``SHAPE``.  Workers receive the shared
    ``Array`` objects and build their own views over the same memory.
    """

    SHAPE = (10, 5, 10)

    def __init__(self, num_process=4):
        """Create ``A`` (zeros, 5x10) and the list of ``(k, l)`` index pairs.

        Args:
            num_process: number of worker processes used by :meth:`solve`;
                ``1`` selects the serial code path.
        """
        self.num_process = num_process
        self.idx = list(itertools.product(range(5), range(10)))
        self.A = np.zeros((5, 10))

    @classmethod
    def _view(cls, shared):
        """Return a no-copy ndarray view of a synchronized shared array."""
        return np.frombuffer(shared.get_obj()).reshape(cls.SHAPE)

    def solve(self):
        """Allocate shared ``B`` and ``C`` and fill them slice by slice.

        The arrays are printed before and after the computation.
        """
        size = int(np.prod(self.SHAPE))
        B_shared = Array(c_double, size)
        C_shared = Array(c_double, size)
        self.B = self._view(B_shared)
        self.C = self._view(C_shared)
        print('Before=',self.A,self.B,self.C)
        for i in range(10):
            if self.num_process == 1:
                # perform some expensive operation
                for (k, l) in self.idx:
                    self.B[i, k, l] = 1
                    self.C[i, k, l] = 1
            else:
                # NOTE(review): a fresh set of processes is started and
                # joined for every i; for work this small the start-up cost
                # likely outweighs the computation -- consider giving each
                # worker a larger unit of work if iterations are independent.
                workers = []
                for worker_num in range(self.num_process):
                    worker = Process(
                        target=self.update,
                        args=(i, worker_num, self.num_process,
                              B_shared, C_shared),
                    )
                    workers.append(worker)
                    worker.start()
                for worker in workers:
                    worker.join()
        print('After=',self.A,self.B,self.C)

    def update(self, i, worker_num, num_process, B_shared, C_shared):
        """Fill this worker's share of slice ``i`` of ``B`` and ``C``.

        Args:
            i: index along the first axis of ``B``/``C``.
            worker_num: zero-based index of this worker.
            num_process: total number of workers sharing ``self.idx``.
            B_shared: shared array backing ``B``.
            C_shared: shared array backing ``C``.
        """
        B = self._view(B_shared)
        # C must wrap its own buffer; wrapping B_shared here would make C an
        # alias of B, so B would be overwritten and C would never change.
        C = self._view(C_shared)
        # Contiguous slice of self.idx owned by this worker.
        start_num = int(len(self.idx) * (worker_num/num_process))
        end_num = int(len(self.idx) * ((worker_num+1)/num_process))
        for j in range(start_num, end_num):
            # perform some expensive operation
            k, l = self.idx[j]
            B[i, k, l] = min(2, self.A[k, l])
            C[i, k, l] = 2

if __name__ == '__main__':
    # freeze_support() is called first, before any worker process is created.
    mp.freeze_support()
    solver = F()
    solver.solve()
4

0 回答 0