我正在使用 MapReduce 代码,但遇到了一个问题:地图(需要 1 小时)完成后代码没有响应。我深入研究了代码,发现这个函数没有响应:
def wait(self, running, tag): """Test if any worker has finished its job. If so, decrease its key and make it available """
atimer = Timer('Wait')
inittime = time()
status = MPI.Status()
while time() - inittime < self.config['jobwait']:
if world.Iprobe(source=MPI.ANY_SOURCE,tag=tag,status=status):
jobf = world.recv(source=status.source, tag=tag)
idx = 0
for ii, worker in enumerate(self.workers):
if worker.id == status.source: idx = ii; break
if self.config['verbosity'] >= 8:
print('Freeing worker '+str(self.workers[idx].id))
worker = self.workers[idx]
# faulty worker's job has already been cleaned
if not worker.isFaulty():
del running[jobf]
else:
self.nActive += 1
worker.setFree()
heapq._siftup(self.workers, idx)
此行没有响应:
if world.Iprobe(source=MPI.ANY_SOURCE,tag=tag,status=status):
我想知道是否有超时时间Iprobe()
以及mpi4py
如何设置超时时间?是否有替代品在Iprobe()
这里具有相同的作用?
这是之前通过 .send() 发送消息的函数
def execTask(self, task):
"""Wrapper function calling mapping/reducing/finalizing phase tasks,
dispatch tasks to workers until all finished and collect feedback.
Faulty workers are removed from active duty work list.
"""
atimer = Timer(task)
print( 'Entering {0:s} phase...'.format(task) )
taskDict = { 'Map':(self.mapIn, MAP_START, MAP_FINISH), \
'Init':(self.mapIn, INIT_START, MAP_FINISH), \
'Reduce':(self.reduceIn, REDUCE_START, REDUCE_FINISH) }
# line up jobs and workers into priority queues
jobs = taskDict[task][0][:]
heapq.heapify(jobs); running = {}
heapq.heapify(self.workers)
while (jobs or running) and self.nActive > 0:
# dispatch all jobs to all free workers
while jobs and self.workers[0].isFree():
job = heapq.heappop(jobs)
worker = heapq.heappop(self.workers)
world.send(job, dest=worker.id, tag=taskDict[task][1])
print('hi')
print job
worker.setBusy(); heapq.heappush(self.workers, worker)
running[job] = (time(), worker)
if self.config['verbosity'] >= 6:
print('Dispatching file '+os.path.basename(job)+' to worker '+str(worker.id))
# if no more free workers, break
if not self.workers[0].isFree(): break
# wait for finishing workers as well as do cleaning
self.wait(running, taskDict[task][2])
# print running
self.clean(running, jobs)
print( '{0:s} phase completed'.format(task) )
整个代码可以在这里看到:
https://drive.google.com/file/d/0B36fJi35SPIedWdjbW5NdzlCeTg/view?usp=sharing