Background: I'm trying to model the long-term repair of damaged bridges by checking the state of each bridge (damaged or not damaged) over a period of time. To do this, I've written a Python function called repair_over_horizon that takes (among other things) a list of damaged bridges and outputs a list of lists, each containing the bridges that are still damaged at a particular timestep. For example, if the input list is:
[100, 423, 667, 904, 221, 56, 495, 70, 109, 65]
and I run repair_over_horizon with 3 timesteps, I'd expect output like:
[[100, 423, 667, 904, 221, 56, 495, 70, 109, 65], [100, 423, 904, 56, 70, 109, 65], [423, 904]]
The function repair_over_horizon actually takes multiple input lists, or "maps", at once and computes their repairs in parallel. The parallelization is implemented per timestep, as shown in the code below. I've also included the functions that repair_over_horizon calls internally, namely compute_repairs_at_timestep and repair, for reference.
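For concreteness, here is a stripped-down driver showing the nesting I use and a call to the function. The sizes and initial damage map are made up for illustration, and get_times and get_repair_threshold are helpers of mine that aren't shown:

    import copy

    damaged = [100, 423, 667, 904, 221, 56, 495, 70, 109, 65]
    n_gm_maps, n_damage_maps, n_steps, trajectories = 1, 1, 3, 1

    # nesting is [gm map][damage map][timestep][trajectory] -> list of damaged
    # bridge IDs; every timestep starts as a copy of the initial damage and is
    # overwritten in place as the simulation advances
    internal = [[[[copy.deepcopy(damaged) for l in range(trajectories)]
                  for k in range(n_steps)]
                 for n in range(n_damage_maps)]
                for m in range(n_gm_maps)]
    new = copy.deepcopy(internal)

    result = repair_over_horizon(n_gm_maps, n_damage_maps, internal, new,
                                 n_steps=n_steps, trajectories=trajectories)
    # [result['internal'][0][0][k][0] for k in range(n_steps)] then resembles
    # the 3-timestep example output above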
Problem: While the function produces the expected output, each successive iteration (i.e., timestep) takes considerably longer than the last. The slowdown becomes pronounced as the number of maps per timestep grows: with 50 maps per timestep and 10 timesteps, the first iteration takes ~6 seconds and the last ~21 seconds; with 500 maps per timestep and 5 timesteps, the first iteration takes ~36 seconds and the fifth ~110 seconds.
Troubleshooting: I've profiled the code (see the results below, for 500 maps per timestep and 5 timesteps) and checked memory usage, but it isn't obvious to me from the results where my implementation goes wrong.
Question: What is causing the large slowdown between iterations as the number of maps increases? Any ideas on how to fix it would be much appreciated.
Update: To test whether there was a memory issue, I inserted gc.collect() at various points at the end of the function. However, the repairs still take longer at each iteration, as described in the Problem section above.
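For example, one placement I tried looked like this (a sketch of just the end of the timestep loop in repair_over_horizon):

    import gc  # added at module level

    # ... then, at the end of each timestep inside the loop over k:
    job_server.destroy()
    gc.collect()  # force a full collection between timesteps; made no difference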
Partial profiling results:
Ordered by: cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
2500 1.102 0.000 289.855 0.116 /Library/Python/2.7/site-packages/pp.py:380(submit)
2505 285.901 0.114 285.901 0.114 {cPickle.dumps}
2500 0.020 0.000 127.742 0.051 /Library/Python/2.7/site-packages/pp.py:96(__call__)
2500 0.040 0.000 127.719 0.051 /Library/Python/2.7/site-packages/pp.py:116(__unpickle)
2500 127.675 0.051 127.675 0.051 {cPickle.loads}
5 0.001 0.000 2.282 0.456 /Library/Python/2.7/site-packages/pp.py:280(__init__)
5 0.001 0.000 2.279 0.456 /Library/Python/2.7/site-packages/pp.py:491(set_ncpus)
20 0.001 0.000 2.278 0.114 /Library/Python/2.7/site-packages/pp.py:134(__init__)
20 0.006 0.000 2.276 0.114 /Library/Python/2.7/site-packages/pp.py:140(start)
20 0.001 0.000 1.938 0.097 /Library/Python/2.7/site-packages/pptransport.py:133(receive)
40 1.935 0.048 1.935 0.048 {method 'read' of 'file' objects}
2500 0.174 0.000 1.666 0.001 /Library/Python/2.7/site-packages/pp.py:656(__scheduler)
16084 1.364 0.000 1.364 0.000 {method 'acquire' of 'thread.lock' objects}
1261 0.034 0.000 0.910 0.001 /Library/Python/2.7/site-packages/ppcommon.py:38(start_thread)
1261 0.063 0.000 0.580 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:726(start)
1261 0.057 0.000 0.370 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:603(wait)
20 0.007 0.000 0.330 0.017 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py:650(__init__)
20 0.014 0.001 0.316 0.016 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py:1195(_execute_child)
1261 0.058 0.000 0.288 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:656(__init__)
2500 0.107 0.000 0.244 0.000 /Library/Python/2.7/site-packages/pp.py:73(__init__)
60344 0.232 0.000 0.232 0.000 {isinstance}
889 0.045 0.000 0.224 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:309(wait)
20 0.201 0.010 0.201 0.010 {posix.fork}
2522 0.032 0.000 0.151 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:242(Condition)
1261 0.145 0.000 0.145 0.000 {thread.start_new_thread}
1261 0.011 0.000 0.142 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:542(Event)
1261 0.025 0.000 0.131 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:561(__init__)
2522 0.113 0.000 0.119 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:260(__init__)
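For reference, I collected the profile roughly like this (simplified; the real call uses my actual map data, with 500 maps per timestep and 5 timesteps):

    import cProfile, pstats

    # profile one full run and rank by cumulative time, as in the output above
    cProfile.run('repair_over_horizon(n_gm_maps, n_damage_maps, internal, new, n_steps=5)',
                 'recovery.prof')
    pstats.Stats('recovery.prof').sort_stats('cumulative').print_stats(30)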
repair_over_horizon
# module-level imports assumed: import time, pp
# (get_times and get_repair_threshold are helper functions of mine, not shown)
def repair_over_horizon(n_gm_maps, n_damage_maps,
                        bridge_array_internal_assembled, bridge_array_new_assembled,
                        horizon=365, n_steps=10, trajectories=10, plot=False, quiet=False):
    from itertools import product
    t_per_timestep = []
    sim_times = get_times(horizon, n_steps)
    thresholds = []
    for i in range(0, n_steps):
        thresholds.append(get_repair_threshold(sim_times[i]))
    ppservers = ()  # starting parallelization
    for k in range(0, n_steps):
        start = time.time()
        jobs = []
        job_server = pp.Server(ppservers=ppservers)
        print "Starting pp for recovery modelling at k=", k, "with", job_server.get_ncpus(), "workers"
        for m, n, l in product(xrange(n_gm_maps), xrange(n_damage_maps), xrange(trajectories)):
            jobs.append(job_server.submit(compute_repairs_at_timestep,
                                          (m, n, k, l, bridge_array_internal_assembled,
                                           bridge_array_new_assembled, thresholds[k], n_steps),
                                          depfuncs=(repair,),
                                          modules=('numpy', 'random', 'copy',),
                                          group=k))
        # get the results of timestep k, since the next timestep depends on them
        for job in jobs:
            temp_dict = job()
            if temp_dict is not None:
                if k < n_steps - 1:
                    temp_inds = temp_dict['indices']
                    mstar = temp_inds[0]
                    nstar = temp_inds[1]
                    kstar = temp_inds[2] + 1
                    lstar = temp_inds[3]
                    bridge_array_internal_assembled[mstar][nstar][kstar][lstar][:] = \
                        temp_dict['internal'][mstar][nstar][kstar][lstar]
                    bridge_array_new_assembled[mstar][nstar][kstar][lstar][:] = \
                        temp_dict['new'][mstar][nstar][kstar][lstar]
            else:
                pass
        t_per_timestep.append(time.time() - start)
        print "Done with timestep k = ", k, " in ", time.time() - start, " seconds"
        # close the server for this timestep
        job_server.destroy()
    if quiet == False:
        #print bridge_array_internal_assembled_rec
        print 'Great. You have modeled stochastic recovery of bridges and created time-dependent damage maps.'
    if plot == True:
        pass
    # return the two lists of lists as a single dictionary (keys: internal and new)
    return {'internal': bridge_array_internal_assembled,
            'new': bridge_array_new_assembled,
            'timesteps': t_per_timestep}
compute_repairs_at_timestep
def compute_repairs_at_timestep(m, n, k, l,
                                bridge_array_internal_assembled, bridge_array_new_assembled,
                                threshold, n_steps, limit=None):
    # keep track of ground motion map, associated damage map, timestep,
    # and recovery trajectory
    inds = [m, n, k, l]
    # repair bridges stochastically at a single timestep
    temp_dict = repair(bridge_array_internal_assembled[m][n][k][l],
                       bridge_array_new_assembled[m][n][k][l], threshold, limit)
    # propagate repair to the next timestep until timesteps are done
    if k < n_steps - 1:
        bridge_array_internal_assembled[m][n][k + 1][l][:] = temp_dict['internal']
        bridge_array_new_assembled[m][n][k + 1][l][:] = temp_dict['new']
        return {'indices': inds,
                'internal': bridge_array_internal_assembled,
                'new': bridge_array_new_assembled}
    else:
        return {'indices': inds,
                'internal': bridge_array_internal_assembled,
                'new': bridge_array_new_assembled}
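For what it's worth, each submitted job is equivalent to one direct call like the following (a serial sketch, reusing the internal/new arrays and n_steps=3 from the driver sketch above; the 0.3 threshold is made up). It assumes numpy and copy are importable, which pp otherwise injects via modules=:

    # one job covers one (gm map, damage map, trajectory) triple at timestep k
    out = compute_repairs_at_timestep(0, 0, 0, 0, internal, new, 0.3, 3)
    print out['indices']    # [0, 0, 0, 0]
    # out['internal'] and out['new'] are the entire assembled arrays, with
    # timestep k+1 = 1 updated in place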
repair
def repair(bridge_array_internal, bridge_array_new, ext_threshold, limit=None):
    import numpy as np
    # note: copy is available here because pp injects it via modules=('copy',)
    d = len(bridge_array_internal)
    if d > 0:
        temp_internal = copy.deepcopy(bridge_array_internal)
        temp_new = copy.deepcopy(bridge_array_new)
        # repair bridges stochastically per the Shinozuka et al. 2003 model
        temp = np.random.rand(1, d)
        temp = temp[0]
        temp = np.ndarray.tolist(temp)
        rep_inds_internal = []  # track the internal ID numbers of bridges that get repaired
        rep_inds_new = []  # track the new ID numbers of bridges that get repaired
        if limit is None:
            for i in range(0, d):
                if temp[i] < ext_threshold:
                    rep_inds_internal.append(temp_internal[i])
                    rep_inds_new.append(temp_new[i])
            if len(rep_inds_internal) > 0:  # if bridges were repaired above, remove them from the list of damaged bridges
                for i in range(0, len(rep_inds_internal)):
                    try:
                        temp_internal.remove(rep_inds_internal[i])
                        temp_new.remove(rep_inds_new[i])
                    except ValueError:
                        pass
        else:  # limit is a number
            rep_count = 0
            for i in range(0, d):
                if (temp[i] < ext_threshold) & (rep_count < limit):
                    rep_inds_internal.append(temp_internal[i])
                    rep_inds_new.append(temp_new[i])
                    rep_count += 1
                elif rep_count == limit:  # once we've repaired as many bridges as allowed, stop
                    break
                else:
                    pass
            if len(rep_inds_internal) > 0:  # if bridges were repaired above, remove them from the list of damaged bridges
                for i in range(0, len(rep_inds_internal)):
                    try:
                        temp_internal.remove(rep_inds_internal[i])
                        temp_new.remove(rep_inds_new[i])
                    except ValueError:
                        pass
        return {'internal': temp_internal, 'new': temp_new}  # updated to exclude repaired bridges
    elif d == 0:
        # all bridges have already been repaired
        return {'internal': [], 'new': []}
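To tie this back to the example at the top, a single repair call performs one timestep of stochastic removal on one list (the threshold 0.3 here is made up):

    import copy  # needed at module level by repair (pp normally injects it)

    damaged = [100, 423, 667, 904, 221, 56, 495, 70, 109, 65]
    result = repair(damaged, list(damaged), 0.3)
    print result['internal']  # random subset, e.g. [100, 423, 904, 56, 70, 109, 65]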