我有一系列 8640 x 400 的时间课程。
编辑:第 0 个暗点是位置,第一个暗点是该位置的时间课程。
from multiprocessing import Pool
import numpy as np
from matplotlib.mlab import cohere
from itertools import product
from scipy.signal import detrend
# this is a module of classes that I wrote myself
from MyTools import SignalProcessingTools as sig
def compute_coherence(args):
rowA = roi[args[0], :]
rowB = roi[args[1], :]
coh, _ = cohere(rowA, rowB, NFFT=64, Fs=sample_rate, noverlap=32, sides='onesided')
#TODO: use the freq return and only average the freq in particular range...
return np.sqrt(coh.mean())
### start here ###
# I detrend the data for linear features
roi = detrend(data=roi, axis=1, type='linear')
# and normalize it to std. Very simple method, uses x.std() in a loop
roi = sig.normalize_std(roi)
roi = np.random.rand(8640, 386)# in reality this is a load from disk
length = roi.shape[0]
indices = np.arange(length)
# this gives me all combinations of indices i and j
# since I want the cross spectral coherence of the array
args = product(indices, indices) # note, args is an interator obj
pool = Pool(processes=20)
coh = pool.map(compute_coherence, args)
该程序使用超过 20 GB,我没有看到明显的内存泄漏。有很多关于这个话题的谷歌返回,但我真的不明白如何追踪这个。
编辑:大错误...... roi 数组不是 8640x8640x400 它只是 8640 x 400 抱歉......:| 漫长的一天
[更新] 因此,在修改代码并尝试注释掉部分之后,我相信我已经将内存问题缩小到 cohere() 方法。运行代码并仅返回零数组可以正常工作。
from os import path, getenv
import numpy as np
from matplotlib import mlab
import scipy.signal as sig
from multiprocessing import Pool
from itertools import product
import Tools
from scipy.signal import detrend
from pympler import tracker
tr = tracker.SummaryTracker()
import gc
def call_back():
def call_compute(arg):
start, stop = arg
ind_pairs = indice_combos[start:stop]
coh = np.zeros(len(ind_pairs), dtype=float)
for i, ind in enumerate(ind_pairs):
row1 = ind[0]
row2 = ind[1]
mag, _ = mlab.cohere(roi[row1,:], roi[row2,:], NFFT=128, Fs=sample_rate, noverlap=64, sides='onesided')
coh[i] = np.sqrt(mag.mean())
return coh
### start Here ###
imagetools = Tools.ImageTools()
sigtools = Tools.SignalProcess()
sample_rate = 1 / 1.65
mask_obj = imagetools.load_image(path.join(HOME, 'python_conn/Rat/Inputs/rat_gm_rs.nii.gz'))
mask_data = mask_obj.get_data()
rs_obj = imagetools.load_image(path.join(HOME, 'python_conn/Rat/Inputs/rs_4D.nii.gz'))
rs_data = rs_obj.get_data()
# logical index
ind = mask_data > 0
roi = rs_data[ind, :]
# normalize with STD
roi = sigtools.normalize_nd(roi)
# detrend linear
roi = detrend(data=roi, axis=1, type='linear')
# filter
roi = sigtools.butter_bandpass(lowcut=0.002, highcut=0.1, sample_rate=sample_rate, data=roi, order=5)
# drop frames for steady state and filter noise
roi = roi[:, 16:]
### testing ####
roi = roi[0:5000,:]
length = roi.shape[0]
# setup up row and col vector of indices
indices = np.arange(length)
temp = product(indices, indices)# all possible combinations iterator
indice_combos = [ i for i in temp ] # make iterator into a list
num_cores = 10
chunk_size = len(indice_combos) / num_cores # divdide the combo list for each core
grps = np.arange(0, len(indice_combos)+chunk_size, chunk_size)
#make the final list of args, where each item is a pair of stop and stop
args = [ [grps[i], grps[i+1]-1] for i in range(0, len(grps)-1)]
args[-1][1] = args[-1][1] + 1
# deallocate some memory
grps = None
# Multi core
pool = Pool(num_cores)
coh = np.hstack(pool.map(call_compute, args, call_back()))
coh = coh.ravel()
out_path = path.join(HOME, 'python_conn/Rat/coh.npy')
np.save(out_path, coh)
map = np.zeros_like(mask_data)
map[ind] = coh.sum(0)
out_path = path.join(HOME, 'python_conn/Rat/coherence_map.nii.gz')
imagetools.save_new_image(map, out_path, rs_obj.coordmap)
这不是 cohere 的错......我的错......我希望开发人员没有看到这个......:| 我更改了很多代码。所以我担心这个线程不再有效。
发送多于一对 i,j 来处理的进程