2

我正在尝试在谷歌应用引擎上构建一个维基百科链接爬虫。我想在数据存储中存储一个索引。但是我遇到了 cron 作业和任务队列的 DeadlineExceededError。

对于 cron 工作,我有以下代码:

def buildTree(self):

    start=time.time()
    self.log.info(" Start Time: %f" % start)
    nobranches=TreeNode.all()       

    for tree in nobranches:            
        if tree.branches==[]:
            self.addBranches(tree)
            time.sleep(1)
        if (time.time()-start) > 10 :                
            break
        self.log.info("Time Eclipsed: %f" % (time.time()-start))

    self.log.info(" End Time:%f" % time.clock())

我不明白为什么 for 循环在 10 秒后没有中断。它在开发服务器上。服务器上的 time.time() 一定有问题。我可以使用其他功能吗?

对于任务队列,我有以下代码:
def addNewBranch(self, keyword, level=0):

    self.log.debug("Add Tree")        
    self.addBranches(keyword)

    t=TreeNode.gql("WHERE name=:1", keyword).get()
    branches=t.nodes

    if level < 3:
        for branch in branches:
            if branch.branches == []:
                taskqueue.add(url="/addTree/%s" % branch.name)
                self.log.debug("url:%s" % "/addTree/%s" % branch.name)

日志显示它们都遇到了 DeadlineExceededError。后台处理的页面请求不应该超过 30 秒。有没有办法绕过异常?

这是 addBranch() 的代码

def addBranches(self, keyword):

    tree=TreeNode.gql("WHERE name=:1", keyword).get()
    if tree is None:
        tree=TreeNode(name=keyword)


    self.log.debug("in addBranches arguments: tree %s", tree.name)     
    t=urllib2.quote(tree.name.encode('utf8'))
    s="http://en.wikipedia.org/w/api.php?action=query&titles=%s&prop=links&pllimit=500&format=xml" % t
    self.log.debug(s)
    try:        
        usock = urllib2.urlopen(s)       
    except :        

        self.log.error( "Could not retrieve doc: %s" % tree.name)
        usock=None

    if usock is not None:

        try:
            xmldoc=minidom.parse(usock)
        except Exception , error:
            self.log.error("Parse Error: %s" % error) 
            return None   
        usock.close()            
        try:
            pyNode= xmldoc.getElementsByTagName('pl')  
            self.log.debug("Nodes to be added: %d" % pyNode.length)
        except Exception, e:
            pyNode=None
            self.log.error("Getting Nodes Error: %s" % e)
            return None
        newNodes=[]    
        if pyNode is not None:
            for child in pyNode: 
                node=None             
                node= TreeNode.gql("WHERE name=:1", child.attributes["title"].value).get()

                if node is None:
                    newNodes.append(TreeNode(name=child.attributes["title"].value))           

                else:
                    tree.branches.append(node.key())  
            db.put(newNodes)
            for node in newNodes:
                tree.branches.append(node.key())
                self.log.debug("Node Added: %s" % node.name)                    
            tree.put()
            return tree.branches 

4

4 回答 4

2

我在 GAE 上的日期时间方面取得了巨大的成功。

from datetime import datetime, timedelta
time_start = datetime.now()
time_taken = datetime.now() - time_start

time_taken 将是一个时间增量。您可以将其与另一个具有您感兴趣的持续时间的 timedelta 进行比较。

ten_seconds = timedelta(seconds=10)
if time_taken > ten_seconds:
    ....do something quick.

听起来使用 mapreduce 或任务队列会更好地为您服务。两者都非常适合处理大量记录。

您拥有的代码的一个更简洁的模式是只获取一些记录。

nobranches=TreeNode.all().fetch(100)

此代码只会提取 100 条记录。如果你有一个完整的 100,当你完成后,你可以在队列中扔另一个项目来启动更多。

-- 基于关于需要没有树枝的树的评论 --

我在那里看不到你的模型,但是如果我试图创建一个没有分支的所有树的列表并处理它们,我会: 仅获取 100 块左右的树的键。然后,我将使用 In 查询获取属于这些树的所有分支。按树键排序。扫描分支列表,第一次找到树的键时,从列表中拉出键树。完成后,您将拥有一个“无分支”树键列表。安排他们中的每一个进行处理。

一个更简单的版本是在树上使用 MapReduce。对于每棵树,找到一个与其 ID 匹配的分支。如果不能,请标记树以进行跟进。默认情况下,这个函数将拉出一批树(我认为是 25 个),同时有 8 个工作人员。而且,它在内部管理作业队列,因此您不必担心超时。

于 2010-10-12T23:12:23.577 回答
1

除了使您的代码在适当的时间范围内执行之外,没有办法“绕过”截止日期异常。

于 2010-10-12T22:16:43.530 回答
1

当 DeadlineExcededErrors 发生时,如果再次调用,您希望请求最终成功。这可能需要保证你的爬取状态已经取得了一些可以在下次跳过的进度。(此处不作说明)

并行调用可以提供极大的帮助。

  • 网址提取
  • Datastore Put(使用 db.put 将实体混合在一起)
  • 数据存储查询(并行查询 - asynctools)

网址提取:

  • 当你进行 urlfetch 调用时,一定要使用异步模式来折叠你的循环。

数据存储

  • 将实体组合到一个往返调用中。

    # put newNodes+tree at the same time
    db.put(newNodes+tree)
    
  • 将 TreeNode.gql 从内部循环拉到并行查询工具中,例如 asynctools http://asynctools.googlecode.com

异步工具示例

    if pyNode is not None:

        runner = AsyncMultiTask()
        for child in pyNode:
             title = child.attributes["title"].value
             query = db.GqlQuery("SELECT __key__ FROM TreeNode WHERE name = :1", title)
             runner.append(QueryTask(query, limit=1, client_state=title))

        # kick off the work
        runner.run()

        # peel out the results
        treeNodes = []
        for task in runner:
            task_result = task.get_result() # will raise any exception that occurred for the given query
            treeNodes.append(task_result)

        for node in treeNodes:
            if node is None:
                newNodes.append(TreeNode(name=child.attributes["title"].value))

            else:
                tree.branches.append(node.key())
        for node in newNodes:
            tree.branches.append(node.key())
            self.log.debug("Node Added: %s" % node.name)

        # put newNodes+tree at the same time
        db.put(newNodes+tree)
        return tree.branches

披露:我与 asynctools 相关联。

于 2010-10-13T05:52:58.263 回答
1

这里的问题是您正在对文档中的每个链接进行查询操作。由于维基百科页面可以包含很多链接,这意味着很多查询 - 因此,您会耗尽处理时间。这种方法也会以惊人的速度消耗您的配额!

相反,您应该使用 Wikipedia 页面的页面名称作为实体的键名。然后,您可以将文档中的所有链接收集到一个列表中,从中构造键(这完全是本地操作),然后对所有链接执行单个批处理 db.get。一旦您根据需要更新和/或创建它们,您可以执行批处理 db.put 将它们全部存储到数据存储区 - 将您的数据存储操作总数从 numlinks*2 减少到仅 2!

于 2010-10-13T09:37:31.303 回答