diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /contrib/python/pytest/py3/_pytest/cacheprovider.py | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/cacheprovider.py')
-rw-r--r-- | contrib/python/pytest/py3/_pytest/cacheprovider.py | 580 |
1 files changed, 0 insertions, 580 deletions
diff --git a/contrib/python/pytest/py3/_pytest/cacheprovider.py b/contrib/python/pytest/py3/_pytest/cacheprovider.py deleted file mode 100644 index 681d02b409..0000000000 --- a/contrib/python/pytest/py3/_pytest/cacheprovider.py +++ /dev/null @@ -1,580 +0,0 @@ -"""Implementation of the cache provider.""" -# This plugin was not named "cache" to avoid conflicts with the external -# pytest-cache version. -import json -import os -from pathlib import Path -from typing import Dict -from typing import Generator -from typing import Iterable -from typing import List -from typing import Optional -from typing import Set -from typing import Union - -import attr - -from .pathlib import resolve_from_str -from .pathlib import rm_rf -from .reports import CollectReport -from _pytest import nodes -from _pytest._io import TerminalWriter -from _pytest.compat import final -from _pytest.config import Config -from _pytest.config import ExitCode -from _pytest.config import hookimpl -from _pytest.config.argparsing import Parser -from _pytest.deprecated import check_ispytest -from _pytest.fixtures import fixture -from _pytest.fixtures import FixtureRequest -from _pytest.main import Session -from _pytest.python import Module -from _pytest.python import Package -from _pytest.reports import TestReport - - -README_CONTENT = """\ -# pytest cache directory # - -This directory contains data from the pytest's cache plugin, -which provides the `--lf` and `--ff` options, as well as the `cache` fixture. - -**Do not** commit this to version control. - -See [the docs](https://docs.pytest.org/en/stable/how-to/cache.html) for more information. -""" - -CACHEDIR_TAG_CONTENT = b"""\ -Signature: 8a477f597d28d172789f06886806bc55 -# This file is a cache directory tag created by pytest. -# For information about cache directory tags, see: -# https://bford.info/cachedir/spec.html -""" - - -@final -@attr.s(init=False, auto_attribs=True) -class Cache: - _cachedir: Path = attr.ib(repr=False) - _config: Config = attr.ib(repr=False) - - # Sub-directory under cache-dir for directories created by `mkdir()`. - _CACHE_PREFIX_DIRS = "d" - - # Sub-directory under cache-dir for values created by `set()`. - _CACHE_PREFIX_VALUES = "v" - - def __init__( - self, cachedir: Path, config: Config, *, _ispytest: bool = False - ) -> None: - check_ispytest(_ispytest) - self._cachedir = cachedir - self._config = config - - @classmethod - def for_config(cls, config: Config, *, _ispytest: bool = False) -> "Cache": - """Create the Cache instance for a Config. - - :meta private: - """ - check_ispytest(_ispytest) - cachedir = cls.cache_dir_from_config(config, _ispytest=True) - if config.getoption("cacheclear") and cachedir.is_dir(): - cls.clear_cache(cachedir, _ispytest=True) - return cls(cachedir, config, _ispytest=True) - - @classmethod - def clear_cache(cls, cachedir: Path, _ispytest: bool = False) -> None: - """Clear the sub-directories used to hold cached directories and values. - - :meta private: - """ - check_ispytest(_ispytest) - for prefix in (cls._CACHE_PREFIX_DIRS, cls._CACHE_PREFIX_VALUES): - d = cachedir / prefix - if d.is_dir(): - rm_rf(d) - - @staticmethod - def cache_dir_from_config(config: Config, *, _ispytest: bool = False) -> Path: - """Get the path to the cache directory for a Config. - - :meta private: - """ - check_ispytest(_ispytest) - return resolve_from_str(config.getini("cache_dir"), config.rootpath) - - def warn(self, fmt: str, *, _ispytest: bool = False, **args: object) -> None: - """Issue a cache warning. - - :meta private: - """ - check_ispytest(_ispytest) - import warnings - from _pytest.warning_types import PytestCacheWarning - - warnings.warn( - PytestCacheWarning(fmt.format(**args) if args else fmt), - self._config.hook, - stacklevel=3, - ) - - def mkdir(self, name: str) -> Path: - """Return a directory path object with the given name. - - If the directory does not yet exist, it will be created. You can use - it to manage files to e.g. store/retrieve database dumps across test - sessions. - - .. versionadded:: 7.0 - - :param name: - Must be a string not containing a ``/`` separator. - Make sure the name contains your plugin or application - identifiers to prevent clashes with other cache users. - """ - path = Path(name) - if len(path.parts) > 1: - raise ValueError("name is not allowed to contain path separators") - res = self._cachedir.joinpath(self._CACHE_PREFIX_DIRS, path) - res.mkdir(exist_ok=True, parents=True) - return res - - def _getvaluepath(self, key: str) -> Path: - return self._cachedir.joinpath(self._CACHE_PREFIX_VALUES, Path(key)) - - def get(self, key: str, default): - """Return the cached value for the given key. - - If no value was yet cached or the value cannot be read, the specified - default is returned. - - :param key: - Must be a ``/`` separated value. Usually the first - name is the name of your plugin or your application. - :param default: - The value to return in case of a cache-miss or invalid cache value. - """ - path = self._getvaluepath(key) - try: - with path.open("r") as f: - return json.load(f) - except (ValueError, OSError): - return default - - def set(self, key: str, value: object) -> None: - """Save value for the given key. - - :param key: - Must be a ``/`` separated value. Usually the first - name is the name of your plugin or your application. - :param value: - Must be of any combination of basic python types, - including nested types like lists of dictionaries. - """ - path = self._getvaluepath(key) - try: - if path.parent.is_dir(): - cache_dir_exists_already = True - else: - cache_dir_exists_already = self._cachedir.exists() - path.parent.mkdir(exist_ok=True, parents=True) - except OSError: - self.warn("could not create cache path {path}", path=path, _ispytest=True) - return - if not cache_dir_exists_already: - self._ensure_supporting_files() - data = json.dumps(value, indent=2) - try: - f = path.open("w") - except OSError: - self.warn("cache could not write path {path}", path=path, _ispytest=True) - else: - with f: - f.write(data) - - def _ensure_supporting_files(self) -> None: - """Create supporting files in the cache dir that are not really part of the cache.""" - readme_path = self._cachedir / "README.md" - readme_path.write_text(README_CONTENT) - - gitignore_path = self._cachedir.joinpath(".gitignore") - msg = "# Created by pytest automatically.\n*\n" - gitignore_path.write_text(msg, encoding="UTF-8") - - cachedir_tag_path = self._cachedir.joinpath("CACHEDIR.TAG") - cachedir_tag_path.write_bytes(CACHEDIR_TAG_CONTENT) - - -class LFPluginCollWrapper: - def __init__(self, lfplugin: "LFPlugin") -> None: - self.lfplugin = lfplugin - self._collected_at_least_one_failure = False - - @hookimpl(hookwrapper=True) - def pytest_make_collect_report(self, collector: nodes.Collector): - if isinstance(collector, Session): - out = yield - res: CollectReport = out.get_result() - - # Sort any lf-paths to the beginning. - lf_paths = self.lfplugin._last_failed_paths - - res.result = sorted( - res.result, - # use stable sort to priorize last failed - key=lambda x: x.path in lf_paths, - reverse=True, - ) - return - - elif isinstance(collector, Module): - if collector.path in self.lfplugin._last_failed_paths: - out = yield - res = out.get_result() - result = res.result - lastfailed = self.lfplugin.lastfailed - - # Only filter with known failures. - if not self._collected_at_least_one_failure: - if not any(x.nodeid in lastfailed for x in result): - return - self.lfplugin.config.pluginmanager.register( - LFPluginCollSkipfiles(self.lfplugin), "lfplugin-collskip" - ) - self._collected_at_least_one_failure = True - - session = collector.session - result[:] = [ - x - for x in result - if x.nodeid in lastfailed - # Include any passed arguments (not trivial to filter). - or session.isinitpath(x.path) - # Keep all sub-collectors. - or isinstance(x, nodes.Collector) - ] - return - yield - - -class LFPluginCollSkipfiles: - def __init__(self, lfplugin: "LFPlugin") -> None: - self.lfplugin = lfplugin - - @hookimpl - def pytest_make_collect_report( - self, collector: nodes.Collector - ) -> Optional[CollectReport]: - # Packages are Modules, but _last_failed_paths only contains - # test-bearing paths and doesn't try to include the paths of their - # packages, so don't filter them. - if isinstance(collector, Module) and not isinstance(collector, Package): - if collector.path not in self.lfplugin._last_failed_paths: - self.lfplugin._skipped_files += 1 - - return CollectReport( - collector.nodeid, "passed", longrepr=None, result=[] - ) - return None - - -class LFPlugin: - """Plugin which implements the --lf (run last-failing) option.""" - - def __init__(self, config: Config) -> None: - self.config = config - active_keys = "lf", "failedfirst" - self.active = any(config.getoption(key) for key in active_keys) - assert config.cache - self.lastfailed: Dict[str, bool] = config.cache.get("cache/lastfailed", {}) - self._previously_failed_count: Optional[int] = None - self._report_status: Optional[str] = None - self._skipped_files = 0 # count skipped files during collection due to --lf - - if config.getoption("lf"): - self._last_failed_paths = self.get_last_failed_paths() - config.pluginmanager.register( - LFPluginCollWrapper(self), "lfplugin-collwrapper" - ) - - def get_last_failed_paths(self) -> Set[Path]: - """Return a set with all Paths()s of the previously failed nodeids.""" - rootpath = self.config.rootpath - result = {rootpath / nodeid.split("::")[0] for nodeid in self.lastfailed} - return {x for x in result if x.exists()} - - def pytest_report_collectionfinish(self) -> Optional[str]: - if self.active and self.config.getoption("verbose") >= 0: - return "run-last-failure: %s" % self._report_status - return None - - def pytest_runtest_logreport(self, report: TestReport) -> None: - if (report.when == "call" and report.passed) or report.skipped: - self.lastfailed.pop(report.nodeid, None) - elif report.failed: - self.lastfailed[report.nodeid] = True - - def pytest_collectreport(self, report: CollectReport) -> None: - passed = report.outcome in ("passed", "skipped") - if passed: - if report.nodeid in self.lastfailed: - self.lastfailed.pop(report.nodeid) - self.lastfailed.update((item.nodeid, True) for item in report.result) - else: - self.lastfailed[report.nodeid] = True - - @hookimpl(hookwrapper=True, tryfirst=True) - def pytest_collection_modifyitems( - self, config: Config, items: List[nodes.Item] - ) -> Generator[None, None, None]: - yield - - if not self.active: - return - - if self.lastfailed: - previously_failed = [] - previously_passed = [] - for item in items: - if item.nodeid in self.lastfailed: - previously_failed.append(item) - else: - previously_passed.append(item) - self._previously_failed_count = len(previously_failed) - - if not previously_failed: - # Running a subset of all tests with recorded failures - # only outside of it. - self._report_status = "%d known failures not in selected tests" % ( - len(self.lastfailed), - ) - else: - if self.config.getoption("lf"): - items[:] = previously_failed - config.hook.pytest_deselected(items=previously_passed) - else: # --failedfirst - items[:] = previously_failed + previously_passed - - noun = "failure" if self._previously_failed_count == 1 else "failures" - suffix = " first" if self.config.getoption("failedfirst") else "" - self._report_status = "rerun previous {count} {noun}{suffix}".format( - count=self._previously_failed_count, suffix=suffix, noun=noun - ) - - if self._skipped_files > 0: - files_noun = "file" if self._skipped_files == 1 else "files" - self._report_status += " (skipped {files} {files_noun})".format( - files=self._skipped_files, files_noun=files_noun - ) - else: - self._report_status = "no previously failed tests, " - if self.config.getoption("last_failed_no_failures") == "none": - self._report_status += "deselecting all items." - config.hook.pytest_deselected(items=items[:]) - items[:] = [] - else: - self._report_status += "not deselecting items." - - def pytest_sessionfinish(self, session: Session) -> None: - config = self.config - if config.getoption("cacheshow") or hasattr(config, "workerinput"): - return - - assert config.cache is not None - saved_lastfailed = config.cache.get("cache/lastfailed", {}) - if saved_lastfailed != self.lastfailed: - config.cache.set("cache/lastfailed", self.lastfailed) - - -class NFPlugin: - """Plugin which implements the --nf (run new-first) option.""" - - def __init__(self, config: Config) -> None: - self.config = config - self.active = config.option.newfirst - assert config.cache is not None - self.cached_nodeids = set(config.cache.get("cache/nodeids", [])) - - @hookimpl(hookwrapper=True, tryfirst=True) - def pytest_collection_modifyitems( - self, items: List[nodes.Item] - ) -> Generator[None, None, None]: - yield - - if self.active: - new_items: Dict[str, nodes.Item] = {} - other_items: Dict[str, nodes.Item] = {} - for item in items: - if item.nodeid not in self.cached_nodeids: - new_items[item.nodeid] = item - else: - other_items[item.nodeid] = item - - items[:] = self._get_increasing_order( - new_items.values() - ) + self._get_increasing_order(other_items.values()) - self.cached_nodeids.update(new_items) - else: - self.cached_nodeids.update(item.nodeid for item in items) - - def _get_increasing_order(self, items: Iterable[nodes.Item]) -> List[nodes.Item]: - return sorted(items, key=lambda item: item.path.stat().st_mtime, reverse=True) # type: ignore[no-any-return] - - def pytest_sessionfinish(self) -> None: - config = self.config - if config.getoption("cacheshow") or hasattr(config, "workerinput"): - return - - if config.getoption("collectonly"): - return - - assert config.cache is not None - config.cache.set("cache/nodeids", sorted(self.cached_nodeids)) - - -def pytest_addoption(parser: Parser) -> None: - group = parser.getgroup("general") - group.addoption( - "--lf", - "--last-failed", - action="store_true", - dest="lf", - help="rerun only the tests that failed " - "at the last run (or all if none failed)", - ) - group.addoption( - "--ff", - "--failed-first", - action="store_true", - dest="failedfirst", - help="run all tests, but run the last failures first.\n" - "This may re-order tests and thus lead to " - "repeated fixture setup/teardown.", - ) - group.addoption( - "--nf", - "--new-first", - action="store_true", - dest="newfirst", - help="run tests from new files first, then the rest of the tests " - "sorted by file mtime", - ) - group.addoption( - "--cache-show", - action="append", - nargs="?", - dest="cacheshow", - help=( - "show cache contents, don't perform collection or tests. " - "Optional argument: glob (default: '*')." - ), - ) - group.addoption( - "--cache-clear", - action="store_true", - dest="cacheclear", - help="remove all cache contents at start of test run.", - ) - cache_dir_default = ".pytest_cache" - if "TOX_ENV_DIR" in os.environ: - cache_dir_default = os.path.join(os.environ["TOX_ENV_DIR"], cache_dir_default) - parser.addini("cache_dir", default=cache_dir_default, help="cache directory path.") - group.addoption( - "--lfnf", - "--last-failed-no-failures", - action="store", - dest="last_failed_no_failures", - choices=("all", "none"), - default="all", - help="which tests to run with no previously (known) failures.", - ) - - -def pytest_cmdline_main(config: Config) -> Optional[Union[int, ExitCode]]: - if config.option.cacheshow: - from _pytest.main import wrap_session - - return wrap_session(config, cacheshow) - return None - - -@hookimpl(tryfirst=True) -def pytest_configure(config: Config) -> None: - config.cache = Cache.for_config(config, _ispytest=True) - config.pluginmanager.register(LFPlugin(config), "lfplugin") - config.pluginmanager.register(NFPlugin(config), "nfplugin") - - -@fixture -def cache(request: FixtureRequest) -> Cache: - """Return a cache object that can persist state between testing sessions. - - cache.get(key, default) - cache.set(key, value) - - Keys must be ``/`` separated strings, where the first part is usually the - name of your plugin or application to avoid clashes with other cache users. - - Values can be any object handled by the json stdlib module. - """ - assert request.config.cache is not None - return request.config.cache - - -def pytest_report_header(config: Config) -> Optional[str]: - """Display cachedir with --cache-show and if non-default.""" - if config.option.verbose > 0 or config.getini("cache_dir") != ".pytest_cache": - assert config.cache is not None - cachedir = config.cache._cachedir - # TODO: evaluate generating upward relative paths - # starting with .., ../.. if sensible - - try: - displaypath = cachedir.relative_to(config.rootpath) - except ValueError: - displaypath = cachedir - return f"cachedir: {displaypath}" - return None - - -def cacheshow(config: Config, session: Session) -> int: - from pprint import pformat - - assert config.cache is not None - - tw = TerminalWriter() - tw.line("cachedir: " + str(config.cache._cachedir)) - if not config.cache._cachedir.is_dir(): - tw.line("cache is empty") - return 0 - - glob = config.option.cacheshow[0] - if glob is None: - glob = "*" - - dummy = object() - basedir = config.cache._cachedir - vdir = basedir / Cache._CACHE_PREFIX_VALUES - tw.sep("-", "cache values for %r" % glob) - for valpath in sorted(x for x in vdir.rglob(glob) if x.is_file()): - key = str(valpath.relative_to(vdir)) - val = config.cache.get(key, dummy) - if val is dummy: - tw.line("%s contains unreadable content, will be ignored" % key) - else: - tw.line("%s contains:" % key) - for line in pformat(val).splitlines(): - tw.line(" " + line) - - ddir = basedir / Cache._CACHE_PREFIX_DIRS - if ddir.is_dir(): - contents = sorted(ddir.rglob(glob)) - tw.sep("-", "cache directories for %r" % glob) - for p in contents: - # if p.is_dir(): - # print("%s/" % p.relative_to(basedir)) - if p.is_file(): - key = str(p.relative_to(basedir)) - tw.line(f"{key} is a file of length {p.stat().st_size:d}") - return 0 |