aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest/py2/_pytest/pytester.py
diff options
context:
space:
mode:
authordeshevoy <deshevoy@yandex-team.ru>2022-02-10 16:46:56 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:56 +0300
commite988f30484abe5fdeedcc7a5d3c226c01a21800c (patch)
tree0a217b173aabb57b7e51f8a169989b1a3e0309fe /contrib/python/pytest/py2/_pytest/pytester.py
parent33ee501c05d3f24036ae89766a858930ae66c548 (diff)
downloadydb-e988f30484abe5fdeedcc7a5d3c226c01a21800c.tar.gz
Restoring authorship annotation for <deshevoy@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/pytest/py2/_pytest/pytester.py')
-rw-r--r--contrib/python/pytest/py2/_pytest/pytester.py2566
1 files changed, 1283 insertions, 1283 deletions
diff --git a/contrib/python/pytest/py2/_pytest/pytester.py b/contrib/python/pytest/py2/_pytest/pytester.py
index f1d739c991..b907216770 100644
--- a/contrib/python/pytest/py2/_pytest/pytester.py
+++ b/contrib/python/pytest/py2/_pytest/pytester.py
@@ -1,342 +1,342 @@
# -*- coding: utf-8 -*-
-"""(disabled by default) support for testing pytest and pytest plugins."""
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import codecs
-import gc
-import os
-import platform
-import re
-import subprocess
-import sys
-import time
-import traceback
-from fnmatch import fnmatch
-from weakref import WeakKeyDictionary
-
-import py
-import six
-
-import pytest
-from _pytest._code import Source
+"""(disabled by default) support for testing pytest and pytest plugins."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import codecs
+import gc
+import os
+import platform
+import re
+import subprocess
+import sys
+import time
+import traceback
+from fnmatch import fnmatch
+from weakref import WeakKeyDictionary
+
+import py
+import six
+
+import pytest
+from _pytest._code import Source
from _pytest._io.saferepr import saferepr
-from _pytest.assertion.rewrite import AssertionRewritingHook
-from _pytest.capture import MultiCapture
-from _pytest.capture import SysCapture
-from _pytest.compat import safe_str
+from _pytest.assertion.rewrite import AssertionRewritingHook
+from _pytest.capture import MultiCapture
+from _pytest.capture import SysCapture
+from _pytest.compat import safe_str
from _pytest.compat import Sequence
-from _pytest.main import EXIT_INTERRUPTED
-from _pytest.main import EXIT_OK
-from _pytest.main import Session
+from _pytest.main import EXIT_INTERRUPTED
+from _pytest.main import EXIT_OK
+from _pytest.main import Session
from _pytest.monkeypatch import MonkeyPatch
-from _pytest.pathlib import Path
-
-IGNORE_PAM = [ # filenames added when obtaining details about the current user
- u"/var/lib/sss/mc/passwd"
-]
-
-
-def pytest_addoption(parser):
- parser.addoption(
- "--lsof",
- action="store_true",
- dest="lsof",
- default=False,
- help="run FD checks if lsof is available",
- )
-
- parser.addoption(
- "--runpytest",
- default="inprocess",
- dest="runpytest",
- choices=("inprocess", "subprocess"),
- help=(
- "run pytest sub runs in tests using an 'inprocess' "
- "or 'subprocess' (python -m main) method"
- ),
- )
-
- parser.addini(
- "pytester_example_dir", help="directory to take the pytester example files from"
- )
-
-
-def pytest_configure(config):
- if config.getvalue("lsof"):
- checker = LsofFdLeakChecker()
- if checker.matching_platform():
- config.pluginmanager.register(checker)
-
+from _pytest.pathlib import Path
+
+IGNORE_PAM = [ # filenames added when obtaining details about the current user
+ u"/var/lib/sss/mc/passwd"
+]
+
+
+def pytest_addoption(parser):
+ parser.addoption(
+ "--lsof",
+ action="store_true",
+ dest="lsof",
+ default=False,
+ help="run FD checks if lsof is available",
+ )
+
+ parser.addoption(
+ "--runpytest",
+ default="inprocess",
+ dest="runpytest",
+ choices=("inprocess", "subprocess"),
+ help=(
+ "run pytest sub runs in tests using an 'inprocess' "
+ "or 'subprocess' (python -m main) method"
+ ),
+ )
+
+ parser.addini(
+ "pytester_example_dir", help="directory to take the pytester example files from"
+ )
+
+
+def pytest_configure(config):
+ if config.getvalue("lsof"):
+ checker = LsofFdLeakChecker()
+ if checker.matching_platform():
+ config.pluginmanager.register(checker)
+
config.addinivalue_line(
"markers",
"pytester_example_path(*path_segments): join the given path "
"segments to `pytester_example_dir` for this test.",
)
+
-
-def raise_on_kwargs(kwargs):
+def raise_on_kwargs(kwargs):
__tracebackhide__ = True
if kwargs: # pragma: no branch
raise TypeError(
"Unexpected keyword arguments: {}".format(", ".join(sorted(kwargs)))
)
-
-
-class LsofFdLeakChecker(object):
- def get_open_files(self):
- out = self._exec_lsof()
- open_files = self._parse_lsof_output(out)
- return open_files
-
- def _exec_lsof(self):
- pid = os.getpid()
+
+
+class LsofFdLeakChecker(object):
+ def get_open_files(self):
+ out = self._exec_lsof()
+ open_files = self._parse_lsof_output(out)
+ return open_files
+
+ def _exec_lsof(self):
+ pid = os.getpid()
# py3: use subprocess.DEVNULL directly.
with open(os.devnull, "wb") as devnull:
return subprocess.check_output(
("lsof", "-Ffn0", "-p", str(pid)), stderr=devnull
).decode()
-
- def _parse_lsof_output(self, out):
- def isopen(line):
- return line.startswith("f") and (
- "deleted" not in line
- and "mem" not in line
- and "txt" not in line
- and "cwd" not in line
- )
-
- open_files = []
-
- for line in out.split("\n"):
- if isopen(line):
- fields = line.split("\0")
- fd = fields[0][1:]
- filename = fields[1][1:]
- if filename in IGNORE_PAM:
- continue
- if filename.startswith("/"):
- open_files.append((fd, filename))
-
- return open_files
-
- def matching_platform(self):
- try:
+
+ def _parse_lsof_output(self, out):
+ def isopen(line):
+ return line.startswith("f") and (
+ "deleted" not in line
+ and "mem" not in line
+ and "txt" not in line
+ and "cwd" not in line
+ )
+
+ open_files = []
+
+ for line in out.split("\n"):
+ if isopen(line):
+ fields = line.split("\0")
+ fd = fields[0][1:]
+ filename = fields[1][1:]
+ if filename in IGNORE_PAM:
+ continue
+ if filename.startswith("/"):
+ open_files.append((fd, filename))
+
+ return open_files
+
+ def matching_platform(self):
+ try:
subprocess.check_output(("lsof", "-v"))
except (OSError, subprocess.CalledProcessError):
- return False
- else:
- return True
-
- @pytest.hookimpl(hookwrapper=True, tryfirst=True)
- def pytest_runtest_protocol(self, item):
- lines1 = self.get_open_files()
- yield
- if hasattr(sys, "pypy_version_info"):
- gc.collect()
- lines2 = self.get_open_files()
-
- new_fds = {t[0] for t in lines2} - {t[0] for t in lines1}
- leaked_files = [t for t in lines2 if t[0] in new_fds]
- if leaked_files:
- error = []
- error.append("***** %s FD leakage detected" % len(leaked_files))
- error.extend([str(f) for f in leaked_files])
- error.append("*** Before:")
- error.extend([str(f) for f in lines1])
- error.append("*** After:")
- error.extend([str(f) for f in lines2])
- error.append(error[0])
- error.append("*** function %s:%s: %s " % item.location)
- error.append("See issue #2366")
- item.warn(pytest.PytestWarning("\n".join(error)))
-
-
-# used at least by pytest-xdist plugin
-
-
-@pytest.fixture
-def _pytest(request):
- """Return a helper which offers a gethookrecorder(hook) method which
- returns a HookRecorder instance which helps to make assertions about called
- hooks.
-
- """
- return PytestArg(request)
-
-
-class PytestArg(object):
- def __init__(self, request):
- self.request = request
-
- def gethookrecorder(self, hook):
- hookrecorder = HookRecorder(hook._pm)
- self.request.addfinalizer(hookrecorder.finish_recording)
- return hookrecorder
-
-
-def get_public_names(values):
- """Only return names from iterator values without a leading underscore."""
- return [x for x in values if x[0] != "_"]
-
-
-class ParsedCall(object):
- def __init__(self, name, kwargs):
- self.__dict__.update(kwargs)
- self._name = name
-
- def __repr__(self):
- d = self.__dict__.copy()
- del d["_name"]
- return "<ParsedCall %r(**%r)>" % (self._name, d)
-
-
-class HookRecorder(object):
- """Record all hooks called in a plugin manager.
-
- This wraps all the hook calls in the plugin manager, recording each call
- before propagating the normal calls.
-
- """
-
- def __init__(self, pluginmanager):
- self._pluginmanager = pluginmanager
- self.calls = []
-
- def before(hook_name, hook_impls, kwargs):
- self.calls.append(ParsedCall(hook_name, kwargs))
-
- def after(outcome, hook_name, hook_impls, kwargs):
- pass
-
- self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after)
-
- def finish_recording(self):
- self._undo_wrapping()
-
- def getcalls(self, names):
- if isinstance(names, str):
- names = names.split()
- return [call for call in self.calls if call._name in names]
-
- def assert_contains(self, entries):
- __tracebackhide__ = True
- i = 0
- entries = list(entries)
- backlocals = sys._getframe(1).f_locals
- while entries:
- name, check = entries.pop(0)
- for ind, call in enumerate(self.calls[i:]):
- if call._name == name:
- print("NAMEMATCH", name, call)
- if eval(check, backlocals, call.__dict__):
- print("CHECKERMATCH", repr(check), "->", call)
- else:
- print("NOCHECKERMATCH", repr(check), "-", call)
- continue
- i += ind + 1
- break
- print("NONAMEMATCH", name, "with", call)
- else:
- pytest.fail("could not find %r check %r" % (name, check))
-
- def popcall(self, name):
- __tracebackhide__ = True
- for i, call in enumerate(self.calls):
- if call._name == name:
- del self.calls[i]
- return call
- lines = ["could not find call %r, in:" % (name,)]
- lines.extend([" %s" % x for x in self.calls])
- pytest.fail("\n".join(lines))
-
- def getcall(self, name):
- values = self.getcalls(name)
- assert len(values) == 1, (name, values)
- return values[0]
-
- # functionality for test reports
-
- def getreports(self, names="pytest_runtest_logreport pytest_collectreport"):
- return [x.report for x in self.getcalls(names)]
-
- def matchreport(
- self,
- inamepart="",
- names="pytest_runtest_logreport pytest_collectreport",
- when=None,
- ):
- """return a testreport whose dotted import path matches"""
- values = []
- for rep in self.getreports(names=names):
+ return False
+ else:
+ return True
+
+ @pytest.hookimpl(hookwrapper=True, tryfirst=True)
+ def pytest_runtest_protocol(self, item):
+ lines1 = self.get_open_files()
+ yield
+ if hasattr(sys, "pypy_version_info"):
+ gc.collect()
+ lines2 = self.get_open_files()
+
+ new_fds = {t[0] for t in lines2} - {t[0] for t in lines1}
+ leaked_files = [t for t in lines2 if t[0] in new_fds]
+ if leaked_files:
+ error = []
+ error.append("***** %s FD leakage detected" % len(leaked_files))
+ error.extend([str(f) for f in leaked_files])
+ error.append("*** Before:")
+ error.extend([str(f) for f in lines1])
+ error.append("*** After:")
+ error.extend([str(f) for f in lines2])
+ error.append(error[0])
+ error.append("*** function %s:%s: %s " % item.location)
+ error.append("See issue #2366")
+ item.warn(pytest.PytestWarning("\n".join(error)))
+
+
+# used at least by pytest-xdist plugin
+
+
+@pytest.fixture
+def _pytest(request):
+ """Return a helper which offers a gethookrecorder(hook) method which
+ returns a HookRecorder instance which helps to make assertions about called
+ hooks.
+
+ """
+ return PytestArg(request)
+
+
+class PytestArg(object):
+ def __init__(self, request):
+ self.request = request
+
+ def gethookrecorder(self, hook):
+ hookrecorder = HookRecorder(hook._pm)
+ self.request.addfinalizer(hookrecorder.finish_recording)
+ return hookrecorder
+
+
+def get_public_names(values):
+ """Only return names from iterator values without a leading underscore."""
+ return [x for x in values if x[0] != "_"]
+
+
+class ParsedCall(object):
+ def __init__(self, name, kwargs):
+ self.__dict__.update(kwargs)
+ self._name = name
+
+ def __repr__(self):
+ d = self.__dict__.copy()
+ del d["_name"]
+ return "<ParsedCall %r(**%r)>" % (self._name, d)
+
+
+class HookRecorder(object):
+ """Record all hooks called in a plugin manager.
+
+ This wraps all the hook calls in the plugin manager, recording each call
+ before propagating the normal calls.
+
+ """
+
+ def __init__(self, pluginmanager):
+ self._pluginmanager = pluginmanager
+ self.calls = []
+
+ def before(hook_name, hook_impls, kwargs):
+ self.calls.append(ParsedCall(hook_name, kwargs))
+
+ def after(outcome, hook_name, hook_impls, kwargs):
+ pass
+
+ self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after)
+
+ def finish_recording(self):
+ self._undo_wrapping()
+
+ def getcalls(self, names):
+ if isinstance(names, str):
+ names = names.split()
+ return [call for call in self.calls if call._name in names]
+
+ def assert_contains(self, entries):
+ __tracebackhide__ = True
+ i = 0
+ entries = list(entries)
+ backlocals = sys._getframe(1).f_locals
+ while entries:
+ name, check = entries.pop(0)
+ for ind, call in enumerate(self.calls[i:]):
+ if call._name == name:
+ print("NAMEMATCH", name, call)
+ if eval(check, backlocals, call.__dict__):
+ print("CHECKERMATCH", repr(check), "->", call)
+ else:
+ print("NOCHECKERMATCH", repr(check), "-", call)
+ continue
+ i += ind + 1
+ break
+ print("NONAMEMATCH", name, "with", call)
+ else:
+ pytest.fail("could not find %r check %r" % (name, check))
+
+ def popcall(self, name):
+ __tracebackhide__ = True
+ for i, call in enumerate(self.calls):
+ if call._name == name:
+ del self.calls[i]
+ return call
+ lines = ["could not find call %r, in:" % (name,)]
+ lines.extend([" %s" % x for x in self.calls])
+ pytest.fail("\n".join(lines))
+
+ def getcall(self, name):
+ values = self.getcalls(name)
+ assert len(values) == 1, (name, values)
+ return values[0]
+
+ # functionality for test reports
+
+ def getreports(self, names="pytest_runtest_logreport pytest_collectreport"):
+ return [x.report for x in self.getcalls(names)]
+
+ def matchreport(
+ self,
+ inamepart="",
+ names="pytest_runtest_logreport pytest_collectreport",
+ when=None,
+ ):
+ """return a testreport whose dotted import path matches"""
+ values = []
+ for rep in self.getreports(names=names):
if not when and rep.when != "call" and rep.passed:
# setup/teardown passing reports - let's ignore those
- continue
+ continue
if when and rep.when != when:
continue
- if not inamepart or inamepart in rep.nodeid.split("::"):
- values.append(rep)
- if not values:
- raise ValueError(
- "could not find test report matching %r: "
- "no test reports at all!" % (inamepart,)
- )
- if len(values) > 1:
- raise ValueError(
- "found 2 or more testreports matching %r: %s" % (inamepart, values)
- )
- return values[0]
-
- def getfailures(self, names="pytest_runtest_logreport pytest_collectreport"):
- return [rep for rep in self.getreports(names) if rep.failed]
-
- def getfailedcollections(self):
- return self.getfailures("pytest_collectreport")
-
- def listoutcomes(self):
- passed = []
- skipped = []
- failed = []
- for rep in self.getreports("pytest_collectreport pytest_runtest_logreport"):
- if rep.passed:
+ if not inamepart or inamepart in rep.nodeid.split("::"):
+ values.append(rep)
+ if not values:
+ raise ValueError(
+ "could not find test report matching %r: "
+ "no test reports at all!" % (inamepart,)
+ )
+ if len(values) > 1:
+ raise ValueError(
+ "found 2 or more testreports matching %r: %s" % (inamepart, values)
+ )
+ return values[0]
+
+ def getfailures(self, names="pytest_runtest_logreport pytest_collectreport"):
+ return [rep for rep in self.getreports(names) if rep.failed]
+
+ def getfailedcollections(self):
+ return self.getfailures("pytest_collectreport")
+
+ def listoutcomes(self):
+ passed = []
+ skipped = []
+ failed = []
+ for rep in self.getreports("pytest_collectreport pytest_runtest_logreport"):
+ if rep.passed:
if rep.when == "call":
- passed.append(rep)
- elif rep.skipped:
- skipped.append(rep)
+ passed.append(rep)
+ elif rep.skipped:
+ skipped.append(rep)
else:
assert rep.failed, "Unexpected outcome: {!r}".format(rep)
- failed.append(rep)
- return passed, skipped, failed
-
- def countoutcomes(self):
- return [len(x) for x in self.listoutcomes()]
-
- def assertoutcome(self, passed=0, skipped=0, failed=0):
- realpassed, realskipped, realfailed = self.listoutcomes()
- assert passed == len(realpassed)
- assert skipped == len(realskipped)
- assert failed == len(realfailed)
-
- def clear(self):
- self.calls[:] = []
-
-
-@pytest.fixture
-def linecomp(request):
- return LineComp()
-
-
-@pytest.fixture(name="LineMatcher")
-def LineMatcher_fixture(request):
- return LineMatcher
-
-
-@pytest.fixture
-def testdir(request, tmpdir_factory):
- return Testdir(request, tmpdir_factory)
-
-
+ failed.append(rep)
+ return passed, skipped, failed
+
+ def countoutcomes(self):
+ return [len(x) for x in self.listoutcomes()]
+
+ def assertoutcome(self, passed=0, skipped=0, failed=0):
+ realpassed, realskipped, realfailed = self.listoutcomes()
+ assert passed == len(realpassed)
+ assert skipped == len(realskipped)
+ assert failed == len(realfailed)
+
+ def clear(self):
+ self.calls[:] = []
+
+
+@pytest.fixture
+def linecomp(request):
+ return LineComp()
+
+
+@pytest.fixture(name="LineMatcher")
+def LineMatcher_fixture(request):
+ return LineMatcher
+
+
+@pytest.fixture
+def testdir(request, tmpdir_factory):
+ return Testdir(request, tmpdir_factory)
+
+
@pytest.fixture
def _sys_snapshot():
snappaths = SysPathsSnapshot()
@@ -355,152 +355,152 @@ def _config_for_test():
config._ensure_unconfigure() # cleanup, e.g. capman closing tmpfiles.
-rex_outcome = re.compile(r"(\d+) ([\w-]+)")
-
-
-class RunResult(object):
- """The result of running a command.
-
- Attributes:
-
- :ret: the return value
- :outlines: list of lines captured from stdout
- :errlines: list of lines captures from stderr
- :stdout: :py:class:`LineMatcher` of stdout, use ``stdout.str()`` to
- reconstruct stdout or the commonly used ``stdout.fnmatch_lines()``
- method
- :stderr: :py:class:`LineMatcher` of stderr
- :duration: duration in seconds
-
- """
-
- def __init__(self, ret, outlines, errlines, duration):
- self.ret = ret
- self.outlines = outlines
- self.errlines = errlines
- self.stdout = LineMatcher(outlines)
- self.stderr = LineMatcher(errlines)
- self.duration = duration
-
+rex_outcome = re.compile(r"(\d+) ([\w-]+)")
+
+
+class RunResult(object):
+ """The result of running a command.
+
+ Attributes:
+
+ :ret: the return value
+ :outlines: list of lines captured from stdout
+ :errlines: list of lines captures from stderr
+ :stdout: :py:class:`LineMatcher` of stdout, use ``stdout.str()`` to
+ reconstruct stdout or the commonly used ``stdout.fnmatch_lines()``
+ method
+ :stderr: :py:class:`LineMatcher` of stderr
+ :duration: duration in seconds
+
+ """
+
+ def __init__(self, ret, outlines, errlines, duration):
+ self.ret = ret
+ self.outlines = outlines
+ self.errlines = errlines
+ self.stdout = LineMatcher(outlines)
+ self.stderr = LineMatcher(errlines)
+ self.duration = duration
+
def __repr__(self):
return (
"<RunResult ret=%r len(stdout.lines)=%d len(stderr.lines)=%d duration=%.2fs>"
% (self.ret, len(self.stdout.lines), len(self.stderr.lines), self.duration)
)
- def parseoutcomes(self):
- """Return a dictionary of outcomestring->num from parsing the terminal
- output that the test process produced.
-
- """
- for line in reversed(self.outlines):
- if "seconds" in line:
- outcomes = rex_outcome.findall(line)
- if outcomes:
- d = {}
- for num, cat in outcomes:
- d[cat] = int(num)
- return d
- raise ValueError("Pytest terminal report not found")
-
- def assert_outcomes(
- self, passed=0, skipped=0, failed=0, error=0, xpassed=0, xfailed=0
- ):
- """Assert that the specified outcomes appear with the respective
- numbers (0 means it didn't occur) in the text output from a test run.
-
- """
- d = self.parseoutcomes()
- obtained = {
- "passed": d.get("passed", 0),
- "skipped": d.get("skipped", 0),
- "failed": d.get("failed", 0),
- "error": d.get("error", 0),
- "xpassed": d.get("xpassed", 0),
- "xfailed": d.get("xfailed", 0),
- }
- expected = {
- "passed": passed,
- "skipped": skipped,
- "failed": failed,
- "error": error,
- "xpassed": xpassed,
- "xfailed": xfailed,
- }
- assert obtained == expected
-
-
-class CwdSnapshot(object):
- def __init__(self):
- self.__saved = os.getcwd()
-
- def restore(self):
- os.chdir(self.__saved)
-
-
-class SysModulesSnapshot(object):
- def __init__(self, preserve=None):
- self.__preserve = preserve
- self.__saved = dict(sys.modules)
-
- def restore(self):
- if self.__preserve:
- self.__saved.update(
- (k, m) for k, m in sys.modules.items() if self.__preserve(k)
- )
- sys.modules.clear()
- sys.modules.update(self.__saved)
-
-
-class SysPathsSnapshot(object):
- def __init__(self):
- self.__saved = list(sys.path), list(sys.meta_path)
-
- def restore(self):
- sys.path[:], sys.meta_path[:] = self.__saved
-
-
-class Testdir(object):
- """Temporary test directory with tools to test/run pytest itself.
-
- This is based on the ``tmpdir`` fixture but provides a number of methods
- which aid with testing pytest itself. Unless :py:meth:`chdir` is used all
- methods will use :py:attr:`tmpdir` as their current working directory.
-
- Attributes:
-
- :tmpdir: The :py:class:`py.path.local` instance of the temporary directory.
-
- :plugins: A list of plugins to use with :py:meth:`parseconfig` and
- :py:meth:`runpytest`. Initially this is an empty list but plugins can
- be added to the list. The type of items to add to the list depends on
- the method using them so refer to them for details.
-
- """
-
+ def parseoutcomes(self):
+ """Return a dictionary of outcomestring->num from parsing the terminal
+ output that the test process produced.
+
+ """
+ for line in reversed(self.outlines):
+ if "seconds" in line:
+ outcomes = rex_outcome.findall(line)
+ if outcomes:
+ d = {}
+ for num, cat in outcomes:
+ d[cat] = int(num)
+ return d
+ raise ValueError("Pytest terminal report not found")
+
+ def assert_outcomes(
+ self, passed=0, skipped=0, failed=0, error=0, xpassed=0, xfailed=0
+ ):
+ """Assert that the specified outcomes appear with the respective
+ numbers (0 means it didn't occur) in the text output from a test run.
+
+ """
+ d = self.parseoutcomes()
+ obtained = {
+ "passed": d.get("passed", 0),
+ "skipped": d.get("skipped", 0),
+ "failed": d.get("failed", 0),
+ "error": d.get("error", 0),
+ "xpassed": d.get("xpassed", 0),
+ "xfailed": d.get("xfailed", 0),
+ }
+ expected = {
+ "passed": passed,
+ "skipped": skipped,
+ "failed": failed,
+ "error": error,
+ "xpassed": xpassed,
+ "xfailed": xfailed,
+ }
+ assert obtained == expected
+
+
+class CwdSnapshot(object):
+ def __init__(self):
+ self.__saved = os.getcwd()
+
+ def restore(self):
+ os.chdir(self.__saved)
+
+
+class SysModulesSnapshot(object):
+ def __init__(self, preserve=None):
+ self.__preserve = preserve
+ self.__saved = dict(sys.modules)
+
+ def restore(self):
+ if self.__preserve:
+ self.__saved.update(
+ (k, m) for k, m in sys.modules.items() if self.__preserve(k)
+ )
+ sys.modules.clear()
+ sys.modules.update(self.__saved)
+
+
+class SysPathsSnapshot(object):
+ def __init__(self):
+ self.__saved = list(sys.path), list(sys.meta_path)
+
+ def restore(self):
+ sys.path[:], sys.meta_path[:] = self.__saved
+
+
+class Testdir(object):
+ """Temporary test directory with tools to test/run pytest itself.
+
+ This is based on the ``tmpdir`` fixture but provides a number of methods
+ which aid with testing pytest itself. Unless :py:meth:`chdir` is used all
+ methods will use :py:attr:`tmpdir` as their current working directory.
+
+ Attributes:
+
+ :tmpdir: The :py:class:`py.path.local` instance of the temporary directory.
+
+ :plugins: A list of plugins to use with :py:meth:`parseconfig` and
+ :py:meth:`runpytest`. Initially this is an empty list but plugins can
+ be added to the list. The type of items to add to the list depends on
+ the method using them so refer to them for details.
+
+ """
+
CLOSE_STDIN = object
- class TimeoutExpired(Exception):
- pass
-
- def __init__(self, request, tmpdir_factory):
- self.request = request
- self._mod_collections = WeakKeyDictionary()
- name = request.function.__name__
- self.tmpdir = tmpdir_factory.mktemp(name, numbered=True)
- self.test_tmproot = tmpdir_factory.mktemp("tmp-" + name, numbered=True)
- self.plugins = []
- self._cwd_snapshot = CwdSnapshot()
- self._sys_path_snapshot = SysPathsSnapshot()
- self._sys_modules_snapshot = self.__take_sys_modules_snapshot()
- self.chdir()
- self.request.addfinalizer(self.finalize)
- method = self.request.config.getoption("--runpytest")
- if method == "inprocess":
- self._runpytest_method = self.runpytest_inprocess
- elif method == "subprocess":
- self._runpytest_method = self.runpytest_subprocess
-
+ class TimeoutExpired(Exception):
+ pass
+
+ def __init__(self, request, tmpdir_factory):
+ self.request = request
+ self._mod_collections = WeakKeyDictionary()
+ name = request.function.__name__
+ self.tmpdir = tmpdir_factory.mktemp(name, numbered=True)
+ self.test_tmproot = tmpdir_factory.mktemp("tmp-" + name, numbered=True)
+ self.plugins = []
+ self._cwd_snapshot = CwdSnapshot()
+ self._sys_path_snapshot = SysPathsSnapshot()
+ self._sys_modules_snapshot = self.__take_sys_modules_snapshot()
+ self.chdir()
+ self.request.addfinalizer(self.finalize)
+ method = self.request.config.getoption("--runpytest")
+ if method == "inprocess":
+ self._runpytest_method = self.runpytest_inprocess
+ elif method == "subprocess":
+ self._runpytest_method = self.runpytest_subprocess
+
mp = self.monkeypatch = MonkeyPatch()
mp.setenv("PYTEST_DEBUG_TEMPROOT", str(self.test_tmproot))
# Ensure no unexpected caching via tox.
@@ -512,523 +512,523 @@ class Testdir(object):
tmphome = str(self.tmpdir)
self._env_run_update = {"HOME": tmphome, "USERPROFILE": tmphome}
- def __repr__(self):
- return "<Testdir %r>" % (self.tmpdir,)
-
- def __str__(self):
- return str(self.tmpdir)
-
- def finalize(self):
- """Clean up global state artifacts.
-
- Some methods modify the global interpreter state and this tries to
- clean this up. It does not remove the temporary directory however so
- it can be looked at after the test run has finished.
-
- """
- self._sys_modules_snapshot.restore()
- self._sys_path_snapshot.restore()
- self._cwd_snapshot.restore()
+ def __repr__(self):
+ return "<Testdir %r>" % (self.tmpdir,)
+
+ def __str__(self):
+ return str(self.tmpdir)
+
+ def finalize(self):
+ """Clean up global state artifacts.
+
+ Some methods modify the global interpreter state and this tries to
+ clean this up. It does not remove the temporary directory however so
+ it can be looked at after the test run has finished.
+
+ """
+ self._sys_modules_snapshot.restore()
+ self._sys_path_snapshot.restore()
+ self._cwd_snapshot.restore()
self.monkeypatch.undo()
-
- def __take_sys_modules_snapshot(self):
- # some zope modules used by twisted-related tests keep internal state
- # and can't be deleted; we had some trouble in the past with
- # `zope.interface` for example
- def preserve_module(name):
- return name.startswith("zope")
-
- return SysModulesSnapshot(preserve=preserve_module)
-
- def make_hook_recorder(self, pluginmanager):
- """Create a new :py:class:`HookRecorder` for a PluginManager."""
- pluginmanager.reprec = reprec = HookRecorder(pluginmanager)
- self.request.addfinalizer(reprec.finish_recording)
- return reprec
-
- def chdir(self):
- """Cd into the temporary directory.
-
- This is done automatically upon instantiation.
-
- """
- self.tmpdir.chdir()
-
- def _makefile(self, ext, args, kwargs, encoding="utf-8"):
- items = list(kwargs.items())
-
- def to_text(s):
- return s.decode(encoding) if isinstance(s, bytes) else six.text_type(s)
-
- if args:
- source = u"\n".join(to_text(x) for x in args)
- basename = self.request.function.__name__
- items.insert(0, (basename, source))
-
- ret = None
- for basename, value in items:
- p = self.tmpdir.join(basename).new(ext=ext)
- p.dirpath().ensure_dir()
- source = Source(value)
- source = u"\n".join(to_text(line) for line in source.lines)
- p.write(source.strip().encode(encoding), "wb")
- if ret is None:
- ret = p
- return ret
-
- def makefile(self, ext, *args, **kwargs):
- r"""Create new file(s) in the testdir.
-
- :param str ext: The extension the file(s) should use, including the dot, e.g. `.py`.
- :param list[str] args: All args will be treated as strings and joined using newlines.
- The result will be written as contents to the file. The name of the
- file will be based on the test function requesting this fixture.
- :param kwargs: Each keyword is the name of a file, while the value of it will
- be written as contents of the file.
-
- Examples:
-
- .. code-block:: python
-
- testdir.makefile(".txt", "line1", "line2")
-
- testdir.makefile(".ini", pytest="[pytest]\naddopts=-rs\n")
-
- """
- return self._makefile(ext, args, kwargs)
-
- def makeconftest(self, source):
- """Write a contest.py file with 'source' as contents."""
- return self.makepyfile(conftest=source)
-
- def makeini(self, source):
- """Write a tox.ini file with 'source' as contents."""
- return self.makefile(".ini", tox=source)
-
- def getinicfg(self, source):
- """Return the pytest section from the tox.ini config file."""
- p = self.makeini(source)
- return py.iniconfig.IniConfig(p)["pytest"]
-
- def makepyfile(self, *args, **kwargs):
- """Shortcut for .makefile() with a .py extension."""
- return self._makefile(".py", args, kwargs)
-
- def maketxtfile(self, *args, **kwargs):
- """Shortcut for .makefile() with a .txt extension."""
- return self._makefile(".txt", args, kwargs)
-
- def syspathinsert(self, path=None):
- """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`.
-
- This is undone automatically when this object dies at the end of each
- test.
- """
- if path is None:
- path = self.tmpdir
-
+
+ def __take_sys_modules_snapshot(self):
+ # some zope modules used by twisted-related tests keep internal state
+ # and can't be deleted; we had some trouble in the past with
+ # `zope.interface` for example
+ def preserve_module(name):
+ return name.startswith("zope")
+
+ return SysModulesSnapshot(preserve=preserve_module)
+
+ def make_hook_recorder(self, pluginmanager):
+ """Create a new :py:class:`HookRecorder` for a PluginManager."""
+ pluginmanager.reprec = reprec = HookRecorder(pluginmanager)
+ self.request.addfinalizer(reprec.finish_recording)
+ return reprec
+
+ def chdir(self):
+ """Cd into the temporary directory.
+
+ This is done automatically upon instantiation.
+
+ """
+ self.tmpdir.chdir()
+
+ def _makefile(self, ext, args, kwargs, encoding="utf-8"):
+ items = list(kwargs.items())
+
+ def to_text(s):
+ return s.decode(encoding) if isinstance(s, bytes) else six.text_type(s)
+
+ if args:
+ source = u"\n".join(to_text(x) for x in args)
+ basename = self.request.function.__name__
+ items.insert(0, (basename, source))
+
+ ret = None
+ for basename, value in items:
+ p = self.tmpdir.join(basename).new(ext=ext)
+ p.dirpath().ensure_dir()
+ source = Source(value)
+ source = u"\n".join(to_text(line) for line in source.lines)
+ p.write(source.strip().encode(encoding), "wb")
+ if ret is None:
+ ret = p
+ return ret
+
+ def makefile(self, ext, *args, **kwargs):
+ r"""Create new file(s) in the testdir.
+
+ :param str ext: The extension the file(s) should use, including the dot, e.g. `.py`.
+ :param list[str] args: All args will be treated as strings and joined using newlines.
+ The result will be written as contents to the file. The name of the
+ file will be based on the test function requesting this fixture.
+ :param kwargs: Each keyword is the name of a file, while the value of it will
+ be written as contents of the file.
+
+ Examples:
+
+ .. code-block:: python
+
+ testdir.makefile(".txt", "line1", "line2")
+
+ testdir.makefile(".ini", pytest="[pytest]\naddopts=-rs\n")
+
+ """
+ return self._makefile(ext, args, kwargs)
+
+ def makeconftest(self, source):
+ """Write a contest.py file with 'source' as contents."""
+ return self.makepyfile(conftest=source)
+
+ def makeini(self, source):
+ """Write a tox.ini file with 'source' as contents."""
+ return self.makefile(".ini", tox=source)
+
+ def getinicfg(self, source):
+ """Return the pytest section from the tox.ini config file."""
+ p = self.makeini(source)
+ return py.iniconfig.IniConfig(p)["pytest"]
+
+ def makepyfile(self, *args, **kwargs):
+ """Shortcut for .makefile() with a .py extension."""
+ return self._makefile(".py", args, kwargs)
+
+ def maketxtfile(self, *args, **kwargs):
+ """Shortcut for .makefile() with a .txt extension."""
+ return self._makefile(".txt", args, kwargs)
+
+ def syspathinsert(self, path=None):
+ """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`.
+
+ This is undone automatically when this object dies at the end of each
+ test.
+ """
+ if path is None:
+ path = self.tmpdir
+
self.monkeypatch.syspath_prepend(str(path))
-
- def mkdir(self, name):
- """Create a new (sub)directory."""
- return self.tmpdir.mkdir(name)
-
- def mkpydir(self, name):
- """Create a new python package.
-
- This creates a (sub)directory with an empty ``__init__.py`` file so it
- gets recognised as a python package.
-
- """
- p = self.mkdir(name)
- p.ensure("__init__.py")
- return p
-
- def copy_example(self, name=None):
- import warnings
- from _pytest.warning_types import PYTESTER_COPY_EXAMPLE
-
- warnings.warn(PYTESTER_COPY_EXAMPLE, stacklevel=2)
- example_dir = self.request.config.getini("pytester_example_dir")
- if example_dir is None:
- raise ValueError("pytester_example_dir is unset, can't copy examples")
- example_dir = self.request.config.rootdir.join(example_dir)
-
- for extra_element in self.request.node.iter_markers("pytester_example_path"):
- assert extra_element.args
- example_dir = example_dir.join(*extra_element.args)
-
- if name is None:
- func_name = self.request.function.__name__
- maybe_dir = example_dir / func_name
- maybe_file = example_dir / (func_name + ".py")
-
- if maybe_dir.isdir():
- example_path = maybe_dir
- elif maybe_file.isfile():
- example_path = maybe_file
- else:
- raise LookupError(
- "{} cant be found as module or package in {}".format(
+
+ def mkdir(self, name):
+ """Create a new (sub)directory."""
+ return self.tmpdir.mkdir(name)
+
+ def mkpydir(self, name):
+ """Create a new python package.
+
+ This creates a (sub)directory with an empty ``__init__.py`` file so it
+ gets recognised as a python package.
+
+ """
+ p = self.mkdir(name)
+ p.ensure("__init__.py")
+ return p
+
+ def copy_example(self, name=None):
+ import warnings
+ from _pytest.warning_types import PYTESTER_COPY_EXAMPLE
+
+ warnings.warn(PYTESTER_COPY_EXAMPLE, stacklevel=2)
+ example_dir = self.request.config.getini("pytester_example_dir")
+ if example_dir is None:
+ raise ValueError("pytester_example_dir is unset, can't copy examples")
+ example_dir = self.request.config.rootdir.join(example_dir)
+
+ for extra_element in self.request.node.iter_markers("pytester_example_path"):
+ assert extra_element.args
+ example_dir = example_dir.join(*extra_element.args)
+
+ if name is None:
+ func_name = self.request.function.__name__
+ maybe_dir = example_dir / func_name
+ maybe_file = example_dir / (func_name + ".py")
+
+ if maybe_dir.isdir():
+ example_path = maybe_dir
+ elif maybe_file.isfile():
+ example_path = maybe_file
+ else:
+ raise LookupError(
+ "{} cant be found as module or package in {}".format(
func_name, example_dir.bestrelpath(self.request.config.rootdir)
- )
- )
- else:
- example_path = example_dir.join(name)
-
- if example_path.isdir() and not example_path.join("__init__.py").isfile():
- example_path.copy(self.tmpdir)
- return self.tmpdir
- elif example_path.isfile():
- result = self.tmpdir.join(example_path.basename)
- example_path.copy(result)
- return result
- else:
- raise LookupError(
- 'example "{}" is not found as a file or directory'.format(example_path)
- )
-
- Session = Session
-
- def getnode(self, config, arg):
- """Return the collection node of a file.
-
- :param config: :py:class:`_pytest.config.Config` instance, see
- :py:meth:`parseconfig` and :py:meth:`parseconfigure` to create the
- configuration
-
- :param arg: a :py:class:`py.path.local` instance of the file
-
- """
- session = Session(config)
- assert "::" not in str(arg)
- p = py.path.local(arg)
- config.hook.pytest_sessionstart(session=session)
- res = session.perform_collect([str(p)], genitems=False)[0]
- config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
- return res
-
- def getpathnode(self, path):
- """Return the collection node of a file.
-
- This is like :py:meth:`getnode` but uses :py:meth:`parseconfigure` to
- create the (configured) pytest Config instance.
-
- :param path: a :py:class:`py.path.local` instance of the file
-
- """
- config = self.parseconfigure(path)
- session = Session(config)
- x = session.fspath.bestrelpath(path)
- config.hook.pytest_sessionstart(session=session)
- res = session.perform_collect([x], genitems=False)[0]
- config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
- return res
-
- def genitems(self, colitems):
- """Generate all test items from a collection node.
-
- This recurses into the collection node and returns a list of all the
- test items contained within.
-
- """
- session = colitems[0].session
- result = []
- for colitem in colitems:
- result.extend(session.genitems(colitem))
- return result
-
- def runitem(self, source):
- """Run the "test_func" Item.
-
- The calling test instance (class containing the test method) must
- provide a ``.getrunner()`` method which should return a runner which
- can run the test protocol for a single item, e.g.
- :py:func:`_pytest.runner.runtestprotocol`.
-
- """
- # used from runner functional tests
- item = self.getitem(source)
- # the test class where we are called from wants to provide the runner
- testclassinstance = self.request.instance
- runner = testclassinstance.getrunner()
- return runner(item)
-
- def inline_runsource(self, source, *cmdlineargs):
- """Run a test module in process using ``pytest.main()``.
-
- This run writes "source" into a temporary file and runs
- ``pytest.main()`` on it, returning a :py:class:`HookRecorder` instance
- for the result.
-
- :param source: the source code of the test module
-
- :param cmdlineargs: any extra command line arguments to use
-
- :return: :py:class:`HookRecorder` instance of the result
-
- """
- p = self.makepyfile(source)
- values = list(cmdlineargs) + [p]
- return self.inline_run(*values)
-
- def inline_genitems(self, *args):
- """Run ``pytest.main(['--collectonly'])`` in-process.
-
- Runs the :py:func:`pytest.main` function to run all of pytest inside
- the test process itself like :py:meth:`inline_run`, but returns a
- tuple of the collected items and a :py:class:`HookRecorder` instance.
-
- """
- rec = self.inline_run("--collect-only", *args)
- items = [x.item for x in rec.getcalls("pytest_itemcollected")]
- return items, rec
-
- def inline_run(self, *args, **kwargs):
- """Run ``pytest.main()`` in-process, returning a HookRecorder.
-
- Runs the :py:func:`pytest.main` function to run all of pytest inside
- the test process itself. This means it can return a
- :py:class:`HookRecorder` instance which gives more detailed results
- from that run than can be done by matching stdout/stderr from
- :py:meth:`runpytest`.
-
- :param args: command line arguments to pass to :py:func:`pytest.main`
-
+ )
+ )
+ else:
+ example_path = example_dir.join(name)
+
+ if example_path.isdir() and not example_path.join("__init__.py").isfile():
+ example_path.copy(self.tmpdir)
+ return self.tmpdir
+ elif example_path.isfile():
+ result = self.tmpdir.join(example_path.basename)
+ example_path.copy(result)
+ return result
+ else:
+ raise LookupError(
+ 'example "{}" is not found as a file or directory'.format(example_path)
+ )
+
+ Session = Session
+
+ def getnode(self, config, arg):
+ """Return the collection node of a file.
+
+ :param config: :py:class:`_pytest.config.Config` instance, see
+ :py:meth:`parseconfig` and :py:meth:`parseconfigure` to create the
+ configuration
+
+ :param arg: a :py:class:`py.path.local` instance of the file
+
+ """
+ session = Session(config)
+ assert "::" not in str(arg)
+ p = py.path.local(arg)
+ config.hook.pytest_sessionstart(session=session)
+ res = session.perform_collect([str(p)], genitems=False)[0]
+ config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
+ return res
+
+ def getpathnode(self, path):
+ """Return the collection node of a file.
+
+ This is like :py:meth:`getnode` but uses :py:meth:`parseconfigure` to
+ create the (configured) pytest Config instance.
+
+ :param path: a :py:class:`py.path.local` instance of the file
+
+ """
+ config = self.parseconfigure(path)
+ session = Session(config)
+ x = session.fspath.bestrelpath(path)
+ config.hook.pytest_sessionstart(session=session)
+ res = session.perform_collect([x], genitems=False)[0]
+ config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
+ return res
+
+ def genitems(self, colitems):
+ """Generate all test items from a collection node.
+
+ This recurses into the collection node and returns a list of all the
+ test items contained within.
+
+ """
+ session = colitems[0].session
+ result = []
+ for colitem in colitems:
+ result.extend(session.genitems(colitem))
+ return result
+
+ def runitem(self, source):
+ """Run the "test_func" Item.
+
+ The calling test instance (class containing the test method) must
+ provide a ``.getrunner()`` method which should return a runner which
+ can run the test protocol for a single item, e.g.
+ :py:func:`_pytest.runner.runtestprotocol`.
+
+ """
+ # used from runner functional tests
+ item = self.getitem(source)
+ # the test class where we are called from wants to provide the runner
+ testclassinstance = self.request.instance
+ runner = testclassinstance.getrunner()
+ return runner(item)
+
+ def inline_runsource(self, source, *cmdlineargs):
+ """Run a test module in process using ``pytest.main()``.
+
+ This run writes "source" into a temporary file and runs
+ ``pytest.main()`` on it, returning a :py:class:`HookRecorder` instance
+ for the result.
+
+ :param source: the source code of the test module
+
+ :param cmdlineargs: any extra command line arguments to use
+
+ :return: :py:class:`HookRecorder` instance of the result
+
+ """
+ p = self.makepyfile(source)
+ values = list(cmdlineargs) + [p]
+ return self.inline_run(*values)
+
+ def inline_genitems(self, *args):
+ """Run ``pytest.main(['--collectonly'])`` in-process.
+
+ Runs the :py:func:`pytest.main` function to run all of pytest inside
+ the test process itself like :py:meth:`inline_run`, but returns a
+ tuple of the collected items and a :py:class:`HookRecorder` instance.
+
+ """
+ rec = self.inline_run("--collect-only", *args)
+ items = [x.item for x in rec.getcalls("pytest_itemcollected")]
+ return items, rec
+
+ def inline_run(self, *args, **kwargs):
+ """Run ``pytest.main()`` in-process, returning a HookRecorder.
+
+ Runs the :py:func:`pytest.main` function to run all of pytest inside
+ the test process itself. This means it can return a
+ :py:class:`HookRecorder` instance which gives more detailed results
+ from that run than can be done by matching stdout/stderr from
+ :py:meth:`runpytest`.
+
+ :param args: command line arguments to pass to :py:func:`pytest.main`
+
:param plugins: (keyword-only) extra plugin instances the
- ``pytest.main()`` instance should use
-
- :return: a :py:class:`HookRecorder` instance
+ ``pytest.main()`` instance should use
+
+ :return: a :py:class:`HookRecorder` instance
"""
plugins = kwargs.pop("plugins", [])
no_reraise_ctrlc = kwargs.pop("no_reraise_ctrlc", None)
raise_on_kwargs(kwargs)
-
- finalizers = []
- try:
+
+ finalizers = []
+ try:
# Do not load user config (during runs only).
mp_run = MonkeyPatch()
for k, v in self._env_run_update.items():
mp_run.setenv(k, v)
finalizers.append(mp_run.undo)
- # When running pytest inline any plugins active in the main test
- # process are already imported. So this disables the warning which
- # will trigger to say they can no longer be rewritten, which is
- # fine as they have already been rewritten.
- orig_warn = AssertionRewritingHook._warn_already_imported
-
- def revert_warn_already_imported():
- AssertionRewritingHook._warn_already_imported = orig_warn
-
- finalizers.append(revert_warn_already_imported)
- AssertionRewritingHook._warn_already_imported = lambda *a: None
-
- # Any sys.module or sys.path changes done while running pytest
- # inline should be reverted after the test run completes to avoid
- # clashing with later inline tests run within the same pytest test,
- # e.g. just because they use matching test module names.
- finalizers.append(self.__take_sys_modules_snapshot().restore)
- finalizers.append(SysPathsSnapshot().restore)
-
- # Important note:
- # - our tests should not leave any other references/registrations
- # laying around other than possibly loaded test modules
- # referenced from sys.modules, as nothing will clean those up
- # automatically
-
- rec = []
-
- class Collect(object):
- def pytest_configure(x, config):
- rec.append(self.make_hook_recorder(config.pluginmanager))
-
- plugins.append(Collect())
- ret = pytest.main(list(args), plugins=plugins)
- if len(rec) == 1:
- reprec = rec.pop()
- else:
-
- class reprec(object):
- pass
-
- reprec.ret = ret
-
- # typically we reraise keyboard interrupts from the child run
- # because it's our user requesting interruption of the testing
+ # When running pytest inline any plugins active in the main test
+ # process are already imported. So this disables the warning which
+ # will trigger to say they can no longer be rewritten, which is
+ # fine as they have already been rewritten.
+ orig_warn = AssertionRewritingHook._warn_already_imported
+
+ def revert_warn_already_imported():
+ AssertionRewritingHook._warn_already_imported = orig_warn
+
+ finalizers.append(revert_warn_already_imported)
+ AssertionRewritingHook._warn_already_imported = lambda *a: None
+
+ # Any sys.module or sys.path changes done while running pytest
+ # inline should be reverted after the test run completes to avoid
+ # clashing with later inline tests run within the same pytest test,
+ # e.g. just because they use matching test module names.
+ finalizers.append(self.__take_sys_modules_snapshot().restore)
+ finalizers.append(SysPathsSnapshot().restore)
+
+ # Important note:
+ # - our tests should not leave any other references/registrations
+ # laying around other than possibly loaded test modules
+ # referenced from sys.modules, as nothing will clean those up
+ # automatically
+
+ rec = []
+
+ class Collect(object):
+ def pytest_configure(x, config):
+ rec.append(self.make_hook_recorder(config.pluginmanager))
+
+ plugins.append(Collect())
+ ret = pytest.main(list(args), plugins=plugins)
+ if len(rec) == 1:
+ reprec = rec.pop()
+ else:
+
+ class reprec(object):
+ pass
+
+ reprec.ret = ret
+
+ # typically we reraise keyboard interrupts from the child run
+ # because it's our user requesting interruption of the testing
if ret == EXIT_INTERRUPTED and not no_reraise_ctrlc:
- calls = reprec.getcalls("pytest_keyboard_interrupt")
- if calls and calls[-1].excinfo.type == KeyboardInterrupt:
- raise KeyboardInterrupt()
- return reprec
- finally:
- for finalizer in finalizers:
- finalizer()
-
- def runpytest_inprocess(self, *args, **kwargs):
- """Return result of running pytest in-process, providing a similar
- interface to what self.runpytest() provides.
+ calls = reprec.getcalls("pytest_keyboard_interrupt")
+ if calls and calls[-1].excinfo.type == KeyboardInterrupt:
+ raise KeyboardInterrupt()
+ return reprec
+ finally:
+ for finalizer in finalizers:
+ finalizer()
+
+ def runpytest_inprocess(self, *args, **kwargs):
+ """Return result of running pytest in-process, providing a similar
+ interface to what self.runpytest() provides.
"""
syspathinsert = kwargs.pop("syspathinsert", False)
-
+
if syspathinsert:
- self.syspathinsert()
- now = time.time()
- capture = MultiCapture(Capture=SysCapture)
- capture.start_capturing()
- try:
- try:
- reprec = self.inline_run(*args, **kwargs)
- except SystemExit as e:
-
- class reprec(object):
- ret = e.args[0]
-
- except Exception:
- traceback.print_exc()
-
- class reprec(object):
- ret = 3
-
- finally:
- out, err = capture.readouterr()
- capture.stop_capturing()
- sys.stdout.write(out)
- sys.stderr.write(err)
-
- res = RunResult(reprec.ret, out.split("\n"), err.split("\n"), time.time() - now)
- res.reprec = reprec
- return res
-
- def runpytest(self, *args, **kwargs):
- """Run pytest inline or in a subprocess, depending on the command line
- option "--runpytest" and return a :py:class:`RunResult`.
-
- """
- args = self._ensure_basetemp(args)
- return self._runpytest_method(*args, **kwargs)
-
- def _ensure_basetemp(self, args):
- args = list(args)
- for x in args:
- if safe_str(x).startswith("--basetemp"):
- break
- else:
- args.append("--basetemp=%s" % self.tmpdir.dirpath("basetemp"))
- return args
-
- def parseconfig(self, *args):
- """Return a new pytest Config instance from given commandline args.
-
- This invokes the pytest bootstrapping code in _pytest.config to create
- a new :py:class:`_pytest.core.PluginManager` and call the
- pytest_cmdline_parse hook to create a new
- :py:class:`_pytest.config.Config` instance.
-
- If :py:attr:`plugins` has been populated they should be plugin modules
- to be registered with the PluginManager.
-
- """
- args = self._ensure_basetemp(args)
-
- import _pytest.config
-
- config = _pytest.config._prepareconfig(args, self.plugins)
- # we don't know what the test will do with this half-setup config
- # object and thus we make sure it gets unconfigured properly in any
- # case (otherwise capturing could still be active, for example)
- self.request.addfinalizer(config._ensure_unconfigure)
- return config
-
- def parseconfigure(self, *args):
- """Return a new pytest configured Config instance.
-
- This returns a new :py:class:`_pytest.config.Config` instance like
- :py:meth:`parseconfig`, but also calls the pytest_configure hook.
-
- """
- config = self.parseconfig(*args)
- config._do_configure()
- self.request.addfinalizer(config._ensure_unconfigure)
- return config
-
- def getitem(self, source, funcname="test_func"):
- """Return the test item for a test function.
-
- This writes the source to a python file and runs pytest's collection on
- the resulting module, returning the test item for the requested
- function name.
-
- :param source: the module source
-
- :param funcname: the name of the test function for which to return a
- test item
-
- """
- items = self.getitems(source)
- for item in items:
- if item.name == funcname:
- return item
- assert 0, "%r item not found in module:\n%s\nitems: %s" % (
- funcname,
- source,
- items,
- )
-
- def getitems(self, source):
- """Return all test items collected from the module.
-
- This writes the source to a python file and runs pytest's collection on
- the resulting module, returning all test items contained within.
-
- """
- modcol = self.getmodulecol(source)
- return self.genitems([modcol])
-
- def getmodulecol(self, source, configargs=(), withinit=False):
- """Return the module collection node for ``source``.
-
- This writes ``source`` to a file using :py:meth:`makepyfile` and then
- runs the pytest collection on it, returning the collection node for the
- test module.
-
- :param source: the source code of the module to collect
-
- :param configargs: any extra arguments to pass to
- :py:meth:`parseconfigure`
-
- :param withinit: whether to also write an ``__init__.py`` file to the
- same directory to ensure it is a package
-
- """
- if isinstance(source, Path):
- path = self.tmpdir.join(str(source))
- assert not withinit, "not supported for paths"
- else:
- kw = {self.request.function.__name__: Source(source).strip()}
- path = self.makepyfile(**kw)
- if withinit:
- self.makepyfile(__init__="#")
- self.config = config = self.parseconfigure(path, *configargs)
- return self.getnode(config, path)
-
- def collect_by_name(self, modcol, name):
- """Return the collection node for name from the module collection.
-
- This will search a module collection node for a collection node
- matching the given name.
-
- :param modcol: a module collection node; see :py:meth:`getmodulecol`
-
- :param name: the name of the node to return
-
- """
- if modcol not in self._mod_collections:
- self._mod_collections[modcol] = list(modcol.collect())
- for colitem in self._mod_collections[modcol]:
- if colitem.name == name:
- return colitem
-
+ self.syspathinsert()
+ now = time.time()
+ capture = MultiCapture(Capture=SysCapture)
+ capture.start_capturing()
+ try:
+ try:
+ reprec = self.inline_run(*args, **kwargs)
+ except SystemExit as e:
+
+ class reprec(object):
+ ret = e.args[0]
+
+ except Exception:
+ traceback.print_exc()
+
+ class reprec(object):
+ ret = 3
+
+ finally:
+ out, err = capture.readouterr()
+ capture.stop_capturing()
+ sys.stdout.write(out)
+ sys.stderr.write(err)
+
+ res = RunResult(reprec.ret, out.split("\n"), err.split("\n"), time.time() - now)
+ res.reprec = reprec
+ return res
+
+ def runpytest(self, *args, **kwargs):
+ """Run pytest inline or in a subprocess, depending on the command line
+ option "--runpytest" and return a :py:class:`RunResult`.
+
+ """
+ args = self._ensure_basetemp(args)
+ return self._runpytest_method(*args, **kwargs)
+
+ def _ensure_basetemp(self, args):
+ args = list(args)
+ for x in args:
+ if safe_str(x).startswith("--basetemp"):
+ break
+ else:
+ args.append("--basetemp=%s" % self.tmpdir.dirpath("basetemp"))
+ return args
+
+ def parseconfig(self, *args):
+ """Return a new pytest Config instance from given commandline args.
+
+ This invokes the pytest bootstrapping code in _pytest.config to create
+ a new :py:class:`_pytest.core.PluginManager` and call the
+ pytest_cmdline_parse hook to create a new
+ :py:class:`_pytest.config.Config` instance.
+
+ If :py:attr:`plugins` has been populated they should be plugin modules
+ to be registered with the PluginManager.
+
+ """
+ args = self._ensure_basetemp(args)
+
+ import _pytest.config
+
+ config = _pytest.config._prepareconfig(args, self.plugins)
+ # we don't know what the test will do with this half-setup config
+ # object and thus we make sure it gets unconfigured properly in any
+ # case (otherwise capturing could still be active, for example)
+ self.request.addfinalizer(config._ensure_unconfigure)
+ return config
+
+ def parseconfigure(self, *args):
+ """Return a new pytest configured Config instance.
+
+ This returns a new :py:class:`_pytest.config.Config` instance like
+ :py:meth:`parseconfig`, but also calls the pytest_configure hook.
+
+ """
+ config = self.parseconfig(*args)
+ config._do_configure()
+ self.request.addfinalizer(config._ensure_unconfigure)
+ return config
+
+ def getitem(self, source, funcname="test_func"):
+ """Return the test item for a test function.
+
+ This writes the source to a python file and runs pytest's collection on
+ the resulting module, returning the test item for the requested
+ function name.
+
+ :param source: the module source
+
+ :param funcname: the name of the test function for which to return a
+ test item
+
+ """
+ items = self.getitems(source)
+ for item in items:
+ if item.name == funcname:
+ return item
+ assert 0, "%r item not found in module:\n%s\nitems: %s" % (
+ funcname,
+ source,
+ items,
+ )
+
+ def getitems(self, source):
+ """Return all test items collected from the module.
+
+ This writes the source to a python file and runs pytest's collection on
+ the resulting module, returning all test items contained within.
+
+ """
+ modcol = self.getmodulecol(source)
+ return self.genitems([modcol])
+
+ def getmodulecol(self, source, configargs=(), withinit=False):
+ """Return the module collection node for ``source``.
+
+ This writes ``source`` to a file using :py:meth:`makepyfile` and then
+ runs the pytest collection on it, returning the collection node for the
+ test module.
+
+ :param source: the source code of the module to collect
+
+ :param configargs: any extra arguments to pass to
+ :py:meth:`parseconfigure`
+
+ :param withinit: whether to also write an ``__init__.py`` file to the
+ same directory to ensure it is a package
+
+ """
+ if isinstance(source, Path):
+ path = self.tmpdir.join(str(source))
+ assert not withinit, "not supported for paths"
+ else:
+ kw = {self.request.function.__name__: Source(source).strip()}
+ path = self.makepyfile(**kw)
+ if withinit:
+ self.makepyfile(__init__="#")
+ self.config = config = self.parseconfigure(path, *configargs)
+ return self.getnode(config, path)
+
+ def collect_by_name(self, modcol, name):
+ """Return the collection node for name from the module collection.
+
+ This will search a module collection node for a collection node
+ matching the given name.
+
+ :param modcol: a module collection node; see :py:meth:`getmodulecol`
+
+ :param name: the name of the node to return
+
+ """
+ if modcol not in self._mod_collections:
+ self._mod_collections[modcol] = list(modcol.collect())
+ for colitem in self._mod_collections[modcol]:
+ if colitem.name == name:
+ return colitem
+
def popen(
self,
cmdargs,
@@ -1037,377 +1037,377 @@ class Testdir(object):
stdin=CLOSE_STDIN,
**kw
):
- """Invoke subprocess.Popen.
-
- This calls subprocess.Popen making sure the current working directory
- is in the PYTHONPATH.
-
- You probably want to use :py:meth:`run` instead.
-
- """
- env = os.environ.copy()
- env["PYTHONPATH"] = os.pathsep.join(
- filter(None, [os.getcwd(), env.get("PYTHONPATH", "")])
- )
+ """Invoke subprocess.Popen.
+
+ This calls subprocess.Popen making sure the current working directory
+ is in the PYTHONPATH.
+
+ You probably want to use :py:meth:`run` instead.
+
+ """
+ env = os.environ.copy()
+ env["PYTHONPATH"] = os.pathsep.join(
+ filter(None, [os.getcwd(), env.get("PYTHONPATH", "")])
+ )
env.update(self._env_run_update)
- kw["env"] = env
-
+ kw["env"] = env
+
if stdin is Testdir.CLOSE_STDIN:
kw["stdin"] = subprocess.PIPE
elif isinstance(stdin, bytes):
kw["stdin"] = subprocess.PIPE
else:
kw["stdin"] = stdin
-
+
popen = subprocess.Popen(cmdargs, stdout=stdout, stderr=stderr, **kw)
if stdin is Testdir.CLOSE_STDIN:
popen.stdin.close()
elif isinstance(stdin, bytes):
popen.stdin.write(stdin)
- return popen
-
- def run(self, *cmdargs, **kwargs):
- """Run a command with arguments.
-
- Run a process using subprocess.Popen saving the stdout and stderr.
-
- :param args: the sequence of arguments to pass to `subprocess.Popen()`
- :param timeout: the period in seconds after which to timeout and raise
- :py:class:`Testdir.TimeoutExpired`
+ return popen
+
+ def run(self, *cmdargs, **kwargs):
+ """Run a command with arguments.
+
+ Run a process using subprocess.Popen saving the stdout and stderr.
+
+ :param args: the sequence of arguments to pass to `subprocess.Popen()`
+ :param timeout: the period in seconds after which to timeout and raise
+ :py:class:`Testdir.TimeoutExpired`
:param stdin: optional standard input. Bytes are being send, closing
the pipe, otherwise it is passed through to ``popen``.
Defaults to ``CLOSE_STDIN``, which translates to using a pipe
(``subprocess.PIPE``) that gets closed.
-
- Returns a :py:class:`RunResult`.
-
- """
- __tracebackhide__ = True
-
- timeout = kwargs.pop("timeout", None)
+
+ Returns a :py:class:`RunResult`.
+
+ """
+ __tracebackhide__ = True
+
+ timeout = kwargs.pop("timeout", None)
stdin = kwargs.pop("stdin", Testdir.CLOSE_STDIN)
- raise_on_kwargs(kwargs)
-
- cmdargs = [
- str(arg) if isinstance(arg, py.path.local) else arg for arg in cmdargs
- ]
- p1 = self.tmpdir.join("stdout")
- p2 = self.tmpdir.join("stderr")
- print("running:", *cmdargs)
- print(" in:", py.path.local())
- f1 = codecs.open(str(p1), "w", encoding="utf8")
- f2 = codecs.open(str(p2), "w", encoding="utf8")
- try:
- now = time.time()
- popen = self.popen(
+ raise_on_kwargs(kwargs)
+
+ cmdargs = [
+ str(arg) if isinstance(arg, py.path.local) else arg for arg in cmdargs
+ ]
+ p1 = self.tmpdir.join("stdout")
+ p2 = self.tmpdir.join("stderr")
+ print("running:", *cmdargs)
+ print(" in:", py.path.local())
+ f1 = codecs.open(str(p1), "w", encoding="utf8")
+ f2 = codecs.open(str(p2), "w", encoding="utf8")
+ try:
+ now = time.time()
+ popen = self.popen(
cmdargs,
stdin=stdin,
stdout=f1,
stderr=f2,
close_fds=(sys.platform != "win32"),
- )
+ )
if isinstance(stdin, bytes):
popen.stdin.close()
-
- def handle_timeout():
- __tracebackhide__ = True
-
- timeout_message = (
- "{seconds} second timeout expired running:"
- " {command}".format(seconds=timeout, command=cmdargs)
- )
-
- popen.kill()
- popen.wait()
- raise self.TimeoutExpired(timeout_message)
-
- if timeout is None:
- ret = popen.wait()
+
+ def handle_timeout():
+ __tracebackhide__ = True
+
+ timeout_message = (
+ "{seconds} second timeout expired running:"
+ " {command}".format(seconds=timeout, command=cmdargs)
+ )
+
+ popen.kill()
+ popen.wait()
+ raise self.TimeoutExpired(timeout_message)
+
+ if timeout is None:
+ ret = popen.wait()
elif not six.PY2:
- try:
- ret = popen.wait(timeout)
- except subprocess.TimeoutExpired:
- handle_timeout()
- else:
- end = time.time() + timeout
-
- resolution = min(0.1, timeout / 10)
-
- while True:
- ret = popen.poll()
- if ret is not None:
- break
-
- if time.time() > end:
- handle_timeout()
-
- time.sleep(resolution)
- finally:
- f1.close()
- f2.close()
- f1 = codecs.open(str(p1), "r", encoding="utf8")
- f2 = codecs.open(str(p2), "r", encoding="utf8")
- try:
- out = f1.read().splitlines()
- err = f2.read().splitlines()
- finally:
- f1.close()
- f2.close()
- self._dump_lines(out, sys.stdout)
- self._dump_lines(err, sys.stderr)
- return RunResult(ret, out, err, time.time() - now)
-
- def _dump_lines(self, lines, fp):
- try:
- for line in lines:
- print(line, file=fp)
- except UnicodeEncodeError:
- print("couldn't print to %s because of encoding" % (fp,))
-
- def _getpytestargs(self):
- return sys.executable, "-mpytest"
-
- def runpython(self, script):
- """Run a python script using sys.executable as interpreter.
-
- Returns a :py:class:`RunResult`.
-
- """
- return self.run(sys.executable, script)
-
- def runpython_c(self, command):
- """Run python -c "command", return a :py:class:`RunResult`."""
- return self.run(sys.executable, "-c", command)
-
- def runpytest_subprocess(self, *args, **kwargs):
- """Run pytest as a subprocess with given arguments.
-
- Any plugins added to the :py:attr:`plugins` list will be added using the
- ``-p`` command line option. Additionally ``--basetemp`` is used to put
- any temporary files and directories in a numbered directory prefixed
- with "runpytest-" to not conflict with the normal numbered pytest
- location for temporary files and directories.
-
- :param args: the sequence of arguments to pass to the pytest subprocess
- :param timeout: the period in seconds after which to timeout and raise
- :py:class:`Testdir.TimeoutExpired`
-
- Returns a :py:class:`RunResult`.
- """
- __tracebackhide__ = True
+ try:
+ ret = popen.wait(timeout)
+ except subprocess.TimeoutExpired:
+ handle_timeout()
+ else:
+ end = time.time() + timeout
+
+ resolution = min(0.1, timeout / 10)
+
+ while True:
+ ret = popen.poll()
+ if ret is not None:
+ break
+
+ if time.time() > end:
+ handle_timeout()
+
+ time.sleep(resolution)
+ finally:
+ f1.close()
+ f2.close()
+ f1 = codecs.open(str(p1), "r", encoding="utf8")
+ f2 = codecs.open(str(p2), "r", encoding="utf8")
+ try:
+ out = f1.read().splitlines()
+ err = f2.read().splitlines()
+ finally:
+ f1.close()
+ f2.close()
+ self._dump_lines(out, sys.stdout)
+ self._dump_lines(err, sys.stderr)
+ return RunResult(ret, out, err, time.time() - now)
+
+ def _dump_lines(self, lines, fp):
+ try:
+ for line in lines:
+ print(line, file=fp)
+ except UnicodeEncodeError:
+ print("couldn't print to %s because of encoding" % (fp,))
+
+ def _getpytestargs(self):
+ return sys.executable, "-mpytest"
+
+ def runpython(self, script):
+ """Run a python script using sys.executable as interpreter.
+
+ Returns a :py:class:`RunResult`.
+
+ """
+ return self.run(sys.executable, script)
+
+ def runpython_c(self, command):
+ """Run python -c "command", return a :py:class:`RunResult`."""
+ return self.run(sys.executable, "-c", command)
+
+ def runpytest_subprocess(self, *args, **kwargs):
+ """Run pytest as a subprocess with given arguments.
+
+ Any plugins added to the :py:attr:`plugins` list will be added using the
+ ``-p`` command line option. Additionally ``--basetemp`` is used to put
+ any temporary files and directories in a numbered directory prefixed
+ with "runpytest-" to not conflict with the normal numbered pytest
+ location for temporary files and directories.
+
+ :param args: the sequence of arguments to pass to the pytest subprocess
+ :param timeout: the period in seconds after which to timeout and raise
+ :py:class:`Testdir.TimeoutExpired`
+
+ Returns a :py:class:`RunResult`.
+ """
+ __tracebackhide__ = True
timeout = kwargs.pop("timeout", None)
raise_on_kwargs(kwargs)
-
- p = py.path.local.make_numbered_dir(
- prefix="runpytest-", keep=None, rootdir=self.tmpdir
- )
- args = ("--basetemp=%s" % p,) + args
- plugins = [x for x in self.plugins if isinstance(x, str)]
- if plugins:
- args = ("-p", plugins[0]) + args
- args = self._getpytestargs() + args
+
+ p = py.path.local.make_numbered_dir(
+ prefix="runpytest-", keep=None, rootdir=self.tmpdir
+ )
+ args = ("--basetemp=%s" % p,) + args
+ plugins = [x for x in self.plugins if isinstance(x, str)]
+ if plugins:
+ args = ("-p", plugins[0]) + args
+ args = self._getpytestargs() + args
return self.run(*args, timeout=timeout)
-
- def spawn_pytest(self, string, expect_timeout=10.0):
- """Run pytest using pexpect.
-
- This makes sure to use the right pytest and sets up the temporary
- directory locations.
-
- The pexpect child is returned.
-
- """
- basetemp = self.tmpdir.mkdir("temp-pexpect")
- invoke = " ".join(map(str, self._getpytestargs()))
- cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string)
- return self.spawn(cmd, expect_timeout=expect_timeout)
-
- def spawn(self, cmd, expect_timeout=10.0):
- """Run a command using pexpect.
-
- The pexpect child is returned.
-
- """
- pexpect = pytest.importorskip("pexpect", "3.0")
- if hasattr(sys, "pypy_version_info") and "64" in platform.machine():
- pytest.skip("pypy-64 bit not supported")
- if sys.platform.startswith("freebsd"):
- pytest.xfail("pexpect does not work reliably on freebsd")
- logfile = self.tmpdir.join("spawn.out").open("wb")
+
+ def spawn_pytest(self, string, expect_timeout=10.0):
+ """Run pytest using pexpect.
+
+ This makes sure to use the right pytest and sets up the temporary
+ directory locations.
+
+ The pexpect child is returned.
+
+ """
+ basetemp = self.tmpdir.mkdir("temp-pexpect")
+ invoke = " ".join(map(str, self._getpytestargs()))
+ cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string)
+ return self.spawn(cmd, expect_timeout=expect_timeout)
+
+ def spawn(self, cmd, expect_timeout=10.0):
+ """Run a command using pexpect.
+
+ The pexpect child is returned.
+
+ """
+ pexpect = pytest.importorskip("pexpect", "3.0")
+ if hasattr(sys, "pypy_version_info") and "64" in platform.machine():
+ pytest.skip("pypy-64 bit not supported")
+ if sys.platform.startswith("freebsd"):
+ pytest.xfail("pexpect does not work reliably on freebsd")
+ logfile = self.tmpdir.join("spawn.out").open("wb")
# Do not load user config.
env = os.environ.copy()
env.update(self._env_run_update)
child = pexpect.spawn(cmd, logfile=logfile, env=env)
- self.request.addfinalizer(logfile.close)
- child.timeout = expect_timeout
- return child
-
-
-def getdecoded(out):
- try:
- return out.decode("utf-8")
- except UnicodeDecodeError:
+ self.request.addfinalizer(logfile.close)
+ child.timeout = expect_timeout
+ return child
+
+
+def getdecoded(out):
+ try:
+ return out.decode("utf-8")
+ except UnicodeDecodeError:
return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (saferepr(out),)
-
-
-class LineComp(object):
- def __init__(self):
- self.stringio = py.io.TextIO()
-
- def assert_contains_lines(self, lines2):
- """Assert that lines2 are contained (linearly) in lines1.
-
- Return a list of extralines found.
-
- """
- __tracebackhide__ = True
- val = self.stringio.getvalue()
- self.stringio.truncate(0)
- self.stringio.seek(0)
- lines1 = val.split("\n")
- return LineMatcher(lines1).fnmatch_lines(lines2)
-
-
-class LineMatcher(object):
- """Flexible matching of text.
-
- This is a convenience class to test large texts like the output of
- commands.
-
- The constructor takes a list of lines without their trailing newlines, i.e.
- ``text.splitlines()``.
-
- """
-
- def __init__(self, lines):
- self.lines = lines
- self._log_output = []
-
- def str(self):
- """Return the entire original text."""
- return "\n".join(self.lines)
-
- def _getlines(self, lines2):
- if isinstance(lines2, str):
- lines2 = Source(lines2)
- if isinstance(lines2, Source):
- lines2 = lines2.strip().lines
- return lines2
-
- def fnmatch_lines_random(self, lines2):
- """Check lines exist in the output using in any order.
-
- Lines are checked using ``fnmatch.fnmatch``. The argument is a list of
- lines which have to occur in the output, in any order.
-
- """
- self._match_lines_random(lines2, fnmatch)
-
- def re_match_lines_random(self, lines2):
- """Check lines exist in the output using ``re.match``, in any order.
-
- The argument is a list of lines which have to occur in the output, in
- any order.
-
- """
- self._match_lines_random(lines2, lambda name, pat: re.match(pat, name))
-
- def _match_lines_random(self, lines2, match_func):
- """Check lines exist in the output.
-
- The argument is a list of lines which have to occur in the output, in
- any order. Each line can contain glob whildcards.
-
- """
- lines2 = self._getlines(lines2)
- for line in lines2:
- for x in self.lines:
- if line == x or match_func(x, line):
- self._log("matched: ", repr(line))
- break
- else:
- self._log("line %r not found in output" % line)
- raise ValueError(self._log_text)
-
- def get_lines_after(self, fnline):
- """Return all lines following the given line in the text.
-
- The given line can contain glob wildcards.
-
- """
- for i, line in enumerate(self.lines):
- if fnline == line or fnmatch(line, fnline):
- return self.lines[i + 1 :]
- raise ValueError("line %r not found in output" % fnline)
-
- def _log(self, *args):
+
+
+class LineComp(object):
+ def __init__(self):
+ self.stringio = py.io.TextIO()
+
+ def assert_contains_lines(self, lines2):
+ """Assert that lines2 are contained (linearly) in lines1.
+
+ Return a list of extralines found.
+
+ """
+ __tracebackhide__ = True
+ val = self.stringio.getvalue()
+ self.stringio.truncate(0)
+ self.stringio.seek(0)
+ lines1 = val.split("\n")
+ return LineMatcher(lines1).fnmatch_lines(lines2)
+
+
+class LineMatcher(object):
+ """Flexible matching of text.
+
+ This is a convenience class to test large texts like the output of
+ commands.
+
+ The constructor takes a list of lines without their trailing newlines, i.e.
+ ``text.splitlines()``.
+
+ """
+
+ def __init__(self, lines):
+ self.lines = lines
+ self._log_output = []
+
+ def str(self):
+ """Return the entire original text."""
+ return "\n".join(self.lines)
+
+ def _getlines(self, lines2):
+ if isinstance(lines2, str):
+ lines2 = Source(lines2)
+ if isinstance(lines2, Source):
+ lines2 = lines2.strip().lines
+ return lines2
+
+ def fnmatch_lines_random(self, lines2):
+ """Check lines exist in the output using in any order.
+
+ Lines are checked using ``fnmatch.fnmatch``. The argument is a list of
+ lines which have to occur in the output, in any order.
+
+ """
+ self._match_lines_random(lines2, fnmatch)
+
+ def re_match_lines_random(self, lines2):
+ """Check lines exist in the output using ``re.match``, in any order.
+
+ The argument is a list of lines which have to occur in the output, in
+ any order.
+
+ """
+ self._match_lines_random(lines2, lambda name, pat: re.match(pat, name))
+
+ def _match_lines_random(self, lines2, match_func):
+ """Check lines exist in the output.
+
+ The argument is a list of lines which have to occur in the output, in
+ any order. Each line can contain glob whildcards.
+
+ """
+ lines2 = self._getlines(lines2)
+ for line in lines2:
+ for x in self.lines:
+ if line == x or match_func(x, line):
+ self._log("matched: ", repr(line))
+ break
+ else:
+ self._log("line %r not found in output" % line)
+ raise ValueError(self._log_text)
+
+ def get_lines_after(self, fnline):
+ """Return all lines following the given line in the text.
+
+ The given line can contain glob wildcards.
+
+ """
+ for i, line in enumerate(self.lines):
+ if fnline == line or fnmatch(line, fnline):
+ return self.lines[i + 1 :]
+ raise ValueError("line %r not found in output" % fnline)
+
+ def _log(self, *args):
self._log_output.append(" ".join(str(x) for x in args))
-
- @property
- def _log_text(self):
- return "\n".join(self._log_output)
-
- def fnmatch_lines(self, lines2):
- """Search captured text for matching lines using ``fnmatch.fnmatch``.
-
- The argument is a list of lines which have to match and can use glob
- wildcards. If they do not match a pytest.fail() is called. The
- matches and non-matches are also printed on stdout.
-
- """
- __tracebackhide__ = True
- self._match_lines(lines2, fnmatch, "fnmatch")
-
- def re_match_lines(self, lines2):
- """Search captured text for matching lines using ``re.match``.
-
- The argument is a list of lines which have to match using ``re.match``.
- If they do not match a pytest.fail() is called.
-
- The matches and non-matches are also printed on stdout.
-
- """
- __tracebackhide__ = True
- self._match_lines(lines2, lambda name, pat: re.match(pat, name), "re.match")
-
- def _match_lines(self, lines2, match_func, match_nickname):
- """Underlying implementation of ``fnmatch_lines`` and ``re_match_lines``.
-
- :param list[str] lines2: list of string patterns to match. The actual
- format depends on ``match_func``
- :param match_func: a callable ``match_func(line, pattern)`` where line
- is the captured line from stdout/stderr and pattern is the matching
- pattern
- :param str match_nickname: the nickname for the match function that
- will be logged to stdout when a match occurs
-
- """
+
+ @property
+ def _log_text(self):
+ return "\n".join(self._log_output)
+
+ def fnmatch_lines(self, lines2):
+ """Search captured text for matching lines using ``fnmatch.fnmatch``.
+
+ The argument is a list of lines which have to match and can use glob
+ wildcards. If they do not match a pytest.fail() is called. The
+ matches and non-matches are also printed on stdout.
+
+ """
+ __tracebackhide__ = True
+ self._match_lines(lines2, fnmatch, "fnmatch")
+
+ def re_match_lines(self, lines2):
+ """Search captured text for matching lines using ``re.match``.
+
+ The argument is a list of lines which have to match using ``re.match``.
+ If they do not match a pytest.fail() is called.
+
+ The matches and non-matches are also printed on stdout.
+
+ """
+ __tracebackhide__ = True
+ self._match_lines(lines2, lambda name, pat: re.match(pat, name), "re.match")
+
+ def _match_lines(self, lines2, match_func, match_nickname):
+ """Underlying implementation of ``fnmatch_lines`` and ``re_match_lines``.
+
+ :param list[str] lines2: list of string patterns to match. The actual
+ format depends on ``match_func``
+ :param match_func: a callable ``match_func(line, pattern)`` where line
+ is the captured line from stdout/stderr and pattern is the matching
+ pattern
+ :param str match_nickname: the nickname for the match function that
+ will be logged to stdout when a match occurs
+
+ """
assert isinstance(lines2, Sequence)
- lines2 = self._getlines(lines2)
- lines1 = self.lines[:]
- nextline = None
- extralines = []
- __tracebackhide__ = True
- for line in lines2:
- nomatchprinted = False
- while lines1:
- nextline = lines1.pop(0)
- if line == nextline:
- self._log("exact match:", repr(line))
- break
- elif match_func(nextline, line):
- self._log("%s:" % match_nickname, repr(line))
- self._log(" with:", repr(nextline))
- break
- else:
- if not nomatchprinted:
- self._log("nomatch:", repr(line))
- nomatchprinted = True
- self._log(" and:", repr(nextline))
- extralines.append(nextline)
- else:
- self._log("remains unmatched: %r" % (line,))
- pytest.fail(self._log_text)
+ lines2 = self._getlines(lines2)
+ lines1 = self.lines[:]
+ nextline = None
+ extralines = []
+ __tracebackhide__ = True
+ for line in lines2:
+ nomatchprinted = False
+ while lines1:
+ nextline = lines1.pop(0)
+ if line == nextline:
+ self._log("exact match:", repr(line))
+ break
+ elif match_func(nextline, line):
+ self._log("%s:" % match_nickname, repr(line))
+ self._log(" with:", repr(nextline))
+ break
+ else:
+ if not nomatchprinted:
+ self._log("nomatch:", repr(line))
+ nomatchprinted = True
+ self._log(" and:", repr(nextline))
+ extralines.append(nextline)
+ else:
+ self._log("remains unmatched: %r" % (line,))
+ pytest.fail(self._log_text)