author    | t1mursadykov <t1mursadykov@ydb.tech> | 2023-03-31 01:07:58 +0300
committer | t1mursadykov <t1mursadykov@ydb.tech> | 2023-03-31 01:07:58 +0300
commit    | ccb2e27952e4fe4baa1c8946c641e56f281aa063 (patch)
tree      | 84650660dda759e4d30831fecd13722c521f3aff
parent    | 269dd0c05067655357cc60ff52fbf09a2a9b5dfb (diff)
download  | ydb-ccb2e27952e4fe4baa1c8946c641e56f281aa063.tar.gz
Remove O(n^2) complexity when checking nodes
-rw-r--r-- | ydb/core/cms/CMakeLists.darwin-x86_64.txt  |  12
-rw-r--r-- | ydb/core/cms/CMakeLists.linux-aarch64.txt  |  12
-rw-r--r-- | ydb/core/cms/CMakeLists.linux-x86_64.txt   |  12
-rw-r--r-- | ydb/core/cms/CMakeLists.windows-x86_64.txt |  12
-rw-r--r-- | ydb/core/cms/cluster_info.cpp              | 118
-rw-r--r-- | ydb/core/cms/cluster_info.h                | 152
-rw-r--r-- | ydb/core/cms/cluster_info_ut.cpp           | 286
-rw-r--r-- | ydb/core/cms/cms.cpp                       | 164
-rw-r--r-- | ydb/core/cms/cms_impl.h                    |  13
-rw-r--r-- | ydb/core/cms/cms_ut_common.cpp             |   5
-rw-r--r-- | ydb/core/cms/erasure_checkers.cpp          |  62
-rw-r--r-- | ydb/core/cms/info_collector.cpp            |  10
-rw-r--r-- | ydb/core/cms/node_checkers.cpp             | 131
-rw-r--r-- | ydb/core/cms/node_checkers.h               | 117
14 files changed, 782 insertions, 324 deletions
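
The substance of the change lives in node_checkers.h/node_checkers.cpp and their use from cluster_info.* and cms.cpp: the removed TCms::TNodeCounter re-counted every node of the cluster (and of the tenant) for each action being checked, so validating a request was O(n^2) in the number of nodes, while the new TNodesStateBase keeps per-cluster and per-tenant counts of locked and down nodes incrementally, making each TryToLockNode check O(1). Below is a minimal standalone sketch of that counter idea; the class name, the std:: containers and all helpers are illustrative assumptions, and only the state names and the limit/ratio semantics are taken from the diff.

```cpp
#include <cstdint>
#include <unordered_map>

// Sketch of the incremental bookkeeping done by TNodesStateBase in
// node_checkers.h. State names and the limit/ratio rules follow the diff;
// the class itself, std:: containers and method signatures are illustrative.
enum class ENodeState { Up, Locked, Restart, Down, Unknown };

class TNodeCounterSketch {
public:
    TNodeCounterSketch(uint32_t limit, uint32_t ratioLimit)
        : Limit(limit), RatioLimit(ratioLimit) {}

    // Called whenever a node's state is (re)reported: O(1) per update,
    // instead of re-counting every node on every permission check.
    void UpdateNode(uint32_t nodeId, ENodeState state) {
        auto it = States.find(nodeId);
        if (it != States.end()) {
            Discount(it->second);
            it->second = state;
        } else {
            States.emplace(nodeId, state);
        }
        Account(state);
    }

    // Decide whether one more node may be taken down: O(1).
    bool TryToLock(uint32_t nodeId) const {
        auto it = States.find(nodeId);
        if (it == States.end() || it->second == ENodeState::Locked ||
            it->second == ENodeState::Restart) {
            return false;                            // unknown or already locked
        }
        if (it->second == ENodeState::Down) {
            return true;                             // maintaining a down node is allowed
        }
        const uint64_t disabled = Locked + Down + 1; // +1 for the candidate node
        if (Limit && disabled > Limit) {
            return false;
        }
        if (RatioLimit && disabled * 100 > States.size() * RatioLimit) {
            return false;
        }
        return true;
    }

private:
    void Account(ENodeState s) {
        if (s == ENodeState::Locked || s == ENodeState::Restart) ++Locked;
        if (s == ENodeState::Down) ++Down;
    }
    void Discount(ENodeState s) {
        if (s == ENodeState::Locked || s == ENodeState::Restart) --Locked;
        if (s == ENodeState::Down) --Down;
    }

    uint32_t Limit;
    uint32_t RatioLimit;
    uint32_t Locked = 0;
    uint32_t Down = 0;
    std::unordered_map<uint32_t, ENodeState> States;
};
```

With the counters maintained on every UpdateNode/LockNode/UnlockNode, checking a batch of k actions touches each node a constant number of times instead of rescanning all n nodes per action.
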
diff --git a/ydb/core/cms/CMakeLists.darwin-x86_64.txt b/ydb/core/cms/CMakeLists.darwin-x86_64.txt index 1bc9ade2a4..6bcec6f550 100644 --- a/ydb/core/cms/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/cms/CMakeLists.darwin-x86_64.txt @@ -16,6 +16,12 @@ get_built_tool_path( enum_parser ) get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) +get_built_tool_path( TOOL_rescompiler_bin TOOL_rescompiler_dependency tools/rescompiler/bin @@ -68,6 +74,7 @@ target_sources(ydb-core-cms PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/cms/http.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/info_collector.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/logger.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/cms/node_checkers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/sentinel.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/services.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/walle_api_handler.cpp @@ -81,6 +88,11 @@ generate_enum_serilization(ydb-core-cms INCLUDE_HEADERS ydb/core/cms/services.h ) +generate_enum_serilization(ydb-core-cms + ${CMAKE_SOURCE_DIR}/ydb/core/cms/node_checkers.h + INCLUDE_HEADERS + ydb/core/cms/node_checkers.h +) add_global_library_for(ydb-core-cms.global ydb-core-cms) target_link_libraries(ydb-core-cms.global PUBLIC diff --git a/ydb/core/cms/CMakeLists.linux-aarch64.txt b/ydb/core/cms/CMakeLists.linux-aarch64.txt index 6b60e1bbb9..1c7e6bbe64 100644 --- a/ydb/core/cms/CMakeLists.linux-aarch64.txt +++ b/ydb/core/cms/CMakeLists.linux-aarch64.txt @@ -16,6 +16,12 @@ get_built_tool_path( enum_parser ) get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) +get_built_tool_path( TOOL_rescompiler_bin TOOL_rescompiler_dependency tools/rescompiler/bin @@ -69,6 +75,7 @@ target_sources(ydb-core-cms PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/cms/http.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/info_collector.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/logger.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/cms/node_checkers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/sentinel.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/services.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/walle_api_handler.cpp @@ -82,6 +89,11 @@ generate_enum_serilization(ydb-core-cms INCLUDE_HEADERS ydb/core/cms/services.h ) +generate_enum_serilization(ydb-core-cms + ${CMAKE_SOURCE_DIR}/ydb/core/cms/node_checkers.h + INCLUDE_HEADERS + ydb/core/cms/node_checkers.h +) add_global_library_for(ydb-core-cms.global ydb-core-cms) target_link_libraries(ydb-core-cms.global PUBLIC diff --git a/ydb/core/cms/CMakeLists.linux-x86_64.txt b/ydb/core/cms/CMakeLists.linux-x86_64.txt index 6b60e1bbb9..1c7e6bbe64 100644 --- a/ydb/core/cms/CMakeLists.linux-x86_64.txt +++ b/ydb/core/cms/CMakeLists.linux-x86_64.txt @@ -16,6 +16,12 @@ get_built_tool_path( enum_parser ) get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) +get_built_tool_path( TOOL_rescompiler_bin TOOL_rescompiler_dependency tools/rescompiler/bin @@ -69,6 +75,7 @@ target_sources(ydb-core-cms PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/cms/http.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/info_collector.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/logger.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/cms/node_checkers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/sentinel.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/services.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/walle_api_handler.cpp @@ -82,6 +89,11 @@ generate_enum_serilization(ydb-core-cms INCLUDE_HEADERS ydb/core/cms/services.h ) +generate_enum_serilization(ydb-core-cms + 
${CMAKE_SOURCE_DIR}/ydb/core/cms/node_checkers.h + INCLUDE_HEADERS + ydb/core/cms/node_checkers.h +) add_global_library_for(ydb-core-cms.global ydb-core-cms) target_link_libraries(ydb-core-cms.global PUBLIC diff --git a/ydb/core/cms/CMakeLists.windows-x86_64.txt b/ydb/core/cms/CMakeLists.windows-x86_64.txt index 1bc9ade2a4..6bcec6f550 100644 --- a/ydb/core/cms/CMakeLists.windows-x86_64.txt +++ b/ydb/core/cms/CMakeLists.windows-x86_64.txt @@ -16,6 +16,12 @@ get_built_tool_path( enum_parser ) get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) +get_built_tool_path( TOOL_rescompiler_bin TOOL_rescompiler_dependency tools/rescompiler/bin @@ -68,6 +74,7 @@ target_sources(ydb-core-cms PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/cms/http.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/info_collector.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/logger.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/cms/node_checkers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/sentinel.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/services.cpp ${CMAKE_SOURCE_DIR}/ydb/core/cms/walle_api_handler.cpp @@ -81,6 +88,11 @@ generate_enum_serilization(ydb-core-cms INCLUDE_HEADERS ydb/core/cms/services.h ) +generate_enum_serilization(ydb-core-cms + ${CMAKE_SOURCE_DIR}/ydb/core/cms/node_checkers.h + INCLUDE_HEADERS + ydb/core/cms/node_checkers.h +) add_global_library_for(ydb-core-cms.global ydb-core-cms) target_link_libraries(ydb-core-cms.global PUBLIC diff --git a/ydb/core/cms/cluster_info.cpp b/ydb/core/cms/cluster_info.cpp index b744e84685..236af9107d 100644 --- a/ydb/core/cms/cluster_info.cpp +++ b/ydb/core/cms/cluster_info.cpp @@ -1,9 +1,17 @@ #include "cluster_info.h" +#include "node_checkers.h" #include "cms_state.h" +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/log.h> + +#include <ydb/core/protos/services.pb.h> +#include <ydb/public/api/protos/draft/ydb_maintenance.pb.h> + +#include <util/datetime/base.h> +#include <util/generic/ptr.h> #include <util/string/builder.h> #include <util/system/hostname.h> -#include <util/datetime/base.h> #if defined BLOG_D || defined BLOG_I || defined BLOG_ERROR #error log macro definition clash @@ -298,9 +306,9 @@ void TVDiskInfo::MigrateOldInfo(const TLockableItem &old) } } -TStateStorageRingInfo::RingState TStateStorageRingInfo::CountState(TInstant now, +TStateStorageRingInfo::RingState TStateStorageRingInfo::CountState(TInstant now, TDuration retryTime, - TDuration duration) const + TDuration duration) const { if (IsDisabled) { return Disabled; @@ -369,7 +377,7 @@ void TClusterInfo::AddNode(const TEvInterconnect::TNodeInfo &info, const TActorC break; } } - + ClusterNodes->AddNode(node->NodeId); HostNameToNodeId.emplace(node->Host, node->NodeId); LockableItems[node->ItemName()] = node; } @@ -384,6 +392,8 @@ void TClusterInfo::SetNodeState(ui32 nodeId, NKikimrCms::EState state, const NKi node.StartTime = TInstant::MilliSeconds(info.GetStartTime()); node.Version = info.GetVersion(); + ClusterNodes->UpdateNode(nodeId, state); + node.Services = TServices(); for (const auto& role : info.GetRoles()) { EService value; @@ -403,6 +413,9 @@ void TClusterInfo::ClearNode(ui32 nodeId) Tablets.erase(tablet); node.Tablets.clear(); node.State = NKikimrCms::DOWN; + node.HasTenantInfo = false; + + ClusterNodes->UpdateNode(node.NodeId, NKikimrCms::DOWN); } void TClusterInfo::ApplyInitialNodeTenants(const TActorContext& ctx, const THashMap<ui32, TString>& nodeTenants) @@ -624,6 +637,50 @@ static TServices MakeServices(const NKikimrCms::TAction &action) { 
return services; } +void TClusterInfo::ApplyActionWithoutLog(const NKikimrCms::TAction &action) +{ + if (ActionRequiresHost(action) && !HasNode(action.GetHost())) { + return; + } + + switch (action.GetType()) { + case TAction::RESTART_SERVICES: + case TAction::SHUTDOWN_HOST: + if (auto nodes = NodePtrs(action.GetHost(), MakeServices(action))) { + for (const auto node : nodes) { + ClusterNodes->LockNode(node->NodeId); + if (node->Tenant) { + TenantNodesChecker[node->Tenant]->LockNode(node->NodeId); + } + } + } + break; + case TAction::REPLACE_DEVICES: + for (const auto &device : action.GetDevices()) { + if (HasPDisk(device)) { + auto pdisk = &PDiskRef(device); + ClusterNodes->LockNode(pdisk->NodeId); + } else if (HasVDisk(device)) { + auto vdisk = &VDiskRef(device); + ClusterNodes->LockNode(vdisk->NodeId); + } + } + break; + + default: + break; + } +} + +void TClusterInfo::ApplyNodeLimits(ui32 clusterLimit, ui32 clusterRatioLimit, ui32 tenantLimit, ui32 tenantRatioLimit) +{ + ClusterNodes->ApplyLimits(clusterLimit, clusterRatioLimit); + + for (auto &[_, tenantChecker] : TenantNodesChecker) { + tenantChecker->ApplyLimits(tenantLimit, tenantRatioLimit); + } +} + TSet<TLockableItem *> TClusterInfo::FindLockedItems(const NKikimrCms::TAction &action, const TActorContext *ctx) { @@ -708,6 +765,8 @@ ui64 TClusterInfo::AddLocks(const TPermissionInfo &permission, const TActorConte } } + ApplyActionWithoutLog(permission.Action); + return locks; } @@ -716,6 +775,7 @@ ui64 TClusterInfo::AddExternalLocks(const TNotificationInfo ¬ification, const ui64 locks = 0; for (const auto &action : notification.Notification.GetActions()) { auto items = FindLockedItems(action, ctx); + ApplyActionWithoutLog(action); for (auto item : items) { if (ctx) @@ -763,6 +823,8 @@ ui64 TClusterInfo::AddTempLocks(const NKikimrCms::TAction &action, const TActorC { auto items = FindLockedItems(action, ctx); + LogManager.ApplyAction(action, this); + for (auto item : items) item->TempLocks.push_back({RollbackPoint, action}); @@ -810,6 +872,8 @@ void TClusterInfo::RollbackLocks(ui64 point) for (auto &entry : LockableItems) entry.second->RollbackLocks(point); RollbackPoint = point - 1; + + LogManager.RollbackOperations(); } void TClusterInfo::MigrateOldInfo(TClusterInfoPtr old) @@ -824,7 +888,7 @@ void TClusterInfo::MigrateOldInfo(TClusterInfoPtr old) void TClusterInfo::ApplySysTabletsInfo(const NKikimrConfig::TBootstrap& config) { for (ui32 i = 0; i < config.TabletSize(); ++i) { const auto &tablet = config.GetTablet(i); - + for (ui32 j = 0; j < tablet.NodeSize(); ++j) { ui32 nodeId = tablet.GetNode(j); TabletTypeToNodes[tablet.GetType()].push_back(nodeId); @@ -853,6 +917,17 @@ void TClusterInfo::ApplyStateStorageInfo(TIntrusiveConstPtr<TStateStorageInfo> i } } +void TClusterInfo::GenerateTenantNodesCheckers() { + for (auto &[nodeId, nodeInfo] : Nodes) { + if (nodeInfo->Tenant) { + if (!TenantNodesChecker.contains(nodeInfo->Tenant)) + TenantNodesChecker[nodeInfo->Tenant] = TSimpleSharedPtr<TNodesStateBase>(new TTenantState(nodeInfo->Tenant, 0, 0)); + + TenantNodesChecker[nodeInfo->Tenant]->UpdateNode(nodeId, nodeInfo->State); + } + } +} + void TClusterInfo::DebugDump(const TActorContext &ctx) const { LOG_DEBUG_S(ctx, NKikimrServices::CMS, @@ -915,4 +990,37 @@ void TClusterInfo::DebugDump(const TActorContext &ctx) const } } +void TOperationLogManager::ApplyAction(const NKikimrCms::TAction &action, + TClusterInfoPtr clusterState) +{ + switch (action.GetType()) { + case NKikimrCms::TAction::RESTART_SERVICES: + case 
NKikimrCms::TAction::SHUTDOWN_HOST: + if (auto nodes = clusterState->NodePtrs(action.GetHost(), MakeServices(action))) { + for (const auto node : nodes) { + AddNodeLockOperation(node->NodeId, clusterState->ClusterNodes); + + if (node->Tenant) { + AddNodeLockOperation(node->NodeId, clusterState->TenantNodesChecker[node->Tenant]); + } + } + } + break; + case NKikimrCms::TAction::REPLACE_DEVICES: + for (const auto &device : action.GetDevices()) { + if (clusterState->HasPDisk(device)) { + auto pdisk = &clusterState->PDisk(device); + AddNodeLockOperation(pdisk->NodeId, clusterState->ClusterNodes); + + } else if (clusterState->HasVDisk(device)) { + auto vdisk = &clusterState->VDisk(device); + AddNodeLockOperation(vdisk->NodeId, clusterState->ClusterNodes); + } + } + break; + + default: + break; + } +} } // namespace NKikimr::NCms diff --git a/ydb/core/cms/cluster_info.h b/ydb/core/cms/cluster_info.h index 3eacdb8934..dc5cbe1489 100644 --- a/ydb/core/cms/cluster_info.h +++ b/ydb/core/cms/cluster_info.h @@ -1,24 +1,32 @@ #pragma once #include "defs.h" + #include "config.h" #include "downtime.h" +#include "node_checkers.h" #include "services.h" -#include <library/cpp/actors/interconnect/interconnect.h> -#include <library/cpp/actors/core/actor.h> #include <ydb/core/base/blobstorage.h> #include <ydb/core/base/statestorage.h> -#include <ydb/core/node_whiteboard/node_whiteboard.h> #include <ydb/core/blobstorage/base/blobstorage_vdiskid.h> #include <ydb/core/mind/tenant_pool.h> +#include <ydb/core/node_whiteboard/node_whiteboard.h> #include <ydb/core/protos/cms.pb.h> #include <ydb/core/protos/console.pb.h> +#include <ydb/public/api/protos/draft/ydb_maintenance.pb.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/interconnect/interconnect.h> + +#include <util/datetime/base.h> #include <util/generic/hash.h> +#include <util/generic/hash_set.h> #include <util/generic/maybe.h> +#include <util/generic/ptr.h> #include <util/generic/set.h> -#include "util/generic/ptr.h" +#include <util/generic/vector.h> namespace NKikimr::NCms { @@ -455,8 +463,8 @@ struct TBSGroupInfo { }; /** - * Structure to hold info and state for a state storage. It helps to - * avoid the situation when we quickly unlock one state stotage node and + * Structure to hold info and state for a state storage. It helps to + * avoid the situation when we quickly unlock one state stotage node and * immediately lock another node from different ring */ class TStateStorageRingInfo : public TThrRefBase { @@ -464,13 +472,13 @@ public: /** * Ok: we can allow to restart nodes; - * - * Locked: all nodes are up. We restarted some nodes before and waiting + * + * Locked: all nodes are up. We restarted some nodes before and waiting * some timeout to allow restart nodes from other ring. * But, we still can restart nodes from this ring; - * - * Disabled: Disabled ring (see state storage config). The ring - * affects permissions of other rings, but this ring + * + * Disabled: Disabled ring (see state storage config). The ring + * affects permissions of other rings, but this ring * can be disabled without considering the others; * * Restart: has some restarting or down nodes. 
We can still restart @@ -490,7 +498,7 @@ public: TStateStorageRingInfo &operator=(const TStateStorageRingInfo &other) = default; TStateStorageRingInfo &operator=(TStateStorageRingInfo &&other) = default; - + static TString RingStateToString(RingState state) { switch (state) { case Unknown: @@ -519,7 +527,7 @@ public: IsDisabled = true; } - RingState CountState(TInstant now, + RingState CountState(TInstant now, TDuration retryTime, TDuration duration) const; @@ -531,6 +539,103 @@ public: }; using TStateStorageRingInfoPtr = TIntrusivePtr<TStateStorageRingInfo>; + + +enum EOperationType { + OPERATION_TYPE_UNKNOWN = 0, + OPERATION_TYPE_LOCK_DISK = 1, + OPERATION_TYPE_LOCK_NODE = 2, + OPERATION_TYPE_ROLLBACK_POINT = 3, +}; + +class TOperationBase { +public: + const EOperationType Type; + + explicit TOperationBase(EOperationType type) + : Type(type) + { + } + virtual ~TOperationBase() = default; + + virtual void Do() = 0; + virtual void Undo() = 0; +}; + +class TLockNodeOperation : public TOperationBase { +public: + const ui32 NodeId; + +private: + + TSimpleSharedPtr<TNodesStateBase> NodesState; + +public: + + TLockNodeOperation(ui32 nodeId, TSimpleSharedPtr<TNodesStateBase> nodesState) + : TOperationBase(OPERATION_TYPE_LOCK_NODE) + , NodeId(nodeId) + , NodesState(nodesState) + { + } + ~TLockNodeOperation() = default; + + void Do() override final { + NodesState->LockNode(NodeId); + } + + void Undo() override final { + NodesState->UnlockNode(NodeId); + } + +}; + +class TLogRollbackPoint : public TOperationBase { +public: + TLogRollbackPoint() : TOperationBase(OPERATION_TYPE_ROLLBACK_POINT) + { + } + + void Do() override final { + return; + } + + void Undo() override final { + return; + } + +}; + +class TOperationLogManager { +private: + TVector<TSimpleSharedPtr<TOperationBase>> Log; + +public: + void PushRollbackPoint() { + Log.push_back(TSimpleSharedPtr<TOperationBase>(new TLogRollbackPoint())); + } + + void AddNodeLockOperation(ui32 nodeId, TSimpleSharedPtr<TNodesStateBase> nodesState) { + Log.push_back(TSimpleSharedPtr<TOperationBase>(new TLockNodeOperation(nodeId, nodesState))); + Log[Log.size() - 1]->Do(); + } + + void RollbackOperations() { + for (auto operation : Log) { + if (operation->Type == OPERATION_TYPE_ROLLBACK_POINT) { + Log.pop_back(); + break; + } + + operation->Undo(); + Log.pop_back(); + } + } + + void ApplyAction(const NKikimrCms::TAction &action, + TClusterInfoPtr clusterState); +}; + /** * Main class to hold current cluster state. 
* @@ -548,6 +653,20 @@ public: using TVDisks = THashMap<TVDiskID, TVDiskInfoPtr>; using TBSGroups = THashMap<ui32, TBSGroupInfo>; + using TenantNodesCheckers = THashMap<TString, TSimpleSharedPtr<TNodesStateBase>>; + + friend TOperationLogManager; + + TenantNodesCheckers TenantNodesChecker; + TSimpleSharedPtr<TClusterNodesState> ClusterNodes = TSimpleSharedPtr<TClusterNodesState>(new TClusterNodesState(0, 0)); + + TOperationLogManager LogManager; + TOperationLogManager ScheduledLogManager; + + void ApplyActionToOperationLog(const NKikimrCms::TAction &action); + void ApplyActionWithoutLog(const NKikimrCms::TAction &action); + void ApplyNodeLimits(ui32 clusterLimit, ui32 clusterRatioLimit, ui32 tenantLimit, ui32 tenantRatioLimit); + TClusterInfo() = default; TClusterInfo(const TClusterInfo &other) = default; TClusterInfo(TClusterInfo &&other) = default; @@ -557,14 +676,16 @@ public: void ApplyStateStorageInfo(TIntrusiveConstPtr<TStateStorageInfo> info); + void GenerateTenantNodesCheckers(); + bool IsStateStorageReplicaNode(ui32 nodeId) { return StateStorageReplicas.contains(nodeId); } - + bool IsStateStorageinfoReceived() { return StateStorageInfoReceived; } - + ui32 GetRingId(ui32 nodeId) { Y_VERIFY(IsStateStorageReplicaNode(nodeId)); return StateStorageNodeToRingId[nodeId]; @@ -804,6 +925,7 @@ public: void RollbackLocks(ui64 point); ui64 PushRollbackPoint() { + LogManager.PushRollbackPoint(); return ++RollbackPoint; } diff --git a/ydb/core/cms/cluster_info_ut.cpp b/ydb/core/cms/cluster_info_ut.cpp index 8b9d3b3d65..2e385e988f 100644 --- a/ydb/core/cms/cluster_info_ut.cpp +++ b/ydb/core/cms/cluster_info_ut.cpp @@ -263,153 +263,153 @@ Y_UNIT_TEST_SUITE(TClusterInfoTest) { TEvInterconnect::TNodeInfo node4 = { 3, "::2", "localhost", "localhost", 1, TNodeLocation() }; - TClusterInfo cluster; - cluster.AddNode(node1, nullptr); - UNIT_ASSERT(cluster.HasNode(1)); - UNIT_ASSERT(!cluster.HasNode(2)); - UNIT_ASSERT(cluster.HasNode("test1")); - UNIT_ASSERT(!cluster.HasNode("test2")); - UNIT_ASSERT_VALUES_EQUAL(cluster.HostNodes("test1").size(), 1); - CheckNode(cluster.Node(1), 1, "test1", "::1", EState::UNKNOWN); - - cluster.AddNode(node2, nullptr); - UNIT_ASSERT(cluster.HasNode(1)); - UNIT_ASSERT(!cluster.HasNode("test1")); - UNIT_ASSERT(cluster.HasNode("test2")); - UNIT_ASSERT_VALUES_EQUAL(cluster.HostNodes("test2").size(), 1); - CheckNode(cluster.Node(1), 1, "test2", "::1", EState::UNKNOWN); - - cluster.AddNode(node3, nullptr); - UNIT_ASSERT(cluster.HasNode(2)); - UNIT_ASSERT(cluster.HasNode("localhost")); - UNIT_ASSERT_VALUES_EQUAL(cluster.NodesCount("localhost"), 1); - UNIT_ASSERT_VALUES_EQUAL(cluster.HostNodes("localhost").size(), 1); - UNIT_ASSERT_VALUES_EQUAL(cluster.HostNodes("localhost")[0]->NodeId, 2); - CheckNode(cluster.Node(2), 2, "localhost", "::2", EState::UNKNOWN); - - cluster.AddNode(node4, nullptr); - UNIT_ASSERT(cluster.HasNode(3)); - UNIT_ASSERT_VALUES_EQUAL(cluster.NodesCount("localhost"), 2); - - cluster.AddPDisk(MakePDiskConfig(1, 1)); - cluster.UpdatePDiskState(NCms::TPDiskID(1, 1), MakePDiskInfo(1)); - UNIT_ASSERT(cluster.HasPDisk(NCms::TPDiskID(1, 1))); - UNIT_ASSERT(!cluster.HasPDisk(NCms::TPDiskID(1, 2))); - UNIT_ASSERT(!cluster.HasPDisk(NCms::TPDiskID(2, 1))); - CheckPDisk(cluster.PDisk(NCms::TPDiskID(1, 1)), 1, 1, UP, 0); - UNIT_ASSERT(cluster.Node(1).PDisks.contains(NCms::TPDiskID(1, 1))); - - cluster.AddVDisk(MakeVSlotConfig(1, {0, 1, 0, 0, 0}, 1, 0)); - cluster.UpdateVDiskState({0, 1, 0, 0, 0}, MakeVDiskInfo({0, 1, 0, 0, 0}, 1, 0)); - UNIT_ASSERT(cluster.HasVDisk({0, 
1, 0, 0, 0})); - UNIT_ASSERT(!cluster.HasVDisk({0, 1, 0, 1, 0})); - CheckVDisk(cluster.VDisk({0, 1, 0, 0, 0}), {0, 1, 0, 0, 0}, 1, UP, 1, 0); - UNIT_ASSERT_VALUES_EQUAL(cluster.PDisk(NCms::TPDiskID(1, 1)).VDisks.size(), 1); - UNIT_ASSERT(cluster.PDisk(NCms::TPDiskID(1, 1)).VDisks.contains(TVDiskID(0, 1, 0, 0, 0))); - - cluster.AddPDisk(MakePDiskConfig(2, 2)); - cluster.UpdatePDiskState(NCms::TPDiskID(2, 2), MakePDiskInfo(2)); - UNIT_ASSERT(cluster.HasPDisk(NCms::TPDiskID(2, 2))); - CheckPDisk(cluster.PDisk(NCms::TPDiskID(2, 2)), 2, 2, UP, 0); - - cluster.AddVDisk(MakeVSlotConfig(2, {0, 1, 0, 1, 0}, 2, 0)); - cluster.UpdateVDiskState({0, 1, 0, 1, 0}, MakeVDiskInfo({0, 1, 0, 1, 0}, 2, 0)); - UNIT_ASSERT(cluster.HasVDisk({0, 1, 0, 1, 0})); - UNIT_ASSERT(cluster.HasPDisk(NCms::TPDiskID(2, 2))); - CheckPDisk(cluster.PDisk(NCms::TPDiskID(2, 2)), 2, 2, UP, 1); - UNIT_ASSERT(cluster.PDisk(NCms::TPDiskID(2, 2)).VDisks.contains(TVDiskID(0, 1, 0, 1, 0))); - - cluster.AddBSGroup(MakeBSGroup(1, "none", 1, 1, 0, 2, 2, 0)); - UNIT_ASSERT(cluster.HasBSGroup(1)); - UNIT_ASSERT(!cluster.HasBSGroup(2)); - CheckBSGroup(cluster.BSGroup(1), 1, TErasureType::ErasureNone, 2, + TClusterInfoPtr cluster(new TClusterInfo); + cluster->AddNode(node1, nullptr); + UNIT_ASSERT(cluster->HasNode(1)); + UNIT_ASSERT(!cluster->HasNode(2)); + UNIT_ASSERT(cluster->HasNode("test1")); + UNIT_ASSERT(!cluster->HasNode("test2")); + UNIT_ASSERT_VALUES_EQUAL(cluster->HostNodes("test1").size(), 1); + CheckNode(cluster->Node(1), 1, "test1", "::1", EState::UNKNOWN); + + cluster->AddNode(node2, nullptr); + UNIT_ASSERT(cluster->HasNode(1)); + UNIT_ASSERT(!cluster->HasNode("test1")); + UNIT_ASSERT(cluster->HasNode("test2")); + UNIT_ASSERT_VALUES_EQUAL(cluster->HostNodes("test2").size(), 1); + CheckNode(cluster->Node(1), 1, "test2", "::1", EState::UNKNOWN); + + cluster->AddNode(node3, nullptr); + UNIT_ASSERT(cluster->HasNode(2)); + UNIT_ASSERT(cluster->HasNode("localhost")); + UNIT_ASSERT_VALUES_EQUAL(cluster->NodesCount("localhost"), 1); + UNIT_ASSERT_VALUES_EQUAL(cluster->HostNodes("localhost").size(), 1); + UNIT_ASSERT_VALUES_EQUAL(cluster->HostNodes("localhost")[0]->NodeId, 2); + CheckNode(cluster->Node(2), 2, "localhost", "::2", EState::UNKNOWN); + + cluster->AddNode(node4, nullptr); + UNIT_ASSERT(cluster->HasNode(3)); + UNIT_ASSERT_VALUES_EQUAL(cluster->NodesCount("localhost"), 2); + + cluster->AddPDisk(MakePDiskConfig(1, 1)); + cluster->UpdatePDiskState(NCms::TPDiskID(1, 1), MakePDiskInfo(1)); + UNIT_ASSERT(cluster->HasPDisk(NCms::TPDiskID(1, 1))); + UNIT_ASSERT(!cluster->HasPDisk(NCms::TPDiskID(1, 2))); + UNIT_ASSERT(!cluster->HasPDisk(NCms::TPDiskID(2, 1))); + CheckPDisk(cluster->PDisk(NCms::TPDiskID(1, 1)), 1, 1, UP, 0); + UNIT_ASSERT(cluster->Node(1).PDisks.contains(NCms::TPDiskID(1, 1))); + + cluster->AddVDisk(MakeVSlotConfig(1, {0, 1, 0, 0, 0}, 1, 0)); + cluster->UpdateVDiskState({0, 1, 0, 0, 0}, MakeVDiskInfo({0, 1, 0, 0, 0}, 1, 0)); + UNIT_ASSERT(cluster->HasVDisk({0, 1, 0, 0, 0})); + UNIT_ASSERT(!cluster->HasVDisk({0, 1, 0, 1, 0})); + CheckVDisk(cluster->VDisk({0, 1, 0, 0, 0}), {0, 1, 0, 0, 0}, 1, UP, 1, 0); + UNIT_ASSERT_VALUES_EQUAL(cluster->PDisk(NCms::TPDiskID(1, 1)).VDisks.size(), 1); + UNIT_ASSERT(cluster->PDisk(NCms::TPDiskID(1, 1)).VDisks.contains(TVDiskID(0, 1, 0, 0, 0))); + + cluster->AddPDisk(MakePDiskConfig(2, 2)); + cluster->UpdatePDiskState(NCms::TPDiskID(2, 2), MakePDiskInfo(2)); + UNIT_ASSERT(cluster->HasPDisk(NCms::TPDiskID(2, 2))); + CheckPDisk(cluster->PDisk(NCms::TPDiskID(2, 2)), 2, 2, UP, 0); + + 
cluster->AddVDisk(MakeVSlotConfig(2, {0, 1, 0, 1, 0}, 2, 0)); + cluster->UpdateVDiskState({0, 1, 0, 1, 0}, MakeVDiskInfo({0, 1, 0, 1, 0}, 2, 0)); + UNIT_ASSERT(cluster->HasVDisk({0, 1, 0, 1, 0})); + UNIT_ASSERT(cluster->HasPDisk(NCms::TPDiskID(2, 2))); + CheckPDisk(cluster->PDisk(NCms::TPDiskID(2, 2)), 2, 2, UP, 1); + UNIT_ASSERT(cluster->PDisk(NCms::TPDiskID(2, 2)).VDisks.contains(TVDiskID(0, 1, 0, 1, 0))); + + cluster->AddBSGroup(MakeBSGroup(1, "none", 1, 1, 0, 2, 2, 0)); + UNIT_ASSERT(cluster->HasBSGroup(1)); + UNIT_ASSERT(!cluster->HasBSGroup(2)); + CheckBSGroup(cluster->BSGroup(1), 1, TErasureType::ErasureNone, 2, TVDiskID(0, 1, 0, 0, 0), TVDiskID(0, 1, 0, 1, 0)); - CheckVDisk(cluster.VDisk({0, 1, 0, 0, 0}), 1, 1); - CheckVDisk(cluster.VDisk({0, 1, 0, 1, 0}), 1, 1); - - cluster.AddPDisk(MakePDiskConfig(3, 3)); - cluster.UpdatePDiskState(NCms::TPDiskID(3, 3), MakePDiskInfo(3)); - UNIT_ASSERT(cluster.HasPDisk(NCms::TPDiskID(3, 3))); - CheckPDisk(cluster.PDisk(NCms::TPDiskID(3, 3)), 3, 3, UP, 0); - - cluster.AddVDisk(MakeVSlotConfig(3, {0, 1, 0, 2, 0}, 3, 0)); - cluster.UpdateVDiskState({0, 1, 0, 2, 0}, MakeVDiskInfo({0, 1, 0, 2, 0}, 3, 0)); - UNIT_ASSERT(cluster.HasVDisk({0, 1, 0, 2, 0})); - CheckVDisk(cluster.VDisk({0, 1, 0, 2, 0}), TVDiskID(0, 1, 0, 2, 0), 3, UP, 3, 0); - - cluster.AddBSGroup(MakeBSGroup(2, "none", 1, 1, 0, 3, 3, 0)); - UNIT_ASSERT(cluster.HasBSGroup(2)); - CheckBSGroup(cluster.BSGroup(2), 2, TErasureType::ErasureNone, 2, + CheckVDisk(cluster->VDisk({0, 1, 0, 0, 0}), 1, 1); + CheckVDisk(cluster->VDisk({0, 1, 0, 1, 0}), 1, 1); + + cluster->AddPDisk(MakePDiskConfig(3, 3)); + cluster->UpdatePDiskState(NCms::TPDiskID(3, 3), MakePDiskInfo(3)); + UNIT_ASSERT(cluster->HasPDisk(NCms::TPDiskID(3, 3))); + CheckPDisk(cluster->PDisk(NCms::TPDiskID(3, 3)), 3, 3, UP, 0); + + cluster->AddVDisk(MakeVSlotConfig(3, {0, 1, 0, 2, 0}, 3, 0)); + cluster->UpdateVDiskState({0, 1, 0, 2, 0}, MakeVDiskInfo({0, 1, 0, 2, 0}, 3, 0)); + UNIT_ASSERT(cluster->HasVDisk({0, 1, 0, 2, 0})); + CheckVDisk(cluster->VDisk({0, 1, 0, 2, 0}), TVDiskID(0, 1, 0, 2, 0), 3, UP, 3, 0); + + cluster->AddBSGroup(MakeBSGroup(2, "none", 1, 1, 0, 3, 3, 0)); + UNIT_ASSERT(cluster->HasBSGroup(2)); + CheckBSGroup(cluster->BSGroup(2), 2, TErasureType::ErasureNone, 2, TVDiskID(0, 1, 0, 0, 0), TVDiskID(0, 1, 0, 2, 0)); - CheckVDisk(cluster.VDisk({0, 1, 0, 0, 0}), 2, 1, 2); - UNIT_ASSERT(cluster.HasVDisk({0, 1, 0, 2, 0})); - CheckVDisk(cluster.VDisk({0, 1, 0, 2, 0}), TVDiskID(0, 1, 0, 2, 0), 3, UP, 3, 1, 2); - - cluster.AddTablet(1, MakeTabletInfo(1, TTabletTypes::Hive, TTabletStateInfo::Active, true)); - UNIT_ASSERT(cluster.HasTablet(1)); - UNIT_ASSERT(!cluster.HasTablet(2)); - UNIT_ASSERT_VALUES_EQUAL(cluster.Node(1).Tablets.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(cluster.Tablet(1).TabletId, 1); - UNIT_ASSERT_VALUES_EQUAL(cluster.Tablet(1).Type, TTabletTypes::Hive); - UNIT_ASSERT_VALUES_EQUAL(cluster.Tablet(1).State, TTabletStateInfo::Active); - UNIT_ASSERT_VALUES_EQUAL(cluster.Tablet(1).Leader, true); + CheckVDisk(cluster->VDisk({0, 1, 0, 0, 0}), 2, 1, 2); + UNIT_ASSERT(cluster->HasVDisk({0, 1, 0, 2, 0})); + CheckVDisk(cluster->VDisk({0, 1, 0, 2, 0}), TVDiskID(0, 1, 0, 2, 0), 3, UP, 3, 1, 2); + + cluster->AddTablet(1, MakeTabletInfo(1, TTabletTypes::Hive, TTabletStateInfo::Active, true)); + UNIT_ASSERT(cluster->HasTablet(1)); + UNIT_ASSERT(!cluster->HasTablet(2)); + UNIT_ASSERT_VALUES_EQUAL(cluster->Node(1).Tablets.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(cluster->Tablet(1).TabletId, 1); + 
UNIT_ASSERT_VALUES_EQUAL(cluster->Tablet(1).Type, TTabletTypes::Hive); + UNIT_ASSERT_VALUES_EQUAL(cluster->Tablet(1).State, TTabletStateInfo::Active); + UNIT_ASSERT_VALUES_EQUAL(cluster->Tablet(1).Leader, true); auto now = Now(); - cluster.SetTimestamp(now); + cluster->SetTimestamp(now); TPermissionInfo permission; permission.PermissionId = "1"; permission.Owner = "user"; permission.Action = MakeAction(TAction::SHUTDOWN_HOST, "test1", 60000000); permission.Deadline = now - TDuration::Seconds(61); - UNIT_ASSERT_VALUES_EQUAL(cluster.AddLocks(permission, nullptr), 0); + UNIT_ASSERT_VALUES_EQUAL(cluster->AddLocks(permission, nullptr), 0); permission.Action.SetHost("test2"); - UNIT_ASSERT_VALUES_EQUAL(cluster.AddLocks(permission, nullptr), 0); + UNIT_ASSERT_VALUES_EQUAL(cluster->AddLocks(permission, nullptr), 0); permission.Deadline = now + TDuration::Seconds(60); - UNIT_ASSERT_VALUES_EQUAL(cluster.AddLocks(permission, nullptr), 1); - UNIT_ASSERT_VALUES_EQUAL(cluster.HostNodes("test2").size(), 1); - UNIT_ASSERT_VALUES_EQUAL(cluster.HostNodes("test2")[0]->Lock->Action.GetType(), TAction::SHUTDOWN_HOST); - UNIT_ASSERT_VALUES_EQUAL(cluster.HostNodes("test2")[0]->Lock->ActionDeadline, now + TDuration::Minutes(2)); + UNIT_ASSERT_VALUES_EQUAL(cluster->AddLocks(permission, nullptr), 1); + UNIT_ASSERT_VALUES_EQUAL(cluster->HostNodes("test2").size(), 1); + UNIT_ASSERT_VALUES_EQUAL(cluster->HostNodes("test2")[0]->Lock->Action.GetType(), TAction::SHUTDOWN_HOST); + UNIT_ASSERT_VALUES_EQUAL(cluster->HostNodes("test2")[0]->Lock->ActionDeadline, now + TDuration::Minutes(2)); permission.Action.SetHost("2"); permission.Deadline = now - TDuration::Seconds(30); - cluster.SetNodeState(2, DOWN, MakeSystemStateInfo("1")); - UNIT_ASSERT_VALUES_EQUAL(cluster.AddLocks(permission, nullptr), 1); - UNIT_ASSERT_VALUES_EQUAL(cluster.Node(2).State, EState::RESTART); - UNIT_ASSERT_VALUES_EQUAL(cluster.Node(2).Lock->ActionDeadline, now + TDuration::Seconds(30)); + cluster->SetNodeState(2, DOWN, MakeSystemStateInfo("1")); + UNIT_ASSERT_VALUES_EQUAL(cluster->AddLocks(permission, nullptr), 1); + UNIT_ASSERT_VALUES_EQUAL(cluster->Node(2).State, EState::RESTART); + UNIT_ASSERT_VALUES_EQUAL(cluster->Node(2).Lock->ActionDeadline, now + TDuration::Seconds(30)); - cluster.ClearNode(1); - UNIT_ASSERT(!cluster.HasTablet(1)); - UNIT_ASSERT_VALUES_EQUAL(cluster.Node(1).Tablets.size(), 0); - UNIT_ASSERT_VALUES_EQUAL(cluster.Node(1).State, DOWN); + cluster->ClearNode(1); + UNIT_ASSERT(!cluster->HasTablet(1)); + UNIT_ASSERT_VALUES_EQUAL(cluster->Node(1).Tablets.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(cluster->Node(1).State, DOWN); - auto point1 = cluster.PushRollbackPoint(); + auto point1 = cluster->PushRollbackPoint(); auto action1 = MakeAction(TAction::SHUTDOWN_HOST, 3, 60000000); - UNIT_ASSERT_VALUES_EQUAL(cluster.AddTempLocks(action1, nullptr), 1); - UNIT_ASSERT_VALUES_EQUAL(cluster.Node(3).TempLocks.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(cluster->AddTempLocks(action1, nullptr), 1); + UNIT_ASSERT_VALUES_EQUAL(cluster->Node(3).TempLocks.size(), 1); - cluster.RollbackLocks(point1); - UNIT_ASSERT_VALUES_EQUAL(cluster.Node(3).TempLocks.size(), 0); + cluster->RollbackLocks(point1); + UNIT_ASSERT_VALUES_EQUAL(cluster->Node(3).TempLocks.size(), 0); - auto point2 = cluster.PushRollbackPoint(); + auto point2 = cluster->PushRollbackPoint(); auto action2 = MakeAction(TAction::REPLACE_DEVICES, 3, 60000000, "pdisk-1-1", "vdisk-0-1-0-1-0"); permission.Action = action2; permission.Deadline = now + TDuration::Seconds(60); - 
UNIT_ASSERT(!cluster.PDisk(NCms::TPDiskID(1, 1)).Lock.Defined()); - UNIT_ASSERT(!cluster.VDisk(TVDiskID(0, 1, 0, 1, 0)).Lock.Defined()); - UNIT_ASSERT_VALUES_EQUAL(cluster.AddLocks(permission, nullptr), 2); - UNIT_ASSERT(cluster.PDisk(NCms::TPDiskID(1, 1)).Lock.Defined()); - UNIT_ASSERT(cluster.VDisk(TVDiskID(0, 1, 0, 1, 0)).Lock.Defined()); - UNIT_ASSERT_VALUES_EQUAL(cluster.AddTempLocks(action2, nullptr), 2); - UNIT_ASSERT_VALUES_EQUAL(cluster.PDisk(NCms::TPDiskID(1, 1)).TempLocks.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(cluster.VDisk(TVDiskID(0, 1, 0, 1, 0)).TempLocks.size(), 1); - cluster.RollbackLocks(point2); - UNIT_ASSERT(cluster.PDisk(NCms::TPDiskID(1, 1)).Lock.Defined()); - UNIT_ASSERT(cluster.VDisk(TVDiskID(0, 1, 0, 1, 0)).Lock.Defined()); - UNIT_ASSERT_VALUES_EQUAL(cluster.PDisk(NCms::TPDiskID(1, 1)).TempLocks.size(), 0); - UNIT_ASSERT_VALUES_EQUAL(cluster.VDisk(TVDiskID(0, 1, 0, 1, 0)).TempLocks.size(), 0); + UNIT_ASSERT(!cluster->PDisk(NCms::TPDiskID(1, 1)).Lock.Defined()); + UNIT_ASSERT(!cluster->VDisk(TVDiskID(0, 1, 0, 1, 0)).Lock.Defined()); + UNIT_ASSERT_VALUES_EQUAL(cluster->AddLocks(permission, nullptr), 2); + UNIT_ASSERT(cluster->PDisk(NCms::TPDiskID(1, 1)).Lock.Defined()); + UNIT_ASSERT(cluster->VDisk(TVDiskID(0, 1, 0, 1, 0)).Lock.Defined()); + UNIT_ASSERT_VALUES_EQUAL(cluster->AddTempLocks(action2, nullptr), 2); + UNIT_ASSERT_VALUES_EQUAL(cluster->PDisk(NCms::TPDiskID(1, 1)).TempLocks.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(cluster->VDisk(TVDiskID(0, 1, 0, 1, 0)).TempLocks.size(), 1); + cluster->RollbackLocks(point2); + UNIT_ASSERT(cluster->PDisk(NCms::TPDiskID(1, 1)).Lock.Defined()); + UNIT_ASSERT(cluster->VDisk(TVDiskID(0, 1, 0, 1, 0)).Lock.Defined()); + UNIT_ASSERT_VALUES_EQUAL(cluster->PDisk(NCms::TPDiskID(1, 1)).TempLocks.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(cluster->VDisk(TVDiskID(0, 1, 0, 1, 0)).TempLocks.size(), 0); auto request1 = MakeRequest("request-1", "user-1", 1, MakeAction(TAction::REPLACE_DEVICES, 1, 60000000, @@ -425,42 +425,42 @@ Y_UNIT_TEST_SUITE(TClusterInfoTest) { auto request4 = MakeRequest("request-4", "user-4", 4, MakeAction(TAction::REPLACE_DEVICES, 1, 60000000, "pdisk-1-1", "vdisk-0-1-0-1-0")); - UNIT_ASSERT_VALUES_EQUAL(cluster.ScheduleActions(request2, nullptr), 3); - UNIT_ASSERT_VALUES_EQUAL(cluster.ScheduleActions(request4, nullptr), 2); - UNIT_ASSERT_VALUES_EQUAL(cluster.ScheduleActions(request3, nullptr), 2); - UNIT_ASSERT_VALUES_EQUAL(cluster.ScheduleActions(request1, nullptr), 3); - - CheckScheduledLocks(cluster.Node(1), "request-1", "user-1", 1); - CheckScheduledLocks(cluster.Node(3), "request-2", "user-2", 2); - CheckScheduledLocks(cluster.PDisk(NCms::TPDiskID(1, 1)), "request-1", "user-1", 1, + UNIT_ASSERT_VALUES_EQUAL(cluster->ScheduleActions(request2, nullptr), 3); + UNIT_ASSERT_VALUES_EQUAL(cluster->ScheduleActions(request4, nullptr), 2); + UNIT_ASSERT_VALUES_EQUAL(cluster->ScheduleActions(request3, nullptr), 2); + UNIT_ASSERT_VALUES_EQUAL(cluster->ScheduleActions(request1, nullptr), 3); + + CheckScheduledLocks(cluster->Node(1), "request-1", "user-1", 1); + CheckScheduledLocks(cluster->Node(3), "request-2", "user-2", 2); + CheckScheduledLocks(cluster->PDisk(NCms::TPDiskID(1, 1)), "request-1", "user-1", 1, "request-2", "user-2", 2, "request-3", "user-3", 3, "request-4", "user-4", 4); - CheckScheduledLocks(cluster.VDisk({0, 1, 0, 1, 0}), + CheckScheduledLocks(cluster->VDisk({0, 1, 0, 1, 0}), "request-1", "user-1", 1, "request-2", "user-2", 2, "request-3", "user-3", 3, "request-4", "user-4", 4); - 
cluster.DeactivateScheduledLocks(request2.Order); + cluster->DeactivateScheduledLocks(request2.Order); TErrorInfo error; - UNIT_ASSERT(cluster.Node(1).IsLocked(error, TDuration(), Now(), TDuration())); - UNIT_ASSERT(!cluster.Node(3).IsLocked(error, TDuration(), Now(), TDuration())); + UNIT_ASSERT(cluster->Node(1).IsLocked(error, TDuration(), Now(), TDuration())); + UNIT_ASSERT(!cluster->Node(3).IsLocked(error, TDuration(), Now(), TDuration())); - cluster.ReactivateScheduledLocks(); + cluster->ReactivateScheduledLocks(); - UNIT_ASSERT(cluster.Node(1).IsLocked(error, TDuration(), Now(), TDuration())); - UNIT_ASSERT(cluster.Node(3).IsLocked(error, TDuration(), Now(), TDuration())); + UNIT_ASSERT(cluster->Node(1).IsLocked(error, TDuration(), Now(), TDuration())); + UNIT_ASSERT(cluster->Node(3).IsLocked(error, TDuration(), Now(), TDuration())); - cluster.UnscheduleActions(request3.RequestId); - cluster.UnscheduleActions(request2.RequestId); + cluster->UnscheduleActions(request3.RequestId); + cluster->UnscheduleActions(request2.RequestId); - CheckScheduledLocks(cluster.Node(1), "request-1", "user-1", 1); - UNIT_ASSERT(cluster.Node(3).ScheduledLocks.empty()); - CheckScheduledLocks(cluster.PDisk(NCms::TPDiskID(1, 1)), "request-1", "user-1", 1, + CheckScheduledLocks(cluster->Node(1), "request-1", "user-1", 1); + UNIT_ASSERT(cluster->Node(3).ScheduledLocks.empty()); + CheckScheduledLocks(cluster->PDisk(NCms::TPDiskID(1, 1)), "request-1", "user-1", 1, "request-4", "user-4", 4); - CheckScheduledLocks(cluster.VDisk({0, 1, 0, 1, 0}), + CheckScheduledLocks(cluster->VDisk({0, 1, 0, 1, 0}), "request-1", "user-1", 1, "request-4", "user-4", 4); diff --git a/ydb/core/cms/cms.cpp b/ydb/core/cms/cms.cpp index 6bdff4b4af..bc14f117e2 100644 --- a/ydb/core/cms/cms.cpp +++ b/ydb/core/cms/cms.cpp @@ -1,24 +1,28 @@ #include "cms_impl.h" #include "info_collector.h" -#include "library/cpp/actors/core/actor.h" -#include "library/cpp/actors/core/hfunc.h" +#include "erasure_checkers.h" +#include "node_checkers.h" #include "scheme.h" #include "sentinel.h" -#include "erasure_checkers.h" -#include "ydb/core/protos/config_units.pb.h" #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> #include <ydb/core/base/counters.h> #include <ydb/core/base/statestorage.h> #include <ydb/core/base/statestorage_impl.h> -#include <ydb/core/cms/console/config_helpers.h> #include <ydb/core/base/ticket_parser.h> +#include <ydb/core/cms/console/config_helpers.h> +#include <ydb/core/erasure/erasure.h> +#include <ydb/core/protos/cms.pb.h> +#include <ydb/core/protos/config_units.pb.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/interconnect/interconnect.h> #include <util/generic/serialized_enum.h> +#include <util/string/builder.h> #include <util/string/join.h> #include <util/system/hostname.h> @@ -27,50 +31,6 @@ namespace NKikimr::NCms { using namespace NNodeWhiteboard; using namespace NKikimrCms; -void TCms::TNodeCounter::CountNode(const TNodeInfo &node, - bool ignoreDownState) -{ - ++Total; - TErrorInfo error; - if (node.IsLocked(error, TDuration(), TActivationContext::Now(), TDuration())) { - ++Locked; - if (error.Code == TStatus::DISALLOW) - Code = error.Code; - } else if (!ignoreDownState && node.IsDown(error, TInstant())) { - ++Down; - } -} - -bool TCms::TNodeCounter::CheckLimit(ui32 limit, - EAvailabilityMode mode) const -{ - // No limit is set. 
- if (!limit) - return true; - // Limit is OK. - if ((Down + Locked + 1) <= limit) - return true; - // Allow to restart at least one node for forced restart mode. - return (mode == MODE_FORCE_RESTART - && !Locked); -} - -bool TCms::TNodeCounter::CheckRatio(ui32 ratio, - EAvailabilityMode mode) const -{ - // No limit is set. - if (!ratio) - return true; - // Always allow at least one node to be locked. - if (!Down && !Locked) - return true; - // Limit is OK. - if (((Down + Locked + 1) * 100) <= (Total * ratio)) - return true; - // Allow to restart at least one node for forced restart mode. - return (mode == MODE_FORCE_RESTART - && !Locked); -} void TCms::OnActivateExecutor(const TActorContext &ctx) { @@ -648,74 +608,25 @@ bool TCms::TryToLockNode(const TAction& action, TDuration duration = TDuration::MicroSeconds(action.GetDuration()); duration += opts.PermissionDuration; - if (node.IsLocked(error, State->Config.DefaultRetryTime, TActivationContext::Now(), duration)) - return false; - - ui32 tenantLimit = State->Config.TenantLimits.GetDisabledNodesLimit(); - ui32 tenantRatioLimit = State->Config.TenantLimits.GetDisabledNodesRatioLimit(); - ui32 clusterLimit = State->Config.ClusterLimits.GetDisabledNodesLimit(); - ui32 clusterRatioLimit = State->Config.ClusterLimits.GetDisabledNodesRatioLimit(); + bool isForceRestart = opts.AvailabilityMode == NKikimrCms::MODE_FORCE_RESTART; - // Check if limits should be checked. - if ((opts.TenantPolicy == NONE - || !node.Tenant - || (!tenantLimit && !tenantRatioLimit)) - && !clusterLimit - && !clusterRatioLimit) - return true; - - TNodeCounter tenantNodes; - TNodeCounter clusterNodes; - for (const auto& pr : ClusterInfo->AllNodes()) { - const auto& otherNode = pr.second; - bool ignoreDown = node.NodeId == otherNode->NodeId; - clusterNodes.CountNode(*otherNode, ignoreDown); - if (node.Tenant == otherNode->Tenant) - tenantNodes.CountNode(*otherNode, ignoreDown); - } - - if (opts.TenantPolicy == DEFAULT - && node.Tenant) { - if (!tenantNodes.CheckLimit(tenantLimit, opts.AvailabilityMode)) { - error.Code = tenantNodes.Code; - error.Reason = TStringBuilder() << "Too many locked nodes for " << node.Tenant - << " locked: " << tenantNodes.Locked - << " down: " << tenantNodes.Down - << " total: " << tenantNodes.Total - << " limit: " << tenantLimit; - error.Deadline = TActivationContext::Now() + State->Config.DefaultRetryTime; - return false; - } - if (!tenantNodes.CheckRatio(tenantRatioLimit, opts.AvailabilityMode)) { - error.Code = tenantNodes.Code; - error.Reason = TStringBuilder() << "Too many locked nodes for " << node.Tenant - << " locked: " << tenantNodes.Locked - << " down: " << tenantNodes.Down - << " total: " << tenantNodes.Total - << " limit: " << tenantRatioLimit << "%"; - error.Deadline = TActivationContext::Now() + State->Config.DefaultRetryTime; - return false; - } - } - - if (!clusterNodes.CheckLimit(clusterLimit, opts.AvailabilityMode)) { - error.Code = clusterNodes.Code; - error.Reason = TStringBuilder() << "Too many locked nodes" - << " locked: " << clusterNodes.Locked - << " down: " << clusterNodes.Down - << " total: " << clusterNodes.Total - << " limit: " << clusterLimit; + if (!ClusterInfo->ClusterNodes->TryToLockNode(node.NodeId, isForceRestart)) + { + error.Code = TStatus::DISALLOW_TEMP; + error.Reason = ClusterInfo->ClusterNodes->ReadableReason(); error.Deadline = TActivationContext::Now() + State->Config.DefaultRetryTime; + return false; } - if (!clusterNodes.CheckRatio(clusterRatioLimit, opts.AvailabilityMode)) { - error.Code = 
clusterNodes.Code; - error.Reason = TStringBuilder() << "Too many locked nodes" - << " locked: " << clusterNodes.Locked - << " down: " << clusterNodes.Down - << " total: " << clusterNodes.Total - << " limit: " << clusterRatioLimit << "%"; + + if (node.Tenant + && opts.TenantPolicy != NONE + && !ClusterInfo->TenantNodesChecker[node.Tenant]->TryToLockNode(node.NodeId, isForceRestart)) + { + error.Code = TStatus::DISALLOW_TEMP; + error.Reason = ClusterInfo->TenantNodesChecker[node.Tenant]->ReadableReason(); error.Deadline = TActivationContext::Now() + State->Config.DefaultRetryTime; + return false; } @@ -810,9 +721,9 @@ bool TCms::TryToLockVDisk(const TActionOptions& opts, } break; case MODE_FORCE_RESTART: - if ( counters->GroupAlreadyHasLockedDisks() && opts.PartialPermissionAllowed) { + if ( counters->GroupAlreadyHasLockedDisks() && opts.PartialPermissionAllowed) { error.Code = TStatus::DISALLOW_TEMP; - error.Reason = "You cannot get two or more disks from the same group at the same time" + error.Reason = "You cannot get two or more disks from the same group at the same time" " without specifying the PartialPermissionAllowed parameter"; error.Deadline = defaultDeadline; return false; @@ -888,6 +799,7 @@ void TCms::AcceptPermissions(TPermissionResponse &resp, const TString &requestId auto &permission = *resp.MutablePermissions(i); permission.SetId(owner + "-p-" + ToString(State->NextPermissionId++)); State->Permissions.emplace(permission.GetId(), TPermissionInfo(permission, requestId, owner)); + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::CMS, "Accepting permission"); ClusterInfo->AddLocks(permission, requestId, owner, &ctx); if (!check || owner != WALLE_CMS_USER) { @@ -1500,11 +1412,21 @@ void TCms::Handle(TEvPrivate::TEvClusterInfo::TPtr &ev, const TActorContext &ctx info->ApplyDowntimes(State->Downtimes); } + // We need to generate NodeCheckers after MigrateOldInfo to get + // all the information about the tenants on the disconnected nodes + info->GenerateTenantNodesCheckers(); + AdjustInfo(info, ctx); State->ClusterInfo = info; ClusterInfo = info; + ui32 tenantLimit = State->Config.TenantLimits.GetDisabledNodesLimit(); + ui32 tenantRatioLimit = State->Config.TenantLimits.GetDisabledNodesRatioLimit(); + ui32 clusterLimit = State->Config.ClusterLimits.GetDisabledNodesLimit(); + ui32 clusterRatioLimit = State->Config.ClusterLimits.GetDisabledNodesRatioLimit(); + ClusterInfo->ApplyNodeLimits(clusterLimit, clusterRatioLimit, tenantLimit, tenantRatioLimit); + ClusterInfo->UpdateDowntimes(State->Downtimes, ctx); Execute(CreateTxUpdateDowntimes(), ctx); @@ -1658,7 +1580,13 @@ void TCms::Handle(TEvCms::TEvPermissionRequest::TPtr &ev, } } + ClusterInfo->LogManager.PushRollbackPoint(); + for (const auto &scheduled_request : State->ScheduledRequests) { + for (auto &action : scheduled_request.second.Request.GetActions()) + ClusterInfo->LogManager.ApplyAction(action, ClusterInfo); + } bool ok = CheckPermissionRequest(rec, resp->Record, scheduled.Request, ctx); + ClusterInfo->LogManager.RollbackOperations(); // Schedule request if required. 
if (rec.GetDryRun()) { @@ -1714,12 +1642,20 @@ void TCms::Handle(TEvCms::TEvCheckRequest::TPtr &ev, const TActorContext &ctx) TAutoPtr<TEvCms::TEvPermissionResponse> resp = new TEvCms::TEvPermissionResponse; TRequestInfo scheduled; + ClusterInfo->LogManager.PushRollbackPoint(); + for (const auto &scheduled_request : State->ScheduledRequests) { + if (scheduled_request.second.Order < request.Order) { + for (auto &action : scheduled_request.second.Request.GetActions()) + ClusterInfo->LogManager.ApplyAction(action, ClusterInfo); + } + } // Deactivate locks of this and later requests to // avoid false conflicts. ClusterInfo->DeactivateScheduledLocks(request.Order); request.Request.SetAvailabilityMode(rec.GetAvailabilityMode()); bool ok = CheckPermissionRequest(request.Request, resp->Record, scheduled.Request, ctx); ClusterInfo->ReactivateScheduledLocks(); + ClusterInfo->LogManager.RollbackOperations(); // Schedule request if required. if (rec.GetDryRun()) { diff --git a/ydb/core/cms/cms_impl.h b/ydb/core/cms/cms_impl.h index 4012fb2c59..249c8238c4 100644 --- a/ydb/core/cms/cms_impl.h +++ b/ydb/core/cms/cms_impl.h @@ -110,19 +110,6 @@ private: {} }; - struct TNodeCounter { - ui32 Total = 0; - ui32 Down = 0; - ui32 Locked = 0; - NKikimrCms::TStatus::ECode Code = NKikimrCms::TStatus::DISALLOW_TEMP; - - void CountNode(const TNodeInfo &node, bool ignoreDownState); - // Check if we can lock one more node and stay in lock limits. - bool CheckLimit(ui32 limit, NKikimrCms::EAvailabilityMode mode) const; - // Check if we can lock one more node and stay in lock ration limits. - bool CheckRatio(ui32 ratio, NKikimrCms::EAvailabilityMode mode) const; - }; - ITransaction *CreateTxGetLogTail(TEvCms::TEvGetLogTailRequest::TPtr &ev); ITransaction *CreateTxInitScheme(); ITransaction *CreateTxLoadState(); diff --git a/ydb/core/cms/cms_ut_common.cpp b/ydb/core/cms/cms_ut_common.cpp index e159470783..801896088a 100644 --- a/ydb/core/cms/cms_ut_common.cpp +++ b/ydb/core/cms/cms_ut_common.cpp @@ -379,9 +379,10 @@ static NKikimrConfig::TBootstrap GenerateBootstrapConfig(TTestActorRuntime &runt TVector<ui32> nodes; nodes.reserve(nodesCount); for (ui32 nodeIndex = 0; nodeIndex < nodesCount; ++nodeIndex) { - ui32 nodeId = runtime.GetNodeId(nodeIndex); - if (tenants.contains(nodeId)) + if (tenants.contains(nodeIndex)) continue; + + ui32 nodeId = runtime.GetNodeId(nodeIndex); nodes.push_back(nodeId); } diff --git a/ydb/core/cms/erasure_checkers.cpp b/ydb/core/cms/erasure_checkers.cpp index e3ad883620..ffca291764 100644 --- a/ydb/core/cms/erasure_checkers.cpp +++ b/ydb/core/cms/erasure_checkers.cpp @@ -2,9 +2,9 @@ namespace NKikimr::NCms { -bool TErasureCounterBase::IsDown(const TVDiskInfo& vdisk, +bool TErasureCounterBase::IsDown(const TVDiskInfo& vdisk, TClusterInfoPtr info, - TDuration& retryTime, + TDuration& retryTime, TErrorInfo& error) { const auto& node = info->Node(vdisk.NodeId); @@ -24,9 +24,9 @@ bool TErasureCounterBase::IsDown(const TVDiskInfo& vdisk, } bool TErasureCounterBase::IsLocked(const TVDiskInfo& vdisk, - TClusterInfoPtr info, + TClusterInfoPtr info, TDuration& retryTime, - TDuration& duration, + TDuration& duration, TErrorInfo& error) { const auto& node = info->Node(vdisk.NodeId); @@ -40,8 +40,8 @@ bool TErasureCounterBase::IsLocked(const TVDiskInfo& vdisk, return false; } - return node.IsLocked(error, retryTime, TActivationContext::Now(), duration) - || pdisk.IsLocked(error, retryTime, TActivationContext::Now(), duration) + return node.IsLocked(error, retryTime, TActivationContext::Now(), duration) + 
|| pdisk.IsLocked(error, retryTime, TActivationContext::Now(), duration) || vdisk.IsLocked(error, retryTime, TActivationContext::Now(), duration); } @@ -50,8 +50,8 @@ bool TErasureCounterBase::GroupAlreadyHasLockedDisks() const return HasAlreadyLockedDisks; } -bool TErasureCounterBase::CheckForMaxAvailability(TErrorInfo& error, - TInstant& defaultDeadline, +bool TErasureCounterBase::CheckForMaxAvailability(TErrorInfo& error, + TInstant& defaultDeadline, bool allowPartial) const { if (Locked + Down > 1) { @@ -74,8 +74,8 @@ bool TErasureCounterBase::CheckForMaxAvailability(TErrorInfo& error, void TDefaultErasureCounter::CountVDisk(const TVDiskInfo& vdisk, TClusterInfoPtr info, TDuration retryTime, - TDuration duration, - TErrorInfo& error) + TDuration duration, + TErrorInfo& error) { Y_VERIFY_DEBUG(vdisk.VDiskId != VDisk.VDiskId); @@ -84,7 +84,7 @@ void TDefaultErasureCounter::CountVDisk(const TVDiskInfo& vdisk, if (IsLocked(vdisk, info, retryTime, duration, err)) { ++Locked; error.Code = err.Code; - error.Reason = TStringBuilder() << "Issue in affected group " << GroupId + error.Reason = TStringBuilder() << "Issue in affected group " << GroupId << ". " << err.Reason; error.Deadline = Max(error.Deadline, err.Deadline); return; @@ -100,14 +100,14 @@ void TDefaultErasureCounter::CountVDisk(const TVDiskInfo& vdisk, } } -bool TDefaultErasureCounter::CheckForKeepAvailability(TClusterInfoPtr info, - TErrorInfo& error, - TInstant& defaultDeadline, +bool TDefaultErasureCounter::CheckForKeepAvailability(TClusterInfoPtr info, + TErrorInfo& error, + TInstant& defaultDeadline, bool allowPartial) const { if (HasAlreadyLockedDisks && allowPartial) { error.Code = TStatus::DISALLOW_TEMP; - error.Reason = "You cannot get two or more disks from the same group at the same time" + error.Reason = "You cannot get two or more disks from the same group at the same time" " without specifying the PartialPermissionAllowed parameter"; error.Deadline = defaultDeadline; return false; @@ -129,16 +129,16 @@ bool TDefaultErasureCounter::CheckForKeepAvailability(TClusterInfoPtr info, return true; } -bool TMirror3dcCounter::CheckForKeepAvailability(TClusterInfoPtr info, - TErrorInfo& error, - TInstant& defaultDeadline, +bool TMirror3dcCounter::CheckForKeepAvailability(TClusterInfoPtr info, + TErrorInfo& error, + TInstant& defaultDeadline, bool allowPartial) const { Y_UNUSED(info); if (HasAlreadyLockedDisks && allowPartial) { error.Code = TStatus::DISALLOW_TEMP; - error.Reason = "You cannot get two or more disks from the same group at the same time" + error.Reason = "You cannot get two or more disks from the same group at the same time" " without specifying the PartialPermissionAllowed parameter"; error.Deadline = defaultDeadline; return false; @@ -146,10 +146,10 @@ bool TMirror3dcCounter::CheckForKeepAvailability(TClusterInfoPtr info, if (DataCenterDisabledNodes.size() <= 1) return true; - + if (DataCenterDisabledNodes.size() == 2 && (DataCenterDisabledNodes.begin()->second <= 1 - || (++DataCenterDisabledNodes.begin())->second <= 1)) + || (++DataCenterDisabledNodes.begin())->second <= 1)) { return true; } @@ -163,7 +163,7 @@ bool TMirror3dcCounter::CheckForKeepAvailability(TClusterInfoPtr info, if (DataCenterDisabledNodes.size() > 2) { error.Code = TStatus::DISALLOW_TEMP; - error.Reason = TStringBuilder() << "Issue in affected group " << GroupId + error.Reason = TStringBuilder() << "Issue in affected group " << GroupId << ". 
Too many data centers have unavailable vdisks: " << DataCenterDisabledNodes.size(); error.Deadline = defaultDeadline; @@ -171,7 +171,7 @@ bool TMirror3dcCounter::CheckForKeepAvailability(TClusterInfoPtr info, } error.Code = TStatus::DISALLOW_TEMP; - error.Reason = TStringBuilder() << "Issue in affected group " << GroupId + error.Reason = TStringBuilder() << "Issue in affected group " << GroupId << ". Data centers have too many unavailable vdisks"; error.Deadline = defaultDeadline; @@ -179,10 +179,10 @@ bool TMirror3dcCounter::CheckForKeepAvailability(TClusterInfoPtr info, } void TMirror3dcCounter::CountVDisk(const TVDiskInfo& vdisk, - TClusterInfoPtr info, + TClusterInfoPtr info, TDuration retryTime, TDuration duration, - TErrorInfo& error) + TErrorInfo& error) { Y_VERIFY_DEBUG(vdisk.VDiskId != VDisk.VDiskId); @@ -191,7 +191,7 @@ void TMirror3dcCounter::CountVDisk(const TVDiskInfo& vdisk, if (IsLocked(vdisk, info, retryTime, duration, err) || IsDown(vdisk, info, retryTime, err)) { error.Code = err.Code; - error.Reason = TStringBuilder() << "Issue in affected group " << GroupId + error.Reason = TStringBuilder() << "Issue in affected group " << GroupId << ". " << err.Reason; error.Deadline = Max(error.Deadline, err.Deadline); ++Locked; @@ -200,9 +200,9 @@ void TMirror3dcCounter::CountVDisk(const TVDiskInfo& vdisk, } void TMirror3dcCounter::CountGroupState(TClusterInfoPtr info, - TDuration retryTime, + TDuration retryTime, TDuration duration, - TErrorInfo &error) + TErrorInfo &error) { for (const auto &vdId : info->BSGroup(GroupId).VDisks) { if (vdId != VDisk.VDiskId) @@ -219,7 +219,7 @@ void TMirror3dcCounter::CountGroupState(TClusterInfoPtr info, void TDefaultErasureCounter::CountGroupState(TClusterInfoPtr info, TDuration retryTime, TDuration duration, - TErrorInfo &error) + TErrorInfo &error) { for (const auto &vdId : info->BSGroup(GroupId).VDisks) { if (vdId != VDisk.VDiskId) @@ -231,8 +231,8 @@ void TDefaultErasureCounter::CountGroupState(TClusterInfoPtr info, ++Locked; } -TSimpleSharedPtr<IErasureCounter> CreateErasureCounter(TErasureType::EErasureSpecies es, - const TVDiskInfo& vdisk, ui32 groupId) +TSimpleSharedPtr<IErasureCounter> CreateErasureCounter(TErasureType::EErasureSpecies es, + const TVDiskInfo& vdisk, ui32 groupId) { switch (es) { case TErasureType::ErasureNone: diff --git a/ydb/core/cms/info_collector.cpp b/ydb/core/cms/info_collector.cpp index 14e3fce757..221a6a333d 100644 --- a/ydb/core/cms/info_collector.cpp +++ b/ydb/core/cms/info_collector.cpp @@ -121,6 +121,8 @@ private: bool BootstrapConfigReceived; bool BaseConfigReceived; bool StateStorageInfoReceived; + + THashSet<ui32> UndeliveredNodes; THashMap<ui32, TSet<ui32>> NodeEvents; // nodeId -> expected events THashMap<TPDiskID, TPDiskStateInfo, TPDiskIDHash> PDiskInfo; THashMap<TVDiskID, TVDiskStateInfo> VDiskInfo; @@ -150,6 +152,12 @@ void TInfoCollector::ReplyAndDie() { Info->ApplyStateStorageInfo(Info->StateStorageInfo); } + // It is also necessary to mark the disks, + // and to do this we must wait for the base config + for (const auto &nodeId : UndeliveredNodes) { + Info->ClearNode(nodeId); + } + Send(Client, std::move(ev)); PassAway(); } @@ -484,7 +492,7 @@ void TInfoCollector::Handle(TEvents::TEvUndelivered::TPtr& ev) { if (msg.SourceType == TEvTenantPool::EvGetStatus && msg.Reason == TEvents::TEvUndelivered::ReasonActorUnknown) { ResponseProcessed(nodeId, TEvTenantPool::EvTenantPoolStatus); } else { - Info->ClearNode(nodeId); + UndeliveredNodes.insert(nodeId); NodeEvents[nodeId].clear(); } diff --git 
a/ydb/core/cms/node_checkers.cpp b/ydb/core/cms/node_checkers.cpp new file mode 100644 index 0000000000..b5d3c6dbf7 --- /dev/null +++ b/ydb/core/cms/node_checkers.cpp @@ -0,0 +1,131 @@ +#include "node_checkers.h" +#include "util/string/cast.h" + +#include <ydb/core/protos/cms.pb.h> + +namespace NKikimr::NCms { + +#define NCH_LOG_D(stream) LOG_DEBUG_S (*TlsActivationContext, NKikimrServices::CMS, "[Checker] " << stream) +#define NCH_LOG_T(stream) LOG_TRACE_S (*TlsActivationContext, NKikimrServices::CMS, "[Checker] " << stream) + +TNodesStateBase::ENodeState TNodesStateBase::NodeState(NKikimrCms::EState state) { + switch (state) { + case NKikimrCms::UP: + return NODE_STATE_UP; + case NKikimrCms::UNKNOWN: + return NODE_STATE_UNSPECIFIED; + case NKikimrCms::DOWN: + return NODE_STATE_DOWN; + case NKikimrCms::RESTART: + return NODE_STATE_RESTART; + default: + Y_FAIL("Unknown EState"); + } +} + +void TNodesStateBase::AddNode(ui32 nodeId) { + if (NodeToState.contains(nodeId)) { + return; + } + NodeToState[nodeId] = NODE_STATE_UNSPECIFIED; +} + +void TNodesStateBase::UpdateNode(ui32 nodeId, NKikimrCms::EState state) { + if (!NodeToState.contains(nodeId)) { + AddNode(nodeId); + } + + if (NodeToState[nodeId] == NODE_STATE_DOWN) { + --DownNodesCount; + } + + if (NodeToState[nodeId] == NODE_STATE_LOCKED || + NodeToState[nodeId] == NODE_STATE_RESTART) { + --LockedNodesCount; + } + + const auto nodeState = NodeState(state); + NodeToState[nodeId] = nodeState; + + if (nodeState == NODE_STATE_RESTART || nodeState == NODE_STATE_LOCKED) { + ++LockedNodesCount; + } + + if (nodeState == NODE_STATE_DOWN) { + ++DownNodesCount; + } +} + +void TNodesStateBase::LockNode(ui32 nodeId) { + Y_VERIFY(NodeToState.contains(nodeId)); + + ++LockedNodesCount; + if (NodeToState[nodeId] == NODE_STATE_DOWN) { + NodeToState[nodeId] = NODE_STATE_RESTART; + --DownNodesCount; + } else { + NodeToState[nodeId] = NODE_STATE_LOCKED; + } +} + +void TNodesStateBase::UnlockNode(ui32 nodeId) { + Y_VERIFY(NodeToState.contains(nodeId)); + + --LockedNodesCount; + if (NodeToState[nodeId] == NODE_STATE_RESTART) { + NodeToState[nodeId] = NODE_STATE_DOWN; + ++DownNodesCount; + } else { + NodeToState[nodeId] = NODE_STATE_UP; + } +} + +bool TNodesStateBase::TryToLockNode(ui32 nodeId, bool isForceRestart) { + Y_VERIFY(NodeToState.contains(nodeId)); + + auto nodeState = NodeToState[nodeId]; + + NCH_LOG_D("Checking Node: " + << nodeId << ", with state: " << ToString(nodeState) + << ", with limit: " << DisabledNodesLimit + << ", with ratio limit: " << DisabledNodesRatioLimit + << ", locked nodes: " << LockedNodesCount + << ", down nodes: " << DownNodesCount); + + // Allow to maintain down/unavailable node + if (nodeState == NODE_STATE_DOWN) { + return true; + } + + if (nodeState == NODE_STATE_RESTART || + nodeState == NODE_STATE_LOCKED || + nodeState == NODE_STATE_UNSPECIFIED) { + + return false; + } + + // Always allow at least one node + if (LockedNodesCount + DownNodesCount == 0) { + return true; + } + + if (isForceRestart && !LockedNodesCount) { + return true; + } + + if (DisabledNodesLimit > 0 && + (LockedNodesCount + DownNodesCount + 1 > DisabledNodesLimit)) { + return false; + } + + if (DisabledNodesRatioLimit > 0 && + ((LockedNodesCount + DownNodesCount + 1) * 100 > + (NodeToState.size() * DisabledNodesRatioLimit))) { + return false; + } + + return true; +} + + +} // namespace NKikimr::NCms diff --git a/ydb/core/cms/node_checkers.h b/ydb/core/cms/node_checkers.h new file mode 100644 index 0000000000..26aec2ca7b --- /dev/null +++ 
b/ydb/core/cms/node_checkers.h @@ -0,0 +1,117 @@ +#pragma once + +#include "defs.h" + +#include <ydb/core/blobstorage/base/blobstorage_vdiskid.h> +#include <ydb/core/erasure/erasure.h> +#include <ydb/core/protos/cms.pb.h> +#include <ydb/public/api/protos/draft/ydb_maintenance.pb.h> + +#include <library/cpp/actors/core/log.h> + +#include <util/generic/hash.h> +#include <util/generic/hash_set.h> +#include <util/system/compiler.h> +#include <util/system/yassert.h> + +#include <bitset> +#include <sstream> +#include <algorithm> +#include <string> + +namespace NKikimr::NCms { + +class TNodesStateBase { +public: + enum ENodeState : ui32 { + NODE_STATE_UNSPECIFIED /* "Unspecified" */, + NODE_STATE_UP /* "Up" */, + NODE_STATE_LOCKED /* "Locked" */, + NODE_STATE_RESTART /* "Restart" */, + NODE_STATE_DOWN /* "Down" */ + }; + +private: + static ENodeState NodeState(NKikimrCms::EState state); + +protected: + ui32 DisabledNodesLimit; + ui32 DisabledNodesRatioLimit; + + THashMap<ui32, ENodeState> NodeToState; + ui32 LockedNodesCount; + ui32 DownNodesCount; + +public: + TNodesStateBase(ui32 disabledNodesLimit, ui32 disabledNodesRatioLimit) + : DisabledNodesLimit(disabledNodesLimit) + , DisabledNodesRatioLimit(disabledNodesRatioLimit) + , LockedNodesCount(0) + , DownNodesCount(0) + { + } + virtual ~TNodesStateBase() = default; + + void ApplyLimits(ui32 nodesLimit, ui32 ratioLimit) { + DisabledNodesLimit = nodesLimit; + DisabledNodesRatioLimit = ratioLimit; + } + + void AddNode(ui32 nodeId); + void UpdateNode(ui32 nodeId, NKikimrCms::EState); + + void LockNode(ui32 nodeId); + void UnlockNode(ui32 nodeId); + + bool TryToLockNode(ui32 nodeId, bool isForceRestart = false); + + virtual std::string ReadableReason() const = 0; + +}; + +class TTenantState : public TNodesStateBase { +private: + const std::string TenantName; + +public: + TTenantState(const std::string &tenantName, ui32 disabledNodesLimit, ui32 disabledNodesRatioLimit) + : TNodesStateBase(disabledNodesLimit, disabledNodesRatioLimit) + , TenantName(tenantName) + { + } + + std::string ReadableReason() const override final { + std::stringstream reason; + reason << "Too many locked nodes for tenant " << TenantName + << "; locked: " << LockedNodesCount + << "; down: " << DownNodesCount + << "; total: " << NodeToState.size() + << "; limit: " << DisabledNodesLimit + << "; ratio limit: " << DisabledNodesRatioLimit << "%"; + + return reason.str(); + } +}; + + +class TClusterNodesState : public TNodesStateBase { +public: + TClusterNodesState(ui32 disabledNodesLimit, ui32 disabledNodesRatioLimit) + : TNodesStateBase(disabledNodesLimit, disabledNodesRatioLimit) + { + } + + std::string ReadableReason() const override final { + std::stringstream reason; + reason << "Too many locked nodes in cluster" + << "; locked: " << LockedNodesCount + << "; down: " << DownNodesCount + << "; total: " << NodeToState.size() + << "; limit: " << DisabledNodesLimit + << "; ratio limit: " << DisabledNodesRatioLimit << "%"; + + return reason.str(); + } +}; + +} // namespace NKikimr::NCms |
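
Because CheckPermissionRequest evaluates actions speculatively (scheduled requests are replayed and then rolled back in the updated TEvPermissionRequest/TEvCheckRequest handlers), the incremental counters have to be revertible; the diff adds TOperationLogManager with PushRollbackPoint/ApplyAction/RollbackOperations for exactly that. The following is a hedged sketch of the underlying undo-log pattern, not the YDB implementation: TUndoLogSketch and its method names are hypothetical, and the trailing usage comment assumes a counter object with LockNode/UnlockNode like the one sketched after the diffstat above.

```cpp
#include <functional>
#include <utility>
#include <vector>

// Sketch of the undo-log pattern behind TOperationLogManager: each speculative
// mutation is recorded together with its inverse, and rolling back unwinds
// mutations until the most recent rollback marker is popped.
class TUndoLogSketch {
public:
    void PushRollbackPoint() {
        Log.push_back({[] {}, [] {}, true});
    }

    void Apply(std::function<void()> doIt, std::function<void()> undoIt) {
        doIt();                                    // apply the mutation immediately
        Log.push_back({std::move(doIt), std::move(undoIt), false});
    }

    void RollbackToLastPoint() {
        while (!Log.empty()) {
            TEntry entry = std::move(Log.back());
            Log.pop_back();
            if (entry.IsMarker) {
                break;                             // reached the rollback point
            }
            entry.Undo();                          // revert the mutation
        }
    }

private:
    struct TEntry {
        std::function<void()> Do;
        std::function<void()> Undo;
        bool IsMarker;
    };
    std::vector<TEntry> Log;
};

// Hypothetical usage against a counter with LockNode/UnlockNode:
//   log.PushRollbackPoint();
//   log.Apply([&] { counters.LockNode(7); }, [&] { counters.UnlockNode(7); });
//   ... evaluate the request speculatively ...
//   log.RollbackToLastPoint();   // counters return to their prior state
```
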