diff options
author | deshevoy <deshevoy@yandex-team.ru> | 2022-02-10 16:46:56 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:56 +0300 |
commit | e988f30484abe5fdeedcc7a5d3c226c01a21800c (patch) | |
tree | 0a217b173aabb57b7e51f8a169989b1a3e0309fe /contrib/python/pytest/py2/_pytest/capture.py | |
parent | 33ee501c05d3f24036ae89766a858930ae66c548 (diff) | |
download | ydb-e988f30484abe5fdeedcc7a5d3c226c01a21800c.tar.gz |
Restoring authorship annotation for <deshevoy@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/pytest/py2/_pytest/capture.py')
-rw-r--r-- | contrib/python/pytest/py2/_pytest/capture.py | 1448 |
1 files changed, 724 insertions, 724 deletions
diff --git a/contrib/python/pytest/py2/_pytest/capture.py b/contrib/python/pytest/py2/_pytest/capture.py index 68c17772f39..676c1d9eb54 100644 --- a/contrib/python/pytest/py2/_pytest/capture.py +++ b/contrib/python/pytest/py2/_pytest/capture.py @@ -1,90 +1,90 @@ # -*- coding: utf-8 -*- -""" -per-test stdout/stderr capturing mechanism. - -""" -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import collections -import contextlib -import io -import os -import sys -from io import UnsupportedOperation -from tempfile import TemporaryFile - -import six - -import pytest +""" +per-test stdout/stderr capturing mechanism. + +""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import collections +import contextlib +import io +import os +import sys +from io import UnsupportedOperation +from tempfile import TemporaryFile + +import six + +import pytest from _pytest.compat import _PY3 -from _pytest.compat import CaptureIO - -patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"} - - -def pytest_addoption(parser): - group = parser.getgroup("general") - group._addoption( - "--capture", - action="store", - default="fd" if hasattr(os, "dup") else "sys", - metavar="method", - choices=["fd", "sys", "no"], - help="per-test capturing method: one of fd|sys|no.", - ) - group._addoption( - "-s", - action="store_const", - const="no", - dest="capture", - help="shortcut for --capture=no.", - ) - - -@pytest.hookimpl(hookwrapper=True) -def pytest_load_initial_conftests(early_config, parser, args): - ns = early_config.known_args_namespace - if ns.capture == "fd": - _py36_windowsconsoleio_workaround(sys.stdout) - _colorama_workaround() - _readline_workaround() - pluginmanager = early_config.pluginmanager - capman = CaptureManager(ns.capture) - pluginmanager.register(capman, "capturemanager") - - # make sure that capturemanager is properly reset at final shutdown - early_config.add_cleanup(capman.stop_global_capturing) - - # finally trigger conftest loading but while capturing (issue93) - capman.start_global_capturing() - outcome = yield - capman.suspend_global_capture() - if outcome.excinfo is not None: - out, err = capman.read_global_capture() - sys.stdout.write(out) - sys.stderr.write(err) - - -class CaptureManager(object): - """ - Capture plugin, manages that the appropriate capture method is enabled/disabled during collection and each - test phase (setup, call, teardown). After each of those points, the captured output is obtained and - attached to the collection/runtest report. - - There are two levels of capture: - * global: which is enabled by default and can be suppressed by the ``-s`` option. This is always enabled/disabled - during collection and each test phase. - * fixture: when a test function or one of its fixture depend on the ``capsys`` or ``capfd`` fixtures. In this - case special handling is needed to ensure the fixtures take precedence over the global capture. - """ - - def __init__(self, method): - self._method = method - self._global_capturing = None - self._current_item = None - +from _pytest.compat import CaptureIO + +patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"} + + +def pytest_addoption(parser): + group = parser.getgroup("general") + group._addoption( + "--capture", + action="store", + default="fd" if hasattr(os, "dup") else "sys", + metavar="method", + choices=["fd", "sys", "no"], + help="per-test capturing method: one of fd|sys|no.", + ) + group._addoption( + "-s", + action="store_const", + const="no", + dest="capture", + help="shortcut for --capture=no.", + ) + + +@pytest.hookimpl(hookwrapper=True) +def pytest_load_initial_conftests(early_config, parser, args): + ns = early_config.known_args_namespace + if ns.capture == "fd": + _py36_windowsconsoleio_workaround(sys.stdout) + _colorama_workaround() + _readline_workaround() + pluginmanager = early_config.pluginmanager + capman = CaptureManager(ns.capture) + pluginmanager.register(capman, "capturemanager") + + # make sure that capturemanager is properly reset at final shutdown + early_config.add_cleanup(capman.stop_global_capturing) + + # finally trigger conftest loading but while capturing (issue93) + capman.start_global_capturing() + outcome = yield + capman.suspend_global_capture() + if outcome.excinfo is not None: + out, err = capman.read_global_capture() + sys.stdout.write(out) + sys.stderr.write(err) + + +class CaptureManager(object): + """ + Capture plugin, manages that the appropriate capture method is enabled/disabled during collection and each + test phase (setup, call, teardown). After each of those points, the captured output is obtained and + attached to the collection/runtest report. + + There are two levels of capture: + * global: which is enabled by default and can be suppressed by the ``-s`` option. This is always enabled/disabled + during collection and each test phase. + * fixture: when a test function or one of its fixture depend on the ``capsys`` or ``capfd`` fixtures. In this + case special handling is needed to ensure the fixtures take precedence over the global capture. + """ + + def __init__(self, method): + self._method = method + self._global_capturing = None + self._current_item = None + def __repr__(self): return "<CaptureManager _method=%r _global_capturing=%r _current_item=%r>" % ( self._method, @@ -92,15 +92,15 @@ class CaptureManager(object): self._current_item, ) - def _getcapture(self, method): - if method == "fd": - return MultiCapture(out=True, err=True, Capture=FDCapture) - elif method == "sys": - return MultiCapture(out=True, err=True, Capture=SysCapture) - elif method == "no": - return MultiCapture(out=False, err=False, in_=False) + def _getcapture(self, method): + if method == "fd": + return MultiCapture(out=True, err=True, Capture=FDCapture) + elif method == "sys": + return MultiCapture(out=True, err=True, Capture=SysCapture) + elif method == "no": + return MultiCapture(out=False, err=False, in_=False) raise ValueError("unknown capturing method: %r" % method) # pragma: no cover - + def is_capturing(self): if self.is_globally_capturing(): return "global" @@ -111,33 +111,33 @@ class CaptureManager(object): ) return False - # Global capturing control - - def is_globally_capturing(self): - return self._method != "no" - - def start_global_capturing(self): - assert self._global_capturing is None - self._global_capturing = self._getcapture(self._method) - self._global_capturing.start_capturing() - - def stop_global_capturing(self): - if self._global_capturing is not None: - self._global_capturing.pop_outerr_to_orig() - self._global_capturing.stop_capturing() - self._global_capturing = None - - def resume_global_capture(self): + # Global capturing control + + def is_globally_capturing(self): + return self._method != "no" + + def start_global_capturing(self): + assert self._global_capturing is None + self._global_capturing = self._getcapture(self._method) + self._global_capturing.start_capturing() + + def stop_global_capturing(self): + if self._global_capturing is not None: + self._global_capturing.pop_outerr_to_orig() + self._global_capturing.stop_capturing() + self._global_capturing = None + + def resume_global_capture(self): # During teardown of the python process, and on rare occasions, capture # attributes can be `None` while trying to resume global capture. if self._global_capturing is not None: self._global_capturing.resume_capturing() - - def suspend_global_capture(self, in_=False): - cap = getattr(self, "_global_capturing", None) - if cap is not None: - cap.suspend_capturing(in_=in_) - + + def suspend_global_capture(self, in_=False): + cap = getattr(self, "_global_capturing", None) + if cap is not None: + cap.suspend_capturing(in_=in_) + def suspend(self, in_=False): # Need to undo local capsys-et-al if it exists before disabling global capture. self.suspend_fixture(self._current_item) @@ -147,331 +147,331 @@ class CaptureManager(object): self.resume_global_capture() self.resume_fixture(self._current_item) - def read_global_capture(self): - return self._global_capturing.readouterr() - - # Fixture Control (it's just forwarding, think about removing this later) - - def activate_fixture(self, item): - """If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over - the global capture. - """ - fixture = getattr(item, "_capture_fixture", None) - if fixture is not None: - fixture._start() - - def deactivate_fixture(self, item): - """Deactivates the ``capsys`` or ``capfd`` fixture of this item, if any.""" - fixture = getattr(item, "_capture_fixture", None) - if fixture is not None: - fixture.close() - - def suspend_fixture(self, item): - fixture = getattr(item, "_capture_fixture", None) - if fixture is not None: - fixture._suspend() - - def resume_fixture(self, item): - fixture = getattr(item, "_capture_fixture", None) - if fixture is not None: - fixture._resume() - - # Helper context managers - - @contextlib.contextmanager - def global_and_fixture_disabled(self): + def read_global_capture(self): + return self._global_capturing.readouterr() + + # Fixture Control (it's just forwarding, think about removing this later) + + def activate_fixture(self, item): + """If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over + the global capture. + """ + fixture = getattr(item, "_capture_fixture", None) + if fixture is not None: + fixture._start() + + def deactivate_fixture(self, item): + """Deactivates the ``capsys`` or ``capfd`` fixture of this item, if any.""" + fixture = getattr(item, "_capture_fixture", None) + if fixture is not None: + fixture.close() + + def suspend_fixture(self, item): + fixture = getattr(item, "_capture_fixture", None) + if fixture is not None: + fixture._suspend() + + def resume_fixture(self, item): + fixture = getattr(item, "_capture_fixture", None) + if fixture is not None: + fixture._resume() + + # Helper context managers + + @contextlib.contextmanager + def global_and_fixture_disabled(self): """Context manager to temporarily disable global and current fixture capturing.""" self.suspend() - try: - yield - finally: + try: + yield + finally: self.resume() - - @contextlib.contextmanager - def item_capture(self, when, item): - self.resume_global_capture() - self.activate_fixture(item) - try: - yield - finally: - self.deactivate_fixture(item) - self.suspend_global_capture(in_=False) - - out, err = self.read_global_capture() - item.add_report_section(when, "stdout", out) - item.add_report_section(when, "stderr", err) - - # Hooks - - @pytest.hookimpl(hookwrapper=True) - def pytest_make_collect_report(self, collector): - if isinstance(collector, pytest.File): - self.resume_global_capture() - outcome = yield - self.suspend_global_capture() - out, err = self.read_global_capture() - rep = outcome.get_result() - if out: - rep.sections.append(("Captured stdout", out)) - if err: - rep.sections.append(("Captured stderr", err)) - else: - yield - - @pytest.hookimpl(hookwrapper=True) - def pytest_runtest_protocol(self, item): - self._current_item = item - yield - self._current_item = None - - @pytest.hookimpl(hookwrapper=True) - def pytest_runtest_setup(self, item): - with self.item_capture("setup", item): - yield - - @pytest.hookimpl(hookwrapper=True) - def pytest_runtest_call(self, item): - with self.item_capture("call", item): - yield - - @pytest.hookimpl(hookwrapper=True) - def pytest_runtest_teardown(self, item): - with self.item_capture("teardown", item): - yield - - @pytest.hookimpl(tryfirst=True) - def pytest_keyboard_interrupt(self, excinfo): - self.stop_global_capturing() - - @pytest.hookimpl(tryfirst=True) - def pytest_internalerror(self, excinfo): - self.stop_global_capturing() - - -capture_fixtures = {"capfd", "capfdbinary", "capsys", "capsysbinary"} - - -def _ensure_only_one_capture_fixture(request, name): - fixtures = set(request.fixturenames) & capture_fixtures - {name} - if fixtures: - fixtures = sorted(fixtures) - fixtures = fixtures[0] if len(fixtures) == 1 else fixtures - raise request.raiseerror( - "cannot use {} and {} at the same time".format(fixtures, name) - ) - - -@pytest.fixture -def capsys(request): + + @contextlib.contextmanager + def item_capture(self, when, item): + self.resume_global_capture() + self.activate_fixture(item) + try: + yield + finally: + self.deactivate_fixture(item) + self.suspend_global_capture(in_=False) + + out, err = self.read_global_capture() + item.add_report_section(when, "stdout", out) + item.add_report_section(when, "stderr", err) + + # Hooks + + @pytest.hookimpl(hookwrapper=True) + def pytest_make_collect_report(self, collector): + if isinstance(collector, pytest.File): + self.resume_global_capture() + outcome = yield + self.suspend_global_capture() + out, err = self.read_global_capture() + rep = outcome.get_result() + if out: + rep.sections.append(("Captured stdout", out)) + if err: + rep.sections.append(("Captured stderr", err)) + else: + yield + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_protocol(self, item): + self._current_item = item + yield + self._current_item = None + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_setup(self, item): + with self.item_capture("setup", item): + yield + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_call(self, item): + with self.item_capture("call", item): + yield + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_teardown(self, item): + with self.item_capture("teardown", item): + yield + + @pytest.hookimpl(tryfirst=True) + def pytest_keyboard_interrupt(self, excinfo): + self.stop_global_capturing() + + @pytest.hookimpl(tryfirst=True) + def pytest_internalerror(self, excinfo): + self.stop_global_capturing() + + +capture_fixtures = {"capfd", "capfdbinary", "capsys", "capsysbinary"} + + +def _ensure_only_one_capture_fixture(request, name): + fixtures = set(request.fixturenames) & capture_fixtures - {name} + if fixtures: + fixtures = sorted(fixtures) + fixtures = fixtures[0] if len(fixtures) == 1 else fixtures + raise request.raiseerror( + "cannot use {} and {} at the same time".format(fixtures, name) + ) + + +@pytest.fixture +def capsys(request): """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``. The captured output is made available via ``capsys.readouterr()`` method calls, which return a ``(out, err)`` namedtuple. ``out`` and ``err`` will be ``text`` objects. - """ - _ensure_only_one_capture_fixture(request, "capsys") - with _install_capture_fixture_on_item(request, SysCapture) as fixture: - yield fixture - - -@pytest.fixture -def capsysbinary(request): + """ + _ensure_only_one_capture_fixture(request, "capsys") + with _install_capture_fixture_on_item(request, SysCapture) as fixture: + yield fixture + + +@pytest.fixture +def capsysbinary(request): """Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``. The captured output is made available via ``capsysbinary.readouterr()`` method calls, which return a ``(out, err)`` namedtuple. ``out`` and ``err`` will be ``bytes`` objects. - """ - _ensure_only_one_capture_fixture(request, "capsysbinary") - # Currently, the implementation uses the python3 specific `.buffer` - # property of CaptureIO. - if sys.version_info < (3,): + """ + _ensure_only_one_capture_fixture(request, "capsysbinary") + # Currently, the implementation uses the python3 specific `.buffer` + # property of CaptureIO. + if sys.version_info < (3,): raise request.raiseerror("capsysbinary is only supported on Python 3") - with _install_capture_fixture_on_item(request, SysCaptureBinary) as fixture: - yield fixture - - -@pytest.fixture -def capfd(request): + with _install_capture_fixture_on_item(request, SysCaptureBinary) as fixture: + yield fixture + + +@pytest.fixture +def capfd(request): """Enable text capturing of writes to file descriptors ``1`` and ``2``. The captured output is made available via ``capfd.readouterr()`` method calls, which return a ``(out, err)`` namedtuple. ``out`` and ``err`` will be ``text`` objects. - """ - _ensure_only_one_capture_fixture(request, "capfd") - if not hasattr(os, "dup"): - pytest.skip( - "capfd fixture needs os.dup function which is not available in this system" - ) - with _install_capture_fixture_on_item(request, FDCapture) as fixture: - yield fixture - - -@pytest.fixture -def capfdbinary(request): + """ + _ensure_only_one_capture_fixture(request, "capfd") + if not hasattr(os, "dup"): + pytest.skip( + "capfd fixture needs os.dup function which is not available in this system" + ) + with _install_capture_fixture_on_item(request, FDCapture) as fixture: + yield fixture + + +@pytest.fixture +def capfdbinary(request): """Enable bytes capturing of writes to file descriptors ``1`` and ``2``. The captured output is made available via ``capfd.readouterr()`` method calls, which return a ``(out, err)`` namedtuple. ``out`` and ``err`` will be ``byte`` objects. - """ - _ensure_only_one_capture_fixture(request, "capfdbinary") - if not hasattr(os, "dup"): - pytest.skip( - "capfdbinary fixture needs os.dup function which is not available in this system" - ) - with _install_capture_fixture_on_item(request, FDCaptureBinary) as fixture: - yield fixture - - -@contextlib.contextmanager -def _install_capture_fixture_on_item(request, capture_class): - """ - Context manager which creates a ``CaptureFixture`` instance and "installs" it on - the item/node of the given request. Used by ``capsys`` and ``capfd``. - - The CaptureFixture is added as attribute of the item because it needs to accessed - by ``CaptureManager`` during its ``pytest_runtest_*`` hooks. - """ - request.node._capture_fixture = fixture = CaptureFixture(capture_class, request) - capmanager = request.config.pluginmanager.getplugin("capturemanager") + """ + _ensure_only_one_capture_fixture(request, "capfdbinary") + if not hasattr(os, "dup"): + pytest.skip( + "capfdbinary fixture needs os.dup function which is not available in this system" + ) + with _install_capture_fixture_on_item(request, FDCaptureBinary) as fixture: + yield fixture + + +@contextlib.contextmanager +def _install_capture_fixture_on_item(request, capture_class): + """ + Context manager which creates a ``CaptureFixture`` instance and "installs" it on + the item/node of the given request. Used by ``capsys`` and ``capfd``. + + The CaptureFixture is added as attribute of the item because it needs to accessed + by ``CaptureManager`` during its ``pytest_runtest_*`` hooks. + """ + request.node._capture_fixture = fixture = CaptureFixture(capture_class, request) + capmanager = request.config.pluginmanager.getplugin("capturemanager") # Need to active this fixture right away in case it is being used by another fixture (setup phase). # If this fixture is being used only by a test function (call phase), then we wouldn't need this # activation, but it doesn't hurt. - capmanager.activate_fixture(request.node) - yield fixture - fixture.close() - del request.node._capture_fixture - - -class CaptureFixture(object): - """ - Object returned by :py:func:`capsys`, :py:func:`capsysbinary`, :py:func:`capfd` and :py:func:`capfdbinary` - fixtures. - """ - - def __init__(self, captureclass, request): - self.captureclass = captureclass - self.request = request - self._capture = None - self._captured_out = self.captureclass.EMPTY_BUFFER - self._captured_err = self.captureclass.EMPTY_BUFFER - - def _start(self): + capmanager.activate_fixture(request.node) + yield fixture + fixture.close() + del request.node._capture_fixture + + +class CaptureFixture(object): + """ + Object returned by :py:func:`capsys`, :py:func:`capsysbinary`, :py:func:`capfd` and :py:func:`capfdbinary` + fixtures. + """ + + def __init__(self, captureclass, request): + self.captureclass = captureclass + self.request = request + self._capture = None + self._captured_out = self.captureclass.EMPTY_BUFFER + self._captured_err = self.captureclass.EMPTY_BUFFER + + def _start(self): if self._capture is None: - self._capture = MultiCapture( - out=True, err=True, in_=False, Capture=self.captureclass - ) - self._capture.start_capturing() - - def close(self): - if self._capture is not None: - out, err = self._capture.pop_outerr_to_orig() - self._captured_out += out - self._captured_err += err - self._capture.stop_capturing() - self._capture = None - - def readouterr(self): - """Read and return the captured output so far, resetting the internal buffer. - + self._capture = MultiCapture( + out=True, err=True, in_=False, Capture=self.captureclass + ) + self._capture.start_capturing() + + def close(self): + if self._capture is not None: + out, err = self._capture.pop_outerr_to_orig() + self._captured_out += out + self._captured_err += err + self._capture.stop_capturing() + self._capture = None + + def readouterr(self): + """Read and return the captured output so far, resetting the internal buffer. + :return: captured content as a namedtuple with ``out`` and ``err`` string attributes - """ - captured_out, captured_err = self._captured_out, self._captured_err - if self._capture is not None: - out, err = self._capture.readouterr() - captured_out += out - captured_err += err - self._captured_out = self.captureclass.EMPTY_BUFFER - self._captured_err = self.captureclass.EMPTY_BUFFER - return CaptureResult(captured_out, captured_err) - - def _suspend(self): - """Suspends this fixture's own capturing temporarily.""" + """ + captured_out, captured_err = self._captured_out, self._captured_err + if self._capture is not None: + out, err = self._capture.readouterr() + captured_out += out + captured_err += err + self._captured_out = self.captureclass.EMPTY_BUFFER + self._captured_err = self.captureclass.EMPTY_BUFFER + return CaptureResult(captured_out, captured_err) + + def _suspend(self): + """Suspends this fixture's own capturing temporarily.""" if self._capture is not None: self._capture.suspend_capturing() - - def _resume(self): - """Resumes this fixture's own capturing temporarily.""" + + def _resume(self): + """Resumes this fixture's own capturing temporarily.""" if self._capture is not None: self._capture.resume_capturing() - - @contextlib.contextmanager - def disabled(self): - """Temporarily disables capture while inside the 'with' block.""" - capmanager = self.request.config.pluginmanager.getplugin("capturemanager") - with capmanager.global_and_fixture_disabled(): - yield - - -def safe_text_dupfile(f, mode, default_encoding="UTF8"): - """ return an open text file object that's a duplicate of f on the - FD-level if possible. - """ - encoding = getattr(f, "encoding", None) - try: - fd = f.fileno() - except Exception: - if "b" not in getattr(f, "mode", "") and hasattr(f, "encoding"): - # we seem to have a text stream, let's just use it - return f - else: - newfd = os.dup(fd) - if "b" not in mode: - mode += "b" - f = os.fdopen(newfd, mode, 0) # no buffering - return EncodedFile(f, encoding or default_encoding) - - -class EncodedFile(object): - errors = "strict" # possibly needed by py3 code (issue555) - - def __init__(self, buffer, encoding): - self.buffer = buffer - self.encoding = encoding - - def write(self, obj): - if isinstance(obj, six.text_type): - obj = obj.encode(self.encoding, "replace") + + @contextlib.contextmanager + def disabled(self): + """Temporarily disables capture while inside the 'with' block.""" + capmanager = self.request.config.pluginmanager.getplugin("capturemanager") + with capmanager.global_and_fixture_disabled(): + yield + + +def safe_text_dupfile(f, mode, default_encoding="UTF8"): + """ return an open text file object that's a duplicate of f on the + FD-level if possible. + """ + encoding = getattr(f, "encoding", None) + try: + fd = f.fileno() + except Exception: + if "b" not in getattr(f, "mode", "") and hasattr(f, "encoding"): + # we seem to have a text stream, let's just use it + return f + else: + newfd = os.dup(fd) + if "b" not in mode: + mode += "b" + f = os.fdopen(newfd, mode, 0) # no buffering + return EncodedFile(f, encoding or default_encoding) + + +class EncodedFile(object): + errors = "strict" # possibly needed by py3 code (issue555) + + def __init__(self, buffer, encoding): + self.buffer = buffer + self.encoding = encoding + + def write(self, obj): + if isinstance(obj, six.text_type): + obj = obj.encode(self.encoding, "replace") elif _PY3: raise TypeError( "write() argument must be str, not {}".format(type(obj).__name__) ) - self.buffer.write(obj) - - def writelines(self, linelist): - data = "".join(linelist) - self.write(data) - - @property - def name(self): - """Ensure that file.name is a string.""" - return repr(self.buffer) - + self.buffer.write(obj) + + def writelines(self, linelist): + data = "".join(linelist) + self.write(data) + + @property + def name(self): + """Ensure that file.name is a string.""" + return repr(self.buffer) + @property def mode(self): return self.buffer.mode.replace("b", "") - def __getattr__(self, name): - return getattr(object.__getattribute__(self, "buffer"), name) - - -CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"]) - - -class MultiCapture(object): - out = err = in_ = None + def __getattr__(self, name): + return getattr(object.__getattribute__(self, "buffer"), name) + + +CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"]) + + +class MultiCapture(object): + out = err = in_ = None _state = None - - def __init__(self, out=True, err=True, in_=True, Capture=None): - if in_: - self.in_ = Capture(0) - if out: - self.out = Capture(1) - if err: - self.err = Capture(2) - + + def __init__(self, out=True, err=True, in_=True, Capture=None): + if in_: + self.in_ = Capture(0) + if out: + self.out = Capture(1) + if err: + self.err = Capture(2) + def __repr__(self): return "<MultiCapture out=%r err=%r in_=%r _state=%r _in_suspended=%r>" % ( self.out, @@ -481,185 +481,185 @@ class MultiCapture(object): getattr(self, "_in_suspended", "<UNSET>"), ) - def start_capturing(self): + def start_capturing(self): self._state = "started" - if self.in_: - self.in_.start() - if self.out: - self.out.start() - if self.err: - self.err.start() - - def pop_outerr_to_orig(self): - """ pop current snapshot out/err capture and flush to orig streams. """ - out, err = self.readouterr() - if out: - self.out.writeorg(out) - if err: - self.err.writeorg(err) - return out, err - - def suspend_capturing(self, in_=False): + if self.in_: + self.in_.start() + if self.out: + self.out.start() + if self.err: + self.err.start() + + def pop_outerr_to_orig(self): + """ pop current snapshot out/err capture and flush to orig streams. """ + out, err = self.readouterr() + if out: + self.out.writeorg(out) + if err: + self.err.writeorg(err) + return out, err + + def suspend_capturing(self, in_=False): self._state = "suspended" - if self.out: - self.out.suspend() - if self.err: - self.err.suspend() - if in_ and self.in_: - self.in_.suspend() - self._in_suspended = True - - def resume_capturing(self): + if self.out: + self.out.suspend() + if self.err: + self.err.suspend() + if in_ and self.in_: + self.in_.suspend() + self._in_suspended = True + + def resume_capturing(self): self._state = "resumed" - if self.out: - self.out.resume() - if self.err: - self.err.resume() - if hasattr(self, "_in_suspended"): - self.in_.resume() - del self._in_suspended - - def stop_capturing(self): - """ stop capturing and reset capturing streams """ + if self.out: + self.out.resume() + if self.err: + self.err.resume() + if hasattr(self, "_in_suspended"): + self.in_.resume() + del self._in_suspended + + def stop_capturing(self): + """ stop capturing and reset capturing streams """ if self._state == "stopped": - raise ValueError("was already stopped") + raise ValueError("was already stopped") self._state = "stopped" - if self.out: - self.out.done() - if self.err: - self.err.done() - if self.in_: - self.in_.done() - - def readouterr(self): - """ return snapshot unicode value of stdout/stderr capturings. """ - return CaptureResult( - self.out.snap() if self.out is not None else "", - self.err.snap() if self.err is not None else "", - ) - - -class NoCapture(object): - EMPTY_BUFFER = None - __init__ = start = done = suspend = resume = lambda *args: None - - -class FDCaptureBinary(object): - """Capture IO to/from a given os-level filedescriptor. - - snap() produces `bytes` - """ - - EMPTY_BUFFER = b"" + if self.out: + self.out.done() + if self.err: + self.err.done() + if self.in_: + self.in_.done() + + def readouterr(self): + """ return snapshot unicode value of stdout/stderr capturings. """ + return CaptureResult( + self.out.snap() if self.out is not None else "", + self.err.snap() if self.err is not None else "", + ) + + +class NoCapture(object): + EMPTY_BUFFER = None + __init__ = start = done = suspend = resume = lambda *args: None + + +class FDCaptureBinary(object): + """Capture IO to/from a given os-level filedescriptor. + + snap() produces `bytes` + """ + + EMPTY_BUFFER = b"" _state = None - - def __init__(self, targetfd, tmpfile=None): - self.targetfd = targetfd - try: - self.targetfd_save = os.dup(self.targetfd) - except OSError: - self.start = lambda: None - self.done = lambda: None - else: - if targetfd == 0: - assert not tmpfile, "cannot set tmpfile with stdin" - tmpfile = open(os.devnull, "r") - self.syscapture = SysCapture(targetfd) - else: - if tmpfile is None: - f = TemporaryFile() - with f: - tmpfile = safe_text_dupfile(f, mode="wb+") - if targetfd in patchsysdict: - self.syscapture = SysCapture(targetfd, tmpfile) - else: - self.syscapture = NoCapture() - self.tmpfile = tmpfile - self.tmpfile_fd = tmpfile.fileno() - - def __repr__(self): + + def __init__(self, targetfd, tmpfile=None): + self.targetfd = targetfd + try: + self.targetfd_save = os.dup(self.targetfd) + except OSError: + self.start = lambda: None + self.done = lambda: None + else: + if targetfd == 0: + assert not tmpfile, "cannot set tmpfile with stdin" + tmpfile = open(os.devnull, "r") + self.syscapture = SysCapture(targetfd) + else: + if tmpfile is None: + f = TemporaryFile() + with f: + tmpfile = safe_text_dupfile(f, mode="wb+") + if targetfd in patchsysdict: + self.syscapture = SysCapture(targetfd, tmpfile) + else: + self.syscapture = NoCapture() + self.tmpfile = tmpfile + self.tmpfile_fd = tmpfile.fileno() + + def __repr__(self): return "<FDCapture %s oldfd=%s _state=%r>" % ( self.targetfd, getattr(self, "targetfd_save", None), self._state, ) - - def start(self): - """ Start capturing on targetfd using memorized tmpfile. """ - try: - os.fstat(self.targetfd_save) - except (AttributeError, OSError): - raise ValueError("saved filedescriptor not valid anymore") - os.dup2(self.tmpfile_fd, self.targetfd) - self.syscapture.start() + + def start(self): + """ Start capturing on targetfd using memorized tmpfile. """ + try: + os.fstat(self.targetfd_save) + except (AttributeError, OSError): + raise ValueError("saved filedescriptor not valid anymore") + os.dup2(self.tmpfile_fd, self.targetfd) + self.syscapture.start() self._state = "started" - - def snap(self): - self.tmpfile.seek(0) - res = self.tmpfile.read() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def done(self): - """ stop capturing, restore streams, return original capture file, - seeked to position zero. """ - targetfd_save = self.__dict__.pop("targetfd_save") - os.dup2(targetfd_save, self.targetfd) - os.close(targetfd_save) - self.syscapture.done() - _attempt_to_close_capture_file(self.tmpfile) + + def snap(self): + self.tmpfile.seek(0) + res = self.tmpfile.read() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def done(self): + """ stop capturing, restore streams, return original capture file, + seeked to position zero. """ + targetfd_save = self.__dict__.pop("targetfd_save") + os.dup2(targetfd_save, self.targetfd) + os.close(targetfd_save) + self.syscapture.done() + _attempt_to_close_capture_file(self.tmpfile) self._state = "done" - - def suspend(self): - self.syscapture.suspend() - os.dup2(self.targetfd_save, self.targetfd) + + def suspend(self): + self.syscapture.suspend() + os.dup2(self.targetfd_save, self.targetfd) self._state = "suspended" - - def resume(self): - self.syscapture.resume() - os.dup2(self.tmpfile_fd, self.targetfd) + + def resume(self): + self.syscapture.resume() + os.dup2(self.tmpfile_fd, self.targetfd) self._state = "resumed" - - def writeorg(self, data): - """ write to original file descriptor. """ - if isinstance(data, six.text_type): - data = data.encode("utf8") # XXX use encoding of original stream - os.write(self.targetfd_save, data) - - -class FDCapture(FDCaptureBinary): - """Capture IO to/from a given os-level filedescriptor. - - snap() produces text - """ - - EMPTY_BUFFER = str() - - def snap(self): + + def writeorg(self, data): + """ write to original file descriptor. """ + if isinstance(data, six.text_type): + data = data.encode("utf8") # XXX use encoding of original stream + os.write(self.targetfd_save, data) + + +class FDCapture(FDCaptureBinary): + """Capture IO to/from a given os-level filedescriptor. + + snap() produces text + """ + + EMPTY_BUFFER = str() + + def snap(self): res = super(FDCapture, self).snap() - enc = getattr(self.tmpfile, "encoding", None) - if enc and isinstance(res, bytes): - res = six.text_type(res, enc, "replace") - return res - - -class SysCapture(object): - - EMPTY_BUFFER = str() + enc = getattr(self.tmpfile, "encoding", None) + if enc and isinstance(res, bytes): + res = six.text_type(res, enc, "replace") + return res + + +class SysCapture(object): + + EMPTY_BUFFER = str() _state = None - - def __init__(self, fd, tmpfile=None): - name = patchsysdict[fd] - self._old = getattr(sys, name) - self.name = name - if tmpfile is None: - if name == "stdin": - tmpfile = DontReadFromInput() - else: - tmpfile = CaptureIO() - self.tmpfile = tmpfile - + + def __init__(self, fd, tmpfile=None): + name = patchsysdict[fd] + self._old = getattr(sys, name) + self.name = name + if tmpfile is None: + if name == "stdin": + tmpfile = DontReadFromInput() + else: + tmpfile = CaptureIO() + self.tmpfile = tmpfile + def __repr__(self): return "<SysCapture %s _old=%r, tmpfile=%r _state=%r>" % ( self.name, @@ -668,183 +668,183 @@ class SysCapture(object): self._state, ) - def start(self): - setattr(sys, self.name, self.tmpfile) + def start(self): + setattr(sys, self.name, self.tmpfile) self._state = "started" - - def snap(self): - res = self.tmpfile.getvalue() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - def done(self): - setattr(sys, self.name, self._old) - del self._old - _attempt_to_close_capture_file(self.tmpfile) + + def snap(self): + res = self.tmpfile.getvalue() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + def done(self): + setattr(sys, self.name, self._old) + del self._old + _attempt_to_close_capture_file(self.tmpfile) self._state = "done" - - def suspend(self): - setattr(sys, self.name, self._old) + + def suspend(self): + setattr(sys, self.name, self._old) self._state = "suspended" - - def resume(self): - setattr(sys, self.name, self.tmpfile) + + def resume(self): + setattr(sys, self.name, self.tmpfile) self._state = "resumed" - - def writeorg(self, data): - self._old.write(data) - self._old.flush() - - -class SysCaptureBinary(SysCapture): - EMPTY_BUFFER = b"" - - def snap(self): - res = self.tmpfile.buffer.getvalue() - self.tmpfile.seek(0) - self.tmpfile.truncate() - return res - - -class DontReadFromInput(six.Iterator): - """Temporary stub class. Ideally when stdin is accessed, the - capturing should be turned off, with possibly all data captured - so far sent to the screen. This should be configurable, though, - because in automated test runs it is better to crash than - hang indefinitely. - """ - - encoding = None - - def read(self, *args): - raise IOError("reading from stdin while output is captured") - - readline = read - readlines = read - __next__ = read - - def __iter__(self): - return self - - def fileno(self): - raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()") - - def isatty(self): - return False - - def close(self): - pass - - @property - def buffer(self): - if sys.version_info >= (3, 0): - return self - else: - raise AttributeError("redirected stdin has no attribute buffer") - - -def _colorama_workaround(): - """ - Ensure colorama is imported so that it attaches to the correct stdio - handles on Windows. - - colorama uses the terminal on import time. So if something does the - first import of colorama while I/O capture is active, colorama will - fail in various ways. - """ + + def writeorg(self, data): + self._old.write(data) + self._old.flush() + + +class SysCaptureBinary(SysCapture): + EMPTY_BUFFER = b"" + + def snap(self): + res = self.tmpfile.buffer.getvalue() + self.tmpfile.seek(0) + self.tmpfile.truncate() + return res + + +class DontReadFromInput(six.Iterator): + """Temporary stub class. Ideally when stdin is accessed, the + capturing should be turned off, with possibly all data captured + so far sent to the screen. This should be configurable, though, + because in automated test runs it is better to crash than + hang indefinitely. + """ + + encoding = None + + def read(self, *args): + raise IOError("reading from stdin while output is captured") + + readline = read + readlines = read + __next__ = read + + def __iter__(self): + return self + + def fileno(self): + raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()") + + def isatty(self): + return False + + def close(self): + pass + + @property + def buffer(self): + if sys.version_info >= (3, 0): + return self + else: + raise AttributeError("redirected stdin has no attribute buffer") + + +def _colorama_workaround(): + """ + Ensure colorama is imported so that it attaches to the correct stdio + handles on Windows. + + colorama uses the terminal on import time. So if something does the + first import of colorama while I/O capture is active, colorama will + fail in various ways. + """ if sys.platform.startswith("win32"): try: import colorama # noqa: F401 except ImportError: pass - - -def _readline_workaround(): - """ - Ensure readline is imported so that it attaches to the correct stdio - handles on Windows. - - Pdb uses readline support where available--when not running from the Python - prompt, the readline module is not imported until running the pdb REPL. If - running pytest with the --pdb option this means the readline module is not - imported until after I/O capture has been started. - - This is a problem for pyreadline, which is often used to implement readline - support on Windows, as it does not attach to the correct handles for stdout - and/or stdin if they have been redirected by the FDCapture mechanism. This - workaround ensures that readline is imported before I/O capture is setup so - that it can attach to the actual stdin/out for the console. - - See https://github.com/pytest-dev/pytest/pull/1281 - """ + + +def _readline_workaround(): + """ + Ensure readline is imported so that it attaches to the correct stdio + handles on Windows. + + Pdb uses readline support where available--when not running from the Python + prompt, the readline module is not imported until running the pdb REPL. If + running pytest with the --pdb option this means the readline module is not + imported until after I/O capture has been started. + + This is a problem for pyreadline, which is often used to implement readline + support on Windows, as it does not attach to the correct handles for stdout + and/or stdin if they have been redirected by the FDCapture mechanism. This + workaround ensures that readline is imported before I/O capture is setup so + that it can attach to the actual stdin/out for the console. + + See https://github.com/pytest-dev/pytest/pull/1281 + """ if sys.platform.startswith("win32"): try: import readline # noqa: F401 except ImportError: pass - - -def _py36_windowsconsoleio_workaround(stream): - """ - Python 3.6 implemented unicode console handling for Windows. This works - by reading/writing to the raw console handle using - ``{Read,Write}ConsoleW``. - - The problem is that we are going to ``dup2`` over the stdio file - descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the - handles used by Python to write to the console. Though there is still some - weirdness and the console handle seems to only be closed randomly and not - on the first call to ``CloseHandle``, or maybe it gets reopened with the - same handle value when we suspend capturing. - - The workaround in this case will reopen stdio with a different fd which - also means a different handle by replicating the logic in - "Py_lifecycle.c:initstdio/create_stdio". - - :param stream: in practice ``sys.stdout`` or ``sys.stderr``, but given - here as parameter for unittesting purposes. - - See https://github.com/pytest-dev/py/issues/103 - """ - if not sys.platform.startswith("win32") or sys.version_info[:2] < (3, 6): - return - - # bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666) - if not hasattr(stream, "buffer"): - return - - buffered = hasattr(stream.buffer, "raw") - raw_stdout = stream.buffer.raw if buffered else stream.buffer - - if not isinstance(raw_stdout, io._WindowsConsoleIO): - return - - def _reopen_stdio(f, mode): - if not buffered and mode[0] == "w": - buffering = 0 - else: - buffering = -1 - - return io.TextIOWrapper( - open(os.dup(f.fileno()), mode, buffering), - f.encoding, - f.errors, - f.newlines, - f.line_buffering, - ) - + + +def _py36_windowsconsoleio_workaround(stream): + """ + Python 3.6 implemented unicode console handling for Windows. This works + by reading/writing to the raw console handle using + ``{Read,Write}ConsoleW``. + + The problem is that we are going to ``dup2`` over the stdio file + descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the + handles used by Python to write to the console. Though there is still some + weirdness and the console handle seems to only be closed randomly and not + on the first call to ``CloseHandle``, or maybe it gets reopened with the + same handle value when we suspend capturing. + + The workaround in this case will reopen stdio with a different fd which + also means a different handle by replicating the logic in + "Py_lifecycle.c:initstdio/create_stdio". + + :param stream: in practice ``sys.stdout`` or ``sys.stderr``, but given + here as parameter for unittesting purposes. + + See https://github.com/pytest-dev/py/issues/103 + """ + if not sys.platform.startswith("win32") or sys.version_info[:2] < (3, 6): + return + + # bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666) + if not hasattr(stream, "buffer"): + return + + buffered = hasattr(stream.buffer, "raw") + raw_stdout = stream.buffer.raw if buffered else stream.buffer + + if not isinstance(raw_stdout, io._WindowsConsoleIO): + return + + def _reopen_stdio(f, mode): + if not buffered and mode[0] == "w": + buffering = 0 + else: + buffering = -1 + + return io.TextIOWrapper( + open(os.dup(f.fileno()), mode, buffering), + f.encoding, + f.errors, + f.newlines, + f.line_buffering, + ) + sys.stdin = _reopen_stdio(sys.stdin, "rb") sys.stdout = _reopen_stdio(sys.stdout, "wb") sys.stderr = _reopen_stdio(sys.stderr, "wb") - - -def _attempt_to_close_capture_file(f): - """Suppress IOError when closing the temporary file used for capturing streams in py27 (#2370)""" - if six.PY2: - try: - f.close() - except IOError: - pass - else: - f.close() + + +def _attempt_to_close_capture_file(f): + """Suppress IOError when closing the temporary file used for capturing streams in py27 (#2370)""" + if six.PY2: + try: + f.close() + except IOError: + pass + else: + f.close() |