aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest/py3/_pytest/capture.py
diff options
context:
space:
mode:
authordeshevoy <deshevoy@yandex-team.ru>2022-02-10 16:46:57 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:57 +0300
commit28148f76dbfcc644d96427d41c92f36cbf2fdc6e (patch)
treeb83306b6e37edeea782e9eed673d89286c4fef35 /contrib/python/pytest/py3/_pytest/capture.py
parente988f30484abe5fdeedcc7a5d3c226c01a21800c (diff)
downloadydb-28148f76dbfcc644d96427d41c92f36cbf2fdc6e.tar.gz
Restoring authorship annotation for <deshevoy@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/python/pytest/py3/_pytest/capture.py')
-rw-r--r--contrib/python/pytest/py3/_pytest/capture.py402
1 files changed, 201 insertions, 201 deletions
diff --git a/contrib/python/pytest/py3/_pytest/capture.py b/contrib/python/pytest/py3/_pytest/capture.py
index a94bebe8b1..086302658c 100644
--- a/contrib/python/pytest/py3/_pytest/capture.py
+++ b/contrib/python/pytest/py3/_pytest/capture.py
@@ -1,11 +1,11 @@
"""Per-test stdout/stderr capturing mechanism."""
-import contextlib
+import contextlib
import functools
-import io
-import os
-import sys
-from io import UnsupportedOperation
-from tempfile import TemporaryFile
+import io
+import os
+import sys
+from io import UnsupportedOperation
+from tempfile import TemporaryFile
from typing import Any
from typing import AnyStr
from typing import Generator
@@ -16,7 +16,7 @@ from typing import TextIO
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union
-
+
from _pytest.compat import final
from _pytest.config import Config
from _pytest.config import hookimpl
@@ -27,32 +27,32 @@ from _pytest.fixtures import SubRequest
from _pytest.nodes import Collector
from _pytest.nodes import File
from _pytest.nodes import Item
-
+
if TYPE_CHECKING:
from typing_extensions import Literal
_CaptureMethod = Literal["fd", "sys", "no", "tee-sys"]
-
+
def pytest_addoption(parser: Parser) -> None:
- group = parser.getgroup("general")
- group._addoption(
- "--capture",
- action="store",
+ group = parser.getgroup("general")
+ group._addoption(
+ "--capture",
+ action="store",
default="fd",
- metavar="method",
+ metavar="method",
choices=["fd", "sys", "no", "tee-sys"],
help="per-test capturing method: one of fd|sys|no|tee-sys.",
- )
- group._addoption(
- "-s",
- action="store_const",
- const="no",
- dest="capture",
- help="shortcut for --capture=no.",
- )
-
-
+ )
+ group._addoption(
+ "-s",
+ action="store_const",
+ const="no",
+ dest="capture",
+ help="shortcut for --capture=no.",
+ )
+
+
def _colorama_workaround() -> None:
"""Ensure colorama is imported so that it attaches to the correct stdio
handles on Windows.
@@ -150,46 +150,46 @@ def _py36_windowsconsoleio_workaround(stream: TextIO) -> None:
@hookimpl(hookwrapper=True)
def pytest_load_initial_conftests(early_config: Config):
- ns = early_config.known_args_namespace
- if ns.capture == "fd":
- _py36_windowsconsoleio_workaround(sys.stdout)
- _colorama_workaround()
- _readline_workaround()
- pluginmanager = early_config.pluginmanager
- capman = CaptureManager(ns.capture)
- pluginmanager.register(capman, "capturemanager")
-
+ ns = early_config.known_args_namespace
+ if ns.capture == "fd":
+ _py36_windowsconsoleio_workaround(sys.stdout)
+ _colorama_workaround()
+ _readline_workaround()
+ pluginmanager = early_config.pluginmanager
+ capman = CaptureManager(ns.capture)
+ pluginmanager.register(capman, "capturemanager")
+
# Make sure that capturemanager is properly reset at final shutdown.
- early_config.add_cleanup(capman.stop_global_capturing)
-
+ early_config.add_cleanup(capman.stop_global_capturing)
+
# Finally trigger conftest loading but while capturing (issue #93).
- capman.start_global_capturing()
- outcome = yield
- capman.suspend_global_capture()
- if outcome.excinfo is not None:
- out, err = capman.read_global_capture()
- sys.stdout.write(out)
- sys.stderr.write(err)
-
-
+ capman.start_global_capturing()
+ outcome = yield
+ capman.suspend_global_capture()
+ if outcome.excinfo is not None:
+ out, err = capman.read_global_capture()
+ sys.stdout.write(out)
+ sys.stderr.write(err)
+
+
# IO Helpers.
class EncodedFile(io.TextIOWrapper):
__slots__ = ()
-
+
@property
def name(self) -> str:
# Ensure that file.name is a string. Workaround for a Python bug
# fixed in >=3.7.4: https://bugs.python.org/issue36015
return repr(self.buffer)
-
+
@property
def mode(self) -> str:
# TextIOWrapper doesn't expose a mode, but at least some of our
# tests check it.
return self.buffer.mode.replace("b", "")
-
+
class CaptureIO(io.TextIOWrapper):
def __init__(self) -> None:
@@ -224,36 +224,36 @@ class DontReadFromInput:
def __iter__(self):
return self
-
+
def fileno(self) -> int:
raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()")
-
+
def isatty(self) -> bool:
return False
-
+
def close(self) -> None:
pass
-
+
@property
def buffer(self):
return self
-
-
+
+
# Capture classes.
patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"}
-
-
+
+
class NoCapture:
EMPTY_BUFFER = None
__init__ = start = done = suspend = resume = lambda *args: None
class SysCaptureBinary:
-
+
EMPTY_BUFFER = b""
-
+
def __init__(self, fd: int, tmpfile=None, *, tee: bool = False) -> None:
name = patchsysdict[fd]
self._old = getattr(sys, name)
@@ -265,7 +265,7 @@ class SysCaptureBinary:
tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old)
self.tmpfile = tmpfile
self._state = "initialized"
-
+
def repr(self, class_name: str) -> str:
return "<{} {} _old={} _state={!r} tmpfile={!r}>".format(
class_name,
@@ -274,7 +274,7 @@ class SysCaptureBinary:
self._state,
self.tmpfile,
)
-
+
def __repr__(self) -> str:
return "<{} {} _old={} _state={!r} tmpfile={!r}>".format(
self.__class__.__name__,
@@ -283,19 +283,19 @@ class SysCaptureBinary:
self._state,
self.tmpfile,
)
-
+
def _assert_state(self, op: str, states: Tuple[str, ...]) -> None:
assert (
self._state in states
), "cannot {} in state {!r}: expected one of {}".format(
op, self._state, ", ".join(states)
)
-
+
def start(self) -> None:
self._assert_state("start", ("initialized",))
setattr(sys, self.name, self.tmpfile)
self._state = "started"
-
+
def snap(self):
self._assert_state("snap", ("started", "suspended"))
self.tmpfile.seek(0)
@@ -303,7 +303,7 @@ class SysCaptureBinary:
self.tmpfile.seek(0)
self.tmpfile.truncate()
return res
-
+
def done(self) -> None:
self._assert_state("done", ("initialized", "started", "suspended", "done"))
if self._state == "done":
@@ -312,49 +312,49 @@ class SysCaptureBinary:
del self._old
self.tmpfile.close()
self._state = "done"
-
+
def suspend(self) -> None:
self._assert_state("suspend", ("started", "suspended"))
setattr(sys, self.name, self._old)
self._state = "suspended"
-
+
def resume(self) -> None:
self._assert_state("resume", ("started", "suspended"))
if self._state == "started":
return
setattr(sys, self.name, self.tmpfile)
self._state = "started"
-
+
def writeorg(self, data) -> None:
self._assert_state("writeorg", ("started", "suspended"))
self._old.flush()
self._old.buffer.write(data)
self._old.buffer.flush()
-
-
+
+
class SysCapture(SysCaptureBinary):
EMPTY_BUFFER = "" # type: ignore[assignment]
-
+
def snap(self):
res = self.tmpfile.getvalue()
self.tmpfile.seek(0)
self.tmpfile.truncate()
return res
-
+
def writeorg(self, data):
self._assert_state("writeorg", ("started", "suspended"))
self._old.write(data)
self._old.flush()
-
+
class FDCaptureBinary:
"""Capture IO to/from a given OS-level file descriptor.
snap() produces `bytes`.
- """
-
+ """
+
EMPTY_BUFFER = b""
-
+
def __init__(self, targetfd: int) -> None:
self.targetfd = targetfd
@@ -377,7 +377,7 @@ class FDCaptureBinary:
else:
self.targetfd_invalid = None
self.targetfd_save = os.dup(targetfd)
-
+
if targetfd == 0:
self.tmpfile = open(os.devnull)
self.syscapture = SysCapture(targetfd)
@@ -393,7 +393,7 @@ class FDCaptureBinary:
self.syscapture = SysCapture(targetfd, self.tmpfile)
else:
self.syscapture = NoCapture()
-
+
self._state = "initialized"
def __repr__(self) -> str:
@@ -403,15 +403,15 @@ class FDCaptureBinary:
self.targetfd_save,
self._state,
self.tmpfile,
- )
-
+ )
+
def _assert_state(self, op: str, states: Tuple[str, ...]) -> None:
assert (
self._state in states
), "cannot {} in state {!r}: expected one of {}".format(
op, self._state, ", ".join(states)
)
-
+
def start(self) -> None:
"""Start capturing on targetfd using memorized tmpfile."""
self._assert_state("start", ("initialized",))
@@ -426,7 +426,7 @@ class FDCaptureBinary:
self.tmpfile.seek(0)
self.tmpfile.truncate()
return res
-
+
def done(self) -> None:
"""Stop capturing, restore streams, return original capture file,
seeked to position zero."""
@@ -442,7 +442,7 @@ class FDCaptureBinary:
self.syscapture.done()
self.tmpfile.close()
self._state = "done"
-
+
def suspend(self) -> None:
self._assert_state("suspend", ("started", "suspended"))
if self._state == "suspended":
@@ -469,11 +469,11 @@ class FDCapture(FDCaptureBinary):
"""Capture IO to/from a given OS-level file descriptor.
snap() produces text.
- """
-
+ """
+
# Ignore type because it doesn't match the type in the superclass (bytes).
EMPTY_BUFFER = "" # type: ignore
-
+
def snap(self):
self._assert_state("snap", ("started", "suspended"))
self.tmpfile.seek(0)
@@ -481,15 +481,15 @@ class FDCapture(FDCaptureBinary):
self.tmpfile.seek(0)
self.tmpfile.truncate()
return res
-
+
def writeorg(self, data):
"""Write to original file descriptor."""
super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream
-
-
+
+
# MultiCapture
-
-
+
+
# This class was a namedtuple, but due to mypy limitation[0] it could not be
# made generic, so was replaced by a regular class which tries to emulate the
# pertinent parts of a namedtuple. If the mypy limitation is ever lifted, can
@@ -499,40 +499,40 @@ class FDCapture(FDCaptureBinary):
@functools.total_ordering
class CaptureResult(Generic[AnyStr]):
"""The result of :method:`CaptureFixture.readouterr`."""
-
+
__slots__ = ("out", "err")
-
+
def __init__(self, out: AnyStr, err: AnyStr) -> None:
self.out: AnyStr = out
self.err: AnyStr = err
-
+
def __len__(self) -> int:
return 2
-
+
def __iter__(self) -> Iterator[AnyStr]:
return iter((self.out, self.err))
-
+
def __getitem__(self, item: int) -> AnyStr:
return tuple(self)[item]
-
+
def _replace(
self, *, out: Optional[AnyStr] = None, err: Optional[AnyStr] = None
) -> "CaptureResult[AnyStr]":
return CaptureResult(
out=self.out if out is None else out, err=self.err if err is None else err
)
-
+
def count(self, value: AnyStr) -> int:
return tuple(self).count(value)
-
+
def index(self, value) -> int:
return tuple(self).index(value)
-
+
def __eq__(self, other: object) -> bool:
if not isinstance(other, (CaptureResult, tuple)):
return NotImplemented
return tuple(self) == tuple(other)
-
+
def __hash__(self) -> int:
return hash(tuple(self))
@@ -540,20 +540,20 @@ class CaptureResult(Generic[AnyStr]):
if not isinstance(other, (CaptureResult, tuple)):
return NotImplemented
return tuple(self) < tuple(other)
-
+
def __repr__(self) -> str:
return f"CaptureResult(out={self.out!r}, err={self.err!r})"
-
-
+
+
class MultiCapture(Generic[AnyStr]):
_state = None
_in_suspended = False
-
+
def __init__(self, in_, out, err) -> None:
self.in_ = in_
self.out = out
self.err = err
-
+
def __repr__(self) -> str:
return "<MultiCapture out={!r} err={!r} in_={!r} _state={!r} _in_suspended={!r}>".format(
self.out, self.err, self.in_, self._state, self._in_suspended,
@@ -561,54 +561,54 @@ class MultiCapture(Generic[AnyStr]):
def start_capturing(self) -> None:
self._state = "started"
- if self.in_:
- self.in_.start()
- if self.out:
- self.out.start()
- if self.err:
- self.err.start()
-
+ if self.in_:
+ self.in_.start()
+ if self.out:
+ self.out.start()
+ if self.err:
+ self.err.start()
+
def pop_outerr_to_orig(self) -> Tuple[AnyStr, AnyStr]:
"""Pop current snapshot out/err capture and flush to orig streams."""
- out, err = self.readouterr()
- if out:
- self.out.writeorg(out)
- if err:
- self.err.writeorg(err)
- return out, err
-
+ out, err = self.readouterr()
+ if out:
+ self.out.writeorg(out)
+ if err:
+ self.err.writeorg(err)
+ return out, err
+
def suspend_capturing(self, in_: bool = False) -> None:
self._state = "suspended"
- if self.out:
- self.out.suspend()
- if self.err:
- self.err.suspend()
- if in_ and self.in_:
- self.in_.suspend()
- self._in_suspended = True
-
+ if self.out:
+ self.out.suspend()
+ if self.err:
+ self.err.suspend()
+ if in_ and self.in_:
+ self.in_.suspend()
+ self._in_suspended = True
+
def resume_capturing(self) -> None:
self._state = "started"
- if self.out:
- self.out.resume()
- if self.err:
- self.err.resume()
+ if self.out:
+ self.out.resume()
+ if self.err:
+ self.err.resume()
if self._in_suspended:
- self.in_.resume()
+ self.in_.resume()
self._in_suspended = False
-
+
def stop_capturing(self) -> None:
"""Stop capturing and reset capturing streams."""
if self._state == "stopped":
- raise ValueError("was already stopped")
+ raise ValueError("was already stopped")
self._state = "stopped"
- if self.out:
- self.out.done()
- if self.err:
- self.err.done()
- if self.in_:
- self.in_.done()
-
+ if self.out:
+ self.out.done()
+ if self.err:
+ self.err.done()
+ if self.in_:
+ self.in_.done()
+
def is_started(self) -> bool:
"""Whether actively capturing -- not suspended or stopped."""
return self._state == "started"
@@ -623,8 +623,8 @@ class MultiCapture(Generic[AnyStr]):
else:
err = ""
return CaptureResult(out, err)
-
-
+
+
def _get_multicapture(method: "_CaptureMethod") -> MultiCapture[str]:
if method == "fd":
return MultiCapture(in_=FDCapture(0), out=FDCapture(1), err=FDCapture(2))
@@ -637,88 +637,88 @@ def _get_multicapture(method: "_CaptureMethod") -> MultiCapture[str]:
in_=None, out=SysCapture(1, tee=True), err=SysCapture(2, tee=True)
)
raise ValueError(f"unknown capturing method: {method!r}")
-
-
+
+
# CaptureManager and CaptureFixture
-
-
+
+
class CaptureManager:
"""The capture plugin.
-
+
Manages that the appropriate capture method is enabled/disabled during
collection and each test phase (setup, call, teardown). After each of
those points, the captured output is obtained and attached to the
collection/runtest report.
-
+
There are two levels of capture:
-
+
* global: enabled by default and can be suppressed by the ``-s``
option. This is always enabled/disabled during collection and each test
phase.
-
+
* fixture: when a test function or one of its fixture depend on the
``capsys`` or ``capfd`` fixtures. In this case special handling is
needed to ensure the fixtures take precedence over the global capture.
"""
-
+
def __init__(self, method: "_CaptureMethod") -> None:
self._method = method
self._global_capturing: Optional[MultiCapture[str]] = None
self._capture_fixture: Optional[CaptureFixture[Any]] = None
-
+
def __repr__(self) -> str:
return "<CaptureManager _method={!r} _global_capturing={!r} _capture_fixture={!r}>".format(
self._method, self._global_capturing, self._capture_fixture
)
-
+
def is_capturing(self) -> Union[str, bool]:
if self.is_globally_capturing():
return "global"
if self._capture_fixture:
return "fixture %s" % self._capture_fixture.request.fixturename
return False
-
+
# Global capturing control
-
+
def is_globally_capturing(self) -> bool:
return self._method != "no"
-
+
def start_global_capturing(self) -> None:
assert self._global_capturing is None
self._global_capturing = _get_multicapture(self._method)
self._global_capturing.start_capturing()
-
+
def stop_global_capturing(self) -> None:
if self._global_capturing is not None:
self._global_capturing.pop_outerr_to_orig()
self._global_capturing.stop_capturing()
self._global_capturing = None
-
+
def resume_global_capture(self) -> None:
# During teardown of the python process, and on rare occasions, capture
# attributes can be `None` while trying to resume global capture.
if self._global_capturing is not None:
self._global_capturing.resume_capturing()
-
+
def suspend_global_capture(self, in_: bool = False) -> None:
if self._global_capturing is not None:
self._global_capturing.suspend_capturing(in_=in_)
-
+
def suspend(self, in_: bool = False) -> None:
# Need to undo local capsys-et-al if it exists before disabling global capture.
self.suspend_fixture()
self.suspend_global_capture(in_)
-
+
def resume(self) -> None:
self.resume_global_capture()
self.resume_fixture()
-
+
def read_global_capture(self) -> CaptureResult[str]:
assert self._global_capturing is not None
return self._global_capturing.readouterr()
# Fixture Control
-
+
def set_fixture(self, capture_fixture: "CaptureFixture[Any]") -> None:
if self._capture_fixture:
current_fixture = self._capture_fixture.request.fixturename
@@ -729,7 +729,7 @@ class CaptureManager:
)
)
self._capture_fixture = capture_fixture
-
+
def unset_fixture(self) -> None:
self._capture_fixture = None
@@ -738,22 +738,22 @@ class CaptureManager:
them so they take precedence over the global capture."""
if self._capture_fixture:
self._capture_fixture._start()
-
+
def deactivate_fixture(self) -> None:
"""Deactivate the ``capsys`` or ``capfd`` fixture of this item, if any."""
if self._capture_fixture:
self._capture_fixture.close()
-
+
def suspend_fixture(self) -> None:
if self._capture_fixture:
self._capture_fixture._suspend()
-
+
def resume_fixture(self) -> None:
if self._capture_fixture:
self._capture_fixture._resume()
-
+
# Helper context managers
-
+
@contextlib.contextmanager
def global_and_fixture_disabled(self) -> Generator[None, None, None]:
"""Context manager to temporarily disable global and current fixture capturing."""
@@ -770,7 +770,7 @@ class CaptureManager:
self.resume_global_capture()
if do_fixture:
self.resume_fixture()
-
+
@contextlib.contextmanager
def item_capture(self, when: str, item: Item) -> Generator[None, None, None]:
self.resume_global_capture()
@@ -780,13 +780,13 @@ class CaptureManager:
finally:
self.deactivate_fixture()
self.suspend_global_capture(in_=False)
-
+
out, err = self.read_global_capture()
item.add_report_section(when, "stdout", out)
item.add_report_section(when, "stderr", err)
-
+
# Hooks
-
+
@hookimpl(hookwrapper=True)
def pytest_make_collect_report(self, collector: Collector):
if isinstance(collector, File):
@@ -801,12 +801,12 @@ class CaptureManager:
rep.sections.append(("Captured stderr", err))
else:
yield
-
+
@hookimpl(hookwrapper=True)
def pytest_runtest_setup(self, item: Item) -> Generator[None, None, None]:
with self.item_capture("setup", item):
yield
-
+
@hookimpl(hookwrapper=True)
def pytest_runtest_call(self, item: Item) -> Generator[None, None, None]:
with self.item_capture("call", item):
@@ -825,11 +825,11 @@ class CaptureManager:
def pytest_internalerror(self) -> None:
self.stop_global_capturing()
-
+
class CaptureFixture(Generic[AnyStr]):
"""Object returned by the :fixture:`capsys`, :fixture:`capsysbinary`,
:fixture:`capfd` and :fixture:`capfdbinary` fixtures."""
-
+
def __init__(
self, captureclass, request: SubRequest, *, _ispytest: bool = False
) -> None:
@@ -839,14 +839,14 @@ class CaptureFixture(Generic[AnyStr]):
self._capture: Optional[MultiCapture[AnyStr]] = None
self._captured_out = self.captureclass.EMPTY_BUFFER
self._captured_err = self.captureclass.EMPTY_BUFFER
-
+
def _start(self) -> None:
if self._capture is None:
self._capture = MultiCapture(
in_=None, out=self.captureclass(1), err=self.captureclass(2),
)
self._capture.start_capturing()
-
+
def close(self) -> None:
if self._capture is not None:
out, err = self._capture.pop_outerr_to_orig()
@@ -854,11 +854,11 @@ class CaptureFixture(Generic[AnyStr]):
self._captured_err += err
self._capture.stop_capturing()
self._capture = None
-
+
def readouterr(self) -> CaptureResult[AnyStr]:
"""Read and return the captured output so far, resetting the internal
buffer.
-
+
:returns:
The captured content as a namedtuple with ``out`` and ``err``
string attributes.
@@ -871,42 +871,42 @@ class CaptureFixture(Generic[AnyStr]):
self._captured_out = self.captureclass.EMPTY_BUFFER
self._captured_err = self.captureclass.EMPTY_BUFFER
return CaptureResult(captured_out, captured_err)
-
+
def _suspend(self) -> None:
"""Suspend this fixture's own capturing temporarily."""
if self._capture is not None:
self._capture.suspend_capturing()
-
+
def _resume(self) -> None:
"""Resume this fixture's own capturing temporarily."""
if self._capture is not None:
self._capture.resume_capturing()
-
+
def _is_started(self) -> bool:
"""Whether actively capturing -- not disabled or closed."""
if self._capture is not None:
return self._capture.is_started()
return False
-
+
@contextlib.contextmanager
def disabled(self) -> Generator[None, None, None]:
"""Temporarily disable capturing while inside the ``with`` block."""
capmanager = self.request.config.pluginmanager.getplugin("capturemanager")
with capmanager.global_and_fixture_disabled():
yield
-
-
+
+
# The fixtures.
-
-
+
+
@fixture
def capsys(request: SubRequest) -> Generator[CaptureFixture[str], None, None]:
"""Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``.
-
+
The captured output is made available via ``capsys.readouterr()`` method
calls, which return a ``(out, err)`` namedtuple.
``out`` and ``err`` will be ``text`` objects.
- """
+ """
capman = request.config.pluginmanager.getplugin("capturemanager")
capture_fixture = CaptureFixture[str](SysCapture, request, _ispytest=True)
capman.set_fixture(capture_fixture)
@@ -914,8 +914,8 @@ def capsys(request: SubRequest) -> Generator[CaptureFixture[str], None, None]:
yield capture_fixture
capture_fixture.close()
capman.unset_fixture()
-
-
+
+
@fixture
def capsysbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, None]:
"""Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``.
@@ -923,7 +923,7 @@ def capsysbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None,
The captured output is made available via ``capsysbinary.readouterr()``
method calls, which return a ``(out, err)`` namedtuple.
``out`` and ``err`` will be ``bytes`` objects.
- """
+ """
capman = request.config.pluginmanager.getplugin("capturemanager")
capture_fixture = CaptureFixture[bytes](SysCaptureBinary, request, _ispytest=True)
capman.set_fixture(capture_fixture)
@@ -931,16 +931,16 @@ def capsysbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None,
yield capture_fixture
capture_fixture.close()
capman.unset_fixture()
-
-
+
+
@fixture
def capfd(request: SubRequest) -> Generator[CaptureFixture[str], None, None]:
"""Enable text capturing of writes to file descriptors ``1`` and ``2``.
-
+
The captured output is made available via ``capfd.readouterr()`` method
calls, which return a ``(out, err)`` namedtuple.
``out`` and ``err`` will be ``text`` objects.
- """
+ """
capman = request.config.pluginmanager.getplugin("capturemanager")
capture_fixture = CaptureFixture[str](FDCapture, request, _ispytest=True)
capman.set_fixture(capture_fixture)
@@ -948,12 +948,12 @@ def capfd(request: SubRequest) -> Generator[CaptureFixture[str], None, None]:
yield capture_fixture
capture_fixture.close()
capman.unset_fixture()
-
-
+
+
@fixture
def capfdbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, None]:
"""Enable bytes capturing of writes to file descriptors ``1`` and ``2``.
-
+
The captured output is made available via ``capfd.readouterr()`` method
calls, which return a ``(out, err)`` namedtuple.
``out`` and ``err`` will be ``byte`` objects.