diff options
author | Pavel <pefavel@ydb.tech> | 2025-07-30 12:17:31 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-07-30 11:17:31 +0200 |
commit | 2f87b776a601db60d6d1b7484897545d96fa3112 (patch) | |
tree | 45dd723755f5765a116437a6418cb5d33b3fdc67 | |
parent | 1cabcac91c47bc5fa7caaa2468a4bee19664dc35 (diff) | |
download | ydb-2f87b776a601db60d6d1b7484897545d96fa3112.tar.gz |
Add workload topic to Nemesis toolset (#21871)
-rw-r--r-- | ydb/tests/olap/load/README.md | 6 | ||||
-rw-r--r-- | ydb/tests/olap/load/lib/workload_topic.py | 46 | ||||
-rw-r--r-- | ydb/tests/olap/load/lib/ya.make | 1 | ||||
-rw-r--r-- | ydb/tests/olap/load/test_workload_topic.py | 1 | ||||
-rw-r--r-- | ydb/tests/olap/load/ya.make | 3 | ||||
-rw-r--r-- | ydb/tests/stress/log/__main__.py | 29 | ||||
-rw-r--r-- | ydb/tests/stress/log/tests/test_workload.py | 61 | ||||
-rw-r--r-- | ydb/tests/stress/log/tests/ya.make | 2 | ||||
-rw-r--r-- | ydb/tests/stress/log/workload/workload_log.py | 98 | ||||
-rw-r--r-- | ydb/tests/stress/log/workload/ya.make | 19 | ||||
-rw-r--r-- | ydb/tests/stress/log/ya.make | 16 | ||||
-rw-r--r-- | ydb/tests/stress/topic/__main__.py | 31 | ||||
-rw-r--r-- | ydb/tests/stress/topic/tests/test_workload_topic.py | 22 | ||||
-rw-r--r-- | ydb/tests/stress/topic/tests/ya.make | 24 | ||||
-rw-r--r-- | ydb/tests/stress/topic/workload/workload_topic.py | 65 | ||||
-rw-r--r-- | ydb/tests/stress/topic/workload/ya.make | 18 | ||||
-rw-r--r-- | ydb/tests/stress/topic/ya.make | 16 |
17 files changed, 402 insertions, 56 deletions
diff --git a/ydb/tests/olap/load/README.md b/ydb/tests/olap/load/README.md index 4521af1a0e1..d1b1d2f99cf 100644 --- a/ydb/tests/olap/load/README.md +++ b/ydb/tests/olap/load/README.md @@ -94,6 +94,12 @@ from .lib.workload_my_workload import TestMyWorkload # Тест автоматически будет импортирован и запущен ``` +### 3. Добавление новго файла в ya.make + +Добавить новые файлы с тестами и путь до ворклоада в `ydb/tests/olap/load/ya.make` и в `ydb/tests/olap/load/lib/ya.make` + + + --- ## Setup и Teardown diff --git a/ydb/tests/olap/load/lib/workload_topic.py b/ydb/tests/olap/load/lib/workload_topic.py new file mode 100644 index 00000000000..f710408b1ce --- /dev/null +++ b/ydb/tests/olap/load/lib/workload_topic.py @@ -0,0 +1,46 @@ +import time +import pytest +from .workload_executor import WorkloadTestBase +from ydb.tests.olap.lib.ydb_cluster import YdbCluster +from ydb.tests.olap.lib.utils import get_external_param + +import logging +LOGGER = logging.getLogger(__name__) + + +class WorkloadTopicBase(WorkloadTestBase): + workload_binary_name = 'workload_topic' + workload_env_var = 'TOPIC_WORKLOAD_BINARY' + + @pytest.mark.parametrize( + 'nemesis_enabled', [True, False], + ids=['nemesis_true', 'nemesis_false'] + ) + def test_workload_topic(self, nemesis_enabled: bool): + command_args_template = ( + "--endpoint grpc://{node_host}:2135 " + f"--database /{YdbCluster.ydb_database} " + "--topic_prefix workload_topic_{node_host}_iter_{iteration_num}_{uuid} " + ) + + additional_stats = { + "workload_type": "topic", + "topic_template": "workload_topic_{node_host}_iter_{iteration_num}_{uuid}", + "nemesis": nemesis_enabled, + "test_timestamp": int(time.time()), + } + + self.execute_workload_test( + workload_name=f"TopicWorkload_nemesis_{nemesis_enabled}", + command_args=command_args_template, + duration_value=self.timeout, + additional_stats=additional_stats, + use_chunks=True, + duration_param="--duration", + nemesis=nemesis_enabled, + nodes_percentage=100 + ) + + +class TestWorkloadTopic(WorkloadTopicBase): + timeout = int(get_external_param('workload_duration', 120)) diff --git a/ydb/tests/olap/load/lib/ya.make b/ydb/tests/olap/load/lib/ya.make index 9e5850cec14..0c9202379a5 100644 --- a/ydb/tests/olap/load/lib/ya.make +++ b/ydb/tests/olap/load/lib/ya.make @@ -10,6 +10,7 @@ PY3_LIBRARY() workload_executor.py workload_simple_queue.py workload_oltp.py + workload_topic.py upload.py ) diff --git a/ydb/tests/olap/load/test_workload_topic.py b/ydb/tests/olap/load/test_workload_topic.py new file mode 100644 index 00000000000..352db314749 --- /dev/null +++ b/ydb/tests/olap/load/test_workload_topic.py @@ -0,0 +1 @@ +from ydb.tests.olap.load.lib.workload_topic import * # noqa diff --git a/ydb/tests/olap/load/ya.make b/ydb/tests/olap/load/ya.make index 69d81d660df..5646fa437b1 100644 --- a/ydb/tests/olap/load/ya.make +++ b/ydb/tests/olap/load/ya.make @@ -7,6 +7,7 @@ PY3TEST() ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb") ENV(SIMPLE_QUEUE_BINARY="ydb/tests/stress/simple_queue/simple_queue") ENV(OLTP_WORKLOAD_BINARY="ydb/tests/stress/oltp_workload/oltp_workload") + ENV(TOPIC_WORKLOAD_BINARY="ydb/tests/stress/topic/workload_topic") ENV(NEMESIS_BINARY="ydb/tests/tools/nemesis/driver/nemesis") TEST_SRCS ( @@ -18,6 +19,7 @@ PY3TEST() test_upload.py test_workload_simple_queue.py test_workload_oltp.py + test_workload_topic.py ) PEERDIR ( @@ -28,6 +30,7 @@ PY3TEST() DEPENDS ( ydb/apps/ydb ydb/tests/stress/simple_queue + ydb/tests/stress/topic ydb/tests/stress/oltp_workload ydb/tests/tools/nemesis/driver ) diff --git a/ydb/tests/stress/log/__main__.py b/ydb/tests/stress/log/__main__.py new file mode 100644 index 00000000000..a1740ef2a28 --- /dev/null +++ b/ydb/tests/stress/log/__main__.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +import argparse +import logging +from ydb.tests.stress.log.workload.workload_log import YdbLogWorkload + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description="Workload log wrapper", formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument('--endpoint', default='grpc://localhost:2135', help="YDB endpoint") + parser.add_argument('--database', default=None, required=True, help='A database to connect') + parser.add_argument('--duration', default=120, type=lambda x: int(x), help='A duration of workload in seconds') + parser.add_argument('--store_type', default='row', choices=['row', 'column'], help='Table type either row or column') + parser.add_argument('--log_file', default=None, help='Append log into specified file') + + args = parser.parse_args() + + if args.log_file: + logging.basicConfig( + filename=args.log_file, + filemode='a', + format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', + datefmt='%H:%M:%S', + level=logging.INFO + ) + + workload = YdbLogWorkload(args.endpoint, args.database, args.store_type, f'log_{args.store_type}') + workload.start() + workload.join() diff --git a/ydb/tests/stress/log/tests/test_workload.py b/ydb/tests/stress/log/tests/test_workload.py index f12f8338760..0e255d7b9b5 100644 --- a/ydb/tests/stress/log/tests/test_workload.py +++ b/ydb/tests/stress/log/tests/test_workload.py @@ -14,59 +14,12 @@ class TestYdbLogWorkload(StressFixture): 'disabled_on_scheme_shard': False, }) - def get_command_prefix(self, subcmds: list[str], path: str) -> list[str]: - return [ - yatest.common.binary_path(os.getenv('YDB_CLI_BINARY')), - '--verbose', - '--endpoint', self.endpoint, - '--database={}'.format(self.database), - 'workload', 'log' - ] + subcmds + [ - '--path', path - ] - - @classmethod - def get_insert_command_params(cls) -> list[str]: - return [ - '--int-cols', '2', - '--str-cols', '5', - '--key-cols', '4', - '--len', '200', - ] - @pytest.mark.parametrize('store_type', ['row', 'column']) def test(self, store_type): - upload_commands = [ - # import command - self.get_command_prefix(subcmds=['import', '--bulk-size', '1000', '-t', '1', 'generator'], path=store_type) + self.get_insert_command_params() + ['--rows', '100000'], - # bulk upsert workload - self.get_command_prefix(subcmds=['run', 'bulk_upsert'], path=store_type) + self.get_insert_command_params() + ['--seconds', '10', '--threads', '10'], - - # upsert workload - self.get_command_prefix(subcmds=['run', 'upsert'], path=store_type) + self.get_insert_command_params() + ['--seconds', '10', '--threads', '10'], - - # insert workload - self.get_command_prefix(subcmds=['run', 'insert'], path=store_type) + self.get_insert_command_params() + ['--seconds', '10', '--threads', '10'], - ] - - # init - yatest.common.execute( - self.get_command_prefix(subcmds=['init'], path=store_type) + self.get_insert_command_params() + [ - '--store', store_type, - '--min-partitions', '100', - '--partition-size', '10', - '--auto-partition', '0', - ], - ) - - select = yatest.common.execute( - self.get_command_prefix(subcmds=['run', 'select'], path=store_type) + [ - '--client-timeout', '10000', - '--threads', '10', - '--seconds', str(10 * len(upload_commands)), - ], wait=False) - - for command in upload_commands: - yatest.common.execute(command, wait=True) - - select.wait() + yatest.common.execute([ + yatest.common.binary_path(os.environ["YDB_WORKLOAD_PATH"]), + "--endpoint", self.endpoint, + "--database", self.database, + "--store_type", store_type, + "--duration", "120", + ]) diff --git a/ydb/tests/stress/log/tests/ya.make b/ydb/tests/stress/log/tests/ya.make index 318764ae911..ca39ab1014b 100644 --- a/ydb/tests/stress/log/tests/ya.make +++ b/ydb/tests/stress/log/tests/ya.make @@ -1,6 +1,7 @@ PY3TEST() INCLUDE(${ARCADIA_ROOT}/ydb/tests/ydbd_dep.inc) ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb") +ENV(YDB_WORKLOAD_PATH="ydb/tests/stress/log/workload_log") TEST_SRCS( test_workload.py @@ -11,6 +12,7 @@ REQUIREMENTS(ram:32) DEPENDS( ydb/apps/ydb + ydb/tests/stress/log ) PEERDIR( diff --git a/ydb/tests/stress/log/workload/workload_log.py b/ydb/tests/stress/log/workload/workload_log.py new file mode 100644 index 00000000000..e567f4706a0 --- /dev/null +++ b/ydb/tests/stress/log/workload/workload_log.py @@ -0,0 +1,98 @@ +from concurrent.futures import ThreadPoolExecutor +import logging +import subprocess +import tempfile +import os +import stat +from library.python import resource + + +from ydb.tests.stress.common.common import WorkloadBase + +logger = logging.getLogger("YdbLogWorkload") + + +class YdbLogWorkload(WorkloadBase): + def __init__(self, endpoint, database, store_type, tables_prefix): + super().__init__(None, tables_prefix, 'log', None) + self.store_type = store_type + self.endpoint = endpoint + self.database = database + self._unpack_resource('ydb_cli') + + def _unpack_resource(self, name): + self.working_dir = os.path.join(tempfile.gettempdir(), "ydb_cli") + os.makedirs(self.working_dir, exist_ok=True) + res = resource.find(name) + path_to_unpack = os.path.join(self.working_dir, name) + with open(path_to_unpack, "wb") as f: + f.write(res) + + st = os.stat(path_to_unpack) + os.chmod(path_to_unpack, st.st_mode | stat.S_IEXEC) + self.cli_path = path_to_unpack + + def get_command_prefix(self, subcmds: list[str], path: str) -> list[str]: + return [ + self.cli_path, + '--verbose', + '--endpoint', self.endpoint, + '--database={}'.format(self.database), + 'workload', 'log' + ] + subcmds + [ + '--path', path + ] + + def cmd_run(self, cmd): + logger.debug(f"Running cmd {cmd}") + subprocess.run(cmd, check=True, text=True) + + @classmethod + def get_insert_command_params(cls) -> list[str]: + return [ + '--int-cols', '2', + '--str-cols', '5', + '--key-cols', '4', + '--len', '200', + ] + + def __loop(self): + upload_commands = [ + # import command + self.get_command_prefix(subcmds=['import', '--bulk-size', '1000', '-t', '1', 'generator'], path=self.store_type) + self.get_insert_command_params() + ['--rows', '100000'], + # bulk upsert workload + self.get_command_prefix(subcmds=['run', 'bulk_upsert'], path=self.store_type) + self.get_insert_command_params() + ['--seconds', '10', '--threads', '10'], + + # upsert workload + self.get_command_prefix(subcmds=['run', 'upsert'], path=self.store_type) + self.get_insert_command_params() + ['--seconds', '10', '--threads', '10'], + + # insert workload + self.get_command_prefix(subcmds=['run', 'insert'], path=self.store_type) + self.get_insert_command_params() + ['--seconds', '10', '--threads', '10'], + ] + + # init + + self.cmd_run( + self.get_command_prefix(subcmds=['init'], path=self.store_type) + self.get_insert_command_params() + [ + '--store', self.store_type, + '--min-partitions', '100', + '--partition-size', '10', + '--auto-partition', '0', + ], + ) + with ThreadPoolExecutor() as executor: + executor.submit( + self.cmd_run, + self.get_command_prefix(subcmds=['run', 'select'], path=self.store_type) + [ + '--client-timeout', '10000', + '--threads', '10', + '--seconds', str(10 * len(upload_commands)), + ] + ) + + for command in upload_commands: + self.cmd_run(command) + + def get_workload_thread_funcs(self): + r = [self.__loop] + return r diff --git a/ydb/tests/stress/log/workload/ya.make b/ydb/tests/stress/log/workload/ya.make new file mode 100644 index 00000000000..cb3d0bce000 --- /dev/null +++ b/ydb/tests/stress/log/workload/ya.make @@ -0,0 +1,19 @@ +PY3_LIBRARY() + +PY_SRCS( + workload_log.py +) + +BUNDLE( + ydb/apps/ydb NAME ydb_cli +) +RESOURCE(ydb_cli ydb_cli) +PEERDIR( + ydb/tests/stress/common + ydb/public/sdk/python + library/python/testing/yatest_common + library/python/resource + ydb/public/sdk/python/enable_v3_new_behavior +) + +END() diff --git a/ydb/tests/stress/log/ya.make b/ydb/tests/stress/log/ya.make index d850faba6b2..c66eabc88cb 100644 --- a/ydb/tests/stress/log/ya.make +++ b/ydb/tests/stress/log/ya.make @@ -1,4 +1,16 @@ -RECURSE_FOR_TESTS( - tests +PY3_PROGRAM(workload_log) + +PY_SRCS( + __main__.py +) + +PEERDIR( + ydb/tests/stress/common + ydb/tests/stress/log/workload ) +END() + +RECURSE_FOR_TESTS( + tests +)
\ No newline at end of file diff --git a/ydb/tests/stress/topic/__main__.py b/ydb/tests/stress/topic/__main__.py new file mode 100644 index 00000000000..84a358785e1 --- /dev/null +++ b/ydb/tests/stress/topic/__main__.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +import argparse +import logging +from ydb.tests.stress.topic.workload.workload_topic import YdbTopicWorkload + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description="Workload topic wrapper", formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument('--endpoint', default='grpc://localhost:2135', help="YDB endpoint") + parser.add_argument('--database', default=None, required=True, help='A database to connect') + parser.add_argument('--duration', default=120, type=lambda x: int(x), help='A duration of workload in seconds') + parser.add_argument('--consumers', default=50, type=lambda x: int(x), help='Consumers of the topic') + parser.add_argument('--producers', default=100, type=lambda x: int(x), help='Producers of the topic') + parser.add_argument('--topic_prefix', default='topic', help='Topic name') + parser.add_argument('--log_file', default=None, help='Append log into specified file') + + args = parser.parse_args() + + if args.log_file: + logging.basicConfig( + filename=args.log_file, + filemode='a', + format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', + datefmt='%H:%M:%S', + level=logging.INFO + ) + + workload = YdbTopicWorkload(args.endpoint, args.database, duration=args.duration, consumers=args.consumers, producers=args.producers, tables_prefix=args.topic_prefix) + workload.start() + workload.join() diff --git a/ydb/tests/stress/topic/tests/test_workload_topic.py b/ydb/tests/stress/topic/tests/test_workload_topic.py new file mode 100644 index 00000000000..c116fc31315 --- /dev/null +++ b/ydb/tests/stress/topic/tests/test_workload_topic.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +import os +import pytest +import yatest + +from ydb.tests.library.stress.fixtures import StressFixture + + +class TestYdbTopicWorkload(StressFixture): + @pytest.fixture(autouse=True, scope="function") + def setup(self): + yield from self.setup_cluster() + + def test(self): + yatest.common.execute([ + yatest.common.binary_path(os.environ["YDB_WORKLOAD_PATH"]), + "--endpoint", self.endpoint, + "--database", self.database, + "--duration", "60", + "--consumers", "50", + "--producers", "100", + ]) diff --git a/ydb/tests/stress/topic/tests/ya.make b/ydb/tests/stress/topic/tests/ya.make new file mode 100644 index 00000000000..9c940a1cfc6 --- /dev/null +++ b/ydb/tests/stress/topic/tests/ya.make @@ -0,0 +1,24 @@ +PY3TEST() +INCLUDE(${ARCADIA_ROOT}/ydb/tests/ydbd_dep.inc) +ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb") +ENV(YDB_WORKLOAD_PATH="ydb/tests/stress/topic/workload_topic") + +TEST_SRCS( + test_workload_topic.py +) + +SIZE(MEDIUM) +REQUIREMENTS(ram:32) + +DEPENDS( + ydb/apps/ydb + ydb/tests/stress/topic +) + +PEERDIR( + ydb/tests/library + ydb/tests/library/stress +) + + +END() diff --git a/ydb/tests/stress/topic/workload/workload_topic.py b/ydb/tests/stress/topic/workload/workload_topic.py new file mode 100644 index 00000000000..47dbe3c63b5 --- /dev/null +++ b/ydb/tests/stress/topic/workload/workload_topic.py @@ -0,0 +1,65 @@ +import logging +import subprocess +import tempfile +import os +import stat +from library.python import resource + + +from ydb.tests.stress.common.common import WorkloadBase + +logger = logging.getLogger("YdbTopicWorkload") + + +class YdbTopicWorkload(WorkloadBase): + def __init__(self, endpoint, database, duration, consumers, producers, tables_prefix): + super().__init__(None, tables_prefix, 'topic', None) + self.endpoint = endpoint + self.database = database + self.duration = str(duration) + self.consumers = str(consumers) + self.producers = str(producers) + self._unpack_resource('ydb_cli') + + def _unpack_resource(self, name): + self.working_dir = os.path.join(tempfile.gettempdir(), "ydb_cli") + os.makedirs(self.working_dir, exist_ok=True) + res = resource.find(name) + path_to_unpack = os.path.join(self.working_dir, name) + with open(path_to_unpack, "wb") as f: + f.write(res) + + st = os.stat(path_to_unpack) + os.chmod(path_to_unpack, st.st_mode | stat.S_IEXEC) + self.cli_path = path_to_unpack + + def get_command_prefix(self, subcmds: list[str]) -> list[str]: + return [ + self.cli_path, + '--verbose', + '--endpoint', self.endpoint, + '--database={}'.format(self.database), + 'workload', 'topic' + ] + subcmds + ['--topic', f'{self.table_prefix}'] + + def cmd_run(self, cmd): + logger.debug(f"Running cmd {cmd}") + subprocess.run(cmd, check=True, text=True) + + def __loop(self): + # init + self.cmd_run( + self.get_command_prefix(subcmds=['init', '-c', self.consumers, '-p', self.producers]) + ) + # run + self.cmd_run( + self.get_command_prefix(subcmds=['run', 'full', '-s', self.duration, '--byte-rate', '100M', '--use-tx', '--tx-commit-interval', '2000', '-p', self.producers, '-c', self.consumers]) + ) + # clean + self.cmd_run( + self.get_command_prefix(subcmds=['clean']) + ) + + def get_workload_thread_funcs(self): + r = [self.__loop] + return r diff --git a/ydb/tests/stress/topic/workload/ya.make b/ydb/tests/stress/topic/workload/ya.make new file mode 100644 index 00000000000..fe8f877eb09 --- /dev/null +++ b/ydb/tests/stress/topic/workload/ya.make @@ -0,0 +1,18 @@ +PY3_LIBRARY() + +PY_SRCS( + workload_topic.py +) + +BUNDLE( + ydb/apps/ydb NAME ydb_cli +) +RESOURCE(ydb_cli ydb_cli) +PEERDIR( + ydb/tests/stress/common + ydb/public/sdk/python + library/python/resource + ydb/public/sdk/python/enable_v3_new_behavior +) + +END() diff --git a/ydb/tests/stress/topic/ya.make b/ydb/tests/stress/topic/ya.make new file mode 100644 index 00000000000..e08853f3249 --- /dev/null +++ b/ydb/tests/stress/topic/ya.make @@ -0,0 +1,16 @@ +PY3_PROGRAM(workload_topic) + +PY_SRCS( + __main__.py +) + +PEERDIR( + ydb/tests/stress/common + ydb/tests/stress/topic/workload +) + +END() + +RECURSE_FOR_TESTS( + tests +)
\ No newline at end of file |