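I am working with some MapReduce code and have run into a problem: after the map phase (which takes about an hour) completes, the code stops responding. Digging into the code, I found that this function is the one that hangs: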

def wait(self, running, tag):
    """Test if any worker has finished its job.
    If so, decrease its key and make it available.
    """
    atimer = Timer('Wait')

    inittime = time()
    status = MPI.Status()
    while time() - inittime < self.config['jobwait']:
        if world.Iprobe(source=MPI.ANY_SOURCE, tag=tag, status=status):
            jobf = world.recv(source=status.source, tag=tag)
            idx = 0
            for ii, worker in enumerate(self.workers):
                if worker.id == status.source:
                    idx = ii
                    break
            if self.config['verbosity'] >= 8:
                print('Freeing worker ' + str(self.workers[idx].id))
            worker = self.workers[idx]

            # faulty worker's job has already been cleaned
            if not worker.isFaulty():
                del running[jobf]
            else:
                self.nActive += 1
            worker.setFree()
            heapq._siftup(self.workers, idx)
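A side note on the last line of wait(): the docstring says the freed worker's key is *decreased*, and execTask only dispatches while `self.workers[0].isFree()`, which suggests (an assumption, since the Worker class is not shown) that free workers sort before busy ones. In that case the freed entry has to move toward the root of the heap. In CPython's pure-Python heapq, the private `_siftup(heap, pos)` only lets the entry at `pos` settle at or below `pos`, so it cannot lift a freed worker to index 0, and a free worker may stay buried below busy ones where the dispatch loop never sees it. The sketch below uses a hypothetical `Worker` dataclass and a hypothetical `free_worker` helper, and restores the heap with the public `heapq.heapify` (O(n), but correct in either direction).

```python
import heapq
from dataclasses import dataclass


@dataclass(order=True)
class Worker:
    # Hypothetical stand-in for the question's worker class.
    # Ordering compares (busy, id): free workers (busy=False) sort first.
    busy: bool
    id: int

    def isFree(self):
        return not self.busy

    def setFree(self):
        self.busy = False


def free_worker(workers, worker_id):
    """Mark worker `worker_id` as free and restore the heap invariant.

    Freeing a worker decreases its key, so it must be able to move toward
    the root; heapify() handles that regardless of where the entry sits.
    """
    for worker in workers:
        if worker.id == worker_id:
            break
    else:
        raise KeyError(worker_id)
    worker.setFree()
    heapq.heapify(workers)
    return worker
```

For example, with four busy workers in a heap, `free_worker(workers, 4)` leaves worker 4 at `workers[0]`, where a check like `self.workers[0].isFree()` would find it.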

This is the line that hangs:

if world.Iprobe(source=MPI.ANY_SOURCE,tag=tag,status=status):

Does Iprobe() have a timeout, and if so, how is it set in mpi4py? Is there an alternative to Iprobe() that would serve the same purpose here?
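For reference: `MPI_Iprobe`, and mpi4py's `Comm.Iprobe`, take no timeout argument. The call returns immediately with True or False, and its blocking counterpart `Comm.Probe` waits with no timeout at all. Any timeout therefore lives in the loop around `Iprobe`, which is what `self.config['jobwait']` does in `wait()`. A minimal sketch of such a loop, assuming a hypothetical helper named `probe_with_timeout` that accepts the probe as a plain callable (so it can be tested without MPI) and sleeps between probes rather than spinning:

```python
import time


def probe_with_timeout(probe, timeout, interval=0.01):
    """Poll a non-blocking probe until it succeeds or `timeout` seconds pass.

    `probe` is any zero-argument callable that returns True or False, e.g.
    lambda: world.Iprobe(source=MPI.ANY_SOURCE, tag=tag, status=status).
    Returns True as soon as the probe succeeds, False once the deadline passes.
    """
    deadline = time.monotonic() + timeout
    while True:
        if probe():
            return True
        if time.monotonic() >= deadline:
            return False
        # Sleep briefly between probes instead of spinning at 100% CPU.
        time.sleep(interval)
```

Inside `wait()`, a call could look like `probe_with_timeout(lambda: world.Iprobe(source=MPI.ANY_SOURCE, tag=tag, status=status), self.config['jobwait'])`; the MPI lambda is only an illustration and is not exercised by the sketch itself.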

This is the function that sends the messages beforehand with .send():

def execTask(self, task):
    """Wrapper function calling mapping/reducing/finalizing phase tasks,
    dispatch tasks to workers until all finished and collect feedback.
    Faulty workers are removed from active duty work list.
    """
    atimer = Timer(task)
    print('Entering {0:s} phase...'.format(task))

    taskDict = {'Map': (self.mapIn, MAP_START, MAP_FINISH),
                'Init': (self.mapIn, INIT_START, MAP_FINISH),
                'Reduce': (self.reduceIn, REDUCE_START, REDUCE_FINISH)}

    # line up jobs and workers into priority queues
    jobs = taskDict[task][0][:]
    heapq.heapify(jobs)
    running = {}
    heapq.heapify(self.workers)

    while (jobs or running) and self.nActive > 0:
        # dispatch all jobs to all free workers
        while jobs and self.workers[0].isFree():
            job = heapq.heappop(jobs)
            worker = heapq.heappop(self.workers)
            world.send(job, dest=worker.id, tag=taskDict[task][1])
            print('hi')
            print(job)
            worker.setBusy()
            heapq.heappush(self.workers, worker)
            running[job] = (time(), worker)
            if self.config['verbosity'] >= 6:
                print('Dispatching file ' + os.path.basename(job) + ' to worker ' + str(worker.id))
            # if no more free workers, break
            if not self.workers[0].isFree():
                break

        # wait for finishing workers as well as do cleaning
        self.wait(running, taskDict[task][2])
        # print running
        self.clean(running, jobs)

    print('{0:s} phase completed'.format(task))
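The inner dispatch loop of execTask can be exercised on its own without MPI. The sketch below is a hypothetical, simplified version: `Worker` and `dispatch` are illustrative names, `send` stands in for `world.send(job, dest=worker.id, tag=...)`, `running` records only the worker id (the original also stores a timestamp), and free workers are assumed to sort before busy ones.

```python
import heapq
from dataclasses import dataclass


@dataclass(order=True)
class Worker:
    # Hypothetical worker: ordering compares (busy, id), so free ones sort first.
    busy: bool
    id: int


def dispatch(jobs, workers, send):
    """Hand queued jobs to free workers, mirroring execTask's inner loop.

    `jobs` and `workers` are heaps; `send(job, worker_id)` replaces the MPI
    send. Returns a dict mapping each dispatched job to its worker id.
    """
    running = {}
    while jobs and workers and not workers[0].busy:
        job = heapq.heappop(jobs)
        worker = heapq.heappop(workers)
        send(job, worker.id)
        worker.busy = True
        heapq.heappush(workers, worker)  # a busy worker sinks below free ones
        running[job] = worker.id
    return running
```

With jobs `['c.txt', 'a.txt', 'b.txt']` and two free workers, `dispatch` sends `a.txt` to worker 1 and `b.txt` to worker 2 and leaves `c.txt` queued. Because the `while` condition already checks `workers[0]`, the extra `break` at the end of the original inner loop is redundant.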

The complete code is available here:

https://drive.google.com/file/d/0B36fJi35SPIedWdjbW5NdzlCeTg/view?usp=sharing
