0

我正在使用带有传递块大小的多处理 pool.imap() 处理字符串列表。我的列表长度为 1821,imap 中的进程为 4。我试图为每个进程提供几乎相等数量的块大小,因此将块大小设置为 455。也尝试使用 500。但这使我的 imap 跳过了一些记录。跳过也不是那么随机,因为它是有序列表。一旦我将块大小更改为 200,imap 就开始将所有记录发送到我的目标函数。有人可以解释为什么 chunksize > 450 在这里引起问题,而根据文档,理想情况下,它应该在每个进程中划分为 1821/4 = 455 或 456 rec。另请注意,在我的函数中,我正在使用该字符串并运行一些步骤,每个步骤需要几秒钟。

  def process_init(self,l):
        global process_lock
        process_lock = l

  def _run_multiprocess(self,num_process,input_list,target_func,chunk):   
        l = mp.Lock()
        with mp.Pool(processes=(num_process),initializer=self.process_init, initargs=(l,)) as p:
            start = time.time()
            async_result  = p.imap(target_func, input_list,chunksize =chunk)

            p.close()
            p.join()
            print("Completed the multiprocess")
            end = time.time()
            print('total time (s)= ' + str(end-start))

chunksize = 500
self._run_multiprocess(4,iterator_source,self._process_data,chunksize)

def _process_data(self,well_name):       

        with open("MultiProcessMethod_RecordReceived.csv","a") as openfile:
            process_lock.acquire()
            openfile.write( "\t" +well_name.upper() + "\n")
            openfile.flush()
            process_lock.release()
4

0 回答 0