aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/os.py
diff options
context:
space:
mode:
authornkozlovskiy <nmk@ydb.tech>2023-09-29 12:24:06 +0300
committernkozlovskiy <nmk@ydb.tech>2023-09-29 12:41:34 +0300
commite0e3e1717e3d33762ce61950504f9637a6e669ed (patch)
treebca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/tools/python3/src/Lib/os.py
parent38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff)
downloadydb-e0e3e1717e3d33762ce61950504f9637a6e669ed.tar.gz
add ydb deps
Diffstat (limited to 'contrib/tools/python3/src/Lib/os.py')
-rw-r--r--contrib/tools/python3/src/Lib/os.py1124
1 files changed, 1124 insertions, 0 deletions
diff --git a/contrib/tools/python3/src/Lib/os.py b/contrib/tools/python3/src/Lib/os.py
new file mode 100644
index 0000000000..fd1e774fdc
--- /dev/null
+++ b/contrib/tools/python3/src/Lib/os.py
@@ -0,0 +1,1124 @@
+r"""OS routines for NT or Posix depending on what system we're on.
+
+This exports:
+ - all functions from posix or nt, e.g. unlink, stat, etc.
+ - os.path is either posixpath or ntpath
+ - os.name is either 'posix' or 'nt'
+ - os.curdir is a string representing the current directory (always '.')
+ - os.pardir is a string representing the parent directory (always '..')
+ - os.sep is the (or a most common) pathname separator ('/' or '\\')
+ - os.extsep is the extension separator (always '.')
+ - os.altsep is the alternate pathname separator (None or '/')
+ - os.pathsep is the component separator used in $PATH etc
+ - os.linesep is the line separator in text files ('\r' or '\n' or '\r\n')
+ - os.defpath is the default search path for executables
+ - os.devnull is the file path of the null device ('/dev/null', etc.)
+
+Programs that import and use 'os' stand a better chance of being
+portable between different platforms. Of course, they must then
+only use functions that are defined by all platforms (e.g., unlink
+and opendir), and leave all pathname manipulation to os.path
+(e.g., split and join).
+"""
+
+#'
+import abc
+import sys
+import stat as st
+
+from _collections_abc import _check_methods
+
+GenericAlias = type(list[int])
+
+_names = sys.builtin_module_names
+
+# Note: more names are added to __all__ later.
+__all__ = ["altsep", "curdir", "pardir", "sep", "pathsep", "linesep",
+ "defpath", "name", "path", "devnull", "SEEK_SET", "SEEK_CUR",
+ "SEEK_END", "fsencode", "fsdecode", "get_exec_path", "fdopen",
+ "extsep"]
+
+def _exists(name):
+ return name in globals()
+
+def _get_exports_list(module):
+ try:
+ return list(module.__all__)
+ except AttributeError:
+ return [n for n in dir(module) if n[0] != '_']
+
+# Any new dependencies of the os module and/or changes in path separator
+# requires updating importlib as well.
+if 'posix' in _names:
+ name = 'posix'
+ linesep = '\n'
+ from posix import *
+ try:
+ from posix import _exit
+ __all__.append('_exit')
+ except ImportError:
+ pass
+ import posixpath as path
+
+ try:
+ from posix import _have_functions
+ except ImportError:
+ pass
+
+ import posix
+ __all__.extend(_get_exports_list(posix))
+ del posix
+
+elif 'nt' in _names:
+ name = 'nt'
+ linesep = '\r\n'
+ from nt import *
+ try:
+ from nt import _exit
+ __all__.append('_exit')
+ except ImportError:
+ pass
+ import ntpath as path
+
+ import nt
+ __all__.extend(_get_exports_list(nt))
+ del nt
+
+ try:
+ from nt import _have_functions
+ except ImportError:
+ pass
+
+else:
+ raise ImportError('no os specific module found')
+
+sys.modules['os.path'] = path
+from os.path import (curdir, pardir, sep, pathsep, defpath, extsep, altsep,
+ devnull)
+
+del _names
+
+
+if _exists("_have_functions"):
+ _globals = globals()
+ def _add(str, fn):
+ if (fn in _globals) and (str in _have_functions):
+ _set.add(_globals[fn])
+
+ _set = set()
+ _add("HAVE_FACCESSAT", "access")
+ _add("HAVE_FCHMODAT", "chmod")
+ _add("HAVE_FCHOWNAT", "chown")
+ _add("HAVE_FSTATAT", "stat")
+ _add("HAVE_FUTIMESAT", "utime")
+ _add("HAVE_LINKAT", "link")
+ _add("HAVE_MKDIRAT", "mkdir")
+ _add("HAVE_MKFIFOAT", "mkfifo")
+ _add("HAVE_MKNODAT", "mknod")
+ _add("HAVE_OPENAT", "open")
+ _add("HAVE_READLINKAT", "readlink")
+ _add("HAVE_RENAMEAT", "rename")
+ _add("HAVE_SYMLINKAT", "symlink")
+ _add("HAVE_UNLINKAT", "unlink")
+ _add("HAVE_UNLINKAT", "rmdir")
+ _add("HAVE_UTIMENSAT", "utime")
+ supports_dir_fd = _set
+
+ _set = set()
+ _add("HAVE_FACCESSAT", "access")
+ supports_effective_ids = _set
+
+ _set = set()
+ _add("HAVE_FCHDIR", "chdir")
+ _add("HAVE_FCHMOD", "chmod")
+ _add("HAVE_FCHOWN", "chown")
+ _add("HAVE_FDOPENDIR", "listdir")
+ _add("HAVE_FDOPENDIR", "scandir")
+ _add("HAVE_FEXECVE", "execve")
+ _set.add(stat) # fstat always works
+ _add("HAVE_FTRUNCATE", "truncate")
+ _add("HAVE_FUTIMENS", "utime")
+ _add("HAVE_FUTIMES", "utime")
+ _add("HAVE_FPATHCONF", "pathconf")
+ if _exists("statvfs") and _exists("fstatvfs"): # mac os x10.3
+ _add("HAVE_FSTATVFS", "statvfs")
+ supports_fd = _set
+
+ _set = set()
+ _add("HAVE_FACCESSAT", "access")
+ # Some platforms don't support lchmod(). Often the function exists
+ # anyway, as a stub that always returns ENOSUP or perhaps EOPNOTSUPP.
+ # (No, I don't know why that's a good design.) ./configure will detect
+ # this and reject it--so HAVE_LCHMOD still won't be defined on such
+ # platforms. This is Very Helpful.
+ #
+ # However, sometimes platforms without a working lchmod() *do* have
+ # fchmodat(). (Examples: Linux kernel 3.2 with glibc 2.15,
+ # OpenIndiana 3.x.) And fchmodat() has a flag that theoretically makes
+ # it behave like lchmod(). So in theory it would be a suitable
+ # replacement for lchmod(). But when lchmod() doesn't work, fchmodat()'s
+ # flag doesn't work *either*. Sadly ./configure isn't sophisticated
+ # enough to detect this condition--it only determines whether or not
+ # fchmodat() minimally works.
+ #
+ # Therefore we simply ignore fchmodat() when deciding whether or not
+ # os.chmod supports follow_symlinks. Just checking lchmod() is
+ # sufficient. After all--if you have a working fchmodat(), your
+ # lchmod() almost certainly works too.
+ #
+ # _add("HAVE_FCHMODAT", "chmod")
+ _add("HAVE_FCHOWNAT", "chown")
+ _add("HAVE_FSTATAT", "stat")
+ _add("HAVE_LCHFLAGS", "chflags")
+ _add("HAVE_LCHMOD", "chmod")
+ if _exists("lchown"): # mac os x10.3
+ _add("HAVE_LCHOWN", "chown")
+ _add("HAVE_LINKAT", "link")
+ _add("HAVE_LUTIMES", "utime")
+ _add("HAVE_LSTAT", "stat")
+ _add("HAVE_FSTATAT", "stat")
+ _add("HAVE_UTIMENSAT", "utime")
+ _add("MS_WINDOWS", "stat")
+ supports_follow_symlinks = _set
+
+ del _set
+ del _have_functions
+ del _globals
+ del _add
+
+
+# Python uses fixed values for the SEEK_ constants; they are mapped
+# to native constants if necessary in posixmodule.c
+# Other possible SEEK values are directly imported from posixmodule.c
+SEEK_SET = 0
+SEEK_CUR = 1
+SEEK_END = 2
+
+# Super directory utilities.
+# (Inspired by Eric Raymond; the doc strings are mostly his)
+
+def makedirs(name, mode=0o777, exist_ok=False):
+ """makedirs(name [, mode=0o777][, exist_ok=False])
+
+ Super-mkdir; create a leaf directory and all intermediate ones. Works like
+ mkdir, except that any intermediate path segment (not just the rightmost)
+ will be created if it does not exist. If the target directory already
+ exists, raise an OSError if exist_ok is False. Otherwise no exception is
+ raised. This is recursive.
+
+ """
+ head, tail = path.split(name)
+ if not tail:
+ head, tail = path.split(head)
+ if head and tail and not path.exists(head):
+ try:
+ makedirs(head, exist_ok=exist_ok)
+ except FileExistsError:
+ # Defeats race condition when another thread created the path
+ pass
+ cdir = curdir
+ if isinstance(tail, bytes):
+ cdir = bytes(curdir, 'ASCII')
+ if tail == cdir: # xxx/newdir/. exists if xxx/newdir exists
+ return
+ try:
+ mkdir(name, mode)
+ except OSError:
+ # Cannot rely on checking for EEXIST, since the operating system
+ # could give priority to other errors like EACCES or EROFS
+ if not exist_ok or not path.isdir(name):
+ raise
+
+def removedirs(name):
+ """removedirs(name)
+
+ Super-rmdir; remove a leaf directory and all empty intermediate
+ ones. Works like rmdir except that, if the leaf directory is
+ successfully removed, directories corresponding to rightmost path
+ segments will be pruned away until either the whole path is
+ consumed or an error occurs. Errors during this latter phase are
+ ignored -- they generally mean that a directory was not empty.
+
+ """
+ rmdir(name)
+ head, tail = path.split(name)
+ if not tail:
+ head, tail = path.split(head)
+ while head and tail:
+ try:
+ rmdir(head)
+ except OSError:
+ break
+ head, tail = path.split(head)
+
+def renames(old, new):
+ """renames(old, new)
+
+ Super-rename; create directories as necessary and delete any left
+ empty. Works like rename, except creation of any intermediate
+ directories needed to make the new pathname good is attempted
+ first. After the rename, directories corresponding to rightmost
+ path segments of the old name will be pruned until either the
+ whole path is consumed or a nonempty directory is found.
+
+ Note: this function can fail with the new directory structure made
+ if you lack permissions needed to unlink the leaf directory or
+ file.
+
+ """
+ head, tail = path.split(new)
+ if head and tail and not path.exists(head):
+ makedirs(head)
+ rename(old, new)
+ head, tail = path.split(old)
+ if head and tail:
+ try:
+ removedirs(head)
+ except OSError:
+ pass
+
+__all__.extend(["makedirs", "removedirs", "renames"])
+
+def walk(top, topdown=True, onerror=None, followlinks=False):
+ """Directory tree generator.
+
+ For each directory in the directory tree rooted at top (including top
+ itself, but excluding '.' and '..'), yields a 3-tuple
+
+ dirpath, dirnames, filenames
+
+ dirpath is a string, the path to the directory. dirnames is a list of
+ the names of the subdirectories in dirpath (including symlinks to directories,
+ and excluding '.' and '..').
+ filenames is a list of the names of the non-directory files in dirpath.
+ Note that the names in the lists are just names, with no path components.
+ To get a full path (which begins with top) to a file or directory in
+ dirpath, do os.path.join(dirpath, name).
+
+ If optional arg 'topdown' is true or not specified, the triple for a
+ directory is generated before the triples for any of its subdirectories
+ (directories are generated top down). If topdown is false, the triple
+ for a directory is generated after the triples for all of its
+ subdirectories (directories are generated bottom up).
+
+ When topdown is true, the caller can modify the dirnames list in-place
+ (e.g., via del or slice assignment), and walk will only recurse into the
+ subdirectories whose names remain in dirnames; this can be used to prune the
+ search, or to impose a specific order of visiting. Modifying dirnames when
+ topdown is false has no effect on the behavior of os.walk(), since the
+ directories in dirnames have already been generated by the time dirnames
+ itself is generated. No matter the value of topdown, the list of
+ subdirectories is retrieved before the tuples for the directory and its
+ subdirectories are generated.
+
+ By default errors from the os.scandir() call are ignored. If
+ optional arg 'onerror' is specified, it should be a function; it
+ will be called with one argument, an OSError instance. It can
+ report the error to continue with the walk, or raise the exception
+ to abort the walk. Note that the filename is available as the
+ filename attribute of the exception object.
+
+ By default, os.walk does not follow symbolic links to subdirectories on
+ systems that support them. In order to get this functionality, set the
+ optional argument 'followlinks' to true.
+
+ Caution: if you pass a relative pathname for top, don't change the
+ current working directory between resumptions of walk. walk never
+ changes the current directory, and assumes that the client doesn't
+ either.
+
+ Example:
+
+ import os
+ from os.path import join, getsize
+ for root, dirs, files in os.walk('python/Lib/email'):
+ print(root, "consumes ")
+ print(sum(getsize(join(root, name)) for name in files), end=" ")
+ print("bytes in", len(files), "non-directory files")
+ if 'CVS' in dirs:
+ dirs.remove('CVS') # don't visit CVS directories
+
+ """
+ sys.audit("os.walk", top, topdown, onerror, followlinks)
+ return _walk(fspath(top), topdown, onerror, followlinks)
+
+def _walk(top, topdown, onerror, followlinks):
+ dirs = []
+ nondirs = []
+ walk_dirs = []
+
+ # We may not have read permission for top, in which case we can't
+ # get a list of the files the directory contains. os.walk
+ # always suppressed the exception then, rather than blow up for a
+ # minor reason when (say) a thousand readable directories are still
+ # left to visit. That logic is copied here.
+ try:
+ # Note that scandir is global in this module due
+ # to earlier import-*.
+ scandir_it = scandir(top)
+ except OSError as error:
+ if onerror is not None:
+ onerror(error)
+ return
+
+ with scandir_it:
+ while True:
+ try:
+ try:
+ entry = next(scandir_it)
+ except StopIteration:
+ break
+ except OSError as error:
+ if onerror is not None:
+ onerror(error)
+ return
+
+ try:
+ is_dir = entry.is_dir()
+ except OSError:
+ # If is_dir() raises an OSError, consider that the entry is not
+ # a directory, same behaviour than os.path.isdir().
+ is_dir = False
+
+ if is_dir:
+ dirs.append(entry.name)
+ else:
+ nondirs.append(entry.name)
+
+ if not topdown and is_dir:
+ # Bottom-up: recurse into sub-directory, but exclude symlinks to
+ # directories if followlinks is False
+ if followlinks:
+ walk_into = True
+ else:
+ try:
+ is_symlink = entry.is_symlink()
+ except OSError:
+ # If is_symlink() raises an OSError, consider that the
+ # entry is not a symbolic link, same behaviour than
+ # os.path.islink().
+ is_symlink = False
+ walk_into = not is_symlink
+
+ if walk_into:
+ walk_dirs.append(entry.path)
+
+ # Yield before recursion if going top down
+ if topdown:
+ yield top, dirs, nondirs
+
+ # Recurse into sub-directories
+ islink, join = path.islink, path.join
+ for dirname in dirs:
+ new_path = join(top, dirname)
+ # Issue #23605: os.path.islink() is used instead of caching
+ # entry.is_symlink() result during the loop on os.scandir() because
+ # the caller can replace the directory entry during the "yield"
+ # above.
+ if followlinks or not islink(new_path):
+ yield from _walk(new_path, topdown, onerror, followlinks)
+ else:
+ # Recurse into sub-directories
+ for new_path in walk_dirs:
+ yield from _walk(new_path, topdown, onerror, followlinks)
+ # Yield after recursion if going bottom up
+ yield top, dirs, nondirs
+
+__all__.append("walk")
+
+if {open, stat} <= supports_dir_fd and {scandir, stat} <= supports_fd:
+
+ def fwalk(top=".", topdown=True, onerror=None, *, follow_symlinks=False, dir_fd=None):
+ """Directory tree generator.
+
+ This behaves exactly like walk(), except that it yields a 4-tuple
+
+ dirpath, dirnames, filenames, dirfd
+
+ `dirpath`, `dirnames` and `filenames` are identical to walk() output,
+ and `dirfd` is a file descriptor referring to the directory `dirpath`.
+
+ The advantage of fwalk() over walk() is that it's safe against symlink
+ races (when follow_symlinks is False).
+
+ If dir_fd is not None, it should be a file descriptor open to a directory,
+ and top should be relative; top will then be relative to that directory.
+ (dir_fd is always supported for fwalk.)
+
+ Caution:
+ Since fwalk() yields file descriptors, those are only valid until the
+ next iteration step, so you should dup() them if you want to keep them
+ for a longer period.
+
+ Example:
+
+ import os
+ for root, dirs, files, rootfd in os.fwalk('python/Lib/email'):
+ print(root, "consumes", end="")
+ print(sum(os.stat(name, dir_fd=rootfd).st_size for name in files),
+ end="")
+ print("bytes in", len(files), "non-directory files")
+ if 'CVS' in dirs:
+ dirs.remove('CVS') # don't visit CVS directories
+ """
+ sys.audit("os.fwalk", top, topdown, onerror, follow_symlinks, dir_fd)
+ top = fspath(top)
+ # Note: To guard against symlink races, we use the standard
+ # lstat()/open()/fstat() trick.
+ if not follow_symlinks:
+ orig_st = stat(top, follow_symlinks=False, dir_fd=dir_fd)
+ topfd = open(top, O_RDONLY, dir_fd=dir_fd)
+ try:
+ if (follow_symlinks or (st.S_ISDIR(orig_st.st_mode) and
+ path.samestat(orig_st, stat(topfd)))):
+ yield from _fwalk(topfd, top, isinstance(top, bytes),
+ topdown, onerror, follow_symlinks)
+ finally:
+ close(topfd)
+
+ def _fwalk(topfd, toppath, isbytes, topdown, onerror, follow_symlinks):
+ # Note: This uses O(depth of the directory tree) file descriptors: if
+ # necessary, it can be adapted to only require O(1) FDs, see issue
+ # #13734.
+
+ scandir_it = scandir(topfd)
+ dirs = []
+ nondirs = []
+ entries = None if topdown or follow_symlinks else []
+ for entry in scandir_it:
+ name = entry.name
+ if isbytes:
+ name = fsencode(name)
+ try:
+ if entry.is_dir():
+ dirs.append(name)
+ if entries is not None:
+ entries.append(entry)
+ else:
+ nondirs.append(name)
+ except OSError:
+ try:
+ # Add dangling symlinks, ignore disappeared files
+ if entry.is_symlink():
+ nondirs.append(name)
+ except OSError:
+ pass
+
+ if topdown:
+ yield toppath, dirs, nondirs, topfd
+
+ for name in dirs if entries is None else zip(dirs, entries):
+ try:
+ if not follow_symlinks:
+ if topdown:
+ orig_st = stat(name, dir_fd=topfd, follow_symlinks=False)
+ else:
+ assert entries is not None
+ name, entry = name
+ orig_st = entry.stat(follow_symlinks=False)
+ dirfd = open(name, O_RDONLY, dir_fd=topfd)
+ except OSError as err:
+ if onerror is not None:
+ onerror(err)
+ continue
+ try:
+ if follow_symlinks or path.samestat(orig_st, stat(dirfd)):
+ dirpath = path.join(toppath, name)
+ yield from _fwalk(dirfd, dirpath, isbytes,
+ topdown, onerror, follow_symlinks)
+ finally:
+ close(dirfd)
+
+ if not topdown:
+ yield toppath, dirs, nondirs, topfd
+
+ __all__.append("fwalk")
+
+def execl(file, *args):
+ """execl(file, *args)
+
+ Execute the executable file with argument list args, replacing the
+ current process. """
+ execv(file, args)
+
+def execle(file, *args):
+ """execle(file, *args, env)
+
+ Execute the executable file with argument list args and
+ environment env, replacing the current process. """
+ env = args[-1]
+ execve(file, args[:-1], env)
+
+def execlp(file, *args):
+ """execlp(file, *args)
+
+ Execute the executable file (which is searched for along $PATH)
+ with argument list args, replacing the current process. """
+ execvp(file, args)
+
+def execlpe(file, *args):
+ """execlpe(file, *args, env)
+
+ Execute the executable file (which is searched for along $PATH)
+ with argument list args and environment env, replacing the current
+ process. """
+ env = args[-1]
+ execvpe(file, args[:-1], env)
+
+def execvp(file, args):
+ """execvp(file, args)
+
+ Execute the executable file (which is searched for along $PATH)
+ with argument list args, replacing the current process.
+ args may be a list or tuple of strings. """
+ _execvpe(file, args)
+
+def execvpe(file, args, env):
+ """execvpe(file, args, env)
+
+ Execute the executable file (which is searched for along $PATH)
+ with argument list args and environment env, replacing the
+ current process.
+ args may be a list or tuple of strings. """
+ _execvpe(file, args, env)
+
+__all__.extend(["execl","execle","execlp","execlpe","execvp","execvpe"])
+
+def _execvpe(file, args, env=None):
+ if env is not None:
+ exec_func = execve
+ argrest = (args, env)
+ else:
+ exec_func = execv
+ argrest = (args,)
+ env = environ
+
+ if path.dirname(file):
+ exec_func(file, *argrest)
+ return
+ saved_exc = None
+ path_list = get_exec_path(env)
+ if name != 'nt':
+ file = fsencode(file)
+ path_list = map(fsencode, path_list)
+ for dir in path_list:
+ fullname = path.join(dir, file)
+ try:
+ exec_func(fullname, *argrest)
+ except (FileNotFoundError, NotADirectoryError) as e:
+ last_exc = e
+ except OSError as e:
+ last_exc = e
+ if saved_exc is None:
+ saved_exc = e
+ if saved_exc is not None:
+ raise saved_exc
+ raise last_exc
+
+
+def get_exec_path(env=None):
+ """Returns the sequence of directories that will be searched for the
+ named executable (similar to a shell) when launching a process.
+
+ *env* must be an environment variable dict or None. If *env* is None,
+ os.environ will be used.
+ """
+ # Use a local import instead of a global import to limit the number of
+ # modules loaded at startup: the os module is always loaded at startup by
+ # Python. It may also avoid a bootstrap issue.
+ import warnings
+
+ if env is None:
+ env = environ
+
+ # {b'PATH': ...}.get('PATH') and {'PATH': ...}.get(b'PATH') emit a
+ # BytesWarning when using python -b or python -bb: ignore the warning
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", BytesWarning)
+
+ try:
+ path_list = env.get('PATH')
+ except TypeError:
+ path_list = None
+
+ if supports_bytes_environ:
+ try:
+ path_listb = env[b'PATH']
+ except (KeyError, TypeError):
+ pass
+ else:
+ if path_list is not None:
+ raise ValueError(
+ "env cannot contain 'PATH' and b'PATH' keys")
+ path_list = path_listb
+
+ if path_list is not None and isinstance(path_list, bytes):
+ path_list = fsdecode(path_list)
+
+ if path_list is None:
+ path_list = defpath
+ return path_list.split(pathsep)
+
+
+# Change environ to automatically call putenv() and unsetenv()
+from _collections_abc import MutableMapping, Mapping
+
+class _Environ(MutableMapping):
+ def __init__(self, data, encodekey, decodekey, encodevalue, decodevalue):
+ self.encodekey = encodekey
+ self.decodekey = decodekey
+ self.encodevalue = encodevalue
+ self.decodevalue = decodevalue
+ self._data = data
+
+ def __getitem__(self, key):
+ try:
+ value = self._data[self.encodekey(key)]
+ except KeyError:
+ # raise KeyError with the original key value
+ raise KeyError(key) from None
+ return self.decodevalue(value)
+
+ def __setitem__(self, key, value):
+ key = self.encodekey(key)
+ value = self.encodevalue(value)
+ putenv(key, value)
+ self._data[key] = value
+
+ def __delitem__(self, key):
+ encodedkey = self.encodekey(key)
+ unsetenv(encodedkey)
+ try:
+ del self._data[encodedkey]
+ except KeyError:
+ # raise KeyError with the original key value
+ raise KeyError(key) from None
+
+ def __iter__(self):
+ # list() from dict object is an atomic operation
+ keys = list(self._data)
+ for key in keys:
+ yield self.decodekey(key)
+
+ def __len__(self):
+ return len(self._data)
+
+ def __repr__(self):
+ formatted_items = ", ".join(
+ f"{self.decodekey(key)!r}: {self.decodevalue(value)!r}"
+ for key, value in self._data.items()
+ )
+ return f"environ({{{formatted_items}}})"
+
+ def copy(self):
+ return dict(self)
+
+ def setdefault(self, key, value):
+ if key not in self:
+ self[key] = value
+ return self[key]
+
+ def __ior__(self, other):
+ self.update(other)
+ return self
+
+ def __or__(self, other):
+ if not isinstance(other, Mapping):
+ return NotImplemented
+ new = dict(self)
+ new.update(other)
+ return new
+
+ def __ror__(self, other):
+ if not isinstance(other, Mapping):
+ return NotImplemented
+ new = dict(other)
+ new.update(self)
+ return new
+
+def _createenviron():
+ if name == 'nt':
+ # Where Env Var Names Must Be UPPERCASE
+ def check_str(value):
+ if not isinstance(value, str):
+ raise TypeError("str expected, not %s" % type(value).__name__)
+ return value
+ encode = check_str
+ decode = str
+ def encodekey(key):
+ return encode(key).upper()
+ data = {}
+ for key, value in environ.items():
+ data[encodekey(key)] = value
+ else:
+ # Where Env Var Names Can Be Mixed Case
+ encoding = sys.getfilesystemencoding()
+ def encode(value):
+ if not isinstance(value, str):
+ raise TypeError("str expected, not %s" % type(value).__name__)
+ return value.encode(encoding, 'surrogateescape')
+ def decode(value):
+ return value.decode(encoding, 'surrogateescape')
+ encodekey = encode
+ data = environ
+ return _Environ(data,
+ encodekey, decode,
+ encode, decode)
+
+# unicode environ
+environ = _createenviron()
+del _createenviron
+
+
+def getenv(key, default=None):
+ """Get an environment variable, return None if it doesn't exist.
+ The optional second argument can specify an alternate default.
+ key, default and the result are str."""
+ return environ.get(key, default)
+
+supports_bytes_environ = (name != 'nt')
+__all__.extend(("getenv", "supports_bytes_environ"))
+
+if supports_bytes_environ:
+ def _check_bytes(value):
+ if not isinstance(value, bytes):
+ raise TypeError("bytes expected, not %s" % type(value).__name__)
+ return value
+
+ # bytes environ
+ environb = _Environ(environ._data,
+ _check_bytes, bytes,
+ _check_bytes, bytes)
+ del _check_bytes
+
+ def getenvb(key, default=None):
+ """Get an environment variable, return None if it doesn't exist.
+ The optional second argument can specify an alternate default.
+ key, default and the result are bytes."""
+ return environb.get(key, default)
+
+ __all__.extend(("environb", "getenvb"))
+
+def _fscodec():
+ encoding = sys.getfilesystemencoding()
+ errors = sys.getfilesystemencodeerrors()
+
+ def fsencode(filename):
+ """Encode filename (an os.PathLike, bytes, or str) to the filesystem
+ encoding with 'surrogateescape' error handler, return bytes unchanged.
+ On Windows, use 'strict' error handler if the file system encoding is
+ 'mbcs' (which is the default encoding).
+ """
+ filename = fspath(filename) # Does type-checking of `filename`.
+ if isinstance(filename, str):
+ return filename.encode(encoding, errors)
+ else:
+ return filename
+
+ def fsdecode(filename):
+ """Decode filename (an os.PathLike, bytes, or str) from the filesystem
+ encoding with 'surrogateescape' error handler, return str unchanged. On
+ Windows, use 'strict' error handler if the file system encoding is
+ 'mbcs' (which is the default encoding).
+ """
+ filename = fspath(filename) # Does type-checking of `filename`.
+ if isinstance(filename, bytes):
+ return filename.decode(encoding, errors)
+ else:
+ return filename
+
+ return fsencode, fsdecode
+
+fsencode, fsdecode = _fscodec()
+del _fscodec
+
+# Supply spawn*() (probably only for Unix)
+if _exists("fork") and not _exists("spawnv") and _exists("execv"):
+
+ P_WAIT = 0
+ P_NOWAIT = P_NOWAITO = 1
+
+ __all__.extend(["P_WAIT", "P_NOWAIT", "P_NOWAITO"])
+
+ # XXX Should we support P_DETACH? I suppose it could fork()**2
+ # and close the std I/O streams. Also, P_OVERLAY is the same
+ # as execv*()?
+
+ def _spawnvef(mode, file, args, env, func):
+ # Internal helper; func is the exec*() function to use
+ if not isinstance(args, (tuple, list)):
+ raise TypeError('argv must be a tuple or a list')
+ if not args or not args[0]:
+ raise ValueError('argv first element cannot be empty')
+ pid = fork()
+ if not pid:
+ # Child
+ try:
+ if env is None:
+ func(file, args)
+ else:
+ func(file, args, env)
+ except:
+ _exit(127)
+ else:
+ # Parent
+ if mode == P_NOWAIT:
+ return pid # Caller is responsible for waiting!
+ while 1:
+ wpid, sts = waitpid(pid, 0)
+ if WIFSTOPPED(sts):
+ continue
+
+ return waitstatus_to_exitcode(sts)
+
+ def spawnv(mode, file, args):
+ """spawnv(mode, file, args) -> integer
+
+Execute file with arguments from args in a subprocess.
+If mode == P_NOWAIT return the pid of the process.
+If mode == P_WAIT return the process's exit code if it exits normally;
+otherwise return -SIG, where SIG is the signal that killed it. """
+ return _spawnvef(mode, file, args, None, execv)
+
+ def spawnve(mode, file, args, env):
+ """spawnve(mode, file, args, env) -> integer
+
+Execute file with arguments from args in a subprocess with the
+specified environment.
+If mode == P_NOWAIT return the pid of the process.
+If mode == P_WAIT return the process's exit code if it exits normally;
+otherwise return -SIG, where SIG is the signal that killed it. """
+ return _spawnvef(mode, file, args, env, execve)
+
+ # Note: spawnvp[e] isn't currently supported on Windows
+
+ def spawnvp(mode, file, args):
+ """spawnvp(mode, file, args) -> integer
+
+Execute file (which is looked for along $PATH) with arguments from
+args in a subprocess.
+If mode == P_NOWAIT return the pid of the process.
+If mode == P_WAIT return the process's exit code if it exits normally;
+otherwise return -SIG, where SIG is the signal that killed it. """
+ return _spawnvef(mode, file, args, None, execvp)
+
+ def spawnvpe(mode, file, args, env):
+ """spawnvpe(mode, file, args, env) -> integer
+
+Execute file (which is looked for along $PATH) with arguments from
+args in a subprocess with the supplied environment.
+If mode == P_NOWAIT return the pid of the process.
+If mode == P_WAIT return the process's exit code if it exits normally;
+otherwise return -SIG, where SIG is the signal that killed it. """
+ return _spawnvef(mode, file, args, env, execvpe)
+
+
+ __all__.extend(["spawnv", "spawnve", "spawnvp", "spawnvpe"])
+
+
+if _exists("spawnv"):
+ # These aren't supplied by the basic Windows code
+ # but can be easily implemented in Python
+
+ def spawnl(mode, file, *args):
+ """spawnl(mode, file, *args) -> integer
+
+Execute file with arguments from args in a subprocess.
+If mode == P_NOWAIT return the pid of the process.
+If mode == P_WAIT return the process's exit code if it exits normally;
+otherwise return -SIG, where SIG is the signal that killed it. """
+ return spawnv(mode, file, args)
+
+ def spawnle(mode, file, *args):
+ """spawnle(mode, file, *args, env) -> integer
+
+Execute file with arguments from args in a subprocess with the
+supplied environment.
+If mode == P_NOWAIT return the pid of the process.
+If mode == P_WAIT return the process's exit code if it exits normally;
+otherwise return -SIG, where SIG is the signal that killed it. """
+ env = args[-1]
+ return spawnve(mode, file, args[:-1], env)
+
+
+ __all__.extend(["spawnl", "spawnle"])
+
+
+if _exists("spawnvp"):
+ # At the moment, Windows doesn't implement spawnvp[e],
+ # so it won't have spawnlp[e] either.
+ def spawnlp(mode, file, *args):
+ """spawnlp(mode, file, *args) -> integer
+
+Execute file (which is looked for along $PATH) with arguments from
+args in a subprocess with the supplied environment.
+If mode == P_NOWAIT return the pid of the process.
+If mode == P_WAIT return the process's exit code if it exits normally;
+otherwise return -SIG, where SIG is the signal that killed it. """
+ return spawnvp(mode, file, args)
+
+ def spawnlpe(mode, file, *args):
+ """spawnlpe(mode, file, *args, env) -> integer
+
+Execute file (which is looked for along $PATH) with arguments from
+args in a subprocess with the supplied environment.
+If mode == P_NOWAIT return the pid of the process.
+If mode == P_WAIT return the process's exit code if it exits normally;
+otherwise return -SIG, where SIG is the signal that killed it. """
+ env = args[-1]
+ return spawnvpe(mode, file, args[:-1], env)
+
+
+ __all__.extend(["spawnlp", "spawnlpe"])
+
+# VxWorks has no user space shell provided. As a result, running
+# command in a shell can't be supported.
+if sys.platform != 'vxworks':
+ # Supply os.popen()
+ def popen(cmd, mode="r", buffering=-1):
+ if not isinstance(cmd, str):
+ raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
+ if mode not in ("r", "w"):
+ raise ValueError("invalid mode %r" % mode)
+ if buffering == 0 or buffering is None:
+ raise ValueError("popen() does not support unbuffered streams")
+ import subprocess
+ if mode == "r":
+ proc = subprocess.Popen(cmd,
+ shell=True, text=True,
+ stdout=subprocess.PIPE,
+ bufsize=buffering)
+ return _wrap_close(proc.stdout, proc)
+ else:
+ proc = subprocess.Popen(cmd,
+ shell=True, text=True,
+ stdin=subprocess.PIPE,
+ bufsize=buffering)
+ return _wrap_close(proc.stdin, proc)
+
+ # Helper for popen() -- a proxy for a file whose close waits for the process
+ class _wrap_close:
+ def __init__(self, stream, proc):
+ self._stream = stream
+ self._proc = proc
+ def close(self):
+ self._stream.close()
+ returncode = self._proc.wait()
+ if returncode == 0:
+ return None
+ if name == 'nt':
+ return returncode
+ else:
+ return returncode << 8 # Shift left to match old behavior
+ def __enter__(self):
+ return self
+ def __exit__(self, *args):
+ self.close()
+ def __getattr__(self, name):
+ return getattr(self._stream, name)
+ def __iter__(self):
+ return iter(self._stream)
+
+ __all__.append("popen")
+
+# Supply os.fdopen()
+def fdopen(fd, mode="r", buffering=-1, encoding=None, *args, **kwargs):
+ if not isinstance(fd, int):
+ raise TypeError("invalid fd type (%s, expected integer)" % type(fd))
+ import io
+ if "b" not in mode:
+ encoding = io.text_encoding(encoding)
+ return io.open(fd, mode, buffering, encoding, *args, **kwargs)
+
+
+# For testing purposes, make sure the function is available when the C
+# implementation exists.
+def _fspath(path):
+ """Return the path representation of a path-like object.
+
+ If str or bytes is passed in, it is returned unchanged. Otherwise the
+ os.PathLike interface is used to get the path representation. If the
+ path representation is not str or bytes, TypeError is raised. If the
+ provided path is not str, bytes, or os.PathLike, TypeError is raised.
+ """
+ if isinstance(path, (str, bytes)):
+ return path
+
+ # Work from the object's type to match method resolution of other magic
+ # methods.
+ path_type = type(path)
+ try:
+ path_repr = path_type.__fspath__(path)
+ except AttributeError:
+ if hasattr(path_type, '__fspath__'):
+ raise
+ else:
+ raise TypeError("expected str, bytes or os.PathLike object, "
+ "not " + path_type.__name__)
+ if isinstance(path_repr, (str, bytes)):
+ return path_repr
+ else:
+ raise TypeError("expected {}.__fspath__() to return str or bytes, "
+ "not {}".format(path_type.__name__,
+ type(path_repr).__name__))
+
+# If there is no C implementation, make the pure Python version the
+# implementation as transparently as possible.
+if not _exists('fspath'):
+ fspath = _fspath
+ fspath.__name__ = "fspath"
+
+
+class PathLike(abc.ABC):
+
+ """Abstract base class for implementing the file system path protocol."""
+
+ @abc.abstractmethod
+ def __fspath__(self):
+ """Return the file system path representation of the object."""
+ raise NotImplementedError
+
+ @classmethod
+ def __subclasshook__(cls, subclass):
+ if cls is PathLike:
+ return _check_methods(subclass, '__fspath__')
+ return NotImplemented
+
+ __class_getitem__ = classmethod(GenericAlias)
+
+
+if name == 'nt':
+ class _AddedDllDirectory:
+ def __init__(self, path, cookie, remove_dll_directory):
+ self.path = path
+ self._cookie = cookie
+ self._remove_dll_directory = remove_dll_directory
+ def close(self):
+ self._remove_dll_directory(self._cookie)
+ self.path = None
+ def __enter__(self):
+ return self
+ def __exit__(self, *args):
+ self.close()
+ def __repr__(self):
+ if self.path:
+ return "<AddedDllDirectory({!r})>".format(self.path)
+ return "<AddedDllDirectory()>"
+
+ def add_dll_directory(path):
+ """Add a path to the DLL search path.
+
+ This search path is used when resolving dependencies for imported
+ extension modules (the module itself is resolved through sys.path),
+ and also by ctypes.
+
+ Remove the directory by calling close() on the returned object or
+ using it in a with statement.
+ """
+ import nt
+ cookie = nt._add_dll_directory(path)
+ return _AddedDllDirectory(
+ path,
+ cookie,
+ nt._remove_dll_directory
+ )