diff options
author | Daniil Demin <[email protected]> | 2025-06-03 18:48:02 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2025-06-03 15:48:02 +0000 |
commit | 7ae0076cf92f2aa55f0e3742326d81d906cf2d12 (patch) | |
tree | 242bea97ed262b0960397adabb7a6c580871fe54 | |
parent | 1329bba45019533462b542dfe51561db34163c11 (diff) |
stress test of SHOW CREATE VIEW (#19196)
-rw-r--r-- | ydb/tests/stress/show_create/view/__main__.py | 43 | ||||
-rw-r--r-- | ydb/tests/stress/show_create/view/tests/test_workload.py | 44 | ||||
-rw-r--r-- | ydb/tests/stress/show_create/view/tests/ya.make | 32 | ||||
-rw-r--r-- | ydb/tests/stress/show_create/view/workload/__init__.py | 283 | ||||
-rw-r--r-- | ydb/tests/stress/show_create/view/workload/ya.make | 12 | ||||
-rw-r--r-- | ydb/tests/stress/show_create/view/ya.make | 15 | ||||
-rw-r--r-- | ydb/tests/stress/ya.make | 3 |
7 files changed, 431 insertions, 1 deletions
diff --git a/ydb/tests/stress/show_create/view/__main__.py b/ydb/tests/stress/show_create/view/__main__.py new file mode 100644 index 00000000000..a18374a7e0e --- /dev/null +++ b/ydb/tests/stress/show_create/view/__main__.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +import argparse +import logging +import sys +from ydb.tests.stress.show_create.view.workload import ShowCreateViewWorkload + +if __name__ == "__main__": + text = "SHOW CREATE VIEW Workload Test" + parser = argparse.ArgumentParser(description=text, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--endpoint", required=True, help="YDB endpoint (e.g., grpc://localhost:2135)") + parser.add_argument("--database", required=True, help="YDB database path (e.g., /Root or /local)") + parser.add_argument("--duration", type=int, default=60, help="Workload duration in seconds (default: 60)") + parser.add_argument( + "--path-prefix", + default=None, + help="Optional path prefix for tables/views within the database (e.g., my_tests/scv)", + ) + parser.add_argument( + "--log-level", + default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"], + help="Logging level (default: INFO)", + ) + + args = parser.parse_args() + + logging.basicConfig( + level=getattr(logging, args.log_level.upper()), format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + + logger = logging.getLogger("ShowCreateViewWorkload") + logger.info(f"Starting SHOW CREATE VIEW workload with args: {args}") + + with ShowCreateViewWorkload(args.endpoint, args.database, args.duration, args.path_prefix) as workload: + workload.loop() + if workload.failed_cycles > 0: + logger.error("Test completed with failures.") + sys.exit(1) + elif workload.successful_cycles == 0: + logger.error("Test was not successful.") + sys.exit(1) + else: + logger.info("Test completed successfully.") diff --git a/ydb/tests/stress/show_create/view/tests/test_workload.py b/ydb/tests/stress/show_create/view/tests/test_workload.py new file mode 100644 index 00000000000..c8d0cd56ce2 --- /dev/null +++ b/ydb/tests/stress/show_create/view/tests/test_workload.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +import os +import pytest +import yatest + +from ydb.tests.library.harness.kikimr_runner import KiKiMR +from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator + + +class TestShowCreateViewWorkload(object): + @classmethod + def setup_class(cls): + cls.cluster = KiKiMR( + KikimrConfigGenerator( + extra_feature_flags={ + "enable_show_create": True, + } + ) + ) + cls.cluster.start() + + @classmethod + def teardown_class(cls): + cls.cluster.stop() + + @pytest.mark.parametrize( + "duration, path_prefix", + [ + (30, None), + (30, "test_scv"), + ], + ) + def test_show_create_view_workload(self, duration, path_prefix): + cmd = [ + yatest.common.binary_path(os.getenv("STRESS_TEST_UTILITY")), + "--endpoint", f"grpc://localhost:{self.cluster.nodes[1].grpc_port}", + "--database", "/Root", + "--duration", str(duration), + ] + + if path_prefix is not None: + cmd.extend(["--path-prefix", path_prefix]) + + yatest.common.execute(cmd, wait=True) diff --git a/ydb/tests/stress/show_create/view/tests/ya.make b/ydb/tests/stress/show_create/view/tests/ya.make new file mode 100644 index 00000000000..2fab51e9ac7 --- /dev/null +++ b/ydb/tests/stress/show_create/view/tests/ya.make @@ -0,0 +1,32 @@ +IF (NOT WITH_VALGRIND) + +PY3TEST() +ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd") +ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb") +ENV(YDB_USE_IN_MEMORY_PDISKS=true) +ENV(STRESS_TEST_UTILITY="ydb/tests/stress/show_create/view/show_create_view") + +TEST_SRCS( + test_workload.py +) + +IF (SANITIZER_TYPE) + REQUIREMENTS(ram:32) +ENDIF() + +SIZE(MEDIUM) + +DEPENDS( + ydb/apps/ydbd + ydb/apps/ydb + ydb/tests/stress/show_create/view +) + +PEERDIR( + ydb/tests/library + ydb/tests/stress/show_create/view/workload +) + +END() + +ENDIF() diff --git a/ydb/tests/stress/show_create/view/workload/__init__.py b/ydb/tests/stress/show_create/view/workload/__init__.py new file mode 100644 index 00000000000..08e197b5d4a --- /dev/null +++ b/ydb/tests/stress/show_create/view/workload/__init__.py @@ -0,0 +1,283 @@ +# -*- coding: utf-8 -*- +import ydb +import logging +import time +import os +import random +import string +import threading + +logger = logging.getLogger("ShowCreateViewWorkload") + + +def get_unique_suffix(k=5): + return "".join(random.choices(string.ascii_lowercase + string.digits, k=k)) + + +class ShowCreateViewWorkload: + def __init__(self, endpoint, database, duration, path_prefix=None): + self.endpoint = endpoint + self.database = database + self.duration = duration + + self.driver_config = ydb.DriverConfig( + self.endpoint, + self.database, + ) + self.driver = ydb.Driver(self.driver_config) + self.driver.wait(timeout=10, fail_fast=True) + logger.info(f"Driver initialized successfully for endpoint {self.endpoint}, database {self.database}") + + self.pool = ydb.QuerySessionPool(self.driver) + + instance_id = get_unique_suffix() + + base_table_name = f"t_{instance_id}" + base_view_name = f"v_{instance_id}" + + if path_prefix: + self.table_path = os.path.join(path_prefix.strip("/"), base_table_name) + self.view_path = os.path.join(path_prefix.strip("/"), base_view_name) + else: + self.table_path = base_table_name + self.view_path = base_view_name + + logger.info(f"Target table relative path: {self.table_path}") + logger.info(f"Target view relative path: {self.view_path}") + + self.successful_cycles = 0 + self.failed_cycles = 0 + self.no_signal_cycles = 0 + + self._stats_lock = threading.Lock() + self._stop_event = threading.Event() + self.overall_workload_error_occurred = False + + def _increment_successful_cycles(self): + with self._stats_lock: + self.successful_cycles += 1 + + def _increment_failed_cycles(self): + with self._stats_lock: + self.failed_cycles += 1 + self.overall_workload_error_occurred = True + + def _increment_no_signal_cycles(self): + with self._stats_lock: + self.no_signal_cycles += 1 + + def _mark_overall_workload_error(self): + with self._stats_lock: + self.overall_workload_error_occurred = True + + def preliminary_setup(self): + logger.info(f"Performing preliminary setup for table `{self.table_path}`...") + + create_table_query = f""" + CREATE TABLE `{self.table_path}` ( + key Int32, + value Utf8, + PRIMARY KEY (key) + ); + """ + self.pool.execute_with_retries(create_table_query) + logger.info(f"Table `{self.table_path}` created.") + + upsert_query = f""" + UPSERT INTO `{self.table_path}` (key, value) VALUES + (1, "one"), (2, "two"), (3, "three"); + """ + self.pool.execute_with_retries(upsert_query) + logger.info(f"Data upserted into `{self.table_path}`.") + + logger.info("Preliminary setup completed.") + + def cleanup_resources(self): + logger.info(f"Cleaning up view `{self.view_path}` and table `{self.table_path}`...") + + try: + drop_view_query = f"DROP VIEW IF EXISTS `{self.view_path}`;" + self.pool.execute_with_retries(drop_view_query) + logger.info(f"View `{self.view_path}` dropped.") + except Exception as e: + logger.warning(f"Failed to drop view `{self.view_path}` during cleanup: {e}") + + try: + drop_table_query = f"DROP TABLE IF EXISTS `{self.table_path}`;" + self.pool.execute_with_retries(drop_table_query) + logger.info(f"Table `{self.table_path}` dropped.") + except Exception as e: + logger.warning(f"Failed to drop table `{self.table_path}` during cleanup: {e}") + + logger.info("Cleanup finished.") + + def _recreation_loop(self): + thread_name = threading.current_thread().name + logger.info(f"[{thread_name}] started.") + while not self._stop_event.is_set(): + # Drop View + drop_view_query = f"DROP VIEW IF EXISTS `{self.view_path}`;" + self.pool.execute_with_retries(drop_view_query) + logger.debug(f"[{thread_name}] View `{self.view_path}` dropped.") + + # Create View + create_view_query = f""" + CREATE VIEW `{self.view_path}` WITH security_invoker = TRUE AS + SELECT * FROM `{self.table_path}`; + """ + self.pool.execute_with_retries(create_view_query) + logger.debug(f"[{thread_name}] View `{self.view_path}` created.") + + # Let the created view exist for at least some time. + time.sleep(0.2) + + logger.info(f"[{thread_name}] Worker stopped.") + + def _showing_loop(self): + thread_name = threading.current_thread().name + logger.info(f"[{thread_name}] started.") + + while not self._stop_event.is_set(): + select_was_successful = False + select_had_critical_error = False + show_was_successful = False + show_had_critical_error = False + + # Step 1: SELECT * FROM v + try: + select_query = f"SELECT * FROM `{self.view_path}`;" + result_sets = self.pool.execute_with_retries(select_query) + row_count = len(result_sets[0].rows) if result_sets and result_sets[0].rows else 0 + + if row_count == 3: + select_was_successful = True + else: + logger.error( + f"[{thread_name}] SELECT * FROM `{self.view_path}` returned {row_count} rows, expected 3." + ) + select_had_critical_error = True + except ydb.SchemeError: + logger.debug(f"[{thread_name}] SELECT * FROM `{self.view_path}` failed as expected (view likely gone).") + except Exception as e: + logger.warning(f"[{thread_name}] SELECT * FROM `{self.view_path}` failed with other error: {e}") + select_had_critical_error = True + + # Step 2: SHOW CREATE VIEW v + try: + show_create_query = f"SHOW CREATE VIEW `{self.view_path}`;" + result_sets = self.pool.execute_with_retries(show_create_query) + + if not (result_sets and result_sets[0].rows and result_sets[0].rows[0]): + logger.error(f"[{thread_name}] SHOW CREATE VIEW `{self.view_path}` returned no data.") + show_had_critical_error = True + else: + statement_column_index = -1 + for i, col in enumerate(result_sets[0].columns): + if col.name == "Statement": + statement_column_index = i + break + if statement_column_index == -1: + logger.error( + f"[{thread_name}] Column 'Statement' not found in SHOW CREATE VIEW result for {self.view_path}." + ) + show_had_critical_error = True + else: + create_statement = result_sets[0].rows[0][statement_column_index] + logger.debug(f"[{thread_name}] SHOW CREATE VIEW `{self.view_path}` result:\n{create_statement}") + + expected_view_substr = f"CREATE VIEW `{self.view_path}`" + expected_table_substr = f"FROM\n `{self.table_path}`" + + if not (expected_view_substr in create_statement and expected_table_substr in create_statement): + logger.error( + f"[{thread_name}] VALIDATION FAILED for `{self.view_path}`:\n" + f" Expected view part: '{expected_view_substr}'\n" + f" Expected table part: '{expected_table_substr}'\n" + f" Got:\n{create_statement}" + ) + show_had_critical_error = True + else: + logger.debug(f"[{thread_name}] SHOW CREATE VIEW `{self.view_path}` validated.") + show_was_successful = True + except ydb.SchemeError: + logger.debug( + f"[{thread_name}] SHOW CREATE VIEW `{self.view_path}` failed as expected (view likely gone)." + ) + except Exception as e: + logger.warning(f"[{thread_name}] SHOW CREATE VIEW `{self.view_path}` failed with other error: {e}") + show_had_critical_error = True + + if select_was_successful and show_was_successful: + self._increment_successful_cycles() + elif select_had_critical_error or show_had_critical_error: + self._increment_failed_cycles() + else: + self._increment_no_signal_cycles() + + logger.info(f"[{thread_name}] stopped.") + + def loop(self): + logger.info(f"Starting workload loop for {self.duration} seconds...") + + recreation_thread = threading.Thread(target=self._recreation_loop, name="Recreation-Thread") + showing_thread = threading.Thread(target=self._showing_loop, name="Showing-Thread") + + recreation_thread.start() + showing_thread.start() + + main_start_time = time.time() + while time.time() - main_start_time < self.duration: + if not recreation_thread.is_alive() or not showing_thread.is_alive(): + logger.warning("A worker thread stopped prematurely.") + self._mark_overall_workload_error() + break + time.sleep(0.2) + + logger.info("Duration reached or thread stopped. Signaling stop event.") + self._stop_event.set() + + recreation_thread.join(timeout=5) + showing_thread.join(timeout=5) + + if recreation_thread.is_alive(): + error_msg = "Recreation thread did not join in time." + logger.error(error_msg) + raise RuntimeError(error_msg) + if showing_thread.is_alive(): + error_msg = "Showing thread did not join in time." + logger.error(error_msg) + raise RuntimeError(error_msg) + + logger.info( + f"Final showing loop stats: successful cycles = {self.successful_cycles}, failed cycles = {self.failed_cycles}, no signal cycles = {self.no_signal_cycles}" + ) + + if self.overall_workload_error_occurred: + logger.error("Workload finished with critical errors.") + elif self.failed_cycles > 0: + logger.error(f"Workload finished with {self.failed_cycles} failed showing cycles.") + elif self.successful_cycles == 0: + logger.warning("Workload completed with zero showing cycles attempted or recorded.") + + def __enter__(self): + try: + self.preliminary_setup() + except Exception as e: + logger.critical(f"Preliminary setup failed, cannot start workload: {e}") + try: + self.__exit__(type(e), e, e.__traceback__) + except Exception as exit_e: + logger.error(f"Exception during __exit__ after setup failure: {exit_e}") + raise + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + logger.info("Exiting workload...") + if not self._stop_event.is_set(): + logger.info("Signaling threads to stop during exit.") + self._stop_event.set() + + self.cleanup_resources() + self.pool.stop() + self.driver.stop() + logger.info("Workload stopped.") diff --git a/ydb/tests/stress/show_create/view/workload/ya.make b/ydb/tests/stress/show_create/view/workload/ya.make new file mode 100644 index 00000000000..ce70a4145c4 --- /dev/null +++ b/ydb/tests/stress/show_create/view/workload/ya.make @@ -0,0 +1,12 @@ +PY3_LIBRARY() + +PY_SRCS( + __init__.py +) + +PEERDIR( + ydb/public/sdk/python + ydb/public/sdk/python/enable_v3_new_behavior +) + +END() diff --git a/ydb/tests/stress/show_create/view/ya.make b/ydb/tests/stress/show_create/view/ya.make new file mode 100644 index 00000000000..376e8bcd8f6 --- /dev/null +++ b/ydb/tests/stress/show_create/view/ya.make @@ -0,0 +1,15 @@ +PY3_PROGRAM(show_create_view) + +PY_SRCS( + __main__.py +) + +PEERDIR( + ydb/tests/stress/show_create/view/workload +) + +END() + +RECURSE_FOR_TESTS( + tests +) diff --git a/ydb/tests/stress/ya.make b/ydb/tests/stress/ya.make index 6cd44cce1ec..8a6f79e14d0 100644 --- a/ydb/tests/stress/ya.make +++ b/ydb/tests/stress/ya.make @@ -6,6 +6,7 @@ RECURSE( olap_workload oltp_workload simple_queue + show_create/view statistics_workload transfer -)
\ No newline at end of file +) |