aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/zipfile.py
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.ru>2022-02-10 16:44:30 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:44:30 +0300
commit2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch)
tree012bb94d777798f1f56ac1cec429509766d05181 /contrib/tools/python3/src/Lib/zipfile.py
parent6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff)
downloadydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/zipfile.py')
-rw-r--r--contrib/tools/python3/src/Lib/zipfile.py800
1 files changed, 400 insertions, 400 deletions
diff --git a/contrib/tools/python3/src/Lib/zipfile.py b/contrib/tools/python3/src/Lib/zipfile.py
index 816f8582bb..23e1605f4e 100644
--- a/contrib/tools/python3/src/Lib/zipfile.py
+++ b/contrib/tools/python3/src/Lib/zipfile.py
@@ -3,19 +3,19 @@ Read and write ZIP files.
XXX references to utf-8 need further investigation.
"""
-import binascii
-import importlib.util
+import binascii
+import importlib.util
import io
-import itertools
+import itertools
import os
-import posixpath
-import shutil
+import posixpath
+import shutil
import stat
import struct
-import sys
+import sys
import threading
-import time
-import contextlib
+import time
+import contextlib
try:
import zlib # We may need its compression method
@@ -36,8 +36,8 @@ except ImportError:
__all__ = ["BadZipFile", "BadZipfile", "error",
"ZIP_STORED", "ZIP_DEFLATED", "ZIP_BZIP2", "ZIP_LZMA",
- "is_zipfile", "ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile",
- "Path"]
+ "is_zipfile", "ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile",
+ "Path"]
class BadZipFile(Exception):
pass
@@ -228,7 +228,7 @@ def _EndRecData64(fpin, offset, endrec):
if sig != stringEndArchive64Locator:
return endrec
- if diskno != 0 or disks > 1:
+ if diskno != 0 or disks > 1:
raise BadZipFile("zipfiles that span multiple disks are not supported")
# Assume no 'zip64 extensible data'
@@ -377,8 +377,8 @@ class ZipInfo (object):
self.volume = 0 # Volume number of file header
self.internal_attr = 0 # Internal attributes
self.external_attr = 0 # External file attributes
- self.compress_size = 0 # Size of the compressed file
- self.file_size = 0 # Size of the uncompressed file
+ self.compress_size = 0 # Size of the compressed file
+ self.file_size = 0 # Size of the uncompressed file
# Other attributes are set by class ZipFile:
# header_offset Byte offset to the file header
# CRC CRC-32 of the uncompressed file
@@ -466,28 +466,28 @@ class ZipInfo (object):
if ln+4 > len(extra):
raise BadZipFile("Corrupt extra field %04x (size=%d)" % (tp, ln))
if tp == 0x0001:
- data = extra[4:ln+4]
+ data = extra[4:ln+4]
# ZIP64 extension (large files and/or large archives)
- try:
- if self.file_size in (0xFFFF_FFFF_FFFF_FFFF, 0xFFFF_FFFF):
- field = "File size"
- self.file_size, = unpack('<Q', data[:8])
- data = data[8:]
- if self.compress_size == 0xFFFF_FFFF:
- field = "Compress size"
- self.compress_size, = unpack('<Q', data[:8])
- data = data[8:]
- if self.header_offset == 0xFFFF_FFFF:
- field = "Header offset"
- self.header_offset, = unpack('<Q', data[:8])
- except struct.error:
- raise BadZipFile(f"Corrupt zip64 extra field. "
- f"{field} not found.") from None
+ try:
+ if self.file_size in (0xFFFF_FFFF_FFFF_FFFF, 0xFFFF_FFFF):
+ field = "File size"
+ self.file_size, = unpack('<Q', data[:8])
+ data = data[8:]
+ if self.compress_size == 0xFFFF_FFFF:
+ field = "Compress size"
+ self.compress_size, = unpack('<Q', data[:8])
+ data = data[8:]
+ if self.header_offset == 0xFFFF_FFFF:
+ field = "Header offset"
+ self.header_offset, = unpack('<Q', data[:8])
+ except struct.error:
+ raise BadZipFile(f"Corrupt zip64 extra field. "
+ f"{field} not found.") from None
extra = extra[ln+4:]
@classmethod
- def from_file(cls, filename, arcname=None, *, strict_timestamps=True):
+ def from_file(cls, filename, arcname=None, *, strict_timestamps=True):
"""Construct an appropriate ZipInfo for a file on the filesystem.
filename should be the path to a file or directory on the filesystem.
@@ -502,10 +502,10 @@ class ZipInfo (object):
isdir = stat.S_ISDIR(st.st_mode)
mtime = time.localtime(st.st_mtime)
date_time = mtime[0:6]
- if not strict_timestamps and date_time[0] < 1980:
- date_time = (1980, 1, 1, 0, 0, 0)
- elif not strict_timestamps and date_time[0] > 2107:
- date_time = (2107, 12, 31, 23, 59, 59)
+ if not strict_timestamps and date_time[0] < 1980:
+ date_time = (1980, 1, 1, 0, 0, 0)
+ elif not strict_timestamps and date_time[0] > 2107:
+ date_time = (2107, 12, 31, 23, 59, 59)
# Create ZipInfo instance to store file information
if arcname is None:
arcname = filename
@@ -695,7 +695,7 @@ def _get_compressor(compress_type, compresslevel=None):
def _get_decompressor(compress_type):
- _check_compression(compress_type)
+ _check_compression(compress_type)
if compress_type == ZIP_STORED:
return None
elif compress_type == ZIP_DEFLATED:
@@ -784,10 +784,10 @@ class ZipExtFile(io.BufferedIOBase):
# Chunk size to read during seek
MAX_SEEK_READ = 1 << 24
- def __init__(self, fileobj, mode, zipinfo, pwd=None,
+ def __init__(self, fileobj, mode, zipinfo, pwd=None,
close_fileobj=False):
self._fileobj = fileobj
- self._pwd = pwd
+ self._pwd = pwd
self._close_fileobj = close_fileobj
self._compress_type = zipinfo.compress_type
@@ -822,30 +822,30 @@ class ZipExtFile(io.BufferedIOBase):
except AttributeError:
pass
- self._decrypter = None
- if pwd:
- if zipinfo.flag_bits & 0x8:
- # compare against the file type from extended local headers
- check_byte = (zipinfo._raw_time >> 8) & 0xff
- else:
- # compare against the CRC otherwise
- check_byte = (zipinfo.CRC >> 24) & 0xff
- h = self._init_decrypter()
- if h != check_byte:
- raise RuntimeError("Bad password for file %r" % zipinfo.orig_filename)
-
-
- def _init_decrypter(self):
- self._decrypter = _ZipDecrypter(self._pwd)
- # The first 12 bytes in the cypher stream is an encryption header
- # used to strengthen the algorithm. The first 11 bytes are
- # completely random, while the 12th contains the MSB of the CRC,
- # or the MSB of the file time depending on the header type
- # and is used to check the correctness of the password.
- header = self._fileobj.read(12)
- self._compress_left -= 12
- return self._decrypter(header)[11]
-
+ self._decrypter = None
+ if pwd:
+ if zipinfo.flag_bits & 0x8:
+ # compare against the file type from extended local headers
+ check_byte = (zipinfo._raw_time >> 8) & 0xff
+ else:
+ # compare against the CRC otherwise
+ check_byte = (zipinfo.CRC >> 24) & 0xff
+ h = self._init_decrypter()
+ if h != check_byte:
+ raise RuntimeError("Bad password for file %r" % zipinfo.orig_filename)
+
+
+ def _init_decrypter(self):
+ self._decrypter = _ZipDecrypter(self._pwd)
+ # The first 12 bytes in the cypher stream is an encryption header
+ # used to strengthen the algorithm. The first 11 bytes are
+ # completely random, while the 12th contains the MSB of the CRC,
+ # or the MSB of the file time depending on the header type
+ # and is used to check the correctness of the password.
+ header = self._fileobj.read(12)
+ self._compress_left -= 12
+ return self._decrypter(header)[11]
+
def __repr__(self):
result = ['<%s.%s' % (self.__class__.__module__,
self.__class__.__qualname__)]
@@ -890,16 +890,16 @@ class ZipExtFile(io.BufferedIOBase):
return self._readbuffer[self._offset: self._offset + 512]
def readable(self):
- if self.closed:
- raise ValueError("I/O operation on closed file.")
+ if self.closed:
+ raise ValueError("I/O operation on closed file.")
return True
def read(self, n=-1):
"""Read and return up to n bytes.
- If the argument is omitted, None, or negative, data is read and returned until EOF is reached.
+ If the argument is omitted, None, or negative, data is read and returned until EOF is reached.
"""
- if self.closed:
- raise ValueError("read from closed file.")
+ if self.closed:
+ raise ValueError("read from closed file.")
if n is None or n < 0:
buf = self._readbuffer[self._offset:]
self._readbuffer = b''
@@ -1036,13 +1036,13 @@ class ZipExtFile(io.BufferedIOBase):
super().close()
def seekable(self):
- if self.closed:
- raise ValueError("I/O operation on closed file.")
+ if self.closed:
+ raise ValueError("I/O operation on closed file.")
return self._seekable
def seek(self, offset, whence=0):
- if self.closed:
- raise ValueError("seek on closed file.")
+ if self.closed:
+ raise ValueError("seek on closed file.")
if not self._seekable:
raise io.UnsupportedOperation("underlying stream is not seekable")
curr_pos = self.tell()
@@ -1080,8 +1080,8 @@ class ZipExtFile(io.BufferedIOBase):
self._decompressor = _get_decompressor(self._compress_type)
self._eof = False
read_offset = new_pos
- if self._decrypter is not None:
- self._init_decrypter()
+ if self._decrypter is not None:
+ self._init_decrypter()
while read_offset > 0:
read_len = min(self.MAX_SEEK_READ, read_offset)
@@ -1091,8 +1091,8 @@ class ZipExtFile(io.BufferedIOBase):
return self.tell()
def tell(self):
- if self.closed:
- raise ValueError("tell on closed file.")
+ if self.closed:
+ raise ValueError("tell on closed file.")
if not self._seekable:
raise io.UnsupportedOperation("underlying stream is not seekable")
filepos = self._orig_file_size - self._left - len(self._readbuffer) + self._offset
@@ -1132,51 +1132,51 @@ class _ZipWriteFile(io.BufferedIOBase):
def close(self):
if self.closed:
return
- try:
- super().close()
- # Flush any data from the compressor, and update header info
- if self._compressor:
- buf = self._compressor.flush()
- self._compress_size += len(buf)
- self._fileobj.write(buf)
- self._zinfo.compress_size = self._compress_size
- else:
- self._zinfo.compress_size = self._file_size
- self._zinfo.CRC = self._crc
- self._zinfo.file_size = self._file_size
-
- # Write updated header info
- if self._zinfo.flag_bits & 0x08:
- # Write CRC and file sizes after the file data
- fmt = '<LLQQ' if self._zip64 else '<LLLL'
- self._fileobj.write(struct.pack(fmt, _DD_SIGNATURE, self._zinfo.CRC,
- self._zinfo.compress_size, self._zinfo.file_size))
- self._zipfile.start_dir = self._fileobj.tell()
- else:
- if not self._zip64:
- if self._file_size > ZIP64_LIMIT:
- raise RuntimeError(
- 'File size unexpectedly exceeded ZIP64 limit')
- if self._compress_size > ZIP64_LIMIT:
- raise RuntimeError(
- 'Compressed size unexpectedly exceeded ZIP64 limit')
- # Seek backwards and write file header (which will now include
- # correct CRC and file sizes)
-
- # Preserve current position in file
- self._zipfile.start_dir = self._fileobj.tell()
- self._fileobj.seek(self._zinfo.header_offset)
- self._fileobj.write(self._zinfo.FileHeader(self._zip64))
- self._fileobj.seek(self._zipfile.start_dir)
-
- # Successfully written: Add file to our caches
- self._zipfile.filelist.append(self._zinfo)
- self._zipfile.NameToInfo[self._zinfo.filename] = self._zinfo
- finally:
- self._zipfile._writing = False
-
-
-
+ try:
+ super().close()
+ # Flush any data from the compressor, and update header info
+ if self._compressor:
+ buf = self._compressor.flush()
+ self._compress_size += len(buf)
+ self._fileobj.write(buf)
+ self._zinfo.compress_size = self._compress_size
+ else:
+ self._zinfo.compress_size = self._file_size
+ self._zinfo.CRC = self._crc
+ self._zinfo.file_size = self._file_size
+
+ # Write updated header info
+ if self._zinfo.flag_bits & 0x08:
+ # Write CRC and file sizes after the file data
+ fmt = '<LLQQ' if self._zip64 else '<LLLL'
+ self._fileobj.write(struct.pack(fmt, _DD_SIGNATURE, self._zinfo.CRC,
+ self._zinfo.compress_size, self._zinfo.file_size))
+ self._zipfile.start_dir = self._fileobj.tell()
+ else:
+ if not self._zip64:
+ if self._file_size > ZIP64_LIMIT:
+ raise RuntimeError(
+ 'File size unexpectedly exceeded ZIP64 limit')
+ if self._compress_size > ZIP64_LIMIT:
+ raise RuntimeError(
+ 'Compressed size unexpectedly exceeded ZIP64 limit')
+ # Seek backwards and write file header (which will now include
+ # correct CRC and file sizes)
+
+ # Preserve current position in file
+ self._zipfile.start_dir = self._fileobj.tell()
+ self._fileobj.seek(self._zinfo.header_offset)
+ self._fileobj.write(self._zinfo.FileHeader(self._zip64))
+ self._fileobj.seek(self._zipfile.start_dir)
+
+ # Successfully written: Add file to our caches
+ self._zipfile.filelist.append(self._zinfo)
+ self._zipfile.NameToInfo[self._zinfo.filename] = self._zinfo
+ finally:
+ self._zipfile._writing = False
+
+
+
class ZipFile:
""" Class with methods to open, read, write, close, list zip files.
@@ -1204,7 +1204,7 @@ class ZipFile:
_windows_illegal_name_trans_table = None
def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=True,
- compresslevel=None, *, strict_timestamps=True):
+ compresslevel=None, *, strict_timestamps=True):
"""Open the ZIP file with mode read 'r', write 'w', exclusive create 'x',
or append 'a'."""
if mode not in ('r', 'w', 'x', 'a'):
@@ -1222,7 +1222,7 @@ class ZipFile:
self.mode = mode
self.pwd = None
self._comment = b''
- self._strict_timestamps = strict_timestamps
+ self._strict_timestamps = strict_timestamps
# Check if we were passed a file-like object
if isinstance(file, os.PathLike):
@@ -1534,7 +1534,7 @@ class ZipFile:
# strong encryption
raise NotImplementedError("strong encryption (flag bit 6)")
- if fheader[_FH_GENERAL_PURPOSE_FLAG_BITS] & 0x800:
+ if fheader[_FH_GENERAL_PURPOSE_FLAG_BITS] & 0x800:
# UTF-8 filename
fname_str = fname.decode("utf-8")
else:
@@ -1553,10 +1553,10 @@ class ZipFile:
if not pwd:
raise RuntimeError("File %r is encrypted, password "
"required for extraction" % name)
- else:
- pwd = None
+ else:
+ pwd = None
- return ZipExtFile(zef_file, mode, zinfo, pwd, True)
+ return ZipExtFile(zef_file, mode, zinfo, pwd, True)
except:
zef_file.close()
raise
@@ -1572,7 +1572,7 @@ class ZipFile:
"another write handle open on it. "
"Close the first handle before opening another.")
- # Size and CRC are overwritten with correct data after processing the file
+ # Size and CRC are overwritten with correct data after processing the file
zinfo.compress_size = 0
zinfo.CRC = 0
@@ -1724,8 +1724,8 @@ class ZipFile:
"Can't write to ZIP archive while an open writing handle exists"
)
- zinfo = ZipInfo.from_file(filename, arcname,
- strict_timestamps=self._strict_timestamps)
+ zinfo = ZipInfo.from_file(filename, arcname,
+ strict_timestamps=self._strict_timestamps)
if zinfo.is_dir():
zinfo.compress_size = 0
@@ -1868,15 +1868,15 @@ class ZipFile:
extract_version = max(min_version, zinfo.extract_version)
create_version = max(min_version, zinfo.create_version)
- filename, flag_bits = zinfo._encodeFilenameFlags()
- centdir = struct.pack(structCentralDir,
- stringCentralDir, create_version,
- zinfo.create_system, extract_version, zinfo.reserved,
- flag_bits, zinfo.compress_type, dostime, dosdate,
- zinfo.CRC, compress_size, file_size,
- len(filename), len(extra_data), len(zinfo.comment),
- 0, zinfo.internal_attr, zinfo.external_attr,
- header_offset)
+ filename, flag_bits = zinfo._encodeFilenameFlags()
+ centdir = struct.pack(structCentralDir,
+ stringCentralDir, create_version,
+ zinfo.create_system, extract_version, zinfo.reserved,
+ flag_bits, zinfo.compress_type, dostime, dosdate,
+ zinfo.CRC, compress_size, file_size,
+ len(filename), len(extra_data), len(zinfo.comment),
+ 0, zinfo.internal_attr, zinfo.external_attr,
+ header_offset)
self.fp.write(centdir)
self.fp.write(filename)
self.fp.write(extra_data)
@@ -1918,8 +1918,8 @@ class ZipFile:
centDirSize, centDirOffset, len(self._comment))
self.fp.write(endrec)
self.fp.write(self._comment)
- if self.mode == "a":
- self.fp.truncate()
+ if self.mode == "a":
+ self.fp.truncate()
self.fp.flush()
def _fpclose(self, fp):
@@ -2103,266 +2103,266 @@ class PyZipFile(ZipFile):
return (fname, archivename)
-def _parents(path):
- """
- Given a path with elements separated by
- posixpath.sep, generate all parents of that path.
-
- >>> list(_parents('b/d'))
- ['b']
- >>> list(_parents('/b/d/'))
- ['/b']
- >>> list(_parents('b/d/f/'))
- ['b/d', 'b']
- >>> list(_parents('b'))
- []
- >>> list(_parents(''))
- []
- """
- return itertools.islice(_ancestry(path), 1, None)
-
-
-def _ancestry(path):
- """
- Given a path with elements separated by
- posixpath.sep, generate all elements of that path
-
- >>> list(_ancestry('b/d'))
- ['b/d', 'b']
- >>> list(_ancestry('/b/d/'))
- ['/b/d', '/b']
- >>> list(_ancestry('b/d/f/'))
- ['b/d/f', 'b/d', 'b']
- >>> list(_ancestry('b'))
- ['b']
- >>> list(_ancestry(''))
- []
- """
- path = path.rstrip(posixpath.sep)
- while path and path != posixpath.sep:
- yield path
- path, tail = posixpath.split(path)
-
-
-_dedupe = dict.fromkeys
-"""Deduplicate an iterable in original order"""
-
-
-def _difference(minuend, subtrahend):
- """
- Return items in minuend not in subtrahend, retaining order
- with O(1) lookup.
- """
- return itertools.filterfalse(set(subtrahend).__contains__, minuend)
-
-
-class CompleteDirs(ZipFile):
- """
- A ZipFile subclass that ensures that implied directories
- are always included in the namelist.
- """
-
- @staticmethod
- def _implied_dirs(names):
- parents = itertools.chain.from_iterable(map(_parents, names))
- as_dirs = (p + posixpath.sep for p in parents)
- return _dedupe(_difference(as_dirs, names))
-
- def namelist(self):
- names = super(CompleteDirs, self).namelist()
- return names + list(self._implied_dirs(names))
-
- def _name_set(self):
- return set(self.namelist())
-
- def resolve_dir(self, name):
- """
- If the name represents a directory, return that name
- as a directory (with the trailing slash).
- """
- names = self._name_set()
- dirname = name + '/'
- dir_match = name not in names and dirname in names
- return dirname if dir_match else name
-
- @classmethod
- def make(cls, source):
- """
- Given a source (filename or zipfile), return an
- appropriate CompleteDirs subclass.
- """
- if isinstance(source, CompleteDirs):
- return source
-
- if not isinstance(source, ZipFile):
- return cls(source)
-
- # Only allow for FastPath when supplied zipfile is read-only
- if 'r' not in source.mode:
- cls = CompleteDirs
-
- res = cls.__new__(cls)
- vars(res).update(vars(source))
- return res
-
-
-class FastLookup(CompleteDirs):
- """
- ZipFile subclass to ensure implicit
- dirs exist and are resolved rapidly.
- """
- def namelist(self):
- with contextlib.suppress(AttributeError):
- return self.__names
- self.__names = super(FastLookup, self).namelist()
- return self.__names
-
- def _name_set(self):
- with contextlib.suppress(AttributeError):
- return self.__lookup
- self.__lookup = super(FastLookup, self)._name_set()
- return self.__lookup
-
-
-class Path:
- """
- A pathlib-compatible interface for zip files.
-
- Consider a zip file with this structure::
-
- .
- ├── a.txt
- └── b
- ├── c.txt
- └── d
- └── e.txt
-
- >>> data = io.BytesIO()
- >>> zf = ZipFile(data, 'w')
- >>> zf.writestr('a.txt', 'content of a')
- >>> zf.writestr('b/c.txt', 'content of c')
- >>> zf.writestr('b/d/e.txt', 'content of e')
- >>> zf.filename = 'abcde.zip'
-
- Path accepts the zipfile object itself or a filename
-
- >>> root = Path(zf)
-
- From there, several path operations are available.
-
- Directory iteration (including the zip file itself):
-
- >>> a, b = root.iterdir()
- >>> a
- Path('abcde.zip', 'a.txt')
- >>> b
- Path('abcde.zip', 'b/')
-
- name property:
-
- >>> b.name
- 'b'
-
- join with divide operator:
-
- >>> c = b / 'c.txt'
- >>> c
- Path('abcde.zip', 'b/c.txt')
- >>> c.name
- 'c.txt'
-
- Read text:
-
- >>> c.read_text()
- 'content of c'
-
- existence:
-
- >>> c.exists()
- True
- >>> (b / 'missing.txt').exists()
- False
-
- Coercion to string:
-
- >>> str(c)
- 'abcde.zip/b/c.txt'
- """
-
- __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"
-
- def __init__(self, root, at=""):
- self.root = FastLookup.make(root)
- self.at = at
-
- def open(self, mode='r', *args, **kwargs):
- """
- Open this entry as text or binary following the semantics
- of ``pathlib.Path.open()`` by passing arguments through
- to io.TextIOWrapper().
- """
- pwd = kwargs.pop('pwd', None)
- zip_mode = mode[0]
- stream = self.root.open(self.at, zip_mode, pwd=pwd)
- if 'b' in mode:
- if args or kwargs:
- raise ValueError("encoding args invalid for binary operation")
- return stream
- return io.TextIOWrapper(stream, *args, **kwargs)
-
- @property
- def name(self):
- return posixpath.basename(self.at.rstrip("/"))
-
- def read_text(self, *args, **kwargs):
- with self.open('r', *args, **kwargs) as strm:
- return strm.read()
-
- def read_bytes(self):
- with self.open('rb') as strm:
- return strm.read()
-
- def _is_child(self, path):
- return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/")
-
- def _next(self, at):
- return Path(self.root, at)
-
- def is_dir(self):
- return not self.at or self.at.endswith("/")
-
- def is_file(self):
- return not self.is_dir()
-
- def exists(self):
- return self.at in self.root._name_set()
-
- def iterdir(self):
- if not self.is_dir():
- raise ValueError("Can't listdir a file")
- subs = map(self._next, self.root.namelist())
- return filter(self._is_child, subs)
-
- def __str__(self):
- return posixpath.join(self.root.filename, self.at)
-
- def __repr__(self):
- return self.__repr.format(self=self)
-
- def joinpath(self, add):
- next = posixpath.join(self.at, add)
- return self._next(self.root.resolve_dir(next))
-
- __truediv__ = joinpath
-
- @property
- def parent(self):
- parent_at = posixpath.dirname(self.at.rstrip('/'))
- if parent_at:
- parent_at += '/'
- return self._next(parent_at)
-
-
+def _parents(path):
+ """
+ Given a path with elements separated by
+ posixpath.sep, generate all parents of that path.
+
+ >>> list(_parents('b/d'))
+ ['b']
+ >>> list(_parents('/b/d/'))
+ ['/b']
+ >>> list(_parents('b/d/f/'))
+ ['b/d', 'b']
+ >>> list(_parents('b'))
+ []
+ >>> list(_parents(''))
+ []
+ """
+ return itertools.islice(_ancestry(path), 1, None)
+
+
+def _ancestry(path):
+ """
+ Given a path with elements separated by
+ posixpath.sep, generate all elements of that path
+
+ >>> list(_ancestry('b/d'))
+ ['b/d', 'b']
+ >>> list(_ancestry('/b/d/'))
+ ['/b/d', '/b']
+ >>> list(_ancestry('b/d/f/'))
+ ['b/d/f', 'b/d', 'b']
+ >>> list(_ancestry('b'))
+ ['b']
+ >>> list(_ancestry(''))
+ []
+ """
+ path = path.rstrip(posixpath.sep)
+ while path and path != posixpath.sep:
+ yield path
+ path, tail = posixpath.split(path)
+
+
+_dedupe = dict.fromkeys
+"""Deduplicate an iterable in original order"""
+
+
+def _difference(minuend, subtrahend):
+ """
+ Return items in minuend not in subtrahend, retaining order
+ with O(1) lookup.
+ """
+ return itertools.filterfalse(set(subtrahend).__contains__, minuend)
+
+
+class CompleteDirs(ZipFile):
+ """
+ A ZipFile subclass that ensures that implied directories
+ are always included in the namelist.
+ """
+
+ @staticmethod
+ def _implied_dirs(names):
+ parents = itertools.chain.from_iterable(map(_parents, names))
+ as_dirs = (p + posixpath.sep for p in parents)
+ return _dedupe(_difference(as_dirs, names))
+
+ def namelist(self):
+ names = super(CompleteDirs, self).namelist()
+ return names + list(self._implied_dirs(names))
+
+ def _name_set(self):
+ return set(self.namelist())
+
+ def resolve_dir(self, name):
+ """
+ If the name represents a directory, return that name
+ as a directory (with the trailing slash).
+ """
+ names = self._name_set()
+ dirname = name + '/'
+ dir_match = name not in names and dirname in names
+ return dirname if dir_match else name
+
+ @classmethod
+ def make(cls, source):
+ """
+ Given a source (filename or zipfile), return an
+ appropriate CompleteDirs subclass.
+ """
+ if isinstance(source, CompleteDirs):
+ return source
+
+ if not isinstance(source, ZipFile):
+ return cls(source)
+
+ # Only allow for FastPath when supplied zipfile is read-only
+ if 'r' not in source.mode:
+ cls = CompleteDirs
+
+ res = cls.__new__(cls)
+ vars(res).update(vars(source))
+ return res
+
+
+class FastLookup(CompleteDirs):
+ """
+ ZipFile subclass to ensure implicit
+ dirs exist and are resolved rapidly.
+ """
+ def namelist(self):
+ with contextlib.suppress(AttributeError):
+ return self.__names
+ self.__names = super(FastLookup, self).namelist()
+ return self.__names
+
+ def _name_set(self):
+ with contextlib.suppress(AttributeError):
+ return self.__lookup
+ self.__lookup = super(FastLookup, self)._name_set()
+ return self.__lookup
+
+
+class Path:
+ """
+ A pathlib-compatible interface for zip files.
+
+ Consider a zip file with this structure::
+
+ .
+ ├── a.txt
+ └── b
+ ├── c.txt
+ └── d
+ └── e.txt
+
+ >>> data = io.BytesIO()
+ >>> zf = ZipFile(data, 'w')
+ >>> zf.writestr('a.txt', 'content of a')
+ >>> zf.writestr('b/c.txt', 'content of c')
+ >>> zf.writestr('b/d/e.txt', 'content of e')
+ >>> zf.filename = 'abcde.zip'
+
+ Path accepts the zipfile object itself or a filename
+
+ >>> root = Path(zf)
+
+ From there, several path operations are available.
+
+ Directory iteration (including the zip file itself):
+
+ >>> a, b = root.iterdir()
+ >>> a
+ Path('abcde.zip', 'a.txt')
+ >>> b
+ Path('abcde.zip', 'b/')
+
+ name property:
+
+ >>> b.name
+ 'b'
+
+ join with divide operator:
+
+ >>> c = b / 'c.txt'
+ >>> c
+ Path('abcde.zip', 'b/c.txt')
+ >>> c.name
+ 'c.txt'
+
+ Read text:
+
+ >>> c.read_text()
+ 'content of c'
+
+ existence:
+
+ >>> c.exists()
+ True
+ >>> (b / 'missing.txt').exists()
+ False
+
+ Coercion to string:
+
+ >>> str(c)
+ 'abcde.zip/b/c.txt'
+ """
+
+ __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"
+
+ def __init__(self, root, at=""):
+ self.root = FastLookup.make(root)
+ self.at = at
+
+ def open(self, mode='r', *args, **kwargs):
+ """
+ Open this entry as text or binary following the semantics
+ of ``pathlib.Path.open()`` by passing arguments through
+ to io.TextIOWrapper().
+ """
+ pwd = kwargs.pop('pwd', None)
+ zip_mode = mode[0]
+ stream = self.root.open(self.at, zip_mode, pwd=pwd)
+ if 'b' in mode:
+ if args or kwargs:
+ raise ValueError("encoding args invalid for binary operation")
+ return stream
+ return io.TextIOWrapper(stream, *args, **kwargs)
+
+ @property
+ def name(self):
+ return posixpath.basename(self.at.rstrip("/"))
+
+ def read_text(self, *args, **kwargs):
+ with self.open('r', *args, **kwargs) as strm:
+ return strm.read()
+
+ def read_bytes(self):
+ with self.open('rb') as strm:
+ return strm.read()
+
+ def _is_child(self, path):
+ return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/")
+
+ def _next(self, at):
+ return Path(self.root, at)
+
+ def is_dir(self):
+ return not self.at or self.at.endswith("/")
+
+ def is_file(self):
+ return not self.is_dir()
+
+ def exists(self):
+ return self.at in self.root._name_set()
+
+ def iterdir(self):
+ if not self.is_dir():
+ raise ValueError("Can't listdir a file")
+ subs = map(self._next, self.root.namelist())
+ return filter(self._is_child, subs)
+
+ def __str__(self):
+ return posixpath.join(self.root.filename, self.at)
+
+ def __repr__(self):
+ return self.__repr.format(self=self)
+
+ def joinpath(self, add):
+ next = posixpath.join(self.at, add)
+ return self._next(self.root.resolve_dir(next))
+
+ __truediv__ = joinpath
+
+ @property
+ def parent(self):
+ parent_at = posixpath.dirname(self.at.rstrip('/'))
+ if parent_at:
+ parent_at += '/'
+ return self._next(parent_at)
+
+
def main(args=None):
import argparse
@@ -2423,6 +2423,6 @@ def main(args=None):
zippath = ''
addToZip(zf, path, zippath)
-
+
if __name__ == "__main__":
main()