27

我正在尝试找出使用 Python 压缩流的最佳方法zlib

我有一个类似文件的输入流(input下面的)和一个接受类似文件的输出函数(output_function下面的):

with open("file") as input:
    output_function(input)

而且我想input在将块发送到之前对其进行 gzip 压缩output_function

with open("file") as input:
    output_function(gzip_stream(input))

看起来gzip模块假定输入或输出将是 gzip 的磁盘文件……所以我假设zlib模块是我想要的。

但是,它本身并没有提供一种简单的方法来创建类似流文件的...并且它支持的流压缩是通过手动将数据添加到压缩缓冲区,然后刷新该缓冲区来实现的。

当然,我可以写一个包装器zlib.Compress.compresszlib.Compress.flush(Compresszlib.compressobj()) 返回,但我会担心缓冲区大小错误或类似的东西。

那么,使用 Python 创建流式 gzip 压缩文件的最简单方法是什么?

编辑:为了澄清,输入流和压缩输出流都太大而无法放入内存,所以类似output_function(StringIO(zlib.compress(input.read())))的东西并不能真正解决问题。

4

6 回答 6

13

它非常笨拙(自我引用等;只需花几分钟编写它,没有什么真正优雅的),但如果您仍然对使用gzip而不是zlib直接感兴趣,它可以满足您的需求。

基本上,GzipWrap是一个(非常有限的)类文件对象,它从给定的可迭代对象中生成一个 gzip 压缩文件(例如,类文件对象、字符串列表、任何生成器......)

当然,它会生成二进制文件,因此实现“readline”是没有意义的。

您应该能够扩展它以涵盖其他情况或用作可迭代对象本身。

from gzip import GzipFile

class GzipWrap(object):
    # input is a filelike object that feeds the input
    def __init__(self, input, filename = None):
        self.input = input
        self.buffer = ''
        self.zipper = GzipFile(filename, mode = 'wb', fileobj = self)

    def read(self, size=-1):
        if (size < 0) or len(self.buffer) < size:
            for s in self.input:
                self.zipper.write(s)
                if size > 0 and len(self.buffer) >= size:
                    self.zipper.flush()
                    break
            else:
                self.zipper.close()
            if size < 0:
                ret = self.buffer
                self.buffer = ''
        else:
            ret, self.buffer = self.buffer[:size], self.buffer[size:]
        return ret

    def flush(self):
        pass

    def write(self, data):
        self.buffer += data

    def close(self):
        self.input.close()
于 2010-02-03T16:29:39.163 回答
9

这是一个基于 Ricardo Cárdenes 非常有用的答案的更简洁、非自引用的版本。

from gzip import GzipFile
from collections import deque


CHUNK = 16 * 1024


class Buffer (object):
    def __init__ (self):
        self.__buf = deque()
        self.__size = 0
    def __len__ (self):
        return self.__size
    def write (self, data):
        self.__buf.append(data)
        self.__size += len(data)
    def read (self, size=-1):
        if size < 0: size = self.__size
        ret_list = []
        while size > 0 and len(self.__buf):
            s = self.__buf.popleft()
            size -= len(s)
            ret_list.append(s)
        if size < 0:
            ret_list[-1], remainder = ret_list[-1][:size], ret_list[-1][size:]
            self.__buf.appendleft(remainder)
        ret = ''.join(ret_list)
        self.__size -= len(ret)
        return ret
    def flush (self):
        pass
    def close (self):
        pass


class GzipCompressReadStream (object):
    def __init__ (self, fileobj):
        self.__input = fileobj
        self.__buf = Buffer()
        self.__gzip = GzipFile(None, mode='wb', fileobj=self.__buf)
    def read (self, size=-1):
        while size < 0 or len(self.__buf) < size:
            s = self.__input.read(CHUNK)
            if not s:
                self.__gzip.close()
                break
            self.__gzip.write(s)
        return self.__buf.read(size)

好处:

  • 避免重复的字符串连接,这会导致整个字符串被重复复制。
  • 从输入流中读取固定的 CHUNK 大小,而不是一次读取整行(可以任意长)。
  • 避免循环引用。
  • 避免 GzipCompressStream() 的误导性公共“写入”方法,该方法实际上仅在内部使用。
  • 利用内部成员变量的名称修饰。
于 2015-07-22T14:28:45.273 回答
5

gzip 模块支持压缩为类文件对象,将 fileobj 参数传递给 GzipFile,以及文件名。您传入的文件名不需要存在,但 gzip 标头有一个需要填写的文件名字段。

更新

这个答案不起作用。例子:

# tmp/try-gzip.py 
import sys
import gzip

fd=gzip.GzipFile(fileobj=sys.stdin)
sys.stdout.write(fd.read())

输出:

===> cat .bash_history  | python tmp/try-gzip.py  > tmp/history.gzip
Traceback (most recent call last):
  File "tmp/try-gzip.py", line 7, in <module>
    sys.stdout.write(fd.read())
  File "/usr/lib/python2.7/gzip.py", line 254, in read
    self._read(readsize)
  File "/usr/lib/python2.7/gzip.py", line 288, in _read
    pos = self.fileobj.tell()   # Save current position
IOError: [Errno 29] Illegal seek
于 2010-02-03T14:35:26.713 回答
2

将 cStringIO(或 StringIO)模块与 zlib 结合使用:

>>> import zlib
>>> from cStringIO import StringIO
>>> s.write(zlib.compress("I'm a lumberjack"))
>>> s.seek(0)
>>> zlib.decompress(s.read())
"I'm a lumberjack"
于 2010-02-03T14:49:08.890 回答
0

由可重用组件组成的更清洁、更通用的版本:

gzipped_iter = igizip(io_iter(input_file_obj))
gzipped_file_obj = iter_io(prefetch(gzipped_iter))

上面的功能来自我的要点

  • iter_ioio_iter提供与Iterable[AnyStr]<->之间的透明转换SupportsRead[AnyStr]
  • igzip流式gzip压缩
  • (可选)prefetch通过线程同时从底层迭代中拉取,正常让给消费者,用于并发读/写
def as_bytes(s: str | bytes):
    if type(s) not in [str, bytes]:
        raise TypeError
    return s.encode() if isinstance(s, str) else s


def iter_io(iterable: Iterable[AnyStr], buffer_size: int = io.DEFAULT_BUFFER_SIZE):
    """
    Returns a buffered file obj that reads bytes from an iterable of str/bytes.

    Example:

    iter_io(['abc', 'def', 'g']).read() == b'abcdefg'
    iter_io([b'abcd', b'efg']).read(5) == b'abcde'
    """
    class IterIO(io.RawIOBase):
        def __init__(self, iterable: Iterable[AnyStr]):
            self._leftover = b''
            self._iterable = (as_bytes(s) for s in iterable if s)

        def readable(self):
            return True

        def readinto(self, buf):
            try:
                chunk = self._leftover or next(self._iterable)
            except StopIteration:
                return 0    # indicate EOF

            output, self._leftover = chunk[:len(buf)], chunk[len(buf):]
            buf[:len(output)] = output
            return len(output)

    return io.BufferedReader(IterIO(iterable), buffer_size=buffer_size)


def io_iter(fo: SupportsRead[AnyStr], size: int = io.DEFAULT_BUFFER_SIZE):
    """
    Returns an iterator that reads from a file obj in sized chunks.

    Example:

    list(io_iter(io.StringIO('abcdefg'), 3)) == ['abc', 'def', 'g']
    list(io_iter(io.BytesIO(b'abcdefg'), 4)) == [b'abcd', b'efg']

    Usage notes/TODO:
     * file obj isn't closed, fix /w keep_open=False and an internal contextmanager
    """
    return iter(lambda: fo.read(size), fo.read(0))


def igzip(chunks: Iterable[AnyStr], level=zlib.Z_DEFAULT_COMPRESSION):
    """
    Streaming gzip: lazily compresses an iterable of bytes or str (utf8)

    Example:

    gzipped_bytes_iter = igzip(['hello ', 'world!'])
    gzip.decompress(b''.join(gzipped_bytes_iter)).encode() == 'hello world!'
    """
    def gen():
        gzip_format = 0b10000
        c = zlib.compressobj(level=level, wbits=zlib.MAX_WBITS + gzip_format)

        yield from (c.compress(as_bytes(chunk)) for chunk in chunks)
        yield c.flush()

    return filter(None, gen())


def prefetch(iterable: Iterable[Any], n: int = 1) -> Iterator[Any]:
    """
    Prefetch an iterable via thread, yielding original contents as normal.

    Example:

    def slow_produce(*args):
        for x in args:
            time.sleep(1)
            yield x

    def slow_consume(iterable):
        for _ in iterable:
            time.sleep(1)

    slow_consume(prefetch(slow_produce('a', 'b')))  # takes 3 sec, not 4

    # Prefetch
    # produce: | 'a' | 'b' |
    # consume:       | 'a' | 'b' |
    # seconds: 0 --- 1 --- 2 --- 3

    # No prefetch
    # produce: | 'a' |     | 'b' |
    # consume:       | 'a' |     | 'b' |
    # seconds: 0 --- 1 --- 2 --- 3 --- 4

    Usage notes/TODO:
     * mem leak: Thread is GC'd only after iterable is fully consumed, fix /w __del__
    """
    queue = Queue(n)
    finished = object()

    def produce():
        for x in iterable:
            queue.put(x)
        queue.put(finished)

    t = Thread(target=produce, daemon=True)
    t.start()

    while True:
        item = queue.get()
        if item is finished:
            break
        else:
            yield item
于 2021-08-27T21:22:20.147 回答
-1

这有效(至少在python 3中):

with s3.open(path, 'wb') as f:
    gz = gzip.GzipFile(filename, 'wb', 9, f)
    gz.write(b'hello')
    gz.flush()
    gz.close()

在这里,它使用 gzip 压缩写入 s3fs 的文件对象。神奇的是f参数,它是 GzipFile 的fileobj. 您必须为 gzip 的标头提供文件名。

于 2019-05-08T09:04:24.323 回答