我正在测试一个蚁群优化 (ACO) 软件,该软件使用多个线程(每个创建的蚂蚁 1 个线程)运行。
在允许下一次迭代开始之前,每次 ACO 迭代都应该等待所有线程完成。我正在使用线程模块中的“条件()”来执行此操作。
由于蚂蚁共享一个信息素矩阵,因此对该矩阵的读写受制于锁,也来自线程模块。
现在描述问题:
我运行该函数并在每次迭代时打印一些东西。有时,并非总是如此,函数的执行似乎会停止,也就是说,它会停止打印,这意味着迭代从未完成。
老实说,我现在不知道为什么会发生这种情况,我将不胜感激任何可以让我走上正轨的答案。如果我不得不猜测,我会说条件变量没有被正确调用,或者类似的东西。但是我不确定,而且我也觉得奇怪的是这种情况有时会发生。
下面是相关功能。ACO 通过调用 start() 函数开始。这会创建 N 个线程,完成后调用 update()。此更新函数在被调用 N 次后调用 notify,这允许 start() 继续该过程,并最终开始下一次迭代。我还发布了每个线程的运行方法。
值得一提的是,如果没有守护程序操作,错误几乎不会发生。对于守护进程操作,它几乎总是发生(我也觉得很奇怪)。最后,错误并不总是发生在同一次迭代中。
def start(self):
self.ants = self.create_ants()
self.iter_counter = 0
while self.iter_counter < self.num_iterations:
print "START ACQUIRED"
self.cv.acquire()
print "calling iteration"
self.iteration()
#CV wait until all ants (threads) finish and call update, which
#calls notify(), and allow continuation
while not self.iter_done:
print "iter not complete, W8ING"
self.cv.wait()
print "global update "
self.global_update_with_lock()
print "START RELEASED"
self.cv.release()
def update(self, ant):
lock = Lock()
lock.acquire()
print "Update called by %s" % (ant.ID,)
self.ant_counter += 1
self.avg_path_cost += ant.path_cost
# book-keeping
if ant.path_cost < self.best_path_cost:
self.best_path_cost = ant.path_cost
self.best_path_mat = ant.path_mat
self.best_path_vec = ant.path_vec
self.last_best_path_iteration = self.iter_counter
#all threads finished, call notify
print "ant counter"
print self.ant_counter
if self.ant_counter == len(self.ants):
print "ants finished"
#THIS MIGHT CAUSE PROBLEMS (no need to notify if its no one waiting)
self.best_cost_at_iter.append(self.best_path_cost)
self.avg_path_cost /= len(self.ants)
self.cv.acquire()
self.iter_done = True
self.cv.notify()
self.cv.release()
lock.release()
# overide Thread's run()
def run(self):
graph = self.colony.graph
while not self.end():
# we need exclusive access to the graph
graph.lock.acquire()
new_node = self.state_transition_rule(self.curr_node)
self.path_cost += graph.delta(self.curr_node, new_node)
self.path_vec.append(new_node)
self.path_mat[self.curr_node][new_node] = 1 #adjacency matrix representing path
#print "Ant %s : %s, %s" % (self.ID, self.path_vec, self.path_cost,)
self.local_updating_rule(self.curr_node, new_node)
graph.lock.release()
self.curr_node = new_node
# close the tour
self.path_vec.append(self.path_vec[0])
#RUN LOCAL HEURISTIC
if self.daemon == True:
try:
daemon_result = twoOpt(self.path_vec, graph.delta_mat)
d_path, d_adj = daemon_result['path_vec'], daemon_result['path_matrix']
self.path_vec = d_path
self.path_mat = d_adj
except Exception, e:
print "exception: " + str(e)
traceback.print_exc()
self.path_cost += graph.delta(self.path_vec[-2], self.path_vec[-1])
# send our results to the colony
self.colony.update(self)
#print "Ant thread %s terminating." % (self.ID,)
# allows thread to be restarted (calls Thread.__init__)
self.__init__(self.ID, self.start_node, self.colony, self.daemon, self.Beta, self.Q0, self.Rho)
问题的解决方案: 首先,我根据这里的评论纠正了条件变量等待中的错误。其次,它有时仍然挂起,这是由于线程计数器更新中的一些错误错误。解决方案是将计数器从 int 更改为长度为 num_threads、全为 0 的数组,其中每个线程更新其在列表中的位置。当所有线程完成时,计数器数组应该全为 1。这目前工作得很好。