aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pytest/py2/_pytest/logging.py
diff options
context:
space:
mode:
authornkozlovskiy <nmk@ydb.tech>2023-09-29 12:24:06 +0300
committernkozlovskiy <nmk@ydb.tech>2023-09-29 12:41:34 +0300
commite0e3e1717e3d33762ce61950504f9637a6e669ed (patch)
treebca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/pytest/py2/_pytest/logging.py
parent38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff)
downloadydb-e0e3e1717e3d33762ce61950504f9637a6e669ed.tar.gz
add ydb deps
Diffstat (limited to 'contrib/python/pytest/py2/_pytest/logging.py')
-rw-r--r--contrib/python/pytest/py2/_pytest/logging.py708
1 files changed, 708 insertions, 0 deletions
diff --git a/contrib/python/pytest/py2/_pytest/logging.py b/contrib/python/pytest/py2/_pytest/logging.py
new file mode 100644
index 0000000000..2400737ee4
--- /dev/null
+++ b/contrib/python/pytest/py2/_pytest/logging.py
@@ -0,0 +1,708 @@
+# -*- coding: utf-8 -*-
+""" Access and control log capturing. """
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import logging
+import re
+from contextlib import contextmanager
+
+import py
+import six
+
+import pytest
+from _pytest.compat import dummy_context_manager
+from _pytest.config import create_terminal_writer
+from _pytest.pathlib import Path
+
+DEFAULT_LOG_FORMAT = "%(levelname)-8s %(name)s:%(filename)s:%(lineno)d %(message)s"
+DEFAULT_LOG_DATE_FORMAT = "%H:%M:%S"
+_ANSI_ESCAPE_SEQ = re.compile(r"\x1b\[[\d;]+m")
+
+
+def _remove_ansi_escape_sequences(text):
+ return _ANSI_ESCAPE_SEQ.sub("", text)
+
+
+class ColoredLevelFormatter(logging.Formatter):
+ """
+ Colorize the %(levelname)..s part of the log format passed to __init__.
+ """
+
+ LOGLEVEL_COLOROPTS = {
+ logging.CRITICAL: {"red"},
+ logging.ERROR: {"red", "bold"},
+ logging.WARNING: {"yellow"},
+ logging.WARN: {"yellow"},
+ logging.INFO: {"green"},
+ logging.DEBUG: {"purple"},
+ logging.NOTSET: set(),
+ }
+ LEVELNAME_FMT_REGEX = re.compile(r"%\(levelname\)([+-]?\d*s)")
+
+ def __init__(self, terminalwriter, *args, **kwargs):
+ super(ColoredLevelFormatter, self).__init__(*args, **kwargs)
+ if six.PY2:
+ self._original_fmt = self._fmt
+ else:
+ self._original_fmt = self._style._fmt
+ self._level_to_fmt_mapping = {}
+
+ levelname_fmt_match = self.LEVELNAME_FMT_REGEX.search(self._fmt)
+ if not levelname_fmt_match:
+ return
+ levelname_fmt = levelname_fmt_match.group()
+
+ for level, color_opts in self.LOGLEVEL_COLOROPTS.items():
+ formatted_levelname = levelname_fmt % {
+ "levelname": logging.getLevelName(level)
+ }
+
+ # add ANSI escape sequences around the formatted levelname
+ color_kwargs = {name: True for name in color_opts}
+ colorized_formatted_levelname = terminalwriter.markup(
+ formatted_levelname, **color_kwargs
+ )
+ self._level_to_fmt_mapping[level] = self.LEVELNAME_FMT_REGEX.sub(
+ colorized_formatted_levelname, self._fmt
+ )
+
+ def format(self, record):
+ fmt = self._level_to_fmt_mapping.get(record.levelno, self._original_fmt)
+ if six.PY2:
+ self._fmt = fmt
+ else:
+ self._style._fmt = fmt
+ return super(ColoredLevelFormatter, self).format(record)
+
+
+if not six.PY2:
+ # Formatter classes don't support format styles in PY2
+
+ class PercentStyleMultiline(logging.PercentStyle):
+ """A logging style with special support for multiline messages.
+
+ If the message of a record consists of multiple lines, this style
+ formats the message as if each line were logged separately.
+ """
+
+ @staticmethod
+ def _update_message(record_dict, message):
+ tmp = record_dict.copy()
+ tmp["message"] = message
+ return tmp
+
+ def format(self, record):
+ if "\n" in record.message:
+ lines = record.message.splitlines()
+ formatted = self._fmt % self._update_message(record.__dict__, lines[0])
+ # TODO optimize this by introducing an option that tells the
+ # logging framework that the indentation doesn't
+ # change. This allows to compute the indentation only once.
+ indentation = _remove_ansi_escape_sequences(formatted).find(lines[0])
+ lines[0] = formatted
+ return ("\n" + " " * indentation).join(lines)
+ else:
+ return self._fmt % record.__dict__
+
+
+def get_option_ini(config, *names):
+ for name in names:
+ ret = config.getoption(name) # 'default' arg won't work as expected
+ if ret is None:
+ ret = config.getini(name)
+ if ret:
+ return ret
+
+
+def pytest_addoption(parser):
+ """Add options to control log capturing."""
+ group = parser.getgroup("logging")
+
+ def add_option_ini(option, dest, default=None, type=None, **kwargs):
+ parser.addini(
+ dest, default=default, type=type, help="default value for " + option
+ )
+ group.addoption(option, dest=dest, **kwargs)
+
+ add_option_ini(
+ "--no-print-logs",
+ dest="log_print",
+ action="store_const",
+ const=False,
+ default=True,
+ type="bool",
+ help="disable printing caught logs on failed tests.",
+ )
+ add_option_ini(
+ "--log-level",
+ dest="log_level",
+ default=None,
+ help="logging level used by the logging module",
+ )
+ add_option_ini(
+ "--log-format",
+ dest="log_format",
+ default=DEFAULT_LOG_FORMAT,
+ help="log format as used by the logging module.",
+ )
+ add_option_ini(
+ "--log-date-format",
+ dest="log_date_format",
+ default=DEFAULT_LOG_DATE_FORMAT,
+ help="log date format as used by the logging module.",
+ )
+ parser.addini(
+ "log_cli",
+ default=False,
+ type="bool",
+ help='enable log display during test run (also known as "live logging").',
+ )
+ add_option_ini(
+ "--log-cli-level", dest="log_cli_level", default=None, help="cli logging level."
+ )
+ add_option_ini(
+ "--log-cli-format",
+ dest="log_cli_format",
+ default=None,
+ help="log format as used by the logging module.",
+ )
+ add_option_ini(
+ "--log-cli-date-format",
+ dest="log_cli_date_format",
+ default=None,
+ help="log date format as used by the logging module.",
+ )
+ add_option_ini(
+ "--log-file",
+ dest="log_file",
+ default=None,
+ help="path to a file when logging will be written to.",
+ )
+ add_option_ini(
+ "--log-file-level",
+ dest="log_file_level",
+ default=None,
+ help="log file logging level.",
+ )
+ add_option_ini(
+ "--log-file-format",
+ dest="log_file_format",
+ default=DEFAULT_LOG_FORMAT,
+ help="log format as used by the logging module.",
+ )
+ add_option_ini(
+ "--log-file-date-format",
+ dest="log_file_date_format",
+ default=DEFAULT_LOG_DATE_FORMAT,
+ help="log date format as used by the logging module.",
+ )
+
+
+@contextmanager
+def catching_logs(handler, formatter=None, level=None):
+ """Context manager that prepares the whole logging machinery properly."""
+ root_logger = logging.getLogger()
+
+ if formatter is not None:
+ handler.setFormatter(formatter)
+ if level is not None:
+ handler.setLevel(level)
+
+ # Adding the same handler twice would confuse logging system.
+ # Just don't do that.
+ add_new_handler = handler not in root_logger.handlers
+
+ if add_new_handler:
+ root_logger.addHandler(handler)
+ if level is not None:
+ orig_level = root_logger.level
+ root_logger.setLevel(min(orig_level, level))
+ try:
+ yield handler
+ finally:
+ if level is not None:
+ root_logger.setLevel(orig_level)
+ if add_new_handler:
+ root_logger.removeHandler(handler)
+
+
+class LogCaptureHandler(logging.StreamHandler):
+ """A logging handler that stores log records and the log text."""
+
+ def __init__(self):
+ """Creates a new log handler."""
+ logging.StreamHandler.__init__(self, py.io.TextIO())
+ self.records = []
+
+ def emit(self, record):
+ """Keep the log records in a list in addition to the log text."""
+ self.records.append(record)
+ logging.StreamHandler.emit(self, record)
+
+ def reset(self):
+ self.records = []
+ self.stream = py.io.TextIO()
+
+
+class LogCaptureFixture(object):
+ """Provides access and control of log capturing."""
+
+ def __init__(self, item):
+ """Creates a new funcarg."""
+ self._item = item
+ # dict of log name -> log level
+ self._initial_log_levels = {} # Dict[str, int]
+
+ def _finalize(self):
+ """Finalizes the fixture.
+
+ This restores the log levels changed by :meth:`set_level`.
+ """
+ # restore log levels
+ for logger_name, level in self._initial_log_levels.items():
+ logger = logging.getLogger(logger_name)
+ logger.setLevel(level)
+
+ @property
+ def handler(self):
+ """
+ :rtype: LogCaptureHandler
+ """
+ return self._item.catch_log_handler
+
+ def get_records(self, when):
+ """
+ Get the logging records for one of the possible test phases.
+
+ :param str when:
+ Which test phase to obtain the records from. Valid values are: "setup", "call" and "teardown".
+
+ :rtype: List[logging.LogRecord]
+ :return: the list of captured records at the given stage
+
+ .. versionadded:: 3.4
+ """
+ handler = self._item.catch_log_handlers.get(when)
+ if handler:
+ return handler.records
+ else:
+ return []
+
+ @property
+ def text(self):
+ """Returns the formatted log text."""
+ return _remove_ansi_escape_sequences(self.handler.stream.getvalue())
+
+ @property
+ def records(self):
+ """Returns the list of log records."""
+ return self.handler.records
+
+ @property
+ def record_tuples(self):
+ """Returns a list of a stripped down version of log records intended
+ for use in assertion comparison.
+
+ The format of the tuple is:
+
+ (logger_name, log_level, message)
+ """
+ return [(r.name, r.levelno, r.getMessage()) for r in self.records]
+
+ @property
+ def messages(self):
+ """Returns a list of format-interpolated log messages.
+
+ Unlike 'records', which contains the format string and parameters for interpolation, log messages in this list
+ are all interpolated.
+ Unlike 'text', which contains the output from the handler, log messages in this list are unadorned with
+ levels, timestamps, etc, making exact comparisons more reliable.
+
+ Note that traceback or stack info (from :func:`logging.exception` or the `exc_info` or `stack_info` arguments
+ to the logging functions) is not included, as this is added by the formatter in the handler.
+
+ .. versionadded:: 3.7
+ """
+ return [r.getMessage() for r in self.records]
+
+ def clear(self):
+ """Reset the list of log records and the captured log text."""
+ self.handler.reset()
+
+ def set_level(self, level, logger=None):
+ """Sets the level for capturing of logs. The level will be restored to its previous value at the end of
+ the test.
+
+ :param int level: the logger to level.
+ :param str logger: the logger to update the level. If not given, the root logger level is updated.
+
+ .. versionchanged:: 3.4
+ The levels of the loggers changed by this function will be restored to their initial values at the
+ end of the test.
+ """
+ logger_name = logger
+ logger = logging.getLogger(logger_name)
+ # save the original log-level to restore it during teardown
+ self._initial_log_levels.setdefault(logger_name, logger.level)
+ logger.setLevel(level)
+
+ @contextmanager
+ def at_level(self, level, logger=None):
+ """Context manager that sets the level for capturing of logs. After the end of the 'with' statement the
+ level is restored to its original value.
+
+ :param int level: the logger to level.
+ :param str logger: the logger to update the level. If not given, the root logger level is updated.
+ """
+ logger = logging.getLogger(logger)
+ orig_level = logger.level
+ logger.setLevel(level)
+ try:
+ yield
+ finally:
+ logger.setLevel(orig_level)
+
+
+@pytest.fixture
+def caplog(request):
+ """Access and control log capturing.
+
+ Captured logs are available through the following properties/methods::
+
+ * caplog.text -> string containing formatted log output
+ * caplog.records -> list of logging.LogRecord instances
+ * caplog.record_tuples -> list of (logger_name, level, message) tuples
+ * caplog.clear() -> clear captured records and formatted log output string
+ """
+ result = LogCaptureFixture(request.node)
+ yield result
+ result._finalize()
+
+
+def get_actual_log_level(config, *setting_names):
+ """Return the actual logging level."""
+
+ for setting_name in setting_names:
+ log_level = config.getoption(setting_name)
+ if log_level is None:
+ log_level = config.getini(setting_name)
+ if log_level:
+ break
+ else:
+ return
+
+ if isinstance(log_level, six.string_types):
+ log_level = log_level.upper()
+ try:
+ return int(getattr(logging, log_level, log_level))
+ except ValueError:
+ # Python logging does not recognise this as a logging level
+ raise pytest.UsageError(
+ "'{}' is not recognized as a logging level name for "
+ "'{}'. Please consider passing the "
+ "logging level num instead.".format(log_level, setting_name)
+ )
+
+
+# run after terminalreporter/capturemanager are configured
+@pytest.hookimpl(trylast=True)
+def pytest_configure(config):
+ config.pluginmanager.register(LoggingPlugin(config), "logging-plugin")
+
+
+class LoggingPlugin(object):
+ """Attaches to the logging module and captures log messages for each test.
+ """
+
+ def __init__(self, config):
+ """Creates a new plugin to capture log messages.
+
+ The formatter can be safely shared across all handlers so
+ create a single one for the entire test session here.
+ """
+ self._config = config
+
+ self.print_logs = get_option_ini(config, "log_print")
+ self.formatter = self._create_formatter(
+ get_option_ini(config, "log_format"),
+ get_option_ini(config, "log_date_format"),
+ )
+ self.log_level = get_actual_log_level(config, "log_level")
+
+ self.log_file_level = get_actual_log_level(config, "log_file_level")
+ self.log_file_format = get_option_ini(config, "log_file_format", "log_format")
+ self.log_file_date_format = get_option_ini(
+ config, "log_file_date_format", "log_date_format"
+ )
+ self.log_file_formatter = logging.Formatter(
+ self.log_file_format, datefmt=self.log_file_date_format
+ )
+
+ log_file = get_option_ini(config, "log_file")
+ if log_file:
+ self.log_file_handler = logging.FileHandler(
+ log_file, mode="w", encoding="UTF-8"
+ )
+ self.log_file_handler.setFormatter(self.log_file_formatter)
+ else:
+ self.log_file_handler = None
+
+ self.log_cli_handler = None
+
+ self.live_logs_context = lambda: dummy_context_manager()
+ # Note that the lambda for the live_logs_context is needed because
+ # live_logs_context can otherwise not be entered multiple times due
+ # to limitations of contextlib.contextmanager.
+
+ if self._log_cli_enabled():
+ self._setup_cli_logging()
+
+ def _create_formatter(self, log_format, log_date_format):
+ # color option doesn't exist if terminal plugin is disabled
+ color = getattr(self._config.option, "color", "no")
+ if color != "no" and ColoredLevelFormatter.LEVELNAME_FMT_REGEX.search(
+ log_format
+ ):
+ formatter = ColoredLevelFormatter(
+ create_terminal_writer(self._config), log_format, log_date_format
+ )
+ else:
+ formatter = logging.Formatter(log_format, log_date_format)
+
+ if not six.PY2:
+ formatter._style = PercentStyleMultiline(formatter._style._fmt)
+ return formatter
+
+ def _setup_cli_logging(self):
+ config = self._config
+ terminal_reporter = config.pluginmanager.get_plugin("terminalreporter")
+ if terminal_reporter is None:
+ # terminal reporter is disabled e.g. by pytest-xdist.
+ return
+
+ capture_manager = config.pluginmanager.get_plugin("capturemanager")
+ # if capturemanager plugin is disabled, live logging still works.
+ log_cli_handler = _LiveLoggingStreamHandler(terminal_reporter, capture_manager)
+
+ log_cli_formatter = self._create_formatter(
+ get_option_ini(config, "log_cli_format", "log_format"),
+ get_option_ini(config, "log_cli_date_format", "log_date_format"),
+ )
+
+ log_cli_level = get_actual_log_level(config, "log_cli_level", "log_level")
+ self.log_cli_handler = log_cli_handler
+ self.live_logs_context = lambda: catching_logs(
+ log_cli_handler, formatter=log_cli_formatter, level=log_cli_level
+ )
+
+ def set_log_path(self, fname):
+ """Public method, which can set filename parameter for
+ Logging.FileHandler(). Also creates parent directory if
+ it does not exist.
+
+ .. warning::
+ Please considered as an experimental API.
+ """
+ fname = Path(fname)
+
+ if not fname.is_absolute():
+ fname = Path(self._config.rootdir, fname)
+
+ if not fname.parent.exists():
+ fname.parent.mkdir(exist_ok=True, parents=True)
+
+ self.log_file_handler = logging.FileHandler(
+ str(fname), mode="w", encoding="UTF-8"
+ )
+ self.log_file_handler.setFormatter(self.log_file_formatter)
+
+ def _log_cli_enabled(self):
+ """Return True if log_cli should be considered enabled, either explicitly
+ or because --log-cli-level was given in the command-line.
+ """
+ return self._config.getoption(
+ "--log-cli-level"
+ ) is not None or self._config.getini("log_cli")
+
+ @pytest.hookimpl(hookwrapper=True, tryfirst=True)
+ def pytest_collection(self):
+ with self.live_logs_context():
+ if self.log_cli_handler:
+ self.log_cli_handler.set_when("collection")
+
+ if self.log_file_handler is not None:
+ with catching_logs(self.log_file_handler, level=self.log_file_level):
+ yield
+ else:
+ yield
+
+ @contextmanager
+ def _runtest_for(self, item, when):
+ with self._runtest_for_main(item, when):
+ if self.log_file_handler is not None:
+ with catching_logs(self.log_file_handler, level=self.log_file_level):
+ yield
+ else:
+ yield
+
+ @contextmanager
+ def _runtest_for_main(self, item, when):
+ """Implements the internals of pytest_runtest_xxx() hook."""
+ with catching_logs(
+ LogCaptureHandler(), formatter=self.formatter, level=self.log_level
+ ) as log_handler:
+ if self.log_cli_handler:
+ self.log_cli_handler.set_when(when)
+
+ if item is None:
+ yield # run the test
+ return
+
+ if not hasattr(item, "catch_log_handlers"):
+ item.catch_log_handlers = {}
+ item.catch_log_handlers[when] = log_handler
+ item.catch_log_handler = log_handler
+ try:
+ yield # run test
+ finally:
+ if when == "teardown":
+ del item.catch_log_handler
+ del item.catch_log_handlers
+
+ if self.print_logs:
+ # Add a captured log section to the report.
+ log = log_handler.stream.getvalue().strip()
+ item.add_report_section(when, "log", log)
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_runtest_setup(self, item):
+ with self._runtest_for(item, "setup"):
+ yield
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_runtest_call(self, item):
+ with self._runtest_for(item, "call"):
+ yield
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_runtest_teardown(self, item):
+ with self._runtest_for(item, "teardown"):
+ yield
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_runtest_logstart(self):
+ if self.log_cli_handler:
+ self.log_cli_handler.reset()
+ with self._runtest_for(None, "start"):
+ yield
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_runtest_logfinish(self):
+ with self._runtest_for(None, "finish"):
+ yield
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_runtest_logreport(self):
+ with self._runtest_for(None, "logreport"):
+ yield
+
+ @pytest.hookimpl(hookwrapper=True, tryfirst=True)
+ def pytest_sessionfinish(self):
+ with self.live_logs_context():
+ if self.log_cli_handler:
+ self.log_cli_handler.set_when("sessionfinish")
+ if self.log_file_handler is not None:
+ try:
+ with catching_logs(
+ self.log_file_handler, level=self.log_file_level
+ ):
+ yield
+ finally:
+ # Close the FileHandler explicitly.
+ # (logging.shutdown might have lost the weakref?!)
+ self.log_file_handler.close()
+ else:
+ yield
+
+ @pytest.hookimpl(hookwrapper=True, tryfirst=True)
+ def pytest_sessionstart(self):
+ with self.live_logs_context():
+ if self.log_cli_handler:
+ self.log_cli_handler.set_when("sessionstart")
+ if self.log_file_handler is not None:
+ with catching_logs(self.log_file_handler, level=self.log_file_level):
+ yield
+ else:
+ yield
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_runtestloop(self, session):
+ """Runs all collected test items."""
+
+ if session.config.option.collectonly:
+ yield
+ return
+
+ if self._log_cli_enabled() and self._config.getoption("verbose") < 1:
+ # setting verbose flag is needed to avoid messy test progress output
+ self._config.option.verbose = 1
+
+ with self.live_logs_context():
+ if self.log_file_handler is not None:
+ with catching_logs(self.log_file_handler, level=self.log_file_level):
+ yield # run all the tests
+ else:
+ yield # run all the tests
+
+
+class _LiveLoggingStreamHandler(logging.StreamHandler):
+ """
+ Custom StreamHandler used by the live logging feature: it will write a newline before the first log message
+ in each test.
+
+ During live logging we must also explicitly disable stdout/stderr capturing otherwise it will get captured
+ and won't appear in the terminal.
+ """
+
+ def __init__(self, terminal_reporter, capture_manager):
+ """
+ :param _pytest.terminal.TerminalReporter terminal_reporter:
+ :param _pytest.capture.CaptureManager capture_manager:
+ """
+ logging.StreamHandler.__init__(self, stream=terminal_reporter)
+ self.capture_manager = capture_manager
+ self.reset()
+ self.set_when(None)
+ self._test_outcome_written = False
+
+ def reset(self):
+ """Reset the handler; should be called before the start of each test"""
+ self._first_record_emitted = False
+
+ def set_when(self, when):
+ """Prepares for the given test phase (setup/call/teardown)"""
+ self._when = when
+ self._section_name_shown = False
+ if when == "start":
+ self._test_outcome_written = False
+
+ def emit(self, record):
+ ctx_manager = (
+ self.capture_manager.global_and_fixture_disabled()
+ if self.capture_manager
+ else dummy_context_manager()
+ )
+ with ctx_manager:
+ if not self._first_record_emitted:
+ self.stream.write("\n")
+ self._first_record_emitted = True
+ elif self._when in ("teardown", "finish"):
+ if not self._test_outcome_written:
+ self._test_outcome_written = True
+ self.stream.write("\n")
+ if not self._section_name_shown and self._when:
+ self.stream.section("live log " + self._when, sep="-", bold=True)
+ self._section_name_shown = True
+ logging.StreamHandler.emit(self, record)