1

我的程序有问题。

我设法找到了问题的根源。问题出在“checkPort”方法中。没有它,用于关闭/终止线程的标准教科书方法效果很好。

我错过了什么吗?checkPort 方法中是否有阻止成功加入线程的东西?它总是卡在thread.join()

部分主程序

try:
    queue = Queue.Queue()

    for i in range(MAX_THREAD_COUNT):
        t = checkPort_IPv6_thread(queue)
        t.daemon = True
        threads.append(t)
        t.start()

    cur.execute("SELECT * FROM ipv6")
    results = cur.fetchall()
    for row in results:
        queue.put((row[0], row[2]))

    queue.join()

    for thread in threads:
        thread.stop()
        thread.join()

except Exception as e:
    sys.stderr.write("Error: " + str(e))
    print

print "\nChecking ports for IPv6 - DONE"

这是我调用 checkPort 方法的线程类:

class checkPort_IPv6_thread(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.keepRunning = True

    def run(self):
        while self.keepRunning:
            args = self.queue.get()
            id = args[0]
            address = args[1]
            port443 = 0
            port21 = 0 
            port80 = 0 

            #---------------- Check ports for IPv6 ----------------
            if str(address) != "N/A":
                port443 = checkPort("TCP",str(address), 443)
                port21 = checkPort("TCP",str(address), 21)
                port80 = checkPort("TCP",str(address), 80)

            lock.acquire()
            try:
                cur.execute("UPDATE ipv6 SET port_443=" + str(port443) + " WHERE id_ipv6 =" + str(id))
                cur.execute("UPDATE ipv6 SET port_21=" + str(port21) + " WHERE id_ipv6 =" + str(id))
                cur.execute("UPDATE ipv6 SET port_80=" + str(port80) + " WHERE id_ipv6 =" + str(id))
                db.commit()

            except Exception as e:
                sys.stderr.write("Error: " + str(e))
            except:
                db.rollback()
            lock.release()

            self.queue.task_done()


    def stop(self):
        self.keepRunning = False

checkPort 方法

def checkPort(typ, address, port):
    if typ == "TCP":
        s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    else:
        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
    pom = 0 # 0/1 = True/False
    try:
        s.settimeout(2) # timeout 1.5 sekundy
        s.connect((str(address), port))
        s.settimeout(None)
        #time.sleep(0.5)
        pom = 1
        print str(address) + " >> on port: " + str(port) + " >> Connection was successfull"

    except socket.timeout:
        print str(address) + " >> on port: " + str(port) + " >> * Error: Timed out *"
    except socket.error as e:
        if e.errno == 10061:
            print str(address) + " >> on port: " + str(port) + " >> * No connection could be made - target machine refused it *"
    except Exception as ex:
        sys.stderr.write("Error: " + str(ex))

    return pom
4

2 回答 2

1

以下内容可能会对您的程序有所帮​​助。它是为 Python 3.x 编写的。我相信主要问题在于args = self.queue.get()您的程序。它应该固定在下面。

import multiprocessing
import queue
import threading
import sys
import socket

MAX_THREAD_COUNT = multiprocessing.cpu_count()

def main(cur):
    cur.execute("""UPDATE ipv6
                      SET port_21 = 0,
                          port_80 = 0,
                          port_443 = 0
                    WHERE address = 'N/A'""")
    q = queue.Queue()
    for _ in range(MAX_THREAD_COUNT):
        CheckPort(q).start()
    cur.execute("""SELECT id_ipv6,
                          address
                     FROM ipv6
                    WHERE address != 'N/A'""")
    for row in cur.fetchall():
        q.put(row)
    q.join()
    for thread in threading.enumerate():
        if isinstance(thread, CheckPort):
            thread.stop()
            thread.join()

class CheckPort(threading.Thread):

    def __init__(self, q):
        super().__init__()
        self.__q = q
        self.__run = True

    def stop(self):
        self.__run = False

    def run(self):
        while self.__run:
            try:
                id, address = self.__q.get(True, 1)
            except queue.Empty:
                continue
            with lock:
                try:
                    cur.execute('''UPDATE ipv6
                                      SET port_21 = ?,
                                          port_80 = ?,
                                          port_443 = ?
                                    WHERE id_ipv6 = ?''',
                                self.check_port(address, 21),
                                self.check_port(address, 80),
                                self.check_port(address, 443),
                                id)
                    db.commit()
                except Exception as error:
                    print('Error:', error, file=sys.stdout)
                self.__q.task_done()

    @staticmethod
    def check_port(address, port):
        sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        sock.settimeout(2)
        try:
            sock.connect((address, port))
        except socket.timeout:
            return 0
        else:
            sock.shutdown(socket.SHUT_RDWR)
            sock.close()
            return 1

if __name__ == '__main__':
    try:
        main(cur)
    except Exception as error:
        print('Error:', error, file=sys.stdout)
于 2013-01-29T20:22:58.680 回答
0

从 OP 的编辑到他的问题:


解决方案:

感谢 Noctis Skytower,解决方案是catch queue.empty exception

try:
    id, address = self.queue.get(True, 1)
except Queue.Empty:
    continue
于 2013-04-04T17:50:50.607 回答