
Background: I am trying to model the long-term repair of damaged bridges by checking the state of each bridge (damaged or not damaged) over a time horizon. To do so, I wrote a Python function called repair_over_horizon that takes (among other things) a list of damaged bridges and outputs a list of lists, each containing the bridges that are still damaged at a particular timestep. For example, if the input list is:

[100, 423, 667, 904, 221, 56, 495, 70, 109, 65]

and I run repair_over_horizon with 3 timesteps, I would expect output like:

[[100, 423, 667, 904, 221, 56, 495, 70, 109, 65], [100, 423, 904, 56, 70, 109, 65], [423, 904]]
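To make that expected behaviour concrete, here is a minimal self-contained sketch. toy_repair_over_horizon and p_repair are made-up names and a made-up repair probability, not my actual threshold logic:

import random

def toy_repair_over_horizon(damaged, n_steps, p_repair=0.3):
    # toy version: every still-damaged bridge is repaired independently
    # with probability p_repair at each timestep
    history = [list(damaged)]
    for _ in range(n_steps - 1):
        damaged = [b for b in damaged if random.random() > p_repair]
        history.append(list(damaged))
    return history

print toy_repair_over_horizon([100, 423, 667, 904, 221, 56, 495, 70, 109, 65], 3)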

The function repair_over_horizon actually takes several input lists, or "maps", at once and computes their repairs in parallel. The parallelization is implemented per timestep, as shown in the code below. I have also included the functions that repair_over_horizon calls internally, namely compute_repairs_at_timestep and repair, for reference.

Problem: While the function works as expected in terms of output, the repairs take noticeably longer at each successive iteration (i.e. timestep). The slowdown becomes significant as the number of maps per timestep grows. For example, with 50 maps per timestep and 10 timesteps, the first iteration takes roughly 6 seconds and the last roughly 21 seconds. With 500 maps per timestep and 5 timesteps, the first iteration takes roughly 36 seconds and the fifth roughly 110 seconds.

Troubleshooting: I have profiled the code (see the results below for 500 maps per timestep and 5 timesteps) and checked memory usage, but it is not obvious to me from those results where I have gone wrong in my implementation.

Question: What is causing the large slowdown between iterations as the number of maps increases? Any ideas on how to fix it would be much appreciated.

Update: To test whether there was a memory issue, I inserted gc.collect() at various points toward the end of the function. However, I still see the repairs taking longer at each iteration, as described in the Problem section above.
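(Roughly the kind of check I added between timesteps, sketched here rather than my exact code:)

import gc

# force a full collection and see whether anything substantial is freed
unreachable = gc.collect()
print "gc.collect() found", unreachable, "unreachable objects; counts:", gc.get_count()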

Partial profiling results:

Ordered by: cumulative time
ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 2500    1.102    0.000  289.855    0.116 /Library/Python/2.7/site-packages/pp.py:380(submit)
 2505  285.901    0.114  285.901    0.114 {cPickle.dumps}
 2500    0.020    0.000  127.742    0.051 /Library/Python/2.7/site-packages/pp.py:96(__call__)
 2500    0.040    0.000  127.719    0.051 /Library/Python/2.7/site-packages/pp.py:116(__unpickle)
 2500  127.675    0.051  127.675    0.051 {cPickle.loads}
    5    0.001    0.000    2.282    0.456 /Library/Python/2.7/site-packages/pp.py:280(__init__)
    5    0.001    0.000    2.279    0.456 /Library/Python/2.7/site-packages/pp.py:491(set_ncpus)
   20    0.001    0.000    2.278    0.114 /Library/Python/2.7/site-packages/pp.py:134(__init__)
   20    0.006    0.000    2.276    0.114 /Library/Python/2.7/site-packages/pp.py:140(start)
   20    0.001    0.000    1.938    0.097 /Library/Python/2.7/site-packages/pptransport.py:133(receive)
   40    1.935    0.048    1.935    0.048 {method 'read' of 'file' objects}
 2500    0.174    0.000    1.666    0.001 /Library/Python/2.7/site-packages/pp.py:656(__scheduler)
16084    1.364    0.000    1.364    0.000 {method 'acquire' of 'thread.lock' objects}
 1261    0.034    0.000    0.910    0.001 /Library/Python/2.7/site-packages/ppcommon.py:38(start_thread)
 1261    0.063    0.000    0.580    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:726(start)
 1261    0.057    0.000    0.370    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:603(wait)
   20    0.007    0.000    0.330    0.017 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py:650(__init__)
   20    0.014    0.001    0.316    0.016 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py:1195(_execute_child)
 1261    0.058    0.000    0.288    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:656(__init__)
 2500    0.107    0.000    0.244    0.000 /Library/Python/2.7/site-packages/pp.py:73(__init__)
60344    0.232    0.000    0.232    0.000 {isinstance}
  889    0.045    0.000    0.224    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:309(wait)
   20    0.201    0.010    0.201    0.010 {posix.fork}
 2522    0.032    0.000    0.151    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:242(Condition)
 1261    0.145    0.000    0.145    0.000 {thread.start_new_thread}
 1261    0.011    0.000    0.142    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:542(Event)
 1261    0.025    0.000    0.131    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:561(__init__)
 2522    0.113    0.000    0.119    0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:260(__init__)
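(For reference, a profile like the one above can be generated with cProfile along these lines; the arguments to repair_over_horizon below are placeholders for my real inputs:)

import cProfile
import pstats

# profile one run of the recovery model and sort by cumulative time
cProfile.runctx('repair_over_horizon(n_gm_maps, n_damage_maps, internal, new, n_steps=5)',
                globals(), locals(), 'recovery.prof')
pstats.Stats('recovery.prof').sort_stats('cumulative').print_stats(30)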

repair_over_horizon

import time
import pp

# get_times and get_repair_threshold are helpers defined elsewhere in my module
def repair_over_horizon(n_gm_maps, n_damage_maps, bridge_array_internal_assembled,
                        bridge_array_new_assembled, horizon=365, n_steps=10,
                        trajectories=10, plot=False, quiet=False):

    from itertools import product

    t_per_timestep = []

    sim_times = get_times(horizon, n_steps)

    thresholds = []
    for i in range(0, n_steps):
        thresholds.append(get_repair_threshold(sim_times[i]))

    ppservers = ()  # starting parallelization

    for k in range(0, n_steps):
        start = time.time()
        jobs = []
        job_server = pp.Server(ppservers=ppservers)
        print "Starting pp for recovery modelling at k=", k, "with", job_server.get_ncpus(), "workers"
        for m, n, l in product(xrange(n_gm_maps), xrange(n_damage_maps), xrange(trajectories)):
            jobs.append(job_server.submit(compute_repairs_at_timestep,
                                          (m, n, k, l, bridge_array_internal_assembled,
                                           bridge_array_new_assembled, thresholds[k], n_steps),
                                          depfuncs=(repair,),
                                          modules=('numpy', 'random', 'copy',),
                                          group=k))
        # get the results of timestep k, since the next timestep depends on those
        for job in jobs:
            temp_dict = job()
            if temp_dict is not None:
                if k < n_steps - 1:
                    temp_inds = temp_dict['indices']
                    mstar = temp_inds[0]
                    nstar = temp_inds[1]
                    kstar = temp_inds[2] + 1
                    lstar = temp_inds[3]

                    bridge_array_internal_assembled[mstar][nstar][kstar][lstar][:] = temp_dict['internal'][mstar][nstar][kstar][lstar]
                    bridge_array_new_assembled[mstar][nstar][kstar][lstar][:] = temp_dict['new'][mstar][nstar][kstar][lstar]

                else:
                    pass

        t_per_timestep.append(time.time() - start)
        print "Done with timestep k = ", k, " in ", time.time() - start, " seconds"

    # close server
    job_server.destroy()

    if quiet == False:
        #print bridge_array_internal_assembled_rec
        print 'Great. You have modeled stochastic recovery of bridges and created time-dependent damage maps.'
    if plot == True:
        pass

    # returns the two assembled lists of lists plus the per-timestep times as a single dictionary
    return {'internal': bridge_array_internal_assembled, 'new': bridge_array_new_assembled, 'timesteps': t_per_timestep}
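For completeness, a minimal sketch of how the function can be driven; the sizes and the way the assembled arrays are built below are placeholders, not my real data, but the indexing convention ([gm_map][damage_map][timestep][trajectory]) matches what the function expects:

import copy

# toy driver with placeholder sizes and data
n_gm, n_dm, n_steps, n_traj = 2, 3, 5, 4
damaged = [100, 423, 667, 904, 221, 56, 495, 70, 109, 65]

internal = [[[[list(damaged) for l in range(n_traj)]
              for k in range(n_steps)]
             for n in range(n_dm)]
            for m in range(n_gm)]
new = copy.deepcopy(internal)

results = repair_over_horizon(n_gm, n_dm, internal, new, horizon=365,
                              n_steps=n_steps, trajectories=n_traj, quiet=True)
print results['timesteps']  # per-timestep wall-clock times recorded by the function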

compute_repairs_at_timestep

def compute_repairs_at_timestep(m, n, k, l, bridge_array_internal_assembled,
                                bridge_array_new_assembled, threshold,
                                n_steps, limit=None):
    # keep track of ground motion map, associated damage map, timestep,
    # and recovery trajectory
    inds = [m, n, k, l]
    # repair bridges stochastically at a single timestep
    temp_dict = repair(bridge_array_internal_assembled[m][n][k][l],
                       bridge_array_new_assembled[m][n][k][l], threshold, limit)
    # propagate repair to next timestep until timesteps are done
    if k < n_steps - 1:
        bridge_array_internal_assembled[m][n][k + 1][l][:] = temp_dict['internal']
        bridge_array_new_assembled[m][n][k + 1][l][:] = temp_dict['new']
        return {'indices': inds, 'internal': bridge_array_internal_assembled,
                'new': bridge_array_new_assembled}
    else:
        return {'indices': inds, 'internal': bridge_array_internal_assembled,
                'new': bridge_array_new_assembled}

repair

def repair(bridge_array_internal, bridge_array_new, ext_threshold, limit=None):

    import numpy as np
    import copy

    d = len(bridge_array_internal)
    if d > 0:
        temp_internal = copy.deepcopy(bridge_array_internal)
        temp_new = copy.deepcopy(bridge_array_new)
        # repair bridges stochastically per Shinozuka et al 2003 model
        temp = np.random.rand(1, d)
        temp = temp[0]
        temp = np.ndarray.tolist(temp)

        rep_inds_internal = []  # track the internal ID numbers of bridges that are repaired
        rep_inds_new = []  # track the new ID numbers of bridges that are repaired
        if limit is None:
            for i in range(0, d):
                if temp[i] < ext_threshold:
                    rep_inds_internal.append(temp_internal[i])
                    rep_inds_new.append(temp_new[i])

            if len(rep_inds_internal) > 0:  # if bridges have been repaired in the previous loop, we need to remove them from the list of damaged bridges
                for i in range(0, len(rep_inds_internal)):
                    try:
                        temp_internal.remove(rep_inds_internal[i])
                        temp_new.remove(rep_inds_new[i])
                    except ValueError:
                        pass
        else:  # if limit is a number
            rep_count = 0
            for i in range(0, d):
                if (temp[i] < ext_threshold) & (rep_count < limit):
                    rep_inds_internal.append(temp_internal[i])
                    rep_inds_new.append(temp_new[i])
                    rep_count += 1
                elif rep_count == limit:  # if we've repaired as many bridges as we are allowed to, stop repairing bridges
                    break
                else:
                    pass

            if len(rep_inds_internal) > 0:  # if bridges have been repaired in the previous loop, we need to remove them from the list of damaged bridges
                for i in range(0, len(rep_inds_internal)):
                    try:
                        temp_internal.remove(rep_inds_internal[i])
                        temp_new.remove(rep_inds_new[i])
                    except ValueError:
                        pass

        return {'internal': temp_internal, 'new': temp_new}  # these damaged bridge arrays have been updated to not include repaired bridges

    elif d == 0:
        # all bridges have already been repaired
        return {'internal': [], 'new': []}
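And repair by itself, run on the toy list from the example at the top (the threshold value 0.5 is arbitrary here):

damaged = [100, 423, 667, 904, 221, 56, 495, 70, 109, 65]
out = repair(list(damaged), list(damaged), 0.5)
print out['internal']  # bridges still damaged after one stochastic repair step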