I am writing a file processor that can (hopefully) parse arbitrary files and perform arbitrary actions on the parsed content. The file processor needs to run continuously. The basic idea I am following is:
- Each file will have two associated processes (one for reading, the other for parsing and writing elsewhere)
- The reader will read a line into a common buffer (e.g. a Queue) until EOF or until the buffer is full, then wait (sleep)
- The writer will read from the buffer, parse the content and write it to (say) a DB until the buffer is empty, then wait (sleep)
- Interrupting the main program should make both the reader and the writer exit safely (the buffer can be thrown away without being written)
The program runs fine. However, sometimes the Writer gets initialized first and finds the buffer empty, so it goes to sleep. The Reader then fills the buffer and goes to sleep as well. So for one sleep_interval my code does nothing. To work around this I tried using a multiprocessing.Event() to signal the writer that the buffer has some entries it can process.
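In outline, the signalling I want between the two processes looks something like this (a minimal standalone sketch with made-up worker functions, not my actual code, which is below):

import multiprocessing
import time

def reader(queue, event):
    # Producer: put a few lines and set the event after each put,
    # so a sleeping consumer is woken instead of waiting out its nap.
    for i in range(5):
        queue.put("line %d" % i)
        event.set()
        time.sleep(1)

def writer(queue, event):
    # Consumer: when the queue looks empty, clear the event and block
    # on it (with a timeout) rather than sleeping unconditionally.
    for _ in range(5):
        while queue.empty():
            event.clear()
            event.wait(5)
        print queue.get()

if __name__ == '__main__':
    q = multiprocessing.Queue(100)
    ev = multiprocessing.Event()
    r = multiprocessing.Process(target=reader, args=(q, ev))
    w = multiprocessing.Process(target=writer, args=(q, ev))
    r.start(); w.start()
    r.join(); w.join()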
My code is:
import multiprocessing
import time
import sys
import signal
import Queue

class FReader(multiprocessing.Process):
    """
    A basic file reader class
    It spawns a new process that shares a queue with the writer process
    """
    def __init__(self,queue,fp,sleep_interval,read_offset,event):
        self.queue = queue
        self.fp = fp
        self.sleep_interval = sleep_interval
        self.offset = read_offset
        self.fp.seek(self.offset)
        self.event = event
        self.event.clear()
        super(FReader,self).__init__()

    def myhandler(self,signum,frame):
        self.fp.close()
        print "Stopping Reader"
        sys.exit(0)

    def run(self):
        signal.signal(signal.SIGINT,self.myhandler)
        signal.signal(signal.SIGCLD,signal.SIG_DFL)
        signal.signal(signal.SIGILL,self.myhandler)
        while True:
            sleep_now = False
            if not self.queue.full():
                print "READER:Reading"
                m = self.fp.readline()
                if not self.event.is_set():
                    self.event.set()
                if m:
                    self.queue.put((m,self.fp.tell()),block=False)
                else:
                    sleep_now = True
            else:
                print "Queue Full"
                sleep_now = True
            if sleep_now:
                print "Reader sleeping for %d seconds"%self.sleep_interval
                time.sleep(self.sleep_interval)


class FWriter(multiprocessing.Process):
    """
    A basic file writer class
    It spawns a new process that shares a queue with the reader process
    """
    def __init__(self,queue,session,sleep_interval,fp,event):
        self.queue = queue
        self.session = session
        self.sleep_interval = sleep_interval
        self.offset = 0
        self.queue_offset = 0
        self.fp = fp
        self.dbqueue = Queue.Queue(50)
        self.event = event
        self.event.clear()
        super(FWriter,self).__init__()

    def myhandler(self,signum,frame):
        #self.session.commit()
        self.session.close()
        self.fp.truncate()
        self.fp.write(str(self.offset))
        self.fp.close()
        print "Stopping Writer"
        sys.exit(0)

    def process_line(self,line):
        #Do not process comments
        if line[0] == '#':
            return None
        my_list = []
        split_line = line.split(',')
        my_list = split_line
        return my_list

    def run(self):
        signal.signal(signal.SIGINT,self.myhandler)
        signal.signal(signal.SIGCLD,signal.SIG_DFL)
        signal.signal(signal.SIGILL,self.myhandler)
        while True:
            sleep_now = False
            if not self.queue.empty():
                print "WRITER:Getting"
                line,offset = self.queue.get(False)
                #Process the line just read
                proc_line = self.process_line(line)
                if proc_line:
                    #Must write it to DB. Put it into DB Queue
                    if self.dbqueue.full():
                        #DB Queue is full, put data into DB before putting more data
                        self.empty_dbqueue()
                    self.dbqueue.put(proc_line)
                #Keep a track of the maximum offset in the queue
                self.queue_offset = offset if offset > self.queue_offset else self.queue_offset
            else:
                #Looks like writing queue is empty. Just check if DB Queue is empty too
                print "WRITER: Empty Read Queue"
                self.empty_dbqueue()
                sleep_now = True
            if sleep_now:
                self.event.clear()
                print "WRITER: Sleeping for %d seconds"%self.sleep_interval
                #time.sleep(self.sleep_interval)
                self.event.wait(5)

    def empty_dbqueue(self):
        #The DB Queue has many objects waiting to be written to the DB. Lets write them
        print "WRITER:Emptying DB QUEUE"
        while True:
            try:
                new_line = self.dbqueue.get(False)
            except Queue.Empty:
                #Write the new offset to file
                self.offset = self.queue_offset
                break
            print new_line[0]


def main():
    write_file = '/home/xyz/stats.offset'
    wp = open(write_file,'r')
    read_offset = wp.read()
    try:
        read_offset = int(read_offset)
    except ValueError:
        read_offset = 0
    wp.close()
    print read_offset

    read_file = '/var/log/somefile'
    file_q = multiprocessing.Queue(100)
    ev = multiprocessing.Event()
    new_reader = FReader(file_q,open(read_file,'r'),30,read_offset,ev)
    new_writer = FWriter(file_q,open('/dev/null'),30,open(write_file,'w'),ev)
    new_reader.start()
    new_writer.start()
    try:
        new_reader.join()
        new_writer.join()
    except KeyboardInterrupt:
        print "Closing Master"
        new_reader.join()
        new_writer.join()

if __name__=='__main__':
    main()
The dbqueue in the Writer is there to batch the DB writes together, and for every line I also keep that line's offset. The maximum offset written to the DB is stored in the offset file on exit, so that on the next run I can pick up where I left off. The DB object (session) is just '/dev/null' here for demonstration.
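The offset round-trip on its own is roughly this (a simplified, self-contained sketch; the /tmp paths are placeholders, my real paths are the ones in main()):

# Simplified sketch of the offset bookkeeping (placeholder paths).
offset_file = '/tmp/stats.offset'
log_file = '/tmp/somefile'

# Create a tiny demo log so the sketch runs on its own.
open(log_file, 'w').write("line one\nline two\n")

# On startup: read the last stored offset (defaulting to 0) and seek to it.
try:
    offset = int(open(offset_file, 'r').read())
except (IOError, ValueError):
    offset = 0

log = open(log_file, 'r')
log.seek(offset)
line = log.readline()            # consume a line...
max_written_offset = log.tell()  # ...and remember its end offset once it reaches the DB

# On shutdown: persist the highest offset that actually reached the DB,
# so the next run resumes from there.
open(offset_file, 'w').write(str(max_written_offset))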
Previously, instead of doing self.event.wait(5) I was doing time.sleep(self.sleep_interval), which (as I said) worked well but introduced a small delay; with it, however, the processes exited cleanly.
Now when I do a Ctrl-C on the main process, the Reader exits but the Writer throws an OSError:
^CStopping Reader
Closing Master
Stopping Writer
Process FWriter-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/multiprocessing/process.py", line 232, in _bootstrap
    self.run()
  File "FileParse.py", line 113, in run
    self.event.wait(5)
  File "/usr/lib64/python2.6/multiprocessing/synchronize.py", line 303, in wait
    self._cond.wait(timeout)
  File "/usr/lib64/python2.6/multiprocessing/synchronize.py", line 212, in wait
    self._wait_semaphore.acquire(True, timeout)
OSError: [Errno 0] Error
I understand that event.wait() is somehow blocking the code, but I cannot figure out how to get around it. I tried wrapping self.event.wait(5) and sys.exit() in a try: except OSError: block, but that only makes the program hang forever.
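For reference, what I tried looked roughly like the following, reduced to a minimal stand-in class (reconstructed, not my exact code, so the exact placement of the try/except may differ):

import multiprocessing
import signal
import sys

class Waiter(multiprocessing.Process):
    # Minimal stand-in for FWriter: a SIGINT handler that exits, and a
    # wait() wrapped in try/except OSError, which is what I attempted.
    def __init__(self, event):
        self.event = event
        super(Waiter, self).__init__()

    def myhandler(self, signum, frame):
        try:
            sys.exit(0)
        except OSError:
            pass

    def run(self):
        signal.signal(signal.SIGINT, self.myhandler)
        while True:
            try:
                self.event.wait(5)
            except OSError:
                pass

if __name__ == '__main__':
    ev = multiprocessing.Event()
    w = Waiter(ev)
    w.start()
    try:
        w.join()
    except KeyboardInterrupt:
        w.join()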
I am using Python-2.6.