2

我有一系列 8640 x 400 的时间课程。
编辑:第 0 个暗点是位置,第一个暗点是该位置的时间课程。

我需要计算每个点的交叉光谱相干性,这些都可以独立完成。

因此我开始尝试使用多处理模块:

from multiprocessing import Pool
import numpy as np    
from matplotlib.mlab import cohere
from itertools import product    
from scipy.signal import detrend

# this is a module of classes that I wrote myself
from MyTools import SignalProcessingTools as sig

def compute_coherence(args):
    rowA = roi[args[0], :]
    rowB = roi[args[1], :]
    coh, _ =  cohere(rowA, rowB, NFFT=64, Fs=sample_rate, noverlap=32, sides='onesided')
    #TODO: use the freq return and only average the freq in particular range...

    return  np.sqrt(coh.mean())

### start here ###
# I detrend the data for linear features
roi = detrend(data=roi, axis=1, type='linear')
# and normalize it to std. Very simple method, uses x.std() in a loop 
roi = sig.normalize_std(roi)

roi = np.random.rand(8640, 386)# in reality this is a load from disk
length = roi.shape[0]
indices = np.arange(length)
# this gives me all combinations of indices i and j
# since I want the cross spectral coherence of the array
args = product(indices, indices) # note, args is an interator obj

pool = Pool(processes=20)
coh = pool.map(compute_coherence,  args)

该程序使用超过 20 GB,我没有看到明显的内存泄漏。有很多关于这个话题的谷歌返回,但我真的不明白如何追踪这个。

编辑:大错误...... roi 数组不是 8640x8640x400 它只是 8640 x 400 抱歉......:| 漫长的一天

也许我错过了一个错误......?

提前感谢您的想法...

[更新] 因此,在修改代码并尝试注释掉部分之后,我相信我已经将内存问题缩小到 cohere() 方法。运行代码并仅返回零数组可以正常工作。

这是一个更新的版本:

from os import path, getenv
import numpy as np
from matplotlib import mlab
import scipy.signal as sig
from multiprocessing import Pool
from itertools import product
import Tools
from scipy.signal import detrend

from pympler import tracker
tr = tracker.SummaryTracker()
import gc


def call_back():
    gc.collect()

def call_compute(arg):
    start, stop = arg
    ind_pairs = indice_combos[start:stop]
    coh = np.zeros(len(ind_pairs), dtype=float)

    #tr.print_diff()

    for i, ind in enumerate(ind_pairs):
        row1 = ind[0]
        row2 = ind[1]

        mag, _ = mlab.cohere(roi[row1,:], roi[row2,:], NFFT=128, Fs=sample_rate, noverlap=64, sides='onesided')

        coh[i] = np.sqrt(mag.mean()) 
        #tr.print_diff()

    #tr.print_diff()    
    return coh

### start Here ###
imagetools = Tools.ImageTools()
sigtools = Tools.SignalProcess()

HOME = Tools.HOME
sample_rate = 1 / 1.65

mask_obj = imagetools.load_image(path.join(HOME, 'python_conn/Rat/Inputs/rat_gm_rs.nii.gz'))
mask_data = mask_obj.get_data()

rs_obj = imagetools.load_image(path.join(HOME, 'python_conn/Rat/Inputs/rs_4D.nii.gz'))
rs_data = rs_obj.get_data()

# logical index
ind = mask_data > 0 
roi = rs_data[ind, :]

# normalize with STD
roi = sigtools.normalize_nd(roi)
# detrend linear
roi = detrend(data=roi, axis=1, type='linear')
# filter
roi = sigtools.butter_bandpass(lowcut=0.002, highcut=0.1, sample_rate=sample_rate, data=roi, order=5)

# drop frames for steady state and filter noise
roi = roi[:, 16:] 

################
### testing ####
roi = roi[0:5000,:]
################
################

length = roi.shape[0]    

# setup up row and col vector of indices
indices = np.arange(length)
temp = product(indices, indices)# all possible combinations iterator
indice_combos = [ i for i in temp ] # make iterator into a list

num_cores = 10
chunk_size = len(indice_combos) / num_cores  # divdide the combo list for each core
grps = np.arange(0, len(indice_combos)+chunk_size, chunk_size)

#make the final list of args, where each item is a pair of stop and stop
args = [ [grps[i], grps[i+1]-1]  for i in range(0, len(grps)-1)]
args[-1][1] = args[-1][1] + 1

# deallocate some memory
grps = None

# Multi core
pool = Pool(num_cores)
coh = np.hstack(pool.map(call_compute, args, call_back()))

coh = coh.ravel()

out_path = path.join(HOME, 'python_conn/Rat/coh.npy')
np.save(out_path, coh)

map = np.zeros_like(mask_data)
map[ind] = coh.sum(0)

out_path = path.join(HOME, 'python_conn/Rat/coherence_map.nii.gz')
imagetools.save_new_image(map, out_path, rs_obj.coordmap)

[更新]

这不是 cohere 的错......我的错......我希望开发人员没有看到这个......:| 我更改了很多代码。所以我担心这个线程不再有效。

有什么帮助:

只使用迭代器

发送多于一对 i,j 来处理的进程

有很多开销,但内存实际上并没有增加那么多。我觉得我有点滥用了……但是当你学习新东西时,这里总是很难准确地说……我很惊讶还没有人讨厌我。明天我会发布我自己的解决方案。

4

1 回答 1

0

数字不加起来。正如@sashkello 已经指出的那样,您有一个包含 29,859,840,000 个元素的数组。即使它们每个只占用 1 个字节,您也将使用 30 GB,而不是 20 GB。那么您没有透露的是什么?;-)

后来:现在 8640 * 400 = 3,456,000 - 那么“7500 万个元素”从何而来?7500 万将接近 8640 * 8640。

无论如何,有两件事需要调查:

  1. 如果你不调用多处理机制,只在主程序中做一个块,它会消耗多少内存?

  2. 叉积 ( ) 有多大args?由于您的阵列有多大似乎仍然存在混淆,因此无法从这里猜测。

还有一件事我们需要知道:池中有多少进程?普通Pool()意味着“使用所有可用的”处理器,但无法猜测你有多少。当然,每个人的内存使用都会增加。

于 2013-11-15T01:56:58.877 回答