diff options
author | deshevoy <deshevoy@yandex-team.ru> | 2022-02-10 16:46:56 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:56 +0300 |
commit | e988f30484abe5fdeedcc7a5d3c226c01a21800c (patch) | |
tree | 0a217b173aabb57b7e51f8a169989b1a3e0309fe /contrib/python/pytest/py3/_pytest/capture.py | |
parent | 33ee501c05d3f24036ae89766a858930ae66c548 (diff) | |
download | ydb-e988f30484abe5fdeedcc7a5d3c226c01a21800c.tar.gz |
Restoring authorship annotation for <deshevoy@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/capture.py')
-rw-r--r-- | contrib/python/pytest/py3/_pytest/capture.py | 402 |
1 files changed, 201 insertions, 201 deletions
diff --git a/contrib/python/pytest/py3/_pytest/capture.py b/contrib/python/pytest/py3/_pytest/capture.py index 086302658c..a94bebe8b1 100644 --- a/contrib/python/pytest/py3/_pytest/capture.py +++ b/contrib/python/pytest/py3/_pytest/capture.py @@ -1,11 +1,11 @@ """Per-test stdout/stderr capturing mechanism.""" -import contextlib +import contextlib import functools -import io -import os -import sys -from io import UnsupportedOperation -from tempfile import TemporaryFile +import io +import os +import sys +from io import UnsupportedOperation +from tempfile import TemporaryFile from typing import Any from typing import AnyStr from typing import Generator @@ -16,7 +16,7 @@ from typing import TextIO from typing import Tuple from typing import TYPE_CHECKING from typing import Union - + from _pytest.compat import final from _pytest.config import Config from _pytest.config import hookimpl @@ -27,32 +27,32 @@ from _pytest.fixtures import SubRequest from _pytest.nodes import Collector from _pytest.nodes import File from _pytest.nodes import Item - + if TYPE_CHECKING: from typing_extensions import Literal _CaptureMethod = Literal["fd", "sys", "no", "tee-sys"] - + def pytest_addoption(parser: Parser) -> None: - group = parser.getgroup("general") - group._addoption( - "--capture", - action="store", + group = parser.getgroup("general") + group._addoption( + "--capture", + action="store", default="fd", - metavar="method", + metavar="method", choices=["fd", "sys", "no", "tee-sys"], help="per-test capturing method: one of fd|sys|no|tee-sys.", - ) - group._addoption( - "-s", - action="store_const", - const="no", - dest="capture", - help="shortcut for --capture=no.", - ) - - + ) + group._addoption( + "-s", + action="store_const", + const="no", + dest="capture", + help="shortcut for --capture=no.", + ) + + def _colorama_workaround() -> None: """Ensure colorama is imported so that it attaches to the correct stdio handles on Windows. @@ -150,46 +150,46 @@ def _py36_windowsconsoleio_workaround(stream: TextIO) -> None: @hookimpl(hookwrapper=True) def pytest_load_initial_conftests(early_config: Config): - ns = early_config.known_args_namespace - if ns.capture == "fd": - _py36_windowsconsoleio_workaround(sys.stdout) - _colorama_workaround() - _readline_workaround() - pluginmanager = early_config.pluginmanager - capman = CaptureManager(ns.capture) - pluginmanager.register(capman, "capturemanager") - + ns = early_config.known_args_namespace + if ns.capture == "fd": + _py36_windowsconsoleio_workaround(sys.stdout) + _colorama_workaround() + _readline_workaround() + pluginmanager = early_config.pluginmanager + capman = CaptureManager(ns.capture) + pluginmanager.register(capman, "capturemanager") + # Make sure that capturemanager is properly reset at final shutdown. - early_config.add_cleanup(capman.stop_global_capturing) - + early_config.add_cleanup(capman.stop_global_capturing) + # Finally trigger conftest loading but while capturing (issue #93). - capman.start_global_capturing() - outcome = yield - capman.suspend_global_capture() - if outcome.excinfo is not None: - out, err = capman.read_global_capture() - sys.stdout.write(out) - sys.stderr.write(err) - - + capman.start_global_capturing() + outcome = yield + capman.suspend_global_capture() + if outcome.excinfo is not None: + out, err = capman.read_global_capture() + sys.stdout.write(out) + sys.stderr.write(err) + + # IO Helpers. class EncodedFile(io.TextIOWrapper): __slots__ = () - + @property def name(self) -> str: # Ensure that file.name is a string. Workaround for a Python bug # fixed in >=3.7.4: https://bugs.python.org/issue36015 return repr(self.buffer) - + @property def mode(self) -> str: # TextIOWrapper doesn't expose a mode, but at least some of our # tests check it. return self.buffer.mode.replace("b", "") - + class CaptureIO(io.TextIOWrapper): def __init__(self) -> None: @@ -224,36 +224,36 @@ class DontReadFromInput: def __iter__(self): return self - + def fileno(self) -> int: raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()") - + def isatty(self) -> bool: return False - + def close(self) -> None: pass - + @property def buffer(self): return self - - + + # Capture classes. patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"} - - + + class NoCapture: EMPTY_BUFFER = None __init__ = start = done = suspend = resume = lambda *args: None class SysCaptureBinary: - + EMPTY_BUFFER = b"" - + def __init__(self, fd: int, tmpfile=None, *, tee: bool = False) -> None: name = patchsysdict[fd] self._old = getattr(sys, name) @@ -265,7 +265,7 @@ class SysCaptureBinary: tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old) self.tmpfile = tmpfile self._state = "initialized" - + def repr(self, class_name: str) -> str: return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( class_name, @@ -274,7 +274,7 @@ class SysCaptureBinary: self._state, self.tmpfile, ) - + def __repr__(self) -> str: return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( self.__class__.__name__, @@ -283,19 +283,19 @@ class SysCaptureBinary: self._state, self.tmpfile, ) - + def _assert_state(self, op: str, states: Tuple[str, ...]) -> None: assert ( self._state in states ), "cannot {} in state {!r}: expected one of {}".format( op, self._state, ", ".join(states) ) - + def start(self) -> None: self._assert_state("start", ("initialized",)) setattr(sys, self.name, self.tmpfile) self._state = "started" - + def snap(self): self._assert_state("snap", ("started", "suspended")) self.tmpfile.seek(0) @@ -303,7 +303,7 @@ class SysCaptureBinary: self.tmpfile.seek(0) self.tmpfile.truncate() return res - + def done(self) -> None: self._assert_state("done", ("initialized", "started", "suspended", "done")) if self._state == "done": @@ -312,49 +312,49 @@ class SysCaptureBinary: del self._old self.tmpfile.close() self._state = "done" - + def suspend(self) -> None: self._assert_state("suspend", ("started", "suspended")) setattr(sys, self.name, self._old) self._state = "suspended" - + def resume(self) -> None: self._assert_state("resume", ("started", "suspended")) if self._state == "started": return setattr(sys, self.name, self.tmpfile) self._state = "started" - + def writeorg(self, data) -> None: self._assert_state("writeorg", ("started", "suspended")) self._old.flush() self._old.buffer.write(data) self._old.buffer.flush() - - + + class SysCapture(SysCaptureBinary): EMPTY_BUFFER = "" # type: ignore[assignment] - + def snap(self): res = self.tmpfile.getvalue() self.tmpfile.seek(0) self.tmpfile.truncate() return res - + def writeorg(self, data): self._assert_state("writeorg", ("started", "suspended")) self._old.write(data) self._old.flush() - + class FDCaptureBinary: """Capture IO to/from a given OS-level file descriptor. snap() produces `bytes`. - """ - + """ + EMPTY_BUFFER = b"" - + def __init__(self, targetfd: int) -> None: self.targetfd = targetfd @@ -377,7 +377,7 @@ class FDCaptureBinary: else: self.targetfd_invalid = None self.targetfd_save = os.dup(targetfd) - + if targetfd == 0: self.tmpfile = open(os.devnull) self.syscapture = SysCapture(targetfd) @@ -393,7 +393,7 @@ class FDCaptureBinary: self.syscapture = SysCapture(targetfd, self.tmpfile) else: self.syscapture = NoCapture() - + self._state = "initialized" def __repr__(self) -> str: @@ -403,15 +403,15 @@ class FDCaptureBinary: self.targetfd_save, self._state, self.tmpfile, - ) - + ) + def _assert_state(self, op: str, states: Tuple[str, ...]) -> None: assert ( self._state in states ), "cannot {} in state {!r}: expected one of {}".format( op, self._state, ", ".join(states) ) - + def start(self) -> None: """Start capturing on targetfd using memorized tmpfile.""" self._assert_state("start", ("initialized",)) @@ -426,7 +426,7 @@ class FDCaptureBinary: self.tmpfile.seek(0) self.tmpfile.truncate() return res - + def done(self) -> None: """Stop capturing, restore streams, return original capture file, seeked to position zero.""" @@ -442,7 +442,7 @@ class FDCaptureBinary: self.syscapture.done() self.tmpfile.close() self._state = "done" - + def suspend(self) -> None: self._assert_state("suspend", ("started", "suspended")) if self._state == "suspended": @@ -469,11 +469,11 @@ class FDCapture(FDCaptureBinary): """Capture IO to/from a given OS-level file descriptor. snap() produces text. - """ - + """ + # Ignore type because it doesn't match the type in the superclass (bytes). EMPTY_BUFFER = "" # type: ignore - + def snap(self): self._assert_state("snap", ("started", "suspended")) self.tmpfile.seek(0) @@ -481,15 +481,15 @@ class FDCapture(FDCaptureBinary): self.tmpfile.seek(0) self.tmpfile.truncate() return res - + def writeorg(self, data): """Write to original file descriptor.""" super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream - - + + # MultiCapture - - + + # This class was a namedtuple, but due to mypy limitation[0] it could not be # made generic, so was replaced by a regular class which tries to emulate the # pertinent parts of a namedtuple. If the mypy limitation is ever lifted, can @@ -499,40 +499,40 @@ class FDCapture(FDCaptureBinary): @functools.total_ordering class CaptureResult(Generic[AnyStr]): """The result of :method:`CaptureFixture.readouterr`.""" - + __slots__ = ("out", "err") - + def __init__(self, out: AnyStr, err: AnyStr) -> None: self.out: AnyStr = out self.err: AnyStr = err - + def __len__(self) -> int: return 2 - + def __iter__(self) -> Iterator[AnyStr]: return iter((self.out, self.err)) - + def __getitem__(self, item: int) -> AnyStr: return tuple(self)[item] - + def _replace( self, *, out: Optional[AnyStr] = None, err: Optional[AnyStr] = None ) -> "CaptureResult[AnyStr]": return CaptureResult( out=self.out if out is None else out, err=self.err if err is None else err ) - + def count(self, value: AnyStr) -> int: return tuple(self).count(value) - + def index(self, value) -> int: return tuple(self).index(value) - + def __eq__(self, other: object) -> bool: if not isinstance(other, (CaptureResult, tuple)): return NotImplemented return tuple(self) == tuple(other) - + def __hash__(self) -> int: return hash(tuple(self)) @@ -540,20 +540,20 @@ class CaptureResult(Generic[AnyStr]): if not isinstance(other, (CaptureResult, tuple)): return NotImplemented return tuple(self) < tuple(other) - + def __repr__(self) -> str: return f"CaptureResult(out={self.out!r}, err={self.err!r})" - - + + class MultiCapture(Generic[AnyStr]): _state = None _in_suspended = False - + def __init__(self, in_, out, err) -> None: self.in_ = in_ self.out = out self.err = err - + def __repr__(self) -> str: return "<MultiCapture out={!r} err={!r} in_={!r} _state={!r} _in_suspended={!r}>".format( self.out, self.err, self.in_, self._state, self._in_suspended, @@ -561,54 +561,54 @@ class MultiCapture(Generic[AnyStr]): def start_capturing(self) -> None: self._state = "started" - if self.in_: - self.in_.start() - if self.out: - self.out.start() - if self.err: - self.err.start() - + if self.in_: + self.in_.start() + if self.out: + self.out.start() + if self.err: + self.err.start() + def pop_outerr_to_orig(self) -> Tuple[AnyStr, AnyStr]: """Pop current snapshot out/err capture and flush to orig streams.""" - out, err = self.readouterr() - if out: - self.out.writeorg(out) - if err: - self.err.writeorg(err) - return out, err - + out, err = self.readouterr() + if out: + self.out.writeorg(out) + if err: + self.err.writeorg(err) + return out, err + def suspend_capturing(self, in_: bool = False) -> None: self._state = "suspended" - if self.out: - self.out.suspend() - if self.err: - self.err.suspend() - if in_ and self.in_: - self.in_.suspend() - self._in_suspended = True - + if self.out: + self.out.suspend() + if self.err: + self.err.suspend() + if in_ and self.in_: + self.in_.suspend() + self._in_suspended = True + def resume_capturing(self) -> None: self._state = "started" - if self.out: - self.out.resume() - if self.err: - self.err.resume() + if self.out: + self.out.resume() + if self.err: + self.err.resume() if self._in_suspended: - self.in_.resume() + self.in_.resume() self._in_suspended = False - + def stop_capturing(self) -> None: """Stop capturing and reset capturing streams.""" if self._state == "stopped": - raise ValueError("was already stopped") + raise ValueError("was already stopped") self._state = "stopped" - if self.out: - self.out.done() - if self.err: - self.err.done() - if self.in_: - self.in_.done() - + if self.out: + self.out.done() + if self.err: + self.err.done() + if self.in_: + self.in_.done() + def is_started(self) -> bool: """Whether actively capturing -- not suspended or stopped.""" return self._state == "started" @@ -623,8 +623,8 @@ class MultiCapture(Generic[AnyStr]): else: err = "" return CaptureResult(out, err) - - + + def _get_multicapture(method: "_CaptureMethod") -> MultiCapture[str]: if method == "fd": return MultiCapture(in_=FDCapture(0), out=FDCapture(1), err=FDCapture(2)) @@ -637,88 +637,88 @@ def _get_multicapture(method: "_CaptureMethod") -> MultiCapture[str]: in_=None, out=SysCapture(1, tee=True), err=SysCapture(2, tee=True) ) raise ValueError(f"unknown capturing method: {method!r}") - - + + # CaptureManager and CaptureFixture - - + + class CaptureManager: """The capture plugin. - + Manages that the appropriate capture method is enabled/disabled during collection and each test phase (setup, call, teardown). After each of those points, the captured output is obtained and attached to the collection/runtest report. - + There are two levels of capture: - + * global: enabled by default and can be suppressed by the ``-s`` option. This is always enabled/disabled during collection and each test phase. - + * fixture: when a test function or one of its fixture depend on the ``capsys`` or ``capfd`` fixtures. In this case special handling is needed to ensure the fixtures take precedence over the global capture. """ - + def __init__(self, method: "_CaptureMethod") -> None: self._method = method self._global_capturing: Optional[MultiCapture[str]] = None self._capture_fixture: Optional[CaptureFixture[Any]] = None - + def __repr__(self) -> str: return "<CaptureManager _method={!r} _global_capturing={!r} _capture_fixture={!r}>".format( self._method, self._global_capturing, self._capture_fixture ) - + def is_capturing(self) -> Union[str, bool]: if self.is_globally_capturing(): return "global" if self._capture_fixture: return "fixture %s" % self._capture_fixture.request.fixturename return False - + # Global capturing control - + def is_globally_capturing(self) -> bool: return self._method != "no" - + def start_global_capturing(self) -> None: assert self._global_capturing is None self._global_capturing = _get_multicapture(self._method) self._global_capturing.start_capturing() - + def stop_global_capturing(self) -> None: if self._global_capturing is not None: self._global_capturing.pop_outerr_to_orig() self._global_capturing.stop_capturing() self._global_capturing = None - + def resume_global_capture(self) -> None: # During teardown of the python process, and on rare occasions, capture # attributes can be `None` while trying to resume global capture. if self._global_capturing is not None: self._global_capturing.resume_capturing() - + def suspend_global_capture(self, in_: bool = False) -> None: if self._global_capturing is not None: self._global_capturing.suspend_capturing(in_=in_) - + def suspend(self, in_: bool = False) -> None: # Need to undo local capsys-et-al if it exists before disabling global capture. self.suspend_fixture() self.suspend_global_capture(in_) - + def resume(self) -> None: self.resume_global_capture() self.resume_fixture() - + def read_global_capture(self) -> CaptureResult[str]: assert self._global_capturing is not None return self._global_capturing.readouterr() # Fixture Control - + def set_fixture(self, capture_fixture: "CaptureFixture[Any]") -> None: if self._capture_fixture: current_fixture = self._capture_fixture.request.fixturename @@ -729,7 +729,7 @@ class CaptureManager: ) ) self._capture_fixture = capture_fixture - + def unset_fixture(self) -> None: self._capture_fixture = None @@ -738,22 +738,22 @@ class CaptureManager: them so they take precedence over the global capture.""" if self._capture_fixture: self._capture_fixture._start() - + def deactivate_fixture(self) -> None: """Deactivate the ``capsys`` or ``capfd`` fixture of this item, if any.""" if self._capture_fixture: self._capture_fixture.close() - + def suspend_fixture(self) -> None: if self._capture_fixture: self._capture_fixture._suspend() - + def resume_fixture(self) -> None: if self._capture_fixture: self._capture_fixture._resume() - + # Helper context managers - + @contextlib.contextmanager def global_and_fixture_disabled(self) -> Generator[None, None, None]: """Context manager to temporarily disable global and current fixture capturing.""" @@ -770,7 +770,7 @@ class CaptureManager: self.resume_global_capture() if do_fixture: self.resume_fixture() - + @contextlib.contextmanager def item_capture(self, when: str, item: Item) -> Generator[None, None, None]: self.resume_global_capture() @@ -780,13 +780,13 @@ class CaptureManager: finally: self.deactivate_fixture() self.suspend_global_capture(in_=False) - + out, err = self.read_global_capture() item.add_report_section(when, "stdout", out) item.add_report_section(when, "stderr", err) - + # Hooks - + @hookimpl(hookwrapper=True) def pytest_make_collect_report(self, collector: Collector): if isinstance(collector, File): @@ -801,12 +801,12 @@ class CaptureManager: rep.sections.append(("Captured stderr", err)) else: yield - + @hookimpl(hookwrapper=True) def pytest_runtest_setup(self, item: Item) -> Generator[None, None, None]: with self.item_capture("setup", item): yield - + @hookimpl(hookwrapper=True) def pytest_runtest_call(self, item: Item) -> Generator[None, None, None]: with self.item_capture("call", item): @@ -825,11 +825,11 @@ class CaptureManager: def pytest_internalerror(self) -> None: self.stop_global_capturing() - + class CaptureFixture(Generic[AnyStr]): """Object returned by the :fixture:`capsys`, :fixture:`capsysbinary`, :fixture:`capfd` and :fixture:`capfdbinary` fixtures.""" - + def __init__( self, captureclass, request: SubRequest, *, _ispytest: bool = False ) -> None: @@ -839,14 +839,14 @@ class CaptureFixture(Generic[AnyStr]): self._capture: Optional[MultiCapture[AnyStr]] = None self._captured_out = self.captureclass.EMPTY_BUFFER self._captured_err = self.captureclass.EMPTY_BUFFER - + def _start(self) -> None: if self._capture is None: self._capture = MultiCapture( in_=None, out=self.captureclass(1), err=self.captureclass(2), ) self._capture.start_capturing() - + def close(self) -> None: if self._capture is not None: out, err = self._capture.pop_outerr_to_orig() @@ -854,11 +854,11 @@ class CaptureFixture(Generic[AnyStr]): self._captured_err += err self._capture.stop_capturing() self._capture = None - + def readouterr(self) -> CaptureResult[AnyStr]: """Read and return the captured output so far, resetting the internal buffer. - + :returns: The captured content as a namedtuple with ``out`` and ``err`` string attributes. @@ -871,42 +871,42 @@ class CaptureFixture(Generic[AnyStr]): self._captured_out = self.captureclass.EMPTY_BUFFER self._captured_err = self.captureclass.EMPTY_BUFFER return CaptureResult(captured_out, captured_err) - + def _suspend(self) -> None: """Suspend this fixture's own capturing temporarily.""" if self._capture is not None: self._capture.suspend_capturing() - + def _resume(self) -> None: """Resume this fixture's own capturing temporarily.""" if self._capture is not None: self._capture.resume_capturing() - + def _is_started(self) -> bool: """Whether actively capturing -- not disabled or closed.""" if self._capture is not None: return self._capture.is_started() return False - + @contextlib.contextmanager def disabled(self) -> Generator[None, None, None]: """Temporarily disable capturing while inside the ``with`` block.""" capmanager = self.request.config.pluginmanager.getplugin("capturemanager") with capmanager.global_and_fixture_disabled(): yield - - + + # The fixtures. - - + + @fixture def capsys(request: SubRequest) -> Generator[CaptureFixture[str], None, None]: """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``. - + The captured output is made available via ``capsys.readouterr()`` method calls, which return a ``(out, err)`` namedtuple. ``out`` and ``err`` will be ``text`` objects. - """ + """ capman = request.config.pluginmanager.getplugin("capturemanager") capture_fixture = CaptureFixture[str](SysCapture, request, _ispytest=True) capman.set_fixture(capture_fixture) @@ -914,8 +914,8 @@ def capsys(request: SubRequest) -> Generator[CaptureFixture[str], None, None]: yield capture_fixture capture_fixture.close() capman.unset_fixture() - - + + @fixture def capsysbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, None]: """Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``. @@ -923,7 +923,7 @@ def capsysbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, The captured output is made available via ``capsysbinary.readouterr()`` method calls, which return a ``(out, err)`` namedtuple. ``out`` and ``err`` will be ``bytes`` objects. - """ + """ capman = request.config.pluginmanager.getplugin("capturemanager") capture_fixture = CaptureFixture[bytes](SysCaptureBinary, request, _ispytest=True) capman.set_fixture(capture_fixture) @@ -931,16 +931,16 @@ def capsysbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, yield capture_fixture capture_fixture.close() capman.unset_fixture() - - + + @fixture def capfd(request: SubRequest) -> Generator[CaptureFixture[str], None, None]: """Enable text capturing of writes to file descriptors ``1`` and ``2``. - + The captured output is made available via ``capfd.readouterr()`` method calls, which return a ``(out, err)`` namedtuple. ``out`` and ``err`` will be ``text`` objects. - """ + """ capman = request.config.pluginmanager.getplugin("capturemanager") capture_fixture = CaptureFixture[str](FDCapture, request, _ispytest=True) capman.set_fixture(capture_fixture) @@ -948,12 +948,12 @@ def capfd(request: SubRequest) -> Generator[CaptureFixture[str], None, None]: yield capture_fixture capture_fixture.close() capman.unset_fixture() - - + + @fixture def capfdbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, None]: """Enable bytes capturing of writes to file descriptors ``1`` and ``2``. - + The captured output is made available via ``capfd.readouterr()`` method calls, which return a ``(out, err)`` namedtuple. ``out`` and ``err`` will be ``byte`` objects. |