diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
commit | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch) | |
tree | 012bb94d777798f1f56ac1cec429509766d05181 /contrib/tools/python3/src/Lib/zipfile.py | |
parent | 6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff) | |
download | ydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/zipfile.py')
-rw-r--r-- | contrib/tools/python3/src/Lib/zipfile.py | 800 |
1 files changed, 400 insertions, 400 deletions
diff --git a/contrib/tools/python3/src/Lib/zipfile.py b/contrib/tools/python3/src/Lib/zipfile.py index 816f8582bb..23e1605f4e 100644 --- a/contrib/tools/python3/src/Lib/zipfile.py +++ b/contrib/tools/python3/src/Lib/zipfile.py @@ -3,19 +3,19 @@ Read and write ZIP files. XXX references to utf-8 need further investigation. """ -import binascii -import importlib.util +import binascii +import importlib.util import io -import itertools +import itertools import os -import posixpath -import shutil +import posixpath +import shutil import stat import struct -import sys +import sys import threading -import time -import contextlib +import time +import contextlib try: import zlib # We may need its compression method @@ -36,8 +36,8 @@ except ImportError: __all__ = ["BadZipFile", "BadZipfile", "error", "ZIP_STORED", "ZIP_DEFLATED", "ZIP_BZIP2", "ZIP_LZMA", - "is_zipfile", "ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile", - "Path"] + "is_zipfile", "ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile", + "Path"] class BadZipFile(Exception): pass @@ -228,7 +228,7 @@ def _EndRecData64(fpin, offset, endrec): if sig != stringEndArchive64Locator: return endrec - if diskno != 0 or disks > 1: + if diskno != 0 or disks > 1: raise BadZipFile("zipfiles that span multiple disks are not supported") # Assume no 'zip64 extensible data' @@ -377,8 +377,8 @@ class ZipInfo (object): self.volume = 0 # Volume number of file header self.internal_attr = 0 # Internal attributes self.external_attr = 0 # External file attributes - self.compress_size = 0 # Size of the compressed file - self.file_size = 0 # Size of the uncompressed file + self.compress_size = 0 # Size of the compressed file + self.file_size = 0 # Size of the uncompressed file # Other attributes are set by class ZipFile: # header_offset Byte offset to the file header # CRC CRC-32 of the uncompressed file @@ -466,28 +466,28 @@ class ZipInfo (object): if ln+4 > len(extra): raise BadZipFile("Corrupt extra field %04x (size=%d)" % (tp, ln)) if tp == 0x0001: - data = extra[4:ln+4] + data = extra[4:ln+4] # ZIP64 extension (large files and/or large archives) - try: - if self.file_size in (0xFFFF_FFFF_FFFF_FFFF, 0xFFFF_FFFF): - field = "File size" - self.file_size, = unpack('<Q', data[:8]) - data = data[8:] - if self.compress_size == 0xFFFF_FFFF: - field = "Compress size" - self.compress_size, = unpack('<Q', data[:8]) - data = data[8:] - if self.header_offset == 0xFFFF_FFFF: - field = "Header offset" - self.header_offset, = unpack('<Q', data[:8]) - except struct.error: - raise BadZipFile(f"Corrupt zip64 extra field. " - f"{field} not found.") from None + try: + if self.file_size in (0xFFFF_FFFF_FFFF_FFFF, 0xFFFF_FFFF): + field = "File size" + self.file_size, = unpack('<Q', data[:8]) + data = data[8:] + if self.compress_size == 0xFFFF_FFFF: + field = "Compress size" + self.compress_size, = unpack('<Q', data[:8]) + data = data[8:] + if self.header_offset == 0xFFFF_FFFF: + field = "Header offset" + self.header_offset, = unpack('<Q', data[:8]) + except struct.error: + raise BadZipFile(f"Corrupt zip64 extra field. " + f"{field} not found.") from None extra = extra[ln+4:] @classmethod - def from_file(cls, filename, arcname=None, *, strict_timestamps=True): + def from_file(cls, filename, arcname=None, *, strict_timestamps=True): """Construct an appropriate ZipInfo for a file on the filesystem. filename should be the path to a file or directory on the filesystem. @@ -502,10 +502,10 @@ class ZipInfo (object): isdir = stat.S_ISDIR(st.st_mode) mtime = time.localtime(st.st_mtime) date_time = mtime[0:6] - if not strict_timestamps and date_time[0] < 1980: - date_time = (1980, 1, 1, 0, 0, 0) - elif not strict_timestamps and date_time[0] > 2107: - date_time = (2107, 12, 31, 23, 59, 59) + if not strict_timestamps and date_time[0] < 1980: + date_time = (1980, 1, 1, 0, 0, 0) + elif not strict_timestamps and date_time[0] > 2107: + date_time = (2107, 12, 31, 23, 59, 59) # Create ZipInfo instance to store file information if arcname is None: arcname = filename @@ -695,7 +695,7 @@ def _get_compressor(compress_type, compresslevel=None): def _get_decompressor(compress_type): - _check_compression(compress_type) + _check_compression(compress_type) if compress_type == ZIP_STORED: return None elif compress_type == ZIP_DEFLATED: @@ -784,10 +784,10 @@ class ZipExtFile(io.BufferedIOBase): # Chunk size to read during seek MAX_SEEK_READ = 1 << 24 - def __init__(self, fileobj, mode, zipinfo, pwd=None, + def __init__(self, fileobj, mode, zipinfo, pwd=None, close_fileobj=False): self._fileobj = fileobj - self._pwd = pwd + self._pwd = pwd self._close_fileobj = close_fileobj self._compress_type = zipinfo.compress_type @@ -822,30 +822,30 @@ class ZipExtFile(io.BufferedIOBase): except AttributeError: pass - self._decrypter = None - if pwd: - if zipinfo.flag_bits & 0x8: - # compare against the file type from extended local headers - check_byte = (zipinfo._raw_time >> 8) & 0xff - else: - # compare against the CRC otherwise - check_byte = (zipinfo.CRC >> 24) & 0xff - h = self._init_decrypter() - if h != check_byte: - raise RuntimeError("Bad password for file %r" % zipinfo.orig_filename) - - - def _init_decrypter(self): - self._decrypter = _ZipDecrypter(self._pwd) - # The first 12 bytes in the cypher stream is an encryption header - # used to strengthen the algorithm. The first 11 bytes are - # completely random, while the 12th contains the MSB of the CRC, - # or the MSB of the file time depending on the header type - # and is used to check the correctness of the password. - header = self._fileobj.read(12) - self._compress_left -= 12 - return self._decrypter(header)[11] - + self._decrypter = None + if pwd: + if zipinfo.flag_bits & 0x8: + # compare against the file type from extended local headers + check_byte = (zipinfo._raw_time >> 8) & 0xff + else: + # compare against the CRC otherwise + check_byte = (zipinfo.CRC >> 24) & 0xff + h = self._init_decrypter() + if h != check_byte: + raise RuntimeError("Bad password for file %r" % zipinfo.orig_filename) + + + def _init_decrypter(self): + self._decrypter = _ZipDecrypter(self._pwd) + # The first 12 bytes in the cypher stream is an encryption header + # used to strengthen the algorithm. The first 11 bytes are + # completely random, while the 12th contains the MSB of the CRC, + # or the MSB of the file time depending on the header type + # and is used to check the correctness of the password. + header = self._fileobj.read(12) + self._compress_left -= 12 + return self._decrypter(header)[11] + def __repr__(self): result = ['<%s.%s' % (self.__class__.__module__, self.__class__.__qualname__)] @@ -890,16 +890,16 @@ class ZipExtFile(io.BufferedIOBase): return self._readbuffer[self._offset: self._offset + 512] def readable(self): - if self.closed: - raise ValueError("I/O operation on closed file.") + if self.closed: + raise ValueError("I/O operation on closed file.") return True def read(self, n=-1): """Read and return up to n bytes. - If the argument is omitted, None, or negative, data is read and returned until EOF is reached. + If the argument is omitted, None, or negative, data is read and returned until EOF is reached. """ - if self.closed: - raise ValueError("read from closed file.") + if self.closed: + raise ValueError("read from closed file.") if n is None or n < 0: buf = self._readbuffer[self._offset:] self._readbuffer = b'' @@ -1036,13 +1036,13 @@ class ZipExtFile(io.BufferedIOBase): super().close() def seekable(self): - if self.closed: - raise ValueError("I/O operation on closed file.") + if self.closed: + raise ValueError("I/O operation on closed file.") return self._seekable def seek(self, offset, whence=0): - if self.closed: - raise ValueError("seek on closed file.") + if self.closed: + raise ValueError("seek on closed file.") if not self._seekable: raise io.UnsupportedOperation("underlying stream is not seekable") curr_pos = self.tell() @@ -1080,8 +1080,8 @@ class ZipExtFile(io.BufferedIOBase): self._decompressor = _get_decompressor(self._compress_type) self._eof = False read_offset = new_pos - if self._decrypter is not None: - self._init_decrypter() + if self._decrypter is not None: + self._init_decrypter() while read_offset > 0: read_len = min(self.MAX_SEEK_READ, read_offset) @@ -1091,8 +1091,8 @@ class ZipExtFile(io.BufferedIOBase): return self.tell() def tell(self): - if self.closed: - raise ValueError("tell on closed file.") + if self.closed: + raise ValueError("tell on closed file.") if not self._seekable: raise io.UnsupportedOperation("underlying stream is not seekable") filepos = self._orig_file_size - self._left - len(self._readbuffer) + self._offset @@ -1132,51 +1132,51 @@ class _ZipWriteFile(io.BufferedIOBase): def close(self): if self.closed: return - try: - super().close() - # Flush any data from the compressor, and update header info - if self._compressor: - buf = self._compressor.flush() - self._compress_size += len(buf) - self._fileobj.write(buf) - self._zinfo.compress_size = self._compress_size - else: - self._zinfo.compress_size = self._file_size - self._zinfo.CRC = self._crc - self._zinfo.file_size = self._file_size - - # Write updated header info - if self._zinfo.flag_bits & 0x08: - # Write CRC and file sizes after the file data - fmt = '<LLQQ' if self._zip64 else '<LLLL' - self._fileobj.write(struct.pack(fmt, _DD_SIGNATURE, self._zinfo.CRC, - self._zinfo.compress_size, self._zinfo.file_size)) - self._zipfile.start_dir = self._fileobj.tell() - else: - if not self._zip64: - if self._file_size > ZIP64_LIMIT: - raise RuntimeError( - 'File size unexpectedly exceeded ZIP64 limit') - if self._compress_size > ZIP64_LIMIT: - raise RuntimeError( - 'Compressed size unexpectedly exceeded ZIP64 limit') - # Seek backwards and write file header (which will now include - # correct CRC and file sizes) - - # Preserve current position in file - self._zipfile.start_dir = self._fileobj.tell() - self._fileobj.seek(self._zinfo.header_offset) - self._fileobj.write(self._zinfo.FileHeader(self._zip64)) - self._fileobj.seek(self._zipfile.start_dir) - - # Successfully written: Add file to our caches - self._zipfile.filelist.append(self._zinfo) - self._zipfile.NameToInfo[self._zinfo.filename] = self._zinfo - finally: - self._zipfile._writing = False - - - + try: + super().close() + # Flush any data from the compressor, and update header info + if self._compressor: + buf = self._compressor.flush() + self._compress_size += len(buf) + self._fileobj.write(buf) + self._zinfo.compress_size = self._compress_size + else: + self._zinfo.compress_size = self._file_size + self._zinfo.CRC = self._crc + self._zinfo.file_size = self._file_size + + # Write updated header info + if self._zinfo.flag_bits & 0x08: + # Write CRC and file sizes after the file data + fmt = '<LLQQ' if self._zip64 else '<LLLL' + self._fileobj.write(struct.pack(fmt, _DD_SIGNATURE, self._zinfo.CRC, + self._zinfo.compress_size, self._zinfo.file_size)) + self._zipfile.start_dir = self._fileobj.tell() + else: + if not self._zip64: + if self._file_size > ZIP64_LIMIT: + raise RuntimeError( + 'File size unexpectedly exceeded ZIP64 limit') + if self._compress_size > ZIP64_LIMIT: + raise RuntimeError( + 'Compressed size unexpectedly exceeded ZIP64 limit') + # Seek backwards and write file header (which will now include + # correct CRC and file sizes) + + # Preserve current position in file + self._zipfile.start_dir = self._fileobj.tell() + self._fileobj.seek(self._zinfo.header_offset) + self._fileobj.write(self._zinfo.FileHeader(self._zip64)) + self._fileobj.seek(self._zipfile.start_dir) + + # Successfully written: Add file to our caches + self._zipfile.filelist.append(self._zinfo) + self._zipfile.NameToInfo[self._zinfo.filename] = self._zinfo + finally: + self._zipfile._writing = False + + + class ZipFile: """ Class with methods to open, read, write, close, list zip files. @@ -1204,7 +1204,7 @@ class ZipFile: _windows_illegal_name_trans_table = None def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=True, - compresslevel=None, *, strict_timestamps=True): + compresslevel=None, *, strict_timestamps=True): """Open the ZIP file with mode read 'r', write 'w', exclusive create 'x', or append 'a'.""" if mode not in ('r', 'w', 'x', 'a'): @@ -1222,7 +1222,7 @@ class ZipFile: self.mode = mode self.pwd = None self._comment = b'' - self._strict_timestamps = strict_timestamps + self._strict_timestamps = strict_timestamps # Check if we were passed a file-like object if isinstance(file, os.PathLike): @@ -1534,7 +1534,7 @@ class ZipFile: # strong encryption raise NotImplementedError("strong encryption (flag bit 6)") - if fheader[_FH_GENERAL_PURPOSE_FLAG_BITS] & 0x800: + if fheader[_FH_GENERAL_PURPOSE_FLAG_BITS] & 0x800: # UTF-8 filename fname_str = fname.decode("utf-8") else: @@ -1553,10 +1553,10 @@ class ZipFile: if not pwd: raise RuntimeError("File %r is encrypted, password " "required for extraction" % name) - else: - pwd = None + else: + pwd = None - return ZipExtFile(zef_file, mode, zinfo, pwd, True) + return ZipExtFile(zef_file, mode, zinfo, pwd, True) except: zef_file.close() raise @@ -1572,7 +1572,7 @@ class ZipFile: "another write handle open on it. " "Close the first handle before opening another.") - # Size and CRC are overwritten with correct data after processing the file + # Size and CRC are overwritten with correct data after processing the file zinfo.compress_size = 0 zinfo.CRC = 0 @@ -1724,8 +1724,8 @@ class ZipFile: "Can't write to ZIP archive while an open writing handle exists" ) - zinfo = ZipInfo.from_file(filename, arcname, - strict_timestamps=self._strict_timestamps) + zinfo = ZipInfo.from_file(filename, arcname, + strict_timestamps=self._strict_timestamps) if zinfo.is_dir(): zinfo.compress_size = 0 @@ -1868,15 +1868,15 @@ class ZipFile: extract_version = max(min_version, zinfo.extract_version) create_version = max(min_version, zinfo.create_version) - filename, flag_bits = zinfo._encodeFilenameFlags() - centdir = struct.pack(structCentralDir, - stringCentralDir, create_version, - zinfo.create_system, extract_version, zinfo.reserved, - flag_bits, zinfo.compress_type, dostime, dosdate, - zinfo.CRC, compress_size, file_size, - len(filename), len(extra_data), len(zinfo.comment), - 0, zinfo.internal_attr, zinfo.external_attr, - header_offset) + filename, flag_bits = zinfo._encodeFilenameFlags() + centdir = struct.pack(structCentralDir, + stringCentralDir, create_version, + zinfo.create_system, extract_version, zinfo.reserved, + flag_bits, zinfo.compress_type, dostime, dosdate, + zinfo.CRC, compress_size, file_size, + len(filename), len(extra_data), len(zinfo.comment), + 0, zinfo.internal_attr, zinfo.external_attr, + header_offset) self.fp.write(centdir) self.fp.write(filename) self.fp.write(extra_data) @@ -1918,8 +1918,8 @@ class ZipFile: centDirSize, centDirOffset, len(self._comment)) self.fp.write(endrec) self.fp.write(self._comment) - if self.mode == "a": - self.fp.truncate() + if self.mode == "a": + self.fp.truncate() self.fp.flush() def _fpclose(self, fp): @@ -2103,266 +2103,266 @@ class PyZipFile(ZipFile): return (fname, archivename) -def _parents(path): - """ - Given a path with elements separated by - posixpath.sep, generate all parents of that path. - - >>> list(_parents('b/d')) - ['b'] - >>> list(_parents('/b/d/')) - ['/b'] - >>> list(_parents('b/d/f/')) - ['b/d', 'b'] - >>> list(_parents('b')) - [] - >>> list(_parents('')) - [] - """ - return itertools.islice(_ancestry(path), 1, None) - - -def _ancestry(path): - """ - Given a path with elements separated by - posixpath.sep, generate all elements of that path - - >>> list(_ancestry('b/d')) - ['b/d', 'b'] - >>> list(_ancestry('/b/d/')) - ['/b/d', '/b'] - >>> list(_ancestry('b/d/f/')) - ['b/d/f', 'b/d', 'b'] - >>> list(_ancestry('b')) - ['b'] - >>> list(_ancestry('')) - [] - """ - path = path.rstrip(posixpath.sep) - while path and path != posixpath.sep: - yield path - path, tail = posixpath.split(path) - - -_dedupe = dict.fromkeys -"""Deduplicate an iterable in original order""" - - -def _difference(minuend, subtrahend): - """ - Return items in minuend not in subtrahend, retaining order - with O(1) lookup. - """ - return itertools.filterfalse(set(subtrahend).__contains__, minuend) - - -class CompleteDirs(ZipFile): - """ - A ZipFile subclass that ensures that implied directories - are always included in the namelist. - """ - - @staticmethod - def _implied_dirs(names): - parents = itertools.chain.from_iterable(map(_parents, names)) - as_dirs = (p + posixpath.sep for p in parents) - return _dedupe(_difference(as_dirs, names)) - - def namelist(self): - names = super(CompleteDirs, self).namelist() - return names + list(self._implied_dirs(names)) - - def _name_set(self): - return set(self.namelist()) - - def resolve_dir(self, name): - """ - If the name represents a directory, return that name - as a directory (with the trailing slash). - """ - names = self._name_set() - dirname = name + '/' - dir_match = name not in names and dirname in names - return dirname if dir_match else name - - @classmethod - def make(cls, source): - """ - Given a source (filename or zipfile), return an - appropriate CompleteDirs subclass. - """ - if isinstance(source, CompleteDirs): - return source - - if not isinstance(source, ZipFile): - return cls(source) - - # Only allow for FastPath when supplied zipfile is read-only - if 'r' not in source.mode: - cls = CompleteDirs - - res = cls.__new__(cls) - vars(res).update(vars(source)) - return res - - -class FastLookup(CompleteDirs): - """ - ZipFile subclass to ensure implicit - dirs exist and are resolved rapidly. - """ - def namelist(self): - with contextlib.suppress(AttributeError): - return self.__names - self.__names = super(FastLookup, self).namelist() - return self.__names - - def _name_set(self): - with contextlib.suppress(AttributeError): - return self.__lookup - self.__lookup = super(FastLookup, self)._name_set() - return self.__lookup - - -class Path: - """ - A pathlib-compatible interface for zip files. - - Consider a zip file with this structure:: - - . - ├── a.txt - └── b - ├── c.txt - └── d - └── e.txt - - >>> data = io.BytesIO() - >>> zf = ZipFile(data, 'w') - >>> zf.writestr('a.txt', 'content of a') - >>> zf.writestr('b/c.txt', 'content of c') - >>> zf.writestr('b/d/e.txt', 'content of e') - >>> zf.filename = 'abcde.zip' - - Path accepts the zipfile object itself or a filename - - >>> root = Path(zf) - - From there, several path operations are available. - - Directory iteration (including the zip file itself): - - >>> a, b = root.iterdir() - >>> a - Path('abcde.zip', 'a.txt') - >>> b - Path('abcde.zip', 'b/') - - name property: - - >>> b.name - 'b' - - join with divide operator: - - >>> c = b / 'c.txt' - >>> c - Path('abcde.zip', 'b/c.txt') - >>> c.name - 'c.txt' - - Read text: - - >>> c.read_text() - 'content of c' - - existence: - - >>> c.exists() - True - >>> (b / 'missing.txt').exists() - False - - Coercion to string: - - >>> str(c) - 'abcde.zip/b/c.txt' - """ - - __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})" - - def __init__(self, root, at=""): - self.root = FastLookup.make(root) - self.at = at - - def open(self, mode='r', *args, **kwargs): - """ - Open this entry as text or binary following the semantics - of ``pathlib.Path.open()`` by passing arguments through - to io.TextIOWrapper(). - """ - pwd = kwargs.pop('pwd', None) - zip_mode = mode[0] - stream = self.root.open(self.at, zip_mode, pwd=pwd) - if 'b' in mode: - if args or kwargs: - raise ValueError("encoding args invalid for binary operation") - return stream - return io.TextIOWrapper(stream, *args, **kwargs) - - @property - def name(self): - return posixpath.basename(self.at.rstrip("/")) - - def read_text(self, *args, **kwargs): - with self.open('r', *args, **kwargs) as strm: - return strm.read() - - def read_bytes(self): - with self.open('rb') as strm: - return strm.read() - - def _is_child(self, path): - return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/") - - def _next(self, at): - return Path(self.root, at) - - def is_dir(self): - return not self.at or self.at.endswith("/") - - def is_file(self): - return not self.is_dir() - - def exists(self): - return self.at in self.root._name_set() - - def iterdir(self): - if not self.is_dir(): - raise ValueError("Can't listdir a file") - subs = map(self._next, self.root.namelist()) - return filter(self._is_child, subs) - - def __str__(self): - return posixpath.join(self.root.filename, self.at) - - def __repr__(self): - return self.__repr.format(self=self) - - def joinpath(self, add): - next = posixpath.join(self.at, add) - return self._next(self.root.resolve_dir(next)) - - __truediv__ = joinpath - - @property - def parent(self): - parent_at = posixpath.dirname(self.at.rstrip('/')) - if parent_at: - parent_at += '/' - return self._next(parent_at) - - +def _parents(path): + """ + Given a path with elements separated by + posixpath.sep, generate all parents of that path. + + >>> list(_parents('b/d')) + ['b'] + >>> list(_parents('/b/d/')) + ['/b'] + >>> list(_parents('b/d/f/')) + ['b/d', 'b'] + >>> list(_parents('b')) + [] + >>> list(_parents('')) + [] + """ + return itertools.islice(_ancestry(path), 1, None) + + +def _ancestry(path): + """ + Given a path with elements separated by + posixpath.sep, generate all elements of that path + + >>> list(_ancestry('b/d')) + ['b/d', 'b'] + >>> list(_ancestry('/b/d/')) + ['/b/d', '/b'] + >>> list(_ancestry('b/d/f/')) + ['b/d/f', 'b/d', 'b'] + >>> list(_ancestry('b')) + ['b'] + >>> list(_ancestry('')) + [] + """ + path = path.rstrip(posixpath.sep) + while path and path != posixpath.sep: + yield path + path, tail = posixpath.split(path) + + +_dedupe = dict.fromkeys +"""Deduplicate an iterable in original order""" + + +def _difference(minuend, subtrahend): + """ + Return items in minuend not in subtrahend, retaining order + with O(1) lookup. + """ + return itertools.filterfalse(set(subtrahend).__contains__, minuend) + + +class CompleteDirs(ZipFile): + """ + A ZipFile subclass that ensures that implied directories + are always included in the namelist. + """ + + @staticmethod + def _implied_dirs(names): + parents = itertools.chain.from_iterable(map(_parents, names)) + as_dirs = (p + posixpath.sep for p in parents) + return _dedupe(_difference(as_dirs, names)) + + def namelist(self): + names = super(CompleteDirs, self).namelist() + return names + list(self._implied_dirs(names)) + + def _name_set(self): + return set(self.namelist()) + + def resolve_dir(self, name): + """ + If the name represents a directory, return that name + as a directory (with the trailing slash). + """ + names = self._name_set() + dirname = name + '/' + dir_match = name not in names and dirname in names + return dirname if dir_match else name + + @classmethod + def make(cls, source): + """ + Given a source (filename or zipfile), return an + appropriate CompleteDirs subclass. + """ + if isinstance(source, CompleteDirs): + return source + + if not isinstance(source, ZipFile): + return cls(source) + + # Only allow for FastPath when supplied zipfile is read-only + if 'r' not in source.mode: + cls = CompleteDirs + + res = cls.__new__(cls) + vars(res).update(vars(source)) + return res + + +class FastLookup(CompleteDirs): + """ + ZipFile subclass to ensure implicit + dirs exist and are resolved rapidly. + """ + def namelist(self): + with contextlib.suppress(AttributeError): + return self.__names + self.__names = super(FastLookup, self).namelist() + return self.__names + + def _name_set(self): + with contextlib.suppress(AttributeError): + return self.__lookup + self.__lookup = super(FastLookup, self)._name_set() + return self.__lookup + + +class Path: + """ + A pathlib-compatible interface for zip files. + + Consider a zip file with this structure:: + + . + ├── a.txt + └── b + ├── c.txt + └── d + └── e.txt + + >>> data = io.BytesIO() + >>> zf = ZipFile(data, 'w') + >>> zf.writestr('a.txt', 'content of a') + >>> zf.writestr('b/c.txt', 'content of c') + >>> zf.writestr('b/d/e.txt', 'content of e') + >>> zf.filename = 'abcde.zip' + + Path accepts the zipfile object itself or a filename + + >>> root = Path(zf) + + From there, several path operations are available. + + Directory iteration (including the zip file itself): + + >>> a, b = root.iterdir() + >>> a + Path('abcde.zip', 'a.txt') + >>> b + Path('abcde.zip', 'b/') + + name property: + + >>> b.name + 'b' + + join with divide operator: + + >>> c = b / 'c.txt' + >>> c + Path('abcde.zip', 'b/c.txt') + >>> c.name + 'c.txt' + + Read text: + + >>> c.read_text() + 'content of c' + + existence: + + >>> c.exists() + True + >>> (b / 'missing.txt').exists() + False + + Coercion to string: + + >>> str(c) + 'abcde.zip/b/c.txt' + """ + + __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})" + + def __init__(self, root, at=""): + self.root = FastLookup.make(root) + self.at = at + + def open(self, mode='r', *args, **kwargs): + """ + Open this entry as text or binary following the semantics + of ``pathlib.Path.open()`` by passing arguments through + to io.TextIOWrapper(). + """ + pwd = kwargs.pop('pwd', None) + zip_mode = mode[0] + stream = self.root.open(self.at, zip_mode, pwd=pwd) + if 'b' in mode: + if args or kwargs: + raise ValueError("encoding args invalid for binary operation") + return stream + return io.TextIOWrapper(stream, *args, **kwargs) + + @property + def name(self): + return posixpath.basename(self.at.rstrip("/")) + + def read_text(self, *args, **kwargs): + with self.open('r', *args, **kwargs) as strm: + return strm.read() + + def read_bytes(self): + with self.open('rb') as strm: + return strm.read() + + def _is_child(self, path): + return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/") + + def _next(self, at): + return Path(self.root, at) + + def is_dir(self): + return not self.at or self.at.endswith("/") + + def is_file(self): + return not self.is_dir() + + def exists(self): + return self.at in self.root._name_set() + + def iterdir(self): + if not self.is_dir(): + raise ValueError("Can't listdir a file") + subs = map(self._next, self.root.namelist()) + return filter(self._is_child, subs) + + def __str__(self): + return posixpath.join(self.root.filename, self.at) + + def __repr__(self): + return self.__repr.format(self=self) + + def joinpath(self, add): + next = posixpath.join(self.at, add) + return self._next(self.root.resolve_dir(next)) + + __truediv__ = joinpath + + @property + def parent(self): + parent_at = posixpath.dirname(self.at.rstrip('/')) + if parent_at: + parent_at += '/' + return self._next(parent_at) + + def main(args=None): import argparse @@ -2423,6 +2423,6 @@ def main(args=None): zippath = '' addToZip(zf, path, zippath) - + if __name__ == "__main__": main() |