diff options
| author | nkozlovskiy <[email protected]> | 2023-09-29 12:24:06 +0300 | 
|---|---|---|
| committer | nkozlovskiy <[email protected]> | 2023-09-29 12:41:34 +0300 | 
| commit | e0e3e1717e3d33762ce61950504f9637a6e669ed (patch) | |
| tree | bca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/pytest/py2/_pytest/pytester.py | |
| parent | 38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff) | |
add ydb deps
Diffstat (limited to 'contrib/python/pytest/py2/_pytest/pytester.py')
| -rw-r--r-- | contrib/python/pytest/py2/_pytest/pytester.py | 1413 | 
1 files changed, 1413 insertions, 0 deletions
| diff --git a/contrib/python/pytest/py2/_pytest/pytester.py b/contrib/python/pytest/py2/_pytest/pytester.py new file mode 100644 index 00000000000..f1d739c9917 --- /dev/null +++ b/contrib/python/pytest/py2/_pytest/pytester.py @@ -0,0 +1,1413 @@ +# -*- coding: utf-8 -*- +"""(disabled by default) support for testing pytest and pytest plugins.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import codecs +import gc +import os +import platform +import re +import subprocess +import sys +import time +import traceback +from fnmatch import fnmatch +from weakref import WeakKeyDictionary + +import py +import six + +import pytest +from _pytest._code import Source +from _pytest._io.saferepr import saferepr +from _pytest.assertion.rewrite import AssertionRewritingHook +from _pytest.capture import MultiCapture +from _pytest.capture import SysCapture +from _pytest.compat import safe_str +from _pytest.compat import Sequence +from _pytest.main import EXIT_INTERRUPTED +from _pytest.main import EXIT_OK +from _pytest.main import Session +from _pytest.monkeypatch import MonkeyPatch +from _pytest.pathlib import Path + +IGNORE_PAM = [  # filenames added when obtaining details about the current user +    u"/var/lib/sss/mc/passwd" +] + + +def pytest_addoption(parser): +    parser.addoption( +        "--lsof", +        action="store_true", +        dest="lsof", +        default=False, +        help="run FD checks if lsof is available", +    ) + +    parser.addoption( +        "--runpytest", +        default="inprocess", +        dest="runpytest", +        choices=("inprocess", "subprocess"), +        help=( +            "run pytest sub runs in tests using an 'inprocess' " +            "or 'subprocess' (python -m main) method" +        ), +    ) + +    parser.addini( +        "pytester_example_dir", help="directory to take the pytester example files from" +    ) + + +def pytest_configure(config): +    if config.getvalue("lsof"): +        checker = LsofFdLeakChecker() +        if checker.matching_platform(): +            config.pluginmanager.register(checker) + +    config.addinivalue_line( +        "markers", +        "pytester_example_path(*path_segments): join the given path " +        "segments to `pytester_example_dir` for this test.", +    ) + + +def raise_on_kwargs(kwargs): +    __tracebackhide__ = True +    if kwargs:  # pragma: no branch +        raise TypeError( +            "Unexpected keyword arguments: {}".format(", ".join(sorted(kwargs))) +        ) + + +class LsofFdLeakChecker(object): +    def get_open_files(self): +        out = self._exec_lsof() +        open_files = self._parse_lsof_output(out) +        return open_files + +    def _exec_lsof(self): +        pid = os.getpid() +        # py3: use subprocess.DEVNULL directly. +        with open(os.devnull, "wb") as devnull: +            return subprocess.check_output( +                ("lsof", "-Ffn0", "-p", str(pid)), stderr=devnull +            ).decode() + +    def _parse_lsof_output(self, out): +        def isopen(line): +            return line.startswith("f") and ( +                "deleted" not in line +                and "mem" not in line +                and "txt" not in line +                and "cwd" not in line +            ) + +        open_files = [] + +        for line in out.split("\n"): +            if isopen(line): +                fields = line.split("\0") +                fd = fields[0][1:] +                filename = fields[1][1:] +                if filename in IGNORE_PAM: +                    continue +                if filename.startswith("/"): +                    open_files.append((fd, filename)) + +        return open_files + +    def matching_platform(self): +        try: +            subprocess.check_output(("lsof", "-v")) +        except (OSError, subprocess.CalledProcessError): +            return False +        else: +            return True + +    @pytest.hookimpl(hookwrapper=True, tryfirst=True) +    def pytest_runtest_protocol(self, item): +        lines1 = self.get_open_files() +        yield +        if hasattr(sys, "pypy_version_info"): +            gc.collect() +        lines2 = self.get_open_files() + +        new_fds = {t[0] for t in lines2} - {t[0] for t in lines1} +        leaked_files = [t for t in lines2 if t[0] in new_fds] +        if leaked_files: +            error = [] +            error.append("***** %s FD leakage detected" % len(leaked_files)) +            error.extend([str(f) for f in leaked_files]) +            error.append("*** Before:") +            error.extend([str(f) for f in lines1]) +            error.append("*** After:") +            error.extend([str(f) for f in lines2]) +            error.append(error[0]) +            error.append("*** function %s:%s: %s " % item.location) +            error.append("See issue #2366") +            item.warn(pytest.PytestWarning("\n".join(error))) + + +# used at least by pytest-xdist plugin + + +def _pytest(request): +    """Return a helper which offers a gethookrecorder(hook) method which +    returns a HookRecorder instance which helps to make assertions about called +    hooks. + +    """ +    return PytestArg(request) + + +class PytestArg(object): +    def __init__(self, request): +        self.request = request + +    def gethookrecorder(self, hook): +        hookrecorder = HookRecorder(hook._pm) +        self.request.addfinalizer(hookrecorder.finish_recording) +        return hookrecorder + + +def get_public_names(values): +    """Only return names from iterator values without a leading underscore.""" +    return [x for x in values if x[0] != "_"] + + +class ParsedCall(object): +    def __init__(self, name, kwargs): +        self.__dict__.update(kwargs) +        self._name = name + +    def __repr__(self): +        d = self.__dict__.copy() +        del d["_name"] +        return "<ParsedCall %r(**%r)>" % (self._name, d) + + +class HookRecorder(object): +    """Record all hooks called in a plugin manager. + +    This wraps all the hook calls in the plugin manager, recording each call +    before propagating the normal calls. + +    """ + +    def __init__(self, pluginmanager): +        self._pluginmanager = pluginmanager +        self.calls = [] + +        def before(hook_name, hook_impls, kwargs): +            self.calls.append(ParsedCall(hook_name, kwargs)) + +        def after(outcome, hook_name, hook_impls, kwargs): +            pass + +        self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after) + +    def finish_recording(self): +        self._undo_wrapping() + +    def getcalls(self, names): +        if isinstance(names, str): +            names = names.split() +        return [call for call in self.calls if call._name in names] + +    def assert_contains(self, entries): +        __tracebackhide__ = True +        i = 0 +        entries = list(entries) +        backlocals = sys._getframe(1).f_locals +        while entries: +            name, check = entries.pop(0) +            for ind, call in enumerate(self.calls[i:]): +                if call._name == name: +                    print("NAMEMATCH", name, call) +                    if eval(check, backlocals, call.__dict__): +                        print("CHECKERMATCH", repr(check), "->", call) +                    else: +                        print("NOCHECKERMATCH", repr(check), "-", call) +                        continue +                    i += ind + 1 +                    break +                print("NONAMEMATCH", name, "with", call) +            else: +                pytest.fail("could not find %r check %r" % (name, check)) + +    def popcall(self, name): +        __tracebackhide__ = True +        for i, call in enumerate(self.calls): +            if call._name == name: +                del self.calls[i] +                return call +        lines = ["could not find call %r, in:" % (name,)] +        lines.extend(["  %s" % x for x in self.calls]) +        pytest.fail("\n".join(lines)) + +    def getcall(self, name): +        values = self.getcalls(name) +        assert len(values) == 1, (name, values) +        return values[0] + +    # functionality for test reports + +    def getreports(self, names="pytest_runtest_logreport pytest_collectreport"): +        return [x.report for x in self.getcalls(names)] + +    def matchreport( +        self, +        inamepart="", +        names="pytest_runtest_logreport pytest_collectreport", +        when=None, +    ): +        """return a testreport whose dotted import path matches""" +        values = [] +        for rep in self.getreports(names=names): +            if not when and rep.when != "call" and rep.passed: +                # setup/teardown passing reports - let's ignore those +                continue +            if when and rep.when != when: +                continue +            if not inamepart or inamepart in rep.nodeid.split("::"): +                values.append(rep) +        if not values: +            raise ValueError( +                "could not find test report matching %r: " +                "no test reports at all!" % (inamepart,) +            ) +        if len(values) > 1: +            raise ValueError( +                "found 2 or more testreports matching %r: %s" % (inamepart, values) +            ) +        return values[0] + +    def getfailures(self, names="pytest_runtest_logreport pytest_collectreport"): +        return [rep for rep in self.getreports(names) if rep.failed] + +    def getfailedcollections(self): +        return self.getfailures("pytest_collectreport") + +    def listoutcomes(self): +        passed = [] +        skipped = [] +        failed = [] +        for rep in self.getreports("pytest_collectreport pytest_runtest_logreport"): +            if rep.passed: +                if rep.when == "call": +                    passed.append(rep) +            elif rep.skipped: +                skipped.append(rep) +            else: +                assert rep.failed, "Unexpected outcome: {!r}".format(rep) +                failed.append(rep) +        return passed, skipped, failed + +    def countoutcomes(self): +        return [len(x) for x in self.listoutcomes()] + +    def assertoutcome(self, passed=0, skipped=0, failed=0): +        realpassed, realskipped, realfailed = self.listoutcomes() +        assert passed == len(realpassed) +        assert skipped == len(realskipped) +        assert failed == len(realfailed) + +    def clear(self): +        self.calls[:] = [] + + +def linecomp(request): +    return LineComp() + + [email protected](name="LineMatcher") +def LineMatcher_fixture(request): +    return LineMatcher + + +def testdir(request, tmpdir_factory): +    return Testdir(request, tmpdir_factory) + + +def _sys_snapshot(): +    snappaths = SysPathsSnapshot() +    snapmods = SysModulesSnapshot() +    yield +    snapmods.restore() +    snappaths.restore() + + +def _config_for_test(): +    from _pytest.config import get_config + +    config = get_config() +    yield config +    config._ensure_unconfigure()  # cleanup, e.g. capman closing tmpfiles. + + +rex_outcome = re.compile(r"(\d+) ([\w-]+)") + + +class RunResult(object): +    """The result of running a command. + +    Attributes: + +    :ret: the return value +    :outlines: list of lines captured from stdout +    :errlines: list of lines captures from stderr +    :stdout: :py:class:`LineMatcher` of stdout, use ``stdout.str()`` to +       reconstruct stdout or the commonly used ``stdout.fnmatch_lines()`` +       method +    :stderr: :py:class:`LineMatcher` of stderr +    :duration: duration in seconds + +    """ + +    def __init__(self, ret, outlines, errlines, duration): +        self.ret = ret +        self.outlines = outlines +        self.errlines = errlines +        self.stdout = LineMatcher(outlines) +        self.stderr = LineMatcher(errlines) +        self.duration = duration + +    def __repr__(self): +        return ( +            "<RunResult ret=%r len(stdout.lines)=%d len(stderr.lines)=%d duration=%.2fs>" +            % (self.ret, len(self.stdout.lines), len(self.stderr.lines), self.duration) +        ) + +    def parseoutcomes(self): +        """Return a dictionary of outcomestring->num from parsing the terminal +        output that the test process produced. + +        """ +        for line in reversed(self.outlines): +            if "seconds" in line: +                outcomes = rex_outcome.findall(line) +                if outcomes: +                    d = {} +                    for num, cat in outcomes: +                        d[cat] = int(num) +                    return d +        raise ValueError("Pytest terminal report not found") + +    def assert_outcomes( +        self, passed=0, skipped=0, failed=0, error=0, xpassed=0, xfailed=0 +    ): +        """Assert that the specified outcomes appear with the respective +        numbers (0 means it didn't occur) in the text output from a test run. + +        """ +        d = self.parseoutcomes() +        obtained = { +            "passed": d.get("passed", 0), +            "skipped": d.get("skipped", 0), +            "failed": d.get("failed", 0), +            "error": d.get("error", 0), +            "xpassed": d.get("xpassed", 0), +            "xfailed": d.get("xfailed", 0), +        } +        expected = { +            "passed": passed, +            "skipped": skipped, +            "failed": failed, +            "error": error, +            "xpassed": xpassed, +            "xfailed": xfailed, +        } +        assert obtained == expected + + +class CwdSnapshot(object): +    def __init__(self): +        self.__saved = os.getcwd() + +    def restore(self): +        os.chdir(self.__saved) + + +class SysModulesSnapshot(object): +    def __init__(self, preserve=None): +        self.__preserve = preserve +        self.__saved = dict(sys.modules) + +    def restore(self): +        if self.__preserve: +            self.__saved.update( +                (k, m) for k, m in sys.modules.items() if self.__preserve(k) +            ) +        sys.modules.clear() +        sys.modules.update(self.__saved) + + +class SysPathsSnapshot(object): +    def __init__(self): +        self.__saved = list(sys.path), list(sys.meta_path) + +    def restore(self): +        sys.path[:], sys.meta_path[:] = self.__saved + + +class Testdir(object): +    """Temporary test directory with tools to test/run pytest itself. + +    This is based on the ``tmpdir`` fixture but provides a number of methods +    which aid with testing pytest itself.  Unless :py:meth:`chdir` is used all +    methods will use :py:attr:`tmpdir` as their current working directory. + +    Attributes: + +    :tmpdir: The :py:class:`py.path.local` instance of the temporary directory. + +    :plugins: A list of plugins to use with :py:meth:`parseconfig` and +       :py:meth:`runpytest`.  Initially this is an empty list but plugins can +       be added to the list.  The type of items to add to the list depends on +       the method using them so refer to them for details. + +    """ + +    CLOSE_STDIN = object + +    class TimeoutExpired(Exception): +        pass + +    def __init__(self, request, tmpdir_factory): +        self.request = request +        self._mod_collections = WeakKeyDictionary() +        name = request.function.__name__ +        self.tmpdir = tmpdir_factory.mktemp(name, numbered=True) +        self.test_tmproot = tmpdir_factory.mktemp("tmp-" + name, numbered=True) +        self.plugins = [] +        self._cwd_snapshot = CwdSnapshot() +        self._sys_path_snapshot = SysPathsSnapshot() +        self._sys_modules_snapshot = self.__take_sys_modules_snapshot() +        self.chdir() +        self.request.addfinalizer(self.finalize) +        method = self.request.config.getoption("--runpytest") +        if method == "inprocess": +            self._runpytest_method = self.runpytest_inprocess +        elif method == "subprocess": +            self._runpytest_method = self.runpytest_subprocess + +        mp = self.monkeypatch = MonkeyPatch() +        mp.setenv("PYTEST_DEBUG_TEMPROOT", str(self.test_tmproot)) +        # Ensure no unexpected caching via tox. +        mp.delenv("TOX_ENV_DIR", raising=False) +        # Discard outer pytest options. +        mp.delenv("PYTEST_ADDOPTS", raising=False) + +        # Environment (updates) for inner runs. +        tmphome = str(self.tmpdir) +        self._env_run_update = {"HOME": tmphome, "USERPROFILE": tmphome} + +    def __repr__(self): +        return "<Testdir %r>" % (self.tmpdir,) + +    def __str__(self): +        return str(self.tmpdir) + +    def finalize(self): +        """Clean up global state artifacts. + +        Some methods modify the global interpreter state and this tries to +        clean this up.  It does not remove the temporary directory however so +        it can be looked at after the test run has finished. + +        """ +        self._sys_modules_snapshot.restore() +        self._sys_path_snapshot.restore() +        self._cwd_snapshot.restore() +        self.monkeypatch.undo() + +    def __take_sys_modules_snapshot(self): +        # some zope modules used by twisted-related tests keep internal state +        # and can't be deleted; we had some trouble in the past with +        # `zope.interface` for example +        def preserve_module(name): +            return name.startswith("zope") + +        return SysModulesSnapshot(preserve=preserve_module) + +    def make_hook_recorder(self, pluginmanager): +        """Create a new :py:class:`HookRecorder` for a PluginManager.""" +        pluginmanager.reprec = reprec = HookRecorder(pluginmanager) +        self.request.addfinalizer(reprec.finish_recording) +        return reprec + +    def chdir(self): +        """Cd into the temporary directory. + +        This is done automatically upon instantiation. + +        """ +        self.tmpdir.chdir() + +    def _makefile(self, ext, args, kwargs, encoding="utf-8"): +        items = list(kwargs.items()) + +        def to_text(s): +            return s.decode(encoding) if isinstance(s, bytes) else six.text_type(s) + +        if args: +            source = u"\n".join(to_text(x) for x in args) +            basename = self.request.function.__name__ +            items.insert(0, (basename, source)) + +        ret = None +        for basename, value in items: +            p = self.tmpdir.join(basename).new(ext=ext) +            p.dirpath().ensure_dir() +            source = Source(value) +            source = u"\n".join(to_text(line) for line in source.lines) +            p.write(source.strip().encode(encoding), "wb") +            if ret is None: +                ret = p +        return ret + +    def makefile(self, ext, *args, **kwargs): +        r"""Create new file(s) in the testdir. + +        :param str ext: The extension the file(s) should use, including the dot, e.g. `.py`. +        :param list[str] args: All args will be treated as strings and joined using newlines. +           The result will be written as contents to the file.  The name of the +           file will be based on the test function requesting this fixture. +        :param kwargs: Each keyword is the name of a file, while the value of it will +           be written as contents of the file. + +        Examples: + +        .. code-block:: python + +            testdir.makefile(".txt", "line1", "line2") + +            testdir.makefile(".ini", pytest="[pytest]\naddopts=-rs\n") + +        """ +        return self._makefile(ext, args, kwargs) + +    def makeconftest(self, source): +        """Write a contest.py file with 'source' as contents.""" +        return self.makepyfile(conftest=source) + +    def makeini(self, source): +        """Write a tox.ini file with 'source' as contents.""" +        return self.makefile(".ini", tox=source) + +    def getinicfg(self, source): +        """Return the pytest section from the tox.ini config file.""" +        p = self.makeini(source) +        return py.iniconfig.IniConfig(p)["pytest"] + +    def makepyfile(self, *args, **kwargs): +        """Shortcut for .makefile() with a .py extension.""" +        return self._makefile(".py", args, kwargs) + +    def maketxtfile(self, *args, **kwargs): +        """Shortcut for .makefile() with a .txt extension.""" +        return self._makefile(".txt", args, kwargs) + +    def syspathinsert(self, path=None): +        """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`. + +        This is undone automatically when this object dies at the end of each +        test. +        """ +        if path is None: +            path = self.tmpdir + +        self.monkeypatch.syspath_prepend(str(path)) + +    def mkdir(self, name): +        """Create a new (sub)directory.""" +        return self.tmpdir.mkdir(name) + +    def mkpydir(self, name): +        """Create a new python package. + +        This creates a (sub)directory with an empty ``__init__.py`` file so it +        gets recognised as a python package. + +        """ +        p = self.mkdir(name) +        p.ensure("__init__.py") +        return p + +    def copy_example(self, name=None): +        import warnings +        from _pytest.warning_types import PYTESTER_COPY_EXAMPLE + +        warnings.warn(PYTESTER_COPY_EXAMPLE, stacklevel=2) +        example_dir = self.request.config.getini("pytester_example_dir") +        if example_dir is None: +            raise ValueError("pytester_example_dir is unset, can't copy examples") +        example_dir = self.request.config.rootdir.join(example_dir) + +        for extra_element in self.request.node.iter_markers("pytester_example_path"): +            assert extra_element.args +            example_dir = example_dir.join(*extra_element.args) + +        if name is None: +            func_name = self.request.function.__name__ +            maybe_dir = example_dir / func_name +            maybe_file = example_dir / (func_name + ".py") + +            if maybe_dir.isdir(): +                example_path = maybe_dir +            elif maybe_file.isfile(): +                example_path = maybe_file +            else: +                raise LookupError( +                    "{} cant be found as module or package in {}".format( +                        func_name, example_dir.bestrelpath(self.request.config.rootdir) +                    ) +                ) +        else: +            example_path = example_dir.join(name) + +        if example_path.isdir() and not example_path.join("__init__.py").isfile(): +            example_path.copy(self.tmpdir) +            return self.tmpdir +        elif example_path.isfile(): +            result = self.tmpdir.join(example_path.basename) +            example_path.copy(result) +            return result +        else: +            raise LookupError( +                'example "{}" is not found as a file or directory'.format(example_path) +            ) + +    Session = Session + +    def getnode(self, config, arg): +        """Return the collection node of a file. + +        :param config: :py:class:`_pytest.config.Config` instance, see +           :py:meth:`parseconfig` and :py:meth:`parseconfigure` to create the +           configuration + +        :param arg: a :py:class:`py.path.local` instance of the file + +        """ +        session = Session(config) +        assert "::" not in str(arg) +        p = py.path.local(arg) +        config.hook.pytest_sessionstart(session=session) +        res = session.perform_collect([str(p)], genitems=False)[0] +        config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK) +        return res + +    def getpathnode(self, path): +        """Return the collection node of a file. + +        This is like :py:meth:`getnode` but uses :py:meth:`parseconfigure` to +        create the (configured) pytest Config instance. + +        :param path: a :py:class:`py.path.local` instance of the file + +        """ +        config = self.parseconfigure(path) +        session = Session(config) +        x = session.fspath.bestrelpath(path) +        config.hook.pytest_sessionstart(session=session) +        res = session.perform_collect([x], genitems=False)[0] +        config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK) +        return res + +    def genitems(self, colitems): +        """Generate all test items from a collection node. + +        This recurses into the collection node and returns a list of all the +        test items contained within. + +        """ +        session = colitems[0].session +        result = [] +        for colitem in colitems: +            result.extend(session.genitems(colitem)) +        return result + +    def runitem(self, source): +        """Run the "test_func" Item. + +        The calling test instance (class containing the test method) must +        provide a ``.getrunner()`` method which should return a runner which +        can run the test protocol for a single item, e.g. +        :py:func:`_pytest.runner.runtestprotocol`. + +        """ +        # used from runner functional tests +        item = self.getitem(source) +        # the test class where we are called from wants to provide the runner +        testclassinstance = self.request.instance +        runner = testclassinstance.getrunner() +        return runner(item) + +    def inline_runsource(self, source, *cmdlineargs): +        """Run a test module in process using ``pytest.main()``. + +        This run writes "source" into a temporary file and runs +        ``pytest.main()`` on it, returning a :py:class:`HookRecorder` instance +        for the result. + +        :param source: the source code of the test module + +        :param cmdlineargs: any extra command line arguments to use + +        :return: :py:class:`HookRecorder` instance of the result + +        """ +        p = self.makepyfile(source) +        values = list(cmdlineargs) + [p] +        return self.inline_run(*values) + +    def inline_genitems(self, *args): +        """Run ``pytest.main(['--collectonly'])`` in-process. + +        Runs the :py:func:`pytest.main` function to run all of pytest inside +        the test process itself like :py:meth:`inline_run`, but returns a +        tuple of the collected items and a :py:class:`HookRecorder` instance. + +        """ +        rec = self.inline_run("--collect-only", *args) +        items = [x.item for x in rec.getcalls("pytest_itemcollected")] +        return items, rec + +    def inline_run(self, *args, **kwargs): +        """Run ``pytest.main()`` in-process, returning a HookRecorder. + +        Runs the :py:func:`pytest.main` function to run all of pytest inside +        the test process itself.  This means it can return a +        :py:class:`HookRecorder` instance which gives more detailed results +        from that run than can be done by matching stdout/stderr from +        :py:meth:`runpytest`. + +        :param args: command line arguments to pass to :py:func:`pytest.main` + +        :param plugins: (keyword-only) extra plugin instances the +           ``pytest.main()`` instance should use + +        :return: a :py:class:`HookRecorder` instance +        """ +        plugins = kwargs.pop("plugins", []) +        no_reraise_ctrlc = kwargs.pop("no_reraise_ctrlc", None) +        raise_on_kwargs(kwargs) + +        finalizers = [] +        try: +            # Do not load user config (during runs only). +            mp_run = MonkeyPatch() +            for k, v in self._env_run_update.items(): +                mp_run.setenv(k, v) +            finalizers.append(mp_run.undo) + +            # When running pytest inline any plugins active in the main test +            # process are already imported.  So this disables the warning which +            # will trigger to say they can no longer be rewritten, which is +            # fine as they have already been rewritten. +            orig_warn = AssertionRewritingHook._warn_already_imported + +            def revert_warn_already_imported(): +                AssertionRewritingHook._warn_already_imported = orig_warn + +            finalizers.append(revert_warn_already_imported) +            AssertionRewritingHook._warn_already_imported = lambda *a: None + +            # Any sys.module or sys.path changes done while running pytest +            # inline should be reverted after the test run completes to avoid +            # clashing with later inline tests run within the same pytest test, +            # e.g. just because they use matching test module names. +            finalizers.append(self.__take_sys_modules_snapshot().restore) +            finalizers.append(SysPathsSnapshot().restore) + +            # Important note: +            # - our tests should not leave any other references/registrations +            #   laying around other than possibly loaded test modules +            #   referenced from sys.modules, as nothing will clean those up +            #   automatically + +            rec = [] + +            class Collect(object): +                def pytest_configure(x, config): +                    rec.append(self.make_hook_recorder(config.pluginmanager)) + +            plugins.append(Collect()) +            ret = pytest.main(list(args), plugins=plugins) +            if len(rec) == 1: +                reprec = rec.pop() +            else: + +                class reprec(object): +                    pass + +            reprec.ret = ret + +            # typically we reraise keyboard interrupts from the child run +            # because it's our user requesting interruption of the testing +            if ret == EXIT_INTERRUPTED and not no_reraise_ctrlc: +                calls = reprec.getcalls("pytest_keyboard_interrupt") +                if calls and calls[-1].excinfo.type == KeyboardInterrupt: +                    raise KeyboardInterrupt() +            return reprec +        finally: +            for finalizer in finalizers: +                finalizer() + +    def runpytest_inprocess(self, *args, **kwargs): +        """Return result of running pytest in-process, providing a similar +        interface to what self.runpytest() provides. +        """ +        syspathinsert = kwargs.pop("syspathinsert", False) + +        if syspathinsert: +            self.syspathinsert() +        now = time.time() +        capture = MultiCapture(Capture=SysCapture) +        capture.start_capturing() +        try: +            try: +                reprec = self.inline_run(*args, **kwargs) +            except SystemExit as e: + +                class reprec(object): +                    ret = e.args[0] + +            except Exception: +                traceback.print_exc() + +                class reprec(object): +                    ret = 3 + +        finally: +            out, err = capture.readouterr() +            capture.stop_capturing() +            sys.stdout.write(out) +            sys.stderr.write(err) + +        res = RunResult(reprec.ret, out.split("\n"), err.split("\n"), time.time() - now) +        res.reprec = reprec +        return res + +    def runpytest(self, *args, **kwargs): +        """Run pytest inline or in a subprocess, depending on the command line +        option "--runpytest" and return a :py:class:`RunResult`. + +        """ +        args = self._ensure_basetemp(args) +        return self._runpytest_method(*args, **kwargs) + +    def _ensure_basetemp(self, args): +        args = list(args) +        for x in args: +            if safe_str(x).startswith("--basetemp"): +                break +        else: +            args.append("--basetemp=%s" % self.tmpdir.dirpath("basetemp")) +        return args + +    def parseconfig(self, *args): +        """Return a new pytest Config instance from given commandline args. + +        This invokes the pytest bootstrapping code in _pytest.config to create +        a new :py:class:`_pytest.core.PluginManager` and call the +        pytest_cmdline_parse hook to create a new +        :py:class:`_pytest.config.Config` instance. + +        If :py:attr:`plugins` has been populated they should be plugin modules +        to be registered with the PluginManager. + +        """ +        args = self._ensure_basetemp(args) + +        import _pytest.config + +        config = _pytest.config._prepareconfig(args, self.plugins) +        # we don't know what the test will do with this half-setup config +        # object and thus we make sure it gets unconfigured properly in any +        # case (otherwise capturing could still be active, for example) +        self.request.addfinalizer(config._ensure_unconfigure) +        return config + +    def parseconfigure(self, *args): +        """Return a new pytest configured Config instance. + +        This returns a new :py:class:`_pytest.config.Config` instance like +        :py:meth:`parseconfig`, but also calls the pytest_configure hook. + +        """ +        config = self.parseconfig(*args) +        config._do_configure() +        self.request.addfinalizer(config._ensure_unconfigure) +        return config + +    def getitem(self, source, funcname="test_func"): +        """Return the test item for a test function. + +        This writes the source to a python file and runs pytest's collection on +        the resulting module, returning the test item for the requested +        function name. + +        :param source: the module source + +        :param funcname: the name of the test function for which to return a +            test item + +        """ +        items = self.getitems(source) +        for item in items: +            if item.name == funcname: +                return item +        assert 0, "%r item not found in module:\n%s\nitems: %s" % ( +            funcname, +            source, +            items, +        ) + +    def getitems(self, source): +        """Return all test items collected from the module. + +        This writes the source to a python file and runs pytest's collection on +        the resulting module, returning all test items contained within. + +        """ +        modcol = self.getmodulecol(source) +        return self.genitems([modcol]) + +    def getmodulecol(self, source, configargs=(), withinit=False): +        """Return the module collection node for ``source``. + +        This writes ``source`` to a file using :py:meth:`makepyfile` and then +        runs the pytest collection on it, returning the collection node for the +        test module. + +        :param source: the source code of the module to collect + +        :param configargs: any extra arguments to pass to +            :py:meth:`parseconfigure` + +        :param withinit: whether to also write an ``__init__.py`` file to the +            same directory to ensure it is a package + +        """ +        if isinstance(source, Path): +            path = self.tmpdir.join(str(source)) +            assert not withinit, "not supported for paths" +        else: +            kw = {self.request.function.__name__: Source(source).strip()} +            path = self.makepyfile(**kw) +        if withinit: +            self.makepyfile(__init__="#") +        self.config = config = self.parseconfigure(path, *configargs) +        return self.getnode(config, path) + +    def collect_by_name(self, modcol, name): +        """Return the collection node for name from the module collection. + +        This will search a module collection node for a collection node +        matching the given name. + +        :param modcol: a module collection node; see :py:meth:`getmodulecol` + +        :param name: the name of the node to return + +        """ +        if modcol not in self._mod_collections: +            self._mod_collections[modcol] = list(modcol.collect()) +        for colitem in self._mod_collections[modcol]: +            if colitem.name == name: +                return colitem + +    def popen( +        self, +        cmdargs, +        stdout=subprocess.PIPE, +        stderr=subprocess.PIPE, +        stdin=CLOSE_STDIN, +        **kw +    ): +        """Invoke subprocess.Popen. + +        This calls subprocess.Popen making sure the current working directory +        is in the PYTHONPATH. + +        You probably want to use :py:meth:`run` instead. + +        """ +        env = os.environ.copy() +        env["PYTHONPATH"] = os.pathsep.join( +            filter(None, [os.getcwd(), env.get("PYTHONPATH", "")]) +        ) +        env.update(self._env_run_update) +        kw["env"] = env + +        if stdin is Testdir.CLOSE_STDIN: +            kw["stdin"] = subprocess.PIPE +        elif isinstance(stdin, bytes): +            kw["stdin"] = subprocess.PIPE +        else: +            kw["stdin"] = stdin + +        popen = subprocess.Popen(cmdargs, stdout=stdout, stderr=stderr, **kw) +        if stdin is Testdir.CLOSE_STDIN: +            popen.stdin.close() +        elif isinstance(stdin, bytes): +            popen.stdin.write(stdin) + +        return popen + +    def run(self, *cmdargs, **kwargs): +        """Run a command with arguments. + +        Run a process using subprocess.Popen saving the stdout and stderr. + +        :param args: the sequence of arguments to pass to `subprocess.Popen()` +        :param timeout: the period in seconds after which to timeout and raise +            :py:class:`Testdir.TimeoutExpired` +        :param stdin: optional standard input.  Bytes are being send, closing +            the pipe, otherwise it is passed through to ``popen``. +            Defaults to ``CLOSE_STDIN``, which translates to using a pipe +            (``subprocess.PIPE``) that gets closed. + +        Returns a :py:class:`RunResult`. + +        """ +        __tracebackhide__ = True + +        timeout = kwargs.pop("timeout", None) +        stdin = kwargs.pop("stdin", Testdir.CLOSE_STDIN) +        raise_on_kwargs(kwargs) + +        cmdargs = [ +            str(arg) if isinstance(arg, py.path.local) else arg for arg in cmdargs +        ] +        p1 = self.tmpdir.join("stdout") +        p2 = self.tmpdir.join("stderr") +        print("running:", *cmdargs) +        print("     in:", py.path.local()) +        f1 = codecs.open(str(p1), "w", encoding="utf8") +        f2 = codecs.open(str(p2), "w", encoding="utf8") +        try: +            now = time.time() +            popen = self.popen( +                cmdargs, +                stdin=stdin, +                stdout=f1, +                stderr=f2, +                close_fds=(sys.platform != "win32"), +            ) +            if isinstance(stdin, bytes): +                popen.stdin.close() + +            def handle_timeout(): +                __tracebackhide__ = True + +                timeout_message = ( +                    "{seconds} second timeout expired running:" +                    " {command}".format(seconds=timeout, command=cmdargs) +                ) + +                popen.kill() +                popen.wait() +                raise self.TimeoutExpired(timeout_message) + +            if timeout is None: +                ret = popen.wait() +            elif not six.PY2: +                try: +                    ret = popen.wait(timeout) +                except subprocess.TimeoutExpired: +                    handle_timeout() +            else: +                end = time.time() + timeout + +                resolution = min(0.1, timeout / 10) + +                while True: +                    ret = popen.poll() +                    if ret is not None: +                        break + +                    if time.time() > end: +                        handle_timeout() + +                    time.sleep(resolution) +        finally: +            f1.close() +            f2.close() +        f1 = codecs.open(str(p1), "r", encoding="utf8") +        f2 = codecs.open(str(p2), "r", encoding="utf8") +        try: +            out = f1.read().splitlines() +            err = f2.read().splitlines() +        finally: +            f1.close() +            f2.close() +        self._dump_lines(out, sys.stdout) +        self._dump_lines(err, sys.stderr) +        return RunResult(ret, out, err, time.time() - now) + +    def _dump_lines(self, lines, fp): +        try: +            for line in lines: +                print(line, file=fp) +        except UnicodeEncodeError: +            print("couldn't print to %s because of encoding" % (fp,)) + +    def _getpytestargs(self): +        return sys.executable, "-mpytest" + +    def runpython(self, script): +        """Run a python script using sys.executable as interpreter. + +        Returns a :py:class:`RunResult`. + +        """ +        return self.run(sys.executable, script) + +    def runpython_c(self, command): +        """Run python -c "command", return a :py:class:`RunResult`.""" +        return self.run(sys.executable, "-c", command) + +    def runpytest_subprocess(self, *args, **kwargs): +        """Run pytest as a subprocess with given arguments. + +        Any plugins added to the :py:attr:`plugins` list will be added using the +        ``-p`` command line option.  Additionally ``--basetemp`` is used to put +        any temporary files and directories in a numbered directory prefixed +        with "runpytest-" to not conflict with the normal numbered pytest +        location for temporary files and directories. + +        :param args: the sequence of arguments to pass to the pytest subprocess +        :param timeout: the period in seconds after which to timeout and raise +            :py:class:`Testdir.TimeoutExpired` + +        Returns a :py:class:`RunResult`. +        """ +        __tracebackhide__ = True +        timeout = kwargs.pop("timeout", None) +        raise_on_kwargs(kwargs) + +        p = py.path.local.make_numbered_dir( +            prefix="runpytest-", keep=None, rootdir=self.tmpdir +        ) +        args = ("--basetemp=%s" % p,) + args +        plugins = [x for x in self.plugins if isinstance(x, str)] +        if plugins: +            args = ("-p", plugins[0]) + args +        args = self._getpytestargs() + args +        return self.run(*args, timeout=timeout) + +    def spawn_pytest(self, string, expect_timeout=10.0): +        """Run pytest using pexpect. + +        This makes sure to use the right pytest and sets up the temporary +        directory locations. + +        The pexpect child is returned. + +        """ +        basetemp = self.tmpdir.mkdir("temp-pexpect") +        invoke = " ".join(map(str, self._getpytestargs())) +        cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string) +        return self.spawn(cmd, expect_timeout=expect_timeout) + +    def spawn(self, cmd, expect_timeout=10.0): +        """Run a command using pexpect. + +        The pexpect child is returned. + +        """ +        pexpect = pytest.importorskip("pexpect", "3.0") +        if hasattr(sys, "pypy_version_info") and "64" in platform.machine(): +            pytest.skip("pypy-64 bit not supported") +        if sys.platform.startswith("freebsd"): +            pytest.xfail("pexpect does not work reliably on freebsd") +        logfile = self.tmpdir.join("spawn.out").open("wb") + +        # Do not load user config. +        env = os.environ.copy() +        env.update(self._env_run_update) + +        child = pexpect.spawn(cmd, logfile=logfile, env=env) +        self.request.addfinalizer(logfile.close) +        child.timeout = expect_timeout +        return child + + +def getdecoded(out): +    try: +        return out.decode("utf-8") +    except UnicodeDecodeError: +        return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (saferepr(out),) + + +class LineComp(object): +    def __init__(self): +        self.stringio = py.io.TextIO() + +    def assert_contains_lines(self, lines2): +        """Assert that lines2 are contained (linearly) in lines1. + +        Return a list of extralines found. + +        """ +        __tracebackhide__ = True +        val = self.stringio.getvalue() +        self.stringio.truncate(0) +        self.stringio.seek(0) +        lines1 = val.split("\n") +        return LineMatcher(lines1).fnmatch_lines(lines2) + + +class LineMatcher(object): +    """Flexible matching of text. + +    This is a convenience class to test large texts like the output of +    commands. + +    The constructor takes a list of lines without their trailing newlines, i.e. +    ``text.splitlines()``. + +    """ + +    def __init__(self, lines): +        self.lines = lines +        self._log_output = [] + +    def str(self): +        """Return the entire original text.""" +        return "\n".join(self.lines) + +    def _getlines(self, lines2): +        if isinstance(lines2, str): +            lines2 = Source(lines2) +        if isinstance(lines2, Source): +            lines2 = lines2.strip().lines +        return lines2 + +    def fnmatch_lines_random(self, lines2): +        """Check lines exist in the output using in any order. + +        Lines are checked using ``fnmatch.fnmatch``. The argument is a list of +        lines which have to occur in the output, in any order. + +        """ +        self._match_lines_random(lines2, fnmatch) + +    def re_match_lines_random(self, lines2): +        """Check lines exist in the output using ``re.match``, in any order. + +        The argument is a list of lines which have to occur in the output, in +        any order. + +        """ +        self._match_lines_random(lines2, lambda name, pat: re.match(pat, name)) + +    def _match_lines_random(self, lines2, match_func): +        """Check lines exist in the output. + +        The argument is a list of lines which have to occur in the output, in +        any order.  Each line can contain glob whildcards. + +        """ +        lines2 = self._getlines(lines2) +        for line in lines2: +            for x in self.lines: +                if line == x or match_func(x, line): +                    self._log("matched: ", repr(line)) +                    break +            else: +                self._log("line %r not found in output" % line) +                raise ValueError(self._log_text) + +    def get_lines_after(self, fnline): +        """Return all lines following the given line in the text. + +        The given line can contain glob wildcards. + +        """ +        for i, line in enumerate(self.lines): +            if fnline == line or fnmatch(line, fnline): +                return self.lines[i + 1 :] +        raise ValueError("line %r not found in output" % fnline) + +    def _log(self, *args): +        self._log_output.append(" ".join(str(x) for x in args)) + +    @property +    def _log_text(self): +        return "\n".join(self._log_output) + +    def fnmatch_lines(self, lines2): +        """Search captured text for matching lines using ``fnmatch.fnmatch``. + +        The argument is a list of lines which have to match and can use glob +        wildcards.  If they do not match a pytest.fail() is called.  The +        matches and non-matches are also printed on stdout. + +        """ +        __tracebackhide__ = True +        self._match_lines(lines2, fnmatch, "fnmatch") + +    def re_match_lines(self, lines2): +        """Search captured text for matching lines using ``re.match``. + +        The argument is a list of lines which have to match using ``re.match``. +        If they do not match a pytest.fail() is called. + +        The matches and non-matches are also printed on stdout. + +        """ +        __tracebackhide__ = True +        self._match_lines(lines2, lambda name, pat: re.match(pat, name), "re.match") + +    def _match_lines(self, lines2, match_func, match_nickname): +        """Underlying implementation of ``fnmatch_lines`` and ``re_match_lines``. + +        :param list[str] lines2: list of string patterns to match. The actual +            format depends on ``match_func`` +        :param match_func: a callable ``match_func(line, pattern)`` where line +            is the captured line from stdout/stderr and pattern is the matching +            pattern +        :param str match_nickname: the nickname for the match function that +            will be logged to stdout when a match occurs + +        """ +        assert isinstance(lines2, Sequence) +        lines2 = self._getlines(lines2) +        lines1 = self.lines[:] +        nextline = None +        extralines = [] +        __tracebackhide__ = True +        for line in lines2: +            nomatchprinted = False +            while lines1: +                nextline = lines1.pop(0) +                if line == nextline: +                    self._log("exact match:", repr(line)) +                    break +                elif match_func(nextline, line): +                    self._log("%s:" % match_nickname, repr(line)) +                    self._log("   with:", repr(nextline)) +                    break +                else: +                    if not nomatchprinted: +                        self._log("nomatch:", repr(line)) +                        nomatchprinted = True +                    self._log("    and:", repr(nextline)) +                extralines.append(nextline) +            else: +                self._log("remains unmatched: %r" % (line,)) +                pytest.fail(self._log_text) | 
