aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/shutil.py
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.com>2024-02-12 07:53:52 +0300
committerDaniil Cherednik <dcherednik@ydb.tech>2024-02-14 14:26:16 +0000
commit31f2a419764a8ba77c2a970cfc80056c6cd06756 (patch)
treec1995d239eba8571cefc640f6648e1d5dd4ce9e2 /contrib/tools/python3/src/Lib/shutil.py
parentfe2ef02b38d9c85d80060963b265a1df9f38c3bb (diff)
downloadydb-31f2a419764a8ba77c2a970cfc80056c6cd06756.tar.gz
Update Python from 3.11.8 to 3.12.2
Diffstat (limited to 'contrib/tools/python3/src/Lib/shutil.py')
-rw-r--r--contrib/tools/python3/src/Lib/shutil.py299
1 files changed, 173 insertions, 126 deletions
diff --git a/contrib/tools/python3/src/Lib/shutil.py b/contrib/tools/python3/src/Lib/shutil.py
index 79b38cf456..96463007d1 100644
--- a/contrib/tools/python3/src/Lib/shutil.py
+++ b/contrib/tools/python3/src/Lib/shutil.py
@@ -10,6 +10,7 @@ import stat
import fnmatch
import collections
import errno
+import warnings
try:
import zlib
@@ -39,6 +40,11 @@ if os.name == 'posix':
elif _WINDOWS:
import nt
+if sys.platform == 'win32':
+ import _winapi
+else:
+ _winapi = None
+
COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
# This should never be removed, see rationale in:
# https://bugs.python.org/issue43743#msg393429
@@ -182,7 +188,8 @@ def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE):
break
elif n < length:
with mv[:n] as smv:
- fdst.write(smv)
+ fdst_write(smv)
+ break
else:
fdst_write(mv)
@@ -193,10 +200,7 @@ def copyfileobj(fsrc, fdst, length=0):
# Localize variable access to minimize overhead.
fsrc_read = fsrc.read
fdst_write = fdst.write
- while True:
- buf = fsrc_read(length)
- if not buf:
- break
+ while buf := fsrc_read(length):
fdst_write(buf)
def _samefile(src, dst):
@@ -334,7 +338,7 @@ if hasattr(os, 'listxattr'):
os.setxattr(dst, name, value, follow_symlinks=follow_symlinks)
except OSError as e:
if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA,
- errno.EINVAL):
+ errno.EINVAL, errno.EACCES):
raise
else:
def _copyxattr(*args, **kwargs):
@@ -445,6 +449,29 @@ def copy2(src, dst, *, follow_symlinks=True):
"""
if os.path.isdir(dst):
dst = os.path.join(dst, os.path.basename(src))
+
+ if hasattr(_winapi, "CopyFile2"):
+ src_ = os.fsdecode(src)
+ dst_ = os.fsdecode(dst)
+ flags = _winapi.COPY_FILE_ALLOW_DECRYPTED_DESTINATION # for compat
+ if not follow_symlinks:
+ flags |= _winapi.COPY_FILE_COPY_SYMLINK
+ try:
+ _winapi.CopyFile2(src_, dst_, flags)
+ return dst
+ except OSError as exc:
+ if (exc.winerror == _winapi.ERROR_PRIVILEGE_NOT_HELD
+ and not follow_symlinks):
+ # Likely encountered a symlink we aren't allowed to create.
+ # Fall back on the old code
+ pass
+ elif exc.winerror == _winapi.ERROR_ACCESS_DENIED:
+ # Possibly encountered a hidden or readonly file we can't
+ # overwrite. Fall back on old code
+ pass
+ else:
+ raise
+
copyfile(src, dst, follow_symlinks=follow_symlinks)
copystat(src, dst, follow_symlinks=follow_symlinks)
return dst
@@ -576,18 +603,6 @@ def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
dirs_exist_ok=dirs_exist_ok)
if hasattr(os.stat_result, 'st_file_attributes'):
- # Special handling for directory junctions to make them behave like
- # symlinks for shutil.rmtree, since in general they do not appear as
- # regular links.
- def _rmtree_isdir(entry):
- try:
- st = entry.stat(follow_symlinks=False)
- return (stat.S_ISDIR(st.st_mode) and not
- (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
- and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
- except OSError:
- return False
-
def _rmtree_islink(path):
try:
st = os.lstat(path)
@@ -597,54 +612,53 @@ if hasattr(os.stat_result, 'st_file_attributes'):
except OSError:
return False
else:
- def _rmtree_isdir(entry):
- try:
- return entry.is_dir(follow_symlinks=False)
- except OSError:
- return False
-
def _rmtree_islink(path):
return os.path.islink(path)
# version vulnerable to race conditions
-def _rmtree_unsafe(path, onerror):
+def _rmtree_unsafe(path, onexc):
try:
with os.scandir(path) as scandir_it:
entries = list(scandir_it)
- except OSError:
- onerror(os.scandir, path, sys.exc_info())
+ except OSError as err:
+ onexc(os.scandir, path, err)
entries = []
for entry in entries:
fullname = entry.path
- if _rmtree_isdir(entry):
+ try:
+ is_dir = entry.is_dir(follow_symlinks=False)
+ except OSError:
+ is_dir = False
+
+ if is_dir and not entry.is_junction():
try:
if entry.is_symlink():
# This can only happen if someone replaces
# a directory with a symlink after the call to
# os.scandir or entry.is_dir above.
raise OSError("Cannot call rmtree on a symbolic link")
- except OSError:
- onerror(os.path.islink, fullname, sys.exc_info())
+ except OSError as err:
+ onexc(os.path.islink, fullname, err)
continue
- _rmtree_unsafe(fullname, onerror)
+ _rmtree_unsafe(fullname, onexc)
else:
try:
os.unlink(fullname)
- except OSError:
- onerror(os.unlink, fullname, sys.exc_info())
+ except OSError as err:
+ onexc(os.unlink, fullname, err)
try:
os.rmdir(path)
- except OSError:
- onerror(os.rmdir, path, sys.exc_info())
+ except OSError as err:
+ onexc(os.rmdir, path, err)
# Version using fd-based APIs to protect against races
-def _rmtree_safe_fd(topfd, path, onerror):
+def _rmtree_safe_fd(topfd, path, onexc):
try:
with os.scandir(topfd) as scandir_it:
entries = list(scandir_it)
except OSError as err:
err.filename = path
- onerror(os.scandir, path, sys.exc_info())
+ onexc(os.scandir, path, err)
return
for entry in entries:
fullname = os.path.join(path, entry.name)
@@ -657,30 +671,30 @@ def _rmtree_safe_fd(topfd, path, onerror):
try:
orig_st = entry.stat(follow_symlinks=False)
is_dir = stat.S_ISDIR(orig_st.st_mode)
- except OSError:
- onerror(os.lstat, fullname, sys.exc_info())
+ except OSError as err:
+ onexc(os.lstat, fullname, err)
continue
if is_dir:
try:
dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd)
dirfd_closed = False
- except OSError:
- onerror(os.open, fullname, sys.exc_info())
+ except OSError as err:
+ onexc(os.open, fullname, err)
else:
try:
if os.path.samestat(orig_st, os.fstat(dirfd)):
- _rmtree_safe_fd(dirfd, fullname, onerror)
+ _rmtree_safe_fd(dirfd, fullname, onexc)
try:
os.close(dirfd)
- except OSError:
+ except OSError as err:
# close() should not be retried after an error.
dirfd_closed = True
- onerror(os.close, fullname, sys.exc_info())
+ onexc(os.close, fullname, err)
dirfd_closed = True
try:
os.rmdir(entry.name, dir_fd=topfd)
- except OSError:
- onerror(os.rmdir, fullname, sys.exc_info())
+ except OSError as err:
+ onexc(os.rmdir, fullname, err)
else:
try:
# This can only happen if someone replaces
@@ -688,26 +702,26 @@ def _rmtree_safe_fd(topfd, path, onerror):
# os.scandir or stat.S_ISDIR above.
raise OSError("Cannot call rmtree on a symbolic "
"link")
- except OSError:
- onerror(os.path.islink, fullname, sys.exc_info())
+ except OSError as err:
+ onexc(os.path.islink, fullname, err)
finally:
if not dirfd_closed:
try:
os.close(dirfd)
- except OSError:
- onerror(os.close, fullname, sys.exc_info())
+ except OSError as err:
+ onexc(os.close, fullname, err)
else:
try:
os.unlink(entry.name, dir_fd=topfd)
- except OSError:
- onerror(os.unlink, fullname, sys.exc_info())
+ except OSError as err:
+ onexc(os.unlink, fullname, err)
_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
os.supports_dir_fd and
os.scandir in os.supports_fd and
os.stat in os.supports_follow_symlinks)
-def rmtree(path, ignore_errors=False, onerror=None, *, dir_fd=None):
+def rmtree(path, ignore_errors=False, onerror=None, *, onexc=None, dir_fd=None):
"""Recursively delete a directory tree.
If dir_fd is not None, it should be a file descriptor open to a directory;
@@ -715,21 +729,40 @@ def rmtree(path, ignore_errors=False, onerror=None, *, dir_fd=None):
dir_fd may not be implemented on your platform.
If it is unavailable, using it will raise a NotImplementedError.
- If ignore_errors is set, errors are ignored; otherwise, if onerror
- is set, it is called to handle the error with arguments (func,
+ If ignore_errors is set, errors are ignored; otherwise, if onexc or
+ onerror is set, it is called to handle the error with arguments (func,
path, exc_info) where func is platform and implementation dependent;
path is the argument to that function that caused it to fail; and
- exc_info is a tuple returned by sys.exc_info(). If ignore_errors
- is false and onerror is None, an exception is raised.
+ the value of exc_info describes the exception. For onexc it is the
+ exception instance, and for onerror it is a tuple as returned by
+ sys.exc_info(). If ignore_errors is false and both onexc and
+ onerror are None, the exception is reraised.
+ onerror is deprecated and only remains for backwards compatibility.
+ If both onerror and onexc are set, onerror is ignored and onexc is used.
"""
+
sys.audit("shutil.rmtree", path, dir_fd)
if ignore_errors:
- def onerror(*args):
+ def onexc(*args):
pass
- elif onerror is None:
- def onerror(*args):
+ elif onerror is None and onexc is None:
+ def onexc(*args):
raise
+ elif onexc is None:
+ if onerror is None:
+ def onexc(*args):
+ raise
+ else:
+ # delegate to onerror
+ def onexc(*args):
+ func, path, exc = args
+ if exc is None:
+ exc_info = None, None, None
+ else:
+ exc_info = type(exc), exc, exc.__traceback__
+ return onerror(func, path, exc_info)
+
if _use_fd_functions:
# While the unsafe rmtree works fine on bytes, the fd based does not.
if isinstance(path, bytes):
@@ -738,41 +771,41 @@ def rmtree(path, ignore_errors=False, onerror=None, *, dir_fd=None):
# lstat()/open()/fstat() trick.
try:
orig_st = os.lstat(path, dir_fd=dir_fd)
- except Exception:
- onerror(os.lstat, path, sys.exc_info())
+ except Exception as err:
+ onexc(os.lstat, path, err)
return
try:
fd = os.open(path, os.O_RDONLY, dir_fd=dir_fd)
fd_closed = False
- except Exception:
- onerror(os.open, path, sys.exc_info())
+ except Exception as err:
+ onexc(os.open, path, err)
return
try:
if os.path.samestat(orig_st, os.fstat(fd)):
- _rmtree_safe_fd(fd, path, onerror)
+ _rmtree_safe_fd(fd, path, onexc)
try:
os.close(fd)
- except OSError:
+ except OSError as err:
# close() should not be retried after an error.
fd_closed = True
- onerror(os.close, path, sys.exc_info())
+ onexc(os.close, path, err)
fd_closed = True
try:
os.rmdir(path, dir_fd=dir_fd)
- except OSError:
- onerror(os.rmdir, path, sys.exc_info())
+ except OSError as err:
+ onexc(os.rmdir, path, err)
else:
try:
# symlinks to directories are forbidden, see bug #1669
raise OSError("Cannot call rmtree on a symbolic link")
- except OSError:
- onerror(os.path.islink, path, sys.exc_info())
+ except OSError as err:
+ onexc(os.path.islink, path, err)
finally:
if not fd_closed:
try:
os.close(fd)
- except OSError:
- onerror(os.close, path, sys.exc_info())
+ except OSError as err:
+ onexc(os.close, path, err)
else:
if dir_fd is not None:
raise NotImplementedError("dir_fd unavailable on this platform")
@@ -780,11 +813,11 @@ def rmtree(path, ignore_errors=False, onerror=None, *, dir_fd=None):
if _rmtree_islink(path):
# symlinks to directories are forbidden, see bug #1669
raise OSError("Cannot call rmtree on a symbolic link")
- except OSError:
- onerror(os.path.islink, path, sys.exc_info())
- # can't continue even if onerror hook returns
+ except OSError as err:
+ onexc(os.path.islink, path, err)
+ # can't continue even if onexc hook returns
return
- return _rmtree_unsafe(path, onerror)
+ return _rmtree_unsafe(path, onexc)
# Allow introspection of whether or not the hardening against symlink
# attacks is supported on the current platform
@@ -1051,28 +1084,30 @@ def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0,
zip_filename = os.path.abspath(zip_filename)
return zip_filename
+_make_tarball.supports_root_dir = True
+_make_zipfile.supports_root_dir = True
+
# Maps the name of the archive format to a tuple containing:
# * the archiving function
# * extra keyword arguments
# * description
-# * does it support the root_dir argument?
_ARCHIVE_FORMATS = {
'tar': (_make_tarball, [('compress', None)],
- "uncompressed tar file", True),
+ "uncompressed tar file"),
}
if _ZLIB_SUPPORTED:
_ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')],
- "gzip'ed tar-file", True)
- _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file", True)
+ "gzip'ed tar-file")
+ _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file")
if _BZ2_SUPPORTED:
_ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
- "bzip2'ed tar-file", True)
+ "bzip2'ed tar-file")
if _LZMA_SUPPORTED:
_ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')],
- "xz'ed tar-file", True)
+ "xz'ed tar-file")
def get_archive_formats():
"""Returns a list of supported formats for archiving and unarchiving.
@@ -1103,7 +1138,7 @@ def register_archive_format(name, function, extra_args=None, description=''):
if not isinstance(element, (tuple, list)) or len(element) !=2:
raise TypeError('extra_args elements are : (arg_name, value)')
- _ARCHIVE_FORMATS[name] = (function, extra_args, description, False)
+ _ARCHIVE_FORMATS[name] = (function, extra_args, description)
def unregister_archive_format(name):
del _ARCHIVE_FORMATS[name]
@@ -1142,7 +1177,7 @@ def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
if base_dir is None:
base_dir = os.curdir
- supports_root_dir = format_info[3]
+ supports_root_dir = getattr(func, 'supports_root_dir', False)
save_cwd = None
if root_dir is not None:
stmd = os.stat(root_dir).st_mode
@@ -1477,6 +1512,16 @@ def _access_check(fn, mode):
and not os.path.isdir(fn))
+def _win_path_needs_curdir(cmd, mode):
+ """
+ On Windows, we can use NeedCurrentDirectoryForExePath to figure out
+ if we should add the cwd to PATH when searching for executables if
+ the mode is executable.
+ """
+ return (not (mode & os.X_OK)) or _winapi.NeedCurrentDirectoryForExePath(
+ os.fsdecode(cmd))
+
+
def which(cmd, mode=os.F_OK | os.X_OK, path=None):
"""Given a command, mode, and a PATH string, return the path which
conforms to the given mode on the PATH, or None if there is no such
@@ -1487,60 +1532,62 @@ def which(cmd, mode=os.F_OK | os.X_OK, path=None):
path.
"""
- # If we're given a path with a directory part, look it up directly rather
- # than referring to PATH directories. This includes checking relative to the
- # current directory, e.g. ./script
- if os.path.dirname(cmd):
- if _access_check(cmd, mode):
- return cmd
- return None
-
use_bytes = isinstance(cmd, bytes)
- if path is None:
- path = os.environ.get("PATH", None)
- if path is None:
- try:
- path = os.confstr("CS_PATH")
- except (AttributeError, ValueError):
- # os.confstr() or CS_PATH is not available
- path = os.defpath
- # bpo-35755: Don't use os.defpath if the PATH environment variable is
- # set to an empty string
-
- # PATH='' doesn't match, whereas PATH=':' looks in the current directory
- if not path:
- return None
-
- if use_bytes:
- path = os.fsencode(path)
- path = path.split(os.fsencode(os.pathsep))
+ # If we're given a path with a directory part, look it up directly rather
+ # than referring to PATH directories. This includes checking relative to
+ # the current directory, e.g. ./script
+ dirname, cmd = os.path.split(cmd)
+ if dirname:
+ path = [dirname]
else:
- path = os.fsdecode(path)
- path = path.split(os.pathsep)
+ if path is None:
+ path = os.environ.get("PATH", None)
+ if path is None:
+ try:
+ path = os.confstr("CS_PATH")
+ except (AttributeError, ValueError):
+ # os.confstr() or CS_PATH is not available
+ path = os.defpath
+ # bpo-35755: Don't use os.defpath if the PATH environment variable
+ # is set to an empty string
+
+ # PATH='' doesn't match, whereas PATH=':' looks in the current
+ # directory
+ if not path:
+ return None
- if sys.platform == "win32":
- # The current directory takes precedence on Windows.
- curdir = os.curdir
if use_bytes:
- curdir = os.fsencode(curdir)
- if curdir not in path:
+ path = os.fsencode(path)
+ path = path.split(os.fsencode(os.pathsep))
+ else:
+ path = os.fsdecode(path)
+ path = path.split(os.pathsep)
+
+ if sys.platform == "win32" and _win_path_needs_curdir(cmd, mode):
+ curdir = os.curdir
+ if use_bytes:
+ curdir = os.fsencode(curdir)
path.insert(0, curdir)
+ if sys.platform == "win32":
# PATHEXT is necessary to check on Windows.
pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT
pathext = [ext for ext in pathext_source.split(os.pathsep) if ext]
if use_bytes:
pathext = [os.fsencode(ext) for ext in pathext]
- # See if the given file matches any of the expected path extensions.
- # This will allow us to short circuit when given "python.exe".
- # If it does match, only test that one, otherwise we have to try
- # others.
- if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
- files = [cmd]
- else:
- files = [cmd + ext for ext in pathext]
+
+ files = ([cmd] + [cmd + ext for ext in pathext])
+
+ # gh-109590. If we are looking for an executable, we need to look
+ # for a PATHEXT match. The first cmd is the direct match
+ # (e.g. python.exe instead of python)
+ # Check that direct match first if and only if the extension is in PATHEXT
+ # Otherwise check it last
+ suffix = os.path.splitext(files[0])[1].upper()
+ if mode & os.X_OK and not any(suffix == ext.upper() for ext in pathext):
+ files.append(files.pop(0))
else:
# On other platforms you don't have things like PATHEXT to tell you
# what file suffixes are executable, so just pass on cmd as-is.