aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/allure-pytest/allure_pytest
diff options
context:
space:
mode:
authoriddqd <iddqd@yandex-team.com>2024-05-13 17:19:30 +0300
committeriddqd <iddqd@yandex-team.com>2024-05-13 17:28:44 +0300
commit84d127b9b7e96ba4352e3f5ddc9222aee9a66053 (patch)
tree2ebb2689abf65e68dfe92a3ca9b161b4b6ae183f /contrib/python/allure-pytest/allure_pytest
parentb7deb7f0b71db7419781d1b0357dfa443ccc3ff1 (diff)
downloadydb-84d127b9b7e96ba4352e3f5ddc9222aee9a66053.tar.gz
Add allure support to ydb github export
d6cba27d09fb5e50a99c36070a6a3545c8393ea1
Diffstat (limited to 'contrib/python/allure-pytest/allure_pytest')
-rw-r--r--contrib/python/allure-pytest/allure_pytest/__init__.py0
-rw-r--r--contrib/python/allure-pytest/allure_pytest/compat.py34
-rw-r--r--contrib/python/allure-pytest/allure_pytest/helper.py47
-rw-r--r--contrib/python/allure-pytest/allure_pytest/listener.py364
-rw-r--r--contrib/python/allure-pytest/allure_pytest/plugin.py236
-rw-r--r--contrib/python/allure-pytest/allure_pytest/utils.py197
6 files changed, 878 insertions, 0 deletions
diff --git a/contrib/python/allure-pytest/allure_pytest/__init__.py b/contrib/python/allure-pytest/allure_pytest/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/contrib/python/allure-pytest/allure_pytest/__init__.py
diff --git a/contrib/python/allure-pytest/allure_pytest/compat.py b/contrib/python/allure-pytest/allure_pytest/compat.py
new file mode 100644
index 0000000000..bf7db2dd27
--- /dev/null
+++ b/contrib/python/allure-pytest/allure_pytest/compat.py
@@ -0,0 +1,34 @@
+"""Provides compatibility with different pytest versions."""
+
+from inspect import signature
+
+__GETFIXTUREDEFS_2ND_PAR_IS_STR = None
+
+
+def getfixturedefs(fixturemanager, name, item):
+ """Calls FixtureManager.getfixturedefs in a way compatible with Python
+ versions before and after the change described in pytest-dev/pytest#11785.
+ """
+ getfixturedefs = fixturemanager.getfixturedefs
+ itemarg = __resolve_getfixturedefs_2nd_arg(getfixturedefs, item)
+ return getfixturedefs(name, itemarg)
+
+
+def __resolve_getfixturedefs_2nd_arg(getfixturedefs, item):
+ # Starting from pytest 8.1, getfixturedefs requires the item itself.
+ # In earlier versions it requires the nodeid string.
+ return item.nodeid if __2nd_parameter_is_str(getfixturedefs) else item
+
+
+def __2nd_parameter_is_str(getfixturedefs):
+ global __GETFIXTUREDEFS_2ND_PAR_IS_STR
+ if __GETFIXTUREDEFS_2ND_PAR_IS_STR is None:
+ __GETFIXTUREDEFS_2ND_PAR_IS_STR =\
+ __get_2nd_parameter_type(getfixturedefs) is str
+ return __GETFIXTUREDEFS_2ND_PAR_IS_STR
+
+
+def __get_2nd_parameter_type(fn):
+ return list(
+ signature(fn).parameters.values()
+ )[1].annotation
diff --git a/contrib/python/allure-pytest/allure_pytest/helper.py b/contrib/python/allure-pytest/allure_pytest/helper.py
new file mode 100644
index 0000000000..e6944ef40c
--- /dev/null
+++ b/contrib/python/allure-pytest/allure_pytest/helper.py
@@ -0,0 +1,47 @@
+import pytest
+import allure_commons
+from allure_pytest.utils import ALLURE_DESCRIPTION_MARK, ALLURE_DESCRIPTION_HTML_MARK
+from allure_pytest.utils import ALLURE_LABEL_MARK, ALLURE_LINK_MARK
+from allure_pytest.utils import format_allure_link
+
+
+class AllureTitleHelper:
+ @allure_commons.hookimpl
+ def decorate_as_title(self, test_title):
+ def decorator(func):
+ # pytest.fixture wraps function, so we need to get it directly
+ if getattr(func, '__pytest_wrapped__', None):
+ function = func.__pytest_wrapped__.obj
+ else:
+ function = func
+ function.__allure_display_name__ = test_title
+ return func
+
+ return decorator
+
+
+class AllureTestHelper:
+ def __init__(self, config):
+ self.config = config
+
+ @allure_commons.hookimpl
+ def decorate_as_description(self, test_description):
+ allure_description = getattr(pytest.mark, ALLURE_DESCRIPTION_MARK)
+ return allure_description(test_description)
+
+ @allure_commons.hookimpl
+ def decorate_as_description_html(self, test_description_html):
+ allure_description_html = getattr(pytest.mark, ALLURE_DESCRIPTION_HTML_MARK)
+ return allure_description_html(test_description_html)
+
+ @allure_commons.hookimpl
+ def decorate_as_label(self, label_type, labels):
+ allure_label = getattr(pytest.mark, ALLURE_LABEL_MARK)
+ return allure_label(*labels, label_type=label_type)
+
+ @allure_commons.hookimpl
+ def decorate_as_link(self, url, link_type, name):
+ url = format_allure_link(self.config, url, link_type)
+ allure_link = getattr(pytest.mark, ALLURE_LINK_MARK)
+ name = url if name is None else name
+ return allure_link(url, name=name, link_type=link_type)
diff --git a/contrib/python/allure-pytest/allure_pytest/listener.py b/contrib/python/allure-pytest/allure_pytest/listener.py
new file mode 100644
index 0000000000..1115363091
--- /dev/null
+++ b/contrib/python/allure-pytest/allure_pytest/listener.py
@@ -0,0 +1,364 @@
+import pytest
+import doctest
+
+import allure_commons
+from allure_commons.utils import now
+from allure_commons.utils import uuid4
+from allure_commons.utils import represent
+from allure_commons.utils import platform_label
+from allure_commons.utils import host_tag, thread_tag
+from allure_commons.utils import md5
+from allure_commons.reporter import AllureReporter
+from allure_commons.model2 import TestStepResult, TestResult, TestBeforeResult, TestAfterResult
+from allure_commons.model2 import TestResultContainer
+from allure_commons.model2 import StatusDetails
+from allure_commons.model2 import Parameter
+from allure_commons.model2 import Label, Link
+from allure_commons.model2 import Status
+from allure_commons.types import LabelType, AttachmentType, ParameterMode
+from allure_pytest.utils import allure_description, allure_description_html
+from allure_pytest.utils import allure_labels, allure_links, pytest_markers
+from allure_pytest.utils import allure_full_name, allure_package, allure_name
+from allure_pytest.utils import allure_suite_labels
+from allure_pytest.utils import get_status, get_status_details
+from allure_pytest.utils import get_outcome_status, get_outcome_status_details
+from allure_pytest.utils import get_pytest_report_status
+from allure_pytest.utils import format_allure_link
+from allure_pytest.utils import get_history_id
+from allure_pytest.compat import getfixturedefs
+
+
+class AllureListener:
+
+ SUITE_LABELS = {
+ LabelType.PARENT_SUITE,
+ LabelType.SUITE,
+ LabelType.SUB_SUITE,
+ }
+
+ def __init__(self, config):
+ self.config = config
+ self.allure_logger = AllureReporter()
+ self._cache = ItemCache()
+ self._host = host_tag()
+ self._thread = thread_tag()
+
+ @allure_commons.hookimpl
+ def start_step(self, uuid, title, params):
+ parameters = [Parameter(name=name, value=value) for name, value in params.items()]
+ step = TestStepResult(name=title, start=now(), parameters=parameters)
+ self.allure_logger.start_step(None, uuid, step)
+
+ @allure_commons.hookimpl
+ def stop_step(self, uuid, exc_type, exc_val, exc_tb):
+ self.allure_logger.stop_step(uuid,
+ stop=now(),
+ status=get_status(exc_val),
+ statusDetails=get_status_details(exc_type, exc_val, exc_tb))
+
+ @allure_commons.hookimpl
+ def start_fixture(self, parent_uuid, uuid, name):
+ after_fixture = TestAfterResult(name=name, start=now())
+ self.allure_logger.start_after_fixture(parent_uuid, uuid, after_fixture)
+
+ @allure_commons.hookimpl
+ def stop_fixture(self, parent_uuid, uuid, name, exc_type, exc_val, exc_tb):
+ self.allure_logger.stop_after_fixture(uuid,
+ stop=now(),
+ status=get_status(exc_val),
+ statusDetails=get_status_details(exc_type, exc_val, exc_tb))
+
+ def _update_fixtures_children(self, item):
+ uuid = self._cache.get(item.nodeid)
+ for fixturedef in _test_fixtures(item):
+ group_uuid = self._cache.get(fixturedef)
+ if group_uuid:
+ group = self.allure_logger.get_item(group_uuid)
+ else:
+ group_uuid = self._cache.push(fixturedef)
+ group = TestResultContainer(uuid=group_uuid)
+ self.allure_logger.start_group(group_uuid, group)
+ if uuid not in group.children:
+ self.allure_logger.update_group(group_uuid, children=uuid)
+
+ @pytest.hookimpl(hookwrapper=True, tryfirst=True)
+ def pytest_runtest_protocol(self, item, nextitem):
+ uuid = self._cache.push(item.nodeid)
+ test_result = TestResult(name=item.name, uuid=uuid, start=now(), stop=now())
+ self.allure_logger.schedule_test(uuid, test_result)
+ yield
+ uuid = self._cache.pop(item.nodeid)
+ if uuid:
+ test_result = self.allure_logger.get_test(uuid)
+ if test_result.status is None:
+ test_result.status = Status.SKIPPED
+ self.allure_logger.close_test(uuid)
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_runtest_setup(self, item):
+ if not self._cache.get(item.nodeid):
+ uuid = self._cache.push(item.nodeid)
+ test_result = TestResult(name=item.name, uuid=uuid, start=now(), stop=now())
+ self.allure_logger.schedule_test(uuid, test_result)
+ yield
+ self._update_fixtures_children(item)
+ uuid = self._cache.get(item.nodeid)
+ test_result = self.allure_logger.get_test(uuid)
+ params = self.__get_pytest_params(item)
+ param_id = self.__get_pytest_param_id(item)
+ test_result.name = allure_name(item, params, param_id)
+ full_name = allure_full_name(item)
+ test_result.fullName = full_name
+ test_result.testCaseId = md5(full_name)
+ test_result.description = allure_description(item)
+ test_result.descriptionHtml = allure_description_html(item)
+ current_param_names = [param.name for param in test_result.parameters]
+ test_result.parameters.extend([
+ Parameter(name=name, value=represent(value))
+ for name, value in params.items()
+ if name not in current_param_names
+ ])
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_runtest_call(self, item):
+ uuid = self._cache.get(item.nodeid)
+ test_result = self.allure_logger.get_test(uuid)
+ if test_result:
+ self.allure_logger.drop_test(uuid)
+ self.allure_logger.schedule_test(uuid, test_result)
+ test_result.start = now()
+ yield
+ self._update_fixtures_children(item)
+ if test_result:
+ test_result.stop = now()
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_runtest_teardown(self, item):
+ yield
+ uuid = self._cache.get(item.nodeid)
+ test_result = self.allure_logger.get_test(uuid)
+ test_result.historyId = get_history_id(
+ test_result.fullName,
+ test_result.parameters,
+ original_values=self.__get_pytest_params(item)
+ )
+ test_result.labels.extend([Label(name=name, value=value) for name, value in allure_labels(item)])
+ test_result.labels.extend([Label(name=LabelType.TAG, value=value) for value in pytest_markers(item)])
+ self.__apply_default_suites(item, test_result)
+ test_result.labels.append(Label(name=LabelType.HOST, value=self._host))
+ test_result.labels.append(Label(name=LabelType.THREAD, value=self._thread))
+ test_result.labels.append(Label(name=LabelType.FRAMEWORK, value='pytest'))
+ test_result.labels.append(Label(name=LabelType.LANGUAGE, value=platform_label()))
+ test_result.labels.append(Label(name='package', value=allure_package(item)))
+ test_result.links.extend([Link(link_type, url, name) for link_type, url, name in allure_links(item)])
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_fixture_setup(self, fixturedef, request):
+ fixture_name = getattr(fixturedef.func, '__allure_display_name__', fixturedef.argname)
+
+ container_uuid = self._cache.get(fixturedef)
+
+ if not container_uuid:
+ container_uuid = self._cache.push(fixturedef)
+ container = TestResultContainer(uuid=container_uuid)
+ self.allure_logger.start_group(container_uuid, container)
+
+ self.allure_logger.update_group(container_uuid, start=now())
+
+ before_fixture_uuid = uuid4()
+ before_fixture = TestBeforeResult(name=fixture_name, start=now())
+ self.allure_logger.start_before_fixture(container_uuid, before_fixture_uuid, before_fixture)
+
+ outcome = yield
+
+ self.allure_logger.stop_before_fixture(before_fixture_uuid,
+ stop=now(),
+ status=get_outcome_status(outcome),
+ statusDetails=get_outcome_status_details(outcome))
+
+ finalizers = getattr(fixturedef, '_finalizers', [])
+ for index, finalizer in enumerate(finalizers):
+ finalizer_name = getattr(finalizer, "__name__", index)
+ name = f'{fixture_name}::{finalizer_name}'
+ finalizers[index] = allure_commons.fixture(finalizer, parent_uuid=container_uuid, name=name)
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_fixture_post_finalizer(self, fixturedef):
+ yield
+ if hasattr(fixturedef, 'cached_result') and self._cache.get(fixturedef):
+ container_uuid = self._cache.pop(fixturedef)
+ self.allure_logger.stop_group(container_uuid, stop=now())
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_runtest_makereport(self, item, call):
+ uuid = self._cache.get(item.nodeid)
+
+ report = (yield).get_result()
+
+ test_result = self.allure_logger.get_test(uuid)
+ status = get_pytest_report_status(report)
+ status_details = None
+
+ if call.excinfo:
+ message = call.excinfo.exconly()
+ if hasattr(report, 'wasxfail'):
+ reason = report.wasxfail
+ message = (f'XFAIL {reason}' if reason else 'XFAIL') + '\n\n' + message
+ trace = report.longreprtext
+ status_details = StatusDetails(
+ message=message,
+ trace=trace)
+
+ exception = call.excinfo.value
+ if (status != Status.SKIPPED and _exception_brokes_test(exception)):
+ status = Status.BROKEN
+
+ if status == Status.PASSED and hasattr(report, 'wasxfail'):
+ reason = report.wasxfail
+ message = f'XPASS {reason}' if reason else 'XPASS'
+ status_details = StatusDetails(message=message)
+
+ if report.when == 'setup':
+ test_result.status = status
+ test_result.statusDetails = status_details
+
+ if report.when == 'call':
+ if test_result.status == Status.PASSED:
+ test_result.status = status
+ test_result.statusDetails = status_details
+
+ if report.when == 'teardown':
+ if status in (Status.FAILED, Status.BROKEN) and test_result.status == Status.PASSED:
+ test_result.status = status
+ test_result.statusDetails = status_details
+
+ if self.config.option.attach_capture:
+ if report.caplog:
+ self.attach_data(report.caplog, "log", AttachmentType.TEXT, None)
+ if report.capstdout:
+ self.attach_data(report.capstdout, "stdout", AttachmentType.TEXT, None)
+ if report.capstderr:
+ self.attach_data(report.capstderr, "stderr", AttachmentType.TEXT, None)
+
+ @pytest.hookimpl(hookwrapper=True)
+ def pytest_runtest_logfinish(self, nodeid, location):
+ yield
+ uuid = self._cache.pop(nodeid)
+ if uuid:
+ self.allure_logger.close_test(uuid)
+
+ @allure_commons.hookimpl
+ def attach_data(self, body, name, attachment_type, extension):
+ self.allure_logger.attach_data(uuid4(), body, name=name, attachment_type=attachment_type, extension=extension)
+
+ @allure_commons.hookimpl
+ def attach_file(self, source, name, attachment_type, extension):
+ self.allure_logger.attach_file(uuid4(), source, name=name, attachment_type=attachment_type, extension=extension)
+
+ @allure_commons.hookimpl
+ def add_title(self, test_title):
+ test_result = self.allure_logger.get_test(None)
+ if test_result:
+ test_result.name = test_title
+
+ @allure_commons.hookimpl
+ def add_description(self, test_description):
+ test_result = self.allure_logger.get_test(None)
+ if test_result:
+ test_result.description = test_description
+
+ @allure_commons.hookimpl
+ def add_description_html(self, test_description_html):
+ test_result = self.allure_logger.get_test(None)
+ if test_result:
+ test_result.descriptionHtml = test_description_html
+
+ @allure_commons.hookimpl
+ def add_link(self, url, link_type, name):
+ test_result = self.allure_logger.get_test(None)
+ if test_result:
+ link_url = format_allure_link(self.config, url, link_type)
+ new_link = Link(link_type, link_url, link_url if name is None else name)
+ for link in test_result.links:
+ if link.url == new_link.url:
+ return
+ test_result.links.append(new_link)
+
+ @allure_commons.hookimpl
+ def add_label(self, label_type, labels):
+ test_result = self.allure_logger.get_test(None)
+ for label in labels if test_result else ():
+ test_result.labels.append(Label(label_type, label))
+
+ @allure_commons.hookimpl
+ def add_parameter(self, name, value, excluded, mode: ParameterMode):
+ test_result: TestResult = self.allure_logger.get_test(None)
+ existing_param = next(filter(lambda x: x.name == name, test_result.parameters), None)
+ if existing_param:
+ existing_param.value = represent(value)
+ else:
+ test_result.parameters.append(
+ Parameter(
+ name=name,
+ value=represent(value),
+ excluded=excluded or None,
+ mode=mode.value if mode else None
+ )
+ )
+
+ @staticmethod
+ def __get_pytest_params(item):
+ return item.callspec.params if hasattr(item, 'callspec') else {}
+
+ @staticmethod
+ def __get_pytest_param_id(item):
+ return item.callspec.id if hasattr(item, 'callspec') else None
+
+ def __apply_default_suites(self, item, test_result):
+ default_suites = allure_suite_labels(item)
+ existing_suites = {
+ label.name
+ for label in test_result.labels
+ if label.name in AllureListener.SUITE_LABELS
+ }
+ test_result.labels.extend(
+ Label(name=name, value=value)
+ for name, value in default_suites
+ if name not in existing_suites
+ )
+
+
+class ItemCache:
+
+ def __init__(self):
+ self._items = dict()
+
+ def get(self, _id):
+ return self._items.get(id(_id))
+
+ def push(self, _id):
+ return self._items.setdefault(id(_id), uuid4())
+
+ def pop(self, _id):
+ return self._items.pop(id(_id), None)
+
+
+def _test_fixtures(item):
+ fixturemanager = item.session._fixturemanager
+ fixturedefs = []
+
+ if hasattr(item, "_request") and hasattr(item._request, "fixturenames"):
+ for name in item._request.fixturenames:
+ fixturedefs_pytest = getfixturedefs(fixturemanager, name, item)
+ if fixturedefs_pytest:
+ fixturedefs.extend(fixturedefs_pytest)
+
+ return fixturedefs
+
+
+def _exception_brokes_test(exception):
+ return not isinstance(exception, (
+ AssertionError,
+ pytest.fail.Exception,
+ doctest.DocTestFailure
+ ))
diff --git a/contrib/python/allure-pytest/allure_pytest/plugin.py b/contrib/python/allure-pytest/allure_pytest/plugin.py
new file mode 100644
index 0000000000..2771722ffc
--- /dev/null
+++ b/contrib/python/allure-pytest/allure_pytest/plugin.py
@@ -0,0 +1,236 @@
+import argparse
+
+import allure
+import allure_commons
+import os
+
+from allure_commons.types import LabelType, Severity
+from allure_commons.logger import AllureFileLogger
+from allure_commons.utils import get_testplan
+
+from allure_pytest.utils import allure_label, allure_labels, allure_full_name
+from allure_pytest.helper import AllureTestHelper, AllureTitleHelper
+from allure_pytest.listener import AllureListener
+
+from allure_pytest.utils import ALLURE_DESCRIPTION_MARK, ALLURE_DESCRIPTION_HTML_MARK
+from allure_pytest.utils import ALLURE_LABEL_MARK, ALLURE_LINK_MARK
+
+
+def pytest_addoption(parser):
+ parser.getgroup("reporting").addoption('--alluredir',
+ action="store",
+ dest="allure_report_dir",
+ metavar="DIR",
+ default=None,
+ help="Generate Allure report in the specified directory (may not exist)")
+
+ parser.getgroup("reporting").addoption('--clean-alluredir',
+ action="store_true",
+ dest="clean_alluredir",
+ help="Clean alluredir folder if it exists")
+
+ parser.getgroup("reporting").addoption('--allure-no-capture',
+ action="store_false",
+ dest="attach_capture",
+ help="Do not attach pytest captured logging/stdout/stderr to report")
+
+ parser.getgroup("reporting").addoption('--inversion',
+ action="store",
+ dest="inversion",
+ default=False,
+ help="Run tests not in testplan")
+
+ def label_type(type_name, legal_values=set()):
+ def a_label_type(string):
+ atoms = set(string.split(','))
+ if type_name is LabelType.SEVERITY:
+ if not atoms <= legal_values:
+ raise argparse.ArgumentTypeError('Illegal {} values: {}, only [{}] are allowed'.format(
+ type_name,
+ ', '.join(atoms - legal_values),
+ ', '.join(legal_values)
+ ))
+ return set((type_name, allure.severity_level(atom)) for atom in atoms)
+ return set((type_name, atom) for atom in atoms)
+ return a_label_type
+
+ severities = [x.value for x in list(allure.severity_level)]
+ formatted_severities = ', '.join(severities)
+ parser.getgroup("general").addoption('--allure-severities',
+ action="store",
+ dest="allure_severities",
+ metavar="SEVERITIES_SET",
+ default={},
+ type=label_type(LabelType.SEVERITY, legal_values=set(severities)),
+ help=f"""Comma-separated list of severity names.
+ Tests only with these severities will be run.
+ Possible values are: {formatted_severities}.""")
+
+ parser.getgroup("general").addoption('--allure-epics',
+ action="store",
+ dest="allure_epics",
+ metavar="EPICS_SET",
+ default={},
+ type=label_type(LabelType.EPIC),
+ help="""Comma-separated list of epic names.
+ Run tests that have at least one of the specified feature labels.""")
+
+ parser.getgroup("general").addoption('--allure-features',
+ action="store",
+ dest="allure_features",
+ metavar="FEATURES_SET",
+ default={},
+ type=label_type(LabelType.FEATURE),
+ help="""Comma-separated list of feature names.
+ Run tests that have at least one of the specified feature labels.""")
+
+ parser.getgroup("general").addoption('--allure-stories',
+ action="store",
+ dest="allure_stories",
+ metavar="STORIES_SET",
+ default={},
+ type=label_type(LabelType.STORY),
+ help="""Comma-separated list of story names.
+ Run tests that have at least one of the specified story labels.""")
+
+ parser.getgroup("general").addoption('--allure-ids',
+ action="store",
+ dest="allure_ids",
+ metavar="IDS_SET",
+ default={},
+ type=label_type(LabelType.ID),
+ help="""Comma-separated list of IDs.
+ Run tests that have at least one of the specified id labels.""")
+
+ def cf_type(string):
+ type_name, values = string.split("=", 1)
+ atoms = set(values.split(","))
+ return [(type_name, atom) for atom in atoms]
+
+ parser.getgroup("general").addoption('--allure-label',
+ action="append",
+ dest="allure_labels",
+ metavar="LABELS_SET",
+ default=[],
+ type=cf_type,
+ help="""List of labels to run in format label_name=value1,value2.
+ "Run tests that have at least one of the specified labels.""")
+
+ def link_pattern(string):
+ pattern = string.split(':', 1)
+ if not pattern[0]:
+ raise argparse.ArgumentTypeError('Link type is mandatory.')
+
+ if len(pattern) != 2:
+ raise argparse.ArgumentTypeError('Link pattern is mandatory')
+ return pattern
+
+ parser.getgroup("general").addoption('--allure-link-pattern',
+ action="append",
+ dest="allure_link_pattern",
+ metavar="LINK_TYPE:LINK_PATTERN",
+ default=[],
+ type=link_pattern,
+ help="""Url pattern for link type. Allows short links in test,
+ like 'issue-1'. Text will be formatted to full url with python
+ str.format().""")
+
+
+def cleanup_factory(plugin):
+ def clean_up():
+ name = allure_commons.plugin_manager.get_name(plugin)
+ allure_commons.plugin_manager.unregister(name=name)
+ return clean_up
+
+
+def pytest_addhooks(pluginmanager):
+ # Need register title hooks before conftest init
+ title_helper = AllureTitleHelper()
+ allure_commons.plugin_manager.register(title_helper)
+
+
+def pytest_configure(config):
+ report_dir = config.option.allure_report_dir
+ clean = False if config.option.collectonly else config.option.clean_alluredir
+
+ test_helper = AllureTestHelper(config)
+ allure_commons.plugin_manager.register(test_helper)
+ config.add_cleanup(cleanup_factory(test_helper))
+
+ if report_dir:
+ report_dir = os.path.abspath(report_dir)
+ test_listener = AllureListener(config)
+ config.pluginmanager.register(test_listener, 'allure_listener')
+ allure_commons.plugin_manager.register(test_listener)
+ config.add_cleanup(cleanup_factory(test_listener))
+
+ file_logger = AllureFileLogger(report_dir, clean)
+ allure_commons.plugin_manager.register(file_logger)
+ config.add_cleanup(cleanup_factory(file_logger))
+
+ config.addinivalue_line("markers", f"{ALLURE_LABEL_MARK}: allure label marker")
+ config.addinivalue_line("markers", f"{ALLURE_LINK_MARK}: allure link marker")
+ config.addinivalue_line("markers", f"{ALLURE_DESCRIPTION_MARK}: allure description")
+ config.addinivalue_line("markers", f"{ALLURE_DESCRIPTION_HTML_MARK}: allure description html")
+
+
+def select_by_labels(items, config):
+ arg_labels = set().union(
+ config.option.allure_epics,
+ config.option.allure_features,
+ config.option.allure_stories,
+ config.option.allure_ids,
+ config.option.allure_severities,
+ *config.option.allure_labels
+ )
+ if arg_labels:
+ selected, deselected = [], []
+ for item in items:
+ test_labels = set(allure_labels(item))
+ test_severity = allure_label(item, LabelType.SEVERITY)
+ if not test_severity:
+ test_labels.add((LabelType.SEVERITY, Severity.NORMAL))
+ if arg_labels & test_labels:
+ selected.append(item)
+ else:
+ deselected.append(item)
+ return selected, deselected
+ else:
+ return items, []
+
+
+def select_by_testcase(items, config):
+ planned_tests = get_testplan()
+ is_inversion = config.option.inversion
+
+ if planned_tests:
+
+ def is_planed(item):
+ allure_ids = allure_label(item, LabelType.ID)
+ allure_string_ids = list(map(str, allure_ids))
+ for planed_item in planned_tests:
+ planed_item_string_id = str(planed_item.get("id"))
+ planed_item_selector = planed_item.get("selector")
+ if (
+ planed_item_string_id in allure_string_ids
+ or planed_item_selector == allure_full_name(item)
+ ):
+ return True if not is_inversion else False
+ return False if not is_inversion else True
+
+ selected, deselected = [], []
+ for item in items:
+ selected.append(item) if is_planed(item) else deselected.append(item)
+ return selected, deselected
+ else:
+ return items, []
+
+
+def pytest_collection_modifyitems(items, config):
+ selected, deselected_by_testcase = select_by_testcase(items, config)
+ selected, deselected_by_labels = select_by_labels(selected, config)
+
+ items[:] = selected
+
+ if deselected_by_testcase or deselected_by_labels:
+ config.hook.pytest_deselected(items=[*deselected_by_testcase, *deselected_by_labels])
diff --git a/contrib/python/allure-pytest/allure_pytest/utils.py b/contrib/python/allure-pytest/allure_pytest/utils.py
new file mode 100644
index 0000000000..1e07cb492c
--- /dev/null
+++ b/contrib/python/allure-pytest/allure_pytest/utils.py
@@ -0,0 +1,197 @@
+import pytest
+from itertools import chain, islice
+from allure_commons.utils import represent, SafeFormatter, md5
+from allure_commons.utils import format_exception, format_traceback
+from allure_commons.model2 import Status
+from allure_commons.model2 import StatusDetails
+from allure_commons.types import LabelType
+
+
+ALLURE_DESCRIPTION_MARK = 'allure_description'
+ALLURE_DESCRIPTION_HTML_MARK = 'allure_description_html'
+ALLURE_LABEL_MARK = 'allure_label'
+ALLURE_LINK_MARK = 'allure_link'
+ALLURE_UNIQUE_LABELS = [
+ LabelType.SEVERITY,
+ LabelType.FRAMEWORK,
+ LabelType.HOST,
+ LabelType.SUITE,
+ LabelType.PARENT_SUITE,
+ LabelType.SUB_SUITE
+]
+
+
+def get_marker_value(item, keyword):
+ marker = item.get_closest_marker(keyword)
+ return marker.args[0] if marker and marker.args else None
+
+
+def allure_title(item):
+ return getattr(
+ getattr(item, "obj", None),
+ "__allure_display_name__",
+ None
+ )
+
+
+def allure_description(item):
+ description = get_marker_value(item, ALLURE_DESCRIPTION_MARK)
+ if description:
+ return description
+ elif hasattr(item, 'function'):
+ return item.function.__doc__
+
+
+def allure_description_html(item):
+ return get_marker_value(item, ALLURE_DESCRIPTION_HTML_MARK)
+
+
+def allure_label(item, label):
+ labels = []
+ for mark in item.iter_markers(name=ALLURE_LABEL_MARK):
+ if mark.kwargs.get("label_type") == label:
+ labels.extend(mark.args)
+ return labels
+
+
+def allure_labels(item):
+ unique_labels = dict()
+ labels = set()
+ for mark in item.iter_markers(name=ALLURE_LABEL_MARK):
+ label_type = mark.kwargs["label_type"]
+ if label_type in ALLURE_UNIQUE_LABELS:
+ if label_type not in unique_labels.keys():
+ unique_labels[label_type] = mark.args[0]
+ else:
+ for arg in mark.args:
+ labels.add((label_type, arg))
+ for k, v in unique_labels.items():
+ labels.add((k, v))
+ return labels
+
+
+def allure_links(item):
+ for mark in item.iter_markers(name=ALLURE_LINK_MARK):
+ yield (mark.kwargs["link_type"], mark.args[0], mark.kwargs["name"])
+
+
+def format_allure_link(config, url, link_type):
+ pattern = dict(config.option.allure_link_pattern).get(link_type, '{}')
+ return pattern.format(url)
+
+
+def pytest_markers(item):
+ for keyword in item.keywords.keys():
+ if any([keyword.startswith('allure_'), keyword == 'parametrize']):
+ continue
+ marker = item.get_closest_marker(keyword)
+ if marker is None:
+ continue
+
+ yield mark_to_str(marker)
+
+
+def mark_to_str(marker):
+ args = [represent(arg) for arg in marker.args]
+ kwargs = [f'{key}={represent(value)}' for key, value in marker.kwargs.items()]
+ if marker.name in ('filterwarnings', 'skip', 'skipif', 'xfail', 'usefixtures', 'tryfirst', 'trylast'):
+ markstr = f'@pytest.mark.{marker.name}'
+ else:
+ markstr = str(marker.name)
+ if args or kwargs:
+ parameters = ', '.join(args + kwargs)
+ markstr = f'{markstr}({parameters})'
+ return markstr
+
+
+def allure_package(item):
+ parts = item.nodeid.split('::')
+ path = parts[0].rsplit('.', 1)[0]
+ return path.replace('/', '.')
+
+
+def allure_name(item, parameters, param_id=None):
+ name = item.name
+ title = allure_title(item)
+ param_id_kwargs = {}
+ if param_id:
+ # if param_id is an ASCII string, it could have been encoded by pytest (_pytest.compat.ascii_escaped)
+ if param_id.isascii():
+ param_id = param_id.encode().decode("unicode-escape")
+ param_id_kwargs["param_id"] = param_id
+ return SafeFormatter().format(
+ title,
+ **{**param_id_kwargs, **parameters, **item.funcargs}
+ ) if title else name
+
+
+def allure_full_name(item: pytest.Item):
+ package = allure_package(item)
+ class_name = f".{item.parent.name}" if isinstance(item.parent, pytest.Class) else ''
+ test = item.originalname if isinstance(item, pytest.Function) else item.name.split("[")[0]
+ full_name = f'{package}{class_name}#{test}'
+ return full_name
+
+
+def allure_suite_labels(item):
+ head, possibly_clazz, tail = islice(chain(item.nodeid.split('::'), [None], [None]), 3)
+ clazz = possibly_clazz if tail else None
+ file_name, path = islice(chain(reversed(head.rsplit('/', 1)), [None]), 2)
+ module = file_name.split('.')[0]
+ package = path.replace('/', '.') if path else None
+ pairs = dict(zip([LabelType.PARENT_SUITE, LabelType.SUITE, LabelType.SUB_SUITE], [package, module, clazz]))
+ labels = dict(allure_labels(item))
+ default_suite_labels = []
+ for label, value in pairs.items():
+ if label not in labels.keys() and value:
+ default_suite_labels.append((label, value))
+
+ return default_suite_labels
+
+
+def get_outcome_status(outcome):
+ _, exception, _ = outcome.excinfo or (None, None, None)
+ return get_status(exception)
+
+
+def get_outcome_status_details(outcome):
+ exception_type, exception, exception_traceback = outcome.excinfo or (None, None, None)
+ return get_status_details(exception_type, exception, exception_traceback)
+
+
+def get_status(exception):
+ if exception:
+ if isinstance(exception, AssertionError) or isinstance(exception, pytest.fail.Exception):
+ return Status.FAILED
+ elif isinstance(exception, pytest.skip.Exception):
+ return Status.SKIPPED
+ return Status.BROKEN
+ else:
+ return Status.PASSED
+
+
+def get_status_details(exception_type, exception, exception_traceback):
+ message = format_exception(exception_type, exception)
+ trace = format_traceback(exception_traceback)
+ return StatusDetails(message=message, trace=trace) if message or trace else None
+
+
+def get_pytest_report_status(pytest_report):
+ pytest_statuses = ('failed', 'passed', 'skipped')
+ statuses = (Status.FAILED, Status.PASSED, Status.SKIPPED)
+ for pytest_status, status in zip(pytest_statuses, statuses):
+ if getattr(pytest_report, pytest_status):
+ return status
+
+
+def get_history_id(full_name, parameters, original_values):
+ return md5(
+ full_name,
+ *(original_values.get(p.name, p.value) for p in sorted(
+ filter(
+ lambda p: not p.excluded,
+ parameters
+ ),
+ key=lambda p: p.name
+ ))
+ )