aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest/py2/_pytest/pathlib.py
diff options
context:
space:
mode:
authordeshevoy <deshevoy@yandex-team.ru>2022-02-10 16:46:56 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:56 +0300
commite988f30484abe5fdeedcc7a5d3c226c01a21800c (patch)
tree0a217b173aabb57b7e51f8a169989b1a3e0309fe /contrib/python/pytest/py2/_pytest/pathlib.py
parent33ee501c05d3f24036ae89766a858930ae66c548 (diff)
downloadydb-e988f30484abe5fdeedcc7a5d3c226c01a21800c.tar.gz
Restoring authorship annotation for <deshevoy@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/pytest/py2/_pytest/pathlib.py')
-rw-r--r--contrib/python/pytest/py2/_pytest/pathlib.py620
1 files changed, 310 insertions, 310 deletions
diff --git a/contrib/python/pytest/py2/_pytest/pathlib.py b/contrib/python/pytest/py2/_pytest/pathlib.py
index 42071f4310..668bd2b2be 100644
--- a/contrib/python/pytest/py2/_pytest/pathlib.py
+++ b/contrib/python/pytest/py2/_pytest/pathlib.py
@@ -1,60 +1,60 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-import atexit
-import errno
-import fnmatch
-import itertools
-import operator
-import os
-import shutil
-import sys
-import uuid
+import atexit
+import errno
+import fnmatch
+import itertools
+import operator
+import os
+import shutil
+import sys
+import uuid
import warnings
from functools import partial
-from functools import reduce
-from os.path import expanduser
-from os.path import expandvars
-from os.path import isabs
-from os.path import sep
-from posixpath import sep as posix_sep
-
-import six
-from six.moves import map
-
-from .compat import PY36
+from functools import reduce
+from os.path import expanduser
+from os.path import expandvars
+from os.path import isabs
+from os.path import sep
+from posixpath import sep as posix_sep
+
+import six
+from six.moves import map
+
+from .compat import PY36
from _pytest.warning_types import PytestWarning
-
-if PY36:
- from pathlib import Path, PurePath
-else:
- from pathlib2 import Path, PurePath
-
-__all__ = ["Path", "PurePath"]
-
-
-LOCK_TIMEOUT = 60 * 60 * 3
-
-get_lock_path = operator.methodcaller("joinpath", ".lock")
-
-
-def ensure_reset_dir(path):
- """
- ensures the given path is an empty directory
- """
- if path.exists():
+
+if PY36:
+ from pathlib import Path, PurePath
+else:
+ from pathlib2 import Path, PurePath
+
+__all__ = ["Path", "PurePath"]
+
+
+LOCK_TIMEOUT = 60 * 60 * 3
+
+get_lock_path = operator.methodcaller("joinpath", ".lock")
+
+
+def ensure_reset_dir(path):
+ """
+ ensures the given path is an empty directory
+ """
+ if path.exists():
rm_rf(path)
- path.mkdir()
-
-
+ path.mkdir()
+
+
def on_rm_rf_error(func, path, exc, **kwargs):
"""Handles known read-only errors during rmtree.
-
+
The returned value is used only by our own tests.
"""
start_path = kwargs["start_path"]
exctype, excvalue = exc[:2]
-
+
# another process removed the file in the middle of the "rm_rf" (xdist for example)
# more context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018
if isinstance(excvalue, OSError) and excvalue.errno == errno.ENOENT:
@@ -111,270 +111,270 @@ def rm_rf(path):
shutil.rmtree(str(path), onerror=onerror)
-def find_prefixed(root, prefix):
- """finds all elements in root that begin with the prefix, case insensitive"""
- l_prefix = prefix.lower()
- for x in root.iterdir():
- if x.name.lower().startswith(l_prefix):
- yield x
-
-
-def extract_suffixes(iter, prefix):
- """
- :param iter: iterator over path names
- :param prefix: expected prefix of the path names
- :returns: the parts of the paths following the prefix
- """
- p_len = len(prefix)
- for p in iter:
- yield p.name[p_len:]
-
-
-def find_suffixes(root, prefix):
- """combines find_prefixes and extract_suffixes
- """
- return extract_suffixes(find_prefixed(root, prefix), prefix)
-
-
-def parse_num(maybe_num):
- """parses number path suffixes, returns -1 on error"""
- try:
- return int(maybe_num)
- except ValueError:
- return -1
-
-
-if six.PY2:
-
- def _max(iterable, default):
- """needed due to python2.7 lacking the default argument for max"""
- return reduce(max, iterable, default)
-
-
-else:
- _max = max
-
-
-def _force_symlink(root, target, link_to):
- """helper to create the current symlink
-
- it's full of race conditions that are reasonably ok to ignore
- for the context of best effort linking to the latest testrun
-
- the presumption being thatin case of much parallelism
- the inaccuracy is going to be acceptable
- """
- current_symlink = root.joinpath(target)
- try:
- current_symlink.unlink()
- except OSError:
- pass
- try:
- current_symlink.symlink_to(link_to)
- except Exception:
- pass
-
-
-def make_numbered_dir(root, prefix):
- """create a directory with an increased number as suffix for the given prefix"""
- for i in range(10):
- # try up to 10 times to create the folder
- max_existing = _max(map(parse_num, find_suffixes(root, prefix)), default=-1)
- new_number = max_existing + 1
- new_path = root.joinpath("{}{}".format(prefix, new_number))
- try:
- new_path.mkdir()
- except Exception:
- pass
- else:
- _force_symlink(root, prefix + "current", new_path)
- return new_path
- else:
- raise EnvironmentError(
- "could not create numbered dir with prefix "
- "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root)
- )
-
-
-def create_cleanup_lock(p):
- """crates a lock to prevent premature folder cleanup"""
- lock_path = get_lock_path(p)
- try:
- fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
- except OSError as e:
- if e.errno == errno.EEXIST:
- six.raise_from(
- EnvironmentError("cannot create lockfile in {path}".format(path=p)), e
- )
- else:
- raise
- else:
- pid = os.getpid()
- spid = str(pid)
- if not isinstance(spid, bytes):
- spid = spid.encode("ascii")
- os.write(fd, spid)
- os.close(fd)
- if not lock_path.is_file():
- raise EnvironmentError("lock path got renamed after successful creation")
- return lock_path
-
-
-def register_cleanup_lock_removal(lock_path, register=atexit.register):
- """registers a cleanup function for removing a lock, by default on atexit"""
- pid = os.getpid()
-
- def cleanup_on_exit(lock_path=lock_path, original_pid=pid):
- current_pid = os.getpid()
- if current_pid != original_pid:
- # fork
- return
- try:
- lock_path.unlink()
- except (OSError, IOError):
- pass
-
- return register(cleanup_on_exit)
-
-
-def maybe_delete_a_numbered_dir(path):
- """removes a numbered directory if its lock can be obtained and it does not seem to be in use"""
- lock_path = None
- try:
- lock_path = create_cleanup_lock(path)
- parent = path.parent
-
- garbage = parent.joinpath("garbage-{}".format(uuid.uuid4()))
- path.rename(garbage)
+def find_prefixed(root, prefix):
+ """finds all elements in root that begin with the prefix, case insensitive"""
+ l_prefix = prefix.lower()
+ for x in root.iterdir():
+ if x.name.lower().startswith(l_prefix):
+ yield x
+
+
+def extract_suffixes(iter, prefix):
+ """
+ :param iter: iterator over path names
+ :param prefix: expected prefix of the path names
+ :returns: the parts of the paths following the prefix
+ """
+ p_len = len(prefix)
+ for p in iter:
+ yield p.name[p_len:]
+
+
+def find_suffixes(root, prefix):
+ """combines find_prefixes and extract_suffixes
+ """
+ return extract_suffixes(find_prefixed(root, prefix), prefix)
+
+
+def parse_num(maybe_num):
+ """parses number path suffixes, returns -1 on error"""
+ try:
+ return int(maybe_num)
+ except ValueError:
+ return -1
+
+
+if six.PY2:
+
+ def _max(iterable, default):
+ """needed due to python2.7 lacking the default argument for max"""
+ return reduce(max, iterable, default)
+
+
+else:
+ _max = max
+
+
+def _force_symlink(root, target, link_to):
+ """helper to create the current symlink
+
+ it's full of race conditions that are reasonably ok to ignore
+ for the context of best effort linking to the latest testrun
+
+ the presumption being thatin case of much parallelism
+ the inaccuracy is going to be acceptable
+ """
+ current_symlink = root.joinpath(target)
+ try:
+ current_symlink.unlink()
+ except OSError:
+ pass
+ try:
+ current_symlink.symlink_to(link_to)
+ except Exception:
+ pass
+
+
+def make_numbered_dir(root, prefix):
+ """create a directory with an increased number as suffix for the given prefix"""
+ for i in range(10):
+ # try up to 10 times to create the folder
+ max_existing = _max(map(parse_num, find_suffixes(root, prefix)), default=-1)
+ new_number = max_existing + 1
+ new_path = root.joinpath("{}{}".format(prefix, new_number))
+ try:
+ new_path.mkdir()
+ except Exception:
+ pass
+ else:
+ _force_symlink(root, prefix + "current", new_path)
+ return new_path
+ else:
+ raise EnvironmentError(
+ "could not create numbered dir with prefix "
+ "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root)
+ )
+
+
+def create_cleanup_lock(p):
+ """crates a lock to prevent premature folder cleanup"""
+ lock_path = get_lock_path(p)
+ try:
+ fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
+ except OSError as e:
+ if e.errno == errno.EEXIST:
+ six.raise_from(
+ EnvironmentError("cannot create lockfile in {path}".format(path=p)), e
+ )
+ else:
+ raise
+ else:
+ pid = os.getpid()
+ spid = str(pid)
+ if not isinstance(spid, bytes):
+ spid = spid.encode("ascii")
+ os.write(fd, spid)
+ os.close(fd)
+ if not lock_path.is_file():
+ raise EnvironmentError("lock path got renamed after successful creation")
+ return lock_path
+
+
+def register_cleanup_lock_removal(lock_path, register=atexit.register):
+ """registers a cleanup function for removing a lock, by default on atexit"""
+ pid = os.getpid()
+
+ def cleanup_on_exit(lock_path=lock_path, original_pid=pid):
+ current_pid = os.getpid()
+ if current_pid != original_pid:
+ # fork
+ return
+ try:
+ lock_path.unlink()
+ except (OSError, IOError):
+ pass
+
+ return register(cleanup_on_exit)
+
+
+def maybe_delete_a_numbered_dir(path):
+ """removes a numbered directory if its lock can be obtained and it does not seem to be in use"""
+ lock_path = None
+ try:
+ lock_path = create_cleanup_lock(path)
+ parent = path.parent
+
+ garbage = parent.joinpath("garbage-{}".format(uuid.uuid4()))
+ path.rename(garbage)
rm_rf(garbage)
- except (OSError, EnvironmentError):
- # known races:
- # * other process did a cleanup at the same time
- # * deletable folder was found
- # * process cwd (Windows)
- return
- finally:
- # if we created the lock, ensure we remove it even if we failed
- # to properly remove the numbered dir
- if lock_path is not None:
- try:
- lock_path.unlink()
- except (OSError, IOError):
- pass
-
-
-def ensure_deletable(path, consider_lock_dead_if_created_before):
- """checks if a lock exists and breaks it if its considered dead"""
- if path.is_symlink():
- return False
- lock = get_lock_path(path)
- if not lock.exists():
- return True
- try:
- lock_time = lock.stat().st_mtime
- except Exception:
- return False
- else:
- if lock_time < consider_lock_dead_if_created_before:
- lock.unlink()
- return True
- else:
- return False
-
-
-def try_cleanup(path, consider_lock_dead_if_created_before):
- """tries to cleanup a folder if we can ensure it's deletable"""
- if ensure_deletable(path, consider_lock_dead_if_created_before):
- maybe_delete_a_numbered_dir(path)
-
-
-def cleanup_candidates(root, prefix, keep):
- """lists candidates for numbered directories to be removed - follows py.path"""
- max_existing = _max(map(parse_num, find_suffixes(root, prefix)), default=-1)
- max_delete = max_existing - keep
- paths = find_prefixed(root, prefix)
- paths, paths2 = itertools.tee(paths)
- numbers = map(parse_num, extract_suffixes(paths2, prefix))
- for path, number in zip(paths, numbers):
- if number <= max_delete:
- yield path
-
-
-def cleanup_numbered_dir(root, prefix, keep, consider_lock_dead_if_created_before):
- """cleanup for lock driven numbered directories"""
- for path in cleanup_candidates(root, prefix, keep):
- try_cleanup(path, consider_lock_dead_if_created_before)
- for path in root.glob("garbage-*"):
- try_cleanup(path, consider_lock_dead_if_created_before)
-
-
-def make_numbered_dir_with_cleanup(root, prefix, keep, lock_timeout):
- """creates a numbered dir with a cleanup lock and removes old ones"""
- e = None
- for i in range(10):
- try:
- p = make_numbered_dir(root, prefix)
- lock_path = create_cleanup_lock(p)
- register_cleanup_lock_removal(lock_path)
- except Exception as exc:
- e = exc
- else:
- consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout
- cleanup_numbered_dir(
- root=root,
- prefix=prefix,
- keep=keep,
- consider_lock_dead_if_created_before=consider_lock_dead_if_created_before,
- )
- return p
- assert e is not None
- raise e
-
-
-def resolve_from_str(input, root):
- assert not isinstance(input, Path), "would break on py2"
- root = Path(root)
- input = expanduser(input)
- input = expandvars(input)
- if isabs(input):
- return Path(input)
- else:
- return root.joinpath(input)
-
-
-def fnmatch_ex(pattern, path):
- """FNMatcher port from py.path.common which works with PurePath() instances.
-
- The difference between this algorithm and PurePath.match() is that the latter matches "**" glob expressions
- for each part of the path, while this algorithm uses the whole path instead.
-
- For example:
- "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py" with this algorithm, but not with
- PurePath.match().
-
- This algorithm was ported to keep backward-compatibility with existing settings which assume paths match according
- this logic.
-
- References:
- * https://bugs.python.org/issue29249
- * https://bugs.python.org/issue34731
- """
- path = PurePath(path)
- iswin32 = sys.platform.startswith("win")
-
- if iswin32 and sep not in pattern and posix_sep in pattern:
- # Running on Windows, the pattern has no Windows path separators,
- # and the pattern has one or more Posix path separators. Replace
- # the Posix path separators with the Windows path separator.
- pattern = pattern.replace(posix_sep, sep)
-
- if sep not in pattern:
- name = path.name
- else:
- name = six.text_type(path)
- return fnmatch.fnmatch(name, pattern)
-
-
-def parts(s):
- parts = s.split(sep)
- return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))}
+ except (OSError, EnvironmentError):
+ # known races:
+ # * other process did a cleanup at the same time
+ # * deletable folder was found
+ # * process cwd (Windows)
+ return
+ finally:
+ # if we created the lock, ensure we remove it even if we failed
+ # to properly remove the numbered dir
+ if lock_path is not None:
+ try:
+ lock_path.unlink()
+ except (OSError, IOError):
+ pass
+
+
+def ensure_deletable(path, consider_lock_dead_if_created_before):
+ """checks if a lock exists and breaks it if its considered dead"""
+ if path.is_symlink():
+ return False
+ lock = get_lock_path(path)
+ if not lock.exists():
+ return True
+ try:
+ lock_time = lock.stat().st_mtime
+ except Exception:
+ return False
+ else:
+ if lock_time < consider_lock_dead_if_created_before:
+ lock.unlink()
+ return True
+ else:
+ return False
+
+
+def try_cleanup(path, consider_lock_dead_if_created_before):
+ """tries to cleanup a folder if we can ensure it's deletable"""
+ if ensure_deletable(path, consider_lock_dead_if_created_before):
+ maybe_delete_a_numbered_dir(path)
+
+
+def cleanup_candidates(root, prefix, keep):
+ """lists candidates for numbered directories to be removed - follows py.path"""
+ max_existing = _max(map(parse_num, find_suffixes(root, prefix)), default=-1)
+ max_delete = max_existing - keep
+ paths = find_prefixed(root, prefix)
+ paths, paths2 = itertools.tee(paths)
+ numbers = map(parse_num, extract_suffixes(paths2, prefix))
+ for path, number in zip(paths, numbers):
+ if number <= max_delete:
+ yield path
+
+
+def cleanup_numbered_dir(root, prefix, keep, consider_lock_dead_if_created_before):
+ """cleanup for lock driven numbered directories"""
+ for path in cleanup_candidates(root, prefix, keep):
+ try_cleanup(path, consider_lock_dead_if_created_before)
+ for path in root.glob("garbage-*"):
+ try_cleanup(path, consider_lock_dead_if_created_before)
+
+
+def make_numbered_dir_with_cleanup(root, prefix, keep, lock_timeout):
+ """creates a numbered dir with a cleanup lock and removes old ones"""
+ e = None
+ for i in range(10):
+ try:
+ p = make_numbered_dir(root, prefix)
+ lock_path = create_cleanup_lock(p)
+ register_cleanup_lock_removal(lock_path)
+ except Exception as exc:
+ e = exc
+ else:
+ consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout
+ cleanup_numbered_dir(
+ root=root,
+ prefix=prefix,
+ keep=keep,
+ consider_lock_dead_if_created_before=consider_lock_dead_if_created_before,
+ )
+ return p
+ assert e is not None
+ raise e
+
+
+def resolve_from_str(input, root):
+ assert not isinstance(input, Path), "would break on py2"
+ root = Path(root)
+ input = expanduser(input)
+ input = expandvars(input)
+ if isabs(input):
+ return Path(input)
+ else:
+ return root.joinpath(input)
+
+
+def fnmatch_ex(pattern, path):
+ """FNMatcher port from py.path.common which works with PurePath() instances.
+
+ The difference between this algorithm and PurePath.match() is that the latter matches "**" glob expressions
+ for each part of the path, while this algorithm uses the whole path instead.
+
+ For example:
+ "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py" with this algorithm, but not with
+ PurePath.match().
+
+ This algorithm was ported to keep backward-compatibility with existing settings which assume paths match according
+ this logic.
+
+ References:
+ * https://bugs.python.org/issue29249
+ * https://bugs.python.org/issue34731
+ """
+ path = PurePath(path)
+ iswin32 = sys.platform.startswith("win")
+
+ if iswin32 and sep not in pattern and posix_sep in pattern:
+ # Running on Windows, the pattern has no Windows path separators,
+ # and the pattern has one or more Posix path separators. Replace
+ # the Posix path separators with the Windows path separator.
+ pattern = pattern.replace(posix_sep, sep)
+
+ if sep not in pattern:
+ name = path.name
+ else:
+ name = six.text_type(path)
+ return fnmatch.fnmatch(name, pattern)
+
+
+def parts(s):
+ parts = s.split(sep)
+ return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))}