summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPavel <[email protected]>2025-08-27 17:37:11 +0300
committerGitHub <[email protected]>2025-08-27 17:37:11 +0300
commitbc5e0001e00ac6770da34981d8408cfa6044ab36 (patch)
tree7f0d96f60004607aa02271fb91c5ddd697fccc24
parent8bc98dd3424ae3f61c5fadff471ad5a37c41c8e8 (diff)
feat: added new workloads to nemesis tool (#23640)
Added ctas, kafka and topic-kafka workloads. Refs: #23483
-rw-r--r--ydb/tests/stability/tool/__main__.py112
-rw-r--r--ydb/tests/stability/tool/ya.make8
-rw-r--r--ydb/tests/stress/kafka/workload/__init__.py7
-rw-r--r--ydb/tests/stress/topic/__main__.py2
-rw-r--r--ydb/tests/stress/topic/workload/__init__.py (renamed from ydb/tests/stress/topic/workload/workload_topic.py)5
-rw-r--r--ydb/tests/stress/topic/workload/ya.make2
-rw-r--r--ydb/tests/stress/topic_kafka/__main__.py2
-rw-r--r--ydb/tests/stress/topic_kafka/workload/__init__.py (renamed from ydb/tests/stress/topic_kafka/workload/workload_topic.py)2
-rw-r--r--ydb/tests/stress/topic_kafka/workload/ya.make2
9 files changed, 107 insertions, 35 deletions
diff --git a/ydb/tests/stability/tool/__main__.py b/ydb/tests/stability/tool/__main__.py
index eb939a34f7c..126c6e9c9e3 100644
--- a/ydb/tests/stability/tool/__main__.py
+++ b/ydb/tests/stability/tool/__main__.py
@@ -145,12 +145,36 @@ DICT_OF_PROCESSES = {
},
'workload_topic' : {
'status' : """
- if ps aux | grep -E "/Berkanavt/nemesis/bin/ydb_cli.*workload.*topic.*run.*|/tmp/workload_topic.sh" | grep -v grep > /dev/null; then
+ if ps aux | grep -E "/Berkanavt/nemesis/bin/topic_workload|/tmp/topic_workload.sh" | grep -v grep > /dev/null; then
echo "Running"
else
echo "Stopped"
fi"""
- }
+ },
+ 'workload_ctas' : {
+ 'status' : """
+ if ps aux | grep -E "/Berkanavt/nemesis/bin/ctas_workload|/tmp/ctas_workload.sh" | grep -v grep > /dev/null; then
+ echo "Running"
+ else
+ echo "Stopped"
+ fi"""
+ },
+ 'workload_topic_kafka' : {
+ 'status' : """
+ if ps aux | grep -E "/Berkanavt/nemesis/bin/topic_kafka_workload|/tmp/topic_kafka_workload.sh" | grep -v grep > /dev/null; then
+ echo "Running"
+ else
+ echo "Stopped"
+ fi"""
+ },
+ 'workload_kafka' : {
+ 'status' : """
+ if ps aux | grep -E "/Berkanavt/nemesis/bin/kafka_workload|/tmp/kafka_workload.sh" | grep -v grep > /dev/null; then
+ echo "Running"
+ else
+ echo "Stopped"
+ fi"""
+ },
}
@@ -187,6 +211,9 @@ class CustomArgumentParser(argparse.ArgumentParser):
"start_workload_node_broker_workload": "Start Node Broker workload",
"start_workload_transfer_workload": "Start topic to table transfer workload",
"start_workload_s3_backups_workload": "Start auto removal of tmp tables workload",
+    "start_topic_kafka_workload": "Start topic workload over the Kafka protocol",
+    "start_kafka_workload": "Start Kafka topic-to-topic transfer workload",
+    "start_ctas_workload": "Start CREATE TABLE AS SELECT (CTAS) workload",
"start_workload_log": "Start log workloads with both row and column storage",
"start_workload_log_column": "Start log workload with column storage",
"start_workload_log_row": "Start log workload with row storage",
@@ -242,6 +269,10 @@ class StabilityCluster:
self._unpack_resource('transfer_workload'),
self._unpack_resource('s3_backups_workload'),
self._unpack_resource('statistics_workload'),
+ self._unpack_resource('ctas_workload'),
+ self._unpack_resource('topic_kafka_workload'),
+ self._unpack_resource('kafka_workload'),
+ self._unpack_resource('topic_workload'),
self._unpack_resource('ydb_cli'),
)
@@ -1251,6 +1282,9 @@ Common usage scenarios:
"start_workload_node_broker_workload": "Start Node Broker workload",
"start_workload_transfer_workload": "Start topic to table transfer workload",
"start_workload_s3_backups_workload": "Start auto removal of tmp tables workload",
+    "start_topic_kafka_workload": "Start topic workload over the Kafka protocol",
+    "start_kafka_workload": "Start Kafka topic-to-topic transfer workload",
+    "start_ctas_workload": "Start CREATE TABLE AS SELECT (CTAS) workload",
"start_workload_log": "Start log workloads with both row and column storage",
"start_workload_log_column": "Start log workload with column storage",
"start_workload_log_row": "Start log workload with row storage",
@@ -1284,6 +1318,9 @@ Common usage scenarios:
"start_workload_node_broker_workload",
"start_workload_transfer_workload",
"start_workload_s3_backups_workload",
+ "start_topic_kafka_workload",
+ "start_kafka_workload",
+ "start_ctas_workload",
"start_workload_log", "start_workload_log_column", "start_workload_log_row",
"start_workload_topic",
]
@@ -1493,35 +1530,15 @@ def main():
)
stability_cluster.get_state()
if action == "start_workload_topic":
- def run_topic_workload(node):
- node.ssh_command(['rm', '-f', '/tmp/workload_topic.out.log'], raise_on_error=False)
-
+ for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()):
stability_cluster._clean_and_start_workload(
node,
- 'workload_topic',
- (
- f'/Berkanavt/nemesis/bin/ydb_cli --verbose --endpoint grpc://localhost:{node.grpc_port} '
- f'--database /Root/db1 workload topic run full -s 60 --byte-rate 100M --use-tx --tx-commit-interval 2000 -p 100 -c 50'
- ),
- '/tmp/workload_topic.out.log'
+ 'topic_workload',
+ f"""/Berkanavt/nemesis/bin/topic_workload --database /Root/db1 \
+ --endpoint grpc://localhost:{node.grpc_port} \
+ --topic_prefix topics/topic_{node_id}_ \
+ --duration 120"""
)
- init_node = list(stability_cluster.kikimr_cluster.nodes.values())[0]
- init_node.ssh_command([
- '/Berkanavt/nemesis/bin/ydb_cli',
- '--verbose',
- '--endpoint',
- f'grpc://localhost:{init_node.grpc_port}',
- '--database',
- '/Root/db1',
- 'workload',
- 'topic',
- 'init',
- '-c',
- '50',
- '-p',
- '100'], raise_on_error=False)
- with ThreadPoolExecutor() as pool:
- pool.map(run_topic_workload, stability_cluster.kikimr_cluster.nodes.values())
stability_cluster.get_state()
if action == "start_workload_simple_queue_row":
for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()):
@@ -1581,6 +1598,45 @@ def main():
'/Berkanavt/nemesis/bin/s3_backups_workload --database /Root/db1'
)
stability_cluster.get_state()
+ if action == "start_topic_kafka_workload":
+ for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()):
+ stability_cluster._clean_and_start_workload(
+ node,
+ 'topic_kafka_workload',
+ f"""/Berkanavt/nemesis/bin/topic_kafka_workload --database /Root/db1 --topic_prefix topic_kafka/{node_id}_$RANDOM \
+ --duration 120 \
+ --consumers 2 \
+ --consumer-threads 2 \
+ --restart-interval 15s \
+ --partitions 4 \
+ --write-workload 0.01 9000000 2 big_record 1 \
+ --write-workload 8000 45 1000 small_record 10 \
+ --write-workload 800 409 1 medium_record 10"""
+ )
+ stability_cluster.get_state()
+ if action == "start_kafka_workload":
+ for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()):
+ stability_cluster._clean_and_start_workload(
+ node,
+ 'kafka_workload',
+ f"""/Berkanavt/nemesis/bin/kafka_workload --database /Root/db1 \
+ --endpoint grpc://localhost:{node.grpc_port} \
+ --bootstrap http://localhost:11223 \
+ --source-path kafka/test-topic-{node_id} \
+ --target-path kafka/target-topic-{node_id} \
+ --consumer workload-consumer-0 \
+ --num-workers 2 \
+ --duration 120"""
+ )
+ stability_cluster.get_state()
+ if action == "start_ctas_workload":
+ for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()):
+ stability_cluster._clean_and_start_workload(
+ node,
+ 'ctas_workload',
+ f'/Berkanavt/nemesis/bin/ctas_workload --database /Root/db1 --path {node_id}'
+ )
+ stability_cluster.get_state()
if action == "stop_workloads":
stability_cluster.stop_workloads_all_nodes()
stability_cluster.get_state()
diff --git a/ydb/tests/stability/tool/ya.make b/ydb/tests/stability/tool/ya.make
index 349b133c4e4..43b60635daa 100644
--- a/ydb/tests/stability/tool/ya.make
+++ b/ydb/tests/stability/tool/ya.make
@@ -22,6 +22,10 @@ BUNDLE(
ydb/tests/stress/node_broker NAME node_broker_workload
ydb/tests/stress/transfer NAME transfer_workload
ydb/tests/stress/s3_backups NAME s3_backups_workload
+ ydb/tests/stress/ctas NAME ctas_workload
+ ydb/tests/stress/topic_kafka NAME topic_kafka_workload
+ ydb/tests/stress/kafka NAME kafka_workload
+ ydb/tests/stress/topic NAME topic_workload
ydb/tools/cfg/bin NAME cfg
ydb/tests/tools/nemesis/driver NAME nemesis
ydb/apps/ydb NAME ydb_cli
@@ -36,6 +40,10 @@ RESOURCE(
node_broker_workload node_broker_workload
transfer_workload transfer_workload
s3_backups_workload s3_backups_workload
+ ctas_workload ctas_workload
+ topic_kafka_workload topic_kafka_workload
+ kafka_workload kafka_workload
+ topic_workload topic_workload
cfg cfg
nemesis nemesis
ydb/tests/stability/resources/tbl_profile.txt tbl_profile.txt
diff --git a/ydb/tests/stress/kafka/workload/__init__.py b/ydb/tests/stress/kafka/workload/__init__.py
index cbdd9c4511a..89c2752e8e6 100644
--- a/ydb/tests/stress/kafka/workload/__init__.py
+++ b/ydb/tests/stress/kafka/workload/__init__.py
@@ -30,7 +30,7 @@ class Workload(unittest.TestCase):
self._unpack_resource('ydb_cli')
def _unpack_resource(self, name):
- working_dir = os.path.join(tempfile.gettempdir(), "ydb_cli")
+ working_dir = os.path.join(tempfile.gettempdir(), "kafka_ydb_cli")
self.tmp_dirs.append(working_dir)
os.makedirs(working_dir, exist_ok=True)
res = resource.find(name)
@@ -51,11 +51,16 @@ class Workload(unittest.TestCase):
urllib.request.urlretrieve(self.jar_path, TEST_FILES_DIRECTORY + JAR_FILE_NAME)
urllib.request.urlretrieve(self.archive_path, TEST_FILES_DIRECTORY + JDK_FILE_NAME)
+ os.chmod(TEST_FILES_DIRECTORY + JAR_FILE_NAME, 0o777)
+ os.chmod(TEST_FILES_DIRECTORY + JDK_FILE_NAME, 0o777)
tar = tarfile.open(TEST_FILES_DIRECTORY + JDK_FILE_NAME, "r:gz")
tar.extractall(path=TEST_FILES_DIRECTORY)
tar.close()
+ os.chmod(TEST_FILES_DIRECTORY + 'lib/server/classes.jsa', 0o777)
+ os.chmod(TEST_FILES_DIRECTORY + 'lib/server/classes_nocoops.jsa', 0o777)
+
java_path = TEST_FILES_DIRECTORY + "/bin/java"
jar_file_path = TEST_FILES_DIRECTORY + JAR_FILE_NAME
diff --git a/ydb/tests/stress/topic/__main__.py b/ydb/tests/stress/topic/__main__.py
index 84a358785e1..dec1785931a 100644
--- a/ydb/tests/stress/topic/__main__.py
+++ b/ydb/tests/stress/topic/__main__.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import argparse
import logging
-from ydb.tests.stress.topic.workload.workload_topic import YdbTopicWorkload
+from ydb.tests.stress.topic.workload import YdbTopicWorkload
if __name__ == '__main__':
parser = argparse.ArgumentParser(
diff --git a/ydb/tests/stress/topic/workload/workload_topic.py b/ydb/tests/stress/topic/workload/__init__.py
index 4249ef5cb01..676f92e96e1 100644
--- a/ydb/tests/stress/topic/workload/workload_topic.py
+++ b/ydb/tests/stress/topic/workload/__init__.py
@@ -3,6 +3,7 @@ import subprocess
import tempfile
import os
import stat
+import time
from library.python import resource
@@ -27,7 +28,7 @@ class YdbTopicWorkload(WorkloadBase):
def _unpack_resource(self, name):
self.tempdir = tempfile.TemporaryDirectory(dir=os.getcwd())
- self.working_dir = os.path.join(self.tempdir.name, "ydb_cli")
+ self.working_dir = os.path.join(self.tempdir.name, "topic_ydb_cli")
os.makedirs(self.working_dir, exist_ok=True)
res = resource.find(name)
path_to_unpack = os.path.join(self.working_dir, name)
@@ -49,7 +50,9 @@ class YdbTopicWorkload(WorkloadBase):
def cmd_run(self, cmd):
logger.debug(f"Running cmd {cmd}")
+ print(f"Running cmd {cmd} at {time.time()}")
subprocess.run(cmd, check=True, text=True)
+ print(f"End at {time.time()}")
def __loop(self):
# init
diff --git a/ydb/tests/stress/topic/workload/ya.make b/ydb/tests/stress/topic/workload/ya.make
index fe8f877eb09..bc1f146368d 100644
--- a/ydb/tests/stress/topic/workload/ya.make
+++ b/ydb/tests/stress/topic/workload/ya.make
@@ -1,7 +1,7 @@
PY3_LIBRARY()
PY_SRCS(
- workload_topic.py
+ __init__.py
)
BUNDLE(
diff --git a/ydb/tests/stress/topic_kafka/__main__.py b/ydb/tests/stress/topic_kafka/__main__.py
index f7e33cdf948..ff410cceec5 100644
--- a/ydb/tests/stress/topic_kafka/__main__.py
+++ b/ydb/tests/stress/topic_kafka/__main__.py
@@ -2,7 +2,7 @@
import argparse
import logging
import dataclasses
-from ydb.tests.stress.topic_kafka.workload.workload_topic import YdbTopicWorkload, WriteProfile, parse_write_profile
+from ydb.tests.stress.topic_kafka.workload import YdbTopicWorkload, WriteProfile, parse_write_profile
if __name__ == '__main__':
parser = argparse.ArgumentParser(
diff --git a/ydb/tests/stress/topic_kafka/workload/workload_topic.py b/ydb/tests/stress/topic_kafka/workload/__init__.py
index fd2722a2483..0ac845b3d35 100644
--- a/ydb/tests/stress/topic_kafka/workload/workload_topic.py
+++ b/ydb/tests/stress/topic_kafka/workload/__init__.py
@@ -58,7 +58,7 @@ class YdbTopicWorkload(WorkloadBase):
self._unpack_resource('ydb_cli')
def _unpack_resource(self, name):
- self.working_dir = os.path.join(tempfile.gettempdir(), "ydb_cli")
+ self.working_dir = os.path.join(tempfile.gettempdir(), "topic_kafka_ydb_cli")
os.makedirs(self.working_dir, exist_ok=True)
res = resource.find(name)
path_to_unpack = os.path.join(self.working_dir, name)
diff --git a/ydb/tests/stress/topic_kafka/workload/ya.make b/ydb/tests/stress/topic_kafka/workload/ya.make
index fe8f877eb09..bc1f146368d 100644
--- a/ydb/tests/stress/topic_kafka/workload/ya.make
+++ b/ydb/tests/stress/topic_kafka/workload/ya.make
@@ -1,7 +1,7 @@
PY3_LIBRARY()
PY_SRCS(
- workload_topic.py
+ __init__.py
)
BUNDLE(