我正在分割大型 ctype 数组并并行处理它们。我收到下面的错误并相信它,因为数组的一部分正在完成另一部分的处理。我尝试使用 process.join() 让第一组进程等待,但这不起作用。想法?
Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored
使用:
....
with closing(multiprocessing.Pool(initializer=init(array))) as p:
del array #Since the array is now stored in a shared array destroy the array ref for memory reasons
step = y // cores
if step != 0:
jobs =[]
for i in range (0, y, step):
process = p.Process(target=stretch, args= (shared_arr,slice(i, i+step)),kwargs=options)
jobs.append(process)
process.start()
for j in jobs:
j.join()
del jobs
del process
更新:
#Create an ctypes array
array = ArrayConvert.SharedMemArray(array)
#Create a global of options
init_options(options) #options is a dict
with closing(multiprocessing.Pool(initializer=init(array))) as p:
del array #Since the array is not stored in a shared array destroy the array ref for memory reasons
step = y // cores
if step != 0:
for i in range (0, y, step):
#Package all the options into a global dictionary
p.map_async(stretch,[slice(i, i+step)])
#p.apply_async(stretch,args=(shared_arr,slice(i, i+step)),kwargs=options)
p.join()
def init_options(options_):
global kwoptions
kwoptions = options_
我传递给 map_async 的函数存储在不同的模块中,因此我很难将全局 kwoptions 传递给该函数。在这样的模块之间传递全局变量似乎不正确(unpythonic)。这是能够通过 map_async 传递 kwargs 的方式吗?
我应该使用不同的东西(应用或处理)重新处理多处理吗?