aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest/py3/_pytest/capture.py
diff options
context:
space:
mode:
authorarcadia-devtools <arcadia-devtools@yandex-team.ru>2022-02-09 12:00:52 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 15:58:17 +0300
commit8e1413fed79d1e8036e65228af6c93399ccf5502 (patch)
tree502c9df7b2614d20541c7a2d39d390e9a51877cc /contrib/python/pytest/py3/_pytest/capture.py
parent6b813c17d56d1d05f92c61ddc347d0e4d358fe85 (diff)
downloadydb-8e1413fed79d1e8036e65228af6c93399ccf5502.tar.gz
intermediate changes
ref:614ed510ddd3cdf86a8c5dbf19afd113397e0172
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/capture.py')
-rw-r--r--contrib/python/pytest/py3/_pytest/capture.py1318
1 files changed, 716 insertions, 602 deletions
diff --git a/contrib/python/pytest/py3/_pytest/capture.py b/contrib/python/pytest/py3/_pytest/capture.py
index 673bb07a9b..086302658c 100644
--- a/contrib/python/pytest/py3/_pytest/capture.py
+++ b/contrib/python/pytest/py3/_pytest/capture.py
@@ -1,40 +1,45 @@
-"""
-per-test stdout/stderr capturing mechanism.
-
-"""
-import collections
+"""Per-test stdout/stderr capturing mechanism."""
import contextlib
+import functools
import io
import os
import sys
from io import UnsupportedOperation
from tempfile import TemporaryFile
-from typing import BinaryIO
+from typing import Any
+from typing import AnyStr
from typing import Generator
-from typing import Iterable
+from typing import Generic
+from typing import Iterator
from typing import Optional
+from typing import TextIO
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
-import pytest
-from _pytest.compat import CaptureAndPassthroughIO
-from _pytest.compat import CaptureIO
-from _pytest.compat import TYPE_CHECKING
+from _pytest.compat import final
from _pytest.config import Config
-from _pytest.fixtures import FixtureRequest
+from _pytest.config import hookimpl
+from _pytest.config.argparsing import Parser
+from _pytest.deprecated import check_ispytest
+from _pytest.fixtures import fixture
+from _pytest.fixtures import SubRequest
+from _pytest.nodes import Collector
+from _pytest.nodes import File
+from _pytest.nodes import Item
if TYPE_CHECKING:
from typing_extensions import Literal
_CaptureMethod = Literal["fd", "sys", "no", "tee-sys"]
-patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"}
-
-def pytest_addoption(parser):
+def pytest_addoption(parser: Parser) -> None:
group = parser.getgroup("general")
group._addoption(
"--capture",
action="store",
- default="fd" if hasattr(os, "dup") else "sys",
+ default="fd",
metavar="method",
choices=["fd", "sys", "no", "tee-sys"],
help="per-test capturing method: one of fd|sys|no|tee-sys.",
@@ -48,7 +53,102 @@ def pytest_addoption(parser):
)
-@pytest.hookimpl(hookwrapper=True)
+def _colorama_workaround() -> None:
+ """Ensure colorama is imported so that it attaches to the correct stdio
+ handles on Windows.
+
+ colorama uses the terminal on import time. So if something does the
+ first import of colorama while I/O capture is active, colorama will
+ fail in various ways.
+ """
+ if sys.platform.startswith("win32"):
+ try:
+ import colorama # noqa: F401
+ except ImportError:
+ pass
+
+
+def _readline_workaround() -> None:
+ """Ensure readline is imported so that it attaches to the correct stdio
+ handles on Windows.
+
+ Pdb uses readline support where available--when not running from the Python
+ prompt, the readline module is not imported until running the pdb REPL. If
+ running pytest with the --pdb option this means the readline module is not
+ imported until after I/O capture has been started.
+
+ This is a problem for pyreadline, which is often used to implement readline
+ support on Windows, as it does not attach to the correct handles for stdout
+ and/or stdin if they have been redirected by the FDCapture mechanism. This
+ workaround ensures that readline is imported before I/O capture is setup so
+ that it can attach to the actual stdin/out for the console.
+
+ See https://github.com/pytest-dev/pytest/pull/1281.
+ """
+ if sys.platform.startswith("win32"):
+ try:
+ import readline # noqa: F401
+ except ImportError:
+ pass
+
+
+def _py36_windowsconsoleio_workaround(stream: TextIO) -> None:
+ """Workaround for Windows Unicode console handling on Python>=3.6.
+
+ Python 3.6 implemented Unicode console handling for Windows. This works
+ by reading/writing to the raw console handle using
+ ``{Read,Write}ConsoleW``.
+
+ The problem is that we are going to ``dup2`` over the stdio file
+ descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the
+ handles used by Python to write to the console. Though there is still some
+ weirdness and the console handle seems to only be closed randomly and not
+ on the first call to ``CloseHandle``, or maybe it gets reopened with the
+ same handle value when we suspend capturing.
+
+ The workaround in this case will reopen stdio with a different fd which
+ also means a different handle by replicating the logic in
+ "Py_lifecycle.c:initstdio/create_stdio".
+
+ :param stream:
+ In practice ``sys.stdout`` or ``sys.stderr``, but given
+ here as parameter for unittesting purposes.
+
+ See https://github.com/pytest-dev/py/issues/103.
+ """
+ if not sys.platform.startswith("win32") or hasattr(sys, "pypy_version_info"):
+ return
+
+ # Bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666).
+ if not hasattr(stream, "buffer"): # type: ignore[unreachable]
+ return
+
+ buffered = hasattr(stream.buffer, "raw")
+ raw_stdout = stream.buffer.raw if buffered else stream.buffer # type: ignore[attr-defined]
+
+ if not isinstance(raw_stdout, io._WindowsConsoleIO): # type: ignore[attr-defined]
+ return
+
+ def _reopen_stdio(f, mode):
+ if not buffered and mode[0] == "w":
+ buffering = 0
+ else:
+ buffering = -1
+
+ return io.TextIOWrapper(
+ open(os.dup(f.fileno()), mode, buffering), # type: ignore[arg-type]
+ f.encoding,
+ f.errors,
+ f.newlines,
+ f.line_buffering,
+ )
+
+ sys.stdin = _reopen_stdio(sys.stdin, "rb")
+ sys.stdout = _reopen_stdio(sys.stdout, "wb")
+ sys.stderr = _reopen_stdio(sys.stderr, "wb")
+
+
+@hookimpl(hookwrapper=True)
def pytest_load_initial_conftests(early_config: Config):
ns = early_config.known_args_namespace
if ns.capture == "fd":
@@ -59,10 +159,10 @@ def pytest_load_initial_conftests(early_config: Config):
capman = CaptureManager(ns.capture)
pluginmanager.register(capman, "capturemanager")
- # make sure that capturemanager is properly reset at final shutdown
+ # Make sure that capturemanager is properly reset at final shutdown.
early_config.add_cleanup(capman.stop_global_capturing)
- # finally trigger conftest loading but while capturing (issue93)
+ # Finally trigger conftest loading but while capturing (issue #93).
capman.start_global_capturing()
outcome = yield
capman.suspend_global_capture()
@@ -72,395 +172,394 @@ def pytest_load_initial_conftests(early_config: Config):
sys.stderr.write(err)
-def _get_multicapture(method: "_CaptureMethod") -> "MultiCapture":
- if method == "fd":
- return MultiCapture(out=True, err=True, Capture=FDCapture)
- elif method == "sys":
- return MultiCapture(out=True, err=True, Capture=SysCapture)
- elif method == "no":
- return MultiCapture(out=False, err=False, in_=False)
- elif method == "tee-sys":
- return MultiCapture(out=True, err=True, in_=False, Capture=TeeSysCapture)
- raise ValueError("unknown capturing method: {!r}".format(method))
+# IO Helpers.
-class CaptureManager:
- """
- Capture plugin, manages that the appropriate capture method is enabled/disabled during collection and each
- test phase (setup, call, teardown). After each of those points, the captured output is obtained and
- attached to the collection/runtest report.
+class EncodedFile(io.TextIOWrapper):
+ __slots__ = ()
- There are two levels of capture:
- * global: which is enabled by default and can be suppressed by the ``-s`` option. This is always enabled/disabled
- during collection and each test phase.
- * fixture: when a test function or one of its fixture depend on the ``capsys`` or ``capfd`` fixtures. In this
- case special handling is needed to ensure the fixtures take precedence over the global capture.
- """
+ @property
+ def name(self) -> str:
+ # Ensure that file.name is a string. Workaround for a Python bug
+ # fixed in >=3.7.4: https://bugs.python.org/issue36015
+ return repr(self.buffer)
- def __init__(self, method: "_CaptureMethod") -> None:
- self._method = method
- self._global_capturing = None
- self._capture_fixture = None # type: Optional[CaptureFixture]
+ @property
+ def mode(self) -> str:
+ # TextIOWrapper doesn't expose a mode, but at least some of our
+ # tests check it.
+ return self.buffer.mode.replace("b", "")
- def __repr__(self):
- return "<CaptureManager _method={!r} _global_capturing={!r} _capture_fixture={!r}>".format(
- self._method, self._global_capturing, self._capture_fixture
+
+class CaptureIO(io.TextIOWrapper):
+ def __init__(self) -> None:
+ super().__init__(io.BytesIO(), encoding="UTF-8", newline="", write_through=True)
+
+ def getvalue(self) -> str:
+ assert isinstance(self.buffer, io.BytesIO)
+ return self.buffer.getvalue().decode("UTF-8")
+
+
+class TeeCaptureIO(CaptureIO):
+ def __init__(self, other: TextIO) -> None:
+ self._other = other
+ super().__init__()
+
+ def write(self, s: str) -> int:
+ super().write(s)
+ return self._other.write(s)
+
+
+class DontReadFromInput:
+ encoding = None
+
+ def read(self, *args):
+ raise OSError(
+ "pytest: reading from stdin while output is captured! Consider using `-s`."
)
- def is_capturing(self):
- if self.is_globally_capturing():
- return "global"
- if self._capture_fixture:
- return "fixture %s" % self._capture_fixture.request.fixturename
- return False
+ readline = read
+ readlines = read
+ __next__ = read
- # Global capturing control
+ def __iter__(self):
+ return self
- def is_globally_capturing(self):
- return self._method != "no"
+ def fileno(self) -> int:
+ raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()")
- def start_global_capturing(self):
- assert self._global_capturing is None
- self._global_capturing = _get_multicapture(self._method)
- self._global_capturing.start_capturing()
+ def isatty(self) -> bool:
+ return False
- def stop_global_capturing(self):
- if self._global_capturing is not None:
- self._global_capturing.pop_outerr_to_orig()
- self._global_capturing.stop_capturing()
- self._global_capturing = None
+ def close(self) -> None:
+ pass
- def resume_global_capture(self):
- # During teardown of the python process, and on rare occasions, capture
- # attributes can be `None` while trying to resume global capture.
- if self._global_capturing is not None:
- self._global_capturing.resume_capturing()
+ @property
+ def buffer(self):
+ return self
- def suspend_global_capture(self, in_=False):
- cap = getattr(self, "_global_capturing", None)
- if cap is not None:
- cap.suspend_capturing(in_=in_)
- def suspend(self, in_=False):
- # Need to undo local capsys-et-al if it exists before disabling global capture.
- self.suspend_fixture()
- self.suspend_global_capture(in_)
+# Capture classes.
- def resume(self):
- self.resume_global_capture()
- self.resume_fixture()
- def read_global_capture(self):
- return self._global_capturing.readouterr()
+patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"}
- # Fixture Control (it's just forwarding, think about removing this later)
- @contextlib.contextmanager
- def _capturing_for_request(
- self, request: FixtureRequest
- ) -> Generator["CaptureFixture", None, None]:
- """
- Context manager that creates a ``CaptureFixture`` instance for the
- given ``request``, ensuring there is only a single one being requested
- at the same time.
+class NoCapture:
+ EMPTY_BUFFER = None
+ __init__ = start = done = suspend = resume = lambda *args: None
- This is used as a helper with ``capsys``, ``capfd`` etc.
- """
- if self._capture_fixture:
- other_name = next(
- k
- for k, v in map_fixname_class.items()
- if v is self._capture_fixture.captureclass
- )
- raise request.raiseerror(
- "cannot use {} and {} at the same time".format(
- request.fixturename, other_name
- )
- )
- capture_class = map_fixname_class[request.fixturename]
- self._capture_fixture = CaptureFixture(capture_class, request)
- self.activate_fixture()
- yield self._capture_fixture
- self._capture_fixture.close()
- self._capture_fixture = None
- def activate_fixture(self):
- """If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over
- the global capture.
- """
- if self._capture_fixture:
- self._capture_fixture._start()
+class SysCaptureBinary:
- def deactivate_fixture(self):
- """Deactivates the ``capsys`` or ``capfd`` fixture of this item, if any."""
- if self._capture_fixture:
- self._capture_fixture.close()
+ EMPTY_BUFFER = b""
- def suspend_fixture(self):
- if self._capture_fixture:
- self._capture_fixture._suspend()
+ def __init__(self, fd: int, tmpfile=None, *, tee: bool = False) -> None:
+ name = patchsysdict[fd]
+ self._old = getattr(sys, name)
+ self.name = name
+ if tmpfile is None:
+ if name == "stdin":
+ tmpfile = DontReadFromInput()
+ else:
+ tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old)
+ self.tmpfile = tmpfile
+ self._state = "initialized"
- def resume_fixture(self):
- if self._capture_fixture:
- self._capture_fixture._resume()
+ def repr(self, class_name: str) -> str:
+ return "<{} {} _old={} _state={!r} tmpfile={!r}>".format(
+ class_name,
+ self.name,
+ hasattr(self, "_old") and repr(self._old) or "<UNSET>",
+ self._state,
+ self.tmpfile,
+ )
- # Helper context managers
+ def __repr__(self) -> str:
+ return "<{} {} _old={} _state={!r} tmpfile={!r}>".format(
+ self.__class__.__name__,
+ self.name,
+ hasattr(self, "_old") and repr(self._old) or "<UNSET>",
+ self._state,
+ self.tmpfile,
+ )
- @contextlib.contextmanager
- def global_and_fixture_disabled(self):
- """Context manager to temporarily disable global and current fixture capturing."""
- self.suspend()
- try:
- yield
- finally:
- self.resume()
+ def _assert_state(self, op: str, states: Tuple[str, ...]) -> None:
+ assert (
+ self._state in states
+ ), "cannot {} in state {!r}: expected one of {}".format(
+ op, self._state, ", ".join(states)
+ )
- @contextlib.contextmanager
- def item_capture(self, when, item):
- self.resume_global_capture()
- self.activate_fixture()
- try:
- yield
- finally:
- self.deactivate_fixture()
- self.suspend_global_capture(in_=False)
+ def start(self) -> None:
+ self._assert_state("start", ("initialized",))
+ setattr(sys, self.name, self.tmpfile)
+ self._state = "started"
- out, err = self.read_global_capture()
- item.add_report_section(when, "stdout", out)
- item.add_report_section(when, "stderr", err)
+ def snap(self):
+ self._assert_state("snap", ("started", "suspended"))
+ self.tmpfile.seek(0)
+ res = self.tmpfile.buffer.read()
+ self.tmpfile.seek(0)
+ self.tmpfile.truncate()
+ return res
- # Hooks
+ def done(self) -> None:
+ self._assert_state("done", ("initialized", "started", "suspended", "done"))
+ if self._state == "done":
+ return
+ setattr(sys, self.name, self._old)
+ del self._old
+ self.tmpfile.close()
+ self._state = "done"
- @pytest.hookimpl(hookwrapper=True)
- def pytest_make_collect_report(self, collector):
- if isinstance(collector, pytest.File):
- self.resume_global_capture()
- outcome = yield
- self.suspend_global_capture()
- out, err = self.read_global_capture()
- rep = outcome.get_result()
- if out:
- rep.sections.append(("Captured stdout", out))
- if err:
- rep.sections.append(("Captured stderr", err))
- else:
- yield
+ def suspend(self) -> None:
+ self._assert_state("suspend", ("started", "suspended"))
+ setattr(sys, self.name, self._old)
+ self._state = "suspended"
- @pytest.hookimpl(hookwrapper=True)
- def pytest_runtest_setup(self, item):
- with self.item_capture("setup", item):
- yield
+ def resume(self) -> None:
+ self._assert_state("resume", ("started", "suspended"))
+ if self._state == "started":
+ return
+ setattr(sys, self.name, self.tmpfile)
+ self._state = "started"
- @pytest.hookimpl(hookwrapper=True)
- def pytest_runtest_call(self, item):
- with self.item_capture("call", item):
- yield
+ def writeorg(self, data) -> None:
+ self._assert_state("writeorg", ("started", "suspended"))
+ self._old.flush()
+ self._old.buffer.write(data)
+ self._old.buffer.flush()
- @pytest.hookimpl(hookwrapper=True)
- def pytest_runtest_teardown(self, item):
- with self.item_capture("teardown", item):
- yield
- @pytest.hookimpl(tryfirst=True)
- def pytest_keyboard_interrupt(self, excinfo):
- self.stop_global_capturing()
+class SysCapture(SysCaptureBinary):
+ EMPTY_BUFFER = "" # type: ignore[assignment]
- @pytest.hookimpl(tryfirst=True)
- def pytest_internalerror(self, excinfo):
- self.stop_global_capturing()
+ def snap(self):
+ res = self.tmpfile.getvalue()
+ self.tmpfile.seek(0)
+ self.tmpfile.truncate()
+ return res
+ def writeorg(self, data):
+ self._assert_state("writeorg", ("started", "suspended"))
+ self._old.write(data)
+ self._old.flush()
-@pytest.fixture
-def capsys(request):
- """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``.
- The captured output is made available via ``capsys.readouterr()`` method
- calls, which return a ``(out, err)`` namedtuple.
- ``out`` and ``err`` will be ``text`` objects.
+class FDCaptureBinary:
+ """Capture IO to/from a given OS-level file descriptor.
+
+ snap() produces `bytes`.
"""
- capman = request.config.pluginmanager.getplugin("capturemanager")
- with capman._capturing_for_request(request) as fixture:
- yield fixture
+ EMPTY_BUFFER = b""
-@pytest.fixture
-def capsysbinary(request):
- """Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``.
+ def __init__(self, targetfd: int) -> None:
+ self.targetfd = targetfd
- The captured output is made available via ``capsysbinary.readouterr()``
- method calls, which return a ``(out, err)`` namedtuple.
- ``out`` and ``err`` will be ``bytes`` objects.
- """
- capman = request.config.pluginmanager.getplugin("capturemanager")
- with capman._capturing_for_request(request) as fixture:
- yield fixture
+ try:
+ os.fstat(targetfd)
+ except OSError:
+ # FD capturing is conceptually simple -- create a temporary file,
+ # redirect the FD to it, redirect back when done. But when the
+ # target FD is invalid it throws a wrench into this loveley scheme.
+ #
+ # Tests themselves shouldn't care if the FD is valid, FD capturing
+ # should work regardless of external circumstances. So falling back
+ # to just sys capturing is not a good option.
+ #
+ # Further complications are the need to support suspend() and the
+ # possibility of FD reuse (e.g. the tmpfile getting the very same
+ # target FD). The following approach is robust, I believe.
+ self.targetfd_invalid: Optional[int] = os.open(os.devnull, os.O_RDWR)
+ os.dup2(self.targetfd_invalid, targetfd)
+ else:
+ self.targetfd_invalid = None
+ self.targetfd_save = os.dup(targetfd)
+ if targetfd == 0:
+ self.tmpfile = open(os.devnull)
+ self.syscapture = SysCapture(targetfd)
+ else:
+ self.tmpfile = EncodedFile(
+ TemporaryFile(buffering=0),
+ encoding="utf-8",
+ errors="replace",
+ newline="",
+ write_through=True,
+ )
+ if targetfd in patchsysdict:
+ self.syscapture = SysCapture(targetfd, self.tmpfile)
+ else:
+ self.syscapture = NoCapture()
-@pytest.fixture
-def capfd(request):
- """Enable text capturing of writes to file descriptors ``1`` and ``2``.
+ self._state = "initialized"
- The captured output is made available via ``capfd.readouterr()`` method
- calls, which return a ``(out, err)`` namedtuple.
- ``out`` and ``err`` will be ``text`` objects.
- """
- if not hasattr(os, "dup"):
- pytest.skip(
- "capfd fixture needs os.dup function which is not available in this system"
+ def __repr__(self) -> str:
+ return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format(
+ self.__class__.__name__,
+ self.targetfd,
+ self.targetfd_save,
+ self._state,
+ self.tmpfile,
)
- capman = request.config.pluginmanager.getplugin("capturemanager")
- with capman._capturing_for_request(request) as fixture:
- yield fixture
+ def _assert_state(self, op: str, states: Tuple[str, ...]) -> None:
+ assert (
+ self._state in states
+ ), "cannot {} in state {!r}: expected one of {}".format(
+ op, self._state, ", ".join(states)
+ )
-@pytest.fixture
-def capfdbinary(request):
- """Enable bytes capturing of writes to file descriptors ``1`` and ``2``.
+ def start(self) -> None:
+ """Start capturing on targetfd using memorized tmpfile."""
+ self._assert_state("start", ("initialized",))
+ os.dup2(self.tmpfile.fileno(), self.targetfd)
+ self.syscapture.start()
+ self._state = "started"
- The captured output is made available via ``capfd.readouterr()`` method
- calls, which return a ``(out, err)`` namedtuple.
- ``out`` and ``err`` will be ``byte`` objects.
- """
- if not hasattr(os, "dup"):
- pytest.skip(
- "capfdbinary fixture needs os.dup function which is not available in this system"
- )
- capman = request.config.pluginmanager.getplugin("capturemanager")
- with capman._capturing_for_request(request) as fixture:
- yield fixture
+ def snap(self):
+ self._assert_state("snap", ("started", "suspended"))
+ self.tmpfile.seek(0)
+ res = self.tmpfile.buffer.read()
+ self.tmpfile.seek(0)
+ self.tmpfile.truncate()
+ return res
+ def done(self) -> None:
+ """Stop capturing, restore streams, return original capture file,
+ seeked to position zero."""
+ self._assert_state("done", ("initialized", "started", "suspended", "done"))
+ if self._state == "done":
+ return
+ os.dup2(self.targetfd_save, self.targetfd)
+ os.close(self.targetfd_save)
+ if self.targetfd_invalid is not None:
+ if self.targetfd_invalid != self.targetfd:
+ os.close(self.targetfd)
+ os.close(self.targetfd_invalid)
+ self.syscapture.done()
+ self.tmpfile.close()
+ self._state = "done"
-class CaptureFixture:
- """
- Object returned by :py:func:`capsys`, :py:func:`capsysbinary`, :py:func:`capfd` and :py:func:`capfdbinary`
- fixtures.
+ def suspend(self) -> None:
+ self._assert_state("suspend", ("started", "suspended"))
+ if self._state == "suspended":
+ return
+ self.syscapture.suspend()
+ os.dup2(self.targetfd_save, self.targetfd)
+ self._state = "suspended"
+
+ def resume(self) -> None:
+ self._assert_state("resume", ("started", "suspended"))
+ if self._state == "started":
+ return
+ self.syscapture.resume()
+ os.dup2(self.tmpfile.fileno(), self.targetfd)
+ self._state = "started"
+
+ def writeorg(self, data):
+ """Write to original file descriptor."""
+ self._assert_state("writeorg", ("started", "suspended"))
+ os.write(self.targetfd_save, data)
+
+
+class FDCapture(FDCaptureBinary):
+ """Capture IO to/from a given OS-level file descriptor.
+
+ snap() produces text.
"""
- def __init__(self, captureclass, request):
- self.captureclass = captureclass
- self.request = request
- self._capture = None
- self._captured_out = self.captureclass.EMPTY_BUFFER
- self._captured_err = self.captureclass.EMPTY_BUFFER
+ # Ignore type because it doesn't match the type in the superclass (bytes).
+ EMPTY_BUFFER = "" # type: ignore
- def _start(self):
- if self._capture is None:
- self._capture = MultiCapture(
- out=True, err=True, in_=False, Capture=self.captureclass
- )
- self._capture.start_capturing()
+ def snap(self):
+ self._assert_state("snap", ("started", "suspended"))
+ self.tmpfile.seek(0)
+ res = self.tmpfile.read()
+ self.tmpfile.seek(0)
+ self.tmpfile.truncate()
+ return res
- def close(self):
- if self._capture is not None:
- out, err = self._capture.pop_outerr_to_orig()
- self._captured_out += out
- self._captured_err += err
- self._capture.stop_capturing()
- self._capture = None
+ def writeorg(self, data):
+ """Write to original file descriptor."""
+ super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream
- def readouterr(self):
- """Read and return the captured output so far, resetting the internal buffer.
- :return: captured content as a namedtuple with ``out`` and ``err`` string attributes
- """
- captured_out, captured_err = self._captured_out, self._captured_err
- if self._capture is not None:
- out, err = self._capture.readouterr()
- captured_out += out
- captured_err += err
- self._captured_out = self.captureclass.EMPTY_BUFFER
- self._captured_err = self.captureclass.EMPTY_BUFFER
- return CaptureResult(captured_out, captured_err)
+# MultiCapture
- def _suspend(self):
- """Suspends this fixture's own capturing temporarily."""
- if self._capture is not None:
- self._capture.suspend_capturing()
- def _resume(self):
- """Resumes this fixture's own capturing temporarily."""
- if self._capture is not None:
- self._capture.resume_capturing()
+# This class was a namedtuple, but due to mypy limitation[0] it could not be
+# made generic, so was replaced by a regular class which tries to emulate the
+# pertinent parts of a namedtuple. If the mypy limitation is ever lifted, can
+# make it a namedtuple again.
+# [0]: https://github.com/python/mypy/issues/685
+@final
+@functools.total_ordering
+class CaptureResult(Generic[AnyStr]):
+ """The result of :method:`CaptureFixture.readouterr`."""
- @contextlib.contextmanager
- def disabled(self):
- """Temporarily disables capture while inside the 'with' block."""
- capmanager = self.request.config.pluginmanager.getplugin("capturemanager")
- with capmanager.global_and_fixture_disabled():
- yield
+ __slots__ = ("out", "err")
+ def __init__(self, out: AnyStr, err: AnyStr) -> None:
+ self.out: AnyStr = out
+ self.err: AnyStr = err
-def safe_text_dupfile(f, mode, default_encoding="UTF8"):
- """ return an open text file object that's a duplicate of f on the
- FD-level if possible.
- """
- encoding = getattr(f, "encoding", None)
- try:
- fd = f.fileno()
- except Exception:
- if "b" not in getattr(f, "mode", "") and hasattr(f, "encoding"):
- # we seem to have a text stream, let's just use it
- return f
- else:
- newfd = os.dup(fd)
- if "b" not in mode:
- mode += "b"
- f = os.fdopen(newfd, mode, 0) # no buffering
- return EncodedFile(f, encoding or default_encoding)
-
-
-class EncodedFile:
- errors = "strict" # possibly needed by py3 code (issue555)
-
- def __init__(self, buffer: BinaryIO, encoding: str) -> None:
- self.buffer = buffer
- self.encoding = encoding
+ def __len__(self) -> int:
+ return 2
- def write(self, s: str) -> int:
- if not isinstance(s, str):
- raise TypeError(
- "write() argument must be str, not {}".format(type(s).__name__)
- )
- return self.buffer.write(s.encode(self.encoding, "replace"))
+ def __iter__(self) -> Iterator[AnyStr]:
+ return iter((self.out, self.err))
- def writelines(self, lines: Iterable[str]) -> None:
- self.buffer.writelines(x.encode(self.encoding, "replace") for x in lines)
+ def __getitem__(self, item: int) -> AnyStr:
+ return tuple(self)[item]
- @property
- def name(self) -> str:
- """Ensure that file.name is a string."""
- return repr(self.buffer)
+ def _replace(
+ self, *, out: Optional[AnyStr] = None, err: Optional[AnyStr] = None
+ ) -> "CaptureResult[AnyStr]":
+ return CaptureResult(
+ out=self.out if out is None else out, err=self.err if err is None else err
+ )
- @property
- def mode(self) -> str:
- return self.buffer.mode.replace("b", "")
+ def count(self, value: AnyStr) -> int:
+ return tuple(self).count(value)
+
+ def index(self, value) -> int:
+ return tuple(self).index(value)
+
+ def __eq__(self, other: object) -> bool:
+ if not isinstance(other, (CaptureResult, tuple)):
+ return NotImplemented
+ return tuple(self) == tuple(other)
- def __getattr__(self, name):
- return getattr(object.__getattribute__(self, "buffer"), name)
+ def __hash__(self) -> int:
+ return hash(tuple(self))
+ def __lt__(self, other: object) -> bool:
+ if not isinstance(other, (CaptureResult, tuple)):
+ return NotImplemented
+ return tuple(self) < tuple(other)
-CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"])
+ def __repr__(self) -> str:
+ return f"CaptureResult(out={self.out!r}, err={self.err!r})"
-class MultiCapture:
- out = err = in_ = None
+class MultiCapture(Generic[AnyStr]):
_state = None
_in_suspended = False
- def __init__(self, out=True, err=True, in_=True, Capture=None):
- if in_:
- self.in_ = Capture(0)
- if out:
- self.out = Capture(1)
- if err:
- self.err = Capture(2)
+ def __init__(self, in_, out, err) -> None:
+ self.in_ = in_
+ self.out = out
+ self.err = err
- def __repr__(self):
+ def __repr__(self) -> str:
return "<MultiCapture out={!r} err={!r} in_={!r} _state={!r} _in_suspended={!r}>".format(
self.out, self.err, self.in_, self._state, self._in_suspended,
)
- def start_capturing(self):
+ def start_capturing(self) -> None:
self._state = "started"
if self.in_:
self.in_.start()
@@ -469,8 +568,8 @@ class MultiCapture:
if self.err:
self.err.start()
- def pop_outerr_to_orig(self):
- """ pop current snapshot out/err capture and flush to orig streams. """
+ def pop_outerr_to_orig(self) -> Tuple[AnyStr, AnyStr]:
+ """Pop current snapshot out/err capture and flush to orig streams."""
out, err = self.readouterr()
if out:
self.out.writeorg(out)
@@ -478,7 +577,7 @@ class MultiCapture:
self.err.writeorg(err)
return out, err
- def suspend_capturing(self, in_=False):
+ def suspend_capturing(self, in_: bool = False) -> None:
self._state = "suspended"
if self.out:
self.out.suspend()
@@ -488,8 +587,8 @@ class MultiCapture:
self.in_.suspend()
self._in_suspended = True
- def resume_capturing(self):
- self._state = "resumed"
+ def resume_capturing(self) -> None:
+ self._state = "started"
if self.out:
self.out.resume()
if self.err:
@@ -498,8 +597,8 @@ class MultiCapture:
self.in_.resume()
self._in_suspended = False
- def stop_capturing(self):
- """ stop capturing and reset capturing streams """
+ def stop_capturing(self) -> None:
+ """Stop capturing and reset capturing streams."""
if self._state == "stopped":
raise ValueError("was already stopped")
self._state = "stopped"
@@ -510,7 +609,11 @@ class MultiCapture:
if self.in_:
self.in_.done()
- def readouterr(self) -> CaptureResult:
+ def is_started(self) -> bool:
+ """Whether actively capturing -- not suspended or stopped."""
+ return self._state == "started"
+
+ def readouterr(self) -> CaptureResult[AnyStr]:
if self.out:
out = self.out.snap()
else:
@@ -522,332 +625,343 @@ class MultiCapture:
return CaptureResult(out, err)
-class NoCapture:
- EMPTY_BUFFER = None
- __init__ = start = done = suspend = resume = lambda *args: None
+def _get_multicapture(method: "_CaptureMethod") -> MultiCapture[str]:
+ if method == "fd":
+ return MultiCapture(in_=FDCapture(0), out=FDCapture(1), err=FDCapture(2))
+ elif method == "sys":
+ return MultiCapture(in_=SysCapture(0), out=SysCapture(1), err=SysCapture(2))
+ elif method == "no":
+ return MultiCapture(in_=None, out=None, err=None)
+ elif method == "tee-sys":
+ return MultiCapture(
+ in_=None, out=SysCapture(1, tee=True), err=SysCapture(2, tee=True)
+ )
+ raise ValueError(f"unknown capturing method: {method!r}")
-class FDCaptureBinary:
- """Capture IO to/from a given os-level filedescriptor.
+# CaptureManager and CaptureFixture
- snap() produces `bytes`
- """
- EMPTY_BUFFER = b""
- _state = None
+class CaptureManager:
+ """The capture plugin.
- def __init__(self, targetfd, tmpfile=None):
- self.targetfd = targetfd
- try:
- self.targetfd_save = os.dup(self.targetfd)
- except OSError:
- self.start = lambda: None
- self.done = lambda: None
- else:
- self.start = self._start
- self.done = self._done
- if targetfd == 0:
- assert not tmpfile, "cannot set tmpfile with stdin"
- tmpfile = open(os.devnull, "r")
- self.syscapture = SysCapture(targetfd)
- else:
- if tmpfile is None:
- f = TemporaryFile()
- with f:
- tmpfile = safe_text_dupfile(f, mode="wb+")
- if targetfd in patchsysdict:
- self.syscapture = SysCapture(targetfd, tmpfile)
- else:
- self.syscapture = NoCapture()
- self.tmpfile = tmpfile
- self.tmpfile_fd = tmpfile.fileno()
-
- def __repr__(self):
- return "<{} {} oldfd={} _state={!r} tmpfile={}>".format(
- self.__class__.__name__,
- self.targetfd,
- getattr(self, "targetfd_save", "<UNSET>"),
- self._state,
- hasattr(self, "tmpfile") and repr(self.tmpfile) or "<UNSET>",
- )
+ Manages that the appropriate capture method is enabled/disabled during
+ collection and each test phase (setup, call, teardown). After each of
+ those points, the captured output is obtained and attached to the
+ collection/runtest report.
- def _start(self):
- """ Start capturing on targetfd using memorized tmpfile. """
- try:
- os.fstat(self.targetfd_save)
- except (AttributeError, OSError):
- raise ValueError("saved filedescriptor not valid anymore")
- os.dup2(self.tmpfile_fd, self.targetfd)
- self.syscapture.start()
- self._state = "started"
+ There are two levels of capture:
- def snap(self):
- self.tmpfile.seek(0)
- res = self.tmpfile.read()
- self.tmpfile.seek(0)
- self.tmpfile.truncate()
- return res
+ * global: enabled by default and can be suppressed by the ``-s``
+ option. This is always enabled/disabled during collection and each test
+ phase.
- def _done(self):
- """ stop capturing, restore streams, return original capture file,
- seeked to position zero. """
- targetfd_save = self.__dict__.pop("targetfd_save")
- os.dup2(targetfd_save, self.targetfd)
- os.close(targetfd_save)
- self.syscapture.done()
- self.tmpfile.close()
- self._state = "done"
-
- def suspend(self):
- self.syscapture.suspend()
- os.dup2(self.targetfd_save, self.targetfd)
- self._state = "suspended"
+ * fixture: when a test function or one of its fixture depend on the
+ ``capsys`` or ``capfd`` fixtures. In this case special handling is
+ needed to ensure the fixtures take precedence over the global capture.
+ """
- def resume(self):
- self.syscapture.resume()
- os.dup2(self.tmpfile_fd, self.targetfd)
- self._state = "resumed"
+ def __init__(self, method: "_CaptureMethod") -> None:
+ self._method = method
+ self._global_capturing: Optional[MultiCapture[str]] = None
+ self._capture_fixture: Optional[CaptureFixture[Any]] = None
- def writeorg(self, data):
- """ write to original file descriptor. """
- os.write(self.targetfd_save, data)
+ def __repr__(self) -> str:
+ return "<CaptureManager _method={!r} _global_capturing={!r} _capture_fixture={!r}>".format(
+ self._method, self._global_capturing, self._capture_fixture
+ )
+ def is_capturing(self) -> Union[str, bool]:
+ if self.is_globally_capturing():
+ return "global"
+ if self._capture_fixture:
+ return "fixture %s" % self._capture_fixture.request.fixturename
+ return False
-class FDCapture(FDCaptureBinary):
- """Capture IO to/from a given os-level filedescriptor.
+ # Global capturing control
- snap() produces text
- """
+ def is_globally_capturing(self) -> bool:
+ return self._method != "no"
- # Ignore type because it doesn't match the type in the superclass (bytes).
- EMPTY_BUFFER = str() # type: ignore
+ def start_global_capturing(self) -> None:
+ assert self._global_capturing is None
+ self._global_capturing = _get_multicapture(self._method)
+ self._global_capturing.start_capturing()
- def snap(self):
- res = super().snap()
- enc = getattr(self.tmpfile, "encoding", None)
- if enc and isinstance(res, bytes):
- res = str(res, enc, "replace")
- return res
+ def stop_global_capturing(self) -> None:
+ if self._global_capturing is not None:
+ self._global_capturing.pop_outerr_to_orig()
+ self._global_capturing.stop_capturing()
+ self._global_capturing = None
- def writeorg(self, data):
- """ write to original file descriptor. """
- data = data.encode("utf-8") # XXX use encoding of original stream
- os.write(self.targetfd_save, data)
+ def resume_global_capture(self) -> None:
+ # During teardown of the python process, and on rare occasions, capture
+ # attributes can be `None` while trying to resume global capture.
+ if self._global_capturing is not None:
+ self._global_capturing.resume_capturing()
+ def suspend_global_capture(self, in_: bool = False) -> None:
+ if self._global_capturing is not None:
+ self._global_capturing.suspend_capturing(in_=in_)
-class SysCaptureBinary:
+ def suspend(self, in_: bool = False) -> None:
+ # Need to undo local capsys-et-al if it exists before disabling global capture.
+ self.suspend_fixture()
+ self.suspend_global_capture(in_)
- EMPTY_BUFFER = b""
- _state = None
+ def resume(self) -> None:
+ self.resume_global_capture()
+ self.resume_fixture()
- def __init__(self, fd, tmpfile=None):
- name = patchsysdict[fd]
- self._old = getattr(sys, name)
- self.name = name
- if tmpfile is None:
- if name == "stdin":
- tmpfile = DontReadFromInput()
- else:
- tmpfile = CaptureIO()
- self.tmpfile = tmpfile
+ def read_global_capture(self) -> CaptureResult[str]:
+ assert self._global_capturing is not None
+ return self._global_capturing.readouterr()
- def __repr__(self):
- return "<{} {} _old={} _state={!r} tmpfile={!r}>".format(
- self.__class__.__name__,
- self.name,
- hasattr(self, "_old") and repr(self._old) or "<UNSET>",
- self._state,
- self.tmpfile,
- )
+ # Fixture Control
- def start(self):
- setattr(sys, self.name, self.tmpfile)
- self._state = "started"
+ def set_fixture(self, capture_fixture: "CaptureFixture[Any]") -> None:
+ if self._capture_fixture:
+ current_fixture = self._capture_fixture.request.fixturename
+ requested_fixture = capture_fixture.request.fixturename
+ capture_fixture.request.raiseerror(
+ "cannot use {} and {} at the same time".format(
+ requested_fixture, current_fixture
+ )
+ )
+ self._capture_fixture = capture_fixture
- def snap(self):
- res = self.tmpfile.buffer.getvalue()
- self.tmpfile.seek(0)
- self.tmpfile.truncate()
- return res
+ def unset_fixture(self) -> None:
+ self._capture_fixture = None
- def done(self):
- setattr(sys, self.name, self._old)
- del self._old
- self.tmpfile.close()
- self._state = "done"
+ def activate_fixture(self) -> None:
+ """If the current item is using ``capsys`` or ``capfd``, activate
+ them so they take precedence over the global capture."""
+ if self._capture_fixture:
+ self._capture_fixture._start()
- def suspend(self):
- setattr(sys, self.name, self._old)
- self._state = "suspended"
+ def deactivate_fixture(self) -> None:
+ """Deactivate the ``capsys`` or ``capfd`` fixture of this item, if any."""
+ if self._capture_fixture:
+ self._capture_fixture.close()
- def resume(self):
- setattr(sys, self.name, self.tmpfile)
- self._state = "resumed"
+ def suspend_fixture(self) -> None:
+ if self._capture_fixture:
+ self._capture_fixture._suspend()
- def writeorg(self, data):
- self._old.flush()
- self._old.buffer.write(data)
- self._old.buffer.flush()
+ def resume_fixture(self) -> None:
+ if self._capture_fixture:
+ self._capture_fixture._resume()
+ # Helper context managers
-class SysCapture(SysCaptureBinary):
- EMPTY_BUFFER = str() # type: ignore[assignment] # noqa: F821
+ @contextlib.contextmanager
+ def global_and_fixture_disabled(self) -> Generator[None, None, None]:
+ """Context manager to temporarily disable global and current fixture capturing."""
+ do_fixture = self._capture_fixture and self._capture_fixture._is_started()
+ if do_fixture:
+ self.suspend_fixture()
+ do_global = self._global_capturing and self._global_capturing.is_started()
+ if do_global:
+ self.suspend_global_capture()
+ try:
+ yield
+ finally:
+ if do_global:
+ self.resume_global_capture()
+ if do_fixture:
+ self.resume_fixture()
- def snap(self):
- res = self.tmpfile.getvalue()
- self.tmpfile.seek(0)
- self.tmpfile.truncate()
- return res
+ @contextlib.contextmanager
+ def item_capture(self, when: str, item: Item) -> Generator[None, None, None]:
+ self.resume_global_capture()
+ self.activate_fixture()
+ try:
+ yield
+ finally:
+ self.deactivate_fixture()
+ self.suspend_global_capture(in_=False)
- def writeorg(self, data):
- self._old.write(data)
- self._old.flush()
+ out, err = self.read_global_capture()
+ item.add_report_section(when, "stdout", out)
+ item.add_report_section(when, "stderr", err)
+ # Hooks
-class TeeSysCapture(SysCapture):
- def __init__(self, fd, tmpfile=None):
- name = patchsysdict[fd]
- self._old = getattr(sys, name)
- self.name = name
- if tmpfile is None:
- if name == "stdin":
- tmpfile = DontReadFromInput()
- else:
- tmpfile = CaptureAndPassthroughIO(self._old)
- self.tmpfile = tmpfile
+ @hookimpl(hookwrapper=True)
+ def pytest_make_collect_report(self, collector: Collector):
+ if isinstance(collector, File):
+ self.resume_global_capture()
+ outcome = yield
+ self.suspend_global_capture()
+ out, err = self.read_global_capture()
+ rep = outcome.get_result()
+ if out:
+ rep.sections.append(("Captured stdout", out))
+ if err:
+ rep.sections.append(("Captured stderr", err))
+ else:
+ yield
+ @hookimpl(hookwrapper=True)
+ def pytest_runtest_setup(self, item: Item) -> Generator[None, None, None]:
+ with self.item_capture("setup", item):
+ yield
-map_fixname_class = {
- "capfd": FDCapture,
- "capfdbinary": FDCaptureBinary,
- "capsys": SysCapture,
- "capsysbinary": SysCaptureBinary,
-}
+ @hookimpl(hookwrapper=True)
+ def pytest_runtest_call(self, item: Item) -> Generator[None, None, None]:
+ with self.item_capture("call", item):
+ yield
+ @hookimpl(hookwrapper=True)
+ def pytest_runtest_teardown(self, item: Item) -> Generator[None, None, None]:
+ with self.item_capture("teardown", item):
+ yield
-class DontReadFromInput:
- encoding = None
+ @hookimpl(tryfirst=True)
+ def pytest_keyboard_interrupt(self) -> None:
+ self.stop_global_capturing()
- def read(self, *args):
- raise IOError(
- "pytest: reading from stdin while output is captured! Consider using `-s`."
- )
+ @hookimpl(tryfirst=True)
+ def pytest_internalerror(self) -> None:
+ self.stop_global_capturing()
- readline = read
- readlines = read
- __next__ = read
- def __iter__(self):
- return self
+class CaptureFixture(Generic[AnyStr]):
+ """Object returned by the :fixture:`capsys`, :fixture:`capsysbinary`,
+ :fixture:`capfd` and :fixture:`capfdbinary` fixtures."""
- def fileno(self):
- raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()")
+ def __init__(
+ self, captureclass, request: SubRequest, *, _ispytest: bool = False
+ ) -> None:
+ check_ispytest(_ispytest)
+ self.captureclass = captureclass
+ self.request = request
+ self._capture: Optional[MultiCapture[AnyStr]] = None
+ self._captured_out = self.captureclass.EMPTY_BUFFER
+ self._captured_err = self.captureclass.EMPTY_BUFFER
- def isatty(self):
- return False
+ def _start(self) -> None:
+ if self._capture is None:
+ self._capture = MultiCapture(
+ in_=None, out=self.captureclass(1), err=self.captureclass(2),
+ )
+ self._capture.start_capturing()
- def close(self):
- pass
+ def close(self) -> None:
+ if self._capture is not None:
+ out, err = self._capture.pop_outerr_to_orig()
+ self._captured_out += out
+ self._captured_err += err
+ self._capture.stop_capturing()
+ self._capture = None
- @property
- def buffer(self):
- return self
+ def readouterr(self) -> CaptureResult[AnyStr]:
+ """Read and return the captured output so far, resetting the internal
+ buffer.
+ :returns:
+ The captured content as a namedtuple with ``out`` and ``err``
+ string attributes.
+ """
+ captured_out, captured_err = self._captured_out, self._captured_err
+ if self._capture is not None:
+ out, err = self._capture.readouterr()
+ captured_out += out
+ captured_err += err
+ self._captured_out = self.captureclass.EMPTY_BUFFER
+ self._captured_err = self.captureclass.EMPTY_BUFFER
+ return CaptureResult(captured_out, captured_err)
-def _colorama_workaround():
- """
- Ensure colorama is imported so that it attaches to the correct stdio
- handles on Windows.
+ def _suspend(self) -> None:
+ """Suspend this fixture's own capturing temporarily."""
+ if self._capture is not None:
+ self._capture.suspend_capturing()
- colorama uses the terminal on import time. So if something does the
- first import of colorama while I/O capture is active, colorama will
- fail in various ways.
- """
- if sys.platform.startswith("win32"):
- try:
- import colorama # noqa: F401
- except ImportError:
- pass
+ def _resume(self) -> None:
+ """Resume this fixture's own capturing temporarily."""
+ if self._capture is not None:
+ self._capture.resume_capturing()
+ def _is_started(self) -> bool:
+ """Whether actively capturing -- not disabled or closed."""
+ if self._capture is not None:
+ return self._capture.is_started()
+ return False
-def _readline_workaround():
- """
- Ensure readline is imported so that it attaches to the correct stdio
- handles on Windows.
+ @contextlib.contextmanager
+ def disabled(self) -> Generator[None, None, None]:
+ """Temporarily disable capturing while inside the ``with`` block."""
+ capmanager = self.request.config.pluginmanager.getplugin("capturemanager")
+ with capmanager.global_and_fixture_disabled():
+ yield
- Pdb uses readline support where available--when not running from the Python
- prompt, the readline module is not imported until running the pdb REPL. If
- running pytest with the --pdb option this means the readline module is not
- imported until after I/O capture has been started.
- This is a problem for pyreadline, which is often used to implement readline
- support on Windows, as it does not attach to the correct handles for stdout
- and/or stdin if they have been redirected by the FDCapture mechanism. This
- workaround ensures that readline is imported before I/O capture is setup so
- that it can attach to the actual stdin/out for the console.
+# The fixtures.
- See https://github.com/pytest-dev/pytest/pull/1281
- """
- if sys.platform.startswith("win32"):
- try:
- import readline # noqa: F401
- except ImportError:
- pass
+@fixture
+def capsys(request: SubRequest) -> Generator[CaptureFixture[str], None, None]:
+ """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``.
-def _py36_windowsconsoleio_workaround(stream):
+ The captured output is made available via ``capsys.readouterr()`` method
+ calls, which return a ``(out, err)`` namedtuple.
+ ``out`` and ``err`` will be ``text`` objects.
"""
- Python 3.6 implemented unicode console handling for Windows. This works
- by reading/writing to the raw console handle using
- ``{Read,Write}ConsoleW``.
-
- The problem is that we are going to ``dup2`` over the stdio file
- descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the
- handles used by Python to write to the console. Though there is still some
- weirdness and the console handle seems to only be closed randomly and not
- on the first call to ``CloseHandle``, or maybe it gets reopened with the
- same handle value when we suspend capturing.
+ capman = request.config.pluginmanager.getplugin("capturemanager")
+ capture_fixture = CaptureFixture[str](SysCapture, request, _ispytest=True)
+ capman.set_fixture(capture_fixture)
+ capture_fixture._start()
+ yield capture_fixture
+ capture_fixture.close()
+ capman.unset_fixture()
- The workaround in this case will reopen stdio with a different fd which
- also means a different handle by replicating the logic in
- "Py_lifecycle.c:initstdio/create_stdio".
- :param stream: in practice ``sys.stdout`` or ``sys.stderr``, but given
- here as parameter for unittesting purposes.
+@fixture
+def capsysbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, None]:
+ """Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``.
- See https://github.com/pytest-dev/py/issues/103
+ The captured output is made available via ``capsysbinary.readouterr()``
+ method calls, which return a ``(out, err)`` namedtuple.
+ ``out`` and ``err`` will be ``bytes`` objects.
"""
- if (
- not sys.platform.startswith("win32")
- or sys.version_info[:2] < (3, 6)
- or hasattr(sys, "pypy_version_info")
- ):
- return
+ capman = request.config.pluginmanager.getplugin("capturemanager")
+ capture_fixture = CaptureFixture[bytes](SysCaptureBinary, request, _ispytest=True)
+ capman.set_fixture(capture_fixture)
+ capture_fixture._start()
+ yield capture_fixture
+ capture_fixture.close()
+ capman.unset_fixture()
- # bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666)
- if not hasattr(stream, "buffer"):
- return
- buffered = hasattr(stream.buffer, "raw")
- raw_stdout = stream.buffer.raw if buffered else stream.buffer
+@fixture
+def capfd(request: SubRequest) -> Generator[CaptureFixture[str], None, None]:
+ """Enable text capturing of writes to file descriptors ``1`` and ``2``.
- if not isinstance(raw_stdout, io._WindowsConsoleIO):
- return
+ The captured output is made available via ``capfd.readouterr()`` method
+ calls, which return a ``(out, err)`` namedtuple.
+ ``out`` and ``err`` will be ``text`` objects.
+ """
+ capman = request.config.pluginmanager.getplugin("capturemanager")
+ capture_fixture = CaptureFixture[str](FDCapture, request, _ispytest=True)
+ capman.set_fixture(capture_fixture)
+ capture_fixture._start()
+ yield capture_fixture
+ capture_fixture.close()
+ capman.unset_fixture()
- def _reopen_stdio(f, mode):
- if not buffered and mode[0] == "w":
- buffering = 0
- else:
- buffering = -1
- return io.TextIOWrapper(
- open(os.dup(f.fileno()), mode, buffering),
- f.encoding,
- f.errors,
- f.newlines,
- f.line_buffering,
- )
+@fixture
+def capfdbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, None]:
+ """Enable bytes capturing of writes to file descriptors ``1`` and ``2``.
- sys.stdin = _reopen_stdio(sys.stdin, "rb")
- sys.stdout = _reopen_stdio(sys.stdout, "wb")
- sys.stderr = _reopen_stdio(sys.stderr, "wb")
+ The captured output is made available via ``capfd.readouterr()`` method
+ calls, which return a ``(out, err)`` namedtuple.
+ ``out`` and ``err`` will be ``byte`` objects.
+ """
+ capman = request.config.pluginmanager.getplugin("capturemanager")
+ capture_fixture = CaptureFixture[bytes](FDCaptureBinary, request, _ispytest=True)
+ capman.set_fixture(capture_fixture)
+ capture_fixture._start()
+ yield capture_fixture
+ capture_fixture.close()
+ capman.unset_fixture()