aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPavel <pefavel@ydb.tech>2025-07-30 12:17:31 +0300
committerGitHub <noreply@github.com>2025-07-30 11:17:31 +0200
commit2f87b776a601db60d6d1b7484897545d96fa3112 (patch)
tree45dd723755f5765a116437a6418cb5d33b3fdc67
parent1cabcac91c47bc5fa7caaa2468a4bee19664dc35 (diff)
downloadydb-2f87b776a601db60d6d1b7484897545d96fa3112.tar.gz
Add workload topic to Nemesis toolset (#21871)
-rw-r--r--ydb/tests/olap/load/README.md6
-rw-r--r--ydb/tests/olap/load/lib/workload_topic.py46
-rw-r--r--ydb/tests/olap/load/lib/ya.make1
-rw-r--r--ydb/tests/olap/load/test_workload_topic.py1
-rw-r--r--ydb/tests/olap/load/ya.make3
-rw-r--r--ydb/tests/stress/log/__main__.py29
-rw-r--r--ydb/tests/stress/log/tests/test_workload.py61
-rw-r--r--ydb/tests/stress/log/tests/ya.make2
-rw-r--r--ydb/tests/stress/log/workload/workload_log.py98
-rw-r--r--ydb/tests/stress/log/workload/ya.make19
-rw-r--r--ydb/tests/stress/log/ya.make16
-rw-r--r--ydb/tests/stress/topic/__main__.py31
-rw-r--r--ydb/tests/stress/topic/tests/test_workload_topic.py22
-rw-r--r--ydb/tests/stress/topic/tests/ya.make24
-rw-r--r--ydb/tests/stress/topic/workload/workload_topic.py65
-rw-r--r--ydb/tests/stress/topic/workload/ya.make18
-rw-r--r--ydb/tests/stress/topic/ya.make16
17 files changed, 402 insertions, 56 deletions
diff --git a/ydb/tests/olap/load/README.md b/ydb/tests/olap/load/README.md
index 4521af1a0e1..d1b1d2f99cf 100644
--- a/ydb/tests/olap/load/README.md
+++ b/ydb/tests/olap/load/README.md
@@ -94,6 +94,12 @@ from .lib.workload_my_workload import TestMyWorkload
# Тест автоматически будет импортирован и запущен
```
+### 3. Добавление новго файла в ya.make
+
+Добавить новые файлы с тестами и путь до ворклоада в `ydb/tests/olap/load/ya.make` и в `ydb/tests/olap/load/lib/ya.make`
+
+
+
---
## Setup и Teardown
diff --git a/ydb/tests/olap/load/lib/workload_topic.py b/ydb/tests/olap/load/lib/workload_topic.py
new file mode 100644
index 00000000000..f710408b1ce
--- /dev/null
+++ b/ydb/tests/olap/load/lib/workload_topic.py
@@ -0,0 +1,46 @@
+import time
+import pytest
+from .workload_executor import WorkloadTestBase
+from ydb.tests.olap.lib.ydb_cluster import YdbCluster
+from ydb.tests.olap.lib.utils import get_external_param
+
+import logging
+LOGGER = logging.getLogger(__name__)
+
+
+class WorkloadTopicBase(WorkloadTestBase):
+ workload_binary_name = 'workload_topic'
+ workload_env_var = 'TOPIC_WORKLOAD_BINARY'
+
+ @pytest.mark.parametrize(
+ 'nemesis_enabled', [True, False],
+ ids=['nemesis_true', 'nemesis_false']
+ )
+ def test_workload_topic(self, nemesis_enabled: bool):
+ command_args_template = (
+ "--endpoint grpc://{node_host}:2135 "
+ f"--database /{YdbCluster.ydb_database} "
+ "--topic_prefix workload_topic_{node_host}_iter_{iteration_num}_{uuid} "
+ )
+
+ additional_stats = {
+ "workload_type": "topic",
+ "topic_template": "workload_topic_{node_host}_iter_{iteration_num}_{uuid}",
+ "nemesis": nemesis_enabled,
+ "test_timestamp": int(time.time()),
+ }
+
+ self.execute_workload_test(
+ workload_name=f"TopicWorkload_nemesis_{nemesis_enabled}",
+ command_args=command_args_template,
+ duration_value=self.timeout,
+ additional_stats=additional_stats,
+ use_chunks=True,
+ duration_param="--duration",
+ nemesis=nemesis_enabled,
+ nodes_percentage=100
+ )
+
+
+class TestWorkloadTopic(WorkloadTopicBase):
+ timeout = int(get_external_param('workload_duration', 120))
diff --git a/ydb/tests/olap/load/lib/ya.make b/ydb/tests/olap/load/lib/ya.make
index 9e5850cec14..0c9202379a5 100644
--- a/ydb/tests/olap/load/lib/ya.make
+++ b/ydb/tests/olap/load/lib/ya.make
@@ -10,6 +10,7 @@ PY3_LIBRARY()
workload_executor.py
workload_simple_queue.py
workload_oltp.py
+ workload_topic.py
upload.py
)
diff --git a/ydb/tests/olap/load/test_workload_topic.py b/ydb/tests/olap/load/test_workload_topic.py
new file mode 100644
index 00000000000..352db314749
--- /dev/null
+++ b/ydb/tests/olap/load/test_workload_topic.py
@@ -0,0 +1 @@
+from ydb.tests.olap.load.lib.workload_topic import * # noqa
diff --git a/ydb/tests/olap/load/ya.make b/ydb/tests/olap/load/ya.make
index 69d81d660df..5646fa437b1 100644
--- a/ydb/tests/olap/load/ya.make
+++ b/ydb/tests/olap/load/ya.make
@@ -7,6 +7,7 @@ PY3TEST()
ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb")
ENV(SIMPLE_QUEUE_BINARY="ydb/tests/stress/simple_queue/simple_queue")
ENV(OLTP_WORKLOAD_BINARY="ydb/tests/stress/oltp_workload/oltp_workload")
+ ENV(TOPIC_WORKLOAD_BINARY="ydb/tests/stress/topic/workload_topic")
ENV(NEMESIS_BINARY="ydb/tests/tools/nemesis/driver/nemesis")
TEST_SRCS (
@@ -18,6 +19,7 @@ PY3TEST()
test_upload.py
test_workload_simple_queue.py
test_workload_oltp.py
+ test_workload_topic.py
)
PEERDIR (
@@ -28,6 +30,7 @@ PY3TEST()
DEPENDS (
ydb/apps/ydb
ydb/tests/stress/simple_queue
+ ydb/tests/stress/topic
ydb/tests/stress/oltp_workload
ydb/tests/tools/nemesis/driver
)
diff --git a/ydb/tests/stress/log/__main__.py b/ydb/tests/stress/log/__main__.py
new file mode 100644
index 00000000000..a1740ef2a28
--- /dev/null
+++ b/ydb/tests/stress/log/__main__.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+import argparse
+import logging
+from ydb.tests.stress.log.workload.workload_log import YdbLogWorkload
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(
+ description="Workload log wrapper", formatter_class=argparse.RawDescriptionHelpFormatter
+ )
+ parser.add_argument('--endpoint', default='grpc://localhost:2135', help="YDB endpoint")
+ parser.add_argument('--database', default=None, required=True, help='A database to connect')
+ parser.add_argument('--duration', default=120, type=lambda x: int(x), help='A duration of workload in seconds')
+ parser.add_argument('--store_type', default='row', choices=['row', 'column'], help='Table type either row or column')
+ parser.add_argument('--log_file', default=None, help='Append log into specified file')
+
+ args = parser.parse_args()
+
+ if args.log_file:
+ logging.basicConfig(
+ filename=args.log_file,
+ filemode='a',
+ format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
+ datefmt='%H:%M:%S',
+ level=logging.INFO
+ )
+
+ workload = YdbLogWorkload(args.endpoint, args.database, args.store_type, f'log_{args.store_type}')
+ workload.start()
+ workload.join()
diff --git a/ydb/tests/stress/log/tests/test_workload.py b/ydb/tests/stress/log/tests/test_workload.py
index f12f8338760..0e255d7b9b5 100644
--- a/ydb/tests/stress/log/tests/test_workload.py
+++ b/ydb/tests/stress/log/tests/test_workload.py
@@ -14,59 +14,12 @@ class TestYdbLogWorkload(StressFixture):
'disabled_on_scheme_shard': False,
})
- def get_command_prefix(self, subcmds: list[str], path: str) -> list[str]:
- return [
- yatest.common.binary_path(os.getenv('YDB_CLI_BINARY')),
- '--verbose',
- '--endpoint', self.endpoint,
- '--database={}'.format(self.database),
- 'workload', 'log'
- ] + subcmds + [
- '--path', path
- ]
-
- @classmethod
- def get_insert_command_params(cls) -> list[str]:
- return [
- '--int-cols', '2',
- '--str-cols', '5',
- '--key-cols', '4',
- '--len', '200',
- ]
-
@pytest.mark.parametrize('store_type', ['row', 'column'])
def test(self, store_type):
- upload_commands = [
- # import command
- self.get_command_prefix(subcmds=['import', '--bulk-size', '1000', '-t', '1', 'generator'], path=store_type) + self.get_insert_command_params() + ['--rows', '100000'],
- # bulk upsert workload
- self.get_command_prefix(subcmds=['run', 'bulk_upsert'], path=store_type) + self.get_insert_command_params() + ['--seconds', '10', '--threads', '10'],
-
- # upsert workload
- self.get_command_prefix(subcmds=['run', 'upsert'], path=store_type) + self.get_insert_command_params() + ['--seconds', '10', '--threads', '10'],
-
- # insert workload
- self.get_command_prefix(subcmds=['run', 'insert'], path=store_type) + self.get_insert_command_params() + ['--seconds', '10', '--threads', '10'],
- ]
-
- # init
- yatest.common.execute(
- self.get_command_prefix(subcmds=['init'], path=store_type) + self.get_insert_command_params() + [
- '--store', store_type,
- '--min-partitions', '100',
- '--partition-size', '10',
- '--auto-partition', '0',
- ],
- )
-
- select = yatest.common.execute(
- self.get_command_prefix(subcmds=['run', 'select'], path=store_type) + [
- '--client-timeout', '10000',
- '--threads', '10',
- '--seconds', str(10 * len(upload_commands)),
- ], wait=False)
-
- for command in upload_commands:
- yatest.common.execute(command, wait=True)
-
- select.wait()
+ yatest.common.execute([
+ yatest.common.binary_path(os.environ["YDB_WORKLOAD_PATH"]),
+ "--endpoint", self.endpoint,
+ "--database", self.database,
+ "--store_type", store_type,
+ "--duration", "120",
+ ])
diff --git a/ydb/tests/stress/log/tests/ya.make b/ydb/tests/stress/log/tests/ya.make
index 318764ae911..ca39ab1014b 100644
--- a/ydb/tests/stress/log/tests/ya.make
+++ b/ydb/tests/stress/log/tests/ya.make
@@ -1,6 +1,7 @@
PY3TEST()
INCLUDE(${ARCADIA_ROOT}/ydb/tests/ydbd_dep.inc)
ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb")
+ENV(YDB_WORKLOAD_PATH="ydb/tests/stress/log/workload_log")
TEST_SRCS(
test_workload.py
@@ -11,6 +12,7 @@ REQUIREMENTS(ram:32)
DEPENDS(
ydb/apps/ydb
+ ydb/tests/stress/log
)
PEERDIR(
diff --git a/ydb/tests/stress/log/workload/workload_log.py b/ydb/tests/stress/log/workload/workload_log.py
new file mode 100644
index 00000000000..e567f4706a0
--- /dev/null
+++ b/ydb/tests/stress/log/workload/workload_log.py
@@ -0,0 +1,98 @@
+from concurrent.futures import ThreadPoolExecutor
+import logging
+import subprocess
+import tempfile
+import os
+import stat
+from library.python import resource
+
+
+from ydb.tests.stress.common.common import WorkloadBase
+
+logger = logging.getLogger("YdbLogWorkload")
+
+
+class YdbLogWorkload(WorkloadBase):
+ def __init__(self, endpoint, database, store_type, tables_prefix):
+ super().__init__(None, tables_prefix, 'log', None)
+ self.store_type = store_type
+ self.endpoint = endpoint
+ self.database = database
+ self._unpack_resource('ydb_cli')
+
+ def _unpack_resource(self, name):
+ self.working_dir = os.path.join(tempfile.gettempdir(), "ydb_cli")
+ os.makedirs(self.working_dir, exist_ok=True)
+ res = resource.find(name)
+ path_to_unpack = os.path.join(self.working_dir, name)
+ with open(path_to_unpack, "wb") as f:
+ f.write(res)
+
+ st = os.stat(path_to_unpack)
+ os.chmod(path_to_unpack, st.st_mode | stat.S_IEXEC)
+ self.cli_path = path_to_unpack
+
+ def get_command_prefix(self, subcmds: list[str], path: str) -> list[str]:
+ return [
+ self.cli_path,
+ '--verbose',
+ '--endpoint', self.endpoint,
+ '--database={}'.format(self.database),
+ 'workload', 'log'
+ ] + subcmds + [
+ '--path', path
+ ]
+
+ def cmd_run(self, cmd):
+ logger.debug(f"Running cmd {cmd}")
+ subprocess.run(cmd, check=True, text=True)
+
+ @classmethod
+ def get_insert_command_params(cls) -> list[str]:
+ return [
+ '--int-cols', '2',
+ '--str-cols', '5',
+ '--key-cols', '4',
+ '--len', '200',
+ ]
+
+ def __loop(self):
+ upload_commands = [
+ # import command
+ self.get_command_prefix(subcmds=['import', '--bulk-size', '1000', '-t', '1', 'generator'], path=self.store_type) + self.get_insert_command_params() + ['--rows', '100000'],
+ # bulk upsert workload
+ self.get_command_prefix(subcmds=['run', 'bulk_upsert'], path=self.store_type) + self.get_insert_command_params() + ['--seconds', '10', '--threads', '10'],
+
+ # upsert workload
+ self.get_command_prefix(subcmds=['run', 'upsert'], path=self.store_type) + self.get_insert_command_params() + ['--seconds', '10', '--threads', '10'],
+
+ # insert workload
+ self.get_command_prefix(subcmds=['run', 'insert'], path=self.store_type) + self.get_insert_command_params() + ['--seconds', '10', '--threads', '10'],
+ ]
+
+ # init
+
+ self.cmd_run(
+ self.get_command_prefix(subcmds=['init'], path=self.store_type) + self.get_insert_command_params() + [
+ '--store', self.store_type,
+ '--min-partitions', '100',
+ '--partition-size', '10',
+ '--auto-partition', '0',
+ ],
+ )
+ with ThreadPoolExecutor() as executor:
+ executor.submit(
+ self.cmd_run,
+ self.get_command_prefix(subcmds=['run', 'select'], path=self.store_type) + [
+ '--client-timeout', '10000',
+ '--threads', '10',
+ '--seconds', str(10 * len(upload_commands)),
+ ]
+ )
+
+ for command in upload_commands:
+ self.cmd_run(command)
+
+ def get_workload_thread_funcs(self):
+ r = [self.__loop]
+ return r
diff --git a/ydb/tests/stress/log/workload/ya.make b/ydb/tests/stress/log/workload/ya.make
new file mode 100644
index 00000000000..cb3d0bce000
--- /dev/null
+++ b/ydb/tests/stress/log/workload/ya.make
@@ -0,0 +1,19 @@
+PY3_LIBRARY()
+
+PY_SRCS(
+ workload_log.py
+)
+
+BUNDLE(
+ ydb/apps/ydb NAME ydb_cli
+)
+RESOURCE(ydb_cli ydb_cli)
+PEERDIR(
+ ydb/tests/stress/common
+ ydb/public/sdk/python
+ library/python/testing/yatest_common
+ library/python/resource
+ ydb/public/sdk/python/enable_v3_new_behavior
+)
+
+END()
diff --git a/ydb/tests/stress/log/ya.make b/ydb/tests/stress/log/ya.make
index d850faba6b2..c66eabc88cb 100644
--- a/ydb/tests/stress/log/ya.make
+++ b/ydb/tests/stress/log/ya.make
@@ -1,4 +1,16 @@
-RECURSE_FOR_TESTS(
- tests
+PY3_PROGRAM(workload_log)
+
+PY_SRCS(
+ __main__.py
+)
+
+PEERDIR(
+ ydb/tests/stress/common
+ ydb/tests/stress/log/workload
)
+END()
+
+RECURSE_FOR_TESTS(
+ tests
+) \ No newline at end of file
diff --git a/ydb/tests/stress/topic/__main__.py b/ydb/tests/stress/topic/__main__.py
new file mode 100644
index 00000000000..84a358785e1
--- /dev/null
+++ b/ydb/tests/stress/topic/__main__.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+import argparse
+import logging
+from ydb.tests.stress.topic.workload.workload_topic import YdbTopicWorkload
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(
+ description="Workload topic wrapper", formatter_class=argparse.RawDescriptionHelpFormatter
+ )
+ parser.add_argument('--endpoint', default='grpc://localhost:2135', help="YDB endpoint")
+ parser.add_argument('--database', default=None, required=True, help='A database to connect')
+ parser.add_argument('--duration', default=120, type=lambda x: int(x), help='A duration of workload in seconds')
+ parser.add_argument('--consumers', default=50, type=lambda x: int(x), help='Consumers of the topic')
+ parser.add_argument('--producers', default=100, type=lambda x: int(x), help='Producers of the topic')
+ parser.add_argument('--topic_prefix', default='topic', help='Topic name')
+ parser.add_argument('--log_file', default=None, help='Append log into specified file')
+
+ args = parser.parse_args()
+
+ if args.log_file:
+ logging.basicConfig(
+ filename=args.log_file,
+ filemode='a',
+ format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
+ datefmt='%H:%M:%S',
+ level=logging.INFO
+ )
+
+ workload = YdbTopicWorkload(args.endpoint, args.database, duration=args.duration, consumers=args.consumers, producers=args.producers, tables_prefix=args.topic_prefix)
+ workload.start()
+ workload.join()
diff --git a/ydb/tests/stress/topic/tests/test_workload_topic.py b/ydb/tests/stress/topic/tests/test_workload_topic.py
new file mode 100644
index 00000000000..c116fc31315
--- /dev/null
+++ b/ydb/tests/stress/topic/tests/test_workload_topic.py
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+import os
+import pytest
+import yatest
+
+from ydb.tests.library.stress.fixtures import StressFixture
+
+
+class TestYdbTopicWorkload(StressFixture):
+ @pytest.fixture(autouse=True, scope="function")
+ def setup(self):
+ yield from self.setup_cluster()
+
+ def test(self):
+ yatest.common.execute([
+ yatest.common.binary_path(os.environ["YDB_WORKLOAD_PATH"]),
+ "--endpoint", self.endpoint,
+ "--database", self.database,
+ "--duration", "60",
+ "--consumers", "50",
+ "--producers", "100",
+ ])
diff --git a/ydb/tests/stress/topic/tests/ya.make b/ydb/tests/stress/topic/tests/ya.make
new file mode 100644
index 00000000000..9c940a1cfc6
--- /dev/null
+++ b/ydb/tests/stress/topic/tests/ya.make
@@ -0,0 +1,24 @@
+PY3TEST()
+INCLUDE(${ARCADIA_ROOT}/ydb/tests/ydbd_dep.inc)
+ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb")
+ENV(YDB_WORKLOAD_PATH="ydb/tests/stress/topic/workload_topic")
+
+TEST_SRCS(
+ test_workload_topic.py
+)
+
+SIZE(MEDIUM)
+REQUIREMENTS(ram:32)
+
+DEPENDS(
+ ydb/apps/ydb
+ ydb/tests/stress/topic
+)
+
+PEERDIR(
+ ydb/tests/library
+ ydb/tests/library/stress
+)
+
+
+END()
diff --git a/ydb/tests/stress/topic/workload/workload_topic.py b/ydb/tests/stress/topic/workload/workload_topic.py
new file mode 100644
index 00000000000..47dbe3c63b5
--- /dev/null
+++ b/ydb/tests/stress/topic/workload/workload_topic.py
@@ -0,0 +1,65 @@
+import logging
+import subprocess
+import tempfile
+import os
+import stat
+from library.python import resource
+
+
+from ydb.tests.stress.common.common import WorkloadBase
+
+logger = logging.getLogger("YdbTopicWorkload")
+
+
+class YdbTopicWorkload(WorkloadBase):
+ def __init__(self, endpoint, database, duration, consumers, producers, tables_prefix):
+ super().__init__(None, tables_prefix, 'topic', None)
+ self.endpoint = endpoint
+ self.database = database
+ self.duration = str(duration)
+ self.consumers = str(consumers)
+ self.producers = str(producers)
+ self._unpack_resource('ydb_cli')
+
+ def _unpack_resource(self, name):
+ self.working_dir = os.path.join(tempfile.gettempdir(), "ydb_cli")
+ os.makedirs(self.working_dir, exist_ok=True)
+ res = resource.find(name)
+ path_to_unpack = os.path.join(self.working_dir, name)
+ with open(path_to_unpack, "wb") as f:
+ f.write(res)
+
+ st = os.stat(path_to_unpack)
+ os.chmod(path_to_unpack, st.st_mode | stat.S_IEXEC)
+ self.cli_path = path_to_unpack
+
+ def get_command_prefix(self, subcmds: list[str]) -> list[str]:
+ return [
+ self.cli_path,
+ '--verbose',
+ '--endpoint', self.endpoint,
+ '--database={}'.format(self.database),
+ 'workload', 'topic'
+ ] + subcmds + ['--topic', f'{self.table_prefix}']
+
+ def cmd_run(self, cmd):
+ logger.debug(f"Running cmd {cmd}")
+ subprocess.run(cmd, check=True, text=True)
+
+ def __loop(self):
+ # init
+ self.cmd_run(
+ self.get_command_prefix(subcmds=['init', '-c', self.consumers, '-p', self.producers])
+ )
+ # run
+ self.cmd_run(
+ self.get_command_prefix(subcmds=['run', 'full', '-s', self.duration, '--byte-rate', '100M', '--use-tx', '--tx-commit-interval', '2000', '-p', self.producers, '-c', self.consumers])
+ )
+ # clean
+ self.cmd_run(
+ self.get_command_prefix(subcmds=['clean'])
+ )
+
+ def get_workload_thread_funcs(self):
+ r = [self.__loop]
+ return r
diff --git a/ydb/tests/stress/topic/workload/ya.make b/ydb/tests/stress/topic/workload/ya.make
new file mode 100644
index 00000000000..fe8f877eb09
--- /dev/null
+++ b/ydb/tests/stress/topic/workload/ya.make
@@ -0,0 +1,18 @@
+PY3_LIBRARY()
+
+PY_SRCS(
+ workload_topic.py
+)
+
+BUNDLE(
+ ydb/apps/ydb NAME ydb_cli
+)
+RESOURCE(ydb_cli ydb_cli)
+PEERDIR(
+ ydb/tests/stress/common
+ ydb/public/sdk/python
+ library/python/resource
+ ydb/public/sdk/python/enable_v3_new_behavior
+)
+
+END()
diff --git a/ydb/tests/stress/topic/ya.make b/ydb/tests/stress/topic/ya.make
new file mode 100644
index 00000000000..e08853f3249
--- /dev/null
+++ b/ydb/tests/stress/topic/ya.make
@@ -0,0 +1,16 @@
+PY3_PROGRAM(workload_topic)
+
+PY_SRCS(
+ __main__.py
+)
+
+PEERDIR(
+ ydb/tests/stress/common
+ ydb/tests/stress/topic/workload
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ tests
+) \ No newline at end of file