3

我有太多未完成的线程的问题。我认为队列命令 .join() 只是关闭队列而不是使用它的线程。

在我的脚本中,我需要检查 280k 域,并为每个域获取他的 MX 记录列表,并获取服务器的 IPv6 地址(如果有)。

我使用了线程并感谢他们的脚本,它的速度要快很多倍。但是有一个问题,虽然队列有 join(),但活动线程的数量正在增长,直到发生错误,通知无法创建任何新线程(操作系统的限制?)。

当我从数据库中检索新域时,如何在每个 For 循环之后终止/关闭/停止/重置线程?

线程类定义...

class MX_getAAAA_thread(threading.Thread):
    def __init__(self,queue,id_domain):
        threading.Thread.__init__(self)
        self.queue = queue
        self.id_domain = id_domain


    def run(self):
        while True:
            self.mx = self.queue.get()

            res = dns.resolver.Resolver()
            res.lifetime = 1.5
            res.timeout = 0.5

            try:
                answers = res.query(self.mx,'AAAA')
                ip_mx = str(answers[0])
            except:
                ip_mx = "N/A"

            lock.acquire()

            sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
            try:
                cursor.execute(sql)
                db.commit()
            except:
                db.rollback()

            print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)

            lock.release()
            self.queue.task_done()

正在使用的线程类...(这里没有主要的for循环,这只是他身体的一部分)

try:
    answers = resolver.query(domain, 'MX')

    qMX = Queue.Queue()
    for i in range(len(answers)):
        t = MX_getAAAA_thread(qMX,id_domain)
        t.setDaemon(True)
        threads.append(t)
        t.start()

    for mx in answers:
        qMX.put(mx.exchange)

    qMX.join()

except NoAnswer as e:
    print "MX - Error: No Answer"
except Timeout as etime:
    print "MX - Error: dns.exception.Timeout"

print "end of script"

我尝试过了:

for thread in threads:
            thread.join()

队列完成后,但是 thread.join() 永远不会停止等待,尽管事实上不需要等待,因为当 queue.join() 执行时,线程没有任何事情可做。

4

3 回答 3

5

当我的线程涉及这样的无限循环时,我经常做的是将条件更改为我可以从外部控制的东西。例如像这样:

def run(self):
    self.keepRunning = True
    while self.keepRunning:
        # do stuff

这样,我可以keepRunning从外部更改属性并将其设置为 false,以便在下次检查循环条件时优雅地终止线程。

顺便提一句。由于您似乎为放入队列的每个项目都生成了一个线程,因此您甚至根本不需要线程循环,尽管我认为您应该始终强制执行可以创建的线程的最大限制这样(即for i in range(min(len(answers), MAX_THREAD_COUNT)):

选择

在您的情况下,您可以重用线程,而不是在每次 for 循环迭代中终止线程。根据我从您的线程源收集的信息,使线程对迭代唯一的所有因素都是id_domain您在创建它时设置的属性。但是,您也可以在队列中提供它,因此线程是完全独立的,您可以重用它们。

这可能看起来像这样:

qMX = Queue.Queue()
threads = []
for i in range(MAX_THREAD_COUNT):
    t = MX_getAAAA_thread(qMX)
    t.daemon = True
    threads.append(t)
    t.start()

for id_domain in enumerateIdDomains():
    answers = resolver.query(id_domain, 'MX')
    for mx in answers:
        qMX.put((id_domain, mx.exchange)) # insert a tuple

qMX.join()

for thread in threads:
    thread.keepRunning = False

当然,你需要稍微改变你的线程:

class MX_getAAAA_thread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        self.keepRunning = True
        while self.keepRunning:
            id_domain, mx = self.queue.get()
            # do stuff
于 2013-01-27T14:52:36.683 回答
4

我不明白为什么你首先需要 a Queue
毕竟在您的设计中,每个线程只处理一项任务。
您应该能够在创建时将该任务传递给线程。
这样你就不需要 aQueue并且你摆脱了while-loop:

class MX_getAAAA_thread(threading.Thread):
    def __init__(self, id_domain, mx):
        threading.Thread.__init__(self)
        self.id_domain = id_domain
        self.mx = mx

然后你可以去掉-method中的while-loop :run

def run(self):
    res = dns.resolver.Resolver()
    res.lifetime = 1.5
    res.timeout = 0.5

    try:
        answers = res.query(self.mx,'AAAA')
        ip_mx = str(answers[0])
    except:
        ip_mx = "N/A"

    with lock:
        sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
        try:
            cursor.execute(sql)
            db.commit()
        except:
            db.rollback()

        print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)

为每个任务创建一个线程

for mx in answers:
    t = MX_getAAAA_thread(qMX, id_domain, mx)
    t.setDaemon(True)
    threads.append(t)
    t.start()

并加入他们

for thread in threads:
    thread.join()
于 2013-01-27T14:41:42.697 回答
2

加入线程可以解决问题,但是在您的情况下,连接会无限期地阻塞,因为您的线程永远不会退出您的运行循环。您需要退出 run 方法,以便可以加入线程。

于 2013-01-27T14:23:02.910 回答