diff options
author | Pavel <[email protected]> | 2025-08-27 17:37:11 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2025-08-27 17:37:11 +0300 |
commit | bc5e0001e00ac6770da34981d8408cfa6044ab36 (patch) | |
tree | 7f0d96f60004607aa02271fb91c5ddd697fccc24 | |
parent | 8bc98dd3424ae3f61c5fadff471ad5a37c41c8e8 (diff) |
feat: added new workloads to nemesis tool (#23640)
Added ctas, kafka, topic-kafka workloads
Refs: #23483
-rw-r--r-- | ydb/tests/stability/tool/__main__.py | 112 | ||||
-rw-r--r-- | ydb/tests/stability/tool/ya.make | 8 | ||||
-rw-r--r-- | ydb/tests/stress/kafka/workload/__init__.py | 7 | ||||
-rw-r--r-- | ydb/tests/stress/topic/__main__.py | 2 | ||||
-rw-r--r-- | ydb/tests/stress/topic/workload/__init__.py (renamed from ydb/tests/stress/topic/workload/workload_topic.py) | 5 | ||||
-rw-r--r-- | ydb/tests/stress/topic/workload/ya.make | 2 | ||||
-rw-r--r-- | ydb/tests/stress/topic_kafka/__main__.py | 2 | ||||
-rw-r--r-- | ydb/tests/stress/topic_kafka/workload/__init__.py (renamed from ydb/tests/stress/topic_kafka/workload/workload_topic.py) | 2 | ||||
-rw-r--r-- | ydb/tests/stress/topic_kafka/workload/ya.make | 2 |
9 files changed, 107 insertions, 35 deletions
diff --git a/ydb/tests/stability/tool/__main__.py b/ydb/tests/stability/tool/__main__.py index eb939a34f7c..126c6e9c9e3 100644 --- a/ydb/tests/stability/tool/__main__.py +++ b/ydb/tests/stability/tool/__main__.py @@ -145,12 +145,36 @@ DICT_OF_PROCESSES = { }, 'workload_topic' : { 'status' : """ - if ps aux | grep -E "/Berkanavt/nemesis/bin/ydb_cli.*workload.*topic.*run.*|/tmp/workload_topic.sh" | grep -v grep > /dev/null; then + if ps aux | grep -E "/Berkanavt/nemesis/bin/topic_workload|/tmp/topic_workload.sh" | grep -v grep > /dev/null; then echo "Running" else echo "Stopped" fi""" - } + }, + 'workload_ctas' : { + 'status' : """ + if ps aux | grep -E "/Berkanavt/nemesis/bin/ctas_workload|/tmp/ctas_workload.sh" | grep -v grep > /dev/null; then + echo "Running" + else + echo "Stopped" + fi""" + }, + 'workload_topic_kafka' : { + 'status' : """ + if ps aux | grep -E "/Berkanavt/nemesis/bin/topic_kafka_workload|/tmp/topic_kafka_workload.sh" | grep -v grep > /dev/null; then + echo "Running" + else + echo "Stopped" + fi""" + }, + 'workload_kafka' : { + 'status' : """ + if ps aux | grep -E "/Berkanavt/nemesis/bin/kafka_workload|/tmp/kafka_workload.sh" | grep -v grep > /dev/null; then + echo "Running" + else + echo "Stopped" + fi""" + }, } @@ -187,6 +211,9 @@ class CustomArgumentParser(argparse.ArgumentParser): "start_workload_node_broker_workload": "Start Node Broker workload", "start_workload_transfer_workload": "Start topic to table transfer workload", "start_workload_s3_backups_workload": "Start auto removal of tmp tables workload", + "start_topic_kafka_workload": "start_topic_kafka_workload", + "start_kafka_workload": "start_kafka_workload", + "start_ctas_workload": "start_ctas_workload", "start_workload_log": "Start log workloads with both row and column storage", "start_workload_log_column": "Start log workload with column storage", "start_workload_log_row": "Start log workload with row storage", @@ -242,6 +269,10 @@ class StabilityCluster: self._unpack_resource('transfer_workload'), self._unpack_resource('s3_backups_workload'), self._unpack_resource('statistics_workload'), + self._unpack_resource('ctas_workload'), + self._unpack_resource('topic_kafka_workload'), + self._unpack_resource('kafka_workload'), + self._unpack_resource('topic_workload'), self._unpack_resource('ydb_cli'), ) @@ -1251,6 +1282,9 @@ Common usage scenarios: "start_workload_node_broker_workload": "Start Node Broker workload", "start_workload_transfer_workload": "Start topic to table transfer workload", "start_workload_s3_backups_workload": "Start auto removal of tmp tables workload", + "start_topic_kafka_workload": "Start auto removal of tmp tables workload", + "start_kafka_workload": "Start auto removal of tmp tables workload", + "start_ctas_workload": "Start auto removal of tmp tables workload", "start_workload_log": "Start log workloads with both row and column storage", "start_workload_log_column": "Start log workload with column storage", "start_workload_log_row": "Start log workload with row storage", @@ -1284,6 +1318,9 @@ Common usage scenarios: "start_workload_node_broker_workload", "start_workload_transfer_workload", "start_workload_s3_backups_workload", + "start_topic_kafka_workload", + "start_kafka_workload", + "start_ctas_workload", "start_workload_log", "start_workload_log_column", "start_workload_log_row", "start_workload_topic", ] @@ -1493,35 +1530,15 @@ def main(): ) stability_cluster.get_state() if action == "start_workload_topic": - def run_topic_workload(node): - node.ssh_command(['rm', '-f', '/tmp/workload_topic.out.log'], raise_on_error=False) - + for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()): stability_cluster._clean_and_start_workload( node, - 'workload_topic', - ( - f'/Berkanavt/nemesis/bin/ydb_cli --verbose --endpoint grpc://localhost:{node.grpc_port} ' - f'--database /Root/db1 workload topic run full -s 60 --byte-rate 100M --use-tx --tx-commit-interval 2000 -p 100 -c 50' - ), - '/tmp/workload_topic.out.log' + 'topic_workload', + f"""/Berkanavt/nemesis/bin/topic_workload --database /Root/db1 \ + --endpoint grpc://localhost:{node.grpc_port} \ + --topic_prefix topics/topic_{node_id}_ \ + --duration 120""" ) - init_node = list(stability_cluster.kikimr_cluster.nodes.values())[0] - init_node.ssh_command([ - '/Berkanavt/nemesis/bin/ydb_cli', - '--verbose', - '--endpoint', - f'grpc://localhost:{init_node.grpc_port}', - '--database', - '/Root/db1', - 'workload', - 'topic', - 'init', - '-c', - '50', - '-p', - '100'], raise_on_error=False) - with ThreadPoolExecutor() as pool: - pool.map(run_topic_workload, stability_cluster.kikimr_cluster.nodes.values()) stability_cluster.get_state() if action == "start_workload_simple_queue_row": for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()): @@ -1581,6 +1598,45 @@ def main(): '/Berkanavt/nemesis/bin/s3_backups_workload --database /Root/db1' ) stability_cluster.get_state() + if action == "start_topic_kafka_workload": + for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()): + stability_cluster._clean_and_start_workload( + node, + 'topic_kafka_workload', + f"""/Berkanavt/nemesis/bin/topic_kafka_workload --database /Root/db1 --topic_prefix topic_kafka/{node_id}_$RANDOM \ + --duration 120 \ + --consumers 2 \ + --consumer-threads 2 \ + --restart-interval 15s \ + --partitions 4 \ + --write-workload 0.01 9000000 2 big_record 1 \ + --write-workload 8000 45 1000 small_record 10 \ + --write-workload 800 409 1 medium_record 10""" + ) + stability_cluster.get_state() + if action == "start_kafka_workload": + for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()): + stability_cluster._clean_and_start_workload( + node, + 'kafka_workload', + f"""/Berkanavt/nemesis/bin/kafka_workload --database /Root/db1 \ + --endpoint grpc://localhost:{node.grpc_port} \ + --bootstrap http://localhost:11223 \ + --source-path kafka/test-topic-{node_id} \ + --target-path kafka/target-topic-{node_id} \ + --consumer workload-consumer-0 \ + --num-workers 2 \ + --duration 120""" + ) + stability_cluster.get_state() + if action == "start_ctas_workload": + for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()): + stability_cluster._clean_and_start_workload( + node, + 'ctas_workload', + f'/Berkanavt/nemesis/bin/ctas_workload --database /Root/db1 --path {node_id}' + ) + stability_cluster.get_state() if action == "stop_workloads": stability_cluster.stop_workloads_all_nodes() stability_cluster.get_state() diff --git a/ydb/tests/stability/tool/ya.make b/ydb/tests/stability/tool/ya.make index 349b133c4e4..43b60635daa 100644 --- a/ydb/tests/stability/tool/ya.make +++ b/ydb/tests/stability/tool/ya.make @@ -22,6 +22,10 @@ BUNDLE( ydb/tests/stress/node_broker NAME node_broker_workload ydb/tests/stress/transfer NAME transfer_workload ydb/tests/stress/s3_backups NAME s3_backups_workload + ydb/tests/stress/ctas NAME ctas_workload + ydb/tests/stress/topic_kafka NAME topic_kafka_workload + ydb/tests/stress/kafka NAME kafka_workload + ydb/tests/stress/topic NAME topic_workload ydb/tools/cfg/bin NAME cfg ydb/tests/tools/nemesis/driver NAME nemesis ydb/apps/ydb NAME ydb_cli @@ -36,6 +40,10 @@ RESOURCE( node_broker_workload node_broker_workload transfer_workload transfer_workload s3_backups_workload s3_backups_workload + ctas_workload ctas_workload + topic_kafka_workload topic_kafka_workload + kafka_workload kafka_workload + topic_workload topic_workload cfg cfg nemesis nemesis ydb/tests/stability/resources/tbl_profile.txt tbl_profile.txt diff --git a/ydb/tests/stress/kafka/workload/__init__.py b/ydb/tests/stress/kafka/workload/__init__.py index cbdd9c4511a..89c2752e8e6 100644 --- a/ydb/tests/stress/kafka/workload/__init__.py +++ b/ydb/tests/stress/kafka/workload/__init__.py @@ -30,7 +30,7 @@ class Workload(unittest.TestCase): self._unpack_resource('ydb_cli') def _unpack_resource(self, name): - working_dir = os.path.join(tempfile.gettempdir(), "ydb_cli") + working_dir = os.path.join(tempfile.gettempdir(), "kafka_ydb_cli") self.tmp_dirs.append(working_dir) os.makedirs(working_dir, exist_ok=True) res = resource.find(name) @@ -51,11 +51,16 @@ class Workload(unittest.TestCase): urllib.request.urlretrieve(self.jar_path, TEST_FILES_DIRECTORY + JAR_FILE_NAME) urllib.request.urlretrieve(self.archive_path, TEST_FILES_DIRECTORY + JDK_FILE_NAME) + os.chmod(TEST_FILES_DIRECTORY + JAR_FILE_NAME, 0o777) + os.chmod(TEST_FILES_DIRECTORY + JDK_FILE_NAME, 0o777) tar = tarfile.open(TEST_FILES_DIRECTORY + JDK_FILE_NAME, "r:gz") tar.extractall(path=TEST_FILES_DIRECTORY) tar.close() + os.chmod(TEST_FILES_DIRECTORY + 'lib/server/classes.jsa', 0o777) + os.chmod(TEST_FILES_DIRECTORY + 'lib/server/classes_nocoops.jsa', 0o777) + java_path = TEST_FILES_DIRECTORY + "/bin/java" jar_file_path = TEST_FILES_DIRECTORY + JAR_FILE_NAME diff --git a/ydb/tests/stress/topic/__main__.py b/ydb/tests/stress/topic/__main__.py index 84a358785e1..dec1785931a 100644 --- a/ydb/tests/stress/topic/__main__.py +++ b/ydb/tests/stress/topic/__main__.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- import argparse import logging -from ydb.tests.stress.topic.workload.workload_topic import YdbTopicWorkload +from ydb.tests.stress.topic.workload import YdbTopicWorkload if __name__ == '__main__': parser = argparse.ArgumentParser( diff --git a/ydb/tests/stress/topic/workload/workload_topic.py b/ydb/tests/stress/topic/workload/__init__.py index 4249ef5cb01..676f92e96e1 100644 --- a/ydb/tests/stress/topic/workload/workload_topic.py +++ b/ydb/tests/stress/topic/workload/__init__.py @@ -3,6 +3,7 @@ import subprocess import tempfile import os import stat +import time from library.python import resource @@ -27,7 +28,7 @@ class YdbTopicWorkload(WorkloadBase): def _unpack_resource(self, name): self.tempdir = tempfile.TemporaryDirectory(dir=os.getcwd()) - self.working_dir = os.path.join(self.tempdir.name, "ydb_cli") + self.working_dir = os.path.join(self.tempdir.name, "topic_ydb_cli") os.makedirs(self.working_dir, exist_ok=True) res = resource.find(name) path_to_unpack = os.path.join(self.working_dir, name) @@ -49,7 +50,9 @@ class YdbTopicWorkload(WorkloadBase): def cmd_run(self, cmd): logger.debug(f"Running cmd {cmd}") + print(f"Running cmd {cmd} at {time.time()}") subprocess.run(cmd, check=True, text=True) + print(f"End at {time.time()}") def __loop(self): # init diff --git a/ydb/tests/stress/topic/workload/ya.make b/ydb/tests/stress/topic/workload/ya.make index fe8f877eb09..bc1f146368d 100644 --- a/ydb/tests/stress/topic/workload/ya.make +++ b/ydb/tests/stress/topic/workload/ya.make @@ -1,7 +1,7 @@ PY3_LIBRARY() PY_SRCS( - workload_topic.py + __init__.py ) BUNDLE( diff --git a/ydb/tests/stress/topic_kafka/__main__.py b/ydb/tests/stress/topic_kafka/__main__.py index f7e33cdf948..ff410cceec5 100644 --- a/ydb/tests/stress/topic_kafka/__main__.py +++ b/ydb/tests/stress/topic_kafka/__main__.py @@ -2,7 +2,7 @@ import argparse import logging import dataclasses -from ydb.tests.stress.topic_kafka.workload.workload_topic import YdbTopicWorkload, WriteProfile, parse_write_profile +from ydb.tests.stress.topic_kafka.workload import YdbTopicWorkload, WriteProfile, parse_write_profile if __name__ == '__main__': parser = argparse.ArgumentParser( diff --git a/ydb/tests/stress/topic_kafka/workload/workload_topic.py b/ydb/tests/stress/topic_kafka/workload/__init__.py index fd2722a2483..0ac845b3d35 100644 --- a/ydb/tests/stress/topic_kafka/workload/workload_topic.py +++ b/ydb/tests/stress/topic_kafka/workload/__init__.py @@ -58,7 +58,7 @@ class YdbTopicWorkload(WorkloadBase): self._unpack_resource('ydb_cli') def _unpack_resource(self, name): - self.working_dir = os.path.join(tempfile.gettempdir(), "ydb_cli") + self.working_dir = os.path.join(tempfile.gettempdir(), "topic_kafka_ydb_cli") os.makedirs(self.working_dir, exist_ok=True) res = resource.find(name) path_to_unpack = os.path.join(self.working_dir, name) diff --git a/ydb/tests/stress/topic_kafka/workload/ya.make b/ydb/tests/stress/topic_kafka/workload/ya.make index fe8f877eb09..bc1f146368d 100644 --- a/ydb/tests/stress/topic_kafka/workload/ya.make +++ b/ydb/tests/stress/topic_kafka/workload/ya.make @@ -1,7 +1,7 @@ PY3_LIBRARY() PY_SRCS( - workload_topic.py + __init__.py ) BUNDLE( |