aboutsummaryrefslogtreecommitdiffstats
path: root/library/python/testing/yatest_lib
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/python/testing/yatest_lib
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/python/testing/yatest_lib')
-rw-r--r--library/python/testing/yatest_lib/__init__.py0
-rw-r--r--library/python/testing/yatest_lib/external.py192
-rw-r--r--library/python/testing/yatest_lib/test_splitter.py102
-rw-r--r--library/python/testing/yatest_lib/tests/test_external.py20
-rw-r--r--library/python/testing/yatest_lib/tests/test_testsplitter.py103
-rw-r--r--library/python/testing/yatest_lib/tests/ya.make14
-rw-r--r--library/python/testing/yatest_lib/tools.py64
-rw-r--r--library/python/testing/yatest_lib/ya.make26
-rw-r--r--library/python/testing/yatest_lib/ya.py239
9 files changed, 760 insertions, 0 deletions
diff --git a/library/python/testing/yatest_lib/__init__.py b/library/python/testing/yatest_lib/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/library/python/testing/yatest_lib/__init__.py
diff --git a/library/python/testing/yatest_lib/external.py b/library/python/testing/yatest_lib/external.py
new file mode 100644
index 0000000000..39113230d9
--- /dev/null
+++ b/library/python/testing/yatest_lib/external.py
@@ -0,0 +1,192 @@
+from __future__ import absolute_import
+
+import re
+import sys
+import copy
+import logging
+
+from . import tools
+from datetime import date, datetime
+
+import enum
+import six
+
+logger = logging.getLogger(__name__)
+MDS_URI_PREFIX = 'https://storage.yandex-team.ru/get-devtools/'
+
+
+def apply(func, value, apply_to_keys=False):
+ """
+ Applies func to every possible member of value
+ :param value: could be either a primitive object or a complex one (list, dicts)
+ :param func: func to be applied
+ :return:
+ """
+ def _apply(func, value, value_path):
+ if value_path is None:
+ value_path = []
+
+ if isinstance(value, list) or isinstance(value, tuple):
+ res = []
+ for ind, item in enumerate(value):
+ path = copy.copy(value_path)
+ path.append(ind)
+ res.append(_apply(func, item, path))
+ elif isinstance(value, dict):
+ if is_external(value):
+ # this is a special serialized object pointing to some external place
+ res = func(value, value_path)
+ else:
+ res = {}
+ for key, val in sorted(value.items(), key=lambda dict_item: dict_item[0]):
+ path = copy.copy(value_path)
+ path.append(key)
+ res[_apply(func, key, path) if apply_to_keys else key] = _apply(func, val, path)
+ else:
+ res = func(value, value_path)
+ return res
+ return _apply(func, value, None)
+
+
+def is_coroutine(val):
+ if sys.version_info[0] < 3:
+ return False
+ else:
+ import asyncio
+ return asyncio.iscoroutinefunction(val) or asyncio.iscoroutine(val)
+
+
+def serialize(value):
+ """
+ Serialize value to json-convertible object
+ Ensures that all components of value can be serialized to json
+ :param value: object to be serialized
+ """
+ def _serialize(val, _):
+ if val is None:
+ return val
+ if isinstance(val, six.string_types) or isinstance(val, bytes):
+ return tools.to_utf8(val)
+ if isinstance(val, enum.Enum):
+ return str(val)
+ if isinstance(val, six.integer_types) or type(val) in [float, bool]:
+ return val
+ if is_external(val):
+ return dict(val)
+ if isinstance(val, (date, datetime)):
+ return repr(val)
+ if is_coroutine(val):
+ return None
+ raise ValueError("Cannot serialize value '{}' of type {}".format(val, type(val)))
+ return apply(_serialize, value, apply_to_keys=True)
+
+
+def is_external(value):
+ return isinstance(value, dict) and "uri" in value.keys()
+
+
+class ExternalSchema(object):
+ File = "file"
+ SandboxResource = "sbr"
+ Delayed = "delayed"
+ HTTP = "http"
+
+
+class CanonicalObject(dict):
+ def __iter__(self):
+ raise TypeError("Iterating canonical object is not implemented")
+
+
+class ExternalDataInfo(object):
+
+ def __init__(self, data):
+ assert is_external(data)
+ self._data = data
+
+ def __str__(self):
+ type_str = "File" if self.is_file else "Sandbox resource"
+ return "{}({})".format(type_str, self.path)
+
+ def __repr__(self):
+ return str(self)
+
+ @property
+ def uri(self):
+ return self._data["uri"]
+
+ @property
+ def checksum(self):
+ return self._data.get("checksum")
+
+ @property
+ def is_file(self):
+ return self.uri.startswith(ExternalSchema.File)
+
+ @property
+ def is_sandbox_resource(self):
+ return self.uri.startswith(ExternalSchema.SandboxResource)
+
+ @property
+ def is_delayed(self):
+ return self.uri.startswith(ExternalSchema.Delayed)
+
+ @property
+ def is_http(self):
+ return self.uri.startswith(ExternalSchema.HTTP)
+
+ @property
+ def path(self):
+ if self.uri.count("://") != 1:
+ logger.error("Invalid external data uri: '%s'", self.uri)
+ return self.uri
+ _, path = self.uri.split("://")
+ return path
+
+ def get_mds_key(self):
+ assert self.is_http
+ m = re.match(re.escape(MDS_URI_PREFIX) + r'(.*?)($|#)', self.uri)
+ if m:
+ return m.group(1)
+ raise AssertionError("Failed to extract mds key properly from '{}'".format(self.uri))
+
+ @property
+ def size(self):
+ return self._data.get("size")
+
+ def serialize(self):
+ return self._data
+
+ @classmethod
+ def _serialize(cls, schema, path, checksum=None, attrs=None):
+ res = CanonicalObject({"uri": "{}://{}".format(schema, path)})
+ if checksum:
+ res["checksum"] = checksum
+ if attrs:
+ res.update(attrs)
+ return res
+
+ @classmethod
+ def serialize_file(cls, path, checksum=None, diff_tool=None, local=False, diff_file_name=None, diff_tool_timeout=None, size=None):
+ attrs = {}
+ if diff_tool:
+ attrs["diff_tool"] = diff_tool
+ if local:
+ attrs["local"] = local
+ if diff_file_name:
+ attrs["diff_file_name"] = diff_file_name
+ if diff_tool_timeout:
+ attrs["diff_tool_timeout"] = diff_tool_timeout
+ if size is not None:
+ attrs["size"] = size
+ return cls._serialize(ExternalSchema.File, path, checksum, attrs=attrs)
+
+ @classmethod
+ def serialize_resource(cls, id, checksum=None):
+ return cls._serialize(ExternalSchema.SandboxResource, id, checksum)
+
+ @classmethod
+ def serialize_delayed(cls, upload_id, checksum):
+ return cls._serialize(ExternalSchema.Delayed, upload_id, checksum)
+
+ def get(self, key, default=None):
+ return self._data.get(key, default)
diff --git a/library/python/testing/yatest_lib/test_splitter.py b/library/python/testing/yatest_lib/test_splitter.py
new file mode 100644
index 0000000000..acbcd4300e
--- /dev/null
+++ b/library/python/testing/yatest_lib/test_splitter.py
@@ -0,0 +1,102 @@
+# coding: utf-8
+
+import collections
+
+
+def flatten_tests(test_classes):
+ """
+ >>> test_classes = {x: [x] for x in range(5)}
+ >>> flatten_tests(test_classes)
+ [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
+ >>> test_classes = {x: [x + 1, x + 2] for x in range(2)}
+ >>> flatten_tests(test_classes)
+ [(0, 1), (0, 2), (1, 2), (1, 3)]
+ """
+ tests = []
+ for class_name, test_names in test_classes.items():
+ tests += [(class_name, test_name) for test_name in test_names]
+ return tests
+
+
+def get_sequential_chunk(tests, modulo, modulo_index, is_sorted=False):
+ """
+ >>> get_sequential_chunk(range(10), 4, 0)
+ [0, 1, 2]
+ >>> get_sequential_chunk(range(10), 4, 1)
+ [3, 4, 5]
+ >>> get_sequential_chunk(range(10), 4, 2)
+ [6, 7]
+ >>> get_sequential_chunk(range(10), 4, 3)
+ [8, 9]
+ >>> get_sequential_chunk(range(10), 4, 4)
+ []
+ >>> get_sequential_chunk(range(10), 4, 5)
+ []
+ """
+ if not is_sorted:
+ tests = sorted(tests)
+ chunk_size = len(tests) // modulo
+ not_used = len(tests) % modulo
+ shift = chunk_size + (modulo_index < not_used)
+ start = chunk_size * modulo_index + min(modulo_index, not_used)
+ end = start + shift
+ return [] if end > len(tests) else tests[start:end]
+
+
+def get_shuffled_chunk(tests, modulo, modulo_index, is_sorted=False):
+ """
+ >>> get_shuffled_chunk(range(10), 4, 0)
+ [0, 4, 8]
+ >>> get_shuffled_chunk(range(10), 4, 1)
+ [1, 5, 9]
+ >>> get_shuffled_chunk(range(10), 4, 2)
+ [2, 6]
+ >>> get_shuffled_chunk(range(10), 4, 3)
+ [3, 7]
+ >>> get_shuffled_chunk(range(10), 4, 4)
+ []
+ >>> get_shuffled_chunk(range(10), 4, 5)
+ []
+ """
+ if not is_sorted:
+ tests = sorted(tests)
+ result_tests = []
+ for i, test in enumerate(tests):
+ if i % modulo == modulo_index:
+ result_tests.append(test)
+ return result_tests
+
+
+def get_splitted_tests(test_entities, modulo, modulo_index, partition_mode, is_sorted=False):
+ if partition_mode == 'SEQUENTIAL':
+ return get_sequential_chunk(test_entities, modulo, modulo_index, is_sorted)
+ elif partition_mode == 'MODULO':
+ return get_shuffled_chunk(test_entities, modulo, modulo_index, is_sorted)
+ else:
+ raise ValueError("detected unknown partition mode: {}".format(partition_mode))
+
+
+def filter_tests_by_modulo(test_classes, modulo, modulo_index, split_by_tests, partition_mode="SEQUENTIAL"):
+ """
+ >>> test_classes = {x: [x] for x in range(20)}
+ >>> filter_tests_by_modulo(test_classes, 4, 0, False)
+ {0: [0], 1: [1], 2: [2], 3: [3], 4: [4]}
+ >>> filter_tests_by_modulo(test_classes, 4, 1, False)
+ {8: [8], 9: [9], 5: [5], 6: [6], 7: [7]}
+ >>> filter_tests_by_modulo(test_classes, 4, 2, False)
+ {10: [10], 11: [11], 12: [12], 13: [13], 14: [14]}
+
+ >>> dict(filter_tests_by_modulo(test_classes, 4, 0, True))
+ {0: [0], 1: [1], 2: [2], 3: [3], 4: [4]}
+ >>> dict(filter_tests_by_modulo(test_classes, 4, 1, True))
+ {8: [8], 9: [9], 5: [5], 6: [6], 7: [7]}
+ """
+ if split_by_tests:
+ tests = get_splitted_tests(flatten_tests(test_classes), modulo, modulo_index, partition_mode)
+ test_classes = collections.defaultdict(list)
+ for class_name, test_name in tests:
+ test_classes[class_name].append(test_name)
+ return test_classes
+ else:
+ target_classes = get_splitted_tests(test_classes, modulo, modulo_index, partition_mode)
+ return {class_name: test_classes[class_name] for class_name in target_classes}
diff --git a/library/python/testing/yatest_lib/tests/test_external.py b/library/python/testing/yatest_lib/tests/test_external.py
new file mode 100644
index 0000000000..18cb560b17
--- /dev/null
+++ b/library/python/testing/yatest_lib/tests/test_external.py
@@ -0,0 +1,20 @@
+import enum
+import pytest
+
+from yatest_lib import external
+
+
+class MyEnum(enum.Enum):
+ VAL1 = 1
+ VAL2 = 2
+
+
+@pytest.mark.parametrize("data, expected_val, expected_type", [
+ ({}, {}, dict),
+ (MyEnum.VAL1, "MyEnum.VAL1", str),
+ ({MyEnum.VAL1: MyEnum.VAL2}, {"MyEnum.VAL1": "MyEnum.VAL2"}, dict),
+])
+def test_serialize(data, expected_val, expected_type):
+ data = external.serialize(data)
+ assert expected_type == type(data), data
+ assert expected_val == data
diff --git a/library/python/testing/yatest_lib/tests/test_testsplitter.py b/library/python/testing/yatest_lib/tests/test_testsplitter.py
new file mode 100644
index 0000000000..394bfe5a74
--- /dev/null
+++ b/library/python/testing/yatest_lib/tests/test_testsplitter.py
@@ -0,0 +1,103 @@
+# coding: utf-8
+from yatest_lib import test_splitter
+
+
+def get_chunks(tests, modulo, mode):
+ chunks = []
+ if mode == "MODULO":
+ for modulo_index in range(modulo):
+ chunks.append(test_splitter.get_shuffled_chunk(tests, modulo, modulo_index))
+ elif mode == "SEQUENTIAL":
+ for modulo_index in range(modulo):
+ chunks.append(test_splitter.get_sequential_chunk(tests, modulo, modulo_index))
+ else:
+ raise ValueError("no such mode")
+ return chunks
+
+
+def check_not_intersect(chunk_list):
+ test_set = set()
+ total_size = 0
+ for tests in chunk_list:
+ total_size += len(tests)
+ test_set.update(tests)
+ return total_size == len(test_set)
+
+
+def check_max_diff(chunk_list):
+ return max(map(len, chunk_list)) - min(map(len, chunk_list))
+
+
+def test_lot_of_chunks():
+ for chunk_count in range(10, 20):
+ for tests_count in range(chunk_count):
+ chunks = get_chunks(range(tests_count), chunk_count, "SEQUENTIAL")
+ assert check_not_intersect(chunks)
+ assert check_max_diff(chunks) <= 1
+ assert chunks.count([]) == chunk_count - tests_count
+ assert len(chunks) == chunk_count
+ chunks = get_chunks(range(tests_count), chunk_count, "MODULO")
+ assert check_not_intersect(chunks)
+ assert check_max_diff(chunks) <= 1
+ assert chunks.count([]) == chunk_count - tests_count
+ assert len(chunks) == chunk_count
+
+
+def test_lot_of_tests():
+ for tests_count in range(10, 20):
+ for chunk_count in range(2, tests_count):
+ chunks = get_chunks(range(tests_count), chunk_count, "SEQUENTIAL")
+ assert check_not_intersect(chunks)
+ assert check_max_diff(chunks) <= 1
+ assert len(chunks) == chunk_count
+ chunks = get_chunks(range(tests_count), chunk_count, "MODULO")
+ assert check_not_intersect(chunks)
+ assert check_max_diff(chunks) <= 1
+ assert len(chunks) == chunk_count
+
+
+def prime_chunk_count():
+ for chunk_count in [7, 11, 13, 17, 23, 29]:
+ for tests_count in range(chunk_count):
+ chunks = get_chunks(range(tests_count), chunk_count, "SEQUENTIAL")
+ assert check_not_intersect(chunks)
+ assert check_max_diff(chunks) <= 1
+ assert len(chunks) == chunk_count
+ chunks = get_chunks(range(tests_count), chunk_count, "MODULO")
+ assert check_not_intersect(chunks)
+ assert check_max_diff(chunks) <= 1
+ assert len(chunks) == chunk_count
+
+
+def get_divisors(number):
+ divisors = []
+ for d in range(1, number + 1):
+ if number % d == 0:
+ divisors.append(d)
+ return divisors
+
+
+def equal_chunks():
+ for chunk_count in range(12, 31):
+ for tests_count in get_divisors(chunk_count):
+ chunks = get_chunks(range(tests_count), chunk_count, "SEQUENTIAL")
+ assert check_not_intersect(chunks)
+ assert check_max_diff(chunks) == 0
+ assert len(chunks) == chunk_count
+ chunks = get_chunks(range(tests_count), chunk_count, "MODULO")
+ assert check_not_intersect(chunks)
+ assert check_max_diff(chunks) == 0
+ assert len(chunks) == chunk_count
+
+
+def chunk_count_equal_tests_count():
+ for chunk_count in range(10, 20):
+ tests_count = chunk_count
+ chunks = get_chunks(range(tests_count), chunk_count, "SEQUENTIAL")
+ assert check_not_intersect(chunks)
+ assert check_max_diff(chunks) <= 1
+ assert len(chunks) == chunk_count
+ chunks = get_chunks(range(tests_count), chunk_count, "MODULO")
+ assert check_not_intersect(chunks)
+ assert check_max_diff(chunks) <= 1
+ assert len(chunks) == chunk_count
diff --git a/library/python/testing/yatest_lib/tests/ya.make b/library/python/testing/yatest_lib/tests/ya.make
new file mode 100644
index 0000000000..8586c6ef7d
--- /dev/null
+++ b/library/python/testing/yatest_lib/tests/ya.make
@@ -0,0 +1,14 @@
+OWNER(g:yatest)
+
+PY23_TEST()
+
+PEERDIR(
+ library/python/testing/yatest_lib
+)
+
+TEST_SRCS(
+ test_external.py
+ test_testsplitter.py
+)
+
+END()
diff --git a/library/python/testing/yatest_lib/tools.py b/library/python/testing/yatest_lib/tools.py
new file mode 100644
index 0000000000..b72d79c162
--- /dev/null
+++ b/library/python/testing/yatest_lib/tools.py
@@ -0,0 +1,64 @@
+import six
+import sys
+
+
+def to_utf8(value):
+ """
+ Converts value to string encoded into utf-8
+ :param value:
+ :return:
+ """
+ if sys.version_info[0] < 3:
+ if not isinstance(value, basestring): # noqa
+ value = unicode(value) # noqa
+ if type(value) == str:
+ value = value.decode("utf-8", errors="ignore")
+ return value.encode('utf-8', 'ignore')
+ else:
+ return str(value)
+
+
+def trim_string(s, max_bytes):
+ """
+ Adjusts the length of the string s in order to fit it
+ into max_bytes bytes of storage after encoding as UTF-8.
+ Useful when cutting filesystem paths.
+ :param s: unicode string
+ :param max_bytes: number of bytes
+ :return the prefix of s
+ """
+ if isinstance(s, six.text_type):
+ return _trim_unicode_string(s, max_bytes)
+
+ if isinstance(s, six.binary_type):
+ if len(s) <= max_bytes:
+ return s
+ s = s.decode('utf-8', errors='ignore')
+ s = _trim_unicode_string(s, max_bytes)
+ s = s.encode('utf-8', errors='ignore')
+ return s
+
+ raise TypeError('a string is expected')
+
+
+def _trim_unicode_string(s, max_bytes):
+ if len(s) * 4 <= max_bytes:
+ # UTF-8 uses at most 4 bytes per character
+ return s
+
+ result = []
+ cur_byte_length = 0
+
+ for ch in s:
+ cur_byte_length += len(ch.encode('utf-8'))
+ if cur_byte_length > max_bytes:
+ break
+ result.append(ch)
+
+ return ''.join(result)
+
+
+def to_str(s):
+ if six.PY2 and isinstance(s, six.text_type):
+ return s.encode('utf8')
+ return s
diff --git a/library/python/testing/yatest_lib/ya.make b/library/python/testing/yatest_lib/ya.make
new file mode 100644
index 0000000000..342bae82ba
--- /dev/null
+++ b/library/python/testing/yatest_lib/ya.make
@@ -0,0 +1,26 @@
+OWNER(g:yatest)
+
+PY23_LIBRARY()
+
+PY_SRCS(
+ NAMESPACE
+ yatest_lib
+ external.py
+ test_splitter.py
+ tools.py
+ ya.py
+)
+
+PEERDIR(
+ contrib/python/six
+)
+
+IF(PYTHON2)
+ PEERDIR(
+ contrib/python/enum34
+ )
+ENDIF()
+
+END()
+
+RECURSE_FOR_TESTS(tests)
diff --git a/library/python/testing/yatest_lib/ya.py b/library/python/testing/yatest_lib/ya.py
new file mode 100644
index 0000000000..c13b58a19f
--- /dev/null
+++ b/library/python/testing/yatest_lib/ya.py
@@ -0,0 +1,239 @@
+import os
+import sys
+import logging
+import json
+
+from .tools import to_str
+from .external import ExternalDataInfo
+
+
+TESTING_OUT_DIR_NAME = "testing_out_stuff" # XXX import from test.const
+
+yatest_logger = logging.getLogger("ya.test")
+
+
+class RunMode(object):
+ Run = "run"
+ List = "list"
+
+
+class TestMisconfigurationException(Exception):
+ pass
+
+
+class Ya(object):
+ """
+ Adds integration with ya, helps in finding dependencies
+ """
+
+ def __init__(
+ self,
+ mode=None,
+ source_root=None,
+ build_root=None,
+ dep_roots=None,
+ output_dir=None,
+ test_params=None,
+ context=None,
+ python_path=None,
+ valgrind_path=None,
+ gdb_path=None,
+ data_root=None,
+ ):
+ context_file_path = os.environ.get("YA_TEST_CONTEXT_FILE", None)
+ if context_file_path:
+ with open(context_file_path, 'r') as afile:
+ test_context = json.load(afile)
+ context_runtime = test_context["runtime"]
+ context_internal = test_context.get("internal", {})
+ context_build = test_context.get("build", {})
+ else:
+ context_runtime = {}
+ context_internal = {}
+ context_build = {}
+ self._mode = mode
+ self._build_root = to_str(context_runtime.get("build_root", "")) or build_root
+ self._source_root = to_str(context_runtime.get("source_root", "")) or source_root or self._detect_source_root()
+ self._output_dir = to_str(context_runtime.get("output_path", "")) or output_dir or self._detect_output_root()
+ if not self._output_dir:
+ raise Exception("Run ya make -t before running test binary")
+ if not self._source_root:
+ logging.warning("Source root was not set neither determined, use --source-root to set it explicitly")
+ if not self._build_root:
+ if self._source_root:
+ self._build_root = self._source_root
+ else:
+ logging.warning("Build root was not set neither determined, use --build-root to set it explicitly")
+
+ if data_root:
+ self._data_root = data_root
+ elif self._source_root:
+ self._data_root = os.path.abspath(os.path.join(self._source_root, "..", "arcadia_tests_data"))
+
+ self._dep_roots = dep_roots
+
+ self._python_path = to_str(context_runtime.get("python_bin", "")) or python_path
+ self._valgrind_path = valgrind_path
+ self._gdb_path = to_str(context_runtime.get("gdb_bin", "")) or gdb_path
+ self._test_params = {}
+ self._context = {}
+ self._test_item_node_id = None
+
+ ram_drive_path = to_str(context_runtime.get("ram_drive_path", ""))
+ if ram_drive_path:
+ self._test_params["ram_drive_path"] = ram_drive_path
+ if test_params:
+ self._test_params.update(dict(x.split('=', 1) for x in test_params))
+ self._test_params.update(context_runtime.get("test_params", {}))
+
+ self._context["project_path"] = context_runtime.get("project_path")
+ self._context["modulo"] = context_runtime.get("split_count", 1)
+ self._context["modulo_index"] = context_runtime.get("split_index", 0)
+ self._context["work_path"] = context_runtime.get("work_path")
+
+ self._context["sanitize"] = context_build.get("sanitizer")
+ self._context["ya_trace_path"] = context_internal.get("trace_file")
+
+ self._env_file = context_internal.get("env_file")
+
+ if context:
+ self._context.update(context)
+
+ @property
+ def source_root(self):
+ return self._source_root
+
+ @property
+ def data_root(self):
+ return self._data_root
+
+ @property
+ def build_root(self):
+ return self._build_root
+
+ @property
+ def dep_roots(self):
+ return self._dep_roots
+
+ @property
+ def output_dir(self):
+ return self._output_dir
+
+ @property
+ def python_path(self):
+ return self._python_path or sys.executable
+
+ @property
+ def valgrind_path(self):
+ if not self._valgrind_path:
+ raise ValueError("path to valgrind was not pass correctly, use --valgrind-path to fix it")
+ return self._valgrind_path
+
+ @property
+ def gdb_path(self):
+ return self._gdb_path
+
+ @property
+ def env_file(self):
+ return self._env_file
+
+ def get_binary(self, *path):
+ assert self._build_root, "Build root was not set neither determined, use --build-root to set it explicitly"
+ path = list(path)
+ if os.name == "nt":
+ if not path[-1].endswith(".exe"):
+ path[-1] += ".exe"
+
+ target_dirs = [self.build_root]
+ # Search for binaries within PATH dirs to be able to get path to the binaries specified by basename for exectests
+ if 'PATH' in os.environ:
+ target_dirs += os.environ['PATH'].split(':')
+
+ for target_dir in target_dirs:
+ binary_path = os.path.join(target_dir, *path)
+ if os.path.exists(binary_path):
+ yatest_logger.debug("Binary was found by %s", binary_path)
+ return binary_path
+
+ error_message = "Cannot find binary '{binary}': make sure it was added in the DEPENDS section".format(binary=path)
+ yatest_logger.debug(error_message)
+ if self._mode == RunMode.Run:
+ raise TestMisconfigurationException(error_message)
+
+ def file(self, path, diff_tool=None, local=False, diff_file_name=None, diff_tool_timeout=None):
+ return ExternalDataInfo.serialize_file(path, diff_tool=diff_tool, local=local, diff_file_name=diff_file_name, diff_tool_timeout=diff_tool_timeout)
+
+ def get_param(self, key, default=None):
+ return self._test_params.get(key, default)
+
+ def get_param_dict_copy(self):
+ return dict(self._test_params)
+
+ def get_context(self, key):
+ return self._context.get(key)
+
+ def _detect_source_root(self):
+ root = None
+ try:
+ import library.python.find_root
+ # try to determine source root from cwd
+ cwd = os.getcwd()
+ root = library.python.find_root.detect_root(cwd)
+
+ if not root:
+ # try to determine root pretending we are in the test work dir made from --keep-temps run
+ env_subdir = os.path.join("environment", "arcadia")
+ root = library.python.find_root.detect_root(cwd, detector=lambda p: os.path.exists(os.path.join(p, env_subdir)))
+ except ImportError:
+ logging.warning("Unable to import library.python.find_root")
+
+ return root
+
+ def _detect_output_root(self):
+
+ # if run from kept test working dir
+ if os.path.exists(TESTING_OUT_DIR_NAME):
+ return TESTING_OUT_DIR_NAME
+
+ # if run from source dir
+ if sys.version_info.major == 3:
+ test_results_dir = "py3test"
+ else:
+ test_results_dir = "pytest"
+
+ test_results_output_path = os.path.join("test-results", test_results_dir, TESTING_OUT_DIR_NAME)
+ if os.path.exists(test_results_output_path):
+ return test_results_output_path
+
+ if os.path.exists(os.path.dirname(test_results_output_path)):
+ os.mkdir(test_results_output_path)
+ return test_results_output_path
+
+ return None
+
+ def set_test_item_node_id(self, node_id):
+ self._test_item_node_id = node_id
+
+ def get_test_item_node_id(self):
+ assert self._test_item_node_id
+ return self._test_item_node_id
+
+ @property
+ def pytest_config(self):
+ if not hasattr(self, "_pytest_config"):
+ import library.python.pytest.plugins.ya as ya_plugin
+ self._pytest_config = ya_plugin.pytest_config
+ return self._pytest_config
+
+ def set_metric_value(self, name, val):
+ node_id = self.get_test_item_node_id()
+ if node_id not in self.pytest_config.test_metrics:
+ self.pytest_config.test_metrics[node_id] = {}
+
+ self.pytest_config.test_metrics[node_id][name] = val
+
+ def get_metric_value(self, name, default=None):
+ res = self.pytest_config.test_metrics.get(self.get_test_item_node_id(), {}).get(name)
+ if res is None:
+ return default
+ return res