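Some of Python's file-reading functions (readlines(), for example)
copy the file's contents into memory (as a list).

I need to process a file that is too large to
copy into memory, so I need to use
a file pointer (to access the file one byte at a
time), as with getc() in C.

An additional requirement is that
I want to rewind the file pointer to a previous
byte, as with ungetc() in C.

Is there a way to do this in Python?


Also, in Python I can read one line at a time with readline().

Is there a way
to read the previous line, going backward?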

6 Answers

5
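  • You don't need file pointers, which Python doesn't have or want.

  • To iterate over a file line by line without reading the whole thing into memory, just iterate over the file object itself, i.e.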

     with open(filename, "r") as f:
         for line in f:
             ...
    

    readlines should generally be avoided.

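  • Going back a line isn't something you can do super easily. If you never need to go back more than one line, check out the pairwise recipe in the itertools documentation.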
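
As a rough sketch of that last point, the pairwise recipe can be wrapped so that each line arrives together with the line before it. The helper name lines_with_previous is made up for this illustration and is not part of itertools; the sketch assumes one line of look-back is enough.

```python
from itertools import chain, tee

def pairwise(iterable):
    """s -> (s0, s1), (s1, s2), (s2, s3), ... (recipe from the itertools docs)."""
    a, b = tee(iterable)
    next(b, None)  # advance the second iterator by one item
    return zip(a, b)

def lines_with_previous(lines):
    """Yield (previous_line, current_line) pairs; previous_line is None
    for the first line. Hypothetical helper, for illustration only."""
    # Prepend None so that the first real line also gets a pair.
    return pairwise(chain([None], lines))

# Usage with a file object (filename is a placeholder):
#     with open(filename, "r") as f:
#         for previous, line in lines_with_previous(f):
#             ...
```

Only two lines are held in memory at any moment, so this still works for files that are too large to load whole.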

Answered 2010-04-16T19:39:14.790
3

OK, here is what I came up with. Thanks to Brenda for the idea of building a class.
Thanks to Josh for the idea of using the C-like functions seek() and read().

#!/usr/bin/env python
# Written for Python 2: uses print statements, and bytes read from the file are str.

# Usage: BufRead.py inputfile

import sys, os
from inspect import currentframe

# Debug function usage
#
# if DEBUG:
#   debugLogMsg(currentframe().f_lineno,currentframe().f_code.co_filename)
#   print ...
def debugLogMsg(line,file,msg=""):
  print "%s:%s %s" % (file,line,msg)

# Set DEBUG off.
DEBUG = 0

class BufRead: 
  def __init__(self,filename): 
    self.__filename           = filename 
    self.__file               = open(self.__filename,'rb') 
    self.__fileposition       = self.__file.tell()
    self.__file.seek(0, os.SEEK_END)
    self.__filesize           = self.__file.tell() 
    self.__file.seek(self.__fileposition, os.SEEK_SET) 

  def close(self): 
    if self.__file is not None: 
      self.__file.close() 
      self.__file = None 

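  def seekstart(self):
    # Only seek while the file is still open.
    if self.__file is not None:
      self.__file.seek(0, os.SEEK_SET)
      self.__fileposition = self.__file.tell()

  def seekend(self):
    # Only seek while the file is still open; positions on the last byte.
    if self.__file is not None:
      self.__file.seek(-1, os.SEEK_END)
      self.__fileposition = self.__file.tell()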

  def getc(self):
    if self.__file == None:
      return None 
    self.__fileposition = self.__file.tell()
    if self.__fileposition < self.__filesize:
      byte = self.__file.read(1)
      self.__fileposition = self.__file.tell()
      return byte
    else:
      return None

  def ungetc(self):
    if self.__file == None:
      return None 
    self.__fileposition = self.__file.tell()
    if self.__fileposition > 0:
      self.__fileposition = self.__fileposition - 1
      self.__file.seek(self.__fileposition, os.SEEK_SET)
      byte = self.__file.read(1)
      self.__file.seek(self.__fileposition, os.SEEK_SET)
      return byte
    else:
      return None

  # uses getc() and ungetc()
  def getline(self):
    if self.__file == None:
      return None 
    self.__fileposition = self.__file.tell()

    if self.__fileposition < self.__filesize:
      startOfLine = False
      line = ""

      while True:
        if self.__fileposition == 0:
          startOfLine = True
          break
        else:
          c = self.ungetc()
          if c == '\n':
            c = self.getc()
            startOfLine = True
            break

      if startOfLine:
        c = self.getc()
        if c == '\n':
          return '\n'
        else:
          self.ungetc()

        while True:
          c = self.getc()
          if c == '\n':
            line += c
            c = self.getc()
            if c == None:
              return line
            if c == '\n':
              self.ungetc()
            return line
          elif c == None:
            return line
          else:
            line += c
    else:
      return None

  # uses getc() and ungetc()
  def ungetline(self):
    if self.__file == None:
      return None 
    self.__fileposition = self.__file.tell()

    if self.__fileposition > 0:
      endOfLine = False
      line = ""

      while True:
        if self.__fileposition == self.__filesize:
          endOfLine = True
          break
        else:
          c = self.getc()
          if c == '\n':
            c = self.ungetc()
            endOfLine = True
            break

      if endOfLine:
        c = self.ungetc()
        if c == '\n':
          return '\n'
        else:
          self.getc()

        while True:
          c = self.ungetc()
          if c == None:
            return line
          if c == '\n':
            line += c
            c = self.ungetc()
            if c == None:
              return line
            if c == '\n':
              self.getc()
            return line
          elif c == None:
            return line
          else:
            line = c + line
    else:
      return None

def main():
  if len(sys.argv) == 2:
    print sys.argv[1]
    b = BufRead(sys.argv[1])

    sys.stdout.write(
      '----------------------------------\n' \
      '- TESTING GETC                    \n' \
      '----------------------------------\n')

    while True:
      c = b.getc()
      if c == None: 
        sys.stdout.write('\n')
        break
      else:
        sys.stdout.write(c)

    sys.stdout.write(
      '----------------------------------\n' \
      '- TESTING UNGETC                  \n' \
      '----------------------------------\n')

    while True:
      c = b.ungetc()
      if c == None: 
        sys.stdout.write('\n')
        break
      else:
        sys.stdout.write(c)

    sys.stdout.write(
      '----------------------------------\n' \
      '- TESTING GETLINE                 \n' \
      '----------------------------------\n')

    b.seekstart()

    while True:
      line = b.getline()
      if line == None:
        sys.stdout.write('\n')
        break
      else:
        sys.stdout.write(line)

    sys.stdout.write(
      '----------------------------------\n' \
      '- TESTING UNGETLINE               \n' \
      '----------------------------------\n')

    b.seekend()

    while True:
      line = b.ungetline()
      if line == None:
        sys.stdout.write('\n')
        break
      else:
        sys.stdout.write(line)

    b.close()

if __name__=="__main__": main()
Answered 2010-05-13T21:52:11.293
2

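If you really do want to work with the file pointer directly (I think Mike Graham's suggestion is better), you can use the file object's seek() method, which lets you set the internal pointer, combined with the read() method, which takes an optional argument specifying how many bytes you want to read.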
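
A minimal sketch of that combination, assuming the file is opened in binary mode. The function names getc and ungetc are borrowed from C for illustration only. Unlike C's ungetc(), this version does not push a character back; it just moves the position back one byte so the same byte is read again.

```python
import os

def getc(f):
    """Read one byte from a binary file object; returns b"" at end of file."""
    return f.read(1)

def ungetc(f):
    """Move the file position back one byte, unless already at the start."""
    if f.tell() > 0:
        f.seek(-1, os.SEEK_CUR)

# Usage (filename is a placeholder):
#     with open(filename, "rb") as f:
#         first = getc(f)
#         ungetc(f)
#         assert getc(f) == first
```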

Answered 2010-04-16T19:42:24.457
0

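I didn't want to do billions of unbuffered single-character file reads, and I wanted a way
to debug the position of the file pointer. So I decided to return
the file position along with each character or line, and to use mmap to map the file into memory (letting mmap
handle the paging). I expect this to become a problem if the file is really, really large (say, larger than physical memory): at that point mmap will start going to virtual memory and things could get very slow. Currently, it processes a 50 MB file in about 4 minutes.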

import sys, os, string, re, time
from mmap import mmap

class StreamReaderDb: 
  def __init__(self,stream):
    self.__stream             = mmap(stream.fileno(), os.path.getsize(stream.name))  
    self.__streamPosition     = self.__stream.tell()
    self.__stream.seek(0                    , os.SEEK_END)
    self.__streamSize         = self.__stream.tell() 
    self.__stream.seek(self.__streamPosition, os.SEEK_SET) 

  def setStreamPositionDb(self,streamPosition):
    if self.__stream == None:
      return None 
    self.__streamPosition = streamPosition
    self.__stream.seek(self.__streamPosition, os.SEEK_SET)

  def streamPositionDb(self):
    if self.__stream == None:
      return None 
    return self.__streamPosition

  def streamSize(self):
    if self.__stream == None:
      return None 
    return self.__streamSize

  def close(self): 
    if self.__stream is not None: 
      self.__stream.close() 
      self.__stream = None 

  def seekStart(self):
    if self.__stream == None:
      return None 
    self.setStreamPositionDb(0)

  def seekEnd(self):
    if self.__stream == None:
      return None 
    self.__stream.seek(-1, os.SEEK_END)
    self.setStreamPositionDb(self.__stream.tell())

  def getcDb(self):
    if self.__stream == None:
      return None,None 
    self.setStreamPositionDb(self.__stream.tell())
    if self.streamPositionDb() < self.streamSize():
      byte = self.__stream.read(1)
      self.setStreamPositionDb(self.__stream.tell())
      return byte,self.streamPositionDb()
    else:
      return None,self.streamPositionDb()

  def unGetcDb(self):
    if self.__stream == None:
      return None,None 
    self.setStreamPositionDb(self.__stream.tell())
    if self.streamPositionDb() > 0:
      self.setStreamPositionDb(self.streamPositionDb() - 1)
      byte = self.__stream.read(1)
      self.__stream.seek(self.streamPositionDb(), os.SEEK_SET)
      return byte,self.streamPositionDb()
    else:
      return None,self.streamPositionDb()

  def seekLineStartDb(self):
    if self.__stream == None:
      return None
    self.setStreamPositionDb(self.__stream.tell())

    if self.streamPositionDb() < self.streamSize():
      # Back up to the start of the line
      while True:
        if self.streamPositionDb() == 0:
          return self.streamPositionDb() 
        else:
          c,fp = self.unGetcDb()
          if c == '\n':
            c,fp = self.getcDb()
            return fp
    else:
      return None

  def seekPrevLineEndDb(self):
    if self.__stream == None:
      return None
    self.setStreamPositionDb(self.__stream.tell())

    if self.streamPositionDb() < self.streamSize():
      # Back up to the newline that ends the previous line
      while True:
        if self.streamPositionDb() == 0:
          return self.streamPositionDb() 
        else:
          c,fp = self.unGetcDb()
          if c == '\n':
            return fp
    else:
      return None

  def seekPrevLineStartDb(self):
    if self.__stream == None:
      return None
    self.setStreamPositionDb(self.__stream.tell())

    if self.streamPositionDb() < self.streamSize():
      # Back up to the previous newline, then to the start of that line
      while True:
        if self.streamPositionDb() == 0:
          return self.streamPositionDb() 
        else:
          c,fp = self.unGetcDb()
          if c == '\n':
            return self.seekLineStartDb()
    else:
      return None

  def seekLineEndDb(self):
    if self.__stream == None:
      return None 
    self.setStreamPositionDb(self.__stream.tell())

    if self.streamPositionDb() > 0:
      while True:
        if self.streamPositionDb() == self.streamSize():
          return self.streamPositionDb()
        else:
          c,fp = self.getcDb()
          if c == '\n':
            c,fp = self.unGetcDb()
            return fp
    else:
      return None

  def seekNextLineEndDb(self):
    if self.__stream == None:
      return None 
    self.setStreamPositionDb(self.__stream.tell())

    if self.streamPositionDb() > 0:
      while True:
        if self.streamPositionDb() == self.streamSize():
          return self.streamPositionDb()
        else:
          c,fp = self.getcDb()
          if c == '\n':
            return fp
    else:
      return None

  def seekNextLineStartDb(self):
    if self.__stream == None:
      return None 
    self.setStreamPositionDb(self.__stream.tell())

    if self.streamPositionDb() > 0:
      while True:
        if self.streamPositionDb() == self.streamSize():
          return self.streamPositionDb()
        else:
          c,fp = self.getcDb()
          if c == '\n':
            return self.seekLineStartDb()
    else:
      return None

  # uses getcDb() and unGetcDb()
  def getLineDb(self):
    if self.__stream == None:
      return None,None 
    self.setStreamPositionDb(self.__stream.tell())

    line = ""

    if self.seekLineStartDb() != None:
      c,fp = self.getcDb()
      if c == '\n':
        return c,self.streamPositionDb()
      else:
        self.unGetcDb()

      while True:
        c,fp = self.getcDb()
        if c == '\n':
          line += c
          c,fp = self.getcDb()
          if c == None:
            return line,self.streamPositionDb()
          self.unGetcDb()
          return line,self.streamPositionDb()
        elif c == None:
          return line,self.streamPositionDb()
        else:
          line += c
    else:
      return None,self.streamPositionDb()

  # uses getcDb() and unGetcDb()
  def unGetLineDb(self):
    if self.__stream == None:
      return None,None 
    self.setStreamPositionDb(self.__stream.tell())

    line = ""

    if self.seekLineEndDb() != None:
      c,fp = self.unGetcDb()
      if c == '\n':
        return c,self.streamPositionDb()
      else:
        self.getcDb()

      while True:
        c,fp = self.unGetcDb()
        if c == None:
          return line,self.streamPositionDb()
        if c == '\n':
          line += c
          c,fp = self.unGetcDb()
          if c == None:
            return line,self.streamPositionDb()
          self.getcDb()
          return line,self.streamPositionDb()
        elif c == None:
          return line,self.streamPositionDb()
        else:
          line = c + line
    else:
      return None,self.streamPositionDb()
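
For comparison, here is a rough sketch of reading lines backward through an mmap without stepping one byte at a time. It uses mmap.rfind() to locate line boundaries. The function name lines_backward is made up for this illustration, and the sketch assumes a non-empty file with '\n' line endings under Python 3 (lines come back as bytes).

```python
import mmap

def lines_backward(path):
    """Yield the lines of a non-empty file from last to first, as bytes."""
    with open(path, "rb") as f, \
            mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        end = len(mm)
        while end > 0:
            # Search for the newline that ends the *previous* line. The byte
            # at end - 1 is skipped because, if it is a newline, it belongs
            # to the line about to be yielded.
            start = mm.rfind(b"\n", 0, end - 1) + 1
            yield mm[start:end]
            end = start
```

Each rfind() call scans inside the mapping in C, which avoids one Python-level call per byte.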
Answered 2010-05-19T19:39:27.990
0

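This question originally came up because I needed to build a lexer.
getc() and ungetc() were useful at first (for ruling out read errors and
building the state machine). Once the state machine was done,
getc() and ungetc() became a liability, because reading directly from storage took too long.

Once the state machine was complete (any I/O problems debugged,
the states finalized), I optimized the lexer.

Reading the source file into memory in chunks (or pages) and
running the state machine on each page gave the best timing results.

I found that a considerable amount of time is saved by not using getc() and ungetc()
to read directly from the file.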
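
To illustrate the chunked approach, here is a minimal sketch of a state machine whose state survives across chunk boundaries. It is not the lexer described above, just a toy word counter. The name count_words and the 64 KiB chunk size are assumptions made for this example.

```python
def count_words(stream, chunk_size=64 * 1024):
    """Count whitespace-separated words in a binary stream, reading it
    in chunks instead of one byte at a time."""
    in_word = False  # state carried over from one chunk to the next
    count = 0
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:  # empty read means end of stream
            break
        for byte in chunk:  # iterating over bytes yields ints in Python 3
            is_space = byte in b" \t\r\n"
            if not is_space and not in_word:
                count += 1  # transition: whitespace -> word
            in_word = not is_space
    return count

# Usage (filename is a placeholder):
#     with open(filename, "rb") as f:
#         print(count_words(f))
```

Because in_word is kept outside the per-chunk loop, a word that straddles two chunks is still counted once.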

Answered 2010-05-20T22:57:15.133
0

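Write yourself a class that reads and buffers the input, and implement ungetc on top of it. Perhaps something like this (warning: untested, written while compiling):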

class BufRead:
    def __init__(self, filename):
        self.filename = filename
        self.fn = open(filename, 'rb')
        self.buffer = b''      # bytes read ahead but not yet consumed
        self.bufsize = 256
        self.ready = True
    def close(self):
        if self.fn is not None:
            self.fn.close()
            self.fn = None
        self.ready = False
    def read(self, size=1):
        if not self.ready: return None
        l = len(self.buffer)
        if l >= size:
            # Enough buffered data: serve the request from the buffer.
            s = self.buffer[:size]
            self.buffer = self.buffer[size:]
            return s
        # Take what is buffered, then refill from the file.
        s = self.buffer
        size = size - l
        self.buffer = self.fn.read(max(self.bufsize, size))
        if len(self.buffer) == 0:
            # End of file: return whatever was left in the buffer.
            self.ready = False
            return s
        return s + self.read(size)
    def ungetc(self, ch):
        # Push ch back at the front so it is the next byte read.
        self.buffer = ch + self.buffer
        self.ready = True
Answered 2010-04-16T19:57:03.597