diff options
author | t1mursadykov <t1mursadykov@ydb.tech> | 2023-04-10 20:11:56 +0300 |
---|---|---|
committer | t1mursadykov <t1mursadykov@ydb.tech> | 2023-04-10 20:11:56 +0300 |
commit | d1a0f92efae0cfe199e472566346081c67689e9a (patch) | |
tree | 995054a1fb3003c836ea4a9915002e94fd9311f0 | |
parent | f8b3a6786752ad2143517db10ad2333ed1eb8a09 (diff) | |
download | ydb-d1a0f92efae0cfe199e472566346081c67689e9a.tar.gz |
Fix long pause in actor system
-rw-r--r-- | ydb/core/cms/cms.cpp | 12 | ||||
-rw-r--r-- | ydb/core/cms/cms_impl.h | 4 | ||||
-rw-r--r-- | ydb/core/cms/cms_ut.cpp | 80 | ||||
-rw-r--r-- | ydb/core/cms/cms_ut_common.cpp | 48 | ||||
-rw-r--r-- | ydb/core/cms/cms_ut_common.h | 5 |
5 files changed, 115 insertions, 34 deletions
diff --git a/ydb/core/cms/cms.cpp b/ydb/core/cms/cms.cpp index 89a0d0ca532..a5beb840e92 100644 --- a/ydb/core/cms/cms.cpp +++ b/ydb/core/cms/cms.cpp @@ -1340,7 +1340,9 @@ void TCms::PersistNodeTenants(TTransactionContext& txc, const TActorContext& ctx void TCms::ProcessQueue(const TActorContext &ctx) { - while (!Queue.empty()) { + // To avoid getting stuck in the processing queue for too long, + // we'll process queue by one. + if (!Queue.empty()) { TabletCounters->Percentile()[COUNTER_LATENCY_REQUEST_QUEUING].IncrementFor((ctx.Now() - Queue.front().ArrivedTime).MilliSeconds()); TabletCounters->Simple()[COUNTER_REQUESTS_QUEUE_SIZE].Sub(1); @@ -1348,10 +1350,14 @@ void TCms::ProcessQueue(const TActorContext &ctx) Queue.pop(); } - // Process events received while collecting - if (!NextQueue.empty()) { + // Process events received while collecting and processing queue + if (Queue.empty() && !NextQueue.empty()) { StartCollecting(ctx); } + + if (!Queue.empty()) { + ctx.Send(SelfId(), new TEvPrivate::TEvProcessQueue); + } } void TCms::ProcessRequest(TAutoPtr<IEventHandle> &ev, const TActorContext &ctx) diff --git a/ydb/core/cms/cms_impl.h b/ydb/core/cms/cms_impl.h index 97c4f6b3b3a..bade5e96ded 100644 --- a/ydb/core/cms/cms_impl.h +++ b/ydb/core/cms/cms_impl.h @@ -41,6 +41,7 @@ public: EvLogAndSend, EvCleanupLog, EvStartCollecting, + EvProcessQueue, EvEnd }; @@ -70,6 +71,8 @@ public: }; struct TEvCleanupLog : public TEventLocal<TEvCleanupLog, EvCleanupLog> {}; + + struct TEvProcessQueue : public TEventLocal<TEvProcessQueue, EvProcessQueue> {}; }; void PersistNodeTenants(TTransactionContext &txc, const TActorContext &ctx); @@ -217,6 +220,7 @@ private: CFunc(TEvPrivate::EvCleanupLog, CleanupLog); CFunc(TEvPrivate::EvCleanupWalle, CleanupWalleTasks); CFunc(TEvPrivate::EvStartCollecting, StartCollecting); + CFunc(TEvPrivate::EvProcessQueue, ProcessQueue); FFunc(TEvCms::EvClusterStateRequest, EnqueueRequest); HFunc(TEvCms::TEvPermissionRequest, CheckAndEnqueueRequest); HFunc(TEvCms::TEvManageRequestRequest, Handle); diff --git a/ydb/core/cms/cms_ut.cpp b/ydb/core/cms/cms_ut.cpp index cf5ab70dff0..4d4b6e79605 100644 --- a/ydb/core/cms/cms_ut.cpp +++ b/ydb/core/cms/cms_ut.cpp @@ -1086,13 +1086,13 @@ Y_UNIT_TEST_SUITE(TCmsTest) { env.CheckPermissionRequest("user", false, false, false, true, MODE_MAX_AVAILABILITY, TStatus::ALLOW, MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(0), 60000000, "storage")); - + env.CheckPermissionRequest("user", false, false, false, true, MODE_MAX_AVAILABILITY, TStatus::DISALLOW_TEMP, MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(2), 60000000, "storage")); } Y_UNIT_TEST(StateStorageNodesFromOneRing) - { + { TTestEnvOpts options(5); options.VDisks = 0; options.NRings = 2; @@ -1104,7 +1104,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) { env.CheckPermissionRequest("user", false, false, false, true, TStatus::ALLOW, MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(0), 60000000, "storage")); - + env.CheckPermissionRequest("user", false, false, false, true, TStatus::ALLOW, MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(1), 60000000, "storage")); } @@ -1130,13 +1130,13 @@ Y_UNIT_TEST_SUITE(TCmsTest) { } Y_UNIT_TEST(StateStorageTwoBrokenRings) - { + { TTestEnvOpts options(10); options.VDisks = 0; options.NRings = 5; options.RingSize = 2; options.NToSelect = 5; - + TCmsTestEnv env(options); TFakeNodeWhiteboardService::Info[env.GetNodeId(1)].Connected = false; @@ -1146,19 +1146,19 @@ Y_UNIT_TEST_SUITE(TCmsTest) { env.CheckPermissionRequest("user", false, true, false, true, MODE_KEEP_AVAILABLE, TStatus::ALLOW, MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(3), 60000000, "storage")); - + env.CheckPermissionRequest("user", false, true, false, true, MODE_MAX_AVAILABILITY, TStatus::DISALLOW_TEMP, MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(3), 60000000, "storage")); } - Y_UNIT_TEST(StateStorageRollingRestart) + Y_UNIT_TEST(StateStorageRollingRestart) { TTestEnvOpts options(20); options.VDisks = 0; options.NRings = 6; options.RingSize = 3; options.NToSelect = 6; - + TCmsTestEnv env(options); TIntrusiveConstPtr<TStateStorageInfo> info = env.GetStateStorageInfo(); @@ -1174,7 +1174,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) { StateStorageNodes.insert(nodeId); } } - + THashSet<TString> restarted; while(restarted.size() < env.GetNodeCount()) { @@ -1193,14 +1193,14 @@ Y_UNIT_TEST_SUITE(TCmsTest) { NKikimrCms::TPermissionResponse res; - // In the last request comes the permission + // In the last request comes the permission // for all nodes of the same ring if (env.GetNodeCount() - restarted.size() == options.RingSize) { res = env.CheckPermissionRequest(event, TStatus::ALLOW); } else { res = env.CheckPermissionRequest(event, TStatus::ALLOW_PARTIAL); } - + ui32 permRing = env.GetNodeCount() + 1; for (auto& perm : res.GetPermissions()) { auto &action = perm.GetAction(); @@ -1212,7 +1212,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) { continue; } - // Check that all state storages in permissions + // Check that all state storages in permissions // from the same ring ui32 curRing = NodeToRing.at(nodeId); if (permRing >= options.NRings) { // we have not met a state storage yet @@ -1246,7 +1246,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) { // Adbance time to run cleanup env.AdvanceCurrentTime(TDuration::Minutes(3)); env.RestartCms(); - + // TODO:: перенести внутрь TCmsTestEnv TAutoPtr<TEvCms::TEvStoreWalleTask> event_store = new TEvCms::TEvStoreWalleTask; event_store->Task.TaskId = "walle-test-task-1"; @@ -1263,7 +1263,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) { TTestEnvOpts opt(6); opt.VDisks = 0; - TCmsTestEnv env(opt); + TCmsTestEnv env(opt); env.EnableSysNodeChecking(); @@ -1288,7 +1288,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) { env.CheckPermissionRequest("user", false, true, false, true, MODE_KEEP_AVAILABLE, TStatus::ALLOW, MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(4), 60000000, "storage")); - + TFakeNodeWhiteboardService::Info[env.GetNodeId(4)].Connected = false; env.RestartCms(); @@ -1296,7 +1296,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) { MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(5), 60000000, "storage")); } - Y_UNIT_TEST(Mirror3dcPermissions) + Y_UNIT_TEST(Mirror3dcPermissions) { TTestEnvOpts options(18); options.UseMirror3dcErasure = true; @@ -1307,7 +1307,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) { env.CheckPermissionRequest("user", false, true, false, true, MODE_MAX_AVAILABILITY, TStatus::ALLOW, MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(2), 60000000, "storage")); - + TFakeNodeWhiteboardService::Info[env.GetNodeId(1)].Connected = false; env.RestartCms(); @@ -1321,7 +1321,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) { env.CheckPermissionRequest("user", false, true, false, true, MODE_KEEP_AVAILABLE, TStatus::DISALLOW_TEMP, MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(5), 60000000, "storage")); - + // 2dc disabled TFakeNodeWhiteboardService::Info[env.GetNodeId(7)].Connected = true; env.RestartCms(); @@ -1332,7 +1332,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) { env.CheckPermissionRequest("user", false, true, false, true, MODE_KEEP_AVAILABLE, TStatus::ALLOW, MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(5), 60000000, "storage")); - + TFakeNodeWhiteboardService::Info[env.GetNodeId(5)].Connected = false; env.RestartCms(); @@ -1368,9 +1368,10 @@ Y_UNIT_TEST_SUITE(TCmsTest) { MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(4), 60000000, "storage")); } - Y_UNIT_TEST(TestTwoOrMoreDisksFromGroupAtTheSameRequestBlock42) { + Y_UNIT_TEST(TestTwoOrMoreDisksFromGroupAtTheSameRequestBlock42) + { TCmsTestEnv env(8); - + // It is impossible to get two or more permissions for one group in one request env.CheckPermissionRequest("user", true, true, false, true, MODE_KEEP_AVAILABLE, TStatus::ALLOW_PARTIAL, MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(2), 60000000, "storage"), @@ -1399,8 +1400,9 @@ Y_UNIT_TEST_SUITE(TCmsTest) { MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(3), 60000000, "storage"), MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(4), 60000000, "storage")); } - - Y_UNIT_TEST(TestTwoOrMoreDisksFromGroupAtTheSameRequestMirror3dc) { + + Y_UNIT_TEST(TestTwoOrMoreDisksFromGroupAtTheSameRequestMirror3dc) + { TTestEnvOpts options(9); options.UseMirror3dcErasure = true; options.VDisks = 9; @@ -1442,7 +1444,37 @@ Y_UNIT_TEST_SUITE(TCmsTest) { env.CheckPermissionRequest("user", false, true, false, true, MODE_MAX_AVAILABILITY, TStatus::DISALLOW, MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(3), 60000000, "storage"), MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(4), 60000000, "storage"), - MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(5), 60000000, "storage")); + MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(5), 60000000, "storage")); + } + + Y_UNIT_TEST(TestProcessingQueue) + { + const ui32 RequestsCount = 8; + TCmsTestEnv env(RequestsCount); + + // We need to send messages in fixed order + env.CreateDefaultCmsPipe(); + env.ProcessQueueCount = 0; + for (ui32 i = 0; i < RequestsCount; ++i) { + auto req = MakePermissionRequest("user", true, true, false, + MakeAction(NKikimrCms::TAction::RESTART_SERVICES, env.GetNodeId(i), 60000000, "storage")); + req->Record.SetDuration(600000); + req->Record.SetAvailabilityMode(NKikimrCms::MODE_MAX_AVAILABILITY); + + env.SendToCms(req.Release()); + } + env.DestroyDefaultCmsPipe(); + + // Check responses order + for (ui32 i = 0; i < RequestsCount; ++i) { + TAutoPtr<IEventHandle> handle; + auto reply = env.GrabEdgeEventRethrow<TEvCms::TEvPermissionResponse>(handle); + const auto &rec = reply->Record; + + UNIT_ASSERT_VALUES_EQUAL(rec.permissions_size(), 1); + UNIT_ASSERT_VALUES_EQUAL(rec.permissions()[0].GetAction().GetHost(), ToString(env.GetNodeId(i))); + } + UNIT_ASSERT_VALUES_EQUAL(env.ProcessQueueCount, RequestsCount); } } } // NCmsTest diff --git a/ydb/core/cms/cms_ut_common.cpp b/ydb/core/cms/cms_ut_common.cpp index 801896088a5..96421cc7a50 100644 --- a/ydb/core/cms/cms_ut_common.cpp +++ b/ydb/core/cms/cms_ut_common.cpp @@ -3,6 +3,7 @@ #include "ut_helpers.h" #include "sentinel.h" +#include <ydb/core/base/tabletid.h> #include <ydb/core/blobstorage/crypto/default.h> #include <ydb/core/mind/bscontroller/bsc.h> #include <ydb/core/testlib/basics/appdata.h> @@ -531,6 +532,8 @@ static void SetupServices(TTestActorRuntime &runtime, TCmsTestEnv::TCmsTestEnv(const TTestEnvOpts &options) : TTestBasicRuntime(options.NodeCount, options.DataCenterCount, false) , CmsId(MakeCmsID(0)) + , ProcessQueueCount(0) + , CmsTabletActor(TActorId()) { TFakeNodeWhiteboardService::Config.MutableResponse()->SetSuccess(true); TFakeNodeWhiteboardService::Config.MutableResponse()->ClearStatus(); @@ -542,19 +545,39 @@ TCmsTestEnv::TCmsTestEnv(const TTestEnvOpts &options) GenerateExtendedInfo(*this, config, options.VDisks, 4, options.Tenants, options.UseMirror3dcErasure); - // Set observer to pass fake base blobstorage config. - auto redirectConfigRequest = [](TTestActorRuntimeBase&, + SetObserverFunc([&ProcessQueueCount = ProcessQueueCount, &CmsTabletActor = CmsTabletActor](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &event) -> auto { if (event->GetTypeRewrite() == TEvBlobStorage::EvControllerConfigRequest - || event->GetTypeRewrite() == TEvConfigsDispatcher::EvGetConfigRequest - ) { + || event->GetTypeRewrite() == TEvConfigsDispatcher::EvGetConfigRequest) { auto fakeId = NNodeWhiteboard::MakeNodeWhiteboardServiceId(event->Recipient.NodeId()); if (event->Recipient != fakeId) event = IEventHandle::Forward(event, fakeId); } + + if (event->GetTypeRewrite() == TCms::TEvPrivate::EvProcessQueue + && event->Recipient == CmsTabletActor) { + ++ProcessQueueCount; + } + + if (event->GetTypeRewrite() == TCms::TEvPrivate::EvUpdateClusterInfo + || event->GetTypeRewrite() == TEvCms::EvClusterStateRequest + || event->GetTypeRewrite() == TEvCms::EvNotification + || event->GetTypeRewrite() == TEvCms::EvResetMarkerRequest + || event->GetTypeRewrite() == TEvCms::EvSetMarkerRequest + || event->GetTypeRewrite() == TEvCms::EvGetClusterInfoRequest) { + --ProcessQueueCount; + } + return TTestActorRuntime::EEventAction::PROCESS; - }; - SetObserverFunc(redirectConfigRequest); + }); + + SetRegistrationObserverFunc([&CmsTabletActor = CmsTabletActor](TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { + if (TypeName(*runtime.FindActor(actorId)) == "NKikimr::NCms::TCms") { + CmsTabletActor = actorId; + } + + runtime.DefaultRegistrationObserver(runtime, parentId, actorId); + }); using namespace NMalloc; TMallocInfo mallocInfo = MallocInfo(); @@ -571,6 +594,7 @@ TCmsTestEnv::TCmsTestEnv(const TTestEnvOpts &options) SetupServices(*this, options.Tenants); Sender = AllocateEdgeActor(); + ClientId = TActorId(); NKikimrCms::TCmsConfig cmsConfig; cmsConfig.MutableTenantLimits()->SetDisabledNodesRatioLimit(0); @@ -640,7 +664,17 @@ void TCmsTestEnv::SendRestartCms() void TCmsTestEnv::SendToCms(IEventBase *event) { - SendToPipe(CmsId, Sender, event, 0, GetPipeConfigWithRetries()); + SendToPipe(CmsId, Sender, event, 0, GetPipeConfigWithRetries(), ClientId); +} + +void TCmsTestEnv::CreateDefaultCmsPipe() +{ + ClientId = ConnectToPipe(CmsId, Sender, 0, GetPipeConfigWithRetries()); +} + +void TCmsTestEnv::DestroyDefaultCmsPipe() +{ + ClosePipe(ClientId, Sender, 0); } NKikimrCms::TCmsConfig TCmsTestEnv::GetCmsConfig() diff --git a/ydb/core/cms/cms_ut_common.h b/ydb/core/cms/cms_ut_common.h index 24e5db6b6b5..9d4d81b9e5c 100644 --- a/ydb/core/cms/cms_ut_common.h +++ b/ydb/core/cms/cms_ut_common.h @@ -117,6 +117,8 @@ public: void RestartCms(); void SendRestartCms(); void SendToCms(IEventBase *event); + void CreateDefaultCmsPipe(); + void DestroyDefaultCmsPipe(); NKikimrCms::TCmsConfig GetCmsConfig(); void SendCmsConfig(const NKikimrCms::TCmsConfig &config); @@ -366,6 +368,7 @@ public: void EnableNoisyBSCPipe(); const ui64 CmsId; + i32 ProcessQueueCount; private: void SetupLogging(); @@ -383,6 +386,8 @@ private: void CheckResetMarker(TAutoPtr<NCms::TEvCms::TEvResetMarkerRequest> req, NKikimrCms::TStatus::ECode code); TActorId Sender; + TActorId ClientId; + TActorId CmsTabletActor; }; } |