diff options
author | nkozlovskiy <nmk@ydb.tech> | 2023-09-29 12:24:06 +0300 |
---|---|---|
committer | nkozlovskiy <nmk@ydb.tech> | 2023-09-29 12:41:34 +0300 |
commit | e0e3e1717e3d33762ce61950504f9637a6e669ed (patch) | |
tree | bca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/pytest/py2/_pytest/pathlib.py | |
parent | 38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff) | |
download | ydb-e0e3e1717e3d33762ce61950504f9637a6e669ed.tar.gz |
add ydb deps
Diffstat (limited to 'contrib/python/pytest/py2/_pytest/pathlib.py')
-rw-r--r-- | contrib/python/pytest/py2/_pytest/pathlib.py | 380 |
1 files changed, 380 insertions, 0 deletions
diff --git a/contrib/python/pytest/py2/_pytest/pathlib.py b/contrib/python/pytest/py2/_pytest/pathlib.py new file mode 100644 index 0000000000..42071f4310 --- /dev/null +++ b/contrib/python/pytest/py2/_pytest/pathlib.py @@ -0,0 +1,380 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import atexit +import errno +import fnmatch +import itertools +import operator +import os +import shutil +import sys +import uuid +import warnings +from functools import partial +from functools import reduce +from os.path import expanduser +from os.path import expandvars +from os.path import isabs +from os.path import sep +from posixpath import sep as posix_sep + +import six +from six.moves import map + +from .compat import PY36 +from _pytest.warning_types import PytestWarning + +if PY36: + from pathlib import Path, PurePath +else: + from pathlib2 import Path, PurePath + +__all__ = ["Path", "PurePath"] + + +LOCK_TIMEOUT = 60 * 60 * 3 + +get_lock_path = operator.methodcaller("joinpath", ".lock") + + +def ensure_reset_dir(path): + """ + ensures the given path is an empty directory + """ + if path.exists(): + rm_rf(path) + path.mkdir() + + +def on_rm_rf_error(func, path, exc, **kwargs): + """Handles known read-only errors during rmtree. + + The returned value is used only by our own tests. + """ + start_path = kwargs["start_path"] + exctype, excvalue = exc[:2] + + # another process removed the file in the middle of the "rm_rf" (xdist for example) + # more context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018 + if isinstance(excvalue, OSError) and excvalue.errno == errno.ENOENT: + return False + + if not isinstance(excvalue, OSError) or excvalue.errno not in ( + errno.EACCES, + errno.EPERM, + ): + warnings.warn( + PytestWarning( + "(rm_rf) error removing {}\n{}: {}".format(path, exctype, excvalue) + ) + ) + return False + + if func not in (os.rmdir, os.remove, os.unlink): + warnings.warn( + PytestWarning( + "(rm_rf) unknown function {} when removing {}:\n{}: {}".format( + path, func, exctype, excvalue + ) + ) + ) + return False + + # Chmod + retry. + import stat + + def chmod_rw(p): + mode = os.stat(p).st_mode + os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR) + + # For files, we need to recursively go upwards in the directories to + # ensure they all are also writable. + p = Path(path) + if p.is_file(): + for parent in p.parents: + chmod_rw(str(parent)) + # stop when we reach the original path passed to rm_rf + if parent == start_path: + break + chmod_rw(str(path)) + + func(path) + return True + + +def rm_rf(path): + """Remove the path contents recursively, even if some elements + are read-only. + """ + onerror = partial(on_rm_rf_error, start_path=path) + shutil.rmtree(str(path), onerror=onerror) + + +def find_prefixed(root, prefix): + """finds all elements in root that begin with the prefix, case insensitive""" + l_prefix = prefix.lower() + for x in root.iterdir(): + if x.name.lower().startswith(l_prefix): + yield x + + +def extract_suffixes(iter, prefix): + """ + :param iter: iterator over path names + :param prefix: expected prefix of the path names + :returns: the parts of the paths following the prefix + """ + p_len = len(prefix) + for p in iter: + yield p.name[p_len:] + + +def find_suffixes(root, prefix): + """combines find_prefixes and extract_suffixes + """ + return extract_suffixes(find_prefixed(root, prefix), prefix) + + +def parse_num(maybe_num): + """parses number path suffixes, returns -1 on error""" + try: + return int(maybe_num) + except ValueError: + return -1 + + +if six.PY2: + + def _max(iterable, default): + """needed due to python2.7 lacking the default argument for max""" + return reduce(max, iterable, default) + + +else: + _max = max + + +def _force_symlink(root, target, link_to): + """helper to create the current symlink + + it's full of race conditions that are reasonably ok to ignore + for the context of best effort linking to the latest testrun + + the presumption being thatin case of much parallelism + the inaccuracy is going to be acceptable + """ + current_symlink = root.joinpath(target) + try: + current_symlink.unlink() + except OSError: + pass + try: + current_symlink.symlink_to(link_to) + except Exception: + pass + + +def make_numbered_dir(root, prefix): + """create a directory with an increased number as suffix for the given prefix""" + for i in range(10): + # try up to 10 times to create the folder + max_existing = _max(map(parse_num, find_suffixes(root, prefix)), default=-1) + new_number = max_existing + 1 + new_path = root.joinpath("{}{}".format(prefix, new_number)) + try: + new_path.mkdir() + except Exception: + pass + else: + _force_symlink(root, prefix + "current", new_path) + return new_path + else: + raise EnvironmentError( + "could not create numbered dir with prefix " + "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root) + ) + + +def create_cleanup_lock(p): + """crates a lock to prevent premature folder cleanup""" + lock_path = get_lock_path(p) + try: + fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644) + except OSError as e: + if e.errno == errno.EEXIST: + six.raise_from( + EnvironmentError("cannot create lockfile in {path}".format(path=p)), e + ) + else: + raise + else: + pid = os.getpid() + spid = str(pid) + if not isinstance(spid, bytes): + spid = spid.encode("ascii") + os.write(fd, spid) + os.close(fd) + if not lock_path.is_file(): + raise EnvironmentError("lock path got renamed after successful creation") + return lock_path + + +def register_cleanup_lock_removal(lock_path, register=atexit.register): + """registers a cleanup function for removing a lock, by default on atexit""" + pid = os.getpid() + + def cleanup_on_exit(lock_path=lock_path, original_pid=pid): + current_pid = os.getpid() + if current_pid != original_pid: + # fork + return + try: + lock_path.unlink() + except (OSError, IOError): + pass + + return register(cleanup_on_exit) + + +def maybe_delete_a_numbered_dir(path): + """removes a numbered directory if its lock can be obtained and it does not seem to be in use""" + lock_path = None + try: + lock_path = create_cleanup_lock(path) + parent = path.parent + + garbage = parent.joinpath("garbage-{}".format(uuid.uuid4())) + path.rename(garbage) + rm_rf(garbage) + except (OSError, EnvironmentError): + # known races: + # * other process did a cleanup at the same time + # * deletable folder was found + # * process cwd (Windows) + return + finally: + # if we created the lock, ensure we remove it even if we failed + # to properly remove the numbered dir + if lock_path is not None: + try: + lock_path.unlink() + except (OSError, IOError): + pass + + +def ensure_deletable(path, consider_lock_dead_if_created_before): + """checks if a lock exists and breaks it if its considered dead""" + if path.is_symlink(): + return False + lock = get_lock_path(path) + if not lock.exists(): + return True + try: + lock_time = lock.stat().st_mtime + except Exception: + return False + else: + if lock_time < consider_lock_dead_if_created_before: + lock.unlink() + return True + else: + return False + + +def try_cleanup(path, consider_lock_dead_if_created_before): + """tries to cleanup a folder if we can ensure it's deletable""" + if ensure_deletable(path, consider_lock_dead_if_created_before): + maybe_delete_a_numbered_dir(path) + + +def cleanup_candidates(root, prefix, keep): + """lists candidates for numbered directories to be removed - follows py.path""" + max_existing = _max(map(parse_num, find_suffixes(root, prefix)), default=-1) + max_delete = max_existing - keep + paths = find_prefixed(root, prefix) + paths, paths2 = itertools.tee(paths) + numbers = map(parse_num, extract_suffixes(paths2, prefix)) + for path, number in zip(paths, numbers): + if number <= max_delete: + yield path + + +def cleanup_numbered_dir(root, prefix, keep, consider_lock_dead_if_created_before): + """cleanup for lock driven numbered directories""" + for path in cleanup_candidates(root, prefix, keep): + try_cleanup(path, consider_lock_dead_if_created_before) + for path in root.glob("garbage-*"): + try_cleanup(path, consider_lock_dead_if_created_before) + + +def make_numbered_dir_with_cleanup(root, prefix, keep, lock_timeout): + """creates a numbered dir with a cleanup lock and removes old ones""" + e = None + for i in range(10): + try: + p = make_numbered_dir(root, prefix) + lock_path = create_cleanup_lock(p) + register_cleanup_lock_removal(lock_path) + except Exception as exc: + e = exc + else: + consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout + cleanup_numbered_dir( + root=root, + prefix=prefix, + keep=keep, + consider_lock_dead_if_created_before=consider_lock_dead_if_created_before, + ) + return p + assert e is not None + raise e + + +def resolve_from_str(input, root): + assert not isinstance(input, Path), "would break on py2" + root = Path(root) + input = expanduser(input) + input = expandvars(input) + if isabs(input): + return Path(input) + else: + return root.joinpath(input) + + +def fnmatch_ex(pattern, path): + """FNMatcher port from py.path.common which works with PurePath() instances. + + The difference between this algorithm and PurePath.match() is that the latter matches "**" glob expressions + for each part of the path, while this algorithm uses the whole path instead. + + For example: + "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py" with this algorithm, but not with + PurePath.match(). + + This algorithm was ported to keep backward-compatibility with existing settings which assume paths match according + this logic. + + References: + * https://bugs.python.org/issue29249 + * https://bugs.python.org/issue34731 + """ + path = PurePath(path) + iswin32 = sys.platform.startswith("win") + + if iswin32 and sep not in pattern and posix_sep in pattern: + # Running on Windows, the pattern has no Windows path separators, + # and the pattern has one or more Posix path separators. Replace + # the Posix path separators with the Windows path separator. + pattern = pattern.replace(posix_sep, sep) + + if sep not in pattern: + name = path.name + else: + name = six.text_type(path) + return fnmatch.fnmatch(name, pattern) + + +def parts(s): + parts = s.split(sep) + return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))} |