summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.github/config/muted_ya.txt1
-rw-r--r--ydb/core/base/statestorage_impl.h27
-rw-r--r--ydb/core/tx/scheme_board/replica.cpp2
-rw-r--r--ydb/tests/stress/common/common.py15
-rw-r--r--ydb/tests/stress/reconfig_state_storage_workload/__main__.py5
-rw-r--r--ydb/tests/stress/reconfig_state_storage_workload/tests/reconfig_state_storage_workload_test.py7
-rw-r--r--ydb/tests/stress/reconfig_state_storage_workload/workload/__init__.py58
7 files changed, 86 insertions, 29 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt
index 2f657134cc5..9cf011a39f5 100644
--- a/.github/config/muted_ya.txt
+++ b/.github/config/muted_ya.txt
@@ -318,6 +318,5 @@ ydb/tests/stress/reconfig_state_storage_workload/tests flake8.sole chunk
ydb/tests/stress/reconfig_state_storage_workload/tests import_test.sole chunk
ydb/tests/stress/reconfig_state_storage_workload/tests py3test.sole chunk
ydb/tests/stress/reconfig_state_storage_workload/tests test_board_workload.py.TestReconfigStateStorageBoardWorkload.test_state_storage_board
-ydb/tests/stress/reconfig_state_storage_workload/tests test_scheme_board_workload.py.TestReconfigSchemeBoardWorkload.test_scheme_board
ydb/tests/stress/reconfig_state_storage_workload/tests test_state_storage_workload.py.TestReconfigStateStorageWorkload.test_state_storage
ydb/tools/stress_tool/ut TDeviceTestTool.PDiskTestLogWrite
diff --git a/ydb/core/base/statestorage_impl.h b/ydb/core/base/statestorage_impl.h
index 8d53e225483..41de77c3332 100644
--- a/ydb/core/base/statestorage_impl.h
+++ b/ydb/core/base/statestorage_impl.h
@@ -1,7 +1,11 @@
#pragma once
+
+#include "statestorage.h"
+
#include <ydb/core/scheme/scheme_pathid.h>
#include <ydb/core/protos/base.pb.h>
-#include "statestorage.h"
+
+#include <util/string/join.h>
namespace NKikimr {
@@ -149,6 +153,15 @@ struct TEvStateStorage::TEvResolveReplicasList : public TEventLocal<TEvResolveRe
TVector<TActorId> Replicas;
bool WriteOnly;
ERingGroupState State;
+
+ TString ToString() const {
+ TStringStream str;
+ str << "{Replicas: [" << JoinSeq(", ", Replicas) << "]"
+ << " WriteOnly: " << WriteOnly
+ << " State: " << static_cast<int>(State)
+ << "}";
+ return str.Str();
+ }
};
TVector<TReplicaGroup> ReplicaGroups;
@@ -168,6 +181,14 @@ struct TEvStateStorage::TEvResolveReplicasList : public TEventLocal<TEvResolveRe
}
return result;
}
+
+ TString ToString() const override {
+ TStringStream str;
+ str << "{EvResolveReplicasList"
+ << " ReplicaGroups: [" << JoinSeq(", ", ReplicaGroups) << "]"
+ << "}";
+ return str.Str();
+ }
};
struct TEvStateStorage::TEvListSchemeBoard : public TEventLocal<TEvListSchemeBoard, EvListSchemeBoard> {
@@ -399,3 +420,7 @@ struct TEvStateStorage::TEvReplicaBoardInfoUpdate : public TEventPB<TEvStateStor
};
}
+
+Y_DECLARE_OUT_SPEC(inline, NKikimr::TEvStateStorage::TEvResolveReplicasList::TReplicaGroup, o, x) {
+ o << x.ToString();
+}
diff --git a/ydb/core/tx/scheme_board/replica.cpp b/ydb/core/tx/scheme_board/replica.cpp
index 98650006d9b..de2fe205711 100644
--- a/ydb/core/tx/scheme_board/replica.cpp
+++ b/ydb/core/tx/scheme_board/replica.cpp
@@ -1267,6 +1267,7 @@ private:
}
void PassAway() override {
+ SBR_LOG_T("PassAway");
for (const auto& [_, info] : Populators) {
if (const auto& actorId = info.PopulatorActor) {
Send(actorId, new TEvStateStorage::TEvReplicaShutdown());
@@ -1286,6 +1287,7 @@ public:
}
void Bootstrap() {
+ SBR_LOG_T("Bootstrap");
TMonitorableActor::Bootstrap();
auto localNodeId = SelfId().NodeId();
auto whiteboardId = NNodeWhiteboard::MakeNodeWhiteboardServiceId(localNodeId);
diff --git a/ydb/tests/stress/common/common.py b/ydb/tests/stress/common/common.py
index 1fdaec45c99..60c1bf13a80 100644
--- a/ydb/tests/stress/common/common.py
+++ b/ydb/tests/stress/common/common.py
@@ -21,7 +21,11 @@ class YdbClient:
def query(self, statement, is_ddl):
if self.use_query_service:
- return self.session_pool.execute_with_retries(statement)
+ try:
+ return self.session_pool.execute_with_retries(statement)
+ except Exception as e:
+ logger.error(f"Error: {e} while executing query: {statement}")
+ raise e
else:
if is_ddl:
return self.session_pool.retry_operation_sync(lambda session: session.execute_scheme(statement))
@@ -30,7 +34,7 @@ class YdbClient:
def drop_table(self, path_to_table):
if self.use_query_service:
- self.session_pool.execute_with_retries(f"DROP TABLE `{path_to_table}`")
+ self.query(f"DROP TABLE `{path_to_table}`", True)
else:
self.session_pool.retry_operation_sync(lambda session: session.drop_table(path_to_table))
@@ -103,6 +107,9 @@ class WorkloadBase:
t.start()
self.workload_threads.append(t)
- def join(self):
+ def join(self, timeout=None):
for t in self.workload_threads:
- t.join()
+ t.join(timeout)
+
+ def is_alive(self):
+ return any(t.is_alive() for t in self.workload_threads)
diff --git a/ydb/tests/stress/reconfig_state_storage_workload/__main__.py b/ydb/tests/stress/reconfig_state_storage_workload/__main__.py
index 1ea900f7f87..fb63c33207d 100644
--- a/ydb/tests/stress/reconfig_state_storage_workload/__main__.py
+++ b/ydb/tests/stress/reconfig_state_storage_workload/__main__.py
@@ -16,6 +16,11 @@ if __name__ == "__main__":
parser.add_argument("--config_name", default="StateStorage", help="Can be StateStorage / StateStorageBoard / SchemeBoard")
parser.add_argument("--duration", default=10 ** 9, type=lambda x: int(x), help="A duration of workload in seconds.")
args = parser.parse_args()
+
+ logging.basicConfig(
+ level=logging.DEBUG,
+ format="%(asctime)s - %(levelname)s - %(name)s:%(lineno)d - %(funcName)s: %(message)s"
+ )
logger = logging.getLogger("reconfig_state_storage_workload")
with WorkloadRunner(args.grpc_endpoint, args.http_endpoint, args.database, args.path, args.duration, args.config_name) as runner:
diff --git a/ydb/tests/stress/reconfig_state_storage_workload/tests/reconfig_state_storage_workload_test.py b/ydb/tests/stress/reconfig_state_storage_workload/tests/reconfig_state_storage_workload_test.py
index 3d7f8788f05..715fc33a62d 100644
--- a/ydb/tests/stress/reconfig_state_storage_workload/tests/reconfig_state_storage_workload_test.py
+++ b/ydb/tests/stress/reconfig_state_storage_workload/tests/reconfig_state_storage_workload_test.py
@@ -26,8 +26,13 @@ class ReconfigStateStorageWorkloadTest(object):
'BOARD_LOOKUP': LogLevels.DEBUG,
'DISCOVERY': LogLevels.DEBUG,
'INTERCONNECT': LogLevels.INFO,
- 'SCHEME_BOARD_SUBSCRIBER': LogLevels.DEBUG,
+ 'FLAT_TX_SCHEMESHARD': LogLevels.DEBUG,
'SCHEME_BOARD_POPULATOR': LogLevels.DEBUG,
+ 'SCHEME_BOARD_REPLICA': LogLevels.DEBUG,
+ 'SCHEME_BOARD_SUBSCRIBER': LogLevels.DEBUG,
+ 'TX_PROXY_SCHEME_CACHE': LogLevels.DEBUG,
+ 'KQP_COMPILE_ACTOR': LogLevels.DEBUG,
+ 'CMS_TENANTS': LogLevels.DEBUG,
# 'STATESTORAGE': LogLevels.DEBUG,
}
))
diff --git a/ydb/tests/stress/reconfig_state_storage_workload/workload/__init__.py b/ydb/tests/stress/reconfig_state_storage_workload/workload/__init__.py
index bec1094c7aa..71d34915af3 100644
--- a/ydb/tests/stress/reconfig_state_storage_workload/workload/__init__.py
+++ b/ydb/tests/stress/reconfig_state_storage_workload/workload/__init__.py
@@ -5,7 +5,6 @@ import random
import threading
import requests
import logging
-import grpc
from enum import Enum
from ydb.tests.stress.common.common import WorkloadBase
@@ -49,11 +48,9 @@ supported_types = supported_pk_types + [
class WorkloadTablesCreateDrop(WorkloadBase):
class TableStatus(Enum):
- CREATING = "Creating",
- AVAILABLE = "Available",
- DELITING = "Deleting"
-
- create_table_canceled_cnt = 0
+ CREATING = "Creating"
+ AVAILABLE = "Available"
+ DELETING = "Deleting"
def __init__(self, client, prefix, stop):
super().__init__(client, prefix, "create_drop", stop)
@@ -78,7 +75,7 @@ class WorkloadTablesCreateDrop(WorkloadBase):
with self.lock:
for n, s in self.tables.items():
if s == WorkloadTablesCreateDrop.TableStatus.AVAILABLE:
- self.tables[n] = WorkloadTablesCreateDrop.TableStatus.DELITING
+ self.tables[n] = WorkloadTablesCreateDrop.TableStatus.DELETING
return n
return None
@@ -103,27 +100,22 @@ class WorkloadTablesCreateDrop(WorkloadBase):
PRIMARY KEY({", ".join(["c" + str(i) for i in range(primary_key_column_n)])})
)
"""
- try:
- self.client.query(stmt, True)
- return
- except Exception as e:
- if e.code() == grpc.StatusCode.CANCELLED:
- self.create_table_canceled_cnt += 1
- if self.create_table_canceled_cnt > 3:
- raise e
- logger.info(f"Create table cancelled {e}")
- else:
- raise e
+ self.client.query(stmt, True)
def _create_tables_loop(self):
+ logger.debug("starting")
while not self.is_stop_requested():
n = self._generate_new_table_n()
self.create_table(str(n))
with self.lock:
self.tables[n] = WorkloadTablesCreateDrop.TableStatus.AVAILABLE
self.created += 1
+ logger.debug(f"iteration {self.created}")
+ with self.lock:
+ logger.debug(f"exiting after {self.created} iterations")
def _delete_tables_loop(self):
+ logger.debug("starting")
while not self.is_stop_requested():
n = self._get_table_to_delete()
if n is None:
@@ -134,11 +126,12 @@ class WorkloadTablesCreateDrop(WorkloadBase):
with self.lock:
del self.tables[n]
self.deleted += 1
+ logger.debug(f"iteration {self.deleted}")
+ with self.lock:
+ logger.debug(f"exiting after {self.deleted} iterations")
def get_workload_thread_funcs(self):
- r = [self._create_tables_loop for x in range(0, 3)]
- r.append(self._delete_tables_loop)
- return r
+ return [self._create_tables_loop] * 3 + [self._delete_tables_loop] * 2
class WorkloadInsertDelete(WorkloadBase):
@@ -154,6 +147,7 @@ class WorkloadInsertDelete(WorkloadBase):
return f"Inserted: {self.inserted}, Current: {self.current}"
def _loop(self):
+ logger.debug("starting")
table_path = self.get_table_path(self.table_name)
self.client.query(
f"""
@@ -167,6 +161,7 @@ class WorkloadInsertDelete(WorkloadBase):
)
i = 1
while not self.is_stop_requested():
+ logger.debug(f"iteration {i}")
self.client.query(
f"""
INSERT INTO `{table_path}` (`id`, `i64Val`)
@@ -198,6 +193,7 @@ class WorkloadInsertDelete(WorkloadBase):
with self.lock:
self.inserted += 2
self.current = actual["cnt"]
+ logger.debug(f"exiting after {i} iterations")
def get_workload_thread_funcs(self):
return [self._loop]
@@ -229,6 +225,7 @@ class WorkloadReconfigStateStorage(WorkloadBase):
return res
def _loop(self):
+ logger.debug("starting")
while not self.is_stop_requested():
time.sleep(self.wait_for)
cfg = self.do_request_config()[f"{self.config_name}Config"]
@@ -264,6 +261,9 @@ class WorkloadReconfigStateStorage(WorkloadBase):
with self.lock:
logger.info(f"Reconfig {self.loop_cnt} finished")
self.loop_cnt += 1
+ logger.debug(f"iteration {self.loop_cnt}")
+ with self.lock:
+ logger.debug(f"exiting after {self.loop_cnt} iterations")
def get_workload_thread_funcs(self):
return [self._loop]
@@ -282,6 +282,7 @@ class WorkloadDiscovery(WorkloadBase):
return f"Discovery: {self.cnt}"
def _loop(self):
+ logger.debug("starting")
driver_config = ydb.DriverConfig(self.grpc_endpoint, self.client.database)
while not self.is_stop_requested():
time.sleep(3)
@@ -293,6 +294,9 @@ class WorkloadDiscovery(WorkloadBase):
logger.info(f"Len = {len(result.endpoints)} Endpoints: {result.endpoints}")
with self.lock:
self.cnt += 1
+ logger.debug(f"iteration {self.cnt}")
+ with self.lock:
+ logger.debug(f"exiting after {self.cnt} iterations")
def get_workload_thread_funcs(self):
return [self._loop]
@@ -344,7 +348,17 @@ class WorkloadRunner:
time.sleep(5)
stop.set()
logger.info("Waiting for stop...")
+ failed_to_stop = []
for w in workloads:
- w.join()
+ logger.debug(f"Waiting for {w.name} to stop...")
+ w.join(timeout=30)
+ if w.is_alive():
+ logger.error(f"Workload {w.name} failed to stop within 30 seconds!")
+ failed_to_stop.append(w.name)
+ else:
+ logger.debug(f"{w.name} stopped")
+
+ if failed_to_stop:
+ raise Exception(f"The following workloads failed to stop: {failed_to_stop}")
logger.info("Waiting for stop... stopped")
return reconfigWorkload.loop_cnt