diff options
7 files changed, 86 insertions, 29 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 2f657134cc5..9cf011a39f5 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -318,6 +318,5 @@ ydb/tests/stress/reconfig_state_storage_workload/tests flake8.sole chunk ydb/tests/stress/reconfig_state_storage_workload/tests import_test.sole chunk ydb/tests/stress/reconfig_state_storage_workload/tests py3test.sole chunk ydb/tests/stress/reconfig_state_storage_workload/tests test_board_workload.py.TestReconfigStateStorageBoardWorkload.test_state_storage_board -ydb/tests/stress/reconfig_state_storage_workload/tests test_scheme_board_workload.py.TestReconfigSchemeBoardWorkload.test_scheme_board ydb/tests/stress/reconfig_state_storage_workload/tests test_state_storage_workload.py.TestReconfigStateStorageWorkload.test_state_storage ydb/tools/stress_tool/ut TDeviceTestTool.PDiskTestLogWrite diff --git a/ydb/core/base/statestorage_impl.h b/ydb/core/base/statestorage_impl.h index 8d53e225483..41de77c3332 100644 --- a/ydb/core/base/statestorage_impl.h +++ b/ydb/core/base/statestorage_impl.h @@ -1,7 +1,11 @@ #pragma once + +#include "statestorage.h" + #include <ydb/core/scheme/scheme_pathid.h> #include <ydb/core/protos/base.pb.h> -#include "statestorage.h" + +#include <util/string/join.h> namespace NKikimr { @@ -149,6 +153,15 @@ struct TEvStateStorage::TEvResolveReplicasList : public TEventLocal<TEvResolveRe TVector<TActorId> Replicas; bool WriteOnly; ERingGroupState State; + + TString ToString() const { + TStringStream str; + str << "{Replicas: [" << JoinSeq(", ", Replicas) << "]" + << " WriteOnly: " << WriteOnly + << " State: " << static_cast<int>(State) + << "}"; + return str.Str(); + } }; TVector<TReplicaGroup> ReplicaGroups; @@ -168,6 +181,14 @@ struct TEvStateStorage::TEvResolveReplicasList : public TEventLocal<TEvResolveRe } return result; } + + TString ToString() const override { + TStringStream str; + str << "{EvResolveReplicasList" + << " ReplicaGroups: [" << JoinSeq(", ", ReplicaGroups) << "]" + << "}"; + return str.Str(); + } }; struct TEvStateStorage::TEvListSchemeBoard : public TEventLocal<TEvListSchemeBoard, EvListSchemeBoard> { @@ -399,3 +420,7 @@ struct TEvStateStorage::TEvReplicaBoardInfoUpdate : public TEventPB<TEvStateStor }; } + +Y_DECLARE_OUT_SPEC(inline, NKikimr::TEvStateStorage::TEvResolveReplicasList::TReplicaGroup, o, x) { + o << x.ToString(); +} diff --git a/ydb/core/tx/scheme_board/replica.cpp b/ydb/core/tx/scheme_board/replica.cpp index 98650006d9b..de2fe205711 100644 --- a/ydb/core/tx/scheme_board/replica.cpp +++ b/ydb/core/tx/scheme_board/replica.cpp @@ -1267,6 +1267,7 @@ private: } void PassAway() override { + SBR_LOG_T("PassAway"); for (const auto& [_, info] : Populators) { if (const auto& actorId = info.PopulatorActor) { Send(actorId, new TEvStateStorage::TEvReplicaShutdown()); @@ -1286,6 +1287,7 @@ public: } void Bootstrap() { + SBR_LOG_T("Bootstrap"); TMonitorableActor::Bootstrap(); auto localNodeId = SelfId().NodeId(); auto whiteboardId = NNodeWhiteboard::MakeNodeWhiteboardServiceId(localNodeId); diff --git a/ydb/tests/stress/common/common.py b/ydb/tests/stress/common/common.py index 1fdaec45c99..60c1bf13a80 100644 --- a/ydb/tests/stress/common/common.py +++ b/ydb/tests/stress/common/common.py @@ -21,7 +21,11 @@ class YdbClient: def query(self, statement, is_ddl): if self.use_query_service: - return self.session_pool.execute_with_retries(statement) + try: + return self.session_pool.execute_with_retries(statement) + except Exception as e: + logger.error(f"Error: {e} while executing query: {statement}") + raise e else: if is_ddl: return self.session_pool.retry_operation_sync(lambda session: session.execute_scheme(statement)) @@ -30,7 +34,7 @@ class YdbClient: def drop_table(self, path_to_table): if self.use_query_service: - self.session_pool.execute_with_retries(f"DROP TABLE `{path_to_table}`") + self.query(f"DROP TABLE `{path_to_table}`", True) else: self.session_pool.retry_operation_sync(lambda session: session.drop_table(path_to_table)) @@ -103,6 +107,9 @@ class WorkloadBase: t.start() self.workload_threads.append(t) - def join(self): + def join(self, timeout=None): for t in self.workload_threads: - t.join() + t.join(timeout) + + def is_alive(self): + return any(t.is_alive() for t in self.workload_threads) diff --git a/ydb/tests/stress/reconfig_state_storage_workload/__main__.py b/ydb/tests/stress/reconfig_state_storage_workload/__main__.py index 1ea900f7f87..fb63c33207d 100644 --- a/ydb/tests/stress/reconfig_state_storage_workload/__main__.py +++ b/ydb/tests/stress/reconfig_state_storage_workload/__main__.py @@ -16,6 +16,11 @@ if __name__ == "__main__": parser.add_argument("--config_name", default="StateStorage", help="Can be StateStorage / StateStorageBoard / SchemeBoard") parser.add_argument("--duration", default=10 ** 9, type=lambda x: int(x), help="A duration of workload in seconds.") args = parser.parse_args() + + logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)s - %(name)s:%(lineno)d - %(funcName)s: %(message)s" + ) logger = logging.getLogger("reconfig_state_storage_workload") with WorkloadRunner(args.grpc_endpoint, args.http_endpoint, args.database, args.path, args.duration, args.config_name) as runner: diff --git a/ydb/tests/stress/reconfig_state_storage_workload/tests/reconfig_state_storage_workload_test.py b/ydb/tests/stress/reconfig_state_storage_workload/tests/reconfig_state_storage_workload_test.py index 3d7f8788f05..715fc33a62d 100644 --- a/ydb/tests/stress/reconfig_state_storage_workload/tests/reconfig_state_storage_workload_test.py +++ b/ydb/tests/stress/reconfig_state_storage_workload/tests/reconfig_state_storage_workload_test.py @@ -26,8 +26,13 @@ class ReconfigStateStorageWorkloadTest(object): 'BOARD_LOOKUP': LogLevels.DEBUG, 'DISCOVERY': LogLevels.DEBUG, 'INTERCONNECT': LogLevels.INFO, - 'SCHEME_BOARD_SUBSCRIBER': LogLevels.DEBUG, + 'FLAT_TX_SCHEMESHARD': LogLevels.DEBUG, 'SCHEME_BOARD_POPULATOR': LogLevels.DEBUG, + 'SCHEME_BOARD_REPLICA': LogLevels.DEBUG, + 'SCHEME_BOARD_SUBSCRIBER': LogLevels.DEBUG, + 'TX_PROXY_SCHEME_CACHE': LogLevels.DEBUG, + 'KQP_COMPILE_ACTOR': LogLevels.DEBUG, + 'CMS_TENANTS': LogLevels.DEBUG, # 'STATESTORAGE': LogLevels.DEBUG, } )) diff --git a/ydb/tests/stress/reconfig_state_storage_workload/workload/__init__.py b/ydb/tests/stress/reconfig_state_storage_workload/workload/__init__.py index bec1094c7aa..71d34915af3 100644 --- a/ydb/tests/stress/reconfig_state_storage_workload/workload/__init__.py +++ b/ydb/tests/stress/reconfig_state_storage_workload/workload/__init__.py @@ -5,7 +5,6 @@ import random import threading import requests import logging -import grpc from enum import Enum from ydb.tests.stress.common.common import WorkloadBase @@ -49,11 +48,9 @@ supported_types = supported_pk_types + [ class WorkloadTablesCreateDrop(WorkloadBase): class TableStatus(Enum): - CREATING = "Creating", - AVAILABLE = "Available", - DELITING = "Deleting" - - create_table_canceled_cnt = 0 + CREATING = "Creating" + AVAILABLE = "Available" + DELETING = "Deleting" def __init__(self, client, prefix, stop): super().__init__(client, prefix, "create_drop", stop) @@ -78,7 +75,7 @@ class WorkloadTablesCreateDrop(WorkloadBase): with self.lock: for n, s in self.tables.items(): if s == WorkloadTablesCreateDrop.TableStatus.AVAILABLE: - self.tables[n] = WorkloadTablesCreateDrop.TableStatus.DELITING + self.tables[n] = WorkloadTablesCreateDrop.TableStatus.DELETING return n return None @@ -103,27 +100,22 @@ class WorkloadTablesCreateDrop(WorkloadBase): PRIMARY KEY({", ".join(["c" + str(i) for i in range(primary_key_column_n)])}) ) """ - try: - self.client.query(stmt, True) - return - except Exception as e: - if e.code() == grpc.StatusCode.CANCELLED: - self.create_table_canceled_cnt += 1 - if self.create_table_canceled_cnt > 3: - raise e - logger.info(f"Create table cancelled {e}") - else: - raise e + self.client.query(stmt, True) def _create_tables_loop(self): + logger.debug("starting") while not self.is_stop_requested(): n = self._generate_new_table_n() self.create_table(str(n)) with self.lock: self.tables[n] = WorkloadTablesCreateDrop.TableStatus.AVAILABLE self.created += 1 + logger.debug(f"iteration {self.created}") + with self.lock: + logger.debug(f"exiting after {self.created} iterations") def _delete_tables_loop(self): + logger.debug("starting") while not self.is_stop_requested(): n = self._get_table_to_delete() if n is None: @@ -134,11 +126,12 @@ class WorkloadTablesCreateDrop(WorkloadBase): with self.lock: del self.tables[n] self.deleted += 1 + logger.debug(f"iteration {self.deleted}") + with self.lock: + logger.debug(f"exiting after {self.deleted} iterations") def get_workload_thread_funcs(self): - r = [self._create_tables_loop for x in range(0, 3)] - r.append(self._delete_tables_loop) - return r + return [self._create_tables_loop] * 3 + [self._delete_tables_loop] * 2 class WorkloadInsertDelete(WorkloadBase): @@ -154,6 +147,7 @@ class WorkloadInsertDelete(WorkloadBase): return f"Inserted: {self.inserted}, Current: {self.current}" def _loop(self): + logger.debug("starting") table_path = self.get_table_path(self.table_name) self.client.query( f""" @@ -167,6 +161,7 @@ class WorkloadInsertDelete(WorkloadBase): ) i = 1 while not self.is_stop_requested(): + logger.debug(f"iteration {i}") self.client.query( f""" INSERT INTO `{table_path}` (`id`, `i64Val`) @@ -198,6 +193,7 @@ class WorkloadInsertDelete(WorkloadBase): with self.lock: self.inserted += 2 self.current = actual["cnt"] + logger.debug(f"exiting after {i} iterations") def get_workload_thread_funcs(self): return [self._loop] @@ -229,6 +225,7 @@ class WorkloadReconfigStateStorage(WorkloadBase): return res def _loop(self): + logger.debug("starting") while not self.is_stop_requested(): time.sleep(self.wait_for) cfg = self.do_request_config()[f"{self.config_name}Config"] @@ -264,6 +261,9 @@ class WorkloadReconfigStateStorage(WorkloadBase): with self.lock: logger.info(f"Reconfig {self.loop_cnt} finished") self.loop_cnt += 1 + logger.debug(f"iteration {self.loop_cnt}") + with self.lock: + logger.debug(f"exiting after {self.loop_cnt} iterations") def get_workload_thread_funcs(self): return [self._loop] @@ -282,6 +282,7 @@ class WorkloadDiscovery(WorkloadBase): return f"Discovery: {self.cnt}" def _loop(self): + logger.debug("starting") driver_config = ydb.DriverConfig(self.grpc_endpoint, self.client.database) while not self.is_stop_requested(): time.sleep(3) @@ -293,6 +294,9 @@ class WorkloadDiscovery(WorkloadBase): logger.info(f"Len = {len(result.endpoints)} Endpoints: {result.endpoints}") with self.lock: self.cnt += 1 + logger.debug(f"iteration {self.cnt}") + with self.lock: + logger.debug(f"exiting after {self.cnt} iterations") def get_workload_thread_funcs(self): return [self._loop] @@ -344,7 +348,17 @@ class WorkloadRunner: time.sleep(5) stop.set() logger.info("Waiting for stop...") + failed_to_stop = [] for w in workloads: - w.join() + logger.debug(f"Waiting for {w.name} to stop...") + w.join(timeout=30) + if w.is_alive(): + logger.error(f"Workload {w.name} failed to stop within 30 seconds!") + failed_to_stop.append(w.name) + else: + logger.debug(f"{w.name} stopped") + + if failed_to_stop: + raise Exception(f"The following workloads failed to stop: {failed_to_stop}") logger.info("Waiting for stop... stopped") return reconfigWorkload.loop_cnt |
