diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
commit | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch) | |
tree | 012bb94d777798f1f56ac1cec429509766d05181 /contrib/python/pytest/py3/_pytest/capture.py | |
parent | 6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff) | |
download | ydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/capture.py')
-rw-r--r-- | contrib/python/pytest/py3/_pytest/capture.py | 1720 |
1 files changed, 860 insertions, 860 deletions
diff --git a/contrib/python/pytest/py3/_pytest/capture.py b/contrib/python/pytest/py3/_pytest/capture.py index 086302658c..c233058585 100644 --- a/contrib/python/pytest/py3/_pytest/capture.py +++ b/contrib/python/pytest/py3/_pytest/capture.py @@ -1,48 +1,48 @@ -"""Per-test stdout/stderr capturing mechanism.""" +"""Per-test stdout/stderr capturing mechanism.""" import contextlib -import functools +import functools import io import os import sys from io import UnsupportedOperation from tempfile import TemporaryFile -from typing import Any -from typing import AnyStr -from typing import Generator -from typing import Generic -from typing import Iterator -from typing import Optional -from typing import TextIO -from typing import Tuple -from typing import TYPE_CHECKING -from typing import Union - -from _pytest.compat import final -from _pytest.config import Config -from _pytest.config import hookimpl -from _pytest.config.argparsing import Parser -from _pytest.deprecated import check_ispytest -from _pytest.fixtures import fixture -from _pytest.fixtures import SubRequest -from _pytest.nodes import Collector -from _pytest.nodes import File -from _pytest.nodes import Item - -if TYPE_CHECKING: - from typing_extensions import Literal - - _CaptureMethod = Literal["fd", "sys", "no", "tee-sys"] - - -def pytest_addoption(parser: Parser) -> None: +from typing import Any +from typing import AnyStr +from typing import Generator +from typing import Generic +from typing import Iterator +from typing import Optional +from typing import TextIO +from typing import Tuple +from typing import TYPE_CHECKING +from typing import Union + +from _pytest.compat import final +from _pytest.config import Config +from _pytest.config import hookimpl +from _pytest.config.argparsing import Parser +from _pytest.deprecated import check_ispytest +from _pytest.fixtures import fixture +from _pytest.fixtures import SubRequest +from _pytest.nodes import Collector +from _pytest.nodes import File +from _pytest.nodes import Item + +if TYPE_CHECKING: + from typing_extensions import Literal + + _CaptureMethod = Literal["fd", "sys", "no", "tee-sys"] + + +def pytest_addoption(parser: Parser) -> None: group = parser.getgroup("general") group._addoption( "--capture", action="store", - default="fd", + default="fd", metavar="method", - choices=["fd", "sys", "no", "tee-sys"], - help="per-test capturing method: one of fd|sys|no|tee-sys.", + choices=["fd", "sys", "no", "tee-sys"], + help="per-test capturing method: one of fd|sys|no|tee-sys.", ) group._addoption( "-s", @@ -53,103 +53,103 @@ def pytest_addoption(parser: Parser) -> None: ) -def _colorama_workaround() -> None: - """Ensure colorama is imported so that it attaches to the correct stdio - handles on Windows. - - colorama uses the terminal on import time. So if something does the - first import of colorama while I/O capture is active, colorama will - fail in various ways. - """ - if sys.platform.startswith("win32"): - try: - import colorama # noqa: F401 - except ImportError: - pass - - -def _readline_workaround() -> None: - """Ensure readline is imported so that it attaches to the correct stdio - handles on Windows. - - Pdb uses readline support where available--when not running from the Python - prompt, the readline module is not imported until running the pdb REPL. If - running pytest with the --pdb option this means the readline module is not - imported until after I/O capture has been started. - - This is a problem for pyreadline, which is often used to implement readline - support on Windows, as it does not attach to the correct handles for stdout - and/or stdin if they have been redirected by the FDCapture mechanism. This - workaround ensures that readline is imported before I/O capture is setup so - that it can attach to the actual stdin/out for the console. - - See https://github.com/pytest-dev/pytest/pull/1281. - """ - if sys.platform.startswith("win32"): - try: - import readline # noqa: F401 - except ImportError: - pass - - -def _py36_windowsconsoleio_workaround(stream: TextIO) -> None: - """Workaround for Windows Unicode console handling on Python>=3.6. - - Python 3.6 implemented Unicode console handling for Windows. This works - by reading/writing to the raw console handle using - ``{Read,Write}ConsoleW``. - - The problem is that we are going to ``dup2`` over the stdio file - descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the - handles used by Python to write to the console. Though there is still some - weirdness and the console handle seems to only be closed randomly and not - on the first call to ``CloseHandle``, or maybe it gets reopened with the - same handle value when we suspend capturing. - - The workaround in this case will reopen stdio with a different fd which - also means a different handle by replicating the logic in - "Py_lifecycle.c:initstdio/create_stdio". - - :param stream: - In practice ``sys.stdout`` or ``sys.stderr``, but given - here as parameter for unittesting purposes. - - See https://github.com/pytest-dev/py/issues/103. - """ - if not sys.platform.startswith("win32") or hasattr(sys, "pypy_version_info"): - return - - # Bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666). - if not hasattr(stream, "buffer"): # type: ignore[unreachable] - return - - buffered = hasattr(stream.buffer, "raw") - raw_stdout = stream.buffer.raw if buffered else stream.buffer # type: ignore[attr-defined] - - if not isinstance(raw_stdout, io._WindowsConsoleIO): # type: ignore[attr-defined] - return - - def _reopen_stdio(f, mode): - if not buffered and mode[0] == "w": - buffering = 0 - else: - buffering = -1 - - return io.TextIOWrapper( - open(os.dup(f.fileno()), mode, buffering), # type: ignore[arg-type] - f.encoding, - f.errors, - f.newlines, - f.line_buffering, - ) - - sys.stdin = _reopen_stdio(sys.stdin, "rb") - sys.stdout = _reopen_stdio(sys.stdout, "wb") - sys.stderr = _reopen_stdio(sys.stderr, "wb") - - -@hookimpl(hookwrapper=True) -def pytest_load_initial_conftests(early_config: Config): +def _colorama_workaround() -> None: + """Ensure colorama is imported so that it attaches to the correct stdio + handles on Windows. + + colorama uses the terminal on import time. So if something does the + first import of colorama while I/O capture is active, colorama will + fail in various ways. + """ + if sys.platform.startswith("win32"): + try: + import colorama # noqa: F401 + except ImportError: + pass + + +def _readline_workaround() -> None: + """Ensure readline is imported so that it attaches to the correct stdio + handles on Windows. + + Pdb uses readline support where available--when not running from the Python + prompt, the readline module is not imported until running the pdb REPL. If + running pytest with the --pdb option this means the readline module is not + imported until after I/O capture has been started. + + This is a problem for pyreadline, which is often used to implement readline + support on Windows, as it does not attach to the correct handles for stdout + and/or stdin if they have been redirected by the FDCapture mechanism. This + workaround ensures that readline is imported before I/O capture is setup so + that it can attach to the actual stdin/out for the console. + + See https://github.com/pytest-dev/pytest/pull/1281. + """ + if sys.platform.startswith("win32"): + try: + import readline # noqa: F401 + except ImportError: + pass + + +def _py36_windowsconsoleio_workaround(stream: TextIO) -> None: + """Workaround for Windows Unicode console handling on Python>=3.6. + + Python 3.6 implemented Unicode console handling for Windows. This works + by reading/writing to the raw console handle using + ``{Read,Write}ConsoleW``. + + The problem is that we are going to ``dup2`` over the stdio file + descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the + handles used by Python to write to the console. Though there is still some + weirdness and the console handle seems to only be closed randomly and not + on the first call to ``CloseHandle``, or maybe it gets reopened with the + same handle value when we suspend capturing. + + The workaround in this case will reopen stdio with a different fd which + also means a different handle by replicating the logic in + "Py_lifecycle.c:initstdio/create_stdio". + + :param stream: + In practice ``sys.stdout`` or ``sys.stderr``, but given + here as parameter for unittesting purposes. + + See https://github.com/pytest-dev/py/issues/103. + """ + if not sys.platform.startswith("win32") or hasattr(sys, "pypy_version_info"): + return + + # Bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666). + if not hasattr(stream, "buffer"): # type: ignore[unreachable] + return + + buffered = hasattr(stream.buffer, "raw") + raw_stdout = stream.buffer.raw if buffered else stream.buffer # type: ignore[attr-defined] + + if not isinstance(raw_stdout, io._WindowsConsoleIO): # type: ignore[attr-defined] + return + + def _reopen_stdio(f, mode): + if not buffered and mode[0] == "w": + buffering = 0 + else: + buffering = -1 + + return io.TextIOWrapper( + open(os.dup(f.fileno()), mode, buffering), # type: ignore[arg-type] + f.encoding, + f.errors, + f.newlines, + f.line_buffering, + ) + + sys.stdin = _reopen_stdio(sys.stdin, "rb") + sys.stdout = _reopen_stdio(sys.stdout, "wb") + sys.stderr = _reopen_stdio(sys.stderr, "wb") + + +@hookimpl(hookwrapper=True) +def pytest_load_initial_conftests(early_config: Config): ns = early_config.known_args_namespace if ns.capture == "fd": _py36_windowsconsoleio_workaround(sys.stdout) @@ -159,10 +159,10 @@ def pytest_load_initial_conftests(early_config: Config): capman = CaptureManager(ns.capture) pluginmanager.register(capman, "capturemanager") - # Make sure that capturemanager is properly reset at final shutdown. + # Make sure that capturemanager is properly reset at final shutdown. early_config.add_cleanup(capman.stop_global_capturing) - # Finally trigger conftest loading but while capturing (issue #93). + # Finally trigger conftest loading but while capturing (issue #93). capman.start_global_capturing() outcome = yield capman.suspend_global_capture() @@ -172,395 +172,395 @@ def pytest_load_initial_conftests(early_config: Config): sys.stderr.write(err) -# IO Helpers. - - -class EncodedFile(io.TextIOWrapper): - __slots__ = () - - @property - def name(self) -> str: - # Ensure that file.name is a string. Workaround for a Python bug - # fixed in >=3.7.4: https://bugs.python.org/issue36015 - return repr(self.buffer) - - @property - def mode(self) -> str: - # TextIOWrapper doesn't expose a mode, but at least some of our - # tests check it. - return self.buffer.mode.replace("b", "") - - -class CaptureIO(io.TextIOWrapper): - def __init__(self) -> None: - super().__init__(io.BytesIO(), encoding="UTF-8", newline="", write_through=True) - - def getvalue(self) -> str: - assert isinstance(self.buffer, io.BytesIO) - return self.buffer.getvalue().decode("UTF-8") - - -class TeeCaptureIO(CaptureIO): - def __init__(self, other: TextIO) -> None: - self._other = other - super().__init__() - - def write(self, s: str) -> int: - super().write(s) - return self._other.write(s) - - -class DontReadFromInput: - encoding = None - - def read(self, *args): - raise OSError( - "pytest: reading from stdin while output is captured! Consider using `-s`." - ) - - readline = read - readlines = read - __next__ = read - - def __iter__(self): - return self - - def fileno(self) -> int: - raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()") - - def isatty(self) -> bool: - return False - - def close(self) -> None: - pass - - @property - def buffer(self): - return self - - -# Capture classes. - - -patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"} - - -class NoCapture: - EMPTY_BUFFER = None - __init__ = start = done = suspend = resume = lambda *args: None - - -class SysCaptureBinary: - - EMPTY_BUFFER = b"" - - def __init__(self, fd: int, tmpfile=None, *, tee: bool = False) -> None: - name = patchsysdict[fd] - self._old = getattr(sys, name) - self.name = name - if tmpfile is None: - if name == "stdin": - tmpfile = DontReadFromInput() - else: - tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old) - self.tmpfile = tmpfile - self._state = "initialized" - - def repr(self, class_name: str) -> str: - return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( - class_name, - self.name, - hasattr(self, "_old") and repr(self._old) or "<UNSET>", - self._state, - self.tmpfile, - ) - - def __repr__(self) -> str: - return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( - self.__class__.__name__, - self.name, - hasattr(self, "_old") and repr(self._old) or "<UNSET>", - self._state, - self.tmpfile, - ) - - def _assert_state(self, op: str, states: Tuple[str, ...]) -> None: - assert ( - self._state in states - ), "cannot {} in state {!r}: expected one of {}".format( - op, self._state, ", ".join(states) - ) - - def start(self) -> None: - self._assert_state("start", ("initialized",)) - setattr(sys, self.name, self.tmpfile) - self._state = "started" - - def snap(self): - self._assert_state("snap", ("started", "suspended")) - self.tmpfile.seek(0) - res = self.tmpfile.buffer.read() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def done(self) -> None: - self._assert_state("done", ("initialized", "started", "suspended", "done")) - if self._state == "done": - return - setattr(sys, self.name, self._old) - del self._old - self.tmpfile.close() - self._state = "done" - - def suspend(self) -> None: - self._assert_state("suspend", ("started", "suspended")) - setattr(sys, self.name, self._old) - self._state = "suspended" - - def resume(self) -> None: - self._assert_state("resume", ("started", "suspended")) - if self._state == "started": - return - setattr(sys, self.name, self.tmpfile) - self._state = "started" - - def writeorg(self, data) -> None: - self._assert_state("writeorg", ("started", "suspended")) - self._old.flush() - self._old.buffer.write(data) - self._old.buffer.flush() - - -class SysCapture(SysCaptureBinary): - EMPTY_BUFFER = "" # type: ignore[assignment] - - def snap(self): - res = self.tmpfile.getvalue() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def writeorg(self, data): - self._assert_state("writeorg", ("started", "suspended")) - self._old.write(data) - self._old.flush() - - -class FDCaptureBinary: - """Capture IO to/from a given OS-level file descriptor. - - snap() produces `bytes`. +# IO Helpers. + + +class EncodedFile(io.TextIOWrapper): + __slots__ = () + + @property + def name(self) -> str: + # Ensure that file.name is a string. Workaround for a Python bug + # fixed in >=3.7.4: https://bugs.python.org/issue36015 + return repr(self.buffer) + + @property + def mode(self) -> str: + # TextIOWrapper doesn't expose a mode, but at least some of our + # tests check it. + return self.buffer.mode.replace("b", "") + + +class CaptureIO(io.TextIOWrapper): + def __init__(self) -> None: + super().__init__(io.BytesIO(), encoding="UTF-8", newline="", write_through=True) + + def getvalue(self) -> str: + assert isinstance(self.buffer, io.BytesIO) + return self.buffer.getvalue().decode("UTF-8") + + +class TeeCaptureIO(CaptureIO): + def __init__(self, other: TextIO) -> None: + self._other = other + super().__init__() + + def write(self, s: str) -> int: + super().write(s) + return self._other.write(s) + + +class DontReadFromInput: + encoding = None + + def read(self, *args): + raise OSError( + "pytest: reading from stdin while output is captured! Consider using `-s`." + ) + + readline = read + readlines = read + __next__ = read + + def __iter__(self): + return self + + def fileno(self) -> int: + raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()") + + def isatty(self) -> bool: + return False + + def close(self) -> None: + pass + + @property + def buffer(self): + return self + + +# Capture classes. + + +patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"} + + +class NoCapture: + EMPTY_BUFFER = None + __init__ = start = done = suspend = resume = lambda *args: None + + +class SysCaptureBinary: + + EMPTY_BUFFER = b"" + + def __init__(self, fd: int, tmpfile=None, *, tee: bool = False) -> None: + name = patchsysdict[fd] + self._old = getattr(sys, name) + self.name = name + if tmpfile is None: + if name == "stdin": + tmpfile = DontReadFromInput() + else: + tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old) + self.tmpfile = tmpfile + self._state = "initialized" + + def repr(self, class_name: str) -> str: + return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( + class_name, + self.name, + hasattr(self, "_old") and repr(self._old) or "<UNSET>", + self._state, + self.tmpfile, + ) + + def __repr__(self) -> str: + return "<{} {} _old={} _state={!r} tmpfile={!r}>".format( + self.__class__.__name__, + self.name, + hasattr(self, "_old") and repr(self._old) or "<UNSET>", + self._state, + self.tmpfile, + ) + + def _assert_state(self, op: str, states: Tuple[str, ...]) -> None: + assert ( + self._state in states + ), "cannot {} in state {!r}: expected one of {}".format( + op, self._state, ", ".join(states) + ) + + def start(self) -> None: + self._assert_state("start", ("initialized",)) + setattr(sys, self.name, self.tmpfile) + self._state = "started" + + def snap(self): + self._assert_state("snap", ("started", "suspended")) + self.tmpfile.seek(0) + res = self.tmpfile.buffer.read() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def done(self) -> None: + self._assert_state("done", ("initialized", "started", "suspended", "done")) + if self._state == "done": + return + setattr(sys, self.name, self._old) + del self._old + self.tmpfile.close() + self._state = "done" + + def suspend(self) -> None: + self._assert_state("suspend", ("started", "suspended")) + setattr(sys, self.name, self._old) + self._state = "suspended" + + def resume(self) -> None: + self._assert_state("resume", ("started", "suspended")) + if self._state == "started": + return + setattr(sys, self.name, self.tmpfile) + self._state = "started" + + def writeorg(self, data) -> None: + self._assert_state("writeorg", ("started", "suspended")) + self._old.flush() + self._old.buffer.write(data) + self._old.buffer.flush() + + +class SysCapture(SysCaptureBinary): + EMPTY_BUFFER = "" # type: ignore[assignment] + + def snap(self): + res = self.tmpfile.getvalue() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def writeorg(self, data): + self._assert_state("writeorg", ("started", "suspended")) + self._old.write(data) + self._old.flush() + + +class FDCaptureBinary: + """Capture IO to/from a given OS-level file descriptor. + + snap() produces `bytes`. """ - EMPTY_BUFFER = b"" - - def __init__(self, targetfd: int) -> None: - self.targetfd = targetfd - - try: - os.fstat(targetfd) - except OSError: - # FD capturing is conceptually simple -- create a temporary file, - # redirect the FD to it, redirect back when done. But when the - # target FD is invalid it throws a wrench into this loveley scheme. - # - # Tests themselves shouldn't care if the FD is valid, FD capturing - # should work regardless of external circumstances. So falling back - # to just sys capturing is not a good option. - # - # Further complications are the need to support suspend() and the - # possibility of FD reuse (e.g. the tmpfile getting the very same - # target FD). The following approach is robust, I believe. - self.targetfd_invalid: Optional[int] = os.open(os.devnull, os.O_RDWR) - os.dup2(self.targetfd_invalid, targetfd) - else: - self.targetfd_invalid = None - self.targetfd_save = os.dup(targetfd) - - if targetfd == 0: - self.tmpfile = open(os.devnull) - self.syscapture = SysCapture(targetfd) - else: - self.tmpfile = EncodedFile( - TemporaryFile(buffering=0), - encoding="utf-8", - errors="replace", - newline="", - write_through=True, - ) - if targetfd in patchsysdict: - self.syscapture = SysCapture(targetfd, self.tmpfile) - else: - self.syscapture = NoCapture() - - self._state = "initialized" - - def __repr__(self) -> str: - return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format( - self.__class__.__name__, - self.targetfd, - self.targetfd_save, - self._state, - self.tmpfile, - ) - - def _assert_state(self, op: str, states: Tuple[str, ...]) -> None: - assert ( - self._state in states - ), "cannot {} in state {!r}: expected one of {}".format( - op, self._state, ", ".join(states) + EMPTY_BUFFER = b"" + + def __init__(self, targetfd: int) -> None: + self.targetfd = targetfd + + try: + os.fstat(targetfd) + except OSError: + # FD capturing is conceptually simple -- create a temporary file, + # redirect the FD to it, redirect back when done. But when the + # target FD is invalid it throws a wrench into this loveley scheme. + # + # Tests themselves shouldn't care if the FD is valid, FD capturing + # should work regardless of external circumstances. So falling back + # to just sys capturing is not a good option. + # + # Further complications are the need to support suspend() and the + # possibility of FD reuse (e.g. the tmpfile getting the very same + # target FD). The following approach is robust, I believe. + self.targetfd_invalid: Optional[int] = os.open(os.devnull, os.O_RDWR) + os.dup2(self.targetfd_invalid, targetfd) + else: + self.targetfd_invalid = None + self.targetfd_save = os.dup(targetfd) + + if targetfd == 0: + self.tmpfile = open(os.devnull) + self.syscapture = SysCapture(targetfd) + else: + self.tmpfile = EncodedFile( + TemporaryFile(buffering=0), + encoding="utf-8", + errors="replace", + newline="", + write_through=True, + ) + if targetfd in patchsysdict: + self.syscapture = SysCapture(targetfd, self.tmpfile) + else: + self.syscapture = NoCapture() + + self._state = "initialized" + + def __repr__(self) -> str: + return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format( + self.__class__.__name__, + self.targetfd, + self.targetfd_save, + self._state, + self.tmpfile, ) - def start(self) -> None: - """Start capturing on targetfd using memorized tmpfile.""" - self._assert_state("start", ("initialized",)) - os.dup2(self.tmpfile.fileno(), self.targetfd) - self.syscapture.start() - self._state = "started" - - def snap(self): - self._assert_state("snap", ("started", "suspended")) - self.tmpfile.seek(0) - res = self.tmpfile.buffer.read() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def done(self) -> None: - """Stop capturing, restore streams, return original capture file, - seeked to position zero.""" - self._assert_state("done", ("initialized", "started", "suspended", "done")) - if self._state == "done": - return - os.dup2(self.targetfd_save, self.targetfd) - os.close(self.targetfd_save) - if self.targetfd_invalid is not None: - if self.targetfd_invalid != self.targetfd: - os.close(self.targetfd) - os.close(self.targetfd_invalid) - self.syscapture.done() - self.tmpfile.close() - self._state = "done" - - def suspend(self) -> None: - self._assert_state("suspend", ("started", "suspended")) - if self._state == "suspended": - return - self.syscapture.suspend() - os.dup2(self.targetfd_save, self.targetfd) - self._state = "suspended" - - def resume(self) -> None: - self._assert_state("resume", ("started", "suspended")) - if self._state == "started": - return - self.syscapture.resume() - os.dup2(self.tmpfile.fileno(), self.targetfd) - self._state = "started" - - def writeorg(self, data): - """Write to original file descriptor.""" - self._assert_state("writeorg", ("started", "suspended")) - os.write(self.targetfd_save, data) - - -class FDCapture(FDCaptureBinary): - """Capture IO to/from a given OS-level file descriptor. - - snap() produces text. + def _assert_state(self, op: str, states: Tuple[str, ...]) -> None: + assert ( + self._state in states + ), "cannot {} in state {!r}: expected one of {}".format( + op, self._state, ", ".join(states) + ) + + def start(self) -> None: + """Start capturing on targetfd using memorized tmpfile.""" + self._assert_state("start", ("initialized",)) + os.dup2(self.tmpfile.fileno(), self.targetfd) + self.syscapture.start() + self._state = "started" + + def snap(self): + self._assert_state("snap", ("started", "suspended")) + self.tmpfile.seek(0) + res = self.tmpfile.buffer.read() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def done(self) -> None: + """Stop capturing, restore streams, return original capture file, + seeked to position zero.""" + self._assert_state("done", ("initialized", "started", "suspended", "done")) + if self._state == "done": + return + os.dup2(self.targetfd_save, self.targetfd) + os.close(self.targetfd_save) + if self.targetfd_invalid is not None: + if self.targetfd_invalid != self.targetfd: + os.close(self.targetfd) + os.close(self.targetfd_invalid) + self.syscapture.done() + self.tmpfile.close() + self._state = "done" + + def suspend(self) -> None: + self._assert_state("suspend", ("started", "suspended")) + if self._state == "suspended": + return + self.syscapture.suspend() + os.dup2(self.targetfd_save, self.targetfd) + self._state = "suspended" + + def resume(self) -> None: + self._assert_state("resume", ("started", "suspended")) + if self._state == "started": + return + self.syscapture.resume() + os.dup2(self.tmpfile.fileno(), self.targetfd) + self._state = "started" + + def writeorg(self, data): + """Write to original file descriptor.""" + self._assert_state("writeorg", ("started", "suspended")) + os.write(self.targetfd_save, data) + + +class FDCapture(FDCaptureBinary): + """Capture IO to/from a given OS-level file descriptor. + + snap() produces text. """ - # Ignore type because it doesn't match the type in the superclass (bytes). - EMPTY_BUFFER = "" # type: ignore - - def snap(self): - self._assert_state("snap", ("started", "suspended")) - self.tmpfile.seek(0) - res = self.tmpfile.read() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def writeorg(self, data): - """Write to original file descriptor.""" - super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream + # Ignore type because it doesn't match the type in the superclass (bytes). + EMPTY_BUFFER = "" # type: ignore + def snap(self): + self._assert_state("snap", ("started", "suspended")) + self.tmpfile.seek(0) + res = self.tmpfile.read() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res -# MultiCapture + def writeorg(self, data): + """Write to original file descriptor.""" + super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream -# This class was a namedtuple, but due to mypy limitation[0] it could not be -# made generic, so was replaced by a regular class which tries to emulate the -# pertinent parts of a namedtuple. If the mypy limitation is ever lifted, can -# make it a namedtuple again. -# [0]: https://github.com/python/mypy/issues/685 -@final -@functools.total_ordering -class CaptureResult(Generic[AnyStr]): - """The result of :method:`CaptureFixture.readouterr`.""" +# MultiCapture - __slots__ = ("out", "err") - def __init__(self, out: AnyStr, err: AnyStr) -> None: - self.out: AnyStr = out - self.err: AnyStr = err +# This class was a namedtuple, but due to mypy limitation[0] it could not be +# made generic, so was replaced by a regular class which tries to emulate the +# pertinent parts of a namedtuple. If the mypy limitation is ever lifted, can +# make it a namedtuple again. +# [0]: https://github.com/python/mypy/issues/685 +@final +@functools.total_ordering +class CaptureResult(Generic[AnyStr]): + """The result of :method:`CaptureFixture.readouterr`.""" - def __len__(self) -> int: - return 2 + __slots__ = ("out", "err") - def __iter__(self) -> Iterator[AnyStr]: - return iter((self.out, self.err)) + def __init__(self, out: AnyStr, err: AnyStr) -> None: + self.out: AnyStr = out + self.err: AnyStr = err - def __getitem__(self, item: int) -> AnyStr: - return tuple(self)[item] + def __len__(self) -> int: + return 2 - def _replace( - self, *, out: Optional[AnyStr] = None, err: Optional[AnyStr] = None - ) -> "CaptureResult[AnyStr]": - return CaptureResult( - out=self.out if out is None else out, err=self.err if err is None else err - ) + def __iter__(self) -> Iterator[AnyStr]: + return iter((self.out, self.err)) - def count(self, value: AnyStr) -> int: - return tuple(self).count(value) + def __getitem__(self, item: int) -> AnyStr: + return tuple(self)[item] - def index(self, value) -> int: - return tuple(self).index(value) + def _replace( + self, *, out: Optional[AnyStr] = None, err: Optional[AnyStr] = None + ) -> "CaptureResult[AnyStr]": + return CaptureResult( + out=self.out if out is None else out, err=self.err if err is None else err + ) - def __eq__(self, other: object) -> bool: - if not isinstance(other, (CaptureResult, tuple)): - return NotImplemented - return tuple(self) == tuple(other) + def count(self, value: AnyStr) -> int: + return tuple(self).count(value) - def __hash__(self) -> int: - return hash(tuple(self)) + def index(self, value) -> int: + return tuple(self).index(value) - def __lt__(self, other: object) -> bool: - if not isinstance(other, (CaptureResult, tuple)): - return NotImplemented - return tuple(self) < tuple(other) + def __eq__(self, other: object) -> bool: + if not isinstance(other, (CaptureResult, tuple)): + return NotImplemented + return tuple(self) == tuple(other) - def __repr__(self) -> str: - return f"CaptureResult(out={self.out!r}, err={self.err!r})" + def __hash__(self) -> int: + return hash(tuple(self)) + + def __lt__(self, other: object) -> bool: + if not isinstance(other, (CaptureResult, tuple)): + return NotImplemented + return tuple(self) < tuple(other) + def __repr__(self) -> str: + return f"CaptureResult(out={self.out!r}, err={self.err!r})" -class MultiCapture(Generic[AnyStr]): - _state = None - _in_suspended = False - def __init__(self, in_, out, err) -> None: - self.in_ = in_ - self.out = out - self.err = err +class MultiCapture(Generic[AnyStr]): + _state = None + _in_suspended = False - def __repr__(self) -> str: - return "<MultiCapture out={!r} err={!r} in_={!r} _state={!r} _in_suspended={!r}>".format( - self.out, self.err, self.in_, self._state, self._in_suspended, - ) + def __init__(self, in_, out, err) -> None: + self.in_ = in_ + self.out = out + self.err = err - def start_capturing(self) -> None: - self._state = "started" + def __repr__(self) -> str: + return "<MultiCapture out={!r} err={!r} in_={!r} _state={!r} _in_suspended={!r}>".format( + self.out, self.err, self.in_, self._state, self._in_suspended, + ) + + def start_capturing(self) -> None: + self._state = "started" if self.in_: self.in_.start() if self.out: @@ -568,8 +568,8 @@ class MultiCapture(Generic[AnyStr]): if self.err: self.err.start() - def pop_outerr_to_orig(self) -> Tuple[AnyStr, AnyStr]: - """Pop current snapshot out/err capture and flush to orig streams.""" + def pop_outerr_to_orig(self) -> Tuple[AnyStr, AnyStr]: + """Pop current snapshot out/err capture and flush to orig streams.""" out, err = self.readouterr() if out: self.out.writeorg(out) @@ -577,8 +577,8 @@ class MultiCapture(Generic[AnyStr]): self.err.writeorg(err) return out, err - def suspend_capturing(self, in_: bool = False) -> None: - self._state = "suspended" + def suspend_capturing(self, in_: bool = False) -> None: + self._state = "suspended" if self.out: self.out.suspend() if self.err: @@ -587,21 +587,21 @@ class MultiCapture(Generic[AnyStr]): self.in_.suspend() self._in_suspended = True - def resume_capturing(self) -> None: - self._state = "started" + def resume_capturing(self) -> None: + self._state = "started" if self.out: self.out.resume() if self.err: self.err.resume() - if self._in_suspended: + if self._in_suspended: self.in_.resume() - self._in_suspended = False + self._in_suspended = False - def stop_capturing(self) -> None: - """Stop capturing and reset capturing streams.""" - if self._state == "stopped": + def stop_capturing(self) -> None: + """Stop capturing and reset capturing streams.""" + if self._state == "stopped": raise ValueError("was already stopped") - self._state = "stopped" + self._state = "stopped" if self.out: self.out.done() if self.err: @@ -609,359 +609,359 @@ class MultiCapture(Generic[AnyStr]): if self.in_: self.in_.done() - def is_started(self) -> bool: - """Whether actively capturing -- not suspended or stopped.""" - return self._state == "started" - - def readouterr(self) -> CaptureResult[AnyStr]: - if self.out: - out = self.out.snap() - else: - out = "" - if self.err: - err = self.err.snap() - else: - err = "" - return CaptureResult(out, err) - - -def _get_multicapture(method: "_CaptureMethod") -> MultiCapture[str]: - if method == "fd": - return MultiCapture(in_=FDCapture(0), out=FDCapture(1), err=FDCapture(2)) - elif method == "sys": - return MultiCapture(in_=SysCapture(0), out=SysCapture(1), err=SysCapture(2)) - elif method == "no": - return MultiCapture(in_=None, out=None, err=None) - elif method == "tee-sys": - return MultiCapture( - in_=None, out=SysCapture(1, tee=True), err=SysCapture(2, tee=True) - ) - raise ValueError(f"unknown capturing method: {method!r}") - - -# CaptureManager and CaptureFixture - - -class CaptureManager: - """The capture plugin. - - Manages that the appropriate capture method is enabled/disabled during - collection and each test phase (setup, call, teardown). After each of - those points, the captured output is obtained and attached to the - collection/runtest report. - - There are two levels of capture: - - * global: enabled by default and can be suppressed by the ``-s`` - option. This is always enabled/disabled during collection and each test - phase. - - * fixture: when a test function or one of its fixture depend on the - ``capsys`` or ``capfd`` fixtures. In this case special handling is - needed to ensure the fixtures take precedence over the global capture. - """ - - def __init__(self, method: "_CaptureMethod") -> None: - self._method = method - self._global_capturing: Optional[MultiCapture[str]] = None - self._capture_fixture: Optional[CaptureFixture[Any]] = None - - def __repr__(self) -> str: - return "<CaptureManager _method={!r} _global_capturing={!r} _capture_fixture={!r}>".format( - self._method, self._global_capturing, self._capture_fixture - ) - - def is_capturing(self) -> Union[str, bool]: - if self.is_globally_capturing(): - return "global" - if self._capture_fixture: - return "fixture %s" % self._capture_fixture.request.fixturename - return False - - # Global capturing control - - def is_globally_capturing(self) -> bool: - return self._method != "no" - - def start_global_capturing(self) -> None: - assert self._global_capturing is None - self._global_capturing = _get_multicapture(self._method) - self._global_capturing.start_capturing() - - def stop_global_capturing(self) -> None: - if self._global_capturing is not None: - self._global_capturing.pop_outerr_to_orig() - self._global_capturing.stop_capturing() - self._global_capturing = None - - def resume_global_capture(self) -> None: - # During teardown of the python process, and on rare occasions, capture - # attributes can be `None` while trying to resume global capture. - if self._global_capturing is not None: - self._global_capturing.resume_capturing() - - def suspend_global_capture(self, in_: bool = False) -> None: - if self._global_capturing is not None: - self._global_capturing.suspend_capturing(in_=in_) - - def suspend(self, in_: bool = False) -> None: - # Need to undo local capsys-et-al if it exists before disabling global capture. - self.suspend_fixture() - self.suspend_global_capture(in_) - - def resume(self) -> None: - self.resume_global_capture() - self.resume_fixture() - - def read_global_capture(self) -> CaptureResult[str]: - assert self._global_capturing is not None - return self._global_capturing.readouterr() - - # Fixture Control - - def set_fixture(self, capture_fixture: "CaptureFixture[Any]") -> None: - if self._capture_fixture: - current_fixture = self._capture_fixture.request.fixturename - requested_fixture = capture_fixture.request.fixturename - capture_fixture.request.raiseerror( - "cannot use {} and {} at the same time".format( - requested_fixture, current_fixture - ) - ) - self._capture_fixture = capture_fixture - - def unset_fixture(self) -> None: - self._capture_fixture = None - - def activate_fixture(self) -> None: - """If the current item is using ``capsys`` or ``capfd``, activate - them so they take precedence over the global capture.""" - if self._capture_fixture: - self._capture_fixture._start() - - def deactivate_fixture(self) -> None: - """Deactivate the ``capsys`` or ``capfd`` fixture of this item, if any.""" - if self._capture_fixture: - self._capture_fixture.close() - - def suspend_fixture(self) -> None: - if self._capture_fixture: - self._capture_fixture._suspend() - - def resume_fixture(self) -> None: - if self._capture_fixture: - self._capture_fixture._resume() - - # Helper context managers - - @contextlib.contextmanager - def global_and_fixture_disabled(self) -> Generator[None, None, None]: - """Context manager to temporarily disable global and current fixture capturing.""" - do_fixture = self._capture_fixture and self._capture_fixture._is_started() - if do_fixture: - self.suspend_fixture() - do_global = self._global_capturing and self._global_capturing.is_started() - if do_global: - self.suspend_global_capture() - try: - yield - finally: - if do_global: - self.resume_global_capture() - if do_fixture: - self.resume_fixture() - - @contextlib.contextmanager - def item_capture(self, when: str, item: Item) -> Generator[None, None, None]: - self.resume_global_capture() - self.activate_fixture() - try: - yield - finally: - self.deactivate_fixture() - self.suspend_global_capture(in_=False) - - out, err = self.read_global_capture() - item.add_report_section(when, "stdout", out) - item.add_report_section(when, "stderr", err) - - # Hooks - - @hookimpl(hookwrapper=True) - def pytest_make_collect_report(self, collector: Collector): - if isinstance(collector, File): - self.resume_global_capture() - outcome = yield - self.suspend_global_capture() - out, err = self.read_global_capture() - rep = outcome.get_result() - if out: - rep.sections.append(("Captured stdout", out)) - if err: - rep.sections.append(("Captured stderr", err)) - else: - yield - - @hookimpl(hookwrapper=True) - def pytest_runtest_setup(self, item: Item) -> Generator[None, None, None]: - with self.item_capture("setup", item): - yield - - @hookimpl(hookwrapper=True) - def pytest_runtest_call(self, item: Item) -> Generator[None, None, None]: - with self.item_capture("call", item): - yield - - @hookimpl(hookwrapper=True) - def pytest_runtest_teardown(self, item: Item) -> Generator[None, None, None]: - with self.item_capture("teardown", item): - yield - - @hookimpl(tryfirst=True) - def pytest_keyboard_interrupt(self) -> None: - self.stop_global_capturing() - - @hookimpl(tryfirst=True) - def pytest_internalerror(self) -> None: - self.stop_global_capturing() - - -class CaptureFixture(Generic[AnyStr]): - """Object returned by the :fixture:`capsys`, :fixture:`capsysbinary`, - :fixture:`capfd` and :fixture:`capfdbinary` fixtures.""" - - def __init__( - self, captureclass, request: SubRequest, *, _ispytest: bool = False - ) -> None: - check_ispytest(_ispytest) - self.captureclass = captureclass - self.request = request - self._capture: Optional[MultiCapture[AnyStr]] = None - self._captured_out = self.captureclass.EMPTY_BUFFER - self._captured_err = self.captureclass.EMPTY_BUFFER - - def _start(self) -> None: - if self._capture is None: - self._capture = MultiCapture( - in_=None, out=self.captureclass(1), err=self.captureclass(2), - ) - self._capture.start_capturing() - - def close(self) -> None: - if self._capture is not None: - out, err = self._capture.pop_outerr_to_orig() - self._captured_out += out - self._captured_err += err - self._capture.stop_capturing() - self._capture = None - - def readouterr(self) -> CaptureResult[AnyStr]: - """Read and return the captured output so far, resetting the internal - buffer. - - :returns: - The captured content as a namedtuple with ``out`` and ``err`` - string attributes. - """ - captured_out, captured_err = self._captured_out, self._captured_err - if self._capture is not None: - out, err = self._capture.readouterr() - captured_out += out - captured_err += err - self._captured_out = self.captureclass.EMPTY_BUFFER - self._captured_err = self.captureclass.EMPTY_BUFFER - return CaptureResult(captured_out, captured_err) - - def _suspend(self) -> None: - """Suspend this fixture's own capturing temporarily.""" - if self._capture is not None: - self._capture.suspend_capturing() - - def _resume(self) -> None: - """Resume this fixture's own capturing temporarily.""" - if self._capture is not None: - self._capture.resume_capturing() - - def _is_started(self) -> bool: - """Whether actively capturing -- not disabled or closed.""" - if self._capture is not None: - return self._capture.is_started() - return False - - @contextlib.contextmanager - def disabled(self) -> Generator[None, None, None]: - """Temporarily disable capturing while inside the ``with`` block.""" - capmanager = self.request.config.pluginmanager.getplugin("capturemanager") - with capmanager.global_and_fixture_disabled(): - yield - - -# The fixtures. - - -@fixture -def capsys(request: SubRequest) -> Generator[CaptureFixture[str], None, None]: - """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``. - - The captured output is made available via ``capsys.readouterr()`` method - calls, which return a ``(out, err)`` namedtuple. - ``out`` and ``err`` will be ``text`` objects. - """ - capman = request.config.pluginmanager.getplugin("capturemanager") - capture_fixture = CaptureFixture[str](SysCapture, request, _ispytest=True) - capman.set_fixture(capture_fixture) - capture_fixture._start() - yield capture_fixture - capture_fixture.close() - capman.unset_fixture() - - -@fixture -def capsysbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, None]: - """Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``. - - The captured output is made available via ``capsysbinary.readouterr()`` - method calls, which return a ``(out, err)`` namedtuple. - ``out`` and ``err`` will be ``bytes`` objects. + def is_started(self) -> bool: + """Whether actively capturing -- not suspended or stopped.""" + return self._state == "started" + + def readouterr(self) -> CaptureResult[AnyStr]: + if self.out: + out = self.out.snap() + else: + out = "" + if self.err: + err = self.err.snap() + else: + err = "" + return CaptureResult(out, err) + + +def _get_multicapture(method: "_CaptureMethod") -> MultiCapture[str]: + if method == "fd": + return MultiCapture(in_=FDCapture(0), out=FDCapture(1), err=FDCapture(2)) + elif method == "sys": + return MultiCapture(in_=SysCapture(0), out=SysCapture(1), err=SysCapture(2)) + elif method == "no": + return MultiCapture(in_=None, out=None, err=None) + elif method == "tee-sys": + return MultiCapture( + in_=None, out=SysCapture(1, tee=True), err=SysCapture(2, tee=True) + ) + raise ValueError(f"unknown capturing method: {method!r}") + + +# CaptureManager and CaptureFixture + + +class CaptureManager: + """The capture plugin. + + Manages that the appropriate capture method is enabled/disabled during + collection and each test phase (setup, call, teardown). After each of + those points, the captured output is obtained and attached to the + collection/runtest report. + + There are two levels of capture: + + * global: enabled by default and can be suppressed by the ``-s`` + option. This is always enabled/disabled during collection and each test + phase. + + * fixture: when a test function or one of its fixture depend on the + ``capsys`` or ``capfd`` fixtures. In this case special handling is + needed to ensure the fixtures take precedence over the global capture. + """ + + def __init__(self, method: "_CaptureMethod") -> None: + self._method = method + self._global_capturing: Optional[MultiCapture[str]] = None + self._capture_fixture: Optional[CaptureFixture[Any]] = None + + def __repr__(self) -> str: + return "<CaptureManager _method={!r} _global_capturing={!r} _capture_fixture={!r}>".format( + self._method, self._global_capturing, self._capture_fixture + ) + + def is_capturing(self) -> Union[str, bool]: + if self.is_globally_capturing(): + return "global" + if self._capture_fixture: + return "fixture %s" % self._capture_fixture.request.fixturename + return False + + # Global capturing control + + def is_globally_capturing(self) -> bool: + return self._method != "no" + + def start_global_capturing(self) -> None: + assert self._global_capturing is None + self._global_capturing = _get_multicapture(self._method) + self._global_capturing.start_capturing() + + def stop_global_capturing(self) -> None: + if self._global_capturing is not None: + self._global_capturing.pop_outerr_to_orig() + self._global_capturing.stop_capturing() + self._global_capturing = None + + def resume_global_capture(self) -> None: + # During teardown of the python process, and on rare occasions, capture + # attributes can be `None` while trying to resume global capture. + if self._global_capturing is not None: + self._global_capturing.resume_capturing() + + def suspend_global_capture(self, in_: bool = False) -> None: + if self._global_capturing is not None: + self._global_capturing.suspend_capturing(in_=in_) + + def suspend(self, in_: bool = False) -> None: + # Need to undo local capsys-et-al if it exists before disabling global capture. + self.suspend_fixture() + self.suspend_global_capture(in_) + + def resume(self) -> None: + self.resume_global_capture() + self.resume_fixture() + + def read_global_capture(self) -> CaptureResult[str]: + assert self._global_capturing is not None + return self._global_capturing.readouterr() + + # Fixture Control + + def set_fixture(self, capture_fixture: "CaptureFixture[Any]") -> None: + if self._capture_fixture: + current_fixture = self._capture_fixture.request.fixturename + requested_fixture = capture_fixture.request.fixturename + capture_fixture.request.raiseerror( + "cannot use {} and {} at the same time".format( + requested_fixture, current_fixture + ) + ) + self._capture_fixture = capture_fixture + + def unset_fixture(self) -> None: + self._capture_fixture = None + + def activate_fixture(self) -> None: + """If the current item is using ``capsys`` or ``capfd``, activate + them so they take precedence over the global capture.""" + if self._capture_fixture: + self._capture_fixture._start() + + def deactivate_fixture(self) -> None: + """Deactivate the ``capsys`` or ``capfd`` fixture of this item, if any.""" + if self._capture_fixture: + self._capture_fixture.close() + + def suspend_fixture(self) -> None: + if self._capture_fixture: + self._capture_fixture._suspend() + + def resume_fixture(self) -> None: + if self._capture_fixture: + self._capture_fixture._resume() + + # Helper context managers + + @contextlib.contextmanager + def global_and_fixture_disabled(self) -> Generator[None, None, None]: + """Context manager to temporarily disable global and current fixture capturing.""" + do_fixture = self._capture_fixture and self._capture_fixture._is_started() + if do_fixture: + self.suspend_fixture() + do_global = self._global_capturing and self._global_capturing.is_started() + if do_global: + self.suspend_global_capture() + try: + yield + finally: + if do_global: + self.resume_global_capture() + if do_fixture: + self.resume_fixture() + + @contextlib.contextmanager + def item_capture(self, when: str, item: Item) -> Generator[None, None, None]: + self.resume_global_capture() + self.activate_fixture() + try: + yield + finally: + self.deactivate_fixture() + self.suspend_global_capture(in_=False) + + out, err = self.read_global_capture() + item.add_report_section(when, "stdout", out) + item.add_report_section(when, "stderr", err) + + # Hooks + + @hookimpl(hookwrapper=True) + def pytest_make_collect_report(self, collector: Collector): + if isinstance(collector, File): + self.resume_global_capture() + outcome = yield + self.suspend_global_capture() + out, err = self.read_global_capture() + rep = outcome.get_result() + if out: + rep.sections.append(("Captured stdout", out)) + if err: + rep.sections.append(("Captured stderr", err)) + else: + yield + + @hookimpl(hookwrapper=True) + def pytest_runtest_setup(self, item: Item) -> Generator[None, None, None]: + with self.item_capture("setup", item): + yield + + @hookimpl(hookwrapper=True) + def pytest_runtest_call(self, item: Item) -> Generator[None, None, None]: + with self.item_capture("call", item): + yield + + @hookimpl(hookwrapper=True) + def pytest_runtest_teardown(self, item: Item) -> Generator[None, None, None]: + with self.item_capture("teardown", item): + yield + + @hookimpl(tryfirst=True) + def pytest_keyboard_interrupt(self) -> None: + self.stop_global_capturing() + + @hookimpl(tryfirst=True) + def pytest_internalerror(self) -> None: + self.stop_global_capturing() + + +class CaptureFixture(Generic[AnyStr]): + """Object returned by the :fixture:`capsys`, :fixture:`capsysbinary`, + :fixture:`capfd` and :fixture:`capfdbinary` fixtures.""" + + def __init__( + self, captureclass, request: SubRequest, *, _ispytest: bool = False + ) -> None: + check_ispytest(_ispytest) + self.captureclass = captureclass + self.request = request + self._capture: Optional[MultiCapture[AnyStr]] = None + self._captured_out = self.captureclass.EMPTY_BUFFER + self._captured_err = self.captureclass.EMPTY_BUFFER + + def _start(self) -> None: + if self._capture is None: + self._capture = MultiCapture( + in_=None, out=self.captureclass(1), err=self.captureclass(2), + ) + self._capture.start_capturing() + + def close(self) -> None: + if self._capture is not None: + out, err = self._capture.pop_outerr_to_orig() + self._captured_out += out + self._captured_err += err + self._capture.stop_capturing() + self._capture = None + + def readouterr(self) -> CaptureResult[AnyStr]: + """Read and return the captured output so far, resetting the internal + buffer. + + :returns: + The captured content as a namedtuple with ``out`` and ``err`` + string attributes. + """ + captured_out, captured_err = self._captured_out, self._captured_err + if self._capture is not None: + out, err = self._capture.readouterr() + captured_out += out + captured_err += err + self._captured_out = self.captureclass.EMPTY_BUFFER + self._captured_err = self.captureclass.EMPTY_BUFFER + return CaptureResult(captured_out, captured_err) + + def _suspend(self) -> None: + """Suspend this fixture's own capturing temporarily.""" + if self._capture is not None: + self._capture.suspend_capturing() + + def _resume(self) -> None: + """Resume this fixture's own capturing temporarily.""" + if self._capture is not None: + self._capture.resume_capturing() + + def _is_started(self) -> bool: + """Whether actively capturing -- not disabled or closed.""" + if self._capture is not None: + return self._capture.is_started() + return False + + @contextlib.contextmanager + def disabled(self) -> Generator[None, None, None]: + """Temporarily disable capturing while inside the ``with`` block.""" + capmanager = self.request.config.pluginmanager.getplugin("capturemanager") + with capmanager.global_and_fixture_disabled(): + yield + + +# The fixtures. + + +@fixture +def capsys(request: SubRequest) -> Generator[CaptureFixture[str], None, None]: + """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``. + + The captured output is made available via ``capsys.readouterr()`` method + calls, which return a ``(out, err)`` namedtuple. + ``out`` and ``err`` will be ``text`` objects. """ - capman = request.config.pluginmanager.getplugin("capturemanager") - capture_fixture = CaptureFixture[bytes](SysCaptureBinary, request, _ispytest=True) - capman.set_fixture(capture_fixture) - capture_fixture._start() - yield capture_fixture - capture_fixture.close() - capman.unset_fixture() - - -@fixture -def capfd(request: SubRequest) -> Generator[CaptureFixture[str], None, None]: - """Enable text capturing of writes to file descriptors ``1`` and ``2``. - - The captured output is made available via ``capfd.readouterr()`` method - calls, which return a ``(out, err)`` namedtuple. - ``out`` and ``err`` will be ``text`` objects. + capman = request.config.pluginmanager.getplugin("capturemanager") + capture_fixture = CaptureFixture[str](SysCapture, request, _ispytest=True) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() + + +@fixture +def capsysbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, None]: + """Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``. + + The captured output is made available via ``capsysbinary.readouterr()`` + method calls, which return a ``(out, err)`` namedtuple. + ``out`` and ``err`` will be ``bytes`` objects. """ - capman = request.config.pluginmanager.getplugin("capturemanager") - capture_fixture = CaptureFixture[str](FDCapture, request, _ispytest=True) - capman.set_fixture(capture_fixture) - capture_fixture._start() - yield capture_fixture - capture_fixture.close() - capman.unset_fixture() - - -@fixture -def capfdbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, None]: - """Enable bytes capturing of writes to file descriptors ``1`` and ``2``. - - The captured output is made available via ``capfd.readouterr()`` method - calls, which return a ``(out, err)`` namedtuple. - ``out`` and ``err`` will be ``byte`` objects. + capman = request.config.pluginmanager.getplugin("capturemanager") + capture_fixture = CaptureFixture[bytes](SysCaptureBinary, request, _ispytest=True) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() + + +@fixture +def capfd(request: SubRequest) -> Generator[CaptureFixture[str], None, None]: + """Enable text capturing of writes to file descriptors ``1`` and ``2``. + + The captured output is made available via ``capfd.readouterr()`` method + calls, which return a ``(out, err)`` namedtuple. + ``out`` and ``err`` will be ``text`` objects. """ - capman = request.config.pluginmanager.getplugin("capturemanager") - capture_fixture = CaptureFixture[bytes](FDCaptureBinary, request, _ispytest=True) - capman.set_fixture(capture_fixture) - capture_fixture._start() - yield capture_fixture - capture_fixture.close() - capman.unset_fixture() + capman = request.config.pluginmanager.getplugin("capturemanager") + capture_fixture = CaptureFixture[str](FDCapture, request, _ispytest=True) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() + + +@fixture +def capfdbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, None]: + """Enable bytes capturing of writes to file descriptors ``1`` and ``2``. + + The captured output is made available via ``capfd.readouterr()`` method + calls, which return a ``(out, err)`` namedtuple. + ``out`` and ``err`` will be ``byte`` objects. + """ + capman = request.config.pluginmanager.getplugin("capturemanager") + capture_fixture = CaptureFixture[bytes](FDCaptureBinary, request, _ispytest=True) + capman.set_fixture(capture_fixture) + capture_fixture._start() + yield capture_fixture + capture_fixture.close() + capman.unset_fixture() |