diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
commit | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch) | |
tree | 012bb94d777798f1f56ac1cec429509766d05181 /contrib/tools/python3/src/Lib/shutil.py | |
parent | 6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff) | |
download | ydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/shutil.py')
-rw-r--r-- | contrib/tools/python3/src/Lib/shutil.py | 802 |
1 files changed, 401 insertions, 401 deletions
diff --git a/contrib/tools/python3/src/Lib/shutil.py b/contrib/tools/python3/src/Lib/shutil.py index 752900c588..19dfd529c0 100644 --- a/contrib/tools/python3/src/Lib/shutil.py +++ b/contrib/tools/python3/src/Lib/shutil.py @@ -42,20 +42,20 @@ try: except ImportError: getgrnam = None -_WINDOWS = os.name == 'nt' -posix = nt = None -if os.name == 'posix': - import posix -elif _WINDOWS: - import nt - -COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024 -_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux") -_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS - -# CMD defaults in Windows 10 -_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC" - +_WINDOWS = os.name == 'nt' +posix = nt = None +if os.name == 'posix': + import posix +elif _WINDOWS: + import nt + +COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024 +_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux") +_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS + +# CMD defaults in Windows 10 +_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC" + __all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2", "copytree", "move", "rmtree", "Error", "SpecialFileError", "ExecError", "make_archive", "get_archive_formats", @@ -86,135 +86,135 @@ class RegistryError(Exception): """Raised when a registry operation with the archiving and unpacking registries fails""" -class _GiveupOnFastCopy(Exception): - """Raised as a signal to fallback on using raw read()/write() - file copy when fast-copy functions fail to do so. - """ - -def _fastcopy_fcopyfile(fsrc, fdst, flags): - """Copy a regular file content or metadata by using high-performance - fcopyfile(3) syscall (macOS). - """ - try: - infd = fsrc.fileno() - outfd = fdst.fileno() - except Exception as err: - raise _GiveupOnFastCopy(err) # not a regular file - - try: - posix._fcopyfile(infd, outfd, flags) - except OSError as err: - err.filename = fsrc.name - err.filename2 = fdst.name - if err.errno in {errno.EINVAL, errno.ENOTSUP}: - raise _GiveupOnFastCopy(err) - else: - raise err from None - -def _fastcopy_sendfile(fsrc, fdst): - """Copy data from one regular mmap-like fd to another by using - high-performance sendfile(2) syscall. - This should work on Linux >= 2.6.33 only. - """ - # Note: copyfileobj() is left alone in order to not introduce any - # unexpected breakage. Possible risks by using zero-copy calls - # in copyfileobj() are: - # - fdst cannot be open in "a"(ppend) mode - # - fsrc and fdst may be open in "t"(ext) mode - # - fsrc may be a BufferedReader (which hides unread data in a buffer), - # GzipFile (which decompresses data), HTTPResponse (which decodes - # chunks). - # - possibly others (e.g. encrypted fs/partition?) - global _USE_CP_SENDFILE - try: - infd = fsrc.fileno() - outfd = fdst.fileno() - except Exception as err: - raise _GiveupOnFastCopy(err) # not a regular file - - # Hopefully the whole file will be copied in a single call. - # sendfile() is called in a loop 'till EOF is reached (0 return) - # so a bufsize smaller or bigger than the actual file size - # should not make any difference, also in case the file content - # changes while being copied. - try: - blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB - except OSError: - blocksize = 2 ** 27 # 128MiB - # On 32-bit architectures truncate to 1GiB to avoid OverflowError, - # see bpo-38319. - if sys.maxsize < 2 ** 32: - blocksize = min(blocksize, 2 ** 30) - - offset = 0 - while True: - try: - sent = os.sendfile(outfd, infd, offset, blocksize) - except OSError as err: - # ...in oder to have a more informative exception. - err.filename = fsrc.name - err.filename2 = fdst.name - - if err.errno == errno.ENOTSOCK: - # sendfile() on this platform (probably Linux < 2.6.33) - # does not support copies between regular files (only - # sockets). - _USE_CP_SENDFILE = False - raise _GiveupOnFastCopy(err) - - if err.errno == errno.ENOSPC: # filesystem is full - raise err from None - - # Give up on first call and if no data was copied. - if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0: - raise _GiveupOnFastCopy(err) - - raise err - else: - if sent == 0: - break # EOF - offset += sent - -def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE): - """readinto()/memoryview() based variant of copyfileobj(). - *fsrc* must support readinto() method and both files must be - open in binary mode. - """ - # Localize variable access to minimize overhead. - fsrc_readinto = fsrc.readinto - fdst_write = fdst.write - with memoryview(bytearray(length)) as mv: - while True: - n = fsrc_readinto(mv) - if not n: - break - elif n < length: - with mv[:n] as smv: - fdst.write(smv) - else: - fdst_write(mv) - -def copyfileobj(fsrc, fdst, length=0): +class _GiveupOnFastCopy(Exception): + """Raised as a signal to fallback on using raw read()/write() + file copy when fast-copy functions fail to do so. + """ + +def _fastcopy_fcopyfile(fsrc, fdst, flags): + """Copy a regular file content or metadata by using high-performance + fcopyfile(3) syscall (macOS). + """ + try: + infd = fsrc.fileno() + outfd = fdst.fileno() + except Exception as err: + raise _GiveupOnFastCopy(err) # not a regular file + + try: + posix._fcopyfile(infd, outfd, flags) + except OSError as err: + err.filename = fsrc.name + err.filename2 = fdst.name + if err.errno in {errno.EINVAL, errno.ENOTSUP}: + raise _GiveupOnFastCopy(err) + else: + raise err from None + +def _fastcopy_sendfile(fsrc, fdst): + """Copy data from one regular mmap-like fd to another by using + high-performance sendfile(2) syscall. + This should work on Linux >= 2.6.33 only. + """ + # Note: copyfileobj() is left alone in order to not introduce any + # unexpected breakage. Possible risks by using zero-copy calls + # in copyfileobj() are: + # - fdst cannot be open in "a"(ppend) mode + # - fsrc and fdst may be open in "t"(ext) mode + # - fsrc may be a BufferedReader (which hides unread data in a buffer), + # GzipFile (which decompresses data), HTTPResponse (which decodes + # chunks). + # - possibly others (e.g. encrypted fs/partition?) + global _USE_CP_SENDFILE + try: + infd = fsrc.fileno() + outfd = fdst.fileno() + except Exception as err: + raise _GiveupOnFastCopy(err) # not a regular file + + # Hopefully the whole file will be copied in a single call. + # sendfile() is called in a loop 'till EOF is reached (0 return) + # so a bufsize smaller or bigger than the actual file size + # should not make any difference, also in case the file content + # changes while being copied. + try: + blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB + except OSError: + blocksize = 2 ** 27 # 128MiB + # On 32-bit architectures truncate to 1GiB to avoid OverflowError, + # see bpo-38319. + if sys.maxsize < 2 ** 32: + blocksize = min(blocksize, 2 ** 30) + + offset = 0 + while True: + try: + sent = os.sendfile(outfd, infd, offset, blocksize) + except OSError as err: + # ...in oder to have a more informative exception. + err.filename = fsrc.name + err.filename2 = fdst.name + + if err.errno == errno.ENOTSOCK: + # sendfile() on this platform (probably Linux < 2.6.33) + # does not support copies between regular files (only + # sockets). + _USE_CP_SENDFILE = False + raise _GiveupOnFastCopy(err) + + if err.errno == errno.ENOSPC: # filesystem is full + raise err from None + + # Give up on first call and if no data was copied. + if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0: + raise _GiveupOnFastCopy(err) + + raise err + else: + if sent == 0: + break # EOF + offset += sent + +def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE): + """readinto()/memoryview() based variant of copyfileobj(). + *fsrc* must support readinto() method and both files must be + open in binary mode. + """ + # Localize variable access to minimize overhead. + fsrc_readinto = fsrc.readinto + fdst_write = fdst.write + with memoryview(bytearray(length)) as mv: + while True: + n = fsrc_readinto(mv) + if not n: + break + elif n < length: + with mv[:n] as smv: + fdst.write(smv) + else: + fdst_write(mv) + +def copyfileobj(fsrc, fdst, length=0): """copy data from file-like object fsrc to file-like object fdst""" - # Localize variable access to minimize overhead. - if not length: - length = COPY_BUFSIZE - fsrc_read = fsrc.read - fdst_write = fdst.write - while True: - buf = fsrc_read(length) + # Localize variable access to minimize overhead. + if not length: + length = COPY_BUFSIZE + fsrc_read = fsrc.read + fdst_write = fdst.write + while True: + buf = fsrc_read(length) if not buf: break - fdst_write(buf) + fdst_write(buf) def _samefile(src, dst): # Macintosh, Unix. - if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'): - try: - return os.path.samestat(src.stat(), os.stat(dst)) - except OSError: - return False - + if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'): + try: + return os.path.samestat(src.stat(), os.stat(dst)) + except OSError: + return False + if hasattr(os.path, 'samefile'): try: return os.path.samefile(src, dst) @@ -225,74 +225,74 @@ def _samefile(src, dst): return (os.path.normcase(os.path.abspath(src)) == os.path.normcase(os.path.abspath(dst))) -def _stat(fn): - return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn) - -def _islink(fn): - return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn) - +def _stat(fn): + return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn) + +def _islink(fn): + return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn) + def copyfile(src, dst, *, follow_symlinks=True): - """Copy data from src to dst in the most efficient way possible. + """Copy data from src to dst in the most efficient way possible. If follow_symlinks is not set and src is a symbolic link, a new symlink will be created instead of copying the file it points to. """ - sys.audit("shutil.copyfile", src, dst) - + sys.audit("shutil.copyfile", src, dst) + if _samefile(src, dst): raise SameFileError("{!r} and {!r} are the same file".format(src, dst)) - file_size = 0 - for i, fn in enumerate([src, dst]): + file_size = 0 + for i, fn in enumerate([src, dst]): try: - st = _stat(fn) + st = _stat(fn) except OSError: # File most likely does not exist pass else: # XXX What about other special files? (sockets, devices...) if stat.S_ISFIFO(st.st_mode): - fn = fn.path if isinstance(fn, os.DirEntry) else fn + fn = fn.path if isinstance(fn, os.DirEntry) else fn raise SpecialFileError("`%s` is a named pipe" % fn) - if _WINDOWS and i == 0: - file_size = st.st_size + if _WINDOWS and i == 0: + file_size = st.st_size - if not follow_symlinks and _islink(src): + if not follow_symlinks and _islink(src): os.symlink(os.readlink(src), dst) else: - with open(src, 'rb') as fsrc: - try: - with open(dst, 'wb') as fdst: - # macOS - if _HAS_FCOPYFILE: - try: - _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA) - return dst - except _GiveupOnFastCopy: - pass - # Linux - elif _USE_CP_SENDFILE: - try: - _fastcopy_sendfile(fsrc, fdst) - return dst - except _GiveupOnFastCopy: - pass - # Windows, see: - # https://github.com/python/cpython/pull/7160#discussion_r195405230 - elif _WINDOWS and file_size > 0: - _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE)) - return dst - - copyfileobj(fsrc, fdst) - - # Issue 43219, raise a less confusing exception - except IsADirectoryError as e: - if not os.path.exists(dst): - raise FileNotFoundError(f'Directory does not exist: {dst}') from e - else: - raise - + with open(src, 'rb') as fsrc: + try: + with open(dst, 'wb') as fdst: + # macOS + if _HAS_FCOPYFILE: + try: + _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA) + return dst + except _GiveupOnFastCopy: + pass + # Linux + elif _USE_CP_SENDFILE: + try: + _fastcopy_sendfile(fsrc, fdst) + return dst + except _GiveupOnFastCopy: + pass + # Windows, see: + # https://github.com/python/cpython/pull/7160#discussion_r195405230 + elif _WINDOWS and file_size > 0: + _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE)) + return dst + + copyfileobj(fsrc, fdst) + + # Issue 43219, raise a less confusing exception + except IsADirectoryError as e: + if not os.path.exists(dst): + raise FileNotFoundError(f'Directory does not exist: {dst}') from e + else: + raise + return dst def copymode(src, dst, *, follow_symlinks=True): @@ -303,15 +303,15 @@ def copymode(src, dst, *, follow_symlinks=True): (e.g. Linux) this method does nothing. """ - sys.audit("shutil.copymode", src, dst) - - if not follow_symlinks and _islink(src) and os.path.islink(dst): + sys.audit("shutil.copymode", src, dst) + + if not follow_symlinks and _islink(src) and os.path.islink(dst): if hasattr(os, 'lchmod'): stat_func, chmod_func = os.lstat, os.lchmod else: return else: - stat_func, chmod_func = _stat, os.chmod + stat_func, chmod_func = _stat, os.chmod st = stat_func(src) chmod_func(dst, stat.S_IMODE(st.st_mode)) @@ -329,7 +329,7 @@ if hasattr(os, 'listxattr'): try: names = os.listxattr(src, follow_symlinks=follow_symlinks) except OSError as e: - if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL): + if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL): raise return for name in names: @@ -337,8 +337,8 @@ if hasattr(os, 'listxattr'): value = os.getxattr(src, name, follow_symlinks=follow_symlinks) os.setxattr(dst, name, value, follow_symlinks=follow_symlinks) except OSError as e: - if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA, - errno.EINVAL): + if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA, + errno.EINVAL): raise else: def _copyxattr(*args, **kwargs): @@ -350,19 +350,19 @@ def copystat(src, dst, *, follow_symlinks=True): Copy the permission bits, last access time, last modification time, and flags from `src` to `dst`. On Linux, copystat() also copies the "extended attributes" where possible. The file contents, owner, and group are - unaffected. `src` and `dst` are path-like objects or path names given as - strings. + unaffected. `src` and `dst` are path-like objects or path names given as + strings. If the optional flag `follow_symlinks` is not set, symlinks aren't followed if and only if both `src` and `dst` are symlinks. """ - sys.audit("shutil.copystat", src, dst) - + sys.audit("shutil.copystat", src, dst) + def _nop(*args, ns=None, follow_symlinks=None): pass # follow symlinks (aka don't not follow symlinks) - follow = follow_symlinks or not (_islink(src) and os.path.islink(dst)) + follow = follow_symlinks or not (_islink(src) and os.path.islink(dst)) if follow: # use the real function if it exists def lookup(name): @@ -376,16 +376,16 @@ def copystat(src, dst, *, follow_symlinks=True): return fn return _nop - if isinstance(src, os.DirEntry): - st = src.stat(follow_symlinks=follow) - else: - st = lookup("stat")(src, follow_symlinks=follow) + if isinstance(src, os.DirEntry): + st = src.stat(follow_symlinks=follow) + else: + st = lookup("stat")(src, follow_symlinks=follow) mode = stat.S_IMODE(st.st_mode) lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns), follow_symlinks=follow) - # We must copy extended attributes before the file is (potentially) - # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. - _copyxattr(src, dst, follow_symlinks=follow) + # We must copy extended attributes before the file is (potentially) + # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. + _copyxattr(src, dst, follow_symlinks=follow) try: lookup("chmod")(dst, mode, follow_symlinks=follow) except NotImplementedError: @@ -457,55 +457,55 @@ def ignore_patterns(*patterns): return set(ignored_names) return _ignore_patterns -def _copytree(entries, src, dst, symlinks, ignore, copy_function, - ignore_dangling_symlinks, dirs_exist_ok=False): +def _copytree(entries, src, dst, symlinks, ignore, copy_function, + ignore_dangling_symlinks, dirs_exist_ok=False): if ignore is not None: - ignored_names = ignore(os.fspath(src), [x.name for x in entries]) + ignored_names = ignore(os.fspath(src), [x.name for x in entries]) else: ignored_names = set() - os.makedirs(dst, exist_ok=dirs_exist_ok) + os.makedirs(dst, exist_ok=dirs_exist_ok) errors = [] - use_srcentry = copy_function is copy2 or copy_function is copy - - for srcentry in entries: - if srcentry.name in ignored_names: + use_srcentry = copy_function is copy2 or copy_function is copy + + for srcentry in entries: + if srcentry.name in ignored_names: continue - srcname = os.path.join(src, srcentry.name) - dstname = os.path.join(dst, srcentry.name) - srcobj = srcentry if use_srcentry else srcname + srcname = os.path.join(src, srcentry.name) + dstname = os.path.join(dst, srcentry.name) + srcobj = srcentry if use_srcentry else srcname try: - is_symlink = srcentry.is_symlink() - if is_symlink and os.name == 'nt': - # Special check for directory junctions, which appear as - # symlinks but we want to recurse. - lstat = srcentry.stat(follow_symlinks=False) - if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT: - is_symlink = False - if is_symlink: + is_symlink = srcentry.is_symlink() + if is_symlink and os.name == 'nt': + # Special check for directory junctions, which appear as + # symlinks but we want to recurse. + lstat = srcentry.stat(follow_symlinks=False) + if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT: + is_symlink = False + if is_symlink: linkto = os.readlink(srcname) if symlinks: # We can't just leave it to `copy_function` because legacy # code with a custom `copy_function` may rely on copytree # doing the right thing. os.symlink(linkto, dstname) - copystat(srcobj, dstname, follow_symlinks=not symlinks) + copystat(srcobj, dstname, follow_symlinks=not symlinks) else: # ignore dangling symlink if the flag is on if not os.path.exists(linkto) and ignore_dangling_symlinks: continue - # otherwise let the copy occur. copy2 will raise an error - if srcentry.is_dir(): - copytree(srcobj, dstname, symlinks, ignore, - copy_function, dirs_exist_ok=dirs_exist_ok) + # otherwise let the copy occur. copy2 will raise an error + if srcentry.is_dir(): + copytree(srcobj, dstname, symlinks, ignore, + copy_function, dirs_exist_ok=dirs_exist_ok) else: - copy_function(srcobj, dstname) - elif srcentry.is_dir(): - copytree(srcobj, dstname, symlinks, ignore, copy_function, - dirs_exist_ok=dirs_exist_ok) + copy_function(srcobj, dstname) + elif srcentry.is_dir(): + copytree(srcobj, dstname, symlinks, ignore, copy_function, + dirs_exist_ok=dirs_exist_ok) else: # Will raise a SpecialFileError for unsupported file types - copy_function(srcobj, dstname) + copy_function(srcobj, dstname) # catch the Error from the recursive copytree so that we can # continue with other files except Error as err: @@ -522,83 +522,83 @@ def _copytree(entries, src, dst, symlinks, ignore, copy_function, raise Error(errors) return dst -def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2, - ignore_dangling_symlinks=False, dirs_exist_ok=False): - """Recursively copy a directory tree and return the destination directory. - - dirs_exist_ok dictates whether to raise an exception in case dst or any - missing parent directory already exists. - - If exception(s) occur, an Error is raised with a list of reasons. - - If the optional symlinks flag is true, symbolic links in the - source tree result in symbolic links in the destination tree; if - it is false, the contents of the files pointed to by symbolic - links are copied. If the file pointed by the symlink doesn't - exist, an exception will be added in the list of errors raised in - an Error exception at the end of the copy process. - - You can set the optional ignore_dangling_symlinks flag to true if you - want to silence this exception. Notice that this has no effect on - platforms that don't support os.symlink. - - The optional ignore argument is a callable. If given, it - is called with the `src` parameter, which is the directory - being visited by copytree(), and `names` which is the list of - `src` contents, as returned by os.listdir(): - - callable(src, names) -> ignored_names - - Since copytree() is called recursively, the callable will be - called once for each directory that is copied. It returns a - list of names relative to the `src` directory that should - not be copied. - - The optional copy_function argument is a callable that will be used - to copy each file. It will be called with the source path and the - destination path as arguments. By default, copy2() is used, but any - function that supports the same signature (like copy()) can be used. - - """ - sys.audit("shutil.copytree", src, dst) - with os.scandir(src) as itr: - entries = list(itr) - return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks, - ignore=ignore, copy_function=copy_function, - ignore_dangling_symlinks=ignore_dangling_symlinks, - dirs_exist_ok=dirs_exist_ok) - -if hasattr(os.stat_result, 'st_file_attributes'): - # Special handling for directory junctions to make them behave like - # symlinks for shutil.rmtree, since in general they do not appear as - # regular links. - def _rmtree_isdir(entry): - try: - st = entry.stat(follow_symlinks=False) - return (stat.S_ISDIR(st.st_mode) and not - (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT - and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)) - except OSError: - return False - - def _rmtree_islink(path): - try: - st = os.lstat(path) - return (stat.S_ISLNK(st.st_mode) or - (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT - and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)) - except OSError: - return False -else: - def _rmtree_isdir(entry): - try: - return entry.is_dir(follow_symlinks=False) - except OSError: - return False - - def _rmtree_islink(path): - return os.path.islink(path) - +def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2, + ignore_dangling_symlinks=False, dirs_exist_ok=False): + """Recursively copy a directory tree and return the destination directory. + + dirs_exist_ok dictates whether to raise an exception in case dst or any + missing parent directory already exists. + + If exception(s) occur, an Error is raised with a list of reasons. + + If the optional symlinks flag is true, symbolic links in the + source tree result in symbolic links in the destination tree; if + it is false, the contents of the files pointed to by symbolic + links are copied. If the file pointed by the symlink doesn't + exist, an exception will be added in the list of errors raised in + an Error exception at the end of the copy process. + + You can set the optional ignore_dangling_symlinks flag to true if you + want to silence this exception. Notice that this has no effect on + platforms that don't support os.symlink. + + The optional ignore argument is a callable. If given, it + is called with the `src` parameter, which is the directory + being visited by copytree(), and `names` which is the list of + `src` contents, as returned by os.listdir(): + + callable(src, names) -> ignored_names + + Since copytree() is called recursively, the callable will be + called once for each directory that is copied. It returns a + list of names relative to the `src` directory that should + not be copied. + + The optional copy_function argument is a callable that will be used + to copy each file. It will be called with the source path and the + destination path as arguments. By default, copy2() is used, but any + function that supports the same signature (like copy()) can be used. + + """ + sys.audit("shutil.copytree", src, dst) + with os.scandir(src) as itr: + entries = list(itr) + return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks, + ignore=ignore, copy_function=copy_function, + ignore_dangling_symlinks=ignore_dangling_symlinks, + dirs_exist_ok=dirs_exist_ok) + +if hasattr(os.stat_result, 'st_file_attributes'): + # Special handling for directory junctions to make them behave like + # symlinks for shutil.rmtree, since in general they do not appear as + # regular links. + def _rmtree_isdir(entry): + try: + st = entry.stat(follow_symlinks=False) + return (stat.S_ISDIR(st.st_mode) and not + (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT + and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)) + except OSError: + return False + + def _rmtree_islink(path): + try: + st = os.lstat(path) + return (stat.S_ISLNK(st.st_mode) or + (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT + and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)) + except OSError: + return False +else: + def _rmtree_isdir(entry): + try: + return entry.is_dir(follow_symlinks=False) + except OSError: + return False + + def _rmtree_islink(path): + return os.path.islink(path) + # version vulnerable to race conditions def _rmtree_unsafe(path, onerror): try: @@ -609,7 +609,7 @@ def _rmtree_unsafe(path, onerror): entries = [] for entry in entries: fullname = entry.path - if _rmtree_isdir(entry): + if _rmtree_isdir(entry): try: if entry.is_symlink(): # This can only happen if someone replaces @@ -645,14 +645,14 @@ def _rmtree_safe_fd(topfd, path, onerror): is_dir = entry.is_dir(follow_symlinks=False) except OSError: is_dir = False - else: - if is_dir: - try: - orig_st = entry.stat(follow_symlinks=False) - is_dir = stat.S_ISDIR(orig_st.st_mode) - except OSError: - onerror(os.lstat, fullname, sys.exc_info()) - continue + else: + if is_dir: + try: + orig_st = entry.stat(follow_symlinks=False) + is_dir = stat.S_ISDIR(orig_st.st_mode) + except OSError: + onerror(os.lstat, fullname, sys.exc_info()) + continue if is_dir: try: dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd) @@ -699,7 +699,7 @@ def rmtree(path, ignore_errors=False, onerror=None): is false and onerror is None, an exception is raised. """ - sys.audit("shutil.rmtree", path) + sys.audit("shutil.rmtree", path) if ignore_errors: def onerror(*args): pass @@ -720,7 +720,7 @@ def rmtree(path, ignore_errors=False, onerror=None): try: fd = os.open(path, os.O_RDONLY) except Exception: - onerror(os.open, path, sys.exc_info()) + onerror(os.open, path, sys.exc_info()) return try: if os.path.samestat(orig_st, os.fstat(fd)): @@ -739,7 +739,7 @@ def rmtree(path, ignore_errors=False, onerror=None): os.close(fd) else: try: - if _rmtree_islink(path): + if _rmtree_islink(path): # symlinks to directories are forbidden, see bug #1669 raise OSError("Cannot call rmtree on a symbolic link") except OSError: @@ -753,20 +753,20 @@ def rmtree(path, ignore_errors=False, onerror=None): rmtree.avoids_symlink_attacks = _use_fd_functions def _basename(path): - """A basename() variant which first strips the trailing slash, if present. - Thus we always get the last component of the path, even for directories. - - path: Union[PathLike, str] - - e.g. - >>> os.path.basename('/bar/foo') - 'foo' - >>> os.path.basename('/bar/foo/') - '' - >>> _basename('/bar/foo/') - 'foo' - """ - path = os.fspath(path) + """A basename() variant which first strips the trailing slash, if present. + Thus we always get the last component of the path, even for directories. + + path: Union[PathLike, str] + + e.g. + >>> os.path.basename('/bar/foo') + 'foo' + >>> os.path.basename('/bar/foo/') + '' + >>> _basename('/bar/foo/') + 'foo' + """ + path = os.fspath(path) sep = os.path.sep + (os.path.altsep or '') return os.path.basename(path.rstrip(sep)) @@ -796,7 +796,7 @@ def move(src, dst, copy_function=copy2): the issues this implementation glosses over. """ - sys.audit("shutil.move", src, dst) + sys.audit("shutil.move", src, dst) real_dst = dst if os.path.isdir(dst): if _samefile(src, dst): @@ -805,10 +805,10 @@ def move(src, dst, copy_function=copy2): os.rename(src, dst) return - # Using _basename instead of os.path.basename is important, as we must - # ignore any trailing slash to avoid the basename returning '' + # Using _basename instead of os.path.basename is important, as we must + # ignore any trailing slash to avoid the basename returning '' real_dst = os.path.join(dst, _basename(src)) - + if os.path.exists(real_dst): raise Error("Destination path '%s' already exists" % real_dst) try: @@ -822,12 +822,12 @@ def move(src, dst, copy_function=copy2): if _destinsrc(src, dst): raise Error("Cannot move a directory '%s' into itself" " '%s'." % (src, dst)) - if (_is_immutable(src) - or (not os.access(src, os.W_OK) and os.listdir(src) - and sys.platform == 'darwin')): - raise PermissionError("Cannot move the non-empty directory " - "'%s': Lacking write permission to '%s'." - % (src, src)) + if (_is_immutable(src) + or (not os.access(src, os.W_OK) and os.listdir(src) + and sys.platform == 'darwin')): + raise PermissionError("Cannot move the non-empty directory " + "'%s': Lacking write permission to '%s'." + % (src, src)) copytree(src, real_dst, copy_function=copy_function, symlinks=True) rmtree(src) @@ -845,11 +845,11 @@ def _destinsrc(src, dst): dst += os.path.sep return dst.startswith(src) -def _is_immutable(src): - st = _stat(src) - immutable_states = [stat.UF_IMMUTABLE, stat.SF_IMMUTABLE] - return hasattr(st, 'st_flags') and st.st_flags in immutable_states - +def _is_immutable(src): + st = _stat(src) + immutable_states = [stat.UF_IMMUTABLE, stat.SF_IMMUTABLE] + return hasattr(st, 'st_flags') and st.st_flags in immutable_states + def _get_gid(name): """Returns a gid, given a group name.""" if getgrnam is None or name is None: @@ -1052,7 +1052,7 @@ def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0, 'owner' and 'group' are used when creating a tar archive. By default, uses the current owner and group. """ - sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir) + sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir) save_cwd = os.getcwd() if root_dir is not None: if logger is not None: @@ -1168,16 +1168,16 @@ def _unpack_zipfile(filename, extract_dir): if name.startswith('/') or '..' in name: continue - targetpath = os.path.join(extract_dir, *name.split('/')) - if not targetpath: + targetpath = os.path.join(extract_dir, *name.split('/')) + if not targetpath: continue - _ensure_directory(targetpath) + _ensure_directory(targetpath) if not name.endswith('/'): # file - with zip.open(name, 'r') as source, \ - open(targetpath, 'wb') as target: - copyfileobj(source, target) + with zip.open(name, 'r') as source, \ + open(targetpath, 'wb') as target: + copyfileobj(source, target) finally: zip.close() @@ -1234,8 +1234,8 @@ def unpack_archive(filename, extract_dir=None, format=None): In case none is found, a ValueError is raised. """ - sys.audit("shutil.unpack_archive", filename, extract_dir, format) - + sys.audit("shutil.unpack_archive", filename, extract_dir, format) + if extract_dir is None: extract_dir = os.getcwd() @@ -1281,7 +1281,7 @@ if hasattr(os, 'statvfs'): used = (st.f_blocks - st.f_bfree) * st.f_frsize return _ntuple_diskusage(total, used, free) -elif _WINDOWS: +elif _WINDOWS: __all__.append('disk_usage') _ntuple_diskusage = collections.namedtuple('usage', 'total used free') @@ -1303,7 +1303,7 @@ def chown(path, user=None, group=None): user and group can be the uid/gid or the user/group names, and in that case, they are converted to their respective uid/gid. """ - sys.audit('shutil.chown', path, user, group) + sys.audit('shutil.chown', path, user, group) if user is None and group is None: raise ValueError("user and/or group must be set") @@ -1374,15 +1374,15 @@ def get_terminal_size(fallback=(80, 24)): return os.terminal_size((columns, lines)) - -# Check that a given file can be accessed with the correct mode. -# Additionally check that `file` is not a directory, as on Windows -# directories pass the os.access check. -def _access_check(fn, mode): - return (os.path.exists(fn) and os.access(fn, mode) - and not os.path.isdir(fn)) - - + +# Check that a given file can be accessed with the correct mode. +# Additionally check that `file` is not a directory, as on Windows +# directories pass the os.access check. +def _access_check(fn, mode): + return (os.path.exists(fn) and os.access(fn, mode) + and not os.path.isdir(fn)) + + def which(cmd, mode=os.F_OK | os.X_OK, path=None): """Given a command, mode, and a PATH string, return the path which conforms to the given mode on the PATH, or None if there is no such @@ -1401,44 +1401,44 @@ def which(cmd, mode=os.F_OK | os.X_OK, path=None): return cmd return None - use_bytes = isinstance(cmd, bytes) - + use_bytes = isinstance(cmd, bytes) + if path is None: - path = os.environ.get("PATH", None) - if path is None: - try: - path = os.confstr("CS_PATH") - except (AttributeError, ValueError): - # os.confstr() or CS_PATH is not available - path = os.defpath - # bpo-35755: Don't use os.defpath if the PATH environment variable is - # set to an empty string - - # PATH='' doesn't match, whereas PATH=':' looks in the current directory + path = os.environ.get("PATH", None) + if path is None: + try: + path = os.confstr("CS_PATH") + except (AttributeError, ValueError): + # os.confstr() or CS_PATH is not available + path = os.defpath + # bpo-35755: Don't use os.defpath if the PATH environment variable is + # set to an empty string + + # PATH='' doesn't match, whereas PATH=':' looks in the current directory if not path: return None - if use_bytes: - path = os.fsencode(path) - path = path.split(os.fsencode(os.pathsep)) - else: - path = os.fsdecode(path) - path = path.split(os.pathsep) - + if use_bytes: + path = os.fsencode(path) + path = path.split(os.fsencode(os.pathsep)) + else: + path = os.fsdecode(path) + path = path.split(os.pathsep) + if sys.platform == "win32": # The current directory takes precedence on Windows. - curdir = os.curdir - if use_bytes: - curdir = os.fsencode(curdir) - if curdir not in path: - path.insert(0, curdir) + curdir = os.curdir + if use_bytes: + curdir = os.fsencode(curdir) + if curdir not in path: + path.insert(0, curdir) # PATHEXT is necessary to check on Windows. - pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT - pathext = [ext for ext in pathext_source.split(os.pathsep) if ext] - - if use_bytes: - pathext = [os.fsencode(ext) for ext in pathext] + pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT + pathext = [ext for ext in pathext_source.split(os.pathsep) if ext] + + if use_bytes: + pathext = [os.fsencode(ext) for ext in pathext] # See if the given file matches any of the expected path extensions. # This will allow us to short circuit when given "python.exe". # If it does match, only test that one, otherwise we have to try |