17

我有几个可能的文件可以保存我的数据;它们可以以不同的方式压缩,所以要打开它们我需要使用file(),gzip.GzipFile()和其他也返回一个文件对象(支持with接口)。

我想尝试每一个,直到一个成功打开,所以我可以做类似的事情

try:
  with gzip.GzipFile(fn + '.gz') as f:
    result = process(f)
except (IOError, MaybeSomeGzipExceptions):
  try:
    with xCompressLib.xCompressFile(fn + '.x') as f:
      result = process(f)
  except (IOError, MaybeSomeXCompressExceptions):
    try:
      with file(fn) as f:
        result = process(f)
    except IOError:
      result = "some default value"

如果我有几十种可能的压缩变体,这显然是不可行的。(嵌套会越来越深,代码总是看起来非常相似。)

有没有更好的方法来拼写这个?

编辑:如果可能的话,我也想process(f)退出 try/except 以避免意外捕获process(f).

4

3 回答 3

9

是的,您可以将所有变体放在一个列表中并尝试它们,直到其中一个起作用,从而取消嵌套您的代码:

def process_gzip(fn):
    with gzip.GzipFile(fn + '.gz') as f:
        return process(f)

def process_xlib(fn):
    with xCompressLib.xCompressFile(fn + '.x') as f:
        return process(f)

def process_builtin(fn):
    with file(fn) as f:
        return process(f)

process_funcs = [process_gzip, process_xlib, process_builtin]

#processing code:

for process_f in process_funcs:
    try:
        result = process_f(fn)
        break
    except IOError:
        #error reading the file, keep going
        continue
    except:
        #processing error, re-raise the exception
        raise

或者,为了减少代码量,您可以创建一个 process_func 工厂,因为它们都具有相同的形式:

def make_process_func(constructor, filename_transform):
    with constructor(filename_transform) as f:
        return process(f)

process_funcs = [
    make_process_func(gzip.GzipFile, lambda fn: fn + '.gz'),
    make_process_func(xCompressLib.xCompressFile, lambda fn: fn + '.x'),
    make_process_func(file, lambda fn: fn),
]
于 2012-09-24T12:43:30.650 回答
7

我会写一个自定义上下文管理器:

from contextlib import contextmanager

filetypes = [('.gz', gzip.GzipFile, (IOError, MaybeSomeGzipExceptions)), 
             ('.x', xCompressLib.xCompressFile, (IOError, MaybeSomeXCompressExceptions))]

@contextmanager
def open_compressed(fn):
    f = None
    try:
        for ext, cls, exs in filetypes:
            try:
                f = cls(fn + ext)
            except exs:
                pass
            else:
                break
        yield f
    finally:
        if f is not None:
            f.close()

with open_compressed(fn) as f:
    result = "some default value" if f is None else process(f)

或者可能只是一个返回上下文管理器的函数:

filetypes = [('.gz', gzip.GzipFile, (IOError, MaybeSomeGzipExceptions)), 
             ('.x', xCompressLib.xCompressFile, (IOError, MaybeSomeXCompressExceptions))]

class UnknownCompressionFormat(Exception):
    pass

def open_compressed(fn):
    for ext, cls, exs in filetypes:
        try:
            return cls(fn + ext)
        except exs:
            pass
    raise UnknownCompressionFormat

try:
    with open_compressed(fn) as f:
        result = process(f)
except UnknownCompressionFormat:
    result = "some default value"
于 2012-09-24T13:38:35.683 回答
5

这会起作用吗:

extensions = [('.gz', gzip.GzipFile, (IOError, MaybeSomeGzipExceptions)), 
              ('.x', xCompressLib.xCompressFile, (IOError, MaybeSomeXCompressExceptions))] # and other such entries
processed = False
for ext, (compressor, errors) in extensions.iteritems():
    try:
        with compressor(fn+ext) as f:
            try:
                result = process(f)
                processed = True
                break
            except:
                raise
    except errors:
        pass
if not processed:
    result = "some default value"

希望有帮助

于 2012-09-24T12:48:03.820 回答