diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
commit | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch) | |
tree | 012bb94d777798f1f56ac1cec429509766d05181 /contrib/python/pytest/py3/_pytest/pathlib.py | |
parent | 6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff) | |
download | ydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/pathlib.py')
-rw-r--r-- | contrib/python/pytest/py3/_pytest/pathlib.py | 948 |
1 files changed, 474 insertions, 474 deletions
diff --git a/contrib/python/pytest/py3/_pytest/pathlib.py b/contrib/python/pytest/py3/_pytest/pathlib.py index 7d9269a1855..2f56c249d4c 100644 --- a/contrib/python/pytest/py3/_pytest/pathlib.py +++ b/contrib/python/pytest/py3/_pytest/pathlib.py @@ -1,200 +1,200 @@ import atexit -import contextlib +import contextlib import fnmatch -import importlib.util +import importlib.util import itertools import os import shutil import sys import uuid -import warnings -from enum import Enum -from errno import EBADF -from errno import ELOOP -from errno import ENOENT -from errno import ENOTDIR -from functools import partial +import warnings +from enum import Enum +from errno import EBADF +from errno import ELOOP +from errno import ENOENT +from errno import ENOTDIR +from functools import partial from os.path import expanduser from os.path import expandvars from os.path import isabs from os.path import sep -from pathlib import Path -from pathlib import PurePath +from pathlib import Path +from pathlib import PurePath from posixpath import sep as posix_sep -from types import ModuleType -from typing import Callable -from typing import Iterable -from typing import Iterator -from typing import Optional -from typing import Set -from typing import TypeVar -from typing import Union - -import py - -from _pytest.compat import assert_never -from _pytest.outcomes import skip -from _pytest.warning_types import PytestWarning - -LOCK_TIMEOUT = 60 * 60 * 24 * 3 - - -_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath) - -# The following function, variables and comments were -# copied from cpython 3.9 Lib/pathlib.py file. - -# EBADF - guard against macOS `stat` throwing EBADF -_IGNORED_ERRORS = (ENOENT, ENOTDIR, EBADF, ELOOP) - -_IGNORED_WINERRORS = ( - 21, # ERROR_NOT_READY - drive exists but is not accessible - 1921, # ERROR_CANT_RESOLVE_FILENAME - fix for broken symlink pointing to itself -) - - -def _ignore_error(exception): - return ( - getattr(exception, "errno", None) in _IGNORED_ERRORS - or getattr(exception, "winerror", None) in _IGNORED_WINERRORS - ) - - -def get_lock_path(path: _AnyPurePath) -> _AnyPurePath: - return path.joinpath(".lock") - - -def on_rm_rf_error(func, path: str, exc, *, start_path: Path) -> bool: - """Handle known read-only errors during rmtree. - - The returned value is used only by our own tests. - """ - exctype, excvalue = exc[:2] - - # Another process removed the file in the middle of the "rm_rf" (xdist for example). - # More context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018 - if isinstance(excvalue, FileNotFoundError): - return False - - if not isinstance(excvalue, PermissionError): - warnings.warn( - PytestWarning(f"(rm_rf) error removing {path}\n{exctype}: {excvalue}") - ) - return False - - if func not in (os.rmdir, os.remove, os.unlink): - if func not in (os.open,): - warnings.warn( - PytestWarning( - "(rm_rf) unknown function {} when removing {}:\n{}: {}".format( - func, path, exctype, excvalue - ) - ) - ) - return False - - # Chmod + retry. - import stat - - def chmod_rw(p: str) -> None: - mode = os.stat(p).st_mode - os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR) - - # For files, we need to recursively go upwards in the directories to - # ensure they all are also writable. - p = Path(path) - if p.is_file(): - for parent in p.parents: - chmod_rw(str(parent)) - # Stop when we reach the original path passed to rm_rf. - if parent == start_path: - break - chmod_rw(str(path)) - - func(path) - return True - - -def ensure_extended_length_path(path: Path) -> Path: - """Get the extended-length version of a path (Windows). - - On Windows, by default, the maximum length of a path (MAX_PATH) is 260 - characters, and operations on paths longer than that fail. But it is possible - to overcome this by converting the path to "extended-length" form before - performing the operation: - https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation - - On Windows, this function returns the extended-length absolute version of path. - On other platforms it returns path unchanged. - """ - if sys.platform.startswith("win32"): - path = path.resolve() - path = Path(get_extended_length_path_str(str(path))) - return path - - -def get_extended_length_path_str(path: str) -> str: - """Convert a path to a Windows extended length path.""" - long_path_prefix = "\\\\?\\" - unc_long_path_prefix = "\\\\?\\UNC\\" - if path.startswith((long_path_prefix, unc_long_path_prefix)): - return path - # UNC - if path.startswith("\\\\"): - return unc_long_path_prefix + path[2:] - return long_path_prefix + path - - -def rm_rf(path: Path) -> None: - """Remove the path contents recursively, even if some elements - are read-only.""" - path = ensure_extended_length_path(path) - onerror = partial(on_rm_rf_error, start_path=path) - shutil.rmtree(str(path), onerror=onerror) - - -def find_prefixed(root: Path, prefix: str) -> Iterator[Path]: - """Find all elements in root that begin with the prefix, case insensitive.""" +from types import ModuleType +from typing import Callable +from typing import Iterable +from typing import Iterator +from typing import Optional +from typing import Set +from typing import TypeVar +from typing import Union + +import py + +from _pytest.compat import assert_never +from _pytest.outcomes import skip +from _pytest.warning_types import PytestWarning + +LOCK_TIMEOUT = 60 * 60 * 24 * 3 + + +_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath) + +# The following function, variables and comments were +# copied from cpython 3.9 Lib/pathlib.py file. + +# EBADF - guard against macOS `stat` throwing EBADF +_IGNORED_ERRORS = (ENOENT, ENOTDIR, EBADF, ELOOP) + +_IGNORED_WINERRORS = ( + 21, # ERROR_NOT_READY - drive exists but is not accessible + 1921, # ERROR_CANT_RESOLVE_FILENAME - fix for broken symlink pointing to itself +) + + +def _ignore_error(exception): + return ( + getattr(exception, "errno", None) in _IGNORED_ERRORS + or getattr(exception, "winerror", None) in _IGNORED_WINERRORS + ) + + +def get_lock_path(path: _AnyPurePath) -> _AnyPurePath: + return path.joinpath(".lock") + + +def on_rm_rf_error(func, path: str, exc, *, start_path: Path) -> bool: + """Handle known read-only errors during rmtree. + + The returned value is used only by our own tests. + """ + exctype, excvalue = exc[:2] + + # Another process removed the file in the middle of the "rm_rf" (xdist for example). + # More context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018 + if isinstance(excvalue, FileNotFoundError): + return False + + if not isinstance(excvalue, PermissionError): + warnings.warn( + PytestWarning(f"(rm_rf) error removing {path}\n{exctype}: {excvalue}") + ) + return False + + if func not in (os.rmdir, os.remove, os.unlink): + if func not in (os.open,): + warnings.warn( + PytestWarning( + "(rm_rf) unknown function {} when removing {}:\n{}: {}".format( + func, path, exctype, excvalue + ) + ) + ) + return False + + # Chmod + retry. + import stat + + def chmod_rw(p: str) -> None: + mode = os.stat(p).st_mode + os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR) + + # For files, we need to recursively go upwards in the directories to + # ensure they all are also writable. + p = Path(path) + if p.is_file(): + for parent in p.parents: + chmod_rw(str(parent)) + # Stop when we reach the original path passed to rm_rf. + if parent == start_path: + break + chmod_rw(str(path)) + + func(path) + return True + + +def ensure_extended_length_path(path: Path) -> Path: + """Get the extended-length version of a path (Windows). + + On Windows, by default, the maximum length of a path (MAX_PATH) is 260 + characters, and operations on paths longer than that fail. But it is possible + to overcome this by converting the path to "extended-length" form before + performing the operation: + https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation + + On Windows, this function returns the extended-length absolute version of path. + On other platforms it returns path unchanged. + """ + if sys.platform.startswith("win32"): + path = path.resolve() + path = Path(get_extended_length_path_str(str(path))) + return path + + +def get_extended_length_path_str(path: str) -> str: + """Convert a path to a Windows extended length path.""" + long_path_prefix = "\\\\?\\" + unc_long_path_prefix = "\\\\?\\UNC\\" + if path.startswith((long_path_prefix, unc_long_path_prefix)): + return path + # UNC + if path.startswith("\\\\"): + return unc_long_path_prefix + path[2:] + return long_path_prefix + path + + +def rm_rf(path: Path) -> None: + """Remove the path contents recursively, even if some elements + are read-only.""" + path = ensure_extended_length_path(path) + onerror = partial(on_rm_rf_error, start_path=path) + shutil.rmtree(str(path), onerror=onerror) + + +def find_prefixed(root: Path, prefix: str) -> Iterator[Path]: + """Find all elements in root that begin with the prefix, case insensitive.""" l_prefix = prefix.lower() for x in root.iterdir(): if x.name.lower().startswith(l_prefix): yield x -def extract_suffixes(iter: Iterable[PurePath], prefix: str) -> Iterator[str]: - """Return the parts of the paths following the prefix. - - :param iter: Iterator over path names. - :param prefix: Expected prefix of the path names. +def extract_suffixes(iter: Iterable[PurePath], prefix: str) -> Iterator[str]: + """Return the parts of the paths following the prefix. + + :param iter: Iterator over path names. + :param prefix: Expected prefix of the path names. """ p_len = len(prefix) for p in iter: yield p.name[p_len:] -def find_suffixes(root: Path, prefix: str) -> Iterator[str]: - """Combine find_prefixes and extract_suffixes.""" +def find_suffixes(root: Path, prefix: str) -> Iterator[str]: + """Combine find_prefixes and extract_suffixes.""" return extract_suffixes(find_prefixed(root, prefix), prefix) -def parse_num(maybe_num) -> int: - """Parse number path suffixes, returns -1 on error.""" +def parse_num(maybe_num) -> int: + """Parse number path suffixes, returns -1 on error.""" try: return int(maybe_num) except ValueError: return -1 -def _force_symlink( - root: Path, target: Union[str, PurePath], link_to: Union[str, Path] -) -> None: - """Helper to create the current symlink. +def _force_symlink( + root: Path, target: Union[str, PurePath], link_to: Union[str, Path] +) -> None: + """Helper to create the current symlink. - It's full of race conditions that are reasonably OK to ignore - for the context of best effort linking to the latest test run. + It's full of race conditions that are reasonably OK to ignore + for the context of best effort linking to the latest test run. - The presumption being that in case of much parallelism - the inaccuracy is going to be acceptable. + The presumption being that in case of much parallelism + the inaccuracy is going to be acceptable. """ current_symlink = root.joinpath(target) try: @@ -207,126 +207,126 @@ def _force_symlink( pass -def make_numbered_dir(root: Path, prefix: str, mode: int = 0o700) -> Path: - """Create a directory with an increased number as suffix for the given prefix.""" +def make_numbered_dir(root: Path, prefix: str, mode: int = 0o700) -> Path: + """Create a directory with an increased number as suffix for the given prefix.""" for i in range(10): # try up to 10 times to create the folder - max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1) + max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1) new_number = max_existing + 1 - new_path = root.joinpath(f"{prefix}{new_number}") + new_path = root.joinpath(f"{prefix}{new_number}") try: - new_path.mkdir(mode=mode) + new_path.mkdir(mode=mode) except Exception: pass else: _force_symlink(root, prefix + "current", new_path) return new_path else: - raise OSError( + raise OSError( "could not create numbered dir with prefix " "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root) ) -def create_cleanup_lock(p: Path) -> Path: - """Create a lock to prevent premature folder cleanup.""" +def create_cleanup_lock(p: Path) -> Path: + """Create a lock to prevent premature folder cleanup.""" lock_path = get_lock_path(p) try: fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644) - except FileExistsError as e: - raise OSError(f"cannot create lockfile in {p}") from e + except FileExistsError as e: + raise OSError(f"cannot create lockfile in {p}") from e else: pid = os.getpid() - spid = str(pid).encode() + spid = str(pid).encode() os.write(fd, spid) os.close(fd) if not lock_path.is_file(): - raise OSError("lock path got renamed after successful creation") + raise OSError("lock path got renamed after successful creation") return lock_path -def register_cleanup_lock_removal(lock_path: Path, register=atexit.register): - """Register a cleanup function for removing a lock, by default on atexit.""" +def register_cleanup_lock_removal(lock_path: Path, register=atexit.register): + """Register a cleanup function for removing a lock, by default on atexit.""" pid = os.getpid() - def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = pid) -> None: + def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = pid) -> None: current_pid = os.getpid() if current_pid != original_pid: # fork return try: lock_path.unlink() - except OSError: + except OSError: pass return register(cleanup_on_exit) -def maybe_delete_a_numbered_dir(path: Path) -> None: - """Remove a numbered directory if its lock can be obtained and it does - not seem to be in use.""" - path = ensure_extended_length_path(path) +def maybe_delete_a_numbered_dir(path: Path) -> None: + """Remove a numbered directory if its lock can be obtained and it does + not seem to be in use.""" + path = ensure_extended_length_path(path) lock_path = None try: lock_path = create_cleanup_lock(path) parent = path.parent - garbage = parent.joinpath(f"garbage-{uuid.uuid4()}") + garbage = parent.joinpath(f"garbage-{uuid.uuid4()}") path.rename(garbage) - rm_rf(garbage) - except OSError: + rm_rf(garbage) + except OSError: # known races: # * other process did a cleanup at the same time # * deletable folder was found # * process cwd (Windows) return finally: - # If we created the lock, ensure we remove it even if we failed - # to properly remove the numbered dir. + # If we created the lock, ensure we remove it even if we failed + # to properly remove the numbered dir. if lock_path is not None: try: lock_path.unlink() - except OSError: + except OSError: pass -def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool: - """Check if `path` is deletable based on whether the lock file is expired.""" +def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool: + """Check if `path` is deletable based on whether the lock file is expired.""" if path.is_symlink(): return False lock = get_lock_path(path) try: - if not lock.is_file(): - return True - except OSError: - # we might not have access to the lock file at all, in this case assume - # we don't have access to the entire directory (#7491). - return False - try: + if not lock.is_file(): + return True + except OSError: + # we might not have access to the lock file at all, in this case assume + # we don't have access to the entire directory (#7491). + return False + try: lock_time = lock.stat().st_mtime except Exception: return False else: if lock_time < consider_lock_dead_if_created_before: - # We want to ignore any errors while trying to remove the lock such as: - # - PermissionDenied, like the file permissions have changed since the lock creation; - # - FileNotFoundError, in case another pytest process got here first; - # and any other cause of failure. - with contextlib.suppress(OSError): - lock.unlink() - return True - return False - - -def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None: - """Try to cleanup a folder if we can ensure it's deletable.""" + # We want to ignore any errors while trying to remove the lock such as: + # - PermissionDenied, like the file permissions have changed since the lock creation; + # - FileNotFoundError, in case another pytest process got here first; + # and any other cause of failure. + with contextlib.suppress(OSError): + lock.unlink() + return True + return False + + +def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None: + """Try to cleanup a folder if we can ensure it's deletable.""" if ensure_deletable(path, consider_lock_dead_if_created_before): maybe_delete_a_numbered_dir(path) -def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]: - """List candidates for numbered directories to be removed - follows py.path.""" - max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1) +def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]: + """List candidates for numbered directories to be removed - follows py.path.""" + max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1) max_delete = max_existing - keep paths = find_prefixed(root, prefix) paths, paths2 = itertools.tee(paths) @@ -336,65 +336,65 @@ def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]: yield path -def cleanup_numbered_dir( - root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float -) -> None: - """Cleanup for lock driven numbered directories.""" +def cleanup_numbered_dir( + root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float +) -> None: + """Cleanup for lock driven numbered directories.""" for path in cleanup_candidates(root, prefix, keep): try_cleanup(path, consider_lock_dead_if_created_before) for path in root.glob("garbage-*"): try_cleanup(path, consider_lock_dead_if_created_before) -def make_numbered_dir_with_cleanup( - root: Path, prefix: str, keep: int, lock_timeout: float, mode: int, -) -> Path: - """Create a numbered dir with a cleanup lock and remove old ones.""" +def make_numbered_dir_with_cleanup( + root: Path, prefix: str, keep: int, lock_timeout: float, mode: int, +) -> Path: + """Create a numbered dir with a cleanup lock and remove old ones.""" e = None for i in range(10): try: - p = make_numbered_dir(root, prefix, mode) + p = make_numbered_dir(root, prefix, mode) lock_path = create_cleanup_lock(p) register_cleanup_lock_removal(lock_path) except Exception as exc: e = exc else: consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout - # Register a cleanup for program exit - atexit.register( - cleanup_numbered_dir, - root, - prefix, - keep, - consider_lock_dead_if_created_before, + # Register a cleanup for program exit + atexit.register( + cleanup_numbered_dir, + root, + prefix, + keep, + consider_lock_dead_if_created_before, ) return p assert e is not None raise e -def resolve_from_str(input: str, rootpath: Path) -> Path: +def resolve_from_str(input: str, rootpath: Path) -> Path: input = expanduser(input) input = expandvars(input) if isabs(input): return Path(input) else: - return rootpath.joinpath(input) + return rootpath.joinpath(input) -def fnmatch_ex(pattern: str, path) -> bool: - """A port of FNMatcher from py.path.common which works with PurePath() instances. +def fnmatch_ex(pattern: str, path) -> bool: + """A port of FNMatcher from py.path.common which works with PurePath() instances. - The difference between this algorithm and PurePath.match() is that the - latter matches "**" glob expressions for each part of the path, while - this algorithm uses the whole path instead. + The difference between this algorithm and PurePath.match() is that the + latter matches "**" glob expressions for each part of the path, while + this algorithm uses the whole path instead. For example: - "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py" - with this algorithm, but not with PurePath.match(). + "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py" + with this algorithm, but not with PurePath.match(). - This algorithm was ported to keep backward-compatibility with existing - settings which assume paths match according this logic. + This algorithm was ported to keep backward-compatibility with existing + settings which assume paths match according this logic. References: * https://bugs.python.org/issue29249 @@ -412,243 +412,243 @@ def fnmatch_ex(pattern: str, path) -> bool: if sep not in pattern: name = path.name else: - name = str(path) - if path.is_absolute() and not os.path.isabs(pattern): - pattern = f"*{os.sep}{pattern}" + name = str(path) + if path.is_absolute() and not os.path.isabs(pattern): + pattern = f"*{os.sep}{pattern}" return fnmatch.fnmatch(name, pattern) -def parts(s: str) -> Set[str]: +def parts(s: str) -> Set[str]: parts = s.split(sep) return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))} - - -def symlink_or_skip(src, dst, **kwargs): - """Make a symlink, or skip the test in case symlinks are not supported.""" - try: - os.symlink(str(src), str(dst), **kwargs) - except OSError as e: - skip(f"symlinks not supported: {e}") - - -class ImportMode(Enum): - """Possible values for `mode` parameter of `import_path`.""" - - prepend = "prepend" - append = "append" - importlib = "importlib" - - -class ImportPathMismatchError(ImportError): - """Raised on import_path() if there is a mismatch of __file__'s. - - This can happen when `import_path` is called multiple times with different filenames that has - the same basename but reside in packages - (for example "/tests1/test_foo.py" and "/tests2/test_foo.py"). - """ - - -def import_path( - p: Union[str, py.path.local, Path], - *, - mode: Union[str, ImportMode] = ImportMode.prepend, -) -> ModuleType: - """Import and return a module from the given path, which can be a file (a module) or - a directory (a package). - - The import mechanism used is controlled by the `mode` parameter: - - * `mode == ImportMode.prepend`: the directory containing the module (or package, taking - `__init__.py` files into account) will be put at the *start* of `sys.path` before - being imported with `__import__. - - * `mode == ImportMode.append`: same as `prepend`, but the directory will be appended - to the end of `sys.path`, if not already in `sys.path`. - - * `mode == ImportMode.importlib`: uses more fine control mechanisms provided by `importlib` - to import the module, which avoids having to use `__import__` and muck with `sys.path` - at all. It effectively allows having same-named test modules in different places. - - :raises ImportPathMismatchError: - If after importing the given `path` and the module `__file__` - are different. Only raised in `prepend` and `append` modes. - """ - mode = ImportMode(mode) - - path = Path(str(p)) - - if not path.exists(): - raise ImportError(path) - - if mode is ImportMode.importlib: - module_name = path.stem - - for meta_importer in sys.meta_path: - spec = meta_importer.find_spec(module_name, [str(path.parent)]) - if spec is not None: - break - else: - spec = importlib.util.spec_from_file_location(module_name, str(path)) - - if spec is None: - raise ImportError( - "Can't find module {} at location {}".format(module_name, str(path)) - ) - mod = importlib.util.module_from_spec(spec) - spec.loader.exec_module(mod) # type: ignore[union-attr] - return mod - - pkg_path = resolve_package_path(path) - if pkg_path is not None: - pkg_root = pkg_path.parent - names = list(path.with_suffix("").relative_to(pkg_root).parts) - if names[-1] == "__init__": - names.pop() - module_name = ".".join(names) - else: - pkg_root = path.parent - module_name = path.stem - - # Change sys.path permanently: restoring it at the end of this function would cause surprising - # problems because of delayed imports: for example, a conftest.py file imported by this function - # might have local imports, which would fail at runtime if we restored sys.path. - if mode is ImportMode.append: - if str(pkg_root) not in sys.path: - sys.path.append(str(pkg_root)) - elif mode is ImportMode.prepend: - if str(pkg_root) != sys.path[0]: - sys.path.insert(0, str(pkg_root)) - else: - assert_never(mode) - - importlib.import_module(module_name) - - mod = sys.modules[module_name] - if path.name == "__init__.py": - return mod - - ignore = os.environ.get("PY_IGNORE_IMPORTMISMATCH", "") - if ignore != "1": - module_file = mod.__file__ - if module_file.endswith((".pyc", ".pyo")): - module_file = module_file[:-1] - if module_file.endswith(os.path.sep + "__init__.py"): - module_file = module_file[: -(len(os.path.sep + "__init__.py"))] - - try: - is_same = _is_same(str(path), module_file) - except FileNotFoundError: - is_same = False - - if not is_same: - raise ImportPathMismatchError(module_name, module_file, path) - - return mod - - -# Implement a special _is_same function on Windows which returns True if the two filenames -# compare equal, to circumvent os.path.samefile returning False for mounts in UNC (#7678). -if sys.platform.startswith("win"): - - def _is_same(f1: str, f2: str) -> bool: - return Path(f1) == Path(f2) or os.path.samefile(f1, f2) - - -else: - - def _is_same(f1: str, f2: str) -> bool: - return os.path.samefile(f1, f2) - - -def resolve_package_path(path: Path) -> Optional[Path]: - """Return the Python package path by looking for the last - directory upwards which still contains an __init__.py. - - Returns None if it can not be determined. - """ - result = None - for parent in itertools.chain((path,), path.parents): - if parent.is_dir(): - if not parent.joinpath("__init__.py").is_file(): - break - if not parent.name.isidentifier(): - break - result = parent - return result - - -def visit( - path: str, recurse: Callable[["os.DirEntry[str]"], bool] -) -> Iterator["os.DirEntry[str]"]: - """Walk a directory recursively, in breadth-first order. - - Entries at each directory level are sorted. - """ - - # Skip entries with symlink loops and other brokenness, so the caller doesn't - # have to deal with it. - entries = [] - for entry in os.scandir(path): - try: - entry.is_file() - except OSError as err: - if _ignore_error(err): - continue - raise - entries.append(entry) - - entries.sort(key=lambda entry: entry.name) - - yield from entries - - for entry in entries: - if entry.is_dir() and recurse(entry): - yield from visit(entry.path, recurse) - - -def absolutepath(path: Union[Path, str]) -> Path: - """Convert a path to an absolute path using os.path.abspath. - - Prefer this over Path.resolve() (see #6523). - Prefer this over Path.absolute() (not public, doesn't normalize). - """ - return Path(os.path.abspath(str(path))) - - -def commonpath(path1: Path, path2: Path) -> Optional[Path]: - """Return the common part shared with the other path, or None if there is - no common part. - - If one path is relative and one is absolute, returns None. - """ - try: - return Path(os.path.commonpath((str(path1), str(path2)))) - except ValueError: - return None - - -def bestrelpath(directory: Path, dest: Path) -> str: - """Return a string which is a relative path from directory to dest such - that directory/bestrelpath == dest. - - The paths must be either both absolute or both relative. - - If no such path can be determined, returns dest. - """ - if dest == directory: - return os.curdir - # Find the longest common directory. - base = commonpath(directory, dest) - # Can be the case on Windows for two absolute paths on different drives. - # Can be the case for two relative paths without common prefix. - # Can be the case for a relative path and an absolute path. - if not base: - return str(dest) - reldirectory = directory.relative_to(base) - reldest = dest.relative_to(base) - return os.path.join( - # Back from directory to base. - *([os.pardir] * len(reldirectory.parts)), - # Forward from base to dest. - *reldest.parts, - ) + + +def symlink_or_skip(src, dst, **kwargs): + """Make a symlink, or skip the test in case symlinks are not supported.""" + try: + os.symlink(str(src), str(dst), **kwargs) + except OSError as e: + skip(f"symlinks not supported: {e}") + + +class ImportMode(Enum): + """Possible values for `mode` parameter of `import_path`.""" + + prepend = "prepend" + append = "append" + importlib = "importlib" + + +class ImportPathMismatchError(ImportError): + """Raised on import_path() if there is a mismatch of __file__'s. + + This can happen when `import_path` is called multiple times with different filenames that has + the same basename but reside in packages + (for example "/tests1/test_foo.py" and "/tests2/test_foo.py"). + """ + + +def import_path( + p: Union[str, py.path.local, Path], + *, + mode: Union[str, ImportMode] = ImportMode.prepend, +) -> ModuleType: + """Import and return a module from the given path, which can be a file (a module) or + a directory (a package). + + The import mechanism used is controlled by the `mode` parameter: + + * `mode == ImportMode.prepend`: the directory containing the module (or package, taking + `__init__.py` files into account) will be put at the *start* of `sys.path` before + being imported with `__import__. + + * `mode == ImportMode.append`: same as `prepend`, but the directory will be appended + to the end of `sys.path`, if not already in `sys.path`. + + * `mode == ImportMode.importlib`: uses more fine control mechanisms provided by `importlib` + to import the module, which avoids having to use `__import__` and muck with `sys.path` + at all. It effectively allows having same-named test modules in different places. + + :raises ImportPathMismatchError: + If after importing the given `path` and the module `__file__` + are different. Only raised in `prepend` and `append` modes. + """ + mode = ImportMode(mode) + + path = Path(str(p)) + + if not path.exists(): + raise ImportError(path) + + if mode is ImportMode.importlib: + module_name = path.stem + + for meta_importer in sys.meta_path: + spec = meta_importer.find_spec(module_name, [str(path.parent)]) + if spec is not None: + break + else: + spec = importlib.util.spec_from_file_location(module_name, str(path)) + + if spec is None: + raise ImportError( + "Can't find module {} at location {}".format(module_name, str(path)) + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) # type: ignore[union-attr] + return mod + + pkg_path = resolve_package_path(path) + if pkg_path is not None: + pkg_root = pkg_path.parent + names = list(path.with_suffix("").relative_to(pkg_root).parts) + if names[-1] == "__init__": + names.pop() + module_name = ".".join(names) + else: + pkg_root = path.parent + module_name = path.stem + + # Change sys.path permanently: restoring it at the end of this function would cause surprising + # problems because of delayed imports: for example, a conftest.py file imported by this function + # might have local imports, which would fail at runtime if we restored sys.path. + if mode is ImportMode.append: + if str(pkg_root) not in sys.path: + sys.path.append(str(pkg_root)) + elif mode is ImportMode.prepend: + if str(pkg_root) != sys.path[0]: + sys.path.insert(0, str(pkg_root)) + else: + assert_never(mode) + + importlib.import_module(module_name) + + mod = sys.modules[module_name] + if path.name == "__init__.py": + return mod + + ignore = os.environ.get("PY_IGNORE_IMPORTMISMATCH", "") + if ignore != "1": + module_file = mod.__file__ + if module_file.endswith((".pyc", ".pyo")): + module_file = module_file[:-1] + if module_file.endswith(os.path.sep + "__init__.py"): + module_file = module_file[: -(len(os.path.sep + "__init__.py"))] + + try: + is_same = _is_same(str(path), module_file) + except FileNotFoundError: + is_same = False + + if not is_same: + raise ImportPathMismatchError(module_name, module_file, path) + + return mod + + +# Implement a special _is_same function on Windows which returns True if the two filenames +# compare equal, to circumvent os.path.samefile returning False for mounts in UNC (#7678). +if sys.platform.startswith("win"): + + def _is_same(f1: str, f2: str) -> bool: + return Path(f1) == Path(f2) or os.path.samefile(f1, f2) + + +else: + + def _is_same(f1: str, f2: str) -> bool: + return os.path.samefile(f1, f2) + + +def resolve_package_path(path: Path) -> Optional[Path]: + """Return the Python package path by looking for the last + directory upwards which still contains an __init__.py. + + Returns None if it can not be determined. + """ + result = None + for parent in itertools.chain((path,), path.parents): + if parent.is_dir(): + if not parent.joinpath("__init__.py").is_file(): + break + if not parent.name.isidentifier(): + break + result = parent + return result + + +def visit( + path: str, recurse: Callable[["os.DirEntry[str]"], bool] +) -> Iterator["os.DirEntry[str]"]: + """Walk a directory recursively, in breadth-first order. + + Entries at each directory level are sorted. + """ + + # Skip entries with symlink loops and other brokenness, so the caller doesn't + # have to deal with it. + entries = [] + for entry in os.scandir(path): + try: + entry.is_file() + except OSError as err: + if _ignore_error(err): + continue + raise + entries.append(entry) + + entries.sort(key=lambda entry: entry.name) + + yield from entries + + for entry in entries: + if entry.is_dir() and recurse(entry): + yield from visit(entry.path, recurse) + + +def absolutepath(path: Union[Path, str]) -> Path: + """Convert a path to an absolute path using os.path.abspath. + + Prefer this over Path.resolve() (see #6523). + Prefer this over Path.absolute() (not public, doesn't normalize). + """ + return Path(os.path.abspath(str(path))) + + +def commonpath(path1: Path, path2: Path) -> Optional[Path]: + """Return the common part shared with the other path, or None if there is + no common part. + + If one path is relative and one is absolute, returns None. + """ + try: + return Path(os.path.commonpath((str(path1), str(path2)))) + except ValueError: + return None + + +def bestrelpath(directory: Path, dest: Path) -> str: + """Return a string which is a relative path from directory to dest such + that directory/bestrelpath == dest. + + The paths must be either both absolute or both relative. + + If no such path can be determined, returns dest. + """ + if dest == directory: + return os.curdir + # Find the longest common directory. + base = commonpath(directory, dest) + # Can be the case on Windows for two absolute paths on different drives. + # Can be the case for two relative paths without common prefix. + # Can be the case for a relative path and an absolute path. + if not base: + return str(dest) + reldirectory = directory.relative_to(base) + reldest = dest.relative_to(base) + return os.path.join( + # Back from directory to base. + *([os.pardir] * len(reldirectory.parts)), + # Forward from base to dest. + *reldest.parts, + ) |