3

我正在创建一个解析大型(但简单)CSV 的python 脚本。

需要一些时间来处理。我希望能够中断 CSV 的解析,以便稍后继续。

目前我有这个 - 其中住在一个更大的班级:(未完成)

编辑:

我有一些更改的代码。但系统将解析超过 300 万行。

def parseData(self)
    reader = csv.reader(open(self.file))
    for id, title, disc in reader:
        print "%-5s %-50s %s" % (id, title, disc)
        l = LegacyData()
        l.old_id = int(id)
        l.name = title
        l.disc_number = disc
        l.parsed = False
        l.save()

这是旧代码。

def parseData(self):
        #first line start
        fields = self.data.next()
        for row in self.data:
            items = zip(fields, row)
            item = {}
            for (name, value) in items:
                item[name] = value.strip()
            self.save(item)

多谢你们。

4

4 回答 4

2

如果在 linux 下,按 Ctrl-Z 并停止正在运行的进程。键入“fg”将其恢复并从您停止的地方开始。

于 2011-01-05T00:26:20.243 回答
1

我这样做的方式:

将实际处理代码放入一个类中,然后在该类上实现 Pickle 协议(http://docs.python.org/library/pickle.html)(基本上,编写正确的__getstate__函数__setstate__

此类将接受文件名、保留打开的文件和 CSV 阅读器实例作为实例成员。该__getstate__方法将保存当前文件位置,setstate 将重新打开文件,将其转发到正确的位置,并创建一个新的阅读器。

我会在一个__iter__方法中执行实际的工作,在处理每一行之后都会向外部函数屈服。

这个外部函数将运行一个“主循环”监控输入的中断(套接字、键盘、文件系统上特定文件的状态等)——一切都很安静,它只会调用处理器的下一次迭代。如果发生中断,它会将处理器状态腌制到磁盘上的特定文件。

启动时,程序只需检查是否有保存的执行,如果有,使用 pickle 检索执行器对象,并恢复主循环。

这里有一些(未经测试的)代码 - iea 很简单:

from cPickle import load, dump
import csv
import os, sys

SAVEFILE = "running.pkl"
STOPNOWFILE = "stop.now"

class Processor(object):
    def __init__(self, filename):
        self.file = open(filename, "rt")
        self.reader = csv.reader(self.file)
    def __iter__(self):
        for line in self.reader():
            # do stuff
            yield None
    def __getstate__(self):
        return (self.file.name, self.file.tell())
    def __setstate__(self, state):
        self.file = open(state[0],"rt")
        self.file.seek(state[1])
        self.reader = csv.reader(self.File)

def check_for_interrupts():
    # Use your imagination here!  
    # One simple thing would e to check for the existence of an specific file
    # on disk.
    # But you go all the way up to instantiate a tcp server and listen to 
    # interruptions on the network
    if os.path.exists(STOPNOWFILE): 
        return True
    return False

def main():
    if os.path.exists(SAVEFILE):
        with open(SAVEFILE) as savefile:
            processor = load(savefile)
        os.unlink(savefile)
    else:
        #Assumes the name of the .csv file to be passed on the command line
        processor = Processor(sys.argv[1])
    for line in processor:
        if check_for_interrupts():
            with open(SAVEFILE, "wb") as savefile:
                dump(processor)
            break

if __name__ == "__main__":
    main()
于 2011-01-05T03:04:17.207 回答
1

您可以使用signal来捕捉事件。这是一个解析器模型,它可以捕捉CTRL-C到窗口并停止解析:

import signal, tme, sys

def onInterupt(signum, frame):
    raise Interupted()

try:
    #windows
    signal.signal(signal.CTRL_C_EVENT, onInterupt)
except:
    pass

class Interupted(Exception): pass
class InteruptableParser(object):

    def __init__(self, previous_parsed_lines=0):
        self.parsed_lines = previous_parsed_lines

    def _parse(self, line):
        # do stuff
        time.sleep(1) #mock up
        self.parsed_lines += 1
        print 'parsed %d' % self.parsed_lines

   def parse(self, filelike):
        for line in filelike:
            try:
                self._parse(line)
            except Interupted:
                print 'caught interupt'
                self.save()
                print 'exiting ...'
                sys.exit(0)

    def save(self):
        # do what you need to save state
        # like write the parse_lines to a file maybe
        pass

parser = InteruptableParser()
parser.parse([1,2,3])

虽然我现在在 linux 上,但无法对其进行测试。

于 2011-01-05T01:33:41.907 回答
0

我的完整代码

我按照@jsbueno 的建议使用了一个标志——但我没有将另一个文件作为变量保存在类中:

我创建了一个类 - 当我调用它时,它会询问任何输入,然后开始另一个进程来完成我的工作。当它循环时 - 如果我按下一个键,则设置标志并且仅在为我的下一次解析调用循环时检查。因此,我不会终止当前的操作。从我正在调用的数据中为每个对象在数据库中添加一个process标志意味着我可以随时启动它并从我离开的地方继续。

class MultithreadParsing(object):
    
    process = None
    process_flag = True
    
    def f(self):
        print "\nMultithreadParsing has started\n"
        while self.process_flag:
            ''' get my object from database '''
            legacy = LegacyData.objects.filter(parsed=False)[0:1]
            
            if legacy:
                print "Processing: %s %s" % (legacy[0].name, legacy[0].disc_number)
                for l in legacy:
                    ''' ... Do what I want it to do ...'''
                sleep(1)
            else:
                self.process_flag = False
                print "Nothing to parse"
                
        
    
    def __init__(self):
        self.process = Process(target=self.f)
        self.process.start()
        print self.process
        a = raw_input("Press any key to stop \n")
        print "\nKILL FLAG HAS BEEN SENT\n"
        
        if a:
            print "\nKILL\n"
            self.process_flag = False

感谢你们所有的帮助(尤其是你们的@jsbueno)——如果不是你们,我不会有这个课程的想法。

于 2011-01-16T12:27:06.337 回答