Hi! I'm trying to write a web crawler in Python, and I want to use Python multithreading. Even after reading the papers and tutorials suggested earlier, I still have a problem. My code is here (the whole source code is here):

import threading
import hashlib
import Queue

class Crawler(threading.Thread):

    global g_URLsDict 
    varLock = threading.Lock()
    count = 0

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.url = self.queue.get()

    def run(self):
        while 1:
            print self.getName()+" started" 
            self.page = getPage(self.url)
            self.parsedPage = getParsedPage(self.page, fix=True)
            self.urls = getLinksFromParsedPage(self.parsedPage)

            for url in self.urls:

                self.fp = hashlib.sha1(url).hexdigest()

                #url-seen check
                Crawler.varLock.acquire() #lock for global variable g_URLs
                if self.fp in g_URLsDict:
                    Crawler.varLock.release() #releasing lock
                else:
                    #print url+" does not exist"
                    Crawler.count +=1
                    print "total links: %d"%len(g_URLsDict)
                    print self.fp
                    g_URLsDict[self.fp] = url
                    Crawler.varLock.release() #releasing lock
                    self.queue.put(url)

                    print self.getName()+ " %d"%self.queue.qsize()
                    self.queue.task_done()
            #self.queue.task_done()
        #self.queue.task_done()


print g_URLsDict
queue = Queue.Queue()
queue.put("http://www.ertir.com")

for i in range(5):
    t = Crawler(queue)
    t.setDaemon(True)
    t.start()

queue.join()

It does not work as intended: after Thread-1 it gives no results, and when it runs in a different order it sometimes gives this error:

Exception in thread Thread-2 (most likely raised during interpreter shutdown):

How can I fix this? Also, I don't think this is any more efficient than a plain for loop.

I tried to fix run():

def run(self):
    while 1:
        print self.getName()+" started" 
        self.page = getPage(self.url)
        self.parsedPage = getParsedPage(self.page, fix=True)
        self.urls = getLinksFromParsedPage(self.parsedPage)

        for url in self.urls:

            self.fp = hashlib.sha1(url).hexdigest()

            #url-seen check
            Crawler.varLock.acquire() #lock for global variable g_URLs
            if self.fp in g_URLsDict:
                Crawler.varLock.release() #releasing lock
            else:
                #print url+" does not exist"
                print self.fp
                g_URLsDict[self.fp] = url
                Crawler.varLock.release() #releasing lock
                self.queue.put(url)

                print self.getName()+ " %d"%self.queue.qsize()
                #self.queue.task_done()
        #self.queue.task_done()
    self.queue.task_done()

I have tried the task_done() call in different places; can anyone explain the difference?


1 Answer


You only call self.url = self.queue.get() once, when the thread is initialised. You need to re-acquire a url from the queue inside your while loop if you want to pick up new urls for further processing.

Try replacing self.page = getPage(self.url) with self.page = getPage(self.queue.get()). Note that the get function will block indefinitely. You will probably want to time out after a while and add some way for your background threads to exit gracefully on request (which would remove the exception you are seeing).
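For example, a rough sketch of how run() could be restructured along those lines; the one-second timeout and the stop_event flag are not in your code, they are just one possible way to let the workers exit cleanly:

import Queue
import threading

stop_event = threading.Event()  # the main thread sets this to ask the workers to stop

def run(self):
    while not stop_event.is_set():
        try:
            # block for at most one second so the loop can re-check stop_event
            url = self.queue.get(timeout=1)
        except Queue.Empty:
            continue
        try:
            page = getPage(url)  # getPage() and friends come from your full source
            # ... parse the page and put any newly found urls back on the queue ...
        finally:
            self.queue.task_done()  # pair every successful get() with a task_done()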

There are some good examples on effbot.org that use get() in the way I described above.

Edit - in response to your initial comment:

Have a look at the docs for task_done(); for every call to get() (which does not time out) you should have a matching call to task_done(), which tells any blocking call to join() that everything on that queue has now been processed. Each call to get() will block (sleep) while it waits for a new url to be posted to the queue.
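As a minimal, standalone illustration of how get(), task_done() and join() pair up (separate from your crawler; the worker function and the items here are just placeholders):

import Queue
import threading

q = Queue.Queue()

def worker():
    while 1:
        item = q.get()        # blocks until something is available
        print "processing", item
        q.task_done()         # one task_done() for every successful get()

t = threading.Thread(target=worker)
t.setDaemon(True)             # daemon thread, so the program can exit once join() returns
t.start()

for i in range(3):
    q.put(i)

q.join()                      # returns only after task_done() was called for every item put()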

Edit2 - try this alternative run function:

def run(self):
    while 1:
        print self.getName()+" started"
        url = self.queue.get() # <-- note that we're blocking here to wait for a url from the queue
        self.page = getPage(url)
        self.parsedPage = getParsedPage(self.page, fix=True)
        self.urls = getLinksFromParsedPage(self.parsedPage)

        for url in self.urls:

            self.fp = hashlib.sha1(url).hexdigest()

            #url-seen check
            Crawler.varLock.acquire() #lock for global variable g_URLs
            if self.fp in g_URLsDict:
                Crawler.varLock.release() #releasing lock
            else:
                #print url+" does not exist"
                Crawler.count +=1
                print "total links: %d"%len(g_URLsDict)
                print self.fp
                g_URLsDict[self.fp] = url
                Crawler.varLock.release() #releasing lock
                self.queue.put(url)

                print self.getName()+ " %d"%self.queue.qsize()

        self.queue.task_done() # <-- We've processed the url this thread pulled off the queue so indicate we're done with it.

Answered 2012-05-29T14:55:39.050