aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/shutil.py
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.ru>2022-02-10 16:44:30 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:44:30 +0300
commit2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch)
tree012bb94d777798f1f56ac1cec429509766d05181 /contrib/tools/python3/src/Lib/shutil.py
parent6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff)
downloadydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/shutil.py')
-rw-r--r--contrib/tools/python3/src/Lib/shutil.py802
1 files changed, 401 insertions, 401 deletions
diff --git a/contrib/tools/python3/src/Lib/shutil.py b/contrib/tools/python3/src/Lib/shutil.py
index 752900c588..19dfd529c0 100644
--- a/contrib/tools/python3/src/Lib/shutil.py
+++ b/contrib/tools/python3/src/Lib/shutil.py
@@ -42,20 +42,20 @@ try:
except ImportError:
getgrnam = None
-_WINDOWS = os.name == 'nt'
-posix = nt = None
-if os.name == 'posix':
- import posix
-elif _WINDOWS:
- import nt
-
-COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
-_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux")
-_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS
-
-# CMD defaults in Windows 10
-_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC"
-
+_WINDOWS = os.name == 'nt'
+posix = nt = None
+if os.name == 'posix':
+ import posix
+elif _WINDOWS:
+ import nt
+
+COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
+_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux")
+_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS
+
+# CMD defaults in Windows 10
+_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC"
+
__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
"copytree", "move", "rmtree", "Error", "SpecialFileError",
"ExecError", "make_archive", "get_archive_formats",
@@ -86,135 +86,135 @@ class RegistryError(Exception):
"""Raised when a registry operation with the archiving
and unpacking registries fails"""
-class _GiveupOnFastCopy(Exception):
- """Raised as a signal to fallback on using raw read()/write()
- file copy when fast-copy functions fail to do so.
- """
-
-def _fastcopy_fcopyfile(fsrc, fdst, flags):
- """Copy a regular file content or metadata by using high-performance
- fcopyfile(3) syscall (macOS).
- """
- try:
- infd = fsrc.fileno()
- outfd = fdst.fileno()
- except Exception as err:
- raise _GiveupOnFastCopy(err) # not a regular file
-
- try:
- posix._fcopyfile(infd, outfd, flags)
- except OSError as err:
- err.filename = fsrc.name
- err.filename2 = fdst.name
- if err.errno in {errno.EINVAL, errno.ENOTSUP}:
- raise _GiveupOnFastCopy(err)
- else:
- raise err from None
-
-def _fastcopy_sendfile(fsrc, fdst):
- """Copy data from one regular mmap-like fd to another by using
- high-performance sendfile(2) syscall.
- This should work on Linux >= 2.6.33 only.
- """
- # Note: copyfileobj() is left alone in order to not introduce any
- # unexpected breakage. Possible risks by using zero-copy calls
- # in copyfileobj() are:
- # - fdst cannot be open in "a"(ppend) mode
- # - fsrc and fdst may be open in "t"(ext) mode
- # - fsrc may be a BufferedReader (which hides unread data in a buffer),
- # GzipFile (which decompresses data), HTTPResponse (which decodes
- # chunks).
- # - possibly others (e.g. encrypted fs/partition?)
- global _USE_CP_SENDFILE
- try:
- infd = fsrc.fileno()
- outfd = fdst.fileno()
- except Exception as err:
- raise _GiveupOnFastCopy(err) # not a regular file
-
- # Hopefully the whole file will be copied in a single call.
- # sendfile() is called in a loop 'till EOF is reached (0 return)
- # so a bufsize smaller or bigger than the actual file size
- # should not make any difference, also in case the file content
- # changes while being copied.
- try:
- blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB
- except OSError:
- blocksize = 2 ** 27 # 128MiB
- # On 32-bit architectures truncate to 1GiB to avoid OverflowError,
- # see bpo-38319.
- if sys.maxsize < 2 ** 32:
- blocksize = min(blocksize, 2 ** 30)
-
- offset = 0
- while True:
- try:
- sent = os.sendfile(outfd, infd, offset, blocksize)
- except OSError as err:
- # ...in oder to have a more informative exception.
- err.filename = fsrc.name
- err.filename2 = fdst.name
-
- if err.errno == errno.ENOTSOCK:
- # sendfile() on this platform (probably Linux < 2.6.33)
- # does not support copies between regular files (only
- # sockets).
- _USE_CP_SENDFILE = False
- raise _GiveupOnFastCopy(err)
-
- if err.errno == errno.ENOSPC: # filesystem is full
- raise err from None
-
- # Give up on first call and if no data was copied.
- if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0:
- raise _GiveupOnFastCopy(err)
-
- raise err
- else:
- if sent == 0:
- break # EOF
- offset += sent
-
-def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE):
- """readinto()/memoryview() based variant of copyfileobj().
- *fsrc* must support readinto() method and both files must be
- open in binary mode.
- """
- # Localize variable access to minimize overhead.
- fsrc_readinto = fsrc.readinto
- fdst_write = fdst.write
- with memoryview(bytearray(length)) as mv:
- while True:
- n = fsrc_readinto(mv)
- if not n:
- break
- elif n < length:
- with mv[:n] as smv:
- fdst.write(smv)
- else:
- fdst_write(mv)
-
-def copyfileobj(fsrc, fdst, length=0):
+class _GiveupOnFastCopy(Exception):
+ """Raised as a signal to fallback on using raw read()/write()
+ file copy when fast-copy functions fail to do so.
+ """
+
+def _fastcopy_fcopyfile(fsrc, fdst, flags):
+ """Copy a regular file content or metadata by using high-performance
+ fcopyfile(3) syscall (macOS).
+ """
+ try:
+ infd = fsrc.fileno()
+ outfd = fdst.fileno()
+ except Exception as err:
+ raise _GiveupOnFastCopy(err) # not a regular file
+
+ try:
+ posix._fcopyfile(infd, outfd, flags)
+ except OSError as err:
+ err.filename = fsrc.name
+ err.filename2 = fdst.name
+ if err.errno in {errno.EINVAL, errno.ENOTSUP}:
+ raise _GiveupOnFastCopy(err)
+ else:
+ raise err from None
+
+def _fastcopy_sendfile(fsrc, fdst):
+ """Copy data from one regular mmap-like fd to another by using
+ high-performance sendfile(2) syscall.
+ This should work on Linux >= 2.6.33 only.
+ """
+ # Note: copyfileobj() is left alone in order to not introduce any
+ # unexpected breakage. Possible risks by using zero-copy calls
+ # in copyfileobj() are:
+ # - fdst cannot be open in "a"(ppend) mode
+ # - fsrc and fdst may be open in "t"(ext) mode
+ # - fsrc may be a BufferedReader (which hides unread data in a buffer),
+ # GzipFile (which decompresses data), HTTPResponse (which decodes
+ # chunks).
+ # - possibly others (e.g. encrypted fs/partition?)
+ global _USE_CP_SENDFILE
+ try:
+ infd = fsrc.fileno()
+ outfd = fdst.fileno()
+ except Exception as err:
+ raise _GiveupOnFastCopy(err) # not a regular file
+
+ # Hopefully the whole file will be copied in a single call.
+ # sendfile() is called in a loop 'till EOF is reached (0 return)
+ # so a bufsize smaller or bigger than the actual file size
+ # should not make any difference, also in case the file content
+ # changes while being copied.
+ try:
+ blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB
+ except OSError:
+ blocksize = 2 ** 27 # 128MiB
+ # On 32-bit architectures truncate to 1GiB to avoid OverflowError,
+ # see bpo-38319.
+ if sys.maxsize < 2 ** 32:
+ blocksize = min(blocksize, 2 ** 30)
+
+ offset = 0
+ while True:
+ try:
+ sent = os.sendfile(outfd, infd, offset, blocksize)
+ except OSError as err:
+ # ...in oder to have a more informative exception.
+ err.filename = fsrc.name
+ err.filename2 = fdst.name
+
+ if err.errno == errno.ENOTSOCK:
+ # sendfile() on this platform (probably Linux < 2.6.33)
+ # does not support copies between regular files (only
+ # sockets).
+ _USE_CP_SENDFILE = False
+ raise _GiveupOnFastCopy(err)
+
+ if err.errno == errno.ENOSPC: # filesystem is full
+ raise err from None
+
+ # Give up on first call and if no data was copied.
+ if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0:
+ raise _GiveupOnFastCopy(err)
+
+ raise err
+ else:
+ if sent == 0:
+ break # EOF
+ offset += sent
+
+def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE):
+ """readinto()/memoryview() based variant of copyfileobj().
+ *fsrc* must support readinto() method and both files must be
+ open in binary mode.
+ """
+ # Localize variable access to minimize overhead.
+ fsrc_readinto = fsrc.readinto
+ fdst_write = fdst.write
+ with memoryview(bytearray(length)) as mv:
+ while True:
+ n = fsrc_readinto(mv)
+ if not n:
+ break
+ elif n < length:
+ with mv[:n] as smv:
+ fdst.write(smv)
+ else:
+ fdst_write(mv)
+
+def copyfileobj(fsrc, fdst, length=0):
"""copy data from file-like object fsrc to file-like object fdst"""
- # Localize variable access to minimize overhead.
- if not length:
- length = COPY_BUFSIZE
- fsrc_read = fsrc.read
- fdst_write = fdst.write
- while True:
- buf = fsrc_read(length)
+ # Localize variable access to minimize overhead.
+ if not length:
+ length = COPY_BUFSIZE
+ fsrc_read = fsrc.read
+ fdst_write = fdst.write
+ while True:
+ buf = fsrc_read(length)
if not buf:
break
- fdst_write(buf)
+ fdst_write(buf)
def _samefile(src, dst):
# Macintosh, Unix.
- if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'):
- try:
- return os.path.samestat(src.stat(), os.stat(dst))
- except OSError:
- return False
-
+ if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'):
+ try:
+ return os.path.samestat(src.stat(), os.stat(dst))
+ except OSError:
+ return False
+
if hasattr(os.path, 'samefile'):
try:
return os.path.samefile(src, dst)
@@ -225,74 +225,74 @@ def _samefile(src, dst):
return (os.path.normcase(os.path.abspath(src)) ==
os.path.normcase(os.path.abspath(dst)))
-def _stat(fn):
- return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn)
-
-def _islink(fn):
- return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn)
-
+def _stat(fn):
+ return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn)
+
+def _islink(fn):
+ return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn)
+
def copyfile(src, dst, *, follow_symlinks=True):
- """Copy data from src to dst in the most efficient way possible.
+ """Copy data from src to dst in the most efficient way possible.
If follow_symlinks is not set and src is a symbolic link, a new
symlink will be created instead of copying the file it points to.
"""
- sys.audit("shutil.copyfile", src, dst)
-
+ sys.audit("shutil.copyfile", src, dst)
+
if _samefile(src, dst):
raise SameFileError("{!r} and {!r} are the same file".format(src, dst))
- file_size = 0
- for i, fn in enumerate([src, dst]):
+ file_size = 0
+ for i, fn in enumerate([src, dst]):
try:
- st = _stat(fn)
+ st = _stat(fn)
except OSError:
# File most likely does not exist
pass
else:
# XXX What about other special files? (sockets, devices...)
if stat.S_ISFIFO(st.st_mode):
- fn = fn.path if isinstance(fn, os.DirEntry) else fn
+ fn = fn.path if isinstance(fn, os.DirEntry) else fn
raise SpecialFileError("`%s` is a named pipe" % fn)
- if _WINDOWS and i == 0:
- file_size = st.st_size
+ if _WINDOWS and i == 0:
+ file_size = st.st_size
- if not follow_symlinks and _islink(src):
+ if not follow_symlinks and _islink(src):
os.symlink(os.readlink(src), dst)
else:
- with open(src, 'rb') as fsrc:
- try:
- with open(dst, 'wb') as fdst:
- # macOS
- if _HAS_FCOPYFILE:
- try:
- _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA)
- return dst
- except _GiveupOnFastCopy:
- pass
- # Linux
- elif _USE_CP_SENDFILE:
- try:
- _fastcopy_sendfile(fsrc, fdst)
- return dst
- except _GiveupOnFastCopy:
- pass
- # Windows, see:
- # https://github.com/python/cpython/pull/7160#discussion_r195405230
- elif _WINDOWS and file_size > 0:
- _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE))
- return dst
-
- copyfileobj(fsrc, fdst)
-
- # Issue 43219, raise a less confusing exception
- except IsADirectoryError as e:
- if not os.path.exists(dst):
- raise FileNotFoundError(f'Directory does not exist: {dst}') from e
- else:
- raise
-
+ with open(src, 'rb') as fsrc:
+ try:
+ with open(dst, 'wb') as fdst:
+ # macOS
+ if _HAS_FCOPYFILE:
+ try:
+ _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA)
+ return dst
+ except _GiveupOnFastCopy:
+ pass
+ # Linux
+ elif _USE_CP_SENDFILE:
+ try:
+ _fastcopy_sendfile(fsrc, fdst)
+ return dst
+ except _GiveupOnFastCopy:
+ pass
+ # Windows, see:
+ # https://github.com/python/cpython/pull/7160#discussion_r195405230
+ elif _WINDOWS and file_size > 0:
+ _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE))
+ return dst
+
+ copyfileobj(fsrc, fdst)
+
+ # Issue 43219, raise a less confusing exception
+ except IsADirectoryError as e:
+ if not os.path.exists(dst):
+ raise FileNotFoundError(f'Directory does not exist: {dst}') from e
+ else:
+ raise
+
return dst
def copymode(src, dst, *, follow_symlinks=True):
@@ -303,15 +303,15 @@ def copymode(src, dst, *, follow_symlinks=True):
(e.g. Linux) this method does nothing.
"""
- sys.audit("shutil.copymode", src, dst)
-
- if not follow_symlinks and _islink(src) and os.path.islink(dst):
+ sys.audit("shutil.copymode", src, dst)
+
+ if not follow_symlinks and _islink(src) and os.path.islink(dst):
if hasattr(os, 'lchmod'):
stat_func, chmod_func = os.lstat, os.lchmod
else:
return
else:
- stat_func, chmod_func = _stat, os.chmod
+ stat_func, chmod_func = _stat, os.chmod
st = stat_func(src)
chmod_func(dst, stat.S_IMODE(st.st_mode))
@@ -329,7 +329,7 @@ if hasattr(os, 'listxattr'):
try:
names = os.listxattr(src, follow_symlinks=follow_symlinks)
except OSError as e:
- if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL):
+ if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL):
raise
return
for name in names:
@@ -337,8 +337,8 @@ if hasattr(os, 'listxattr'):
value = os.getxattr(src, name, follow_symlinks=follow_symlinks)
os.setxattr(dst, name, value, follow_symlinks=follow_symlinks)
except OSError as e:
- if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA,
- errno.EINVAL):
+ if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA,
+ errno.EINVAL):
raise
else:
def _copyxattr(*args, **kwargs):
@@ -350,19 +350,19 @@ def copystat(src, dst, *, follow_symlinks=True):
Copy the permission bits, last access time, last modification time, and
flags from `src` to `dst`. On Linux, copystat() also copies the "extended
attributes" where possible. The file contents, owner, and group are
- unaffected. `src` and `dst` are path-like objects or path names given as
- strings.
+ unaffected. `src` and `dst` are path-like objects or path names given as
+ strings.
If the optional flag `follow_symlinks` is not set, symlinks aren't
followed if and only if both `src` and `dst` are symlinks.
"""
- sys.audit("shutil.copystat", src, dst)
-
+ sys.audit("shutil.copystat", src, dst)
+
def _nop(*args, ns=None, follow_symlinks=None):
pass
# follow symlinks (aka don't not follow symlinks)
- follow = follow_symlinks or not (_islink(src) and os.path.islink(dst))
+ follow = follow_symlinks or not (_islink(src) and os.path.islink(dst))
if follow:
# use the real function if it exists
def lookup(name):
@@ -376,16 +376,16 @@ def copystat(src, dst, *, follow_symlinks=True):
return fn
return _nop
- if isinstance(src, os.DirEntry):
- st = src.stat(follow_symlinks=follow)
- else:
- st = lookup("stat")(src, follow_symlinks=follow)
+ if isinstance(src, os.DirEntry):
+ st = src.stat(follow_symlinks=follow)
+ else:
+ st = lookup("stat")(src, follow_symlinks=follow)
mode = stat.S_IMODE(st.st_mode)
lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns),
follow_symlinks=follow)
- # We must copy extended attributes before the file is (potentially)
- # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
- _copyxattr(src, dst, follow_symlinks=follow)
+ # We must copy extended attributes before the file is (potentially)
+ # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
+ _copyxattr(src, dst, follow_symlinks=follow)
try:
lookup("chmod")(dst, mode, follow_symlinks=follow)
except NotImplementedError:
@@ -457,55 +457,55 @@ def ignore_patterns(*patterns):
return set(ignored_names)
return _ignore_patterns
-def _copytree(entries, src, dst, symlinks, ignore, copy_function,
- ignore_dangling_symlinks, dirs_exist_ok=False):
+def _copytree(entries, src, dst, symlinks, ignore, copy_function,
+ ignore_dangling_symlinks, dirs_exist_ok=False):
if ignore is not None:
- ignored_names = ignore(os.fspath(src), [x.name for x in entries])
+ ignored_names = ignore(os.fspath(src), [x.name for x in entries])
else:
ignored_names = set()
- os.makedirs(dst, exist_ok=dirs_exist_ok)
+ os.makedirs(dst, exist_ok=dirs_exist_ok)
errors = []
- use_srcentry = copy_function is copy2 or copy_function is copy
-
- for srcentry in entries:
- if srcentry.name in ignored_names:
+ use_srcentry = copy_function is copy2 or copy_function is copy
+
+ for srcentry in entries:
+ if srcentry.name in ignored_names:
continue
- srcname = os.path.join(src, srcentry.name)
- dstname = os.path.join(dst, srcentry.name)
- srcobj = srcentry if use_srcentry else srcname
+ srcname = os.path.join(src, srcentry.name)
+ dstname = os.path.join(dst, srcentry.name)
+ srcobj = srcentry if use_srcentry else srcname
try:
- is_symlink = srcentry.is_symlink()
- if is_symlink and os.name == 'nt':
- # Special check for directory junctions, which appear as
- # symlinks but we want to recurse.
- lstat = srcentry.stat(follow_symlinks=False)
- if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT:
- is_symlink = False
- if is_symlink:
+ is_symlink = srcentry.is_symlink()
+ if is_symlink and os.name == 'nt':
+ # Special check for directory junctions, which appear as
+ # symlinks but we want to recurse.
+ lstat = srcentry.stat(follow_symlinks=False)
+ if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT:
+ is_symlink = False
+ if is_symlink:
linkto = os.readlink(srcname)
if symlinks:
# We can't just leave it to `copy_function` because legacy
# code with a custom `copy_function` may rely on copytree
# doing the right thing.
os.symlink(linkto, dstname)
- copystat(srcobj, dstname, follow_symlinks=not symlinks)
+ copystat(srcobj, dstname, follow_symlinks=not symlinks)
else:
# ignore dangling symlink if the flag is on
if not os.path.exists(linkto) and ignore_dangling_symlinks:
continue
- # otherwise let the copy occur. copy2 will raise an error
- if srcentry.is_dir():
- copytree(srcobj, dstname, symlinks, ignore,
- copy_function, dirs_exist_ok=dirs_exist_ok)
+ # otherwise let the copy occur. copy2 will raise an error
+ if srcentry.is_dir():
+ copytree(srcobj, dstname, symlinks, ignore,
+ copy_function, dirs_exist_ok=dirs_exist_ok)
else:
- copy_function(srcobj, dstname)
- elif srcentry.is_dir():
- copytree(srcobj, dstname, symlinks, ignore, copy_function,
- dirs_exist_ok=dirs_exist_ok)
+ copy_function(srcobj, dstname)
+ elif srcentry.is_dir():
+ copytree(srcobj, dstname, symlinks, ignore, copy_function,
+ dirs_exist_ok=dirs_exist_ok)
else:
# Will raise a SpecialFileError for unsupported file types
- copy_function(srcobj, dstname)
+ copy_function(srcobj, dstname)
# catch the Error from the recursive copytree so that we can
# continue with other files
except Error as err:
@@ -522,83 +522,83 @@ def _copytree(entries, src, dst, symlinks, ignore, copy_function,
raise Error(errors)
return dst
-def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
- ignore_dangling_symlinks=False, dirs_exist_ok=False):
- """Recursively copy a directory tree and return the destination directory.
-
- dirs_exist_ok dictates whether to raise an exception in case dst or any
- missing parent directory already exists.
-
- If exception(s) occur, an Error is raised with a list of reasons.
-
- If the optional symlinks flag is true, symbolic links in the
- source tree result in symbolic links in the destination tree; if
- it is false, the contents of the files pointed to by symbolic
- links are copied. If the file pointed by the symlink doesn't
- exist, an exception will be added in the list of errors raised in
- an Error exception at the end of the copy process.
-
- You can set the optional ignore_dangling_symlinks flag to true if you
- want to silence this exception. Notice that this has no effect on
- platforms that don't support os.symlink.
-
- The optional ignore argument is a callable. If given, it
- is called with the `src` parameter, which is the directory
- being visited by copytree(), and `names` which is the list of
- `src` contents, as returned by os.listdir():
-
- callable(src, names) -> ignored_names
-
- Since copytree() is called recursively, the callable will be
- called once for each directory that is copied. It returns a
- list of names relative to the `src` directory that should
- not be copied.
-
- The optional copy_function argument is a callable that will be used
- to copy each file. It will be called with the source path and the
- destination path as arguments. By default, copy2() is used, but any
- function that supports the same signature (like copy()) can be used.
-
- """
- sys.audit("shutil.copytree", src, dst)
- with os.scandir(src) as itr:
- entries = list(itr)
- return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks,
- ignore=ignore, copy_function=copy_function,
- ignore_dangling_symlinks=ignore_dangling_symlinks,
- dirs_exist_ok=dirs_exist_ok)
-
-if hasattr(os.stat_result, 'st_file_attributes'):
- # Special handling for directory junctions to make them behave like
- # symlinks for shutil.rmtree, since in general they do not appear as
- # regular links.
- def _rmtree_isdir(entry):
- try:
- st = entry.stat(follow_symlinks=False)
- return (stat.S_ISDIR(st.st_mode) and not
- (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
- and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
- except OSError:
- return False
-
- def _rmtree_islink(path):
- try:
- st = os.lstat(path)
- return (stat.S_ISLNK(st.st_mode) or
- (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
- and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
- except OSError:
- return False
-else:
- def _rmtree_isdir(entry):
- try:
- return entry.is_dir(follow_symlinks=False)
- except OSError:
- return False
-
- def _rmtree_islink(path):
- return os.path.islink(path)
-
+def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
+ ignore_dangling_symlinks=False, dirs_exist_ok=False):
+ """Recursively copy a directory tree and return the destination directory.
+
+ dirs_exist_ok dictates whether to raise an exception in case dst or any
+ missing parent directory already exists.
+
+ If exception(s) occur, an Error is raised with a list of reasons.
+
+ If the optional symlinks flag is true, symbolic links in the
+ source tree result in symbolic links in the destination tree; if
+ it is false, the contents of the files pointed to by symbolic
+ links are copied. If the file pointed by the symlink doesn't
+ exist, an exception will be added in the list of errors raised in
+ an Error exception at the end of the copy process.
+
+ You can set the optional ignore_dangling_symlinks flag to true if you
+ want to silence this exception. Notice that this has no effect on
+ platforms that don't support os.symlink.
+
+ The optional ignore argument is a callable. If given, it
+ is called with the `src` parameter, which is the directory
+ being visited by copytree(), and `names` which is the list of
+ `src` contents, as returned by os.listdir():
+
+ callable(src, names) -> ignored_names
+
+ Since copytree() is called recursively, the callable will be
+ called once for each directory that is copied. It returns a
+ list of names relative to the `src` directory that should
+ not be copied.
+
+ The optional copy_function argument is a callable that will be used
+ to copy each file. It will be called with the source path and the
+ destination path as arguments. By default, copy2() is used, but any
+ function that supports the same signature (like copy()) can be used.
+
+ """
+ sys.audit("shutil.copytree", src, dst)
+ with os.scandir(src) as itr:
+ entries = list(itr)
+ return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks,
+ ignore=ignore, copy_function=copy_function,
+ ignore_dangling_symlinks=ignore_dangling_symlinks,
+ dirs_exist_ok=dirs_exist_ok)
+
+if hasattr(os.stat_result, 'st_file_attributes'):
+ # Special handling for directory junctions to make them behave like
+ # symlinks for shutil.rmtree, since in general they do not appear as
+ # regular links.
+ def _rmtree_isdir(entry):
+ try:
+ st = entry.stat(follow_symlinks=False)
+ return (stat.S_ISDIR(st.st_mode) and not
+ (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
+ and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
+ except OSError:
+ return False
+
+ def _rmtree_islink(path):
+ try:
+ st = os.lstat(path)
+ return (stat.S_ISLNK(st.st_mode) or
+ (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
+ and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
+ except OSError:
+ return False
+else:
+ def _rmtree_isdir(entry):
+ try:
+ return entry.is_dir(follow_symlinks=False)
+ except OSError:
+ return False
+
+ def _rmtree_islink(path):
+ return os.path.islink(path)
+
# version vulnerable to race conditions
def _rmtree_unsafe(path, onerror):
try:
@@ -609,7 +609,7 @@ def _rmtree_unsafe(path, onerror):
entries = []
for entry in entries:
fullname = entry.path
- if _rmtree_isdir(entry):
+ if _rmtree_isdir(entry):
try:
if entry.is_symlink():
# This can only happen if someone replaces
@@ -645,14 +645,14 @@ def _rmtree_safe_fd(topfd, path, onerror):
is_dir = entry.is_dir(follow_symlinks=False)
except OSError:
is_dir = False
- else:
- if is_dir:
- try:
- orig_st = entry.stat(follow_symlinks=False)
- is_dir = stat.S_ISDIR(orig_st.st_mode)
- except OSError:
- onerror(os.lstat, fullname, sys.exc_info())
- continue
+ else:
+ if is_dir:
+ try:
+ orig_st = entry.stat(follow_symlinks=False)
+ is_dir = stat.S_ISDIR(orig_st.st_mode)
+ except OSError:
+ onerror(os.lstat, fullname, sys.exc_info())
+ continue
if is_dir:
try:
dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd)
@@ -699,7 +699,7 @@ def rmtree(path, ignore_errors=False, onerror=None):
is false and onerror is None, an exception is raised.
"""
- sys.audit("shutil.rmtree", path)
+ sys.audit("shutil.rmtree", path)
if ignore_errors:
def onerror(*args):
pass
@@ -720,7 +720,7 @@ def rmtree(path, ignore_errors=False, onerror=None):
try:
fd = os.open(path, os.O_RDONLY)
except Exception:
- onerror(os.open, path, sys.exc_info())
+ onerror(os.open, path, sys.exc_info())
return
try:
if os.path.samestat(orig_st, os.fstat(fd)):
@@ -739,7 +739,7 @@ def rmtree(path, ignore_errors=False, onerror=None):
os.close(fd)
else:
try:
- if _rmtree_islink(path):
+ if _rmtree_islink(path):
# symlinks to directories are forbidden, see bug #1669
raise OSError("Cannot call rmtree on a symbolic link")
except OSError:
@@ -753,20 +753,20 @@ def rmtree(path, ignore_errors=False, onerror=None):
rmtree.avoids_symlink_attacks = _use_fd_functions
def _basename(path):
- """A basename() variant which first strips the trailing slash, if present.
- Thus we always get the last component of the path, even for directories.
-
- path: Union[PathLike, str]
-
- e.g.
- >>> os.path.basename('/bar/foo')
- 'foo'
- >>> os.path.basename('/bar/foo/')
- ''
- >>> _basename('/bar/foo/')
- 'foo'
- """
- path = os.fspath(path)
+ """A basename() variant which first strips the trailing slash, if present.
+ Thus we always get the last component of the path, even for directories.
+
+ path: Union[PathLike, str]
+
+ e.g.
+ >>> os.path.basename('/bar/foo')
+ 'foo'
+ >>> os.path.basename('/bar/foo/')
+ ''
+ >>> _basename('/bar/foo/')
+ 'foo'
+ """
+ path = os.fspath(path)
sep = os.path.sep + (os.path.altsep or '')
return os.path.basename(path.rstrip(sep))
@@ -796,7 +796,7 @@ def move(src, dst, copy_function=copy2):
the issues this implementation glosses over.
"""
- sys.audit("shutil.move", src, dst)
+ sys.audit("shutil.move", src, dst)
real_dst = dst
if os.path.isdir(dst):
if _samefile(src, dst):
@@ -805,10 +805,10 @@ def move(src, dst, copy_function=copy2):
os.rename(src, dst)
return
- # Using _basename instead of os.path.basename is important, as we must
- # ignore any trailing slash to avoid the basename returning ''
+ # Using _basename instead of os.path.basename is important, as we must
+ # ignore any trailing slash to avoid the basename returning ''
real_dst = os.path.join(dst, _basename(src))
-
+
if os.path.exists(real_dst):
raise Error("Destination path '%s' already exists" % real_dst)
try:
@@ -822,12 +822,12 @@ def move(src, dst, copy_function=copy2):
if _destinsrc(src, dst):
raise Error("Cannot move a directory '%s' into itself"
" '%s'." % (src, dst))
- if (_is_immutable(src)
- or (not os.access(src, os.W_OK) and os.listdir(src)
- and sys.platform == 'darwin')):
- raise PermissionError("Cannot move the non-empty directory "
- "'%s': Lacking write permission to '%s'."
- % (src, src))
+ if (_is_immutable(src)
+ or (not os.access(src, os.W_OK) and os.listdir(src)
+ and sys.platform == 'darwin')):
+ raise PermissionError("Cannot move the non-empty directory "
+ "'%s': Lacking write permission to '%s'."
+ % (src, src))
copytree(src, real_dst, copy_function=copy_function,
symlinks=True)
rmtree(src)
@@ -845,11 +845,11 @@ def _destinsrc(src, dst):
dst += os.path.sep
return dst.startswith(src)
-def _is_immutable(src):
- st = _stat(src)
- immutable_states = [stat.UF_IMMUTABLE, stat.SF_IMMUTABLE]
- return hasattr(st, 'st_flags') and st.st_flags in immutable_states
-
+def _is_immutable(src):
+ st = _stat(src)
+ immutable_states = [stat.UF_IMMUTABLE, stat.SF_IMMUTABLE]
+ return hasattr(st, 'st_flags') and st.st_flags in immutable_states
+
def _get_gid(name):
"""Returns a gid, given a group name."""
if getgrnam is None or name is None:
@@ -1052,7 +1052,7 @@ def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
'owner' and 'group' are used when creating a tar archive. By default,
uses the current owner and group.
"""
- sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir)
+ sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir)
save_cwd = os.getcwd()
if root_dir is not None:
if logger is not None:
@@ -1168,16 +1168,16 @@ def _unpack_zipfile(filename, extract_dir):
if name.startswith('/') or '..' in name:
continue
- targetpath = os.path.join(extract_dir, *name.split('/'))
- if not targetpath:
+ targetpath = os.path.join(extract_dir, *name.split('/'))
+ if not targetpath:
continue
- _ensure_directory(targetpath)
+ _ensure_directory(targetpath)
if not name.endswith('/'):
# file
- with zip.open(name, 'r') as source, \
- open(targetpath, 'wb') as target:
- copyfileobj(source, target)
+ with zip.open(name, 'r') as source, \
+ open(targetpath, 'wb') as target:
+ copyfileobj(source, target)
finally:
zip.close()
@@ -1234,8 +1234,8 @@ def unpack_archive(filename, extract_dir=None, format=None):
In case none is found, a ValueError is raised.
"""
- sys.audit("shutil.unpack_archive", filename, extract_dir, format)
-
+ sys.audit("shutil.unpack_archive", filename, extract_dir, format)
+
if extract_dir is None:
extract_dir = os.getcwd()
@@ -1281,7 +1281,7 @@ if hasattr(os, 'statvfs'):
used = (st.f_blocks - st.f_bfree) * st.f_frsize
return _ntuple_diskusage(total, used, free)
-elif _WINDOWS:
+elif _WINDOWS:
__all__.append('disk_usage')
_ntuple_diskusage = collections.namedtuple('usage', 'total used free')
@@ -1303,7 +1303,7 @@ def chown(path, user=None, group=None):
user and group can be the uid/gid or the user/group names, and in that case,
they are converted to their respective uid/gid.
"""
- sys.audit('shutil.chown', path, user, group)
+ sys.audit('shutil.chown', path, user, group)
if user is None and group is None:
raise ValueError("user and/or group must be set")
@@ -1374,15 +1374,15 @@ def get_terminal_size(fallback=(80, 24)):
return os.terminal_size((columns, lines))
-
-# Check that a given file can be accessed with the correct mode.
-# Additionally check that `file` is not a directory, as on Windows
-# directories pass the os.access check.
-def _access_check(fn, mode):
- return (os.path.exists(fn) and os.access(fn, mode)
- and not os.path.isdir(fn))
-
-
+
+# Check that a given file can be accessed with the correct mode.
+# Additionally check that `file` is not a directory, as on Windows
+# directories pass the os.access check.
+def _access_check(fn, mode):
+ return (os.path.exists(fn) and os.access(fn, mode)
+ and not os.path.isdir(fn))
+
+
def which(cmd, mode=os.F_OK | os.X_OK, path=None):
"""Given a command, mode, and a PATH string, return the path which
conforms to the given mode on the PATH, or None if there is no such
@@ -1401,44 +1401,44 @@ def which(cmd, mode=os.F_OK | os.X_OK, path=None):
return cmd
return None
- use_bytes = isinstance(cmd, bytes)
-
+ use_bytes = isinstance(cmd, bytes)
+
if path is None:
- path = os.environ.get("PATH", None)
- if path is None:
- try:
- path = os.confstr("CS_PATH")
- except (AttributeError, ValueError):
- # os.confstr() or CS_PATH is not available
- path = os.defpath
- # bpo-35755: Don't use os.defpath if the PATH environment variable is
- # set to an empty string
-
- # PATH='' doesn't match, whereas PATH=':' looks in the current directory
+ path = os.environ.get("PATH", None)
+ if path is None:
+ try:
+ path = os.confstr("CS_PATH")
+ except (AttributeError, ValueError):
+ # os.confstr() or CS_PATH is not available
+ path = os.defpath
+ # bpo-35755: Don't use os.defpath if the PATH environment variable is
+ # set to an empty string
+
+ # PATH='' doesn't match, whereas PATH=':' looks in the current directory
if not path:
return None
- if use_bytes:
- path = os.fsencode(path)
- path = path.split(os.fsencode(os.pathsep))
- else:
- path = os.fsdecode(path)
- path = path.split(os.pathsep)
-
+ if use_bytes:
+ path = os.fsencode(path)
+ path = path.split(os.fsencode(os.pathsep))
+ else:
+ path = os.fsdecode(path)
+ path = path.split(os.pathsep)
+
if sys.platform == "win32":
# The current directory takes precedence on Windows.
- curdir = os.curdir
- if use_bytes:
- curdir = os.fsencode(curdir)
- if curdir not in path:
- path.insert(0, curdir)
+ curdir = os.curdir
+ if use_bytes:
+ curdir = os.fsencode(curdir)
+ if curdir not in path:
+ path.insert(0, curdir)
# PATHEXT is necessary to check on Windows.
- pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT
- pathext = [ext for ext in pathext_source.split(os.pathsep) if ext]
-
- if use_bytes:
- pathext = [os.fsencode(ext) for ext in pathext]
+ pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT
+ pathext = [ext for ext in pathext_source.split(os.pathsep) if ext]
+
+ if use_bytes:
+ pathext = [os.fsencode(ext) for ext in pathext]
# See if the given file matches any of the expected path extensions.
# This will allow us to short circuit when given "python.exe".
# If it does match, only test that one, otherwise we have to try