From 1110808a9d39d4b808aef724c861a2e1a38d2a69 Mon Sep 17 00:00:00 2001
From: Devtools Arcadia <arcadia-devtools@yandex-team.ru>
Date: Mon, 7 Feb 2022 18:08:42 +0300
Subject: intermediate changes ref:cde9a383711a11544ce7e107a78147fb96cc4029

---
 contrib/python/pytest/py2/_pytest/pytester.py | 1413 +++++++++++++++++++++++++
 1 file changed, 1413 insertions(+)
 create mode 100644 contrib/python/pytest/py2/_pytest/pytester.py

(limited to 'contrib/python/pytest/py2/_pytest/pytester.py')

diff --git a/contrib/python/pytest/py2/_pytest/pytester.py b/contrib/python/pytest/py2/_pytest/pytester.py
new file mode 100644
index 0000000000..f1d739c991
--- /dev/null
+++ b/contrib/python/pytest/py2/_pytest/pytester.py
@@ -0,0 +1,1413 @@
+# -*- coding: utf-8 -*-
+"""(disabled by default) support for testing pytest and pytest plugins."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import codecs
+import gc
+import os
+import platform
+import re
+import subprocess
+import sys
+import time
+import traceback
+from fnmatch import fnmatch
+from weakref import WeakKeyDictionary
+
+import py
+import six
+
+import pytest
+from _pytest._code import Source
+from _pytest._io.saferepr import saferepr
+from _pytest.assertion.rewrite import AssertionRewritingHook
+from _pytest.capture import MultiCapture
+from _pytest.capture import SysCapture
+from _pytest.compat import safe_str
+from _pytest.compat import Sequence
+from _pytest.main import EXIT_INTERRUPTED
+from _pytest.main import EXIT_OK
+from _pytest.main import Session
+from _pytest.monkeypatch import MonkeyPatch
+from _pytest.pathlib import Path
+
+IGNORE_PAM = [  # filenames added when obtaining details about the current user
+    u"/var/lib/sss/mc/passwd"
+]
+
+
+def pytest_addoption(parser):
+    parser.addoption(
+        "--lsof",
+        action="store_true",
+        dest="lsof",
+        default=False,
+        help="run FD checks if lsof is available",
+    )
+
+    parser.addoption(
+        "--runpytest",
+        default="inprocess",
+        dest="runpytest",
+        choices=("inprocess", "subprocess"),
+        help=(
+            "run pytest sub runs in tests using an 'inprocess' "
+            "or 'subprocess' (python -m main) method"
+        ),
+    )
+
+    parser.addini(
+        "pytester_example_dir", help="directory to take the pytester example files from"
+    )
+
+
+def pytest_configure(config):
+    if config.getvalue("lsof"):
+        checker = LsofFdLeakChecker()
+        if checker.matching_platform():
+            config.pluginmanager.register(checker)
+
+    config.addinivalue_line(
+        "markers",
+        "pytester_example_path(*path_segments): join the given path "
+        "segments to `pytester_example_dir` for this test.",
+    )
+
+
+def raise_on_kwargs(kwargs):
+    __tracebackhide__ = True
+    if kwargs:  # pragma: no branch
+        raise TypeError(
+            "Unexpected keyword arguments: {}".format(", ".join(sorted(kwargs)))
+        )
+
+
+class LsofFdLeakChecker(object):
+    def get_open_files(self):
+        out = self._exec_lsof()
+        open_files = self._parse_lsof_output(out)
+        return open_files
+
+    def _exec_lsof(self):
+        pid = os.getpid()
+        # py3: use subprocess.DEVNULL directly.
+        with open(os.devnull, "wb") as devnull:
+            return subprocess.check_output(
+                ("lsof", "-Ffn0", "-p", str(pid)), stderr=devnull
+            ).decode()
+
+    def _parse_lsof_output(self, out):
+        def isopen(line):
+            return line.startswith("f") and (
+                "deleted" not in line
+                and "mem" not in line
+                and "txt" not in line
+                and "cwd" not in line
+            )
+
+        open_files = []
+
+        for line in out.split("\n"):
+            if isopen(line):
+                fields = line.split("\0")
+                fd = fields[0][1:]
+                filename = fields[1][1:]
+                if filename in IGNORE_PAM:
+                    continue
+                if filename.startswith("/"):
+                    open_files.append((fd, filename))
+
+        return open_files
+
+    def matching_platform(self):
+        try:
+            subprocess.check_output(("lsof", "-v"))
+        except (OSError, subprocess.CalledProcessError):
+            return False
+        else:
+            return True
+
+    @pytest.hookimpl(hookwrapper=True, tryfirst=True)
+    def pytest_runtest_protocol(self, item):
+        lines1 = self.get_open_files()
+        yield
+        if hasattr(sys, "pypy_version_info"):
+            gc.collect()
+        lines2 = self.get_open_files()
+
+        new_fds = {t[0] for t in lines2} - {t[0] for t in lines1}
+        leaked_files = [t for t in lines2 if t[0] in new_fds]
+        if leaked_files:
+            error = []
+            error.append("***** %s FD leakage detected" % len(leaked_files))
+            error.extend([str(f) for f in leaked_files])
+            error.append("*** Before:")
+            error.extend([str(f) for f in lines1])
+            error.append("*** After:")
+            error.extend([str(f) for f in lines2])
+            error.append(error[0])
+            error.append("*** function %s:%s: %s " % item.location)
+            error.append("See issue #2366")
+            item.warn(pytest.PytestWarning("\n".join(error)))
+
+
+# used at least by pytest-xdist plugin
+
+
+@pytest.fixture
+def _pytest(request):
+    """Return a helper which offers a gethookrecorder(hook) method which
+    returns a HookRecorder instance which helps to make assertions about called
+    hooks.
+
+    """
+    return PytestArg(request)
+
+
+class PytestArg(object):
+    def __init__(self, request):
+        self.request = request
+
+    def gethookrecorder(self, hook):
+        hookrecorder = HookRecorder(hook._pm)
+        self.request.addfinalizer(hookrecorder.finish_recording)
+        return hookrecorder
+
+
+def get_public_names(values):
+    """Only return names from iterator values without a leading underscore."""
+    return [x for x in values if x[0] != "_"]
+
+
+class ParsedCall(object):
+    def __init__(self, name, kwargs):
+        self.__dict__.update(kwargs)
+        self._name = name
+
+    def __repr__(self):
+        d = self.__dict__.copy()
+        del d["_name"]
+        return "<ParsedCall %r(**%r)>" % (self._name, d)
+
+
+class HookRecorder(object):
+    """Record all hooks called in a plugin manager.
+
+    This wraps all the hook calls in the plugin manager, recording each call
+    before propagating the normal calls.
+
+    """
+
+    def __init__(self, pluginmanager):
+        self._pluginmanager = pluginmanager
+        self.calls = []
+
+        def before(hook_name, hook_impls, kwargs):
+            self.calls.append(ParsedCall(hook_name, kwargs))
+
+        def after(outcome, hook_name, hook_impls, kwargs):
+            pass
+
+        self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after)
+
+    def finish_recording(self):
+        self._undo_wrapping()
+
+    def getcalls(self, names):
+        if isinstance(names, str):
+            names = names.split()
+        return [call for call in self.calls if call._name in names]
+
+    def assert_contains(self, entries):
+        __tracebackhide__ = True
+        i = 0
+        entries = list(entries)
+        backlocals = sys._getframe(1).f_locals
+        while entries:
+            name, check = entries.pop(0)
+            for ind, call in enumerate(self.calls[i:]):
+                if call._name == name:
+                    print("NAMEMATCH", name, call)
+                    if eval(check, backlocals, call.__dict__):
+                        print("CHECKERMATCH", repr(check), "->", call)
+                    else:
+                        print("NOCHECKERMATCH", repr(check), "-", call)
+                        continue
+                    i += ind + 1
+                    break
+                print("NONAMEMATCH", name, "with", call)
+            else:
+                pytest.fail("could not find %r check %r" % (name, check))
+
+    def popcall(self, name):
+        __tracebackhide__ = True
+        for i, call in enumerate(self.calls):
+            if call._name == name:
+                del self.calls[i]
+                return call
+        lines = ["could not find call %r, in:" % (name,)]
+        lines.extend(["  %s" % x for x in self.calls])
+        pytest.fail("\n".join(lines))
+
+    def getcall(self, name):
+        values = self.getcalls(name)
+        assert len(values) == 1, (name, values)
+        return values[0]
+
+    # functionality for test reports
+
+    def getreports(self, names="pytest_runtest_logreport pytest_collectreport"):
+        return [x.report for x in self.getcalls(names)]
+
+    def matchreport(
+        self,
+        inamepart="",
+        names="pytest_runtest_logreport pytest_collectreport",
+        when=None,
+    ):
+        """return a testreport whose dotted import path matches"""
+        values = []
+        for rep in self.getreports(names=names):
+            if not when and rep.when != "call" and rep.passed:
+                # setup/teardown passing reports - let's ignore those
+                continue
+            if when and rep.when != when:
+                continue
+            if not inamepart or inamepart in rep.nodeid.split("::"):
+                values.append(rep)
+        if not values:
+            raise ValueError(
+                "could not find test report matching %r: "
+                "no test reports at all!" % (inamepart,)
+            )
+        if len(values) > 1:
+            raise ValueError(
+                "found 2 or more testreports matching %r: %s" % (inamepart, values)
+            )
+        return values[0]
+
+    def getfailures(self, names="pytest_runtest_logreport pytest_collectreport"):
+        return [rep for rep in self.getreports(names) if rep.failed]
+
+    def getfailedcollections(self):
+        return self.getfailures("pytest_collectreport")
+
+    def listoutcomes(self):
+        passed = []
+        skipped = []
+        failed = []
+        for rep in self.getreports("pytest_collectreport pytest_runtest_logreport"):
+            if rep.passed:
+                if rep.when == "call":
+                    passed.append(rep)
+            elif rep.skipped:
+                skipped.append(rep)
+            else:
+                assert rep.failed, "Unexpected outcome: {!r}".format(rep)
+                failed.append(rep)
+        return passed, skipped, failed
+
+    def countoutcomes(self):
+        return [len(x) for x in self.listoutcomes()]
+
+    def assertoutcome(self, passed=0, skipped=0, failed=0):
+        realpassed, realskipped, realfailed = self.listoutcomes()
+        assert passed == len(realpassed)
+        assert skipped == len(realskipped)
+        assert failed == len(realfailed)
+
+    def clear(self):
+        self.calls[:] = []
+
+
+@pytest.fixture
+def linecomp(request):
+    return LineComp()
+
+
+@pytest.fixture(name="LineMatcher")
+def LineMatcher_fixture(request):
+    return LineMatcher
+
+
+@pytest.fixture
+def testdir(request, tmpdir_factory):
+    return Testdir(request, tmpdir_factory)
+
+
+@pytest.fixture
+def _sys_snapshot():
+    snappaths = SysPathsSnapshot()
+    snapmods = SysModulesSnapshot()
+    yield
+    snapmods.restore()
+    snappaths.restore()
+
+
+@pytest.fixture
+def _config_for_test():
+    from _pytest.config import get_config
+
+    config = get_config()
+    yield config
+    config._ensure_unconfigure()  # cleanup, e.g. capman closing tmpfiles.
+
+
+rex_outcome = re.compile(r"(\d+) ([\w-]+)")
+
+
+class RunResult(object):
+    """The result of running a command.
+
+    Attributes:
+
+    :ret: the return value
+    :outlines: list of lines captured from stdout
+    :errlines: list of lines captures from stderr
+    :stdout: :py:class:`LineMatcher` of stdout, use ``stdout.str()`` to
+       reconstruct stdout or the commonly used ``stdout.fnmatch_lines()``
+       method
+    :stderr: :py:class:`LineMatcher` of stderr
+    :duration: duration in seconds
+
+    """
+
+    def __init__(self, ret, outlines, errlines, duration):
+        self.ret = ret
+        self.outlines = outlines
+        self.errlines = errlines
+        self.stdout = LineMatcher(outlines)
+        self.stderr = LineMatcher(errlines)
+        self.duration = duration
+
+    def __repr__(self):
+        return (
+            "<RunResult ret=%r len(stdout.lines)=%d len(stderr.lines)=%d duration=%.2fs>"
+            % (self.ret, len(self.stdout.lines), len(self.stderr.lines), self.duration)
+        )
+
+    def parseoutcomes(self):
+        """Return a dictionary of outcomestring->num from parsing the terminal
+        output that the test process produced.
+
+        """
+        for line in reversed(self.outlines):
+            if "seconds" in line:
+                outcomes = rex_outcome.findall(line)
+                if outcomes:
+                    d = {}
+                    for num, cat in outcomes:
+                        d[cat] = int(num)
+                    return d
+        raise ValueError("Pytest terminal report not found")
+
+    def assert_outcomes(
+        self, passed=0, skipped=0, failed=0, error=0, xpassed=0, xfailed=0
+    ):
+        """Assert that the specified outcomes appear with the respective
+        numbers (0 means it didn't occur) in the text output from a test run.
+
+        """
+        d = self.parseoutcomes()
+        obtained = {
+            "passed": d.get("passed", 0),
+            "skipped": d.get("skipped", 0),
+            "failed": d.get("failed", 0),
+            "error": d.get("error", 0),
+            "xpassed": d.get("xpassed", 0),
+            "xfailed": d.get("xfailed", 0),
+        }
+        expected = {
+            "passed": passed,
+            "skipped": skipped,
+            "failed": failed,
+            "error": error,
+            "xpassed": xpassed,
+            "xfailed": xfailed,
+        }
+        assert obtained == expected
+
+
+class CwdSnapshot(object):
+    def __init__(self):
+        self.__saved = os.getcwd()
+
+    def restore(self):
+        os.chdir(self.__saved)
+
+
+class SysModulesSnapshot(object):
+    def __init__(self, preserve=None):
+        self.__preserve = preserve
+        self.__saved = dict(sys.modules)
+
+    def restore(self):
+        if self.__preserve:
+            self.__saved.update(
+                (k, m) for k, m in sys.modules.items() if self.__preserve(k)
+            )
+        sys.modules.clear()
+        sys.modules.update(self.__saved)
+
+
+class SysPathsSnapshot(object):
+    def __init__(self):
+        self.__saved = list(sys.path), list(sys.meta_path)
+
+    def restore(self):
+        sys.path[:], sys.meta_path[:] = self.__saved
+
+
+class Testdir(object):
+    """Temporary test directory with tools to test/run pytest itself.
+
+    This is based on the ``tmpdir`` fixture but provides a number of methods
+    which aid with testing pytest itself.  Unless :py:meth:`chdir` is used all
+    methods will use :py:attr:`tmpdir` as their current working directory.
+
+    Attributes:
+
+    :tmpdir: The :py:class:`py.path.local` instance of the temporary directory.
+
+    :plugins: A list of plugins to use with :py:meth:`parseconfig` and
+       :py:meth:`runpytest`.  Initially this is an empty list but plugins can
+       be added to the list.  The type of items to add to the list depends on
+       the method using them so refer to them for details.
+
+    """
+
+    CLOSE_STDIN = object
+
+    class TimeoutExpired(Exception):
+        pass
+
+    def __init__(self, request, tmpdir_factory):
+        self.request = request
+        self._mod_collections = WeakKeyDictionary()
+        name = request.function.__name__
+        self.tmpdir = tmpdir_factory.mktemp(name, numbered=True)
+        self.test_tmproot = tmpdir_factory.mktemp("tmp-" + name, numbered=True)
+        self.plugins = []
+        self._cwd_snapshot = CwdSnapshot()
+        self._sys_path_snapshot = SysPathsSnapshot()
+        self._sys_modules_snapshot = self.__take_sys_modules_snapshot()
+        self.chdir()
+        self.request.addfinalizer(self.finalize)
+        method = self.request.config.getoption("--runpytest")
+        if method == "inprocess":
+            self._runpytest_method = self.runpytest_inprocess
+        elif method == "subprocess":
+            self._runpytest_method = self.runpytest_subprocess
+
+        mp = self.monkeypatch = MonkeyPatch()
+        mp.setenv("PYTEST_DEBUG_TEMPROOT", str(self.test_tmproot))
+        # Ensure no unexpected caching via tox.
+        mp.delenv("TOX_ENV_DIR", raising=False)
+        # Discard outer pytest options.
+        mp.delenv("PYTEST_ADDOPTS", raising=False)
+
+        # Environment (updates) for inner runs.
+        tmphome = str(self.tmpdir)
+        self._env_run_update = {"HOME": tmphome, "USERPROFILE": tmphome}
+
+    def __repr__(self):
+        return "<Testdir %r>" % (self.tmpdir,)
+
+    def __str__(self):
+        return str(self.tmpdir)
+
+    def finalize(self):
+        """Clean up global state artifacts.
+
+        Some methods modify the global interpreter state and this tries to
+        clean this up.  It does not remove the temporary directory however so
+        it can be looked at after the test run has finished.
+
+        """
+        self._sys_modules_snapshot.restore()
+        self._sys_path_snapshot.restore()
+        self._cwd_snapshot.restore()
+        self.monkeypatch.undo()
+
+    def __take_sys_modules_snapshot(self):
+        # some zope modules used by twisted-related tests keep internal state
+        # and can't be deleted; we had some trouble in the past with
+        # `zope.interface` for example
+        def preserve_module(name):
+            return name.startswith("zope")
+
+        return SysModulesSnapshot(preserve=preserve_module)
+
+    def make_hook_recorder(self, pluginmanager):
+        """Create a new :py:class:`HookRecorder` for a PluginManager."""
+        pluginmanager.reprec = reprec = HookRecorder(pluginmanager)
+        self.request.addfinalizer(reprec.finish_recording)
+        return reprec
+
+    def chdir(self):
+        """Cd into the temporary directory.
+
+        This is done automatically upon instantiation.
+
+        """
+        self.tmpdir.chdir()
+
+    def _makefile(self, ext, args, kwargs, encoding="utf-8"):
+        items = list(kwargs.items())
+
+        def to_text(s):
+            return s.decode(encoding) if isinstance(s, bytes) else six.text_type(s)
+
+        if args:
+            source = u"\n".join(to_text(x) for x in args)
+            basename = self.request.function.__name__
+            items.insert(0, (basename, source))
+
+        ret = None
+        for basename, value in items:
+            p = self.tmpdir.join(basename).new(ext=ext)
+            p.dirpath().ensure_dir()
+            source = Source(value)
+            source = u"\n".join(to_text(line) for line in source.lines)
+            p.write(source.strip().encode(encoding), "wb")
+            if ret is None:
+                ret = p
+        return ret
+
+    def makefile(self, ext, *args, **kwargs):
+        r"""Create new file(s) in the testdir.
+
+        :param str ext: The extension the file(s) should use, including the dot, e.g. `.py`.
+        :param list[str] args: All args will be treated as strings and joined using newlines.
+           The result will be written as contents to the file.  The name of the
+           file will be based on the test function requesting this fixture.
+        :param kwargs: Each keyword is the name of a file, while the value of it will
+           be written as contents of the file.
+
+        Examples:
+
+        .. code-block:: python
+
+            testdir.makefile(".txt", "line1", "line2")
+
+            testdir.makefile(".ini", pytest="[pytest]\naddopts=-rs\n")
+
+        """
+        return self._makefile(ext, args, kwargs)
+
+    def makeconftest(self, source):
+        """Write a contest.py file with 'source' as contents."""
+        return self.makepyfile(conftest=source)
+
+    def makeini(self, source):
+        """Write a tox.ini file with 'source' as contents."""
+        return self.makefile(".ini", tox=source)
+
+    def getinicfg(self, source):
+        """Return the pytest section from the tox.ini config file."""
+        p = self.makeini(source)
+        return py.iniconfig.IniConfig(p)["pytest"]
+
+    def makepyfile(self, *args, **kwargs):
+        """Shortcut for .makefile() with a .py extension."""
+        return self._makefile(".py", args, kwargs)
+
+    def maketxtfile(self, *args, **kwargs):
+        """Shortcut for .makefile() with a .txt extension."""
+        return self._makefile(".txt", args, kwargs)
+
+    def syspathinsert(self, path=None):
+        """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`.
+
+        This is undone automatically when this object dies at the end of each
+        test.
+        """
+        if path is None:
+            path = self.tmpdir
+
+        self.monkeypatch.syspath_prepend(str(path))
+
+    def mkdir(self, name):
+        """Create a new (sub)directory."""
+        return self.tmpdir.mkdir(name)
+
+    def mkpydir(self, name):
+        """Create a new python package.
+
+        This creates a (sub)directory with an empty ``__init__.py`` file so it
+        gets recognised as a python package.
+
+        """
+        p = self.mkdir(name)
+        p.ensure("__init__.py")
+        return p
+
+    def copy_example(self, name=None):
+        import warnings
+        from _pytest.warning_types import PYTESTER_COPY_EXAMPLE
+
+        warnings.warn(PYTESTER_COPY_EXAMPLE, stacklevel=2)
+        example_dir = self.request.config.getini("pytester_example_dir")
+        if example_dir is None:
+            raise ValueError("pytester_example_dir is unset, can't copy examples")
+        example_dir = self.request.config.rootdir.join(example_dir)
+
+        for extra_element in self.request.node.iter_markers("pytester_example_path"):
+            assert extra_element.args
+            example_dir = example_dir.join(*extra_element.args)
+
+        if name is None:
+            func_name = self.request.function.__name__
+            maybe_dir = example_dir / func_name
+            maybe_file = example_dir / (func_name + ".py")
+
+            if maybe_dir.isdir():
+                example_path = maybe_dir
+            elif maybe_file.isfile():
+                example_path = maybe_file
+            else:
+                raise LookupError(
+                    "{} cant be found as module or package in {}".format(
+                        func_name, example_dir.bestrelpath(self.request.config.rootdir)
+                    )
+                )
+        else:
+            example_path = example_dir.join(name)
+
+        if example_path.isdir() and not example_path.join("__init__.py").isfile():
+            example_path.copy(self.tmpdir)
+            return self.tmpdir
+        elif example_path.isfile():
+            result = self.tmpdir.join(example_path.basename)
+            example_path.copy(result)
+            return result
+        else:
+            raise LookupError(
+                'example "{}" is not found as a file or directory'.format(example_path)
+            )
+
+    Session = Session
+
+    def getnode(self, config, arg):
+        """Return the collection node of a file.
+
+        :param config: :py:class:`_pytest.config.Config` instance, see
+           :py:meth:`parseconfig` and :py:meth:`parseconfigure` to create the
+           configuration
+
+        :param arg: a :py:class:`py.path.local` instance of the file
+
+        """
+        session = Session(config)
+        assert "::" not in str(arg)
+        p = py.path.local(arg)
+        config.hook.pytest_sessionstart(session=session)
+        res = session.perform_collect([str(p)], genitems=False)[0]
+        config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
+        return res
+
+    def getpathnode(self, path):
+        """Return the collection node of a file.
+
+        This is like :py:meth:`getnode` but uses :py:meth:`parseconfigure` to
+        create the (configured) pytest Config instance.
+
+        :param path: a :py:class:`py.path.local` instance of the file
+
+        """
+        config = self.parseconfigure(path)
+        session = Session(config)
+        x = session.fspath.bestrelpath(path)
+        config.hook.pytest_sessionstart(session=session)
+        res = session.perform_collect([x], genitems=False)[0]
+        config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
+        return res
+
+    def genitems(self, colitems):
+        """Generate all test items from a collection node.
+
+        This recurses into the collection node and returns a list of all the
+        test items contained within.
+
+        """
+        session = colitems[0].session
+        result = []
+        for colitem in colitems:
+            result.extend(session.genitems(colitem))
+        return result
+
+    def runitem(self, source):
+        """Run the "test_func" Item.
+
+        The calling test instance (class containing the test method) must
+        provide a ``.getrunner()`` method which should return a runner which
+        can run the test protocol for a single item, e.g.
+        :py:func:`_pytest.runner.runtestprotocol`.
+
+        """
+        # used from runner functional tests
+        item = self.getitem(source)
+        # the test class where we are called from wants to provide the runner
+        testclassinstance = self.request.instance
+        runner = testclassinstance.getrunner()
+        return runner(item)
+
+    def inline_runsource(self, source, *cmdlineargs):
+        """Run a test module in process using ``pytest.main()``.
+
+        This run writes "source" into a temporary file and runs
+        ``pytest.main()`` on it, returning a :py:class:`HookRecorder` instance
+        for the result.
+
+        :param source: the source code of the test module
+
+        :param cmdlineargs: any extra command line arguments to use
+
+        :return: :py:class:`HookRecorder` instance of the result
+
+        """
+        p = self.makepyfile(source)
+        values = list(cmdlineargs) + [p]
+        return self.inline_run(*values)
+
+    def inline_genitems(self, *args):
+        """Run ``pytest.main(['--collectonly'])`` in-process.
+
+        Runs the :py:func:`pytest.main` function to run all of pytest inside
+        the test process itself like :py:meth:`inline_run`, but returns a
+        tuple of the collected items and a :py:class:`HookRecorder` instance.
+
+        """
+        rec = self.inline_run("--collect-only", *args)
+        items = [x.item for x in rec.getcalls("pytest_itemcollected")]
+        return items, rec
+
+    def inline_run(self, *args, **kwargs):
+        """Run ``pytest.main()`` in-process, returning a HookRecorder.
+
+        Runs the :py:func:`pytest.main` function to run all of pytest inside
+        the test process itself.  This means it can return a
+        :py:class:`HookRecorder` instance which gives more detailed results
+        from that run than can be done by matching stdout/stderr from
+        :py:meth:`runpytest`.
+
+        :param args: command line arguments to pass to :py:func:`pytest.main`
+
+        :param plugins: (keyword-only) extra plugin instances the
+           ``pytest.main()`` instance should use
+
+        :return: a :py:class:`HookRecorder` instance
+        """
+        plugins = kwargs.pop("plugins", [])
+        no_reraise_ctrlc = kwargs.pop("no_reraise_ctrlc", None)
+        raise_on_kwargs(kwargs)
+
+        finalizers = []
+        try:
+            # Do not load user config (during runs only).
+            mp_run = MonkeyPatch()
+            for k, v in self._env_run_update.items():
+                mp_run.setenv(k, v)
+            finalizers.append(mp_run.undo)
+
+            # When running pytest inline any plugins active in the main test
+            # process are already imported.  So this disables the warning which
+            # will trigger to say they can no longer be rewritten, which is
+            # fine as they have already been rewritten.
+            orig_warn = AssertionRewritingHook._warn_already_imported
+
+            def revert_warn_already_imported():
+                AssertionRewritingHook._warn_already_imported = orig_warn
+
+            finalizers.append(revert_warn_already_imported)
+            AssertionRewritingHook._warn_already_imported = lambda *a: None
+
+            # Any sys.module or sys.path changes done while running pytest
+            # inline should be reverted after the test run completes to avoid
+            # clashing with later inline tests run within the same pytest test,
+            # e.g. just because they use matching test module names.
+            finalizers.append(self.__take_sys_modules_snapshot().restore)
+            finalizers.append(SysPathsSnapshot().restore)
+
+            # Important note:
+            # - our tests should not leave any other references/registrations
+            #   laying around other than possibly loaded test modules
+            #   referenced from sys.modules, as nothing will clean those up
+            #   automatically
+
+            rec = []
+
+            class Collect(object):
+                def pytest_configure(x, config):
+                    rec.append(self.make_hook_recorder(config.pluginmanager))
+
+            plugins.append(Collect())
+            ret = pytest.main(list(args), plugins=plugins)
+            if len(rec) == 1:
+                reprec = rec.pop()
+            else:
+
+                class reprec(object):
+                    pass
+
+            reprec.ret = ret
+
+            # typically we reraise keyboard interrupts from the child run
+            # because it's our user requesting interruption of the testing
+            if ret == EXIT_INTERRUPTED and not no_reraise_ctrlc:
+                calls = reprec.getcalls("pytest_keyboard_interrupt")
+                if calls and calls[-1].excinfo.type == KeyboardInterrupt:
+                    raise KeyboardInterrupt()
+            return reprec
+        finally:
+            for finalizer in finalizers:
+                finalizer()
+
+    def runpytest_inprocess(self, *args, **kwargs):
+        """Return result of running pytest in-process, providing a similar
+        interface to what self.runpytest() provides.
+        """
+        syspathinsert = kwargs.pop("syspathinsert", False)
+
+        if syspathinsert:
+            self.syspathinsert()
+        now = time.time()
+        capture = MultiCapture(Capture=SysCapture)
+        capture.start_capturing()
+        try:
+            try:
+                reprec = self.inline_run(*args, **kwargs)
+            except SystemExit as e:
+
+                class reprec(object):
+                    ret = e.args[0]
+
+            except Exception:
+                traceback.print_exc()
+
+                class reprec(object):
+                    ret = 3
+
+        finally:
+            out, err = capture.readouterr()
+            capture.stop_capturing()
+            sys.stdout.write(out)
+            sys.stderr.write(err)
+
+        res = RunResult(reprec.ret, out.split("\n"), err.split("\n"), time.time() - now)
+        res.reprec = reprec
+        return res
+
+    def runpytest(self, *args, **kwargs):
+        """Run pytest inline or in a subprocess, depending on the command line
+        option "--runpytest" and return a :py:class:`RunResult`.
+
+        """
+        args = self._ensure_basetemp(args)
+        return self._runpytest_method(*args, **kwargs)
+
+    def _ensure_basetemp(self, args):
+        args = list(args)
+        for x in args:
+            if safe_str(x).startswith("--basetemp"):
+                break
+        else:
+            args.append("--basetemp=%s" % self.tmpdir.dirpath("basetemp"))
+        return args
+
+    def parseconfig(self, *args):
+        """Return a new pytest Config instance from given commandline args.
+
+        This invokes the pytest bootstrapping code in _pytest.config to create
+        a new :py:class:`_pytest.core.PluginManager` and call the
+        pytest_cmdline_parse hook to create a new
+        :py:class:`_pytest.config.Config` instance.
+
+        If :py:attr:`plugins` has been populated they should be plugin modules
+        to be registered with the PluginManager.
+
+        """
+        args = self._ensure_basetemp(args)
+
+        import _pytest.config
+
+        config = _pytest.config._prepareconfig(args, self.plugins)
+        # we don't know what the test will do with this half-setup config
+        # object and thus we make sure it gets unconfigured properly in any
+        # case (otherwise capturing could still be active, for example)
+        self.request.addfinalizer(config._ensure_unconfigure)
+        return config
+
+    def parseconfigure(self, *args):
+        """Return a new pytest configured Config instance.
+
+        This returns a new :py:class:`_pytest.config.Config` instance like
+        :py:meth:`parseconfig`, but also calls the pytest_configure hook.
+
+        """
+        config = self.parseconfig(*args)
+        config._do_configure()
+        self.request.addfinalizer(config._ensure_unconfigure)
+        return config
+
+    def getitem(self, source, funcname="test_func"):
+        """Return the test item for a test function.
+
+        This writes the source to a python file and runs pytest's collection on
+        the resulting module, returning the test item for the requested
+        function name.
+
+        :param source: the module source
+
+        :param funcname: the name of the test function for which to return a
+            test item
+
+        """
+        items = self.getitems(source)
+        for item in items:
+            if item.name == funcname:
+                return item
+        assert 0, "%r item not found in module:\n%s\nitems: %s" % (
+            funcname,
+            source,
+            items,
+        )
+
+    def getitems(self, source):
+        """Return all test items collected from the module.
+
+        This writes the source to a python file and runs pytest's collection on
+        the resulting module, returning all test items contained within.
+
+        """
+        modcol = self.getmodulecol(source)
+        return self.genitems([modcol])
+
+    def getmodulecol(self, source, configargs=(), withinit=False):
+        """Return the module collection node for ``source``.
+
+        This writes ``source`` to a file using :py:meth:`makepyfile` and then
+        runs the pytest collection on it, returning the collection node for the
+        test module.
+
+        :param source: the source code of the module to collect
+
+        :param configargs: any extra arguments to pass to
+            :py:meth:`parseconfigure`
+
+        :param withinit: whether to also write an ``__init__.py`` file to the
+            same directory to ensure it is a package
+
+        """
+        if isinstance(source, Path):
+            path = self.tmpdir.join(str(source))
+            assert not withinit, "not supported for paths"
+        else:
+            kw = {self.request.function.__name__: Source(source).strip()}
+            path = self.makepyfile(**kw)
+        if withinit:
+            self.makepyfile(__init__="#")
+        self.config = config = self.parseconfigure(path, *configargs)
+        return self.getnode(config, path)
+
+    def collect_by_name(self, modcol, name):
+        """Return the collection node for name from the module collection.
+
+        This will search a module collection node for a collection node
+        matching the given name.
+
+        :param modcol: a module collection node; see :py:meth:`getmodulecol`
+
+        :param name: the name of the node to return
+
+        """
+        if modcol not in self._mod_collections:
+            self._mod_collections[modcol] = list(modcol.collect())
+        for colitem in self._mod_collections[modcol]:
+            if colitem.name == name:
+                return colitem
+
+    def popen(
+        self,
+        cmdargs,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        stdin=CLOSE_STDIN,
+        **kw
+    ):
+        """Invoke subprocess.Popen.
+
+        This calls subprocess.Popen making sure the current working directory
+        is in the PYTHONPATH.
+
+        You probably want to use :py:meth:`run` instead.
+
+        """
+        env = os.environ.copy()
+        env["PYTHONPATH"] = os.pathsep.join(
+            filter(None, [os.getcwd(), env.get("PYTHONPATH", "")])
+        )
+        env.update(self._env_run_update)
+        kw["env"] = env
+
+        if stdin is Testdir.CLOSE_STDIN:
+            kw["stdin"] = subprocess.PIPE
+        elif isinstance(stdin, bytes):
+            kw["stdin"] = subprocess.PIPE
+        else:
+            kw["stdin"] = stdin
+
+        popen = subprocess.Popen(cmdargs, stdout=stdout, stderr=stderr, **kw)
+        if stdin is Testdir.CLOSE_STDIN:
+            popen.stdin.close()
+        elif isinstance(stdin, bytes):
+            popen.stdin.write(stdin)
+
+        return popen
+
+    def run(self, *cmdargs, **kwargs):
+        """Run a command with arguments.
+
+        Run a process using subprocess.Popen saving the stdout and stderr.
+
+        :param args: the sequence of arguments to pass to `subprocess.Popen()`
+        :param timeout: the period in seconds after which to timeout and raise
+            :py:class:`Testdir.TimeoutExpired`
+        :param stdin: optional standard input.  Bytes are being send, closing
+            the pipe, otherwise it is passed through to ``popen``.
+            Defaults to ``CLOSE_STDIN``, which translates to using a pipe
+            (``subprocess.PIPE``) that gets closed.
+
+        Returns a :py:class:`RunResult`.
+
+        """
+        __tracebackhide__ = True
+
+        timeout = kwargs.pop("timeout", None)
+        stdin = kwargs.pop("stdin", Testdir.CLOSE_STDIN)
+        raise_on_kwargs(kwargs)
+
+        cmdargs = [
+            str(arg) if isinstance(arg, py.path.local) else arg for arg in cmdargs
+        ]
+        p1 = self.tmpdir.join("stdout")
+        p2 = self.tmpdir.join("stderr")
+        print("running:", *cmdargs)
+        print("     in:", py.path.local())
+        f1 = codecs.open(str(p1), "w", encoding="utf8")
+        f2 = codecs.open(str(p2), "w", encoding="utf8")
+        try:
+            now = time.time()
+            popen = self.popen(
+                cmdargs,
+                stdin=stdin,
+                stdout=f1,
+                stderr=f2,
+                close_fds=(sys.platform != "win32"),
+            )
+            if isinstance(stdin, bytes):
+                popen.stdin.close()
+
+            def handle_timeout():
+                __tracebackhide__ = True
+
+                timeout_message = (
+                    "{seconds} second timeout expired running:"
+                    " {command}".format(seconds=timeout, command=cmdargs)
+                )
+
+                popen.kill()
+                popen.wait()
+                raise self.TimeoutExpired(timeout_message)
+
+            if timeout is None:
+                ret = popen.wait()
+            elif not six.PY2:
+                try:
+                    ret = popen.wait(timeout)
+                except subprocess.TimeoutExpired:
+                    handle_timeout()
+            else:
+                end = time.time() + timeout
+
+                resolution = min(0.1, timeout / 10)
+
+                while True:
+                    ret = popen.poll()
+                    if ret is not None:
+                        break
+
+                    if time.time() > end:
+                        handle_timeout()
+
+                    time.sleep(resolution)
+        finally:
+            f1.close()
+            f2.close()
+        f1 = codecs.open(str(p1), "r", encoding="utf8")
+        f2 = codecs.open(str(p2), "r", encoding="utf8")
+        try:
+            out = f1.read().splitlines()
+            err = f2.read().splitlines()
+        finally:
+            f1.close()
+            f2.close()
+        self._dump_lines(out, sys.stdout)
+        self._dump_lines(err, sys.stderr)
+        return RunResult(ret, out, err, time.time() - now)
+
+    def _dump_lines(self, lines, fp):
+        try:
+            for line in lines:
+                print(line, file=fp)
+        except UnicodeEncodeError:
+            print("couldn't print to %s because of encoding" % (fp,))
+
+    def _getpytestargs(self):
+        return sys.executable, "-mpytest"
+
+    def runpython(self, script):
+        """Run a python script using sys.executable as interpreter.
+
+        Returns a :py:class:`RunResult`.
+
+        """
+        return self.run(sys.executable, script)
+
+    def runpython_c(self, command):
+        """Run python -c "command", return a :py:class:`RunResult`."""
+        return self.run(sys.executable, "-c", command)
+
+    def runpytest_subprocess(self, *args, **kwargs):
+        """Run pytest as a subprocess with given arguments.
+
+        Any plugins added to the :py:attr:`plugins` list will be added using the
+        ``-p`` command line option.  Additionally ``--basetemp`` is used to put
+        any temporary files and directories in a numbered directory prefixed
+        with "runpytest-" to not conflict with the normal numbered pytest
+        location for temporary files and directories.
+
+        :param args: the sequence of arguments to pass to the pytest subprocess
+        :param timeout: the period in seconds after which to timeout and raise
+            :py:class:`Testdir.TimeoutExpired`
+
+        Returns a :py:class:`RunResult`.
+        """
+        __tracebackhide__ = True
+        timeout = kwargs.pop("timeout", None)
+        raise_on_kwargs(kwargs)
+
+        p = py.path.local.make_numbered_dir(
+            prefix="runpytest-", keep=None, rootdir=self.tmpdir
+        )
+        args = ("--basetemp=%s" % p,) + args
+        plugins = [x for x in self.plugins if isinstance(x, str)]
+        if plugins:
+            args = ("-p", plugins[0]) + args
+        args = self._getpytestargs() + args
+        return self.run(*args, timeout=timeout)
+
+    def spawn_pytest(self, string, expect_timeout=10.0):
+        """Run pytest using pexpect.
+
+        This makes sure to use the right pytest and sets up the temporary
+        directory locations.
+
+        The pexpect child is returned.
+
+        """
+        basetemp = self.tmpdir.mkdir("temp-pexpect")
+        invoke = " ".join(map(str, self._getpytestargs()))
+        cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string)
+        return self.spawn(cmd, expect_timeout=expect_timeout)
+
+    def spawn(self, cmd, expect_timeout=10.0):
+        """Run a command using pexpect.
+
+        The pexpect child is returned.
+
+        """
+        pexpect = pytest.importorskip("pexpect", "3.0")
+        if hasattr(sys, "pypy_version_info") and "64" in platform.machine():
+            pytest.skip("pypy-64 bit not supported")
+        if sys.platform.startswith("freebsd"):
+            pytest.xfail("pexpect does not work reliably on freebsd")
+        logfile = self.tmpdir.join("spawn.out").open("wb")
+
+        # Do not load user config.
+        env = os.environ.copy()
+        env.update(self._env_run_update)
+
+        child = pexpect.spawn(cmd, logfile=logfile, env=env)
+        self.request.addfinalizer(logfile.close)
+        child.timeout = expect_timeout
+        return child
+
+
+def getdecoded(out):
+    try:
+        return out.decode("utf-8")
+    except UnicodeDecodeError:
+        return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (saferepr(out),)
+
+
+class LineComp(object):
+    def __init__(self):
+        self.stringio = py.io.TextIO()
+
+    def assert_contains_lines(self, lines2):
+        """Assert that lines2 are contained (linearly) in lines1.
+
+        Return a list of extralines found.
+
+        """
+        __tracebackhide__ = True
+        val = self.stringio.getvalue()
+        self.stringio.truncate(0)
+        self.stringio.seek(0)
+        lines1 = val.split("\n")
+        return LineMatcher(lines1).fnmatch_lines(lines2)
+
+
+class LineMatcher(object):
+    """Flexible matching of text.
+
+    This is a convenience class to test large texts like the output of
+    commands.
+
+    The constructor takes a list of lines without their trailing newlines, i.e.
+    ``text.splitlines()``.
+
+    """
+
+    def __init__(self, lines):
+        self.lines = lines
+        self._log_output = []
+
+    def str(self):
+        """Return the entire original text."""
+        return "\n".join(self.lines)
+
+    def _getlines(self, lines2):
+        if isinstance(lines2, str):
+            lines2 = Source(lines2)
+        if isinstance(lines2, Source):
+            lines2 = lines2.strip().lines
+        return lines2
+
+    def fnmatch_lines_random(self, lines2):
+        """Check lines exist in the output using in any order.
+
+        Lines are checked using ``fnmatch.fnmatch``. The argument is a list of
+        lines which have to occur in the output, in any order.
+
+        """
+        self._match_lines_random(lines2, fnmatch)
+
+    def re_match_lines_random(self, lines2):
+        """Check lines exist in the output using ``re.match``, in any order.
+
+        The argument is a list of lines which have to occur in the output, in
+        any order.
+
+        """
+        self._match_lines_random(lines2, lambda name, pat: re.match(pat, name))
+
+    def _match_lines_random(self, lines2, match_func):
+        """Check lines exist in the output.
+
+        The argument is a list of lines which have to occur in the output, in
+        any order.  Each line can contain glob whildcards.
+
+        """
+        lines2 = self._getlines(lines2)
+        for line in lines2:
+            for x in self.lines:
+                if line == x or match_func(x, line):
+                    self._log("matched: ", repr(line))
+                    break
+            else:
+                self._log("line %r not found in output" % line)
+                raise ValueError(self._log_text)
+
+    def get_lines_after(self, fnline):
+        """Return all lines following the given line in the text.
+
+        The given line can contain glob wildcards.
+
+        """
+        for i, line in enumerate(self.lines):
+            if fnline == line or fnmatch(line, fnline):
+                return self.lines[i + 1 :]
+        raise ValueError("line %r not found in output" % fnline)
+
+    def _log(self, *args):
+        self._log_output.append(" ".join(str(x) for x in args))
+
+    @property
+    def _log_text(self):
+        return "\n".join(self._log_output)
+
+    def fnmatch_lines(self, lines2):
+        """Search captured text for matching lines using ``fnmatch.fnmatch``.
+
+        The argument is a list of lines which have to match and can use glob
+        wildcards.  If they do not match a pytest.fail() is called.  The
+        matches and non-matches are also printed on stdout.
+
+        """
+        __tracebackhide__ = True
+        self._match_lines(lines2, fnmatch, "fnmatch")
+
+    def re_match_lines(self, lines2):
+        """Search captured text for matching lines using ``re.match``.
+
+        The argument is a list of lines which have to match using ``re.match``.
+        If they do not match a pytest.fail() is called.
+
+        The matches and non-matches are also printed on stdout.
+
+        """
+        __tracebackhide__ = True
+        self._match_lines(lines2, lambda name, pat: re.match(pat, name), "re.match")
+
+    def _match_lines(self, lines2, match_func, match_nickname):
+        """Underlying implementation of ``fnmatch_lines`` and ``re_match_lines``.
+
+        :param list[str] lines2: list of string patterns to match. The actual
+            format depends on ``match_func``
+        :param match_func: a callable ``match_func(line, pattern)`` where line
+            is the captured line from stdout/stderr and pattern is the matching
+            pattern
+        :param str match_nickname: the nickname for the match function that
+            will be logged to stdout when a match occurs
+
+        """
+        assert isinstance(lines2, Sequence)
+        lines2 = self._getlines(lines2)
+        lines1 = self.lines[:]
+        nextline = None
+        extralines = []
+        __tracebackhide__ = True
+        for line in lines2:
+            nomatchprinted = False
+            while lines1:
+                nextline = lines1.pop(0)
+                if line == nextline:
+                    self._log("exact match:", repr(line))
+                    break
+                elif match_func(nextline, line):
+                    self._log("%s:" % match_nickname, repr(line))
+                    self._log("   with:", repr(nextline))
+                    break
+                else:
+                    if not nomatchprinted:
+                        self._log("nomatch:", repr(line))
+                        nomatchprinted = True
+                    self._log("    and:", repr(nextline))
+                extralines.append(nextline)
+            else:
+                self._log("remains unmatched: %r" % (line,))
+                pytest.fail(self._log_text)
-- 
cgit v1.2.3