author     deshevoy <deshevoy@yandex-team.ru>  2022-02-10 16:46:56 +0300
committer  Daniil Cherednik <dcherednik@yandex-team.ru>  2022-02-10 16:46:56 +0300
commit     e988f30484abe5fdeedcc7a5d3c226c01a21800c (patch)
tree       0a217b173aabb57b7e51f8a169989b1a3e0309fe  /contrib/python/pytest/py3/_pytest/pathlib.py
parent     33ee501c05d3f24036ae89766a858930ae66c548 (diff)
download   ydb-e988f30484abe5fdeedcc7a5d3c226c01a21800c.tar.gz
Restoring authorship annotation for <deshevoy@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/pathlib.py')
-rw-r--r--  contrib/python/pytest/py3/_pytest/pathlib.py  384
1 file changed, 192 insertions(+), 192 deletions(-)
diff --git a/contrib/python/pytest/py3/_pytest/pathlib.py b/contrib/python/pytest/py3/_pytest/pathlib.py
index 7d9269a1855..02ba612d320 100644
--- a/contrib/python/pytest/py3/_pytest/pathlib.py
+++ b/contrib/python/pytest/py3/_pytest/pathlib.py
@@ -1,12 +1,12 @@
-import atexit
+import atexit
import contextlib
-import fnmatch
+import fnmatch
import importlib.util
-import itertools
-import os
-import shutil
-import sys
-import uuid
+import itertools
+import os
+import shutil
+import sys
+import uuid
import warnings
from enum import Enum
from errno import EBADF
@@ -14,13 +14,13 @@ from errno import ELOOP
from errno import ENOENT
from errno import ENOTDIR
from functools import partial
-from os.path import expanduser
-from os.path import expandvars
-from os.path import isabs
-from os.path import sep
+from os.path import expanduser
+from os.path import expandvars
+from os.path import isabs
+from os.path import sep
from pathlib import Path
from pathlib import PurePath
-from posixpath import sep as posix_sep
+from posixpath import sep as posix_sep
from types import ModuleType
from typing import Callable
from typing import Iterable
@@ -29,29 +29,29 @@ from typing import Optional
from typing import Set
from typing import TypeVar
from typing import Union
-
+
import py
from _pytest.compat import assert_never
from _pytest.outcomes import skip
from _pytest.warning_types import PytestWarning
-
+
LOCK_TIMEOUT = 60 * 60 * 24 * 3
-
-
+
+
_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath)
-
+
# The following function, variables and comments were
# copied from cpython 3.9 Lib/pathlib.py file.
-
+
# EBADF - guard against macOS `stat` throwing EBADF
_IGNORED_ERRORS = (ENOENT, ENOTDIR, EBADF, ELOOP)
-
+
_IGNORED_WINERRORS = (
21, # ERROR_NOT_READY - drive exists but is not accessible
1921, # ERROR_CANT_RESOLVE_FILENAME - fix for broken symlink pointing to itself
)
-
+
def _ignore_error(exception):
return (
@@ -66,11 +66,11 @@ def get_lock_path(path: _AnyPurePath) -> _AnyPurePath:
def on_rm_rf_error(func, path: str, exc, *, start_path: Path) -> bool:
"""Handle known read-only errors during rmtree.
-
+
The returned value is used only by our own tests.
"""
exctype, excvalue = exc[:2]
-
+
# Another process removed the file in the middle of the "rm_rf" (xdist for example).
# More context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018
if isinstance(excvalue, FileNotFoundError):
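As context for this hunk (an illustrative sketch, not part of the patch): a handler with on_rm_rf_error's signature is meant to be handed to shutil.rmtree, with start_path bound up front via functools.partial. rm_rf() in this module wires it up roughly along these lines, though its body falls outside the hunk shown here; the remove_tree name below is made up for the example.

import shutil
from functools import partial
from pathlib import Path

from _pytest.pathlib import on_rm_rf_error


def remove_tree(path: Path) -> None:
    # shutil.rmtree calls onerror(function, path, excinfo) on failures,
    # which lines up with on_rm_rf_error(func, path, exc, *, start_path).
    onerror = partial(on_rm_rf_error, start_path=path)
    shutil.rmtree(str(path), onerror=onerror)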
@@ -155,147 +155,147 @@ def rm_rf(path: Path) -> None:
def find_prefixed(root: Path, prefix: str) -> Iterator[Path]:
"""Find all elements in root that begin with the prefix, case insensitive."""
- l_prefix = prefix.lower()
- for x in root.iterdir():
- if x.name.lower().startswith(l_prefix):
- yield x
-
-
+ l_prefix = prefix.lower()
+ for x in root.iterdir():
+ if x.name.lower().startswith(l_prefix):
+ yield x
+
+
def extract_suffixes(iter: Iterable[PurePath], prefix: str) -> Iterator[str]:
"""Return the parts of the paths following the prefix.
:param iter: Iterator over path names.
:param prefix: Expected prefix of the path names.
- """
- p_len = len(prefix)
- for p in iter:
- yield p.name[p_len:]
-
-
+ """
+ p_len = len(prefix)
+ for p in iter:
+ yield p.name[p_len:]
+
+
def find_suffixes(root: Path, prefix: str) -> Iterator[str]:
"""Combine find_prefixes and extract_suffixes."""
- return extract_suffixes(find_prefixed(root, prefix), prefix)
-
-
+ return extract_suffixes(find_prefixed(root, prefix), prefix)
+
+
def parse_num(maybe_num) -> int:
"""Parse number path suffixes, returns -1 on error."""
- try:
- return int(maybe_num)
- except ValueError:
- return -1
-
-
+ try:
+ return int(maybe_num)
+ except ValueError:
+ return -1
+
+
def _force_symlink(
root: Path, target: Union[str, PurePath], link_to: Union[str, Path]
) -> None:
"""Helper to create the current symlink.
-
+
It's full of race conditions that are reasonably OK to ignore
for the context of best effort linking to the latest test run.
-
+
The presumption being that in case of much parallelism
the inaccuracy is going to be acceptable.
- """
- current_symlink = root.joinpath(target)
- try:
- current_symlink.unlink()
- except OSError:
- pass
- try:
- current_symlink.symlink_to(link_to)
- except Exception:
- pass
-
-
+ """
+ current_symlink = root.joinpath(target)
+ try:
+ current_symlink.unlink()
+ except OSError:
+ pass
+ try:
+ current_symlink.symlink_to(link_to)
+ except Exception:
+ pass
+
+
def make_numbered_dir(root: Path, prefix: str, mode: int = 0o700) -> Path:
"""Create a directory with an increased number as suffix for the given prefix."""
- for i in range(10):
- # try up to 10 times to create the folder
+ for i in range(10):
+ # try up to 10 times to create the folder
max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
- new_number = max_existing + 1
+ new_number = max_existing + 1
new_path = root.joinpath(f"{prefix}{new_number}")
- try:
+ try:
new_path.mkdir(mode=mode)
- except Exception:
- pass
- else:
- _force_symlink(root, prefix + "current", new_path)
- return new_path
- else:
+ except Exception:
+ pass
+ else:
+ _force_symlink(root, prefix + "current", new_path)
+ return new_path
+ else:
raise OSError(
- "could not create numbered dir with prefix "
- "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root)
- )
-
-
+ "could not create numbered dir with prefix "
+ "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root)
+ )
+
+
def create_cleanup_lock(p: Path) -> Path:
"""Create a lock to prevent premature folder cleanup."""
- lock_path = get_lock_path(p)
- try:
- fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
+ lock_path = get_lock_path(p)
+ try:
+ fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
except FileExistsError as e:
raise OSError(f"cannot create lockfile in {p}") from e
- else:
- pid = os.getpid()
+ else:
+ pid = os.getpid()
spid = str(pid).encode()
- os.write(fd, spid)
- os.close(fd)
- if not lock_path.is_file():
+ os.write(fd, spid)
+ os.close(fd)
+ if not lock_path.is_file():
raise OSError("lock path got renamed after successful creation")
- return lock_path
-
-
+ return lock_path
+
+
def register_cleanup_lock_removal(lock_path: Path, register=atexit.register):
"""Register a cleanup function for removing a lock, by default on atexit."""
- pid = os.getpid()
-
+ pid = os.getpid()
+
def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = pid) -> None:
- current_pid = os.getpid()
- if current_pid != original_pid:
- # fork
- return
- try:
- lock_path.unlink()
+ current_pid = os.getpid()
+ if current_pid != original_pid:
+ # fork
+ return
+ try:
+ lock_path.unlink()
except OSError:
- pass
-
- return register(cleanup_on_exit)
-
-
+ pass
+
+ return register(cleanup_on_exit)
+
+
def maybe_delete_a_numbered_dir(path: Path) -> None:
"""Remove a numbered directory if its lock can be obtained and it does
not seem to be in use."""
path = ensure_extended_length_path(path)
- lock_path = None
- try:
- lock_path = create_cleanup_lock(path)
- parent = path.parent
-
+ lock_path = None
+ try:
+ lock_path = create_cleanup_lock(path)
+ parent = path.parent
+
garbage = parent.joinpath(f"garbage-{uuid.uuid4()}")
- path.rename(garbage)
+ path.rename(garbage)
rm_rf(garbage)
except OSError:
- # known races:
- # * other process did a cleanup at the same time
- # * deletable folder was found
- # * process cwd (Windows)
- return
- finally:
+ # known races:
+ # * other process did a cleanup at the same time
+ # * deletable folder was found
+ # * process cwd (Windows)
+ return
+ finally:
# If we created the lock, ensure we remove it even if we failed
# to properly remove the numbered dir.
- if lock_path is not None:
- try:
- lock_path.unlink()
+ if lock_path is not None:
+ try:
+ lock_path.unlink()
except OSError:
- pass
-
-
+ pass
+
+
def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool:
"""Check if `path` is deletable based on whether the lock file is expired."""
- if path.is_symlink():
- return False
- lock = get_lock_path(path)
- try:
+ if path.is_symlink():
+ return False
+ lock = get_lock_path(path)
+ try:
if not lock.is_file():
return True
except OSError:
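To make the numbered-dir helpers in the hunk above concrete, here is a usage sketch (not part of the patch; the root path and prefix are hypothetical, and it relies on pytest's private _pytest.pathlib module):

from pathlib import Path

from _pytest.pathlib import create_cleanup_lock, make_numbered_dir

root = Path("/tmp/pytest-of-user")   # hypothetical base directory
root.mkdir(parents=True, exist_ok=True)

# The first call creates e.g. "pytest-0", the next "pytest-1", and so on;
# a best-effort "pytest-current" symlink is pointed at the newest dir.
run_dir = make_numbered_dir(root, "pytest-")

# The lock file keeps the lock-driven cleanup from removing this dir
# while the owning process is still alive.
lock_path = create_cleanup_lock(run_dir)
print(run_dir, lock_path)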
@@ -303,11 +303,11 @@ def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) ->
# we don't have access to the entire directory (#7491).
return False
try:
- lock_time = lock.stat().st_mtime
- except Exception:
- return False
- else:
- if lock_time < consider_lock_dead_if_created_before:
+ lock_time = lock.stat().st_mtime
+ except Exception:
+ return False
+ else:
+ if lock_time < consider_lock_dead_if_created_before:
# We want to ignore any errors while trying to remove the lock such as:
# - PermissionDenied, like the file permissions have changed since the lock creation;
# - FileNotFoundError, in case another pytest process got here first;
@@ -316,50 +316,50 @@ def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) ->
lock.unlink()
return True
return False
-
-
+
+
def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None:
"""Try to cleanup a folder if we can ensure it's deletable."""
- if ensure_deletable(path, consider_lock_dead_if_created_before):
- maybe_delete_a_numbered_dir(path)
-
-
+ if ensure_deletable(path, consider_lock_dead_if_created_before):
+ maybe_delete_a_numbered_dir(path)
+
+
def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]:
"""List candidates for numbered directories to be removed - follows py.path."""
max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
- max_delete = max_existing - keep
- paths = find_prefixed(root, prefix)
- paths, paths2 = itertools.tee(paths)
- numbers = map(parse_num, extract_suffixes(paths2, prefix))
- for path, number in zip(paths, numbers):
- if number <= max_delete:
- yield path
-
-
+ max_delete = max_existing - keep
+ paths = find_prefixed(root, prefix)
+ paths, paths2 = itertools.tee(paths)
+ numbers = map(parse_num, extract_suffixes(paths2, prefix))
+ for path, number in zip(paths, numbers):
+ if number <= max_delete:
+ yield path
+
+
def cleanup_numbered_dir(
root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float
) -> None:
"""Cleanup for lock driven numbered directories."""
- for path in cleanup_candidates(root, prefix, keep):
- try_cleanup(path, consider_lock_dead_if_created_before)
- for path in root.glob("garbage-*"):
- try_cleanup(path, consider_lock_dead_if_created_before)
-
-
+ for path in cleanup_candidates(root, prefix, keep):
+ try_cleanup(path, consider_lock_dead_if_created_before)
+ for path in root.glob("garbage-*"):
+ try_cleanup(path, consider_lock_dead_if_created_before)
+
+
def make_numbered_dir_with_cleanup(
root: Path, prefix: str, keep: int, lock_timeout: float, mode: int,
) -> Path:
"""Create a numbered dir with a cleanup lock and remove old ones."""
- e = None
- for i in range(10):
- try:
+ e = None
+ for i in range(10):
+ try:
p = make_numbered_dir(root, prefix, mode)
- lock_path = create_cleanup_lock(p)
- register_cleanup_lock_removal(lock_path)
- except Exception as exc:
- e = exc
- else:
- consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout
+ lock_path = create_cleanup_lock(p)
+ register_cleanup_lock_removal(lock_path)
+ except Exception as exc:
+ e = exc
+ else:
+ consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout
# Register a cleanup for program exit
atexit.register(
cleanup_numbered_dir,
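A companion sketch for the cleanup-aware entry point (again not part of the patch; keep=3 and the root path are illustrative values, not something this diff prescribes):

from pathlib import Path

from _pytest.pathlib import LOCK_TIMEOUT, make_numbered_dir_with_cleanup

root = Path("/tmp/pytest-of-user")
root.mkdir(parents=True, exist_ok=True)

# Creates the next numbered dir, locks it, and registers an atexit hook
# that prunes older siblings whose locks are absent or expired, keeping
# roughly the `keep` most recent runs.
basetemp = make_numbered_dir_with_cleanup(
    root=root,
    prefix="pytest-",
    keep=3,
    lock_timeout=LOCK_TIMEOUT,
    mode=0o700,
)
print(basetemp)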
@@ -367,60 +367,60 @@ def make_numbered_dir_with_cleanup(
prefix,
keep,
consider_lock_dead_if_created_before,
- )
- return p
- assert e is not None
- raise e
-
-
+ )
+ return p
+ assert e is not None
+ raise e
+
+
def resolve_from_str(input: str, rootpath: Path) -> Path:
- input = expanduser(input)
- input = expandvars(input)
- if isabs(input):
- return Path(input)
- else:
+ input = expanduser(input)
+ input = expandvars(input)
+ if isabs(input):
+ return Path(input)
+ else:
return rootpath.joinpath(input)
-
-
+
+
def fnmatch_ex(pattern: str, path) -> bool:
"""A port of FNMatcher from py.path.common which works with PurePath() instances.
-
+
The difference between this algorithm and PurePath.match() is that the
latter matches "**" glob expressions for each part of the path, while
this algorithm uses the whole path instead.
-
- For example:
+
+ For example:
"tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py"
with this algorithm, but not with PurePath.match().
-
+
This algorithm was ported to keep backward-compatibility with existing
settings which assume paths match according this logic.
-
- References:
- * https://bugs.python.org/issue29249
- * https://bugs.python.org/issue34731
- """
- path = PurePath(path)
- iswin32 = sys.platform.startswith("win")
-
- if iswin32 and sep not in pattern and posix_sep in pattern:
- # Running on Windows, the pattern has no Windows path separators,
- # and the pattern has one or more Posix path separators. Replace
- # the Posix path separators with the Windows path separator.
- pattern = pattern.replace(posix_sep, sep)
-
- if sep not in pattern:
- name = path.name
- else:
+
+ References:
+ * https://bugs.python.org/issue29249
+ * https://bugs.python.org/issue34731
+ """
+ path = PurePath(path)
+ iswin32 = sys.platform.startswith("win")
+
+ if iswin32 and sep not in pattern and posix_sep in pattern:
+ # Running on Windows, the pattern has no Windows path separators,
+ # and the pattern has one or more Posix path separators. Replace
+ # the Posix path separators with the Windows path separator.
+ pattern = pattern.replace(posix_sep, sep)
+
+ if sep not in pattern:
+ name = path.name
+ else:
name = str(path)
if path.is_absolute() and not os.path.isabs(pattern):
pattern = f"*{os.sep}{pattern}"
- return fnmatch.fnmatch(name, pattern)
-
-
+ return fnmatch.fnmatch(name, pattern)
+
+
def parts(s: str) -> Set[str]:
- parts = s.split(sep)
- return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))}
+ parts = s.split(sep)
+ return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))}
def symlink_or_skip(src, dst, **kwargs):
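Finally, a quick demonstration of the behavioral difference the fnmatch_ex docstring describes, plus parts() (a sketch, not part of the patch; the commented results assume a POSIX os.sep):

from pathlib import PurePath

from _pytest.pathlib import fnmatch_ex, parts

p = "tests/foo/bar/doc/test_foo.py"
print(fnmatch_ex("tests/**/doc/test*.py", p))      # True: matched against the whole path
print(PurePath(p).match("tests/**/doc/test*.py"))  # False: per-part matching rejects it

# parts() returns every ancestor prefix of a path string, e.g.
# {'tests', 'tests/foo', 'tests/foo/bar'}
print(parts("tests/foo/bar"))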