diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/python/pytest/py2/_pytest/pytester.py | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/python/pytest/py2/_pytest/pytester.py')
-rw-r--r-- | contrib/python/pytest/py2/_pytest/pytester.py | 1413 |
1 files changed, 1413 insertions, 0 deletions
diff --git a/contrib/python/pytest/py2/_pytest/pytester.py b/contrib/python/pytest/py2/_pytest/pytester.py new file mode 100644 index 0000000000..f1d739c991 --- /dev/null +++ b/contrib/python/pytest/py2/_pytest/pytester.py @@ -0,0 +1,1413 @@ +# -*- coding: utf-8 -*- +"""(disabled by default) support for testing pytest and pytest plugins.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import codecs +import gc +import os +import platform +import re +import subprocess +import sys +import time +import traceback +from fnmatch import fnmatch +from weakref import WeakKeyDictionary + +import py +import six + +import pytest +from _pytest._code import Source +from _pytest._io.saferepr import saferepr +from _pytest.assertion.rewrite import AssertionRewritingHook +from _pytest.capture import MultiCapture +from _pytest.capture import SysCapture +from _pytest.compat import safe_str +from _pytest.compat import Sequence +from _pytest.main import EXIT_INTERRUPTED +from _pytest.main import EXIT_OK +from _pytest.main import Session +from _pytest.monkeypatch import MonkeyPatch +from _pytest.pathlib import Path + +IGNORE_PAM = [ # filenames added when obtaining details about the current user + u"/var/lib/sss/mc/passwd" +] + + +def pytest_addoption(parser): + parser.addoption( + "--lsof", + action="store_true", + dest="lsof", + default=False, + help="run FD checks if lsof is available", + ) + + parser.addoption( + "--runpytest", + default="inprocess", + dest="runpytest", + choices=("inprocess", "subprocess"), + help=( + "run pytest sub runs in tests using an 'inprocess' " + "or 'subprocess' (python -m main) method" + ), + ) + + parser.addini( + "pytester_example_dir", help="directory to take the pytester example files from" + ) + + +def pytest_configure(config): + if config.getvalue("lsof"): + checker = LsofFdLeakChecker() + if checker.matching_platform(): + config.pluginmanager.register(checker) + + config.addinivalue_line( + "markers", + "pytester_example_path(*path_segments): join the given path " + "segments to `pytester_example_dir` for this test.", + ) + + +def raise_on_kwargs(kwargs): + __tracebackhide__ = True + if kwargs: # pragma: no branch + raise TypeError( + "Unexpected keyword arguments: {}".format(", ".join(sorted(kwargs))) + ) + + +class LsofFdLeakChecker(object): + def get_open_files(self): + out = self._exec_lsof() + open_files = self._parse_lsof_output(out) + return open_files + + def _exec_lsof(self): + pid = os.getpid() + # py3: use subprocess.DEVNULL directly. + with open(os.devnull, "wb") as devnull: + return subprocess.check_output( + ("lsof", "-Ffn0", "-p", str(pid)), stderr=devnull + ).decode() + + def _parse_lsof_output(self, out): + def isopen(line): + return line.startswith("f") and ( + "deleted" not in line + and "mem" not in line + and "txt" not in line + and "cwd" not in line + ) + + open_files = [] + + for line in out.split("\n"): + if isopen(line): + fields = line.split("\0") + fd = fields[0][1:] + filename = fields[1][1:] + if filename in IGNORE_PAM: + continue + if filename.startswith("/"): + open_files.append((fd, filename)) + + return open_files + + def matching_platform(self): + try: + subprocess.check_output(("lsof", "-v")) + except (OSError, subprocess.CalledProcessError): + return False + else: + return True + + @pytest.hookimpl(hookwrapper=True, tryfirst=True) + def pytest_runtest_protocol(self, item): + lines1 = self.get_open_files() + yield + if hasattr(sys, "pypy_version_info"): + gc.collect() + lines2 = self.get_open_files() + + new_fds = {t[0] for t in lines2} - {t[0] for t in lines1} + leaked_files = [t for t in lines2 if t[0] in new_fds] + if leaked_files: + error = [] + error.append("***** %s FD leakage detected" % len(leaked_files)) + error.extend([str(f) for f in leaked_files]) + error.append("*** Before:") + error.extend([str(f) for f in lines1]) + error.append("*** After:") + error.extend([str(f) for f in lines2]) + error.append(error[0]) + error.append("*** function %s:%s: %s " % item.location) + error.append("See issue #2366") + item.warn(pytest.PytestWarning("\n".join(error))) + + +# used at least by pytest-xdist plugin + + +@pytest.fixture +def _pytest(request): + """Return a helper which offers a gethookrecorder(hook) method which + returns a HookRecorder instance which helps to make assertions about called + hooks. + + """ + return PytestArg(request) + + +class PytestArg(object): + def __init__(self, request): + self.request = request + + def gethookrecorder(self, hook): + hookrecorder = HookRecorder(hook._pm) + self.request.addfinalizer(hookrecorder.finish_recording) + return hookrecorder + + +def get_public_names(values): + """Only return names from iterator values without a leading underscore.""" + return [x for x in values if x[0] != "_"] + + +class ParsedCall(object): + def __init__(self, name, kwargs): + self.__dict__.update(kwargs) + self._name = name + + def __repr__(self): + d = self.__dict__.copy() + del d["_name"] + return "<ParsedCall %r(**%r)>" % (self._name, d) + + +class HookRecorder(object): + """Record all hooks called in a plugin manager. + + This wraps all the hook calls in the plugin manager, recording each call + before propagating the normal calls. + + """ + + def __init__(self, pluginmanager): + self._pluginmanager = pluginmanager + self.calls = [] + + def before(hook_name, hook_impls, kwargs): + self.calls.append(ParsedCall(hook_name, kwargs)) + + def after(outcome, hook_name, hook_impls, kwargs): + pass + + self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after) + + def finish_recording(self): + self._undo_wrapping() + + def getcalls(self, names): + if isinstance(names, str): + names = names.split() + return [call for call in self.calls if call._name in names] + + def assert_contains(self, entries): + __tracebackhide__ = True + i = 0 + entries = list(entries) + backlocals = sys._getframe(1).f_locals + while entries: + name, check = entries.pop(0) + for ind, call in enumerate(self.calls[i:]): + if call._name == name: + print("NAMEMATCH", name, call) + if eval(check, backlocals, call.__dict__): + print("CHECKERMATCH", repr(check), "->", call) + else: + print("NOCHECKERMATCH", repr(check), "-", call) + continue + i += ind + 1 + break + print("NONAMEMATCH", name, "with", call) + else: + pytest.fail("could not find %r check %r" % (name, check)) + + def popcall(self, name): + __tracebackhide__ = True + for i, call in enumerate(self.calls): + if call._name == name: + del self.calls[i] + return call + lines = ["could not find call %r, in:" % (name,)] + lines.extend([" %s" % x for x in self.calls]) + pytest.fail("\n".join(lines)) + + def getcall(self, name): + values = self.getcalls(name) + assert len(values) == 1, (name, values) + return values[0] + + # functionality for test reports + + def getreports(self, names="pytest_runtest_logreport pytest_collectreport"): + return [x.report for x in self.getcalls(names)] + + def matchreport( + self, + inamepart="", + names="pytest_runtest_logreport pytest_collectreport", + when=None, + ): + """return a testreport whose dotted import path matches""" + values = [] + for rep in self.getreports(names=names): + if not when and rep.when != "call" and rep.passed: + # setup/teardown passing reports - let's ignore those + continue + if when and rep.when != when: + continue + if not inamepart or inamepart in rep.nodeid.split("::"): + values.append(rep) + if not values: + raise ValueError( + "could not find test report matching %r: " + "no test reports at all!" % (inamepart,) + ) + if len(values) > 1: + raise ValueError( + "found 2 or more testreports matching %r: %s" % (inamepart, values) + ) + return values[0] + + def getfailures(self, names="pytest_runtest_logreport pytest_collectreport"): + return [rep for rep in self.getreports(names) if rep.failed] + + def getfailedcollections(self): + return self.getfailures("pytest_collectreport") + + def listoutcomes(self): + passed = [] + skipped = [] + failed = [] + for rep in self.getreports("pytest_collectreport pytest_runtest_logreport"): + if rep.passed: + if rep.when == "call": + passed.append(rep) + elif rep.skipped: + skipped.append(rep) + else: + assert rep.failed, "Unexpected outcome: {!r}".format(rep) + failed.append(rep) + return passed, skipped, failed + + def countoutcomes(self): + return [len(x) for x in self.listoutcomes()] + + def assertoutcome(self, passed=0, skipped=0, failed=0): + realpassed, realskipped, realfailed = self.listoutcomes() + assert passed == len(realpassed) + assert skipped == len(realskipped) + assert failed == len(realfailed) + + def clear(self): + self.calls[:] = [] + + +@pytest.fixture +def linecomp(request): + return LineComp() + + +@pytest.fixture(name="LineMatcher") +def LineMatcher_fixture(request): + return LineMatcher + + +@pytest.fixture +def testdir(request, tmpdir_factory): + return Testdir(request, tmpdir_factory) + + +@pytest.fixture +def _sys_snapshot(): + snappaths = SysPathsSnapshot() + snapmods = SysModulesSnapshot() + yield + snapmods.restore() + snappaths.restore() + + +@pytest.fixture +def _config_for_test(): + from _pytest.config import get_config + + config = get_config() + yield config + config._ensure_unconfigure() # cleanup, e.g. capman closing tmpfiles. + + +rex_outcome = re.compile(r"(\d+) ([\w-]+)") + + +class RunResult(object): + """The result of running a command. + + Attributes: + + :ret: the return value + :outlines: list of lines captured from stdout + :errlines: list of lines captures from stderr + :stdout: :py:class:`LineMatcher` of stdout, use ``stdout.str()`` to + reconstruct stdout or the commonly used ``stdout.fnmatch_lines()`` + method + :stderr: :py:class:`LineMatcher` of stderr + :duration: duration in seconds + + """ + + def __init__(self, ret, outlines, errlines, duration): + self.ret = ret + self.outlines = outlines + self.errlines = errlines + self.stdout = LineMatcher(outlines) + self.stderr = LineMatcher(errlines) + self.duration = duration + + def __repr__(self): + return ( + "<RunResult ret=%r len(stdout.lines)=%d len(stderr.lines)=%d duration=%.2fs>" + % (self.ret, len(self.stdout.lines), len(self.stderr.lines), self.duration) + ) + + def parseoutcomes(self): + """Return a dictionary of outcomestring->num from parsing the terminal + output that the test process produced. + + """ + for line in reversed(self.outlines): + if "seconds" in line: + outcomes = rex_outcome.findall(line) + if outcomes: + d = {} + for num, cat in outcomes: + d[cat] = int(num) + return d + raise ValueError("Pytest terminal report not found") + + def assert_outcomes( + self, passed=0, skipped=0, failed=0, error=0, xpassed=0, xfailed=0 + ): + """Assert that the specified outcomes appear with the respective + numbers (0 means it didn't occur) in the text output from a test run. + + """ + d = self.parseoutcomes() + obtained = { + "passed": d.get("passed", 0), + "skipped": d.get("skipped", 0), + "failed": d.get("failed", 0), + "error": d.get("error", 0), + "xpassed": d.get("xpassed", 0), + "xfailed": d.get("xfailed", 0), + } + expected = { + "passed": passed, + "skipped": skipped, + "failed": failed, + "error": error, + "xpassed": xpassed, + "xfailed": xfailed, + } + assert obtained == expected + + +class CwdSnapshot(object): + def __init__(self): + self.__saved = os.getcwd() + + def restore(self): + os.chdir(self.__saved) + + +class SysModulesSnapshot(object): + def __init__(self, preserve=None): + self.__preserve = preserve + self.__saved = dict(sys.modules) + + def restore(self): + if self.__preserve: + self.__saved.update( + (k, m) for k, m in sys.modules.items() if self.__preserve(k) + ) + sys.modules.clear() + sys.modules.update(self.__saved) + + +class SysPathsSnapshot(object): + def __init__(self): + self.__saved = list(sys.path), list(sys.meta_path) + + def restore(self): + sys.path[:], sys.meta_path[:] = self.__saved + + +class Testdir(object): + """Temporary test directory with tools to test/run pytest itself. + + This is based on the ``tmpdir`` fixture but provides a number of methods + which aid with testing pytest itself. Unless :py:meth:`chdir` is used all + methods will use :py:attr:`tmpdir` as their current working directory. + + Attributes: + + :tmpdir: The :py:class:`py.path.local` instance of the temporary directory. + + :plugins: A list of plugins to use with :py:meth:`parseconfig` and + :py:meth:`runpytest`. Initially this is an empty list but plugins can + be added to the list. The type of items to add to the list depends on + the method using them so refer to them for details. + + """ + + CLOSE_STDIN = object + + class TimeoutExpired(Exception): + pass + + def __init__(self, request, tmpdir_factory): + self.request = request + self._mod_collections = WeakKeyDictionary() + name = request.function.__name__ + self.tmpdir = tmpdir_factory.mktemp(name, numbered=True) + self.test_tmproot = tmpdir_factory.mktemp("tmp-" + name, numbered=True) + self.plugins = [] + self._cwd_snapshot = CwdSnapshot() + self._sys_path_snapshot = SysPathsSnapshot() + self._sys_modules_snapshot = self.__take_sys_modules_snapshot() + self.chdir() + self.request.addfinalizer(self.finalize) + method = self.request.config.getoption("--runpytest") + if method == "inprocess": + self._runpytest_method = self.runpytest_inprocess + elif method == "subprocess": + self._runpytest_method = self.runpytest_subprocess + + mp = self.monkeypatch = MonkeyPatch() + mp.setenv("PYTEST_DEBUG_TEMPROOT", str(self.test_tmproot)) + # Ensure no unexpected caching via tox. + mp.delenv("TOX_ENV_DIR", raising=False) + # Discard outer pytest options. + mp.delenv("PYTEST_ADDOPTS", raising=False) + + # Environment (updates) for inner runs. + tmphome = str(self.tmpdir) + self._env_run_update = {"HOME": tmphome, "USERPROFILE": tmphome} + + def __repr__(self): + return "<Testdir %r>" % (self.tmpdir,) + + def __str__(self): + return str(self.tmpdir) + + def finalize(self): + """Clean up global state artifacts. + + Some methods modify the global interpreter state and this tries to + clean this up. It does not remove the temporary directory however so + it can be looked at after the test run has finished. + + """ + self._sys_modules_snapshot.restore() + self._sys_path_snapshot.restore() + self._cwd_snapshot.restore() + self.monkeypatch.undo() + + def __take_sys_modules_snapshot(self): + # some zope modules used by twisted-related tests keep internal state + # and can't be deleted; we had some trouble in the past with + # `zope.interface` for example + def preserve_module(name): + return name.startswith("zope") + + return SysModulesSnapshot(preserve=preserve_module) + + def make_hook_recorder(self, pluginmanager): + """Create a new :py:class:`HookRecorder` for a PluginManager.""" + pluginmanager.reprec = reprec = HookRecorder(pluginmanager) + self.request.addfinalizer(reprec.finish_recording) + return reprec + + def chdir(self): + """Cd into the temporary directory. + + This is done automatically upon instantiation. + + """ + self.tmpdir.chdir() + + def _makefile(self, ext, args, kwargs, encoding="utf-8"): + items = list(kwargs.items()) + + def to_text(s): + return s.decode(encoding) if isinstance(s, bytes) else six.text_type(s) + + if args: + source = u"\n".join(to_text(x) for x in args) + basename = self.request.function.__name__ + items.insert(0, (basename, source)) + + ret = None + for basename, value in items: + p = self.tmpdir.join(basename).new(ext=ext) + p.dirpath().ensure_dir() + source = Source(value) + source = u"\n".join(to_text(line) for line in source.lines) + p.write(source.strip().encode(encoding), "wb") + if ret is None: + ret = p + return ret + + def makefile(self, ext, *args, **kwargs): + r"""Create new file(s) in the testdir. + + :param str ext: The extension the file(s) should use, including the dot, e.g. `.py`. + :param list[str] args: All args will be treated as strings and joined using newlines. + The result will be written as contents to the file. The name of the + file will be based on the test function requesting this fixture. + :param kwargs: Each keyword is the name of a file, while the value of it will + be written as contents of the file. + + Examples: + + .. code-block:: python + + testdir.makefile(".txt", "line1", "line2") + + testdir.makefile(".ini", pytest="[pytest]\naddopts=-rs\n") + + """ + return self._makefile(ext, args, kwargs) + + def makeconftest(self, source): + """Write a contest.py file with 'source' as contents.""" + return self.makepyfile(conftest=source) + + def makeini(self, source): + """Write a tox.ini file with 'source' as contents.""" + return self.makefile(".ini", tox=source) + + def getinicfg(self, source): + """Return the pytest section from the tox.ini config file.""" + p = self.makeini(source) + return py.iniconfig.IniConfig(p)["pytest"] + + def makepyfile(self, *args, **kwargs): + """Shortcut for .makefile() with a .py extension.""" + return self._makefile(".py", args, kwargs) + + def maketxtfile(self, *args, **kwargs): + """Shortcut for .makefile() with a .txt extension.""" + return self._makefile(".txt", args, kwargs) + + def syspathinsert(self, path=None): + """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`. + + This is undone automatically when this object dies at the end of each + test. + """ + if path is None: + path = self.tmpdir + + self.monkeypatch.syspath_prepend(str(path)) + + def mkdir(self, name): + """Create a new (sub)directory.""" + return self.tmpdir.mkdir(name) + + def mkpydir(self, name): + """Create a new python package. + + This creates a (sub)directory with an empty ``__init__.py`` file so it + gets recognised as a python package. + + """ + p = self.mkdir(name) + p.ensure("__init__.py") + return p + + def copy_example(self, name=None): + import warnings + from _pytest.warning_types import PYTESTER_COPY_EXAMPLE + + warnings.warn(PYTESTER_COPY_EXAMPLE, stacklevel=2) + example_dir = self.request.config.getini("pytester_example_dir") + if example_dir is None: + raise ValueError("pytester_example_dir is unset, can't copy examples") + example_dir = self.request.config.rootdir.join(example_dir) + + for extra_element in self.request.node.iter_markers("pytester_example_path"): + assert extra_element.args + example_dir = example_dir.join(*extra_element.args) + + if name is None: + func_name = self.request.function.__name__ + maybe_dir = example_dir / func_name + maybe_file = example_dir / (func_name + ".py") + + if maybe_dir.isdir(): + example_path = maybe_dir + elif maybe_file.isfile(): + example_path = maybe_file + else: + raise LookupError( + "{} cant be found as module or package in {}".format( + func_name, example_dir.bestrelpath(self.request.config.rootdir) + ) + ) + else: + example_path = example_dir.join(name) + + if example_path.isdir() and not example_path.join("__init__.py").isfile(): + example_path.copy(self.tmpdir) + return self.tmpdir + elif example_path.isfile(): + result = self.tmpdir.join(example_path.basename) + example_path.copy(result) + return result + else: + raise LookupError( + 'example "{}" is not found as a file or directory'.format(example_path) + ) + + Session = Session + + def getnode(self, config, arg): + """Return the collection node of a file. + + :param config: :py:class:`_pytest.config.Config` instance, see + :py:meth:`parseconfig` and :py:meth:`parseconfigure` to create the + configuration + + :param arg: a :py:class:`py.path.local` instance of the file + + """ + session = Session(config) + assert "::" not in str(arg) + p = py.path.local(arg) + config.hook.pytest_sessionstart(session=session) + res = session.perform_collect([str(p)], genitems=False)[0] + config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK) + return res + + def getpathnode(self, path): + """Return the collection node of a file. + + This is like :py:meth:`getnode` but uses :py:meth:`parseconfigure` to + create the (configured) pytest Config instance. + + :param path: a :py:class:`py.path.local` instance of the file + + """ + config = self.parseconfigure(path) + session = Session(config) + x = session.fspath.bestrelpath(path) + config.hook.pytest_sessionstart(session=session) + res = session.perform_collect([x], genitems=False)[0] + config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK) + return res + + def genitems(self, colitems): + """Generate all test items from a collection node. + + This recurses into the collection node and returns a list of all the + test items contained within. + + """ + session = colitems[0].session + result = [] + for colitem in colitems: + result.extend(session.genitems(colitem)) + return result + + def runitem(self, source): + """Run the "test_func" Item. + + The calling test instance (class containing the test method) must + provide a ``.getrunner()`` method which should return a runner which + can run the test protocol for a single item, e.g. + :py:func:`_pytest.runner.runtestprotocol`. + + """ + # used from runner functional tests + item = self.getitem(source) + # the test class where we are called from wants to provide the runner + testclassinstance = self.request.instance + runner = testclassinstance.getrunner() + return runner(item) + + def inline_runsource(self, source, *cmdlineargs): + """Run a test module in process using ``pytest.main()``. + + This run writes "source" into a temporary file and runs + ``pytest.main()`` on it, returning a :py:class:`HookRecorder` instance + for the result. + + :param source: the source code of the test module + + :param cmdlineargs: any extra command line arguments to use + + :return: :py:class:`HookRecorder` instance of the result + + """ + p = self.makepyfile(source) + values = list(cmdlineargs) + [p] + return self.inline_run(*values) + + def inline_genitems(self, *args): + """Run ``pytest.main(['--collectonly'])`` in-process. + + Runs the :py:func:`pytest.main` function to run all of pytest inside + the test process itself like :py:meth:`inline_run`, but returns a + tuple of the collected items and a :py:class:`HookRecorder` instance. + + """ + rec = self.inline_run("--collect-only", *args) + items = [x.item for x in rec.getcalls("pytest_itemcollected")] + return items, rec + + def inline_run(self, *args, **kwargs): + """Run ``pytest.main()`` in-process, returning a HookRecorder. + + Runs the :py:func:`pytest.main` function to run all of pytest inside + the test process itself. This means it can return a + :py:class:`HookRecorder` instance which gives more detailed results + from that run than can be done by matching stdout/stderr from + :py:meth:`runpytest`. + + :param args: command line arguments to pass to :py:func:`pytest.main` + + :param plugins: (keyword-only) extra plugin instances the + ``pytest.main()`` instance should use + + :return: a :py:class:`HookRecorder` instance + """ + plugins = kwargs.pop("plugins", []) + no_reraise_ctrlc = kwargs.pop("no_reraise_ctrlc", None) + raise_on_kwargs(kwargs) + + finalizers = [] + try: + # Do not load user config (during runs only). + mp_run = MonkeyPatch() + for k, v in self._env_run_update.items(): + mp_run.setenv(k, v) + finalizers.append(mp_run.undo) + + # When running pytest inline any plugins active in the main test + # process are already imported. So this disables the warning which + # will trigger to say they can no longer be rewritten, which is + # fine as they have already been rewritten. + orig_warn = AssertionRewritingHook._warn_already_imported + + def revert_warn_already_imported(): + AssertionRewritingHook._warn_already_imported = orig_warn + + finalizers.append(revert_warn_already_imported) + AssertionRewritingHook._warn_already_imported = lambda *a: None + + # Any sys.module or sys.path changes done while running pytest + # inline should be reverted after the test run completes to avoid + # clashing with later inline tests run within the same pytest test, + # e.g. just because they use matching test module names. + finalizers.append(self.__take_sys_modules_snapshot().restore) + finalizers.append(SysPathsSnapshot().restore) + + # Important note: + # - our tests should not leave any other references/registrations + # laying around other than possibly loaded test modules + # referenced from sys.modules, as nothing will clean those up + # automatically + + rec = [] + + class Collect(object): + def pytest_configure(x, config): + rec.append(self.make_hook_recorder(config.pluginmanager)) + + plugins.append(Collect()) + ret = pytest.main(list(args), plugins=plugins) + if len(rec) == 1: + reprec = rec.pop() + else: + + class reprec(object): + pass + + reprec.ret = ret + + # typically we reraise keyboard interrupts from the child run + # because it's our user requesting interruption of the testing + if ret == EXIT_INTERRUPTED and not no_reraise_ctrlc: + calls = reprec.getcalls("pytest_keyboard_interrupt") + if calls and calls[-1].excinfo.type == KeyboardInterrupt: + raise KeyboardInterrupt() + return reprec + finally: + for finalizer in finalizers: + finalizer() + + def runpytest_inprocess(self, *args, **kwargs): + """Return result of running pytest in-process, providing a similar + interface to what self.runpytest() provides. + """ + syspathinsert = kwargs.pop("syspathinsert", False) + + if syspathinsert: + self.syspathinsert() + now = time.time() + capture = MultiCapture(Capture=SysCapture) + capture.start_capturing() + try: + try: + reprec = self.inline_run(*args, **kwargs) + except SystemExit as e: + + class reprec(object): + ret = e.args[0] + + except Exception: + traceback.print_exc() + + class reprec(object): + ret = 3 + + finally: + out, err = capture.readouterr() + capture.stop_capturing() + sys.stdout.write(out) + sys.stderr.write(err) + + res = RunResult(reprec.ret, out.split("\n"), err.split("\n"), time.time() - now) + res.reprec = reprec + return res + + def runpytest(self, *args, **kwargs): + """Run pytest inline or in a subprocess, depending on the command line + option "--runpytest" and return a :py:class:`RunResult`. + + """ + args = self._ensure_basetemp(args) + return self._runpytest_method(*args, **kwargs) + + def _ensure_basetemp(self, args): + args = list(args) + for x in args: + if safe_str(x).startswith("--basetemp"): + break + else: + args.append("--basetemp=%s" % self.tmpdir.dirpath("basetemp")) + return args + + def parseconfig(self, *args): + """Return a new pytest Config instance from given commandline args. + + This invokes the pytest bootstrapping code in _pytest.config to create + a new :py:class:`_pytest.core.PluginManager` and call the + pytest_cmdline_parse hook to create a new + :py:class:`_pytest.config.Config` instance. + + If :py:attr:`plugins` has been populated they should be plugin modules + to be registered with the PluginManager. + + """ + args = self._ensure_basetemp(args) + + import _pytest.config + + config = _pytest.config._prepareconfig(args, self.plugins) + # we don't know what the test will do with this half-setup config + # object and thus we make sure it gets unconfigured properly in any + # case (otherwise capturing could still be active, for example) + self.request.addfinalizer(config._ensure_unconfigure) + return config + + def parseconfigure(self, *args): + """Return a new pytest configured Config instance. + + This returns a new :py:class:`_pytest.config.Config` instance like + :py:meth:`parseconfig`, but also calls the pytest_configure hook. + + """ + config = self.parseconfig(*args) + config._do_configure() + self.request.addfinalizer(config._ensure_unconfigure) + return config + + def getitem(self, source, funcname="test_func"): + """Return the test item for a test function. + + This writes the source to a python file and runs pytest's collection on + the resulting module, returning the test item for the requested + function name. + + :param source: the module source + + :param funcname: the name of the test function for which to return a + test item + + """ + items = self.getitems(source) + for item in items: + if item.name == funcname: + return item + assert 0, "%r item not found in module:\n%s\nitems: %s" % ( + funcname, + source, + items, + ) + + def getitems(self, source): + """Return all test items collected from the module. + + This writes the source to a python file and runs pytest's collection on + the resulting module, returning all test items contained within. + + """ + modcol = self.getmodulecol(source) + return self.genitems([modcol]) + + def getmodulecol(self, source, configargs=(), withinit=False): + """Return the module collection node for ``source``. + + This writes ``source`` to a file using :py:meth:`makepyfile` and then + runs the pytest collection on it, returning the collection node for the + test module. + + :param source: the source code of the module to collect + + :param configargs: any extra arguments to pass to + :py:meth:`parseconfigure` + + :param withinit: whether to also write an ``__init__.py`` file to the + same directory to ensure it is a package + + """ + if isinstance(source, Path): + path = self.tmpdir.join(str(source)) + assert not withinit, "not supported for paths" + else: + kw = {self.request.function.__name__: Source(source).strip()} + path = self.makepyfile(**kw) + if withinit: + self.makepyfile(__init__="#") + self.config = config = self.parseconfigure(path, *configargs) + return self.getnode(config, path) + + def collect_by_name(self, modcol, name): + """Return the collection node for name from the module collection. + + This will search a module collection node for a collection node + matching the given name. + + :param modcol: a module collection node; see :py:meth:`getmodulecol` + + :param name: the name of the node to return + + """ + if modcol not in self._mod_collections: + self._mod_collections[modcol] = list(modcol.collect()) + for colitem in self._mod_collections[modcol]: + if colitem.name == name: + return colitem + + def popen( + self, + cmdargs, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=CLOSE_STDIN, + **kw + ): + """Invoke subprocess.Popen. + + This calls subprocess.Popen making sure the current working directory + is in the PYTHONPATH. + + You probably want to use :py:meth:`run` instead. + + """ + env = os.environ.copy() + env["PYTHONPATH"] = os.pathsep.join( + filter(None, [os.getcwd(), env.get("PYTHONPATH", "")]) + ) + env.update(self._env_run_update) + kw["env"] = env + + if stdin is Testdir.CLOSE_STDIN: + kw["stdin"] = subprocess.PIPE + elif isinstance(stdin, bytes): + kw["stdin"] = subprocess.PIPE + else: + kw["stdin"] = stdin + + popen = subprocess.Popen(cmdargs, stdout=stdout, stderr=stderr, **kw) + if stdin is Testdir.CLOSE_STDIN: + popen.stdin.close() + elif isinstance(stdin, bytes): + popen.stdin.write(stdin) + + return popen + + def run(self, *cmdargs, **kwargs): + """Run a command with arguments. + + Run a process using subprocess.Popen saving the stdout and stderr. + + :param args: the sequence of arguments to pass to `subprocess.Popen()` + :param timeout: the period in seconds after which to timeout and raise + :py:class:`Testdir.TimeoutExpired` + :param stdin: optional standard input. Bytes are being send, closing + the pipe, otherwise it is passed through to ``popen``. + Defaults to ``CLOSE_STDIN``, which translates to using a pipe + (``subprocess.PIPE``) that gets closed. + + Returns a :py:class:`RunResult`. + + """ + __tracebackhide__ = True + + timeout = kwargs.pop("timeout", None) + stdin = kwargs.pop("stdin", Testdir.CLOSE_STDIN) + raise_on_kwargs(kwargs) + + cmdargs = [ + str(arg) if isinstance(arg, py.path.local) else arg for arg in cmdargs + ] + p1 = self.tmpdir.join("stdout") + p2 = self.tmpdir.join("stderr") + print("running:", *cmdargs) + print(" in:", py.path.local()) + f1 = codecs.open(str(p1), "w", encoding="utf8") + f2 = codecs.open(str(p2), "w", encoding="utf8") + try: + now = time.time() + popen = self.popen( + cmdargs, + stdin=stdin, + stdout=f1, + stderr=f2, + close_fds=(sys.platform != "win32"), + ) + if isinstance(stdin, bytes): + popen.stdin.close() + + def handle_timeout(): + __tracebackhide__ = True + + timeout_message = ( + "{seconds} second timeout expired running:" + " {command}".format(seconds=timeout, command=cmdargs) + ) + + popen.kill() + popen.wait() + raise self.TimeoutExpired(timeout_message) + + if timeout is None: + ret = popen.wait() + elif not six.PY2: + try: + ret = popen.wait(timeout) + except subprocess.TimeoutExpired: + handle_timeout() + else: + end = time.time() + timeout + + resolution = min(0.1, timeout / 10) + + while True: + ret = popen.poll() + if ret is not None: + break + + if time.time() > end: + handle_timeout() + + time.sleep(resolution) + finally: + f1.close() + f2.close() + f1 = codecs.open(str(p1), "r", encoding="utf8") + f2 = codecs.open(str(p2), "r", encoding="utf8") + try: + out = f1.read().splitlines() + err = f2.read().splitlines() + finally: + f1.close() + f2.close() + self._dump_lines(out, sys.stdout) + self._dump_lines(err, sys.stderr) + return RunResult(ret, out, err, time.time() - now) + + def _dump_lines(self, lines, fp): + try: + for line in lines: + print(line, file=fp) + except UnicodeEncodeError: + print("couldn't print to %s because of encoding" % (fp,)) + + def _getpytestargs(self): + return sys.executable, "-mpytest" + + def runpython(self, script): + """Run a python script using sys.executable as interpreter. + + Returns a :py:class:`RunResult`. + + """ + return self.run(sys.executable, script) + + def runpython_c(self, command): + """Run python -c "command", return a :py:class:`RunResult`.""" + return self.run(sys.executable, "-c", command) + + def runpytest_subprocess(self, *args, **kwargs): + """Run pytest as a subprocess with given arguments. + + Any plugins added to the :py:attr:`plugins` list will be added using the + ``-p`` command line option. Additionally ``--basetemp`` is used to put + any temporary files and directories in a numbered directory prefixed + with "runpytest-" to not conflict with the normal numbered pytest + location for temporary files and directories. + + :param args: the sequence of arguments to pass to the pytest subprocess + :param timeout: the period in seconds after which to timeout and raise + :py:class:`Testdir.TimeoutExpired` + + Returns a :py:class:`RunResult`. + """ + __tracebackhide__ = True + timeout = kwargs.pop("timeout", None) + raise_on_kwargs(kwargs) + + p = py.path.local.make_numbered_dir( + prefix="runpytest-", keep=None, rootdir=self.tmpdir + ) + args = ("--basetemp=%s" % p,) + args + plugins = [x for x in self.plugins if isinstance(x, str)] + if plugins: + args = ("-p", plugins[0]) + args + args = self._getpytestargs() + args + return self.run(*args, timeout=timeout) + + def spawn_pytest(self, string, expect_timeout=10.0): + """Run pytest using pexpect. + + This makes sure to use the right pytest and sets up the temporary + directory locations. + + The pexpect child is returned. + + """ + basetemp = self.tmpdir.mkdir("temp-pexpect") + invoke = " ".join(map(str, self._getpytestargs())) + cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string) + return self.spawn(cmd, expect_timeout=expect_timeout) + + def spawn(self, cmd, expect_timeout=10.0): + """Run a command using pexpect. + + The pexpect child is returned. + + """ + pexpect = pytest.importorskip("pexpect", "3.0") + if hasattr(sys, "pypy_version_info") and "64" in platform.machine(): + pytest.skip("pypy-64 bit not supported") + if sys.platform.startswith("freebsd"): + pytest.xfail("pexpect does not work reliably on freebsd") + logfile = self.tmpdir.join("spawn.out").open("wb") + + # Do not load user config. + env = os.environ.copy() + env.update(self._env_run_update) + + child = pexpect.spawn(cmd, logfile=logfile, env=env) + self.request.addfinalizer(logfile.close) + child.timeout = expect_timeout + return child + + +def getdecoded(out): + try: + return out.decode("utf-8") + except UnicodeDecodeError: + return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (saferepr(out),) + + +class LineComp(object): + def __init__(self): + self.stringio = py.io.TextIO() + + def assert_contains_lines(self, lines2): + """Assert that lines2 are contained (linearly) in lines1. + + Return a list of extralines found. + + """ + __tracebackhide__ = True + val = self.stringio.getvalue() + self.stringio.truncate(0) + self.stringio.seek(0) + lines1 = val.split("\n") + return LineMatcher(lines1).fnmatch_lines(lines2) + + +class LineMatcher(object): + """Flexible matching of text. + + This is a convenience class to test large texts like the output of + commands. + + The constructor takes a list of lines without their trailing newlines, i.e. + ``text.splitlines()``. + + """ + + def __init__(self, lines): + self.lines = lines + self._log_output = [] + + def str(self): + """Return the entire original text.""" + return "\n".join(self.lines) + + def _getlines(self, lines2): + if isinstance(lines2, str): + lines2 = Source(lines2) + if isinstance(lines2, Source): + lines2 = lines2.strip().lines + return lines2 + + def fnmatch_lines_random(self, lines2): + """Check lines exist in the output using in any order. + + Lines are checked using ``fnmatch.fnmatch``. The argument is a list of + lines which have to occur in the output, in any order. + + """ + self._match_lines_random(lines2, fnmatch) + + def re_match_lines_random(self, lines2): + """Check lines exist in the output using ``re.match``, in any order. + + The argument is a list of lines which have to occur in the output, in + any order. + + """ + self._match_lines_random(lines2, lambda name, pat: re.match(pat, name)) + + def _match_lines_random(self, lines2, match_func): + """Check lines exist in the output. + + The argument is a list of lines which have to occur in the output, in + any order. Each line can contain glob whildcards. + + """ + lines2 = self._getlines(lines2) + for line in lines2: + for x in self.lines: + if line == x or match_func(x, line): + self._log("matched: ", repr(line)) + break + else: + self._log("line %r not found in output" % line) + raise ValueError(self._log_text) + + def get_lines_after(self, fnline): + """Return all lines following the given line in the text. + + The given line can contain glob wildcards. + + """ + for i, line in enumerate(self.lines): + if fnline == line or fnmatch(line, fnline): + return self.lines[i + 1 :] + raise ValueError("line %r not found in output" % fnline) + + def _log(self, *args): + self._log_output.append(" ".join(str(x) for x in args)) + + @property + def _log_text(self): + return "\n".join(self._log_output) + + def fnmatch_lines(self, lines2): + """Search captured text for matching lines using ``fnmatch.fnmatch``. + + The argument is a list of lines which have to match and can use glob + wildcards. If they do not match a pytest.fail() is called. The + matches and non-matches are also printed on stdout. + + """ + __tracebackhide__ = True + self._match_lines(lines2, fnmatch, "fnmatch") + + def re_match_lines(self, lines2): + """Search captured text for matching lines using ``re.match``. + + The argument is a list of lines which have to match using ``re.match``. + If they do not match a pytest.fail() is called. + + The matches and non-matches are also printed on stdout. + + """ + __tracebackhide__ = True + self._match_lines(lines2, lambda name, pat: re.match(pat, name), "re.match") + + def _match_lines(self, lines2, match_func, match_nickname): + """Underlying implementation of ``fnmatch_lines`` and ``re_match_lines``. + + :param list[str] lines2: list of string patterns to match. The actual + format depends on ``match_func`` + :param match_func: a callable ``match_func(line, pattern)`` where line + is the captured line from stdout/stderr and pattern is the matching + pattern + :param str match_nickname: the nickname for the match function that + will be logged to stdout when a match occurs + + """ + assert isinstance(lines2, Sequence) + lines2 = self._getlines(lines2) + lines1 = self.lines[:] + nextline = None + extralines = [] + __tracebackhide__ = True + for line in lines2: + nomatchprinted = False + while lines1: + nextline = lines1.pop(0) + if line == nextline: + self._log("exact match:", repr(line)) + break + elif match_func(nextline, line): + self._log("%s:" % match_nickname, repr(line)) + self._log(" with:", repr(nextline)) + break + else: + if not nomatchprinted: + self._log("nomatch:", repr(line)) + nomatchprinted = True + self._log(" and:", repr(nextline)) + extralines.append(nextline) + else: + self._log("remains unmatched: %r" % (line,)) + pytest.fail(self._log_text) |