diff options
author | iddqd <iddqd@yandex-team.com> | 2024-05-13 17:19:30 +0300 |
---|---|---|
committer | iddqd <iddqd@yandex-team.com> | 2024-05-13 17:28:44 +0300 |
commit | 84d127b9b7e96ba4352e3f5ddc9222aee9a66053 (patch) | |
tree | 2ebb2689abf65e68dfe92a3ca9b161b4b6ae183f /contrib/python/allure-pytest/allure_pytest | |
parent | b7deb7f0b71db7419781d1b0357dfa443ccc3ff1 (diff) | |
download | ydb-84d127b9b7e96ba4352e3f5ddc9222aee9a66053.tar.gz |
Add allure support to ydb github export
d6cba27d09fb5e50a99c36070a6a3545c8393ea1
Diffstat (limited to 'contrib/python/allure-pytest/allure_pytest')
-rw-r--r-- | contrib/python/allure-pytest/allure_pytest/__init__.py | 0 | ||||
-rw-r--r-- | contrib/python/allure-pytest/allure_pytest/compat.py | 34 | ||||
-rw-r--r-- | contrib/python/allure-pytest/allure_pytest/helper.py | 47 | ||||
-rw-r--r-- | contrib/python/allure-pytest/allure_pytest/listener.py | 364 | ||||
-rw-r--r-- | contrib/python/allure-pytest/allure_pytest/plugin.py | 236 | ||||
-rw-r--r-- | contrib/python/allure-pytest/allure_pytest/utils.py | 197 |
6 files changed, 878 insertions, 0 deletions
diff --git a/contrib/python/allure-pytest/allure_pytest/__init__.py b/contrib/python/allure-pytest/allure_pytest/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/allure-pytest/allure_pytest/__init__.py diff --git a/contrib/python/allure-pytest/allure_pytest/compat.py b/contrib/python/allure-pytest/allure_pytest/compat.py new file mode 100644 index 0000000000..bf7db2dd27 --- /dev/null +++ b/contrib/python/allure-pytest/allure_pytest/compat.py @@ -0,0 +1,34 @@ +"""Provides compatibility with different pytest versions.""" + +from inspect import signature + +__GETFIXTUREDEFS_2ND_PAR_IS_STR = None + + +def getfixturedefs(fixturemanager, name, item): + """Calls FixtureManager.getfixturedefs in a way compatible with Python + versions before and after the change described in pytest-dev/pytest#11785. + """ + getfixturedefs = fixturemanager.getfixturedefs + itemarg = __resolve_getfixturedefs_2nd_arg(getfixturedefs, item) + return getfixturedefs(name, itemarg) + + +def __resolve_getfixturedefs_2nd_arg(getfixturedefs, item): + # Starting from pytest 8.1, getfixturedefs requires the item itself. + # In earlier versions it requires the nodeid string. + return item.nodeid if __2nd_parameter_is_str(getfixturedefs) else item + + +def __2nd_parameter_is_str(getfixturedefs): + global __GETFIXTUREDEFS_2ND_PAR_IS_STR + if __GETFIXTUREDEFS_2ND_PAR_IS_STR is None: + __GETFIXTUREDEFS_2ND_PAR_IS_STR =\ + __get_2nd_parameter_type(getfixturedefs) is str + return __GETFIXTUREDEFS_2ND_PAR_IS_STR + + +def __get_2nd_parameter_type(fn): + return list( + signature(fn).parameters.values() + )[1].annotation diff --git a/contrib/python/allure-pytest/allure_pytest/helper.py b/contrib/python/allure-pytest/allure_pytest/helper.py new file mode 100644 index 0000000000..e6944ef40c --- /dev/null +++ b/contrib/python/allure-pytest/allure_pytest/helper.py @@ -0,0 +1,47 @@ +import pytest +import allure_commons +from allure_pytest.utils import ALLURE_DESCRIPTION_MARK, ALLURE_DESCRIPTION_HTML_MARK +from allure_pytest.utils import ALLURE_LABEL_MARK, ALLURE_LINK_MARK +from allure_pytest.utils import format_allure_link + + +class AllureTitleHelper: + @allure_commons.hookimpl + def decorate_as_title(self, test_title): + def decorator(func): + # pytest.fixture wraps function, so we need to get it directly + if getattr(func, '__pytest_wrapped__', None): + function = func.__pytest_wrapped__.obj + else: + function = func + function.__allure_display_name__ = test_title + return func + + return decorator + + +class AllureTestHelper: + def __init__(self, config): + self.config = config + + @allure_commons.hookimpl + def decorate_as_description(self, test_description): + allure_description = getattr(pytest.mark, ALLURE_DESCRIPTION_MARK) + return allure_description(test_description) + + @allure_commons.hookimpl + def decorate_as_description_html(self, test_description_html): + allure_description_html = getattr(pytest.mark, ALLURE_DESCRIPTION_HTML_MARK) + return allure_description_html(test_description_html) + + @allure_commons.hookimpl + def decorate_as_label(self, label_type, labels): + allure_label = getattr(pytest.mark, ALLURE_LABEL_MARK) + return allure_label(*labels, label_type=label_type) + + @allure_commons.hookimpl + def decorate_as_link(self, url, link_type, name): + url = format_allure_link(self.config, url, link_type) + allure_link = getattr(pytest.mark, ALLURE_LINK_MARK) + name = url if name is None else name + return allure_link(url, name=name, link_type=link_type) diff --git a/contrib/python/allure-pytest/allure_pytest/listener.py b/contrib/python/allure-pytest/allure_pytest/listener.py new file mode 100644 index 0000000000..1115363091 --- /dev/null +++ b/contrib/python/allure-pytest/allure_pytest/listener.py @@ -0,0 +1,364 @@ +import pytest +import doctest + +import allure_commons +from allure_commons.utils import now +from allure_commons.utils import uuid4 +from allure_commons.utils import represent +from allure_commons.utils import platform_label +from allure_commons.utils import host_tag, thread_tag +from allure_commons.utils import md5 +from allure_commons.reporter import AllureReporter +from allure_commons.model2 import TestStepResult, TestResult, TestBeforeResult, TestAfterResult +from allure_commons.model2 import TestResultContainer +from allure_commons.model2 import StatusDetails +from allure_commons.model2 import Parameter +from allure_commons.model2 import Label, Link +from allure_commons.model2 import Status +from allure_commons.types import LabelType, AttachmentType, ParameterMode +from allure_pytest.utils import allure_description, allure_description_html +from allure_pytest.utils import allure_labels, allure_links, pytest_markers +from allure_pytest.utils import allure_full_name, allure_package, allure_name +from allure_pytest.utils import allure_suite_labels +from allure_pytest.utils import get_status, get_status_details +from allure_pytest.utils import get_outcome_status, get_outcome_status_details +from allure_pytest.utils import get_pytest_report_status +from allure_pytest.utils import format_allure_link +from allure_pytest.utils import get_history_id +from allure_pytest.compat import getfixturedefs + + +class AllureListener: + + SUITE_LABELS = { + LabelType.PARENT_SUITE, + LabelType.SUITE, + LabelType.SUB_SUITE, + } + + def __init__(self, config): + self.config = config + self.allure_logger = AllureReporter() + self._cache = ItemCache() + self._host = host_tag() + self._thread = thread_tag() + + @allure_commons.hookimpl + def start_step(self, uuid, title, params): + parameters = [Parameter(name=name, value=value) for name, value in params.items()] + step = TestStepResult(name=title, start=now(), parameters=parameters) + self.allure_logger.start_step(None, uuid, step) + + @allure_commons.hookimpl + def stop_step(self, uuid, exc_type, exc_val, exc_tb): + self.allure_logger.stop_step(uuid, + stop=now(), + status=get_status(exc_val), + statusDetails=get_status_details(exc_type, exc_val, exc_tb)) + + @allure_commons.hookimpl + def start_fixture(self, parent_uuid, uuid, name): + after_fixture = TestAfterResult(name=name, start=now()) + self.allure_logger.start_after_fixture(parent_uuid, uuid, after_fixture) + + @allure_commons.hookimpl + def stop_fixture(self, parent_uuid, uuid, name, exc_type, exc_val, exc_tb): + self.allure_logger.stop_after_fixture(uuid, + stop=now(), + status=get_status(exc_val), + statusDetails=get_status_details(exc_type, exc_val, exc_tb)) + + def _update_fixtures_children(self, item): + uuid = self._cache.get(item.nodeid) + for fixturedef in _test_fixtures(item): + group_uuid = self._cache.get(fixturedef) + if group_uuid: + group = self.allure_logger.get_item(group_uuid) + else: + group_uuid = self._cache.push(fixturedef) + group = TestResultContainer(uuid=group_uuid) + self.allure_logger.start_group(group_uuid, group) + if uuid not in group.children: + self.allure_logger.update_group(group_uuid, children=uuid) + + @pytest.hookimpl(hookwrapper=True, tryfirst=True) + def pytest_runtest_protocol(self, item, nextitem): + uuid = self._cache.push(item.nodeid) + test_result = TestResult(name=item.name, uuid=uuid, start=now(), stop=now()) + self.allure_logger.schedule_test(uuid, test_result) + yield + uuid = self._cache.pop(item.nodeid) + if uuid: + test_result = self.allure_logger.get_test(uuid) + if test_result.status is None: + test_result.status = Status.SKIPPED + self.allure_logger.close_test(uuid) + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_setup(self, item): + if not self._cache.get(item.nodeid): + uuid = self._cache.push(item.nodeid) + test_result = TestResult(name=item.name, uuid=uuid, start=now(), stop=now()) + self.allure_logger.schedule_test(uuid, test_result) + yield + self._update_fixtures_children(item) + uuid = self._cache.get(item.nodeid) + test_result = self.allure_logger.get_test(uuid) + params = self.__get_pytest_params(item) + param_id = self.__get_pytest_param_id(item) + test_result.name = allure_name(item, params, param_id) + full_name = allure_full_name(item) + test_result.fullName = full_name + test_result.testCaseId = md5(full_name) + test_result.description = allure_description(item) + test_result.descriptionHtml = allure_description_html(item) + current_param_names = [param.name for param in test_result.parameters] + test_result.parameters.extend([ + Parameter(name=name, value=represent(value)) + for name, value in params.items() + if name not in current_param_names + ]) + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_call(self, item): + uuid = self._cache.get(item.nodeid) + test_result = self.allure_logger.get_test(uuid) + if test_result: + self.allure_logger.drop_test(uuid) + self.allure_logger.schedule_test(uuid, test_result) + test_result.start = now() + yield + self._update_fixtures_children(item) + if test_result: + test_result.stop = now() + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_teardown(self, item): + yield + uuid = self._cache.get(item.nodeid) + test_result = self.allure_logger.get_test(uuid) + test_result.historyId = get_history_id( + test_result.fullName, + test_result.parameters, + original_values=self.__get_pytest_params(item) + ) + test_result.labels.extend([Label(name=name, value=value) for name, value in allure_labels(item)]) + test_result.labels.extend([Label(name=LabelType.TAG, value=value) for value in pytest_markers(item)]) + self.__apply_default_suites(item, test_result) + test_result.labels.append(Label(name=LabelType.HOST, value=self._host)) + test_result.labels.append(Label(name=LabelType.THREAD, value=self._thread)) + test_result.labels.append(Label(name=LabelType.FRAMEWORK, value='pytest')) + test_result.labels.append(Label(name=LabelType.LANGUAGE, value=platform_label())) + test_result.labels.append(Label(name='package', value=allure_package(item))) + test_result.links.extend([Link(link_type, url, name) for link_type, url, name in allure_links(item)]) + + @pytest.hookimpl(hookwrapper=True) + def pytest_fixture_setup(self, fixturedef, request): + fixture_name = getattr(fixturedef.func, '__allure_display_name__', fixturedef.argname) + + container_uuid = self._cache.get(fixturedef) + + if not container_uuid: + container_uuid = self._cache.push(fixturedef) + container = TestResultContainer(uuid=container_uuid) + self.allure_logger.start_group(container_uuid, container) + + self.allure_logger.update_group(container_uuid, start=now()) + + before_fixture_uuid = uuid4() + before_fixture = TestBeforeResult(name=fixture_name, start=now()) + self.allure_logger.start_before_fixture(container_uuid, before_fixture_uuid, before_fixture) + + outcome = yield + + self.allure_logger.stop_before_fixture(before_fixture_uuid, + stop=now(), + status=get_outcome_status(outcome), + statusDetails=get_outcome_status_details(outcome)) + + finalizers = getattr(fixturedef, '_finalizers', []) + for index, finalizer in enumerate(finalizers): + finalizer_name = getattr(finalizer, "__name__", index) + name = f'{fixture_name}::{finalizer_name}' + finalizers[index] = allure_commons.fixture(finalizer, parent_uuid=container_uuid, name=name) + + @pytest.hookimpl(hookwrapper=True) + def pytest_fixture_post_finalizer(self, fixturedef): + yield + if hasattr(fixturedef, 'cached_result') and self._cache.get(fixturedef): + container_uuid = self._cache.pop(fixturedef) + self.allure_logger.stop_group(container_uuid, stop=now()) + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_makereport(self, item, call): + uuid = self._cache.get(item.nodeid) + + report = (yield).get_result() + + test_result = self.allure_logger.get_test(uuid) + status = get_pytest_report_status(report) + status_details = None + + if call.excinfo: + message = call.excinfo.exconly() + if hasattr(report, 'wasxfail'): + reason = report.wasxfail + message = (f'XFAIL {reason}' if reason else 'XFAIL') + '\n\n' + message + trace = report.longreprtext + status_details = StatusDetails( + message=message, + trace=trace) + + exception = call.excinfo.value + if (status != Status.SKIPPED and _exception_brokes_test(exception)): + status = Status.BROKEN + + if status == Status.PASSED and hasattr(report, 'wasxfail'): + reason = report.wasxfail + message = f'XPASS {reason}' if reason else 'XPASS' + status_details = StatusDetails(message=message) + + if report.when == 'setup': + test_result.status = status + test_result.statusDetails = status_details + + if report.when == 'call': + if test_result.status == Status.PASSED: + test_result.status = status + test_result.statusDetails = status_details + + if report.when == 'teardown': + if status in (Status.FAILED, Status.BROKEN) and test_result.status == Status.PASSED: + test_result.status = status + test_result.statusDetails = status_details + + if self.config.option.attach_capture: + if report.caplog: + self.attach_data(report.caplog, "log", AttachmentType.TEXT, None) + if report.capstdout: + self.attach_data(report.capstdout, "stdout", AttachmentType.TEXT, None) + if report.capstderr: + self.attach_data(report.capstderr, "stderr", AttachmentType.TEXT, None) + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_logfinish(self, nodeid, location): + yield + uuid = self._cache.pop(nodeid) + if uuid: + self.allure_logger.close_test(uuid) + + @allure_commons.hookimpl + def attach_data(self, body, name, attachment_type, extension): + self.allure_logger.attach_data(uuid4(), body, name=name, attachment_type=attachment_type, extension=extension) + + @allure_commons.hookimpl + def attach_file(self, source, name, attachment_type, extension): + self.allure_logger.attach_file(uuid4(), source, name=name, attachment_type=attachment_type, extension=extension) + + @allure_commons.hookimpl + def add_title(self, test_title): + test_result = self.allure_logger.get_test(None) + if test_result: + test_result.name = test_title + + @allure_commons.hookimpl + def add_description(self, test_description): + test_result = self.allure_logger.get_test(None) + if test_result: + test_result.description = test_description + + @allure_commons.hookimpl + def add_description_html(self, test_description_html): + test_result = self.allure_logger.get_test(None) + if test_result: + test_result.descriptionHtml = test_description_html + + @allure_commons.hookimpl + def add_link(self, url, link_type, name): + test_result = self.allure_logger.get_test(None) + if test_result: + link_url = format_allure_link(self.config, url, link_type) + new_link = Link(link_type, link_url, link_url if name is None else name) + for link in test_result.links: + if link.url == new_link.url: + return + test_result.links.append(new_link) + + @allure_commons.hookimpl + def add_label(self, label_type, labels): + test_result = self.allure_logger.get_test(None) + for label in labels if test_result else (): + test_result.labels.append(Label(label_type, label)) + + @allure_commons.hookimpl + def add_parameter(self, name, value, excluded, mode: ParameterMode): + test_result: TestResult = self.allure_logger.get_test(None) + existing_param = next(filter(lambda x: x.name == name, test_result.parameters), None) + if existing_param: + existing_param.value = represent(value) + else: + test_result.parameters.append( + Parameter( + name=name, + value=represent(value), + excluded=excluded or None, + mode=mode.value if mode else None + ) + ) + + @staticmethod + def __get_pytest_params(item): + return item.callspec.params if hasattr(item, 'callspec') else {} + + @staticmethod + def __get_pytest_param_id(item): + return item.callspec.id if hasattr(item, 'callspec') else None + + def __apply_default_suites(self, item, test_result): + default_suites = allure_suite_labels(item) + existing_suites = { + label.name + for label in test_result.labels + if label.name in AllureListener.SUITE_LABELS + } + test_result.labels.extend( + Label(name=name, value=value) + for name, value in default_suites + if name not in existing_suites + ) + + +class ItemCache: + + def __init__(self): + self._items = dict() + + def get(self, _id): + return self._items.get(id(_id)) + + def push(self, _id): + return self._items.setdefault(id(_id), uuid4()) + + def pop(self, _id): + return self._items.pop(id(_id), None) + + +def _test_fixtures(item): + fixturemanager = item.session._fixturemanager + fixturedefs = [] + + if hasattr(item, "_request") and hasattr(item._request, "fixturenames"): + for name in item._request.fixturenames: + fixturedefs_pytest = getfixturedefs(fixturemanager, name, item) + if fixturedefs_pytest: + fixturedefs.extend(fixturedefs_pytest) + + return fixturedefs + + +def _exception_brokes_test(exception): + return not isinstance(exception, ( + AssertionError, + pytest.fail.Exception, + doctest.DocTestFailure + )) diff --git a/contrib/python/allure-pytest/allure_pytest/plugin.py b/contrib/python/allure-pytest/allure_pytest/plugin.py new file mode 100644 index 0000000000..2771722ffc --- /dev/null +++ b/contrib/python/allure-pytest/allure_pytest/plugin.py @@ -0,0 +1,236 @@ +import argparse + +import allure +import allure_commons +import os + +from allure_commons.types import LabelType, Severity +from allure_commons.logger import AllureFileLogger +from allure_commons.utils import get_testplan + +from allure_pytest.utils import allure_label, allure_labels, allure_full_name +from allure_pytest.helper import AllureTestHelper, AllureTitleHelper +from allure_pytest.listener import AllureListener + +from allure_pytest.utils import ALLURE_DESCRIPTION_MARK, ALLURE_DESCRIPTION_HTML_MARK +from allure_pytest.utils import ALLURE_LABEL_MARK, ALLURE_LINK_MARK + + +def pytest_addoption(parser): + parser.getgroup("reporting").addoption('--alluredir', + action="store", + dest="allure_report_dir", + metavar="DIR", + default=None, + help="Generate Allure report in the specified directory (may not exist)") + + parser.getgroup("reporting").addoption('--clean-alluredir', + action="store_true", + dest="clean_alluredir", + help="Clean alluredir folder if it exists") + + parser.getgroup("reporting").addoption('--allure-no-capture', + action="store_false", + dest="attach_capture", + help="Do not attach pytest captured logging/stdout/stderr to report") + + parser.getgroup("reporting").addoption('--inversion', + action="store", + dest="inversion", + default=False, + help="Run tests not in testplan") + + def label_type(type_name, legal_values=set()): + def a_label_type(string): + atoms = set(string.split(',')) + if type_name is LabelType.SEVERITY: + if not atoms <= legal_values: + raise argparse.ArgumentTypeError('Illegal {} values: {}, only [{}] are allowed'.format( + type_name, + ', '.join(atoms - legal_values), + ', '.join(legal_values) + )) + return set((type_name, allure.severity_level(atom)) for atom in atoms) + return set((type_name, atom) for atom in atoms) + return a_label_type + + severities = [x.value for x in list(allure.severity_level)] + formatted_severities = ', '.join(severities) + parser.getgroup("general").addoption('--allure-severities', + action="store", + dest="allure_severities", + metavar="SEVERITIES_SET", + default={}, + type=label_type(LabelType.SEVERITY, legal_values=set(severities)), + help=f"""Comma-separated list of severity names. + Tests only with these severities will be run. + Possible values are: {formatted_severities}.""") + + parser.getgroup("general").addoption('--allure-epics', + action="store", + dest="allure_epics", + metavar="EPICS_SET", + default={}, + type=label_type(LabelType.EPIC), + help="""Comma-separated list of epic names. + Run tests that have at least one of the specified feature labels.""") + + parser.getgroup("general").addoption('--allure-features', + action="store", + dest="allure_features", + metavar="FEATURES_SET", + default={}, + type=label_type(LabelType.FEATURE), + help="""Comma-separated list of feature names. + Run tests that have at least one of the specified feature labels.""") + + parser.getgroup("general").addoption('--allure-stories', + action="store", + dest="allure_stories", + metavar="STORIES_SET", + default={}, + type=label_type(LabelType.STORY), + help="""Comma-separated list of story names. + Run tests that have at least one of the specified story labels.""") + + parser.getgroup("general").addoption('--allure-ids', + action="store", + dest="allure_ids", + metavar="IDS_SET", + default={}, + type=label_type(LabelType.ID), + help="""Comma-separated list of IDs. + Run tests that have at least one of the specified id labels.""") + + def cf_type(string): + type_name, values = string.split("=", 1) + atoms = set(values.split(",")) + return [(type_name, atom) for atom in atoms] + + parser.getgroup("general").addoption('--allure-label', + action="append", + dest="allure_labels", + metavar="LABELS_SET", + default=[], + type=cf_type, + help="""List of labels to run in format label_name=value1,value2. + "Run tests that have at least one of the specified labels.""") + + def link_pattern(string): + pattern = string.split(':', 1) + if not pattern[0]: + raise argparse.ArgumentTypeError('Link type is mandatory.') + + if len(pattern) != 2: + raise argparse.ArgumentTypeError('Link pattern is mandatory') + return pattern + + parser.getgroup("general").addoption('--allure-link-pattern', + action="append", + dest="allure_link_pattern", + metavar="LINK_TYPE:LINK_PATTERN", + default=[], + type=link_pattern, + help="""Url pattern for link type. Allows short links in test, + like 'issue-1'. Text will be formatted to full url with python + str.format().""") + + +def cleanup_factory(plugin): + def clean_up(): + name = allure_commons.plugin_manager.get_name(plugin) + allure_commons.plugin_manager.unregister(name=name) + return clean_up + + +def pytest_addhooks(pluginmanager): + # Need register title hooks before conftest init + title_helper = AllureTitleHelper() + allure_commons.plugin_manager.register(title_helper) + + +def pytest_configure(config): + report_dir = config.option.allure_report_dir + clean = False if config.option.collectonly else config.option.clean_alluredir + + test_helper = AllureTestHelper(config) + allure_commons.plugin_manager.register(test_helper) + config.add_cleanup(cleanup_factory(test_helper)) + + if report_dir: + report_dir = os.path.abspath(report_dir) + test_listener = AllureListener(config) + config.pluginmanager.register(test_listener, 'allure_listener') + allure_commons.plugin_manager.register(test_listener) + config.add_cleanup(cleanup_factory(test_listener)) + + file_logger = AllureFileLogger(report_dir, clean) + allure_commons.plugin_manager.register(file_logger) + config.add_cleanup(cleanup_factory(file_logger)) + + config.addinivalue_line("markers", f"{ALLURE_LABEL_MARK}: allure label marker") + config.addinivalue_line("markers", f"{ALLURE_LINK_MARK}: allure link marker") + config.addinivalue_line("markers", f"{ALLURE_DESCRIPTION_MARK}: allure description") + config.addinivalue_line("markers", f"{ALLURE_DESCRIPTION_HTML_MARK}: allure description html") + + +def select_by_labels(items, config): + arg_labels = set().union( + config.option.allure_epics, + config.option.allure_features, + config.option.allure_stories, + config.option.allure_ids, + config.option.allure_severities, + *config.option.allure_labels + ) + if arg_labels: + selected, deselected = [], [] + for item in items: + test_labels = set(allure_labels(item)) + test_severity = allure_label(item, LabelType.SEVERITY) + if not test_severity: + test_labels.add((LabelType.SEVERITY, Severity.NORMAL)) + if arg_labels & test_labels: + selected.append(item) + else: + deselected.append(item) + return selected, deselected + else: + return items, [] + + +def select_by_testcase(items, config): + planned_tests = get_testplan() + is_inversion = config.option.inversion + + if planned_tests: + + def is_planed(item): + allure_ids = allure_label(item, LabelType.ID) + allure_string_ids = list(map(str, allure_ids)) + for planed_item in planned_tests: + planed_item_string_id = str(planed_item.get("id")) + planed_item_selector = planed_item.get("selector") + if ( + planed_item_string_id in allure_string_ids + or planed_item_selector == allure_full_name(item) + ): + return True if not is_inversion else False + return False if not is_inversion else True + + selected, deselected = [], [] + for item in items: + selected.append(item) if is_planed(item) else deselected.append(item) + return selected, deselected + else: + return items, [] + + +def pytest_collection_modifyitems(items, config): + selected, deselected_by_testcase = select_by_testcase(items, config) + selected, deselected_by_labels = select_by_labels(selected, config) + + items[:] = selected + + if deselected_by_testcase or deselected_by_labels: + config.hook.pytest_deselected(items=[*deselected_by_testcase, *deselected_by_labels]) diff --git a/contrib/python/allure-pytest/allure_pytest/utils.py b/contrib/python/allure-pytest/allure_pytest/utils.py new file mode 100644 index 0000000000..1e07cb492c --- /dev/null +++ b/contrib/python/allure-pytest/allure_pytest/utils.py @@ -0,0 +1,197 @@ +import pytest +from itertools import chain, islice +from allure_commons.utils import represent, SafeFormatter, md5 +from allure_commons.utils import format_exception, format_traceback +from allure_commons.model2 import Status +from allure_commons.model2 import StatusDetails +from allure_commons.types import LabelType + + +ALLURE_DESCRIPTION_MARK = 'allure_description' +ALLURE_DESCRIPTION_HTML_MARK = 'allure_description_html' +ALLURE_LABEL_MARK = 'allure_label' +ALLURE_LINK_MARK = 'allure_link' +ALLURE_UNIQUE_LABELS = [ + LabelType.SEVERITY, + LabelType.FRAMEWORK, + LabelType.HOST, + LabelType.SUITE, + LabelType.PARENT_SUITE, + LabelType.SUB_SUITE +] + + +def get_marker_value(item, keyword): + marker = item.get_closest_marker(keyword) + return marker.args[0] if marker and marker.args else None + + +def allure_title(item): + return getattr( + getattr(item, "obj", None), + "__allure_display_name__", + None + ) + + +def allure_description(item): + description = get_marker_value(item, ALLURE_DESCRIPTION_MARK) + if description: + return description + elif hasattr(item, 'function'): + return item.function.__doc__ + + +def allure_description_html(item): + return get_marker_value(item, ALLURE_DESCRIPTION_HTML_MARK) + + +def allure_label(item, label): + labels = [] + for mark in item.iter_markers(name=ALLURE_LABEL_MARK): + if mark.kwargs.get("label_type") == label: + labels.extend(mark.args) + return labels + + +def allure_labels(item): + unique_labels = dict() + labels = set() + for mark in item.iter_markers(name=ALLURE_LABEL_MARK): + label_type = mark.kwargs["label_type"] + if label_type in ALLURE_UNIQUE_LABELS: + if label_type not in unique_labels.keys(): + unique_labels[label_type] = mark.args[0] + else: + for arg in mark.args: + labels.add((label_type, arg)) + for k, v in unique_labels.items(): + labels.add((k, v)) + return labels + + +def allure_links(item): + for mark in item.iter_markers(name=ALLURE_LINK_MARK): + yield (mark.kwargs["link_type"], mark.args[0], mark.kwargs["name"]) + + +def format_allure_link(config, url, link_type): + pattern = dict(config.option.allure_link_pattern).get(link_type, '{}') + return pattern.format(url) + + +def pytest_markers(item): + for keyword in item.keywords.keys(): + if any([keyword.startswith('allure_'), keyword == 'parametrize']): + continue + marker = item.get_closest_marker(keyword) + if marker is None: + continue + + yield mark_to_str(marker) + + +def mark_to_str(marker): + args = [represent(arg) for arg in marker.args] + kwargs = [f'{key}={represent(value)}' for key, value in marker.kwargs.items()] + if marker.name in ('filterwarnings', 'skip', 'skipif', 'xfail', 'usefixtures', 'tryfirst', 'trylast'): + markstr = f'@pytest.mark.{marker.name}' + else: + markstr = str(marker.name) + if args or kwargs: + parameters = ', '.join(args + kwargs) + markstr = f'{markstr}({parameters})' + return markstr + + +def allure_package(item): + parts = item.nodeid.split('::') + path = parts[0].rsplit('.', 1)[0] + return path.replace('/', '.') + + +def allure_name(item, parameters, param_id=None): + name = item.name + title = allure_title(item) + param_id_kwargs = {} + if param_id: + # if param_id is an ASCII string, it could have been encoded by pytest (_pytest.compat.ascii_escaped) + if param_id.isascii(): + param_id = param_id.encode().decode("unicode-escape") + param_id_kwargs["param_id"] = param_id + return SafeFormatter().format( + title, + **{**param_id_kwargs, **parameters, **item.funcargs} + ) if title else name + + +def allure_full_name(item: pytest.Item): + package = allure_package(item) + class_name = f".{item.parent.name}" if isinstance(item.parent, pytest.Class) else '' + test = item.originalname if isinstance(item, pytest.Function) else item.name.split("[")[0] + full_name = f'{package}{class_name}#{test}' + return full_name + + +def allure_suite_labels(item): + head, possibly_clazz, tail = islice(chain(item.nodeid.split('::'), [None], [None]), 3) + clazz = possibly_clazz if tail else None + file_name, path = islice(chain(reversed(head.rsplit('/', 1)), [None]), 2) + module = file_name.split('.')[0] + package = path.replace('/', '.') if path else None + pairs = dict(zip([LabelType.PARENT_SUITE, LabelType.SUITE, LabelType.SUB_SUITE], [package, module, clazz])) + labels = dict(allure_labels(item)) + default_suite_labels = [] + for label, value in pairs.items(): + if label not in labels.keys() and value: + default_suite_labels.append((label, value)) + + return default_suite_labels + + +def get_outcome_status(outcome): + _, exception, _ = outcome.excinfo or (None, None, None) + return get_status(exception) + + +def get_outcome_status_details(outcome): + exception_type, exception, exception_traceback = outcome.excinfo or (None, None, None) + return get_status_details(exception_type, exception, exception_traceback) + + +def get_status(exception): + if exception: + if isinstance(exception, AssertionError) or isinstance(exception, pytest.fail.Exception): + return Status.FAILED + elif isinstance(exception, pytest.skip.Exception): + return Status.SKIPPED + return Status.BROKEN + else: + return Status.PASSED + + +def get_status_details(exception_type, exception, exception_traceback): + message = format_exception(exception_type, exception) + trace = format_traceback(exception_traceback) + return StatusDetails(message=message, trace=trace) if message or trace else None + + +def get_pytest_report_status(pytest_report): + pytest_statuses = ('failed', 'passed', 'skipped') + statuses = (Status.FAILED, Status.PASSED, Status.SKIPPED) + for pytest_status, status in zip(pytest_statuses, statuses): + if getattr(pytest_report, pytest_status): + return status + + +def get_history_id(full_name, parameters, original_values): + return md5( + full_name, + *(original_values.get(p.name, p.value) for p in sorted( + filter( + lambda p: not p.excluded, + parameters + ), + key=lambda p: p.name + )) + ) |