diff options
| author | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 | 
|---|---|---|
| committer | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 | 
| commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
| tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/python/pytest/py2/_pytest/pathlib.py | |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/python/pytest/py2/_pytest/pathlib.py')
| -rw-r--r-- | contrib/python/pytest/py2/_pytest/pathlib.py | 380 | 
1 files changed, 380 insertions, 0 deletions
diff --git a/contrib/python/pytest/py2/_pytest/pathlib.py b/contrib/python/pytest/py2/_pytest/pathlib.py new file mode 100644 index 00000000000..42071f43104 --- /dev/null +++ b/contrib/python/pytest/py2/_pytest/pathlib.py @@ -0,0 +1,380 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import atexit +import errno +import fnmatch +import itertools +import operator +import os +import shutil +import sys +import uuid +import warnings +from functools import partial +from functools import reduce +from os.path import expanduser +from os.path import expandvars +from os.path import isabs +from os.path import sep +from posixpath import sep as posix_sep + +import six +from six.moves import map + +from .compat import PY36 +from _pytest.warning_types import PytestWarning + +if PY36: +    from pathlib import Path, PurePath +else: +    from pathlib2 import Path, PurePath + +__all__ = ["Path", "PurePath"] + + +LOCK_TIMEOUT = 60 * 60 * 3 + +get_lock_path = operator.methodcaller("joinpath", ".lock") + + +def ensure_reset_dir(path): +    """ +    ensures the given path is an empty directory +    """ +    if path.exists(): +        rm_rf(path) +    path.mkdir() + + +def on_rm_rf_error(func, path, exc, **kwargs): +    """Handles known read-only errors during rmtree. + +    The returned value is used only by our own tests. +    """ +    start_path = kwargs["start_path"] +    exctype, excvalue = exc[:2] + +    # another process removed the file in the middle of the "rm_rf" (xdist for example) +    # more context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018 +    if isinstance(excvalue, OSError) and excvalue.errno == errno.ENOENT: +        return False + +    if not isinstance(excvalue, OSError) or excvalue.errno not in ( +        errno.EACCES, +        errno.EPERM, +    ): +        warnings.warn( +            PytestWarning( +                "(rm_rf) error removing {}\n{}: {}".format(path, exctype, excvalue) +            ) +        ) +        return False + +    if func not in (os.rmdir, os.remove, os.unlink): +        warnings.warn( +            PytestWarning( +                "(rm_rf) unknown function {} when removing {}:\n{}: {}".format( +                    path, func, exctype, excvalue +                ) +            ) +        ) +        return False + +    # Chmod + retry. +    import stat + +    def chmod_rw(p): +        mode = os.stat(p).st_mode +        os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR) + +    # For files, we need to recursively go upwards in the directories to +    # ensure they all are also writable. +    p = Path(path) +    if p.is_file(): +        for parent in p.parents: +            chmod_rw(str(parent)) +            # stop when we reach the original path passed to rm_rf +            if parent == start_path: +                break +    chmod_rw(str(path)) + +    func(path) +    return True + + +def rm_rf(path): +    """Remove the path contents recursively, even if some elements +    are read-only. +    """ +    onerror = partial(on_rm_rf_error, start_path=path) +    shutil.rmtree(str(path), onerror=onerror) + + +def find_prefixed(root, prefix): +    """finds all elements in root that begin with the prefix, case insensitive""" +    l_prefix = prefix.lower() +    for x in root.iterdir(): +        if x.name.lower().startswith(l_prefix): +            yield x + + +def extract_suffixes(iter, prefix): +    """ +    :param iter: iterator over path names +    :param prefix: expected prefix of the path names +    :returns: the parts of the paths following the prefix +    """ +    p_len = len(prefix) +    for p in iter: +        yield p.name[p_len:] + + +def find_suffixes(root, prefix): +    """combines find_prefixes and extract_suffixes +    """ +    return extract_suffixes(find_prefixed(root, prefix), prefix) + + +def parse_num(maybe_num): +    """parses number path suffixes, returns -1 on error""" +    try: +        return int(maybe_num) +    except ValueError: +        return -1 + + +if six.PY2: + +    def _max(iterable, default): +        """needed due to python2.7 lacking the default argument for max""" +        return reduce(max, iterable, default) + + +else: +    _max = max + + +def _force_symlink(root, target, link_to): +    """helper to create the current symlink + +    it's full of race conditions that are reasonably ok to ignore +    for the context of best effort linking to the latest testrun + +    the presumption being thatin case of much parallelism +    the inaccuracy is going to be acceptable +    """ +    current_symlink = root.joinpath(target) +    try: +        current_symlink.unlink() +    except OSError: +        pass +    try: +        current_symlink.symlink_to(link_to) +    except Exception: +        pass + + +def make_numbered_dir(root, prefix): +    """create a directory with an increased number as suffix for the given prefix""" +    for i in range(10): +        # try up to 10 times to create the folder +        max_existing = _max(map(parse_num, find_suffixes(root, prefix)), default=-1) +        new_number = max_existing + 1 +        new_path = root.joinpath("{}{}".format(prefix, new_number)) +        try: +            new_path.mkdir() +        except Exception: +            pass +        else: +            _force_symlink(root, prefix + "current", new_path) +            return new_path +    else: +        raise EnvironmentError( +            "could not create numbered dir with prefix " +            "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root) +        ) + + +def create_cleanup_lock(p): +    """crates a lock to prevent premature folder cleanup""" +    lock_path = get_lock_path(p) +    try: +        fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644) +    except OSError as e: +        if e.errno == errno.EEXIST: +            six.raise_from( +                EnvironmentError("cannot create lockfile in {path}".format(path=p)), e +            ) +        else: +            raise +    else: +        pid = os.getpid() +        spid = str(pid) +        if not isinstance(spid, bytes): +            spid = spid.encode("ascii") +        os.write(fd, spid) +        os.close(fd) +        if not lock_path.is_file(): +            raise EnvironmentError("lock path got renamed after successful creation") +        return lock_path + + +def register_cleanup_lock_removal(lock_path, register=atexit.register): +    """registers a cleanup function for removing a lock, by default on atexit""" +    pid = os.getpid() + +    def cleanup_on_exit(lock_path=lock_path, original_pid=pid): +        current_pid = os.getpid() +        if current_pid != original_pid: +            # fork +            return +        try: +            lock_path.unlink() +        except (OSError, IOError): +            pass + +    return register(cleanup_on_exit) + + +def maybe_delete_a_numbered_dir(path): +    """removes a numbered directory if its lock can be obtained and it does not seem to be in use""" +    lock_path = None +    try: +        lock_path = create_cleanup_lock(path) +        parent = path.parent + +        garbage = parent.joinpath("garbage-{}".format(uuid.uuid4())) +        path.rename(garbage) +        rm_rf(garbage) +    except (OSError, EnvironmentError): +        #  known races: +        #  * other process did a cleanup at the same time +        #  * deletable folder was found +        #  * process cwd (Windows) +        return +    finally: +        # if we created the lock, ensure we remove it even if we failed +        # to properly remove the numbered dir +        if lock_path is not None: +            try: +                lock_path.unlink() +            except (OSError, IOError): +                pass + + +def ensure_deletable(path, consider_lock_dead_if_created_before): +    """checks if a lock exists and breaks it if its considered dead""" +    if path.is_symlink(): +        return False +    lock = get_lock_path(path) +    if not lock.exists(): +        return True +    try: +        lock_time = lock.stat().st_mtime +    except Exception: +        return False +    else: +        if lock_time < consider_lock_dead_if_created_before: +            lock.unlink() +            return True +        else: +            return False + + +def try_cleanup(path, consider_lock_dead_if_created_before): +    """tries to cleanup a folder if we can ensure it's deletable""" +    if ensure_deletable(path, consider_lock_dead_if_created_before): +        maybe_delete_a_numbered_dir(path) + + +def cleanup_candidates(root, prefix, keep): +    """lists candidates for numbered directories to be removed - follows py.path""" +    max_existing = _max(map(parse_num, find_suffixes(root, prefix)), default=-1) +    max_delete = max_existing - keep +    paths = find_prefixed(root, prefix) +    paths, paths2 = itertools.tee(paths) +    numbers = map(parse_num, extract_suffixes(paths2, prefix)) +    for path, number in zip(paths, numbers): +        if number <= max_delete: +            yield path + + +def cleanup_numbered_dir(root, prefix, keep, consider_lock_dead_if_created_before): +    """cleanup for lock driven numbered directories""" +    for path in cleanup_candidates(root, prefix, keep): +        try_cleanup(path, consider_lock_dead_if_created_before) +    for path in root.glob("garbage-*"): +        try_cleanup(path, consider_lock_dead_if_created_before) + + +def make_numbered_dir_with_cleanup(root, prefix, keep, lock_timeout): +    """creates a numbered dir with a cleanup lock and removes old ones""" +    e = None +    for i in range(10): +        try: +            p = make_numbered_dir(root, prefix) +            lock_path = create_cleanup_lock(p) +            register_cleanup_lock_removal(lock_path) +        except Exception as exc: +            e = exc +        else: +            consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout +            cleanup_numbered_dir( +                root=root, +                prefix=prefix, +                keep=keep, +                consider_lock_dead_if_created_before=consider_lock_dead_if_created_before, +            ) +            return p +    assert e is not None +    raise e + + +def resolve_from_str(input, root): +    assert not isinstance(input, Path), "would break on py2" +    root = Path(root) +    input = expanduser(input) +    input = expandvars(input) +    if isabs(input): +        return Path(input) +    else: +        return root.joinpath(input) + + +def fnmatch_ex(pattern, path): +    """FNMatcher port from py.path.common which works with PurePath() instances. + +    The difference between this algorithm and PurePath.match() is that the latter matches "**" glob expressions +    for each part of the path, while this algorithm uses the whole path instead. + +    For example: +        "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py" with this algorithm, but not with +        PurePath.match(). + +    This algorithm was ported to keep backward-compatibility with existing settings which assume paths match according +    this logic. + +    References: +    * https://bugs.python.org/issue29249 +    * https://bugs.python.org/issue34731 +    """ +    path = PurePath(path) +    iswin32 = sys.platform.startswith("win") + +    if iswin32 and sep not in pattern and posix_sep in pattern: +        # Running on Windows, the pattern has no Windows path separators, +        # and the pattern has one or more Posix path separators. Replace +        # the Posix path separators with the Windows path separator. +        pattern = pattern.replace(posix_sep, sep) + +    if sep not in pattern: +        name = path.name +    else: +        name = six.text_type(path) +    return fnmatch.fnmatch(name, pattern) + + +def parts(s): +    parts = s.split(sep) +    return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))}  | 
