
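I'm writing a file processor that can (hopefully) parse arbitrary files and perform arbitrary actions on the parsed content. The file processor needs to run continuously. The basic idea I'm following is: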

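  1. Each file will have two associated processes (one for reading, the other for parsing and writing somewhere else).
  2. The reader reads lines into a common buffer (e.g. a Queue) until EOF or until the buffer is full. Then it waits (sleeps).
  3. The writer reads from the buffer, parses the content and writes it to (say) a DB until the buffer is empty. Then it waits (sleeps).
  4. Interrupting the main program causes the reader and writer to exit safely (the buffer can be discarded without being written).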
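The reader side of steps 2 and 4 can be sketched as a tail-style generator. This is a minimal sketch in Python 3 syntax, not the code from the question: follow, stop and poll_interval are hypothetical names, and stop is assumed to be a threading.Event or multiprocessing.Event. It yields each complete line together with the byte offset just after it (what FReader gets from self.fp.tell()) and, at EOF, waits on the stop event instead of calling time.sleep(), so a shutdown request ends the wait at once.

```python
def follow(fp, stop, poll_interval=1.0):
    """Yield (line, offset) pairs from a file opened in binary mode.

    `offset` is the byte position just after the line, so it can be saved
    and passed to fp.seek() on the next run. At EOF the generator waits up
    to `poll_interval` seconds for more data, returning early if `stop`
    (a threading.Event or multiprocessing.Event) is set.
    """
    while not stop.is_set():
        pos = fp.tell()
        line = fp.readline()
        if line.endswith(b"\n"):
            yield line, fp.tell()
        else:
            # EOF, or a last line that is still being written: rewind so the
            # partial line is read again once it has been completed.
            fp.seek(pos)
            stop.wait(poll_interval)
```

In a reader process, each pair could then go into the shared queue with a blocking put (for example self.queue.put(pair, True, timeout)), which waits while the bounded queue is full instead of polling full() and sleeping.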

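The program works fine. However, sometimes the Writer initializes first and finds the buffer empty, so it goes to sleep. The Reader then fills the buffer and sleeps too. So for sleep_interval seconds my code does nothing. To fix this, I tried using a multiprocessing.Event() to signal the writer that the buffer has some entries that can be processed.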
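One pitfall with the Event approach is a lost wakeup. If the Reader puts an item and sets the event in the window between the Writer's empty() check and its clear(), the clear() erases the signal and the Writer sleeps for the full timeout even though work is waiting. In the FReader code that follows, the event is also set before the put(), which opens a similar window. The sketch below (Python 3; produce and wait_for_work are hypothetical helper names, and threads stand in for processes) shows an ordering that avoids this: the producer puts first and then sets the event, and the consumer clears the event first and then re-checks the queue before waiting.

```python
def produce(q, ready, item):
    """Put first, then signal, so the signal can never precede the data."""
    q.put(item)
    ready.set()


def wait_for_work(q, ready, timeout):
    """Return True if work is (or becomes) available within `timeout` seconds.

    The event is cleared *before* the queue is checked. A producer whose put()
    lands after the check will also call set() after the clear(), so wait()
    returns immediately instead of sleeping for the whole timeout.
    """
    ready.clear()
    if not q.empty():
        return True
    return ready.wait(timeout)
```

The result is only a hint (multiprocessing.Queue.empty() is documented as unreliable), so the caller should still fetch with a non-blocking or timed get(). Also note that in Python 2.6 Event.wait() always returned None; there you would simply re-check the queue after it returns.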

My code is:

import multiprocessing
import time
import sys
import signal
import Queue

class FReader(multiprocessing.Process): 
    """
    A basic file reader class
    It spawns a new process that shares a queue with the writer process
    """
    def __init__(self,queue,fp,sleep_interval,read_offset,event): 
        self.queue = queue
        self.fp = fp
        self.sleep_interval = sleep_interval
        self.offset = read_offset
        self.fp.seek(self.offset)
        self.event = event
        self.event.clear()
        super(FReader,self).__init__()

    def myhandler(self,signum,frame): 
        self.fp.close()
        print "Stopping Reader"
        sys.exit(0)

    def run(self): 
        signal.signal(signal.SIGINT,self.myhandler)
        signal.signal(signal.SIGCLD,signal.SIG_DFL)
        signal.signal(signal.SIGILL,self.myhandler)
        while True: 
            sleep_now = False
            if not self.queue.full(): 
                print "READER:Reading"
                m = self.fp.readline()
                if not self.event.is_set(): 
                    self.event.set()
                if m: 
                    self.queue.put((m,self.fp.tell()),block=False)
                else: 
                    sleep_now = True 
            else: 
                print "Queue Full"
                sleep_now = True

            if sleep_now: 
                print "Reader sleeping for %d seconds"%self.sleep_interval
                time.sleep(self.sleep_interval)            

class FWriter(multiprocessing.Process): 
    """
    A basic file writer class
    It spawns a new process that shares a queue with the reader process
    """
    def __init__(self,queue,session,sleep_interval,fp,event): 
        self.queue = queue
        self.session = session
        self.sleep_interval = sleep_interval
        self.offset = 0
        self.queue_offset = 0
        self.fp = fp
        self.dbqueue = Queue.Queue(50)
        self.event = event
        self.event.clear()
        super(FWriter,self).__init__()

    def myhandler(self,signum,frame): 
        #self.session.commit()
        self.session.close()
        self.fp.truncate()
        self.fp.write(str(self.offset))
        self.fp.close()
        print "Stopping Writer"
        sys.exit(0)

    def process_line(self,line): 
        #Do not process comments
        if line[0] == '#': 
            return None
        my_list = []
        split_line = line.split(',')
        my_list = split_line
        return my_list

    def run(self): 
        signal.signal(signal.SIGINT,self.myhandler)
        signal.signal(signal.SIGCLD,signal.SIG_DFL)
        signal.signal(signal.SIGILL,self.myhandler)
        while True: 
            sleep_now = False
            if not self.queue.empty(): 
                print "WRITER:Getting"
                line,offset = self.queue.get(False)
                #Process the line just read
                proc_line = self.process_line(line)
                if proc_line: 
                    #Must write it to DB. Put it into DB Queue
                    if self.dbqueue.full(): 
                        #DB Queue is full, put data into DB before putting more data
                        self.empty_dbqueue()
                    self.dbqueue.put(proc_line)
                    #Keep a track of the maximum offset in the queue
                    self.queue_offset = offset if offset > self.queue_offset else self.queue_offset
            else: 
                #Looks like writing queue is empty. Just check if DB Queue is empty too
                print "WRITER: Empty Read Queue"
                self.empty_dbqueue()
                sleep_now = True
            if sleep_now: 
                self.event.clear()
                print "WRITER: Sleeping for %d seconds"%self.sleep_interval
                #time.sleep(self.sleep_interval)
                self.event.wait(5) 



    def empty_dbqueue(self): 
        #The DB Queue has many objects waiting to be written to the DB. Lets write them 
        print "WRITER:Emptying DB QUEUE"
        while True: 
            try: 
                new_line = self.dbqueue.get(False)
            except Queue.Empty: 
                #Write the new offset to file
                self.offset = self.queue_offset
                break
            print new_line[0]

def main(): 
    write_file = '/home/xyz/stats.offset'
    wp = open(write_file,'r')
    read_offset = wp.read()
    try: 
        read_offset = int(read_offset)
    except ValueError: 
        read_offset = 0
    wp.close()
    print read_offset
    read_file = '/var/log/somefile'
    file_q = multiprocessing.Queue(100)
    ev = multiprocessing.Event()
    new_reader = FReader(file_q,open(read_file,'r'),30,read_offset,ev)
    new_writer = FWriter(file_q,open('/dev/null'),30,open(write_file,'w'),ev)
    new_reader.start()
    new_writer.start()
    try: 
        new_reader.join()
        new_writer.join()
    except KeyboardInterrupt: 
        print "Closing Master"
        new_reader.join()
        new_writer.join()

if __name__=='__main__': 
    main()

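The dbqueue in the Writer is used to batch DB writes together, and for each line I keep track of that line's offset. The maximum offset written to the DB is stored in the offset file on exit, so that on the next run I can pick up where I left off. The DB object (session) is just '/dev/null' for demonstration purposes.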
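Because main() opens the offset file with mode 'w', the file is truncated as soon as the program starts, so if the Writer dies before its handler runs, the saved offset is lost. A common alternative is to write the offset to a temporary file and atomically rename it over the old one each time a batch has been written. The sketch below (Python 3; load_offset and save_offset are hypothetical helper names) does that with os.replace, whose rename is atomic on POSIX when both paths are on the same filesystem.

```python
import os
import tempfile


def load_offset(path):
    """Return the saved byte offset, or 0 if the file is missing or invalid."""
    try:
        with open(path) as f:
            return int(f.read().strip() or 0)
    except (OSError, ValueError):
        return 0


def save_offset(path, offset):
    """Atomically replace `path` with a file containing `offset`."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file lives in the same directory, so os.replace is a rename.
    fd, tmp = tempfile.mkstemp(dir=directory, prefix=".offset-")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(str(offset))
            f.flush()
            os.fsync(f.fileno())  # make sure the data hits the disk first
        os.replace(tmp, path)  # readers see either the old or the new value
    except BaseException:
        os.unlink(tmp)
        raise
```

With this, the Writer could call save_offset(write_file, self.queue_offset) right after each empty_dbqueue(), so the file always reflects what has actually been written, rather than relying on the signal handler to run.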

Previously, instead of doing

self.event.wait(5)

I was doing

time.sleep(self.sleep_interval)

which (as I said) worked well but introduced a bit of delay. The processes did exit cleanly, though.

Now, when I press Ctrl-C on the main process, the reader exits but the writer throws an OSError:

^CStopping Reader
Closing Master
Stopping Writer
Process FWriter-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/multiprocessing/process.py", line 232, in _bootstrap
    self.run()
  File "FileParse.py", line 113, in run
    self.event.wait(5)
  File "/usr/lib64/python2.6/multiprocessing/synchronize.py", line 303, in wait
    self._cond.wait(timeout)
  File "/usr/lib64/python2.6/multiprocessing/synchronize.py", line 212, in wait
    self._wait_semaphore.acquire(True, timeout)
OSError: [Errno 0] Error

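I understand that event.wait() is somehow blocking the code, but I can't figure out how to get around it. I tried wrapping self.event.wait(5) and sys.exit() in a try: except OSError: block, but that just makes the program hang forever.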

I'm using Python 2.6.
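One common way to sidestep this (a sketch, not a fix verified on 2.6) is to avoid doing cleanup and sys.exit() inside a signal handler at all. Ctrl-C sends SIGINT to the whole foreground process group, so each child currently runs its handler while it is blocked inside Event.wait(), which is where the traceback shows the OSError coming from. Instead, the children can ignore SIGINT, the parent can catch KeyboardInterrupt and set a shared stop event, and each child then leaves its loop normally and cleans up in a finally block. The names ignore_sigint, worker, supervise, handle and cleanup are hypothetical. The snippet is Python 3 syntax and uses threads in the demo so it runs standalone; ignore_sigint() is not called there because signal handlers can only be installed from the main thread. multiprocessing.Event and multiprocessing.Queue provide the same set()/is_set() and get(block, timeout) calls.

```python
try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2
import signal


def ignore_sigint():
    """Call first thing in each child process's run(), so that Ctrl-C
    (delivered to the whole foreground process group) only reaches the parent."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def worker(q, stop, handle, cleanup, poll=0.5):
    """Handle items from `q` until `stop` is set; always run `cleanup` once."""
    try:
        while not stop.is_set():
            try:
                item = q.get(True, poll)
            except queue.Empty:
                continue  # nothing arrived; loop round to re-check `stop`
            handle(item)
    finally:
        cleanup()  # e.g. close the DB session and save the offset


def supervise(workers, stop):
    """Parent side: wait for workers; on Ctrl-C, ask them to finish cleanly."""
    try:
        for w in workers:
            w.join()
    except KeyboardInterrupt:
        print("Closing Master")
        stop.set()
        for w in workers:
            w.join()
```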


1 Answer


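I think it's better to use a blocking Queue get with a timeout for the Writer class: Queue.get(True, 5). Then, if something is put into the queue during the interval, the Writer wakes up immediately. The Writer loop would then look something like: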

while True: 
    try:
        print "WRITER:Getting"
        #Block for up to 5 seconds; returns as soon as the reader puts an item
        line,offset = self.queue.get(True, 5)
        #Process the line just read
        proc_line = self.process_line(line)
        if proc_line: 
            #Must write it to DB. Put it into DB Queue
            if self.dbqueue.full(): 
                #DB Queue is full, put data into DB before putting more data
                self.empty_dbqueue()
            self.dbqueue.put(proc_line)
            #Keep a track of the maximum offset in the queue
            self.queue_offset = offset if offset > self.queue_offset else self.queue_offset
    except Queue.Empty: 
        #Looks like writing queue is empty. Just check if DB Queue is empty too
        print "WRITER: Empty Read Queue"
        self.empty_dbqueue()
answered 2012-09-21 at 11:17
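The loop above has no exit condition. A self-contained sketch of the same idea (blocking get() with a timeout, and flushing the batch whenever the queue goes idle) is below, with one added assumption: the Reader puts a sentinel (None here) when it is done, so the Writer can flush and return instead of being killed mid-wait. writer_loop, flush, SENTINEL and batch_size are hypothetical names; flush stands in for the real DB write and receives the batch plus the highest offset it covers.

```python
try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2

SENTINEL = None  # hypothetical: the reader puts this when it is done
_IDLE = object()  # internal marker: no item arrived within the timeout


def writer_loop(q, flush, batch_size=50, timeout=5.0):
    """Consume (line, offset) pairs from `q` until SENTINEL arrives.

    `flush(batch, max_offset)` is called when the batch is full, when the
    queue stays empty for `timeout` seconds, and once more at the end.
    Returns the highest offset that was flushed.
    """
    batch = []
    max_offset = flushed_offset = 0
    while True:
        try:
            item = q.get(True, timeout)  # returns as soon as an item is put
        except queue.Empty:
            item = _IDLE
        done = item is SENTINEL
        if item is not _IDLE and not done:
            line, offset = item
            if not line.startswith("#"):  # skip comments, as process_line() does
                batch.append(line.rstrip("\n").split(","))
                max_offset = max(max_offset, offset)
        if batch and (item is _IDLE or done or len(batch) >= batch_size):
            flush(batch, max_offset)
            flushed_offset = max_offset
            batch = []
        if done:
            return flushed_offset
```

The returned offset is the one worth persisting, since it only advances after flush() has succeeded.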