
So what I'm trying to do is go through a 5 GB XML file of products for a website and eventually add the data to a datastore. I'm just playing around with queues for now. My idea was to create a queue that reads through the file line by line and, for every 50 products, sends them to another queue to be processed (eventually into the datastore). I'm testing this on a much smaller XML file. My problem is in OpenFileQueue: it's creating a queue even when the condition "if ((self.count % 50) == 0):" should not have been met. Any ideas on what might be going on? Or ideas on better ways to read through this file? It feels like a bad hack the way I'm doing it now. The test file I'm using has around 170 products, and when I run the code as it is now and call /gcs I end up with about 86 queues. Not sure what is going on here.

import webapp2
import os
import datetime
import time
from lxml import etree
import sys
import codecs
import time
import gc
import logging


from google.appengine.ext import db
from google.appengine.api import search
import cloudstorage as gcs
from google.appengine.api import taskqueue

my_default_retry_params = gcs.RetryParams(initial_delay=0.2,
                                      max_delay=5.0,
                                      backoff_factor=2,
                                      max_retry_period=15)
gcs.set_default_retry_params(my_default_retry_params)

logging.getLogger().setLevel(logging.DEBUG)


class GoogleCloudStorage(webapp2.RequestHandler):

    def get(self):
        bucket = '/newegg-catalog'
        self.response.headers['Content-Type'] = 'text/plain'
        self.tmp_filenames_to_clean_up = []    
        filename = bucket + '/ndd.xml'     
        taskqueue.add(url='/openfile', params={'filename': filename})
        self.redirect('/')



class AddFileParts(webapp2.RequestHandler):
     def post(self):
         data = self.request.get('data')
         logging.debug('PROCESSING %s', data)

class OpenFileQueue(webapp2.RequestHandler):
    def __init__(self, request, response):
        self.initialize(request, response)
        self.Plist = []
        self.masterList = []
        self.count = 0

    def post(self):
        filename = self.request.get('filename')
        logging.debug('Opening file %s', filename)
        gcs_file = gcs.open(filename)

        while True:
            line = gcs_file.readline()
            self.Plist.append(line)
            if line.strip()=="</product>":
                self.masterList.append(self.Plist)
                self.Plist = []
                self.count+=1

            if ((self.count % 50) == 0):
                logging.debug('Starting queue of items up to %s with 50 items', self.count)
                taskqueue.add(url='/adddata', params={'data': self.masterList})
                self.masterList = []
            if line.strip()=="</catalog>":
                break
        gcs_file.close()

app = webapp2.WSGIApplication([('/adddata',AddFileParts),
                                ('/openfile', OpenFileQueue),
                                ('/gcs', GoogleCloudStorage)],
                                debug=True)

1 Answer


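When a line matches "</product>", the code appends to self.masterList and increments self.count (eventually reaching 50). But the modulo check runs for every line read, so if the next line isn't "</product>", the count is still 50 and another task is added to the queue.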

Instead, use the length of self.masterList, since it is reset after being added to the queue:

if len(self.masterList) >= 50:
    logging.debug('Starting queue of items up to %s with 50 items', len(self.masterList))
    taskqueue.add(url='/adddata', params={'data': self.masterList})
    self.masterList = []

And remove all references to self.count.
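
To see the length-based check in isolation, here is a minimal sketch that runs without App Engine. The function name batch_products and the sample data are made up for illustration and are not part of the original code. It also yields whatever is left over when "</catalog>" is reached, which the loop in the question would otherwise drop; that flush is my own addition.

```python
def batch_products(lines, batch_size=50):
    """Yield lists of at most batch_size products.

    Each product is the list of lines read up to and including its
    "</product>" line. Hypothetical helper, for illustration only.
    """
    product = []  # lines of the product currently being read
    batch = []    # completed products waiting to be handed off
    for line in lines:
        stripped = line.strip()
        if stripped == "</catalog>":
            break
        product.append(line)
        if stripped == "</product>":
            batch.append(product)
            product = []
            # Check the batch length only when a product was just completed.
            if len(batch) >= batch_size:
                yield batch
                batch = []
    # Hand off the final, partially filled batch (an assumption about what is wanted).
    if batch:
        yield batch


# Made-up sample: a catalog with 170 tiny products.
sample = ["<catalog>\n"]
for i in range(170):
    sample += ["<product>\n", "<id>%d</id>\n" % i, "</product>\n"]
sample.append("</catalog>\n")

sizes = [len(b) for b in batch_products(sample)]
print(sizes)  # [50, 50, 50, 20]
```

In the handler, each yielded batch would be the point where taskqueue.add is called, so 170 products should produce 4 tasks rather than about 86.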

Answered 2013-08-12T21:07:48.333