diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /contrib/python/pytest/py2/_pytest/pytester.py | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'contrib/python/pytest/py2/_pytest/pytester.py')
-rw-r--r-- | contrib/python/pytest/py2/_pytest/pytester.py | 1413 |
1 files changed, 0 insertions, 1413 deletions
diff --git a/contrib/python/pytest/py2/_pytest/pytester.py b/contrib/python/pytest/py2/_pytest/pytester.py deleted file mode 100644 index f1d739c991..0000000000 --- a/contrib/python/pytest/py2/_pytest/pytester.py +++ /dev/null @@ -1,1413 +0,0 @@ -# -*- coding: utf-8 -*- -"""(disabled by default) support for testing pytest and pytest plugins.""" -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import codecs -import gc -import os -import platform -import re -import subprocess -import sys -import time -import traceback -from fnmatch import fnmatch -from weakref import WeakKeyDictionary - -import py -import six - -import pytest -from _pytest._code import Source -from _pytest._io.saferepr import saferepr -from _pytest.assertion.rewrite import AssertionRewritingHook -from _pytest.capture import MultiCapture -from _pytest.capture import SysCapture -from _pytest.compat import safe_str -from _pytest.compat import Sequence -from _pytest.main import EXIT_INTERRUPTED -from _pytest.main import EXIT_OK -from _pytest.main import Session -from _pytest.monkeypatch import MonkeyPatch -from _pytest.pathlib import Path - -IGNORE_PAM = [ # filenames added when obtaining details about the current user - u"/var/lib/sss/mc/passwd" -] - - -def pytest_addoption(parser): - parser.addoption( - "--lsof", - action="store_true", - dest="lsof", - default=False, - help="run FD checks if lsof is available", - ) - - parser.addoption( - "--runpytest", - default="inprocess", - dest="runpytest", - choices=("inprocess", "subprocess"), - help=( - "run pytest sub runs in tests using an 'inprocess' " - "or 'subprocess' (python -m main) method" - ), - ) - - parser.addini( - "pytester_example_dir", help="directory to take the pytester example files from" - ) - - -def pytest_configure(config): - if config.getvalue("lsof"): - checker = LsofFdLeakChecker() - if checker.matching_platform(): - config.pluginmanager.register(checker) - - config.addinivalue_line( - "markers", - "pytester_example_path(*path_segments): join the given path " - "segments to `pytester_example_dir` for this test.", - ) - - -def raise_on_kwargs(kwargs): - __tracebackhide__ = True - if kwargs: # pragma: no branch - raise TypeError( - "Unexpected keyword arguments: {}".format(", ".join(sorted(kwargs))) - ) - - -class LsofFdLeakChecker(object): - def get_open_files(self): - out = self._exec_lsof() - open_files = self._parse_lsof_output(out) - return open_files - - def _exec_lsof(self): - pid = os.getpid() - # py3: use subprocess.DEVNULL directly. - with open(os.devnull, "wb") as devnull: - return subprocess.check_output( - ("lsof", "-Ffn0", "-p", str(pid)), stderr=devnull - ).decode() - - def _parse_lsof_output(self, out): - def isopen(line): - return line.startswith("f") and ( - "deleted" not in line - and "mem" not in line - and "txt" not in line - and "cwd" not in line - ) - - open_files = [] - - for line in out.split("\n"): - if isopen(line): - fields = line.split("\0") - fd = fields[0][1:] - filename = fields[1][1:] - if filename in IGNORE_PAM: - continue - if filename.startswith("/"): - open_files.append((fd, filename)) - - return open_files - - def matching_platform(self): - try: - subprocess.check_output(("lsof", "-v")) - except (OSError, subprocess.CalledProcessError): - return False - else: - return True - - @pytest.hookimpl(hookwrapper=True, tryfirst=True) - def pytest_runtest_protocol(self, item): - lines1 = self.get_open_files() - yield - if hasattr(sys, "pypy_version_info"): - gc.collect() - lines2 = self.get_open_files() - - new_fds = {t[0] for t in lines2} - {t[0] for t in lines1} - leaked_files = [t for t in lines2 if t[0] in new_fds] - if leaked_files: - error = [] - error.append("***** %s FD leakage detected" % len(leaked_files)) - error.extend([str(f) for f in leaked_files]) - error.append("*** Before:") - error.extend([str(f) for f in lines1]) - error.append("*** After:") - error.extend([str(f) for f in lines2]) - error.append(error[0]) - error.append("*** function %s:%s: %s " % item.location) - error.append("See issue #2366") - item.warn(pytest.PytestWarning("\n".join(error))) - - -# used at least by pytest-xdist plugin - - -@pytest.fixture -def _pytest(request): - """Return a helper which offers a gethookrecorder(hook) method which - returns a HookRecorder instance which helps to make assertions about called - hooks. - - """ - return PytestArg(request) - - -class PytestArg(object): - def __init__(self, request): - self.request = request - - def gethookrecorder(self, hook): - hookrecorder = HookRecorder(hook._pm) - self.request.addfinalizer(hookrecorder.finish_recording) - return hookrecorder - - -def get_public_names(values): - """Only return names from iterator values without a leading underscore.""" - return [x for x in values if x[0] != "_"] - - -class ParsedCall(object): - def __init__(self, name, kwargs): - self.__dict__.update(kwargs) - self._name = name - - def __repr__(self): - d = self.__dict__.copy() - del d["_name"] - return "<ParsedCall %r(**%r)>" % (self._name, d) - - -class HookRecorder(object): - """Record all hooks called in a plugin manager. - - This wraps all the hook calls in the plugin manager, recording each call - before propagating the normal calls. - - """ - - def __init__(self, pluginmanager): - self._pluginmanager = pluginmanager - self.calls = [] - - def before(hook_name, hook_impls, kwargs): - self.calls.append(ParsedCall(hook_name, kwargs)) - - def after(outcome, hook_name, hook_impls, kwargs): - pass - - self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after) - - def finish_recording(self): - self._undo_wrapping() - - def getcalls(self, names): - if isinstance(names, str): - names = names.split() - return [call for call in self.calls if call._name in names] - - def assert_contains(self, entries): - __tracebackhide__ = True - i = 0 - entries = list(entries) - backlocals = sys._getframe(1).f_locals - while entries: - name, check = entries.pop(0) - for ind, call in enumerate(self.calls[i:]): - if call._name == name: - print("NAMEMATCH", name, call) - if eval(check, backlocals, call.__dict__): - print("CHECKERMATCH", repr(check), "->", call) - else: - print("NOCHECKERMATCH", repr(check), "-", call) - continue - i += ind + 1 - break - print("NONAMEMATCH", name, "with", call) - else: - pytest.fail("could not find %r check %r" % (name, check)) - - def popcall(self, name): - __tracebackhide__ = True - for i, call in enumerate(self.calls): - if call._name == name: - del self.calls[i] - return call - lines = ["could not find call %r, in:" % (name,)] - lines.extend([" %s" % x for x in self.calls]) - pytest.fail("\n".join(lines)) - - def getcall(self, name): - values = self.getcalls(name) - assert len(values) == 1, (name, values) - return values[0] - - # functionality for test reports - - def getreports(self, names="pytest_runtest_logreport pytest_collectreport"): - return [x.report for x in self.getcalls(names)] - - def matchreport( - self, - inamepart="", - names="pytest_runtest_logreport pytest_collectreport", - when=None, - ): - """return a testreport whose dotted import path matches""" - values = [] - for rep in self.getreports(names=names): - if not when and rep.when != "call" and rep.passed: - # setup/teardown passing reports - let's ignore those - continue - if when and rep.when != when: - continue - if not inamepart or inamepart in rep.nodeid.split("::"): - values.append(rep) - if not values: - raise ValueError( - "could not find test report matching %r: " - "no test reports at all!" % (inamepart,) - ) - if len(values) > 1: - raise ValueError( - "found 2 or more testreports matching %r: %s" % (inamepart, values) - ) - return values[0] - - def getfailures(self, names="pytest_runtest_logreport pytest_collectreport"): - return [rep for rep in self.getreports(names) if rep.failed] - - def getfailedcollections(self): - return self.getfailures("pytest_collectreport") - - def listoutcomes(self): - passed = [] - skipped = [] - failed = [] - for rep in self.getreports("pytest_collectreport pytest_runtest_logreport"): - if rep.passed: - if rep.when == "call": - passed.append(rep) - elif rep.skipped: - skipped.append(rep) - else: - assert rep.failed, "Unexpected outcome: {!r}".format(rep) - failed.append(rep) - return passed, skipped, failed - - def countoutcomes(self): - return [len(x) for x in self.listoutcomes()] - - def assertoutcome(self, passed=0, skipped=0, failed=0): - realpassed, realskipped, realfailed = self.listoutcomes() - assert passed == len(realpassed) - assert skipped == len(realskipped) - assert failed == len(realfailed) - - def clear(self): - self.calls[:] = [] - - -@pytest.fixture -def linecomp(request): - return LineComp() - - -@pytest.fixture(name="LineMatcher") -def LineMatcher_fixture(request): - return LineMatcher - - -@pytest.fixture -def testdir(request, tmpdir_factory): - return Testdir(request, tmpdir_factory) - - -@pytest.fixture -def _sys_snapshot(): - snappaths = SysPathsSnapshot() - snapmods = SysModulesSnapshot() - yield - snapmods.restore() - snappaths.restore() - - -@pytest.fixture -def _config_for_test(): - from _pytest.config import get_config - - config = get_config() - yield config - config._ensure_unconfigure() # cleanup, e.g. capman closing tmpfiles. - - -rex_outcome = re.compile(r"(\d+) ([\w-]+)") - - -class RunResult(object): - """The result of running a command. - - Attributes: - - :ret: the return value - :outlines: list of lines captured from stdout - :errlines: list of lines captures from stderr - :stdout: :py:class:`LineMatcher` of stdout, use ``stdout.str()`` to - reconstruct stdout or the commonly used ``stdout.fnmatch_lines()`` - method - :stderr: :py:class:`LineMatcher` of stderr - :duration: duration in seconds - - """ - - def __init__(self, ret, outlines, errlines, duration): - self.ret = ret - self.outlines = outlines - self.errlines = errlines - self.stdout = LineMatcher(outlines) - self.stderr = LineMatcher(errlines) - self.duration = duration - - def __repr__(self): - return ( - "<RunResult ret=%r len(stdout.lines)=%d len(stderr.lines)=%d duration=%.2fs>" - % (self.ret, len(self.stdout.lines), len(self.stderr.lines), self.duration) - ) - - def parseoutcomes(self): - """Return a dictionary of outcomestring->num from parsing the terminal - output that the test process produced. - - """ - for line in reversed(self.outlines): - if "seconds" in line: - outcomes = rex_outcome.findall(line) - if outcomes: - d = {} - for num, cat in outcomes: - d[cat] = int(num) - return d - raise ValueError("Pytest terminal report not found") - - def assert_outcomes( - self, passed=0, skipped=0, failed=0, error=0, xpassed=0, xfailed=0 - ): - """Assert that the specified outcomes appear with the respective - numbers (0 means it didn't occur) in the text output from a test run. - - """ - d = self.parseoutcomes() - obtained = { - "passed": d.get("passed", 0), - "skipped": d.get("skipped", 0), - "failed": d.get("failed", 0), - "error": d.get("error", 0), - "xpassed": d.get("xpassed", 0), - "xfailed": d.get("xfailed", 0), - } - expected = { - "passed": passed, - "skipped": skipped, - "failed": failed, - "error": error, - "xpassed": xpassed, - "xfailed": xfailed, - } - assert obtained == expected - - -class CwdSnapshot(object): - def __init__(self): - self.__saved = os.getcwd() - - def restore(self): - os.chdir(self.__saved) - - -class SysModulesSnapshot(object): - def __init__(self, preserve=None): - self.__preserve = preserve - self.__saved = dict(sys.modules) - - def restore(self): - if self.__preserve: - self.__saved.update( - (k, m) for k, m in sys.modules.items() if self.__preserve(k) - ) - sys.modules.clear() - sys.modules.update(self.__saved) - - -class SysPathsSnapshot(object): - def __init__(self): - self.__saved = list(sys.path), list(sys.meta_path) - - def restore(self): - sys.path[:], sys.meta_path[:] = self.__saved - - -class Testdir(object): - """Temporary test directory with tools to test/run pytest itself. - - This is based on the ``tmpdir`` fixture but provides a number of methods - which aid with testing pytest itself. Unless :py:meth:`chdir` is used all - methods will use :py:attr:`tmpdir` as their current working directory. - - Attributes: - - :tmpdir: The :py:class:`py.path.local` instance of the temporary directory. - - :plugins: A list of plugins to use with :py:meth:`parseconfig` and - :py:meth:`runpytest`. Initially this is an empty list but plugins can - be added to the list. The type of items to add to the list depends on - the method using them so refer to them for details. - - """ - - CLOSE_STDIN = object - - class TimeoutExpired(Exception): - pass - - def __init__(self, request, tmpdir_factory): - self.request = request - self._mod_collections = WeakKeyDictionary() - name = request.function.__name__ - self.tmpdir = tmpdir_factory.mktemp(name, numbered=True) - self.test_tmproot = tmpdir_factory.mktemp("tmp-" + name, numbered=True) - self.plugins = [] - self._cwd_snapshot = CwdSnapshot() - self._sys_path_snapshot = SysPathsSnapshot() - self._sys_modules_snapshot = self.__take_sys_modules_snapshot() - self.chdir() - self.request.addfinalizer(self.finalize) - method = self.request.config.getoption("--runpytest") - if method == "inprocess": - self._runpytest_method = self.runpytest_inprocess - elif method == "subprocess": - self._runpytest_method = self.runpytest_subprocess - - mp = self.monkeypatch = MonkeyPatch() - mp.setenv("PYTEST_DEBUG_TEMPROOT", str(self.test_tmproot)) - # Ensure no unexpected caching via tox. - mp.delenv("TOX_ENV_DIR", raising=False) - # Discard outer pytest options. - mp.delenv("PYTEST_ADDOPTS", raising=False) - - # Environment (updates) for inner runs. - tmphome = str(self.tmpdir) - self._env_run_update = {"HOME": tmphome, "USERPROFILE": tmphome} - - def __repr__(self): - return "<Testdir %r>" % (self.tmpdir,) - - def __str__(self): - return str(self.tmpdir) - - def finalize(self): - """Clean up global state artifacts. - - Some methods modify the global interpreter state and this tries to - clean this up. It does not remove the temporary directory however so - it can be looked at after the test run has finished. - - """ - self._sys_modules_snapshot.restore() - self._sys_path_snapshot.restore() - self._cwd_snapshot.restore() - self.monkeypatch.undo() - - def __take_sys_modules_snapshot(self): - # some zope modules used by twisted-related tests keep internal state - # and can't be deleted; we had some trouble in the past with - # `zope.interface` for example - def preserve_module(name): - return name.startswith("zope") - - return SysModulesSnapshot(preserve=preserve_module) - - def make_hook_recorder(self, pluginmanager): - """Create a new :py:class:`HookRecorder` for a PluginManager.""" - pluginmanager.reprec = reprec = HookRecorder(pluginmanager) - self.request.addfinalizer(reprec.finish_recording) - return reprec - - def chdir(self): - """Cd into the temporary directory. - - This is done automatically upon instantiation. - - """ - self.tmpdir.chdir() - - def _makefile(self, ext, args, kwargs, encoding="utf-8"): - items = list(kwargs.items()) - - def to_text(s): - return s.decode(encoding) if isinstance(s, bytes) else six.text_type(s) - - if args: - source = u"\n".join(to_text(x) for x in args) - basename = self.request.function.__name__ - items.insert(0, (basename, source)) - - ret = None - for basename, value in items: - p = self.tmpdir.join(basename).new(ext=ext) - p.dirpath().ensure_dir() - source = Source(value) - source = u"\n".join(to_text(line) for line in source.lines) - p.write(source.strip().encode(encoding), "wb") - if ret is None: - ret = p - return ret - - def makefile(self, ext, *args, **kwargs): - r"""Create new file(s) in the testdir. - - :param str ext: The extension the file(s) should use, including the dot, e.g. `.py`. - :param list[str] args: All args will be treated as strings and joined using newlines. - The result will be written as contents to the file. The name of the - file will be based on the test function requesting this fixture. - :param kwargs: Each keyword is the name of a file, while the value of it will - be written as contents of the file. - - Examples: - - .. code-block:: python - - testdir.makefile(".txt", "line1", "line2") - - testdir.makefile(".ini", pytest="[pytest]\naddopts=-rs\n") - - """ - return self._makefile(ext, args, kwargs) - - def makeconftest(self, source): - """Write a contest.py file with 'source' as contents.""" - return self.makepyfile(conftest=source) - - def makeini(self, source): - """Write a tox.ini file with 'source' as contents.""" - return self.makefile(".ini", tox=source) - - def getinicfg(self, source): - """Return the pytest section from the tox.ini config file.""" - p = self.makeini(source) - return py.iniconfig.IniConfig(p)["pytest"] - - def makepyfile(self, *args, **kwargs): - """Shortcut for .makefile() with a .py extension.""" - return self._makefile(".py", args, kwargs) - - def maketxtfile(self, *args, **kwargs): - """Shortcut for .makefile() with a .txt extension.""" - return self._makefile(".txt", args, kwargs) - - def syspathinsert(self, path=None): - """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`. - - This is undone automatically when this object dies at the end of each - test. - """ - if path is None: - path = self.tmpdir - - self.monkeypatch.syspath_prepend(str(path)) - - def mkdir(self, name): - """Create a new (sub)directory.""" - return self.tmpdir.mkdir(name) - - def mkpydir(self, name): - """Create a new python package. - - This creates a (sub)directory with an empty ``__init__.py`` file so it - gets recognised as a python package. - - """ - p = self.mkdir(name) - p.ensure("__init__.py") - return p - - def copy_example(self, name=None): - import warnings - from _pytest.warning_types import PYTESTER_COPY_EXAMPLE - - warnings.warn(PYTESTER_COPY_EXAMPLE, stacklevel=2) - example_dir = self.request.config.getini("pytester_example_dir") - if example_dir is None: - raise ValueError("pytester_example_dir is unset, can't copy examples") - example_dir = self.request.config.rootdir.join(example_dir) - - for extra_element in self.request.node.iter_markers("pytester_example_path"): - assert extra_element.args - example_dir = example_dir.join(*extra_element.args) - - if name is None: - func_name = self.request.function.__name__ - maybe_dir = example_dir / func_name - maybe_file = example_dir / (func_name + ".py") - - if maybe_dir.isdir(): - example_path = maybe_dir - elif maybe_file.isfile(): - example_path = maybe_file - else: - raise LookupError( - "{} cant be found as module or package in {}".format( - func_name, example_dir.bestrelpath(self.request.config.rootdir) - ) - ) - else: - example_path = example_dir.join(name) - - if example_path.isdir() and not example_path.join("__init__.py").isfile(): - example_path.copy(self.tmpdir) - return self.tmpdir - elif example_path.isfile(): - result = self.tmpdir.join(example_path.basename) - example_path.copy(result) - return result - else: - raise LookupError( - 'example "{}" is not found as a file or directory'.format(example_path) - ) - - Session = Session - - def getnode(self, config, arg): - """Return the collection node of a file. - - :param config: :py:class:`_pytest.config.Config` instance, see - :py:meth:`parseconfig` and :py:meth:`parseconfigure` to create the - configuration - - :param arg: a :py:class:`py.path.local` instance of the file - - """ - session = Session(config) - assert "::" not in str(arg) - p = py.path.local(arg) - config.hook.pytest_sessionstart(session=session) - res = session.perform_collect([str(p)], genitems=False)[0] - config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK) - return res - - def getpathnode(self, path): - """Return the collection node of a file. - - This is like :py:meth:`getnode` but uses :py:meth:`parseconfigure` to - create the (configured) pytest Config instance. - - :param path: a :py:class:`py.path.local` instance of the file - - """ - config = self.parseconfigure(path) - session = Session(config) - x = session.fspath.bestrelpath(path) - config.hook.pytest_sessionstart(session=session) - res = session.perform_collect([x], genitems=False)[0] - config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK) - return res - - def genitems(self, colitems): - """Generate all test items from a collection node. - - This recurses into the collection node and returns a list of all the - test items contained within. - - """ - session = colitems[0].session - result = [] - for colitem in colitems: - result.extend(session.genitems(colitem)) - return result - - def runitem(self, source): - """Run the "test_func" Item. - - The calling test instance (class containing the test method) must - provide a ``.getrunner()`` method which should return a runner which - can run the test protocol for a single item, e.g. - :py:func:`_pytest.runner.runtestprotocol`. - - """ - # used from runner functional tests - item = self.getitem(source) - # the test class where we are called from wants to provide the runner - testclassinstance = self.request.instance - runner = testclassinstance.getrunner() - return runner(item) - - def inline_runsource(self, source, *cmdlineargs): - """Run a test module in process using ``pytest.main()``. - - This run writes "source" into a temporary file and runs - ``pytest.main()`` on it, returning a :py:class:`HookRecorder` instance - for the result. - - :param source: the source code of the test module - - :param cmdlineargs: any extra command line arguments to use - - :return: :py:class:`HookRecorder` instance of the result - - """ - p = self.makepyfile(source) - values = list(cmdlineargs) + [p] - return self.inline_run(*values) - - def inline_genitems(self, *args): - """Run ``pytest.main(['--collectonly'])`` in-process. - - Runs the :py:func:`pytest.main` function to run all of pytest inside - the test process itself like :py:meth:`inline_run`, but returns a - tuple of the collected items and a :py:class:`HookRecorder` instance. - - """ - rec = self.inline_run("--collect-only", *args) - items = [x.item for x in rec.getcalls("pytest_itemcollected")] - return items, rec - - def inline_run(self, *args, **kwargs): - """Run ``pytest.main()`` in-process, returning a HookRecorder. - - Runs the :py:func:`pytest.main` function to run all of pytest inside - the test process itself. This means it can return a - :py:class:`HookRecorder` instance which gives more detailed results - from that run than can be done by matching stdout/stderr from - :py:meth:`runpytest`. - - :param args: command line arguments to pass to :py:func:`pytest.main` - - :param plugins: (keyword-only) extra plugin instances the - ``pytest.main()`` instance should use - - :return: a :py:class:`HookRecorder` instance - """ - plugins = kwargs.pop("plugins", []) - no_reraise_ctrlc = kwargs.pop("no_reraise_ctrlc", None) - raise_on_kwargs(kwargs) - - finalizers = [] - try: - # Do not load user config (during runs only). - mp_run = MonkeyPatch() - for k, v in self._env_run_update.items(): - mp_run.setenv(k, v) - finalizers.append(mp_run.undo) - - # When running pytest inline any plugins active in the main test - # process are already imported. So this disables the warning which - # will trigger to say they can no longer be rewritten, which is - # fine as they have already been rewritten. - orig_warn = AssertionRewritingHook._warn_already_imported - - def revert_warn_already_imported(): - AssertionRewritingHook._warn_already_imported = orig_warn - - finalizers.append(revert_warn_already_imported) - AssertionRewritingHook._warn_already_imported = lambda *a: None - - # Any sys.module or sys.path changes done while running pytest - # inline should be reverted after the test run completes to avoid - # clashing with later inline tests run within the same pytest test, - # e.g. just because they use matching test module names. - finalizers.append(self.__take_sys_modules_snapshot().restore) - finalizers.append(SysPathsSnapshot().restore) - - # Important note: - # - our tests should not leave any other references/registrations - # laying around other than possibly loaded test modules - # referenced from sys.modules, as nothing will clean those up - # automatically - - rec = [] - - class Collect(object): - def pytest_configure(x, config): - rec.append(self.make_hook_recorder(config.pluginmanager)) - - plugins.append(Collect()) - ret = pytest.main(list(args), plugins=plugins) - if len(rec) == 1: - reprec = rec.pop() - else: - - class reprec(object): - pass - - reprec.ret = ret - - # typically we reraise keyboard interrupts from the child run - # because it's our user requesting interruption of the testing - if ret == EXIT_INTERRUPTED and not no_reraise_ctrlc: - calls = reprec.getcalls("pytest_keyboard_interrupt") - if calls and calls[-1].excinfo.type == KeyboardInterrupt: - raise KeyboardInterrupt() - return reprec - finally: - for finalizer in finalizers: - finalizer() - - def runpytest_inprocess(self, *args, **kwargs): - """Return result of running pytest in-process, providing a similar - interface to what self.runpytest() provides. - """ - syspathinsert = kwargs.pop("syspathinsert", False) - - if syspathinsert: - self.syspathinsert() - now = time.time() - capture = MultiCapture(Capture=SysCapture) - capture.start_capturing() - try: - try: - reprec = self.inline_run(*args, **kwargs) - except SystemExit as e: - - class reprec(object): - ret = e.args[0] - - except Exception: - traceback.print_exc() - - class reprec(object): - ret = 3 - - finally: - out, err = capture.readouterr() - capture.stop_capturing() - sys.stdout.write(out) - sys.stderr.write(err) - - res = RunResult(reprec.ret, out.split("\n"), err.split("\n"), time.time() - now) - res.reprec = reprec - return res - - def runpytest(self, *args, **kwargs): - """Run pytest inline or in a subprocess, depending on the command line - option "--runpytest" and return a :py:class:`RunResult`. - - """ - args = self._ensure_basetemp(args) - return self._runpytest_method(*args, **kwargs) - - def _ensure_basetemp(self, args): - args = list(args) - for x in args: - if safe_str(x).startswith("--basetemp"): - break - else: - args.append("--basetemp=%s" % self.tmpdir.dirpath("basetemp")) - return args - - def parseconfig(self, *args): - """Return a new pytest Config instance from given commandline args. - - This invokes the pytest bootstrapping code in _pytest.config to create - a new :py:class:`_pytest.core.PluginManager` and call the - pytest_cmdline_parse hook to create a new - :py:class:`_pytest.config.Config` instance. - - If :py:attr:`plugins` has been populated they should be plugin modules - to be registered with the PluginManager. - - """ - args = self._ensure_basetemp(args) - - import _pytest.config - - config = _pytest.config._prepareconfig(args, self.plugins) - # we don't know what the test will do with this half-setup config - # object and thus we make sure it gets unconfigured properly in any - # case (otherwise capturing could still be active, for example) - self.request.addfinalizer(config._ensure_unconfigure) - return config - - def parseconfigure(self, *args): - """Return a new pytest configured Config instance. - - This returns a new :py:class:`_pytest.config.Config` instance like - :py:meth:`parseconfig`, but also calls the pytest_configure hook. - - """ - config = self.parseconfig(*args) - config._do_configure() - self.request.addfinalizer(config._ensure_unconfigure) - return config - - def getitem(self, source, funcname="test_func"): - """Return the test item for a test function. - - This writes the source to a python file and runs pytest's collection on - the resulting module, returning the test item for the requested - function name. - - :param source: the module source - - :param funcname: the name of the test function for which to return a - test item - - """ - items = self.getitems(source) - for item in items: - if item.name == funcname: - return item - assert 0, "%r item not found in module:\n%s\nitems: %s" % ( - funcname, - source, - items, - ) - - def getitems(self, source): - """Return all test items collected from the module. - - This writes the source to a python file and runs pytest's collection on - the resulting module, returning all test items contained within. - - """ - modcol = self.getmodulecol(source) - return self.genitems([modcol]) - - def getmodulecol(self, source, configargs=(), withinit=False): - """Return the module collection node for ``source``. - - This writes ``source`` to a file using :py:meth:`makepyfile` and then - runs the pytest collection on it, returning the collection node for the - test module. - - :param source: the source code of the module to collect - - :param configargs: any extra arguments to pass to - :py:meth:`parseconfigure` - - :param withinit: whether to also write an ``__init__.py`` file to the - same directory to ensure it is a package - - """ - if isinstance(source, Path): - path = self.tmpdir.join(str(source)) - assert not withinit, "not supported for paths" - else: - kw = {self.request.function.__name__: Source(source).strip()} - path = self.makepyfile(**kw) - if withinit: - self.makepyfile(__init__="#") - self.config = config = self.parseconfigure(path, *configargs) - return self.getnode(config, path) - - def collect_by_name(self, modcol, name): - """Return the collection node for name from the module collection. - - This will search a module collection node for a collection node - matching the given name. - - :param modcol: a module collection node; see :py:meth:`getmodulecol` - - :param name: the name of the node to return - - """ - if modcol not in self._mod_collections: - self._mod_collections[modcol] = list(modcol.collect()) - for colitem in self._mod_collections[modcol]: - if colitem.name == name: - return colitem - - def popen( - self, - cmdargs, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=CLOSE_STDIN, - **kw - ): - """Invoke subprocess.Popen. - - This calls subprocess.Popen making sure the current working directory - is in the PYTHONPATH. - - You probably want to use :py:meth:`run` instead. - - """ - env = os.environ.copy() - env["PYTHONPATH"] = os.pathsep.join( - filter(None, [os.getcwd(), env.get("PYTHONPATH", "")]) - ) - env.update(self._env_run_update) - kw["env"] = env - - if stdin is Testdir.CLOSE_STDIN: - kw["stdin"] = subprocess.PIPE - elif isinstance(stdin, bytes): - kw["stdin"] = subprocess.PIPE - else: - kw["stdin"] = stdin - - popen = subprocess.Popen(cmdargs, stdout=stdout, stderr=stderr, **kw) - if stdin is Testdir.CLOSE_STDIN: - popen.stdin.close() - elif isinstance(stdin, bytes): - popen.stdin.write(stdin) - - return popen - - def run(self, *cmdargs, **kwargs): - """Run a command with arguments. - - Run a process using subprocess.Popen saving the stdout and stderr. - - :param args: the sequence of arguments to pass to `subprocess.Popen()` - :param timeout: the period in seconds after which to timeout and raise - :py:class:`Testdir.TimeoutExpired` - :param stdin: optional standard input. Bytes are being send, closing - the pipe, otherwise it is passed through to ``popen``. - Defaults to ``CLOSE_STDIN``, which translates to using a pipe - (``subprocess.PIPE``) that gets closed. - - Returns a :py:class:`RunResult`. - - """ - __tracebackhide__ = True - - timeout = kwargs.pop("timeout", None) - stdin = kwargs.pop("stdin", Testdir.CLOSE_STDIN) - raise_on_kwargs(kwargs) - - cmdargs = [ - str(arg) if isinstance(arg, py.path.local) else arg for arg in cmdargs - ] - p1 = self.tmpdir.join("stdout") - p2 = self.tmpdir.join("stderr") - print("running:", *cmdargs) - print(" in:", py.path.local()) - f1 = codecs.open(str(p1), "w", encoding="utf8") - f2 = codecs.open(str(p2), "w", encoding="utf8") - try: - now = time.time() - popen = self.popen( - cmdargs, - stdin=stdin, - stdout=f1, - stderr=f2, - close_fds=(sys.platform != "win32"), - ) - if isinstance(stdin, bytes): - popen.stdin.close() - - def handle_timeout(): - __tracebackhide__ = True - - timeout_message = ( - "{seconds} second timeout expired running:" - " {command}".format(seconds=timeout, command=cmdargs) - ) - - popen.kill() - popen.wait() - raise self.TimeoutExpired(timeout_message) - - if timeout is None: - ret = popen.wait() - elif not six.PY2: - try: - ret = popen.wait(timeout) - except subprocess.TimeoutExpired: - handle_timeout() - else: - end = time.time() + timeout - - resolution = min(0.1, timeout / 10) - - while True: - ret = popen.poll() - if ret is not None: - break - - if time.time() > end: - handle_timeout() - - time.sleep(resolution) - finally: - f1.close() - f2.close() - f1 = codecs.open(str(p1), "r", encoding="utf8") - f2 = codecs.open(str(p2), "r", encoding="utf8") - try: - out = f1.read().splitlines() - err = f2.read().splitlines() - finally: - f1.close() - f2.close() - self._dump_lines(out, sys.stdout) - self._dump_lines(err, sys.stderr) - return RunResult(ret, out, err, time.time() - now) - - def _dump_lines(self, lines, fp): - try: - for line in lines: - print(line, file=fp) - except UnicodeEncodeError: - print("couldn't print to %s because of encoding" % (fp,)) - - def _getpytestargs(self): - return sys.executable, "-mpytest" - - def runpython(self, script): - """Run a python script using sys.executable as interpreter. - - Returns a :py:class:`RunResult`. - - """ - return self.run(sys.executable, script) - - def runpython_c(self, command): - """Run python -c "command", return a :py:class:`RunResult`.""" - return self.run(sys.executable, "-c", command) - - def runpytest_subprocess(self, *args, **kwargs): - """Run pytest as a subprocess with given arguments. - - Any plugins added to the :py:attr:`plugins` list will be added using the - ``-p`` command line option. Additionally ``--basetemp`` is used to put - any temporary files and directories in a numbered directory prefixed - with "runpytest-" to not conflict with the normal numbered pytest - location for temporary files and directories. - - :param args: the sequence of arguments to pass to the pytest subprocess - :param timeout: the period in seconds after which to timeout and raise - :py:class:`Testdir.TimeoutExpired` - - Returns a :py:class:`RunResult`. - """ - __tracebackhide__ = True - timeout = kwargs.pop("timeout", None) - raise_on_kwargs(kwargs) - - p = py.path.local.make_numbered_dir( - prefix="runpytest-", keep=None, rootdir=self.tmpdir - ) - args = ("--basetemp=%s" % p,) + args - plugins = [x for x in self.plugins if isinstance(x, str)] - if plugins: - args = ("-p", plugins[0]) + args - args = self._getpytestargs() + args - return self.run(*args, timeout=timeout) - - def spawn_pytest(self, string, expect_timeout=10.0): - """Run pytest using pexpect. - - This makes sure to use the right pytest and sets up the temporary - directory locations. - - The pexpect child is returned. - - """ - basetemp = self.tmpdir.mkdir("temp-pexpect") - invoke = " ".join(map(str, self._getpytestargs())) - cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string) - return self.spawn(cmd, expect_timeout=expect_timeout) - - def spawn(self, cmd, expect_timeout=10.0): - """Run a command using pexpect. - - The pexpect child is returned. - - """ - pexpect = pytest.importorskip("pexpect", "3.0") - if hasattr(sys, "pypy_version_info") and "64" in platform.machine(): - pytest.skip("pypy-64 bit not supported") - if sys.platform.startswith("freebsd"): - pytest.xfail("pexpect does not work reliably on freebsd") - logfile = self.tmpdir.join("spawn.out").open("wb") - - # Do not load user config. - env = os.environ.copy() - env.update(self._env_run_update) - - child = pexpect.spawn(cmd, logfile=logfile, env=env) - self.request.addfinalizer(logfile.close) - child.timeout = expect_timeout - return child - - -def getdecoded(out): - try: - return out.decode("utf-8") - except UnicodeDecodeError: - return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (saferepr(out),) - - -class LineComp(object): - def __init__(self): - self.stringio = py.io.TextIO() - - def assert_contains_lines(self, lines2): - """Assert that lines2 are contained (linearly) in lines1. - - Return a list of extralines found. - - """ - __tracebackhide__ = True - val = self.stringio.getvalue() - self.stringio.truncate(0) - self.stringio.seek(0) - lines1 = val.split("\n") - return LineMatcher(lines1).fnmatch_lines(lines2) - - -class LineMatcher(object): - """Flexible matching of text. - - This is a convenience class to test large texts like the output of - commands. - - The constructor takes a list of lines without their trailing newlines, i.e. - ``text.splitlines()``. - - """ - - def __init__(self, lines): - self.lines = lines - self._log_output = [] - - def str(self): - """Return the entire original text.""" - return "\n".join(self.lines) - - def _getlines(self, lines2): - if isinstance(lines2, str): - lines2 = Source(lines2) - if isinstance(lines2, Source): - lines2 = lines2.strip().lines - return lines2 - - def fnmatch_lines_random(self, lines2): - """Check lines exist in the output using in any order. - - Lines are checked using ``fnmatch.fnmatch``. The argument is a list of - lines which have to occur in the output, in any order. - - """ - self._match_lines_random(lines2, fnmatch) - - def re_match_lines_random(self, lines2): - """Check lines exist in the output using ``re.match``, in any order. - - The argument is a list of lines which have to occur in the output, in - any order. - - """ - self._match_lines_random(lines2, lambda name, pat: re.match(pat, name)) - - def _match_lines_random(self, lines2, match_func): - """Check lines exist in the output. - - The argument is a list of lines which have to occur in the output, in - any order. Each line can contain glob whildcards. - - """ - lines2 = self._getlines(lines2) - for line in lines2: - for x in self.lines: - if line == x or match_func(x, line): - self._log("matched: ", repr(line)) - break - else: - self._log("line %r not found in output" % line) - raise ValueError(self._log_text) - - def get_lines_after(self, fnline): - """Return all lines following the given line in the text. - - The given line can contain glob wildcards. - - """ - for i, line in enumerate(self.lines): - if fnline == line or fnmatch(line, fnline): - return self.lines[i + 1 :] - raise ValueError("line %r not found in output" % fnline) - - def _log(self, *args): - self._log_output.append(" ".join(str(x) for x in args)) - - @property - def _log_text(self): - return "\n".join(self._log_output) - - def fnmatch_lines(self, lines2): - """Search captured text for matching lines using ``fnmatch.fnmatch``. - - The argument is a list of lines which have to match and can use glob - wildcards. If they do not match a pytest.fail() is called. The - matches and non-matches are also printed on stdout. - - """ - __tracebackhide__ = True - self._match_lines(lines2, fnmatch, "fnmatch") - - def re_match_lines(self, lines2): - """Search captured text for matching lines using ``re.match``. - - The argument is a list of lines which have to match using ``re.match``. - If they do not match a pytest.fail() is called. - - The matches and non-matches are also printed on stdout. - - """ - __tracebackhide__ = True - self._match_lines(lines2, lambda name, pat: re.match(pat, name), "re.match") - - def _match_lines(self, lines2, match_func, match_nickname): - """Underlying implementation of ``fnmatch_lines`` and ``re_match_lines``. - - :param list[str] lines2: list of string patterns to match. The actual - format depends on ``match_func`` - :param match_func: a callable ``match_func(line, pattern)`` where line - is the captured line from stdout/stderr and pattern is the matching - pattern - :param str match_nickname: the nickname for the match function that - will be logged to stdout when a match occurs - - """ - assert isinstance(lines2, Sequence) - lines2 = self._getlines(lines2) - lines1 = self.lines[:] - nextline = None - extralines = [] - __tracebackhide__ = True - for line in lines2: - nomatchprinted = False - while lines1: - nextline = lines1.pop(0) - if line == nextline: - self._log("exact match:", repr(line)) - break - elif match_func(nextline, line): - self._log("%s:" % match_nickname, repr(line)) - self._log(" with:", repr(nextline)) - break - else: - if not nomatchprinted: - self._log("nomatch:", repr(line)) - nomatchprinted = True - self._log(" and:", repr(nextline)) - extralines.append(nextline) - else: - self._log("remains unmatched: %r" % (line,)) - pytest.fail(self._log_text) |