diff options
author | deshevoy <deshevoy@yandex-team.ru> | 2022-02-10 16:46:57 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:57 +0300 |
commit | 28148f76dbfcc644d96427d41c92f36cbf2fdc6e (patch) | |
tree | b83306b6e37edeea782e9eed673d89286c4fef35 /contrib/python/pytest/py3/_pytest/pathlib.py | |
parent | e988f30484abe5fdeedcc7a5d3c226c01a21800c (diff) | |
download | ydb-28148f76dbfcc644d96427d41c92f36cbf2fdc6e.tar.gz |
Restoring authorship annotation for <deshevoy@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/pathlib.py')
-rw-r--r-- | contrib/python/pytest/py3/_pytest/pathlib.py | 384 |
1 files changed, 192 insertions, 192 deletions
diff --git a/contrib/python/pytest/py3/_pytest/pathlib.py b/contrib/python/pytest/py3/_pytest/pathlib.py index 02ba612d320..7d9269a1855 100644 --- a/contrib/python/pytest/py3/_pytest/pathlib.py +++ b/contrib/python/pytest/py3/_pytest/pathlib.py @@ -1,12 +1,12 @@ -import atexit +import atexit import contextlib -import fnmatch +import fnmatch import importlib.util -import itertools -import os -import shutil -import sys -import uuid +import itertools +import os +import shutil +import sys +import uuid import warnings from enum import Enum from errno import EBADF @@ -14,13 +14,13 @@ from errno import ELOOP from errno import ENOENT from errno import ENOTDIR from functools import partial -from os.path import expanduser -from os.path import expandvars -from os.path import isabs -from os.path import sep +from os.path import expanduser +from os.path import expandvars +from os.path import isabs +from os.path import sep from pathlib import Path from pathlib import PurePath -from posixpath import sep as posix_sep +from posixpath import sep as posix_sep from types import ModuleType from typing import Callable from typing import Iterable @@ -29,29 +29,29 @@ from typing import Optional from typing import Set from typing import TypeVar from typing import Union - + import py from _pytest.compat import assert_never from _pytest.outcomes import skip from _pytest.warning_types import PytestWarning - + LOCK_TIMEOUT = 60 * 60 * 24 * 3 - - + + _AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath) - + # The following function, variables and comments were # copied from cpython 3.9 Lib/pathlib.py file. - + # EBADF - guard against macOS `stat` throwing EBADF _IGNORED_ERRORS = (ENOENT, ENOTDIR, EBADF, ELOOP) - + _IGNORED_WINERRORS = ( 21, # ERROR_NOT_READY - drive exists but is not accessible 1921, # ERROR_CANT_RESOLVE_FILENAME - fix for broken symlink pointing to itself ) - + def _ignore_error(exception): return ( @@ -66,11 +66,11 @@ def get_lock_path(path: _AnyPurePath) -> _AnyPurePath: def on_rm_rf_error(func, path: str, exc, *, start_path: Path) -> bool: """Handle known read-only errors during rmtree. - + The returned value is used only by our own tests. """ exctype, excvalue = exc[:2] - + # Another process removed the file in the middle of the "rm_rf" (xdist for example). # More context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018 if isinstance(excvalue, FileNotFoundError): @@ -155,147 +155,147 @@ def rm_rf(path: Path) -> None: def find_prefixed(root: Path, prefix: str) -> Iterator[Path]: """Find all elements in root that begin with the prefix, case insensitive.""" - l_prefix = prefix.lower() - for x in root.iterdir(): - if x.name.lower().startswith(l_prefix): - yield x - - + l_prefix = prefix.lower() + for x in root.iterdir(): + if x.name.lower().startswith(l_prefix): + yield x + + def extract_suffixes(iter: Iterable[PurePath], prefix: str) -> Iterator[str]: """Return the parts of the paths following the prefix. :param iter: Iterator over path names. :param prefix: Expected prefix of the path names. - """ - p_len = len(prefix) - for p in iter: - yield p.name[p_len:] - - + """ + p_len = len(prefix) + for p in iter: + yield p.name[p_len:] + + def find_suffixes(root: Path, prefix: str) -> Iterator[str]: """Combine find_prefixes and extract_suffixes.""" - return extract_suffixes(find_prefixed(root, prefix), prefix) - - + return extract_suffixes(find_prefixed(root, prefix), prefix) + + def parse_num(maybe_num) -> int: """Parse number path suffixes, returns -1 on error.""" - try: - return int(maybe_num) - except ValueError: - return -1 - - + try: + return int(maybe_num) + except ValueError: + return -1 + + def _force_symlink( root: Path, target: Union[str, PurePath], link_to: Union[str, Path] ) -> None: """Helper to create the current symlink. - + It's full of race conditions that are reasonably OK to ignore for the context of best effort linking to the latest test run. - + The presumption being that in case of much parallelism the inaccuracy is going to be acceptable. - """ - current_symlink = root.joinpath(target) - try: - current_symlink.unlink() - except OSError: - pass - try: - current_symlink.symlink_to(link_to) - except Exception: - pass - - + """ + current_symlink = root.joinpath(target) + try: + current_symlink.unlink() + except OSError: + pass + try: + current_symlink.symlink_to(link_to) + except Exception: + pass + + def make_numbered_dir(root: Path, prefix: str, mode: int = 0o700) -> Path: """Create a directory with an increased number as suffix for the given prefix.""" - for i in range(10): - # try up to 10 times to create the folder + for i in range(10): + # try up to 10 times to create the folder max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1) - new_number = max_existing + 1 + new_number = max_existing + 1 new_path = root.joinpath(f"{prefix}{new_number}") - try: + try: new_path.mkdir(mode=mode) - except Exception: - pass - else: - _force_symlink(root, prefix + "current", new_path) - return new_path - else: + except Exception: + pass + else: + _force_symlink(root, prefix + "current", new_path) + return new_path + else: raise OSError( - "could not create numbered dir with prefix " - "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root) - ) - - + "could not create numbered dir with prefix " + "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root) + ) + + def create_cleanup_lock(p: Path) -> Path: """Create a lock to prevent premature folder cleanup.""" - lock_path = get_lock_path(p) - try: - fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644) + lock_path = get_lock_path(p) + try: + fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644) except FileExistsError as e: raise OSError(f"cannot create lockfile in {p}") from e - else: - pid = os.getpid() + else: + pid = os.getpid() spid = str(pid).encode() - os.write(fd, spid) - os.close(fd) - if not lock_path.is_file(): + os.write(fd, spid) + os.close(fd) + if not lock_path.is_file(): raise OSError("lock path got renamed after successful creation") - return lock_path - - + return lock_path + + def register_cleanup_lock_removal(lock_path: Path, register=atexit.register): """Register a cleanup function for removing a lock, by default on atexit.""" - pid = os.getpid() - + pid = os.getpid() + def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = pid) -> None: - current_pid = os.getpid() - if current_pid != original_pid: - # fork - return - try: - lock_path.unlink() + current_pid = os.getpid() + if current_pid != original_pid: + # fork + return + try: + lock_path.unlink() except OSError: - pass - - return register(cleanup_on_exit) - - + pass + + return register(cleanup_on_exit) + + def maybe_delete_a_numbered_dir(path: Path) -> None: """Remove a numbered directory if its lock can be obtained and it does not seem to be in use.""" path = ensure_extended_length_path(path) - lock_path = None - try: - lock_path = create_cleanup_lock(path) - parent = path.parent - + lock_path = None + try: + lock_path = create_cleanup_lock(path) + parent = path.parent + garbage = parent.joinpath(f"garbage-{uuid.uuid4()}") - path.rename(garbage) + path.rename(garbage) rm_rf(garbage) except OSError: - # known races: - # * other process did a cleanup at the same time - # * deletable folder was found - # * process cwd (Windows) - return - finally: + # known races: + # * other process did a cleanup at the same time + # * deletable folder was found + # * process cwd (Windows) + return + finally: # If we created the lock, ensure we remove it even if we failed # to properly remove the numbered dir. - if lock_path is not None: - try: - lock_path.unlink() + if lock_path is not None: + try: + lock_path.unlink() except OSError: - pass - - + pass + + def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool: """Check if `path` is deletable based on whether the lock file is expired.""" - if path.is_symlink(): - return False - lock = get_lock_path(path) - try: + if path.is_symlink(): + return False + lock = get_lock_path(path) + try: if not lock.is_file(): return True except OSError: @@ -303,11 +303,11 @@ def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> # we don't have access to the entire directory (#7491). return False try: - lock_time = lock.stat().st_mtime - except Exception: - return False - else: - if lock_time < consider_lock_dead_if_created_before: + lock_time = lock.stat().st_mtime + except Exception: + return False + else: + if lock_time < consider_lock_dead_if_created_before: # We want to ignore any errors while trying to remove the lock such as: # - PermissionDenied, like the file permissions have changed since the lock creation; # - FileNotFoundError, in case another pytest process got here first; @@ -316,50 +316,50 @@ def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> lock.unlink() return True return False - - + + def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None: """Try to cleanup a folder if we can ensure it's deletable.""" - if ensure_deletable(path, consider_lock_dead_if_created_before): - maybe_delete_a_numbered_dir(path) - - + if ensure_deletable(path, consider_lock_dead_if_created_before): + maybe_delete_a_numbered_dir(path) + + def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]: """List candidates for numbered directories to be removed - follows py.path.""" max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1) - max_delete = max_existing - keep - paths = find_prefixed(root, prefix) - paths, paths2 = itertools.tee(paths) - numbers = map(parse_num, extract_suffixes(paths2, prefix)) - for path, number in zip(paths, numbers): - if number <= max_delete: - yield path - - + max_delete = max_existing - keep + paths = find_prefixed(root, prefix) + paths, paths2 = itertools.tee(paths) + numbers = map(parse_num, extract_suffixes(paths2, prefix)) + for path, number in zip(paths, numbers): + if number <= max_delete: + yield path + + def cleanup_numbered_dir( root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float ) -> None: """Cleanup for lock driven numbered directories.""" - for path in cleanup_candidates(root, prefix, keep): - try_cleanup(path, consider_lock_dead_if_created_before) - for path in root.glob("garbage-*"): - try_cleanup(path, consider_lock_dead_if_created_before) - - + for path in cleanup_candidates(root, prefix, keep): + try_cleanup(path, consider_lock_dead_if_created_before) + for path in root.glob("garbage-*"): + try_cleanup(path, consider_lock_dead_if_created_before) + + def make_numbered_dir_with_cleanup( root: Path, prefix: str, keep: int, lock_timeout: float, mode: int, ) -> Path: """Create a numbered dir with a cleanup lock and remove old ones.""" - e = None - for i in range(10): - try: + e = None + for i in range(10): + try: p = make_numbered_dir(root, prefix, mode) - lock_path = create_cleanup_lock(p) - register_cleanup_lock_removal(lock_path) - except Exception as exc: - e = exc - else: - consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout + lock_path = create_cleanup_lock(p) + register_cleanup_lock_removal(lock_path) + except Exception as exc: + e = exc + else: + consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout # Register a cleanup for program exit atexit.register( cleanup_numbered_dir, @@ -367,60 +367,60 @@ def make_numbered_dir_with_cleanup( prefix, keep, consider_lock_dead_if_created_before, - ) - return p - assert e is not None - raise e - - + ) + return p + assert e is not None + raise e + + def resolve_from_str(input: str, rootpath: Path) -> Path: - input = expanduser(input) - input = expandvars(input) - if isabs(input): - return Path(input) - else: + input = expanduser(input) + input = expandvars(input) + if isabs(input): + return Path(input) + else: return rootpath.joinpath(input) - - + + def fnmatch_ex(pattern: str, path) -> bool: """A port of FNMatcher from py.path.common which works with PurePath() instances. - + The difference between this algorithm and PurePath.match() is that the latter matches "**" glob expressions for each part of the path, while this algorithm uses the whole path instead. - - For example: + + For example: "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py" with this algorithm, but not with PurePath.match(). - + This algorithm was ported to keep backward-compatibility with existing settings which assume paths match according this logic. - - References: - * https://bugs.python.org/issue29249 - * https://bugs.python.org/issue34731 - """ - path = PurePath(path) - iswin32 = sys.platform.startswith("win") - - if iswin32 and sep not in pattern and posix_sep in pattern: - # Running on Windows, the pattern has no Windows path separators, - # and the pattern has one or more Posix path separators. Replace - # the Posix path separators with the Windows path separator. - pattern = pattern.replace(posix_sep, sep) - - if sep not in pattern: - name = path.name - else: + + References: + * https://bugs.python.org/issue29249 + * https://bugs.python.org/issue34731 + """ + path = PurePath(path) + iswin32 = sys.platform.startswith("win") + + if iswin32 and sep not in pattern and posix_sep in pattern: + # Running on Windows, the pattern has no Windows path separators, + # and the pattern has one or more Posix path separators. Replace + # the Posix path separators with the Windows path separator. + pattern = pattern.replace(posix_sep, sep) + + if sep not in pattern: + name = path.name + else: name = str(path) if path.is_absolute() and not os.path.isabs(pattern): pattern = f"*{os.sep}{pattern}" - return fnmatch.fnmatch(name, pattern) - - + return fnmatch.fnmatch(name, pattern) + + def parts(s: str) -> Set[str]: - parts = s.split(sep) - return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))} + parts = s.split(sep) + return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))} def symlink_or_skip(src, dst, **kwargs): |