0

我一直在与一个给我内存错误的函数作斗争,感谢您的支持(Python:如何从函数中拆分和返回列表以避免内存错误)我设法解决了这个问题;但是,由于我不是专业程序员,因此我想征求您对我的方法以及如何提高其性能(如果可能的话)的意见。

该函数是一个生成器函数,从 n 节点有向图中返回所有循环。然而,对于 12 个节点的有向图,大约有 1.15 亿个循环(每个定义为节点列表,例如 [0,1,2,0] 是一个循环)。我需要所有可用于进一步处理的循环,即使在我第一次生成它们时提取了它们的一些属性,所以它们需要存储在某个地方。因此,想法是每 1000 万个周期切割一次结果数组以避免内存错误(当数组太大时,python 会耗尽 RAM)并创建一个新数组来存储以下结果。在 12 节点有向图中,我将有 12 个结果数组,11 个完整的数组(每个包含 1000 万个循环),最后一个包含 500 万个循环。

但是,拆分结果数组是不够的,因为变量保留在 RAM 中。所以,我仍然需要将每一个写入磁盘并在之后将其删除以清除 RAM。

如何创建可变数量的变量中所述?,使用 'exec' 创建变量变量名不是很“干净”,字典解决方案更好。但是,就我而言,如果我将结果存储在一个字典中,由于数组的大小,它将耗尽内存。因此,我选择了“执行”方式。如果您能就该决定发表评论,我将不胜感激。

此外,为了存储我使用 numpy.savez_compressed 的数组,它为每个 1000 万周期数组提供了一个 43 Mb 的文件。如果它没有被压缩,它会创建一个 500 Mb 的文件。但是,使用压缩版本会减慢编写过程。知道如何加快写入和/或压缩过程吗?

我写的代码的简化版本如下:

nbr_result_arrays=0
result_array_0=[]
result_lenght=10000000
tmp=result_array_0 # I use tmp to avoid using exec within the for loop (exec slows down code execution) 
for cycle in generator:
    tmp.append(cycle)    
    if len(tmp) == result_lenght:
        exec 'np.savez_compressed(\'results_' +str(nbr_result_arrays)+ '\', tmp)'
        exec 'del result_array_'+str(nbr_result_arrays)
        nbr_result_arrays+=1
        exec 'result_array_'+str(nbr_result_arrays)+'=[]'
        exec 'tmp=result_array_'+str(nbr_result_arrays)

谢谢阅读,

阿莱克斯

4

2 回答 2

1

使用itertools.islice怎么样?

import itertools
import numpy as np

for i in itertools.count():
    tmp = list(itertools.islice(generator, 10000000))
    if not tmp:
        break
    np.savez_compressed('results_{}'.format(i), tmp)
    del tmp
于 2013-06-17T15:47:46.217 回答
0

感谢大家的建议。

正如@Aya 所建议的那样,我认为为了提高性能(以及可能的空间问题),我应该避免将结果存储在 HD 上,因为存储它们比创建结果增加了一半的时间,因此再次加载和处理它会变得非常接近再次创建结果。此外,如果我不存储任何结果,我会节省空间,这对于更大的有向图可能会成为一个大问题(12 个节点的完整有向图有大约 1.15 亿个周期,但 29 个节点的有大约 848E27 个周期......并且以因子速率增加)。

这个想法是,我首先需要找到通过最弱弧的所有循环,以找到所有循环通过它的总概率。然后,有了这个总概率,我必须再次遍历所有这些循环,根据加权概率从原始数组中减去它们(我需要总概率才能计算加权概率:weighted_prob= prob_of_this_cycle/total_prob_through_this_edge)。

因此,我相信这是最好的方法(但我愿意接受更多讨论!:))。

但是,我对两个子功能的速度处理有疑问:

  • 第一个:查找一个序列是否包含一个特定的(较小的)序列。我正在使用依赖于生成器函数“window”的函数“contains_sequence”来做到这一点(如Is there a Python builtin for determine if an iterable是否包含某个序列中所建议的那样?但是我被告知使用双端队列会速度提高 33%。还有其他想法吗?

  • 第二:我目前正在通过滑动循环节点(由列表表示)来找到循环的循环概率,以找到每个弧的输出处保持在循环内的概率,然后将它们全部相乘以找到循环概率(函数名称是 find_cycle_probability)。任何有关此功能的性能建议都将不胜感激,因为我需要为每个周期运行它,即无数次。

任何其他提示/建议/评论都将受到欢迎!并再次感谢您的帮助。

阿莱克斯

下面是简化的代码:

def simple_cycles_generator_w_filters(working_array_digraph, arc):
    '''Generator function generating all cycles containing a specific arc.'''
    generator=new_cycles.simple_cycles_generator(working_array_digraph)
    for cycle in generator:
        if contains_sequence(cycle, arc):             
            yield cycle
    return

def find_smallest_arc_with_cycle(working_array,working_array_digraph):
    '''Find the smallest arc through which at least one cycle flows.
    Returns:
        - if such arc exist:
            smallest_arc_with_cycle = [a,b] where a is the start of arc and b the end
            smallest_arc_with_cycle_value = x where x is the weight of the arc
        - if such arc does not exist:
            smallest_arc_with_cycle = []
            smallest_arc_with_cycle_value = 0 '''
    smallest_arc_with_cycle = []
    smallest_arc_with_cycle_value = 0
    sparse_array = []
    for i in range(numpy.shape(working_array)[0]):
        for j in range(numpy.shape(working_array)[1]):
            if working_array[i][j] !=0:
                sparse_array.append([i,j,working_array[i][j]])
    sorted_array=sorted(sparse_array, key=lambda x: x[2])
    for i in range(len(sorted_array)):
        smallest_arc=[sorted_array[i][0],sorted_array[i][1]]
        generator=simple_cycles_generator_w_filters(working_array_digraph,smallest_arc)
        if any(generator):
            smallest_arc_with_cycle=smallest_arc
            smallest_arc_with_cycle_value=sorted_array[i][2]
            break

    return smallest_arc_with_cycle,smallest_arc_with_cycle_value

def window(seq, n=2):
    """Returns a sliding window (of width n) over data from the iterable
    s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ... """
    it = iter(seq)
    result = list(itertools.islice(it, n))
    if len(result) == n:
        yield result    
    for elem in it:
        result = result[1:] + [elem]
        yield result

def contains_sequence(all_values, seq):
    return any(seq == current_seq for current_seq in window(all_values, len(seq)))


def find_cycle_probability(cycle, working_array, total_outputs):
    '''Finds the cycle probability of a given cycle within a given array'''
    output_prob_of_each_arc=[]
    for i in range(len(cycle)-1):
        weight_of_the_arc=working_array[cycle[i]][cycle[i+1]]
        output_probability_of_the_arc=float(weight_of_the_arc)/float(total_outputs[cycle[i]])#NOTE:total_outputs is an array, thus the float
        output_prob_of_each_arc.append(output_probability_of_the_arc)
    circuit_probabilities_of_the_cycle=numpy.prod(output_prob_of_each_arc)    
    return circuit_probabilities_of_the_cycle 

def clean_negligible_values(working_array):
    ''' Cleans the array by rounding negligible values to 0 according to a 
    pre-defined threeshold.'''
    zero_threeshold=0.000001
    for i in range(numpy.shape(working_array)[0]):
        for j in range(numpy.shape(working_array)[1]):
            if working_array[i][j] == 0:
                continue
            elif 0 < working_array[i][j] < zero_threeshold:
                working_array[i][j] = 0
            elif -zero_threeshold <= working_array[i][j] < 0:
                working_array[i][j] = 0
            elif working_array[i][j] < -zero_threeshold:
                sys.exit('Error')    
    return working_array

original_array= 1000 * numpy.random.random_sample((5, 5))
total_outputs=numpy.sum(original_array,axis=0) + 100 * numpy.random.random_sample(5)

working_array=original_array.__copy__() 
straight_array= working_array.__copy__() 
cycle_array=numpy.zeros(numpy.shape(working_array))
iteration_counter=0
working_array_digraph=networkx.DiGraph(working_array)

[smallest_arc_with_cycle, smallest_arc_with_cycle_value]= find_smallest_arc_with_cycle(working_array, working_array_digraph) 

while smallest_arc_with_cycle: # using implicit true value of a non-empty list

    cycle_flows_to_be_subtracted = numpy.zeros(numpy.shape((working_array)))

    # FIRST run of the generator to calculate each cycle probability
    # note: the cycle generator ONLY provides all cycles going through 
    # the specified weakest arc    
    generator = simple_cycles_generator_w_filters(working_array_digraph, smallest_arc_with_cycle)
    nexus_total_probs = 0
    for cycle in generator:
        cycle_prob = find_cycle_probability(cycle, working_array, total_outputs)
        nexus_total_probs += cycle_prob

    # SECOND run of the generator
    # using the nexus_prob_sum calculated before, I can allocate the weight of the 
    # weakest arc to each cycle going through it
    generator = simple_cycles_generator_w_filters(working_array_digraph,smallest_arc_with_cycle)
    for cycle in generator:
        cycle_prob = find_cycle_probability(cycle, working_array, total_outputs)        
        allocated_cycle_weight = cycle_prob / nexus_total_probs * smallest_arc_with_cycle_value
        # create the array to be substracted
        for i in range(len(cycle)-1):
            cycle_flows_to_be_subtracted[cycle[i]][cycle[i+1]] += allocated_cycle_weight 

    working_array = working_array - cycle_flows_to_be_subtracted
    clean_negligible_values(working_array)    
    cycle_array = cycle_array + cycle_flows_to_be_subtracted   
    straight_array = straight_array - cycle_flows_to_be_subtracted
    clean_negligible_values(straight_array)
    # find the next weakest arc with cycles.
    working_array_digraph=networkx.DiGraph(working_array)
    [smallest_arc_with_cycle, smallest_arc_with_cycle_value] = find_smallest_arc_with_cycle(working_array,working_array_digraph)
于 2013-06-30T15:58:37.740 回答