我有太多未完成的线程的问题。我认为队列命令 .join() 只是关闭队列而不是使用它的线程。
在我的脚本中,我需要检查 280k 域,并为每个域获取他的 MX 记录列表,并获取服务器的 IPv6 地址(如果有)。
我使用了线程并感谢他们的脚本,它的速度要快很多倍。但是有一个问题,虽然队列有 join(),但活动线程的数量正在增长,直到发生错误,通知无法创建任何新线程(操作系统的限制?)。
当我从数据库中检索新域时,如何在每个 For 循环之后终止/关闭/停止/重置线程?
线程类定义...
class MX_getAAAA_thread(threading.Thread):
def __init__(self,queue,id_domain):
threading.Thread.__init__(self)
self.queue = queue
self.id_domain = id_domain
def run(self):
while True:
self.mx = self.queue.get()
res = dns.resolver.Resolver()
res.lifetime = 1.5
res.timeout = 0.5
try:
answers = res.query(self.mx,'AAAA')
ip_mx = str(answers[0])
except:
ip_mx = "N/A"
lock.acquire()
sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()
print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)
lock.release()
self.queue.task_done()
正在使用的线程类...(这里没有主要的for循环,这只是他身体的一部分)
try:
answers = resolver.query(domain, 'MX')
qMX = Queue.Queue()
for i in range(len(answers)):
t = MX_getAAAA_thread(qMX,id_domain)
t.setDaemon(True)
threads.append(t)
t.start()
for mx in answers:
qMX.put(mx.exchange)
qMX.join()
except NoAnswer as e:
print "MX - Error: No Answer"
except Timeout as etime:
print "MX - Error: dns.exception.Timeout"
print "end of script"
我尝试过了:
for thread in threads:
thread.join()
队列完成后,但是 thread.join() 永远不会停止等待,尽管事实上不需要等待,因为当 queue.join() 执行时,线程没有任何事情可做。