我有大量数据(几场演出)我需要用 Python 写入一个 zip 文件。我不能一次将其全部加载到内存中以传递给 ZipFile 的 .writestr 方法,而且我真的不想使用临时文件将其全部输入磁盘然后再读回。
有没有办法将生成器或类似文件的对象提供给 ZipFile 库?或者是否有某种原因似乎不支持此功能?
通过 zip 文件,我的意思是 zip 文件。在 Python zipfile 包中受支持。
唯一的解决方案是重写它用于压缩文件以从缓冲区读取的方法。将它添加到标准库中将是微不足道的;我有点惊讶它还没有完成。我收集到很多人都同意整个界面需要大修,这似乎阻碍了任何增量改进。
import zipfile, zlib, binascii, struct
class BufferedZipFile(zipfile.ZipFile):
def writebuffered(self, zipinfo, buffer):
zinfo = zipinfo
zinfo.file_size = file_size = 0
zinfo.flag_bits = 0x00
zinfo.header_offset = self.fp.tell()
self._writecheck(zinfo)
self._didModify = True
zinfo.CRC = CRC = 0
zinfo.compress_size = compress_size = 0
self.fp.write(zinfo.FileHeader())
if zinfo.compress_type == zipfile.ZIP_DEFLATED:
cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
else:
cmpr = None
while True:
buf = buffer.read(1024 * 8)
if not buf:
break
file_size = file_size + len(buf)
CRC = binascii.crc32(buf, CRC) & 0xffffffff
if cmpr:
buf = cmpr.compress(buf)
compress_size = compress_size + len(buf)
self.fp.write(buf)
if cmpr:
buf = cmpr.flush()
compress_size = compress_size + len(buf)
self.fp.write(buf)
zinfo.compress_size = compress_size
else:
zinfo.compress_size = file_size
zinfo.CRC = CRC
zinfo.file_size = file_size
position = self.fp.tell()
self.fp.seek(zinfo.header_offset + 14, 0)
self.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
self.fp.seek(position, 0)
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
我接受了Chris B. 的回答并创建了一个完整的解决方案。这是以防其他人感兴趣:
import os
import threading
from zipfile import *
import zlib, binascii, struct
class ZipEntryWriter(threading.Thread):
def __init__(self, zf, zinfo, fileobj):
self.zf = zf
self.zinfo = zinfo
self.fileobj = fileobj
zinfo.file_size = 0
zinfo.flag_bits = 0x00
zinfo.header_offset = zf.fp.tell()
zf._writecheck(zinfo)
zf._didModify = True
zinfo.CRC = 0
zinfo.compress_size = compress_size = 0
zf.fp.write(zinfo.FileHeader())
super(ZipEntryWriter, self).__init__()
def run(self):
zinfo = self.zinfo
zf = self.zf
file_size = 0
CRC = 0
if zinfo.compress_type == ZIP_DEFLATED:
cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
else:
cmpr = None
while True:
buf = self.fileobj.read(1024 * 8)
if not buf:
self.fileobj.close()
break
file_size = file_size + len(buf)
CRC = binascii.crc32(buf, CRC)
if cmpr:
buf = cmpr.compress(buf)
compress_size = compress_size + len(buf)
zf.fp.write(buf)
if cmpr:
buf = cmpr.flush()
compress_size = compress_size + len(buf)
zf.fp.write(buf)
zinfo.compress_size = compress_size
else:
zinfo.compress_size = file_size
zinfo.CRC = CRC
zinfo.file_size = file_size
position = zf.fp.tell()
zf.fp.seek(zinfo.header_offset + 14, 0)
zf.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
zf.fp.seek(position, 0)
zf.filelist.append(zinfo)
zf.NameToInfo[zinfo.filename] = zinfo
class EnhZipFile(ZipFile, object):
def _current_writer(self):
return hasattr(self, 'cur_writer') and self.cur_writer or None
def assert_no_current_writer(self):
cur_writer = self._current_writer()
if cur_writer and cur_writer.isAlive():
raise ValueError('An entry is already started for name: %s' % cur_write.zinfo.filename)
def write(self, filename, arcname=None, compress_type=None):
self.assert_no_current_writer()
super(EnhZipFile, self).write(filename, arcname, compress_type)
def writestr(self, zinfo_or_arcname, bytes):
self.assert_no_current_writer()
super(EnhZipFile, self).writestr(zinfo_or_arcname, bytes)
def close(self):
self.finish_entry()
super(EnhZipFile, self).close()
def start_entry(self, zipinfo):
"""
Start writing a new entry with the specified ZipInfo and return a
file like object. Any data written to the file like object is
read by a background thread and written directly to the zip file.
Make sure to close the returned file object, before closing the
zipfile, or the close() would end up hanging indefinitely.
Only one entry can be open at any time. If multiple entries need to
be written, make sure to call finish_entry() before calling any of
these methods:
- start_entry
- write
- writestr
It is not necessary to explicitly call finish_entry() before closing
zipfile.
Example:
zf = EnhZipFile('tmp.zip', 'w')
w = zf.start_entry(ZipInfo('t.txt'))
w.write("some text")
w.close()
zf.close()
"""
self.assert_no_current_writer()
r, w = os.pipe()
self.cur_writer = ZipEntryWriter(self, zipinfo, os.fdopen(r, 'r'))
self.cur_writer.start()
return os.fdopen(w, 'w')
def finish_entry(self, timeout=None):
"""
Ensure that the ZipEntry that is currently being written is finished.
Joins on any background thread to exit. It is safe to call this method
multiple times.
"""
cur_writer = self._current_writer()
if not cur_writer or not cur_writer.isAlive():
return
cur_writer.join(timeout)
if __name__ == "__main__":
zf = EnhZipFile('c:/tmp/t.zip', 'w')
import time
w = zf.start_entry(ZipInfo('t.txt', time.localtime()[:6]))
w.write("Line1\n")
w.write("Line2\n")
w.close()
zf.finish_entry()
w = zf.start_entry(ZipInfo('p.txt', time.localtime()[:6]))
w.write("Some text\n")
w.close()
zf.close()
在Python 3.5中更改(来自官方文档):添加了对写入不可搜索流的支持。
这意味着现在zipfile.ZipFile
我们可以使用不将整个文件存储在内存中的流。此类流不支持在整个数据量上移动。
所以这是一个简单的生成器:
from zipfile import ZipFile, ZipInfo
def zipfile_generator(path, stream):
with ZipFile(stream, mode='w') as zf:
z_info = ZipInfo.from_file(path)
with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
for chunk in iter(lambda: entry.read(16384), b''):
dest.write(chunk)
# Yield chunk of the zip file stream in bytes.
yield stream.get()
# ZipFile was closed.
yield stream.get()
path
是大文件或目录或pathlike
对象的字符串路径。
stream
是这样的类的不可搜索的流实例(根据官方文档设计):
from io import RawIOBase
class UnseekableStream(RawIOBase):
def __init__(self):
self._buffer = b''
def writable(self):
return True
def write(self, b):
if self.closed:
raise ValueError('Stream was closed!')
self._buffer += b
return len(b)
def get(self):
chunk = self._buffer
self._buffer = b''
return chunk
您可以在线尝试此代码:https ://repl.it/@IvanErgunov/zipfilegenerator
还有另一种创建生成器的方法,无需ZipInfo
手动读取和分割大文件。您可以将queue.Queue()
对象传递给您的UnseekableStream()
对象并在另一个线程中写入此队列。然后在当前线程中,您可以简单地以可迭代的方式从此队列中读取块。查看文档
allanlei 的PS Python Zipstream是过时且不可靠的方式。这是在正式完成之前尝试添加对不可搜索流的支持。
gzip.GzipFile 将数据写入 gzipped chunks ,您可以根据从文件中读取的行数来设置块的大小。
一个例子:
file = gzip.GzipFile('blah.gz', 'wb')
sourcefile = open('source', 'rb')
chunks = []
for line in sourcefile:
chunks.append(line)
if len(chunks) >= X:
file.write("".join(chunks))
file.flush()
chunks = []
基本压缩由 zlib.compressobj 完成。ZipFile(在 MacOSX 上的 Python 2.5 下似乎已编译)。Python 2.3 版本如下。
您可以看到它以 8k 块构建压缩文件。提取源文件信息很复杂,因为很多源文件属性(如未压缩大小)都记录在 zip 文件头中。
def write(self, filename, arcname=None, compress_type=None):
"""Put the bytes from filename into the archive under the name
arcname."""
st = os.stat(filename)
mtime = time.localtime(st.st_mtime)
date_time = mtime[0:6]
# Create ZipInfo instance to store file information
if arcname is None:
zinfo = ZipInfo(filename, date_time)
else:
zinfo = ZipInfo(arcname, date_time)
zinfo.external_attr = st[0] << 16L # Unix attributes
if compress_type is None:
zinfo.compress_type = self.compression
else:
zinfo.compress_type = compress_type
self._writecheck(zinfo)
fp = open(filename, "rb")
zinfo.flag_bits = 0x00
zinfo.header_offset = self.fp.tell() # Start of header bytes
# Must overwrite CRC and sizes with correct data later
zinfo.CRC = CRC = 0
zinfo.compress_size = compress_size = 0
zinfo.file_size = file_size = 0
self.fp.write(zinfo.FileHeader())
zinfo.file_offset = self.fp.tell() # Start of file bytes
if zinfo.compress_type == ZIP_DEFLATED:
cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
zlib.DEFLATED, -15)
else:
cmpr = None
while 1:
buf = fp.read(1024 * 8)
if not buf:
break
file_size = file_size + len(buf)
CRC = binascii.crc32(buf, CRC)
if cmpr:
buf = cmpr.compress(buf)
compress_size = compress_size + len(buf)
self.fp.write(buf)
fp.close()
if cmpr:
buf = cmpr.flush()
compress_size = compress_size + len(buf)
self.fp.write(buf)
zinfo.compress_size = compress_size
else:
zinfo.compress_size = file_size
zinfo.CRC = CRC
zinfo.file_size = file_size
# Seek backwards and write CRC and file sizes
position = self.fp.tell() # Preserve current position in file
self.fp.seek(zinfo.header_offset + 14, 0)
self.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size,
zinfo.file_size))
self.fp.seek(position, 0)
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
一些(很多?大多数?)压缩算法是基于查看整个 文件的冗余。
一些压缩库会根据对文件最有效的几种压缩算法进行选择。
我相信 ZipFile 模块会执行此操作,因此它希望查看整个文件,而不仅仅是一次查看文件。
因此,它不适用于生成器或大文件以加载到内存中。这可以解释 Zipfile 库的局限性。
如果有人偶然发现这个问题,这个问题在 2017 年仍然与 Python 2.7 相关,这里是一个真正的流 zip 文件的工作解决方案,不需要像其他情况那样输出可搜索。秘诀是设置通用位标志的第 3 位(参见https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT第 4.3.9.1 节)。
请注意,此实现将始终创建一个 ZIP64 样式的文件,允许流处理任意大的文件。它包括一个丑陋的黑客来强制中央目录记录的 zip64 结尾,所以请注意它会导致您的进程编写的所有 zipfile 变成 ZIP64 样式。
import io
import zipfile
import zlib
import binascii
import struct
class ByteStreamer(io.BytesIO):
'''
Variant on BytesIO which lets you write and consume data while
keeping track of the total filesize written. When data is consumed
it is removed from memory, keeping the memory requirements low.
'''
def __init__(self):
super(ByteStreamer, self).__init__()
self._tellall = 0
def tell(self):
return self._tellall
def write(self, b):
orig_size = super(ByteStreamer, self).tell()
super(ByteStreamer, self).write(b)
new_size = super(ByteStreamer, self).tell()
self._tellall += (new_size - orig_size)
def consume(self):
bytes = self.getvalue()
self.seek(0)
self.truncate(0)
return bytes
class BufferedZipFileWriter(zipfile.ZipFile):
'''
ZipFile writer with true streaming (input and output).
Created zip files are always ZIP64-style because it is the only safe way to stream
potentially large zip files without knowing the full size ahead of time.
Example usage:
>>> def stream():
>>> bzfw = BufferedZip64FileWriter()
>>> for arc_path, buffer in inputs: # buffer is a file-like object which supports read(size)
>>> for chunk in bzfw.streambuffer(arc_path, buffer):
>>> yield chunk
>>> yield bzfw.close()
'''
def __init__(self, compression=zipfile.ZIP_DEFLATED):
self._buffer = ByteStreamer()
super(BufferedZipFileWriter, self).__init__(self._buffer, mode='w', compression=compression, allowZip64=True)
def streambuffer(self, zinfo_or_arcname, buffer, chunksize=2**16):
if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname,
date_time=time.localtime(time.time())[:6])
zinfo.compress_type = self.compression
zinfo.external_attr = 0o600 << 16 # ?rw-------
else:
zinfo = zinfo_or_arcname
zinfo.file_size = file_size = 0
zinfo.flag_bits = 0x08 # Streaming mode: crc and size come after the data
zinfo.header_offset = self.fp.tell()
self._writecheck(zinfo)
self._didModify = True
zinfo.CRC = CRC = 0
zinfo.compress_size = compress_size = 0
self.fp.write(zinfo.FileHeader())
if zinfo.compress_type == zipfile.ZIP_DEFLATED:
cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
else:
cmpr = None
while True:
buf = buffer.read(chunksize)
if not buf:
break
file_size += len(buf)
CRC = binascii.crc32(buf, CRC) & 0xffffffff
if cmpr:
buf = cmpr.compress(buf)
compress_size += len(buf)
self.fp.write(buf)
compressed_bytes = self._buffer.consume()
if compressed_bytes:
yield compressed_bytes
if cmpr:
buf = cmpr.flush()
compress_size += len(buf)
self.fp.write(buf)
zinfo.compress_size = compress_size
compressed_bytes = self._buffer.consume()
if compressed_bytes:
yield compressed_bytes
else:
zinfo.compress_size = file_size
zinfo.CRC = CRC
zinfo.file_size = file_size
# Write CRC and file sizes after the file data
# Always write as zip64 -- only safe way to stream what might become a large zipfile
fmt = '<LQQ'
self.fp.write(struct.pack(fmt, zinfo.CRC, zinfo.compress_size, zinfo.file_size))
self.fp.flush()
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
yield self._buffer.consume()
# The close method needs to be patched to force writing a ZIP64 file
# We'll hack ZIP_FILECOUNT_LIMIT to do the forcing
def close(self):
tmp = zipfile.ZIP_FILECOUNT_LIMIT
zipfile.ZIP_FILECOUNT_LIMIT = 0
super(BufferedZipFileWriter, self).close()
zipfile.ZIP_FILECOUNT_LIMIT = tmp
return self._buffer.consume()
gzip 库将采用类似文件的对象进行压缩。
class GzipFile([filename [,mode [,compresslevel [,fileobj]]]])
您仍然需要提供一个名义文件名以包含在 zip 文件中,但您可以将数据源传递给 fileobj。
(这个答案与 Damnsweet 的答案不同,重点应该放在增量读取的数据源上,而不是增量写入的压缩文件上。)
我现在看到原来的提问者不会接受 Gzip :-(
现在使用 python 2.7,您可以将数据添加到文件的 zipfile 中:
http://docs.python.org/2/library/zipfile#zipfile.ZipFile.writestr
这是 2017 年。如果您仍然希望优雅地做到这一点,请使用allanlei 的 Python Zipstream。到目前为止,它可能是唯一一个编写良好的库来实现这一点。