aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authort1mursadykov <t1mursadykov@ydb.tech>2023-04-10 20:11:56 +0300
committert1mursadykov <t1mursadykov@ydb.tech>2023-04-10 20:11:56 +0300
commitd1a0f92efae0cfe199e472566346081c67689e9a (patch)
tree995054a1fb3003c836ea4a9915002e94fd9311f0
parentf8b3a6786752ad2143517db10ad2333ed1eb8a09 (diff)
downloadydb-d1a0f92efae0cfe199e472566346081c67689e9a.tar.gz
Fix long pause in actor system
-rw-r--r--ydb/core/cms/cms.cpp12
-rw-r--r--ydb/core/cms/cms_impl.h4
-rw-r--r--ydb/core/cms/cms_ut.cpp80
-rw-r--r--ydb/core/cms/cms_ut_common.cpp48
-rw-r--r--ydb/core/cms/cms_ut_common.h5
5 files changed, 115 insertions, 34 deletions
diff --git a/ydb/core/cms/cms.cpp b/ydb/core/cms/cms.cpp
index 89a0d0ca532..a5beb840e92 100644
--- a/ydb/core/cms/cms.cpp
+++ b/ydb/core/cms/cms.cpp
@@ -1340,7 +1340,9 @@ void TCms::PersistNodeTenants(TTransactionContext& txc, const TActorContext& ctx
void TCms::ProcessQueue(const TActorContext &ctx)
{
- while (!Queue.empty()) {
+ // To avoid getting stuck in the processing queue for too long,
+ // we'll process queue by one.
+ if (!Queue.empty()) {
TabletCounters->Percentile()[COUNTER_LATENCY_REQUEST_QUEUING].IncrementFor((ctx.Now() - Queue.front().ArrivedTime).MilliSeconds());
TabletCounters->Simple()[COUNTER_REQUESTS_QUEUE_SIZE].Sub(1);
@@ -1348,10 +1350,14 @@ void TCms::ProcessQueue(const TActorContext &ctx)
Queue.pop();
}
- // Process events received while collecting
- if (!NextQueue.empty()) {
+ // Process events received while collecting and processing queue
+ if (Queue.empty() && !NextQueue.empty()) {
StartCollecting(ctx);
}
+
+ if (!Queue.empty()) {
+ ctx.Send(SelfId(), new TEvPrivate::TEvProcessQueue);
+ }
}
void TCms::ProcessRequest(TAutoPtr<IEventHandle> &ev, const TActorContext &ctx)
diff --git a/ydb/core/cms/cms_impl.h b/ydb/core/cms/cms_impl.h
index 97c4f6b3b3a..bade5e96ded 100644
--- a/ydb/core/cms/cms_impl.h
+++ b/ydb/core/cms/cms_impl.h
@@ -41,6 +41,7 @@ public:
EvLogAndSend,
EvCleanupLog,
EvStartCollecting,
+ EvProcessQueue,
EvEnd
};
@@ -70,6 +71,8 @@ public:
};
struct TEvCleanupLog : public TEventLocal<TEvCleanupLog, EvCleanupLog> {};
+
+ struct TEvProcessQueue : public TEventLocal<TEvProcessQueue, EvProcessQueue> {};
};
void PersistNodeTenants(TTransactionContext &txc, const TActorContext &ctx);
@@ -217,6 +220,7 @@ private:
CFunc(TEvPrivate::EvCleanupLog, CleanupLog);
CFunc(TEvPrivate::EvCleanupWalle, CleanupWalleTasks);
CFunc(TEvPrivate::EvStartCollecting, StartCollecting);
+ CFunc(TEvPrivate::EvProcessQueue, ProcessQueue);
FFunc(TEvCms::EvClusterStateRequest, EnqueueRequest);
HFunc(TEvCms::TEvPermissionRequest, CheckAndEnqueueRequest);
HFunc(TEvCms::TEvManageRequestRequest, Handle);
diff --git a/ydb/core/cms/cms_ut.cpp b/ydb/core/cms/cms_ut.cpp
index cf5ab70dff0..4d4b6e79605 100644
--- a/ydb/core/cms/cms_ut.cpp
+++ b/ydb/core/cms/cms_ut.cpp
@@ -1086,13 +1086,13 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
env.CheckPermissionRequest("user", false, false, false, true, MODE_MAX_AVAILABILITY, TStatus::ALLOW,
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(0), 60000000, "storage"));
-
+
env.CheckPermissionRequest("user", false, false, false, true, MODE_MAX_AVAILABILITY, TStatus::DISALLOW_TEMP,
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(2), 60000000, "storage"));
}
Y_UNIT_TEST(StateStorageNodesFromOneRing)
- {
+ {
TTestEnvOpts options(5);
options.VDisks = 0;
options.NRings = 2;
@@ -1104,7 +1104,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
env.CheckPermissionRequest("user", false, false, false, true, TStatus::ALLOW,
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(0), 60000000, "storage"));
-
+
env.CheckPermissionRequest("user", false, false, false, true, TStatus::ALLOW,
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(1), 60000000, "storage"));
}
@@ -1130,13 +1130,13 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
}
Y_UNIT_TEST(StateStorageTwoBrokenRings)
- {
+ {
TTestEnvOpts options(10);
options.VDisks = 0;
options.NRings = 5;
options.RingSize = 2;
options.NToSelect = 5;
-
+
TCmsTestEnv env(options);
TFakeNodeWhiteboardService::Info[env.GetNodeId(1)].Connected = false;
@@ -1146,19 +1146,19 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
env.CheckPermissionRequest("user", false, true, false, true, MODE_KEEP_AVAILABLE, TStatus::ALLOW,
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(3), 60000000, "storage"));
-
+
env.CheckPermissionRequest("user", false, true, false, true, MODE_MAX_AVAILABILITY, TStatus::DISALLOW_TEMP,
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(3), 60000000, "storage"));
}
- Y_UNIT_TEST(StateStorageRollingRestart)
+ Y_UNIT_TEST(StateStorageRollingRestart)
{
TTestEnvOpts options(20);
options.VDisks = 0;
options.NRings = 6;
options.RingSize = 3;
options.NToSelect = 6;
-
+
TCmsTestEnv env(options);
TIntrusiveConstPtr<TStateStorageInfo> info = env.GetStateStorageInfo();
@@ -1174,7 +1174,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
StateStorageNodes.insert(nodeId);
}
}
-
+
THashSet<TString> restarted;
while(restarted.size() < env.GetNodeCount()) {
@@ -1193,14 +1193,14 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
NKikimrCms::TPermissionResponse res;
- // In the last request comes the permission
+ // In the last request comes the permission
// for all nodes of the same ring
if (env.GetNodeCount() - restarted.size() == options.RingSize) {
res = env.CheckPermissionRequest(event, TStatus::ALLOW);
} else {
res = env.CheckPermissionRequest(event, TStatus::ALLOW_PARTIAL);
}
-
+
ui32 permRing = env.GetNodeCount() + 1;
for (auto& perm : res.GetPermissions()) {
auto &action = perm.GetAction();
@@ -1212,7 +1212,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
continue;
}
- // Check that all state storages in permissions
+ // Check that all state storages in permissions
// from the same ring
ui32 curRing = NodeToRing.at(nodeId);
if (permRing >= options.NRings) { // we have not met a state storage yet
@@ -1246,7 +1246,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
// Adbance time to run cleanup
env.AdvanceCurrentTime(TDuration::Minutes(3));
env.RestartCms();
-
+
// TODO:: перенести внутрь TCmsTestEnv
TAutoPtr<TEvCms::TEvStoreWalleTask> event_store = new TEvCms::TEvStoreWalleTask;
event_store->Task.TaskId = "walle-test-task-1";
@@ -1263,7 +1263,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
TTestEnvOpts opt(6);
opt.VDisks = 0;
- TCmsTestEnv env(opt);
+ TCmsTestEnv env(opt);
env.EnableSysNodeChecking();
@@ -1288,7 +1288,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
env.CheckPermissionRequest("user", false, true, false, true, MODE_KEEP_AVAILABLE, TStatus::ALLOW,
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(4), 60000000, "storage"));
-
+
TFakeNodeWhiteboardService::Info[env.GetNodeId(4)].Connected = false;
env.RestartCms();
@@ -1296,7 +1296,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(5), 60000000, "storage"));
}
- Y_UNIT_TEST(Mirror3dcPermissions)
+ Y_UNIT_TEST(Mirror3dcPermissions)
{
TTestEnvOpts options(18);
options.UseMirror3dcErasure = true;
@@ -1307,7 +1307,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
env.CheckPermissionRequest("user", false, true, false, true, MODE_MAX_AVAILABILITY, TStatus::ALLOW,
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(2), 60000000, "storage"));
-
+
TFakeNodeWhiteboardService::Info[env.GetNodeId(1)].Connected = false;
env.RestartCms();
@@ -1321,7 +1321,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
env.CheckPermissionRequest("user", false, true, false, true, MODE_KEEP_AVAILABLE, TStatus::DISALLOW_TEMP,
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(5), 60000000, "storage"));
-
+
// 2dc disabled
TFakeNodeWhiteboardService::Info[env.GetNodeId(7)].Connected = true;
env.RestartCms();
@@ -1332,7 +1332,7 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
env.CheckPermissionRequest("user", false, true, false, true, MODE_KEEP_AVAILABLE, TStatus::ALLOW,
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(5), 60000000, "storage"));
-
+
TFakeNodeWhiteboardService::Info[env.GetNodeId(5)].Connected = false;
env.RestartCms();
@@ -1368,9 +1368,10 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(4), 60000000, "storage"));
}
- Y_UNIT_TEST(TestTwoOrMoreDisksFromGroupAtTheSameRequestBlock42) {
+ Y_UNIT_TEST(TestTwoOrMoreDisksFromGroupAtTheSameRequestBlock42)
+ {
TCmsTestEnv env(8);
-
+
// It is impossible to get two or more permissions for one group in one request
env.CheckPermissionRequest("user", true, true, false, true, MODE_KEEP_AVAILABLE, TStatus::ALLOW_PARTIAL,
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(2), 60000000, "storage"),
@@ -1399,8 +1400,9 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(3), 60000000, "storage"),
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(4), 60000000, "storage"));
}
-
- Y_UNIT_TEST(TestTwoOrMoreDisksFromGroupAtTheSameRequestMirror3dc) {
+
+ Y_UNIT_TEST(TestTwoOrMoreDisksFromGroupAtTheSameRequestMirror3dc)
+ {
TTestEnvOpts options(9);
options.UseMirror3dcErasure = true;
options.VDisks = 9;
@@ -1442,7 +1444,37 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
env.CheckPermissionRequest("user", false, true, false, true, MODE_MAX_AVAILABILITY, TStatus::DISALLOW,
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(3), 60000000, "storage"),
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(4), 60000000, "storage"),
- MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(5), 60000000, "storage"));
+ MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(5), 60000000, "storage"));
+ }
+
+ Y_UNIT_TEST(TestProcessingQueue)
+ {
+ const ui32 RequestsCount = 8;
+ TCmsTestEnv env(RequestsCount);
+
+ // We need to send messages in fixed order
+ env.CreateDefaultCmsPipe();
+ env.ProcessQueueCount = 0;
+ for (ui32 i = 0; i < RequestsCount; ++i) {
+ auto req = MakePermissionRequest("user", true, true, false,
+ MakeAction(NKikimrCms::TAction::RESTART_SERVICES, env.GetNodeId(i), 60000000, "storage"));
+ req->Record.SetDuration(600000);
+ req->Record.SetAvailabilityMode(NKikimrCms::MODE_MAX_AVAILABILITY);
+
+ env.SendToCms(req.Release());
+ }
+ env.DestroyDefaultCmsPipe();
+
+ // Check responses order
+ for (ui32 i = 0; i < RequestsCount; ++i) {
+ TAutoPtr<IEventHandle> handle;
+ auto reply = env.GrabEdgeEventRethrow<TEvCms::TEvPermissionResponse>(handle);
+ const auto &rec = reply->Record;
+
+ UNIT_ASSERT_VALUES_EQUAL(rec.permissions_size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(rec.permissions()[0].GetAction().GetHost(), ToString(env.GetNodeId(i)));
+ }
+ UNIT_ASSERT_VALUES_EQUAL(env.ProcessQueueCount, RequestsCount);
}
}
} // NCmsTest
diff --git a/ydb/core/cms/cms_ut_common.cpp b/ydb/core/cms/cms_ut_common.cpp
index 801896088a5..96421cc7a50 100644
--- a/ydb/core/cms/cms_ut_common.cpp
+++ b/ydb/core/cms/cms_ut_common.cpp
@@ -3,6 +3,7 @@
#include "ut_helpers.h"
#include "sentinel.h"
+#include <ydb/core/base/tabletid.h>
#include <ydb/core/blobstorage/crypto/default.h>
#include <ydb/core/mind/bscontroller/bsc.h>
#include <ydb/core/testlib/basics/appdata.h>
@@ -531,6 +532,8 @@ static void SetupServices(TTestActorRuntime &runtime,
TCmsTestEnv::TCmsTestEnv(const TTestEnvOpts &options)
: TTestBasicRuntime(options.NodeCount, options.DataCenterCount, false)
, CmsId(MakeCmsID(0))
+ , ProcessQueueCount(0)
+ , CmsTabletActor(TActorId())
{
TFakeNodeWhiteboardService::Config.MutableResponse()->SetSuccess(true);
TFakeNodeWhiteboardService::Config.MutableResponse()->ClearStatus();
@@ -542,19 +545,39 @@ TCmsTestEnv::TCmsTestEnv(const TTestEnvOpts &options)
GenerateExtendedInfo(*this, config, options.VDisks, 4, options.Tenants, options.UseMirror3dcErasure);
- // Set observer to pass fake base blobstorage config.
- auto redirectConfigRequest = [](TTestActorRuntimeBase&,
+ SetObserverFunc([&ProcessQueueCount = ProcessQueueCount, &CmsTabletActor = CmsTabletActor](TTestActorRuntimeBase&,
TAutoPtr<IEventHandle> &event) -> auto {
if (event->GetTypeRewrite() == TEvBlobStorage::EvControllerConfigRequest
- || event->GetTypeRewrite() == TEvConfigsDispatcher::EvGetConfigRequest
- ) {
+ || event->GetTypeRewrite() == TEvConfigsDispatcher::EvGetConfigRequest) {
auto fakeId = NNodeWhiteboard::MakeNodeWhiteboardServiceId(event->Recipient.NodeId());
if (event->Recipient != fakeId)
event = IEventHandle::Forward(event, fakeId);
}
+
+ if (event->GetTypeRewrite() == TCms::TEvPrivate::EvProcessQueue
+ && event->Recipient == CmsTabletActor) {
+ ++ProcessQueueCount;
+ }
+
+ if (event->GetTypeRewrite() == TCms::TEvPrivate::EvUpdateClusterInfo
+ || event->GetTypeRewrite() == TEvCms::EvClusterStateRequest
+ || event->GetTypeRewrite() == TEvCms::EvNotification
+ || event->GetTypeRewrite() == TEvCms::EvResetMarkerRequest
+ || event->GetTypeRewrite() == TEvCms::EvSetMarkerRequest
+ || event->GetTypeRewrite() == TEvCms::EvGetClusterInfoRequest) {
+ --ProcessQueueCount;
+ }
+
return TTestActorRuntime::EEventAction::PROCESS;
- };
- SetObserverFunc(redirectConfigRequest);
+ });
+
+ SetRegistrationObserverFunc([&CmsTabletActor = CmsTabletActor](TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
+ if (TypeName(*runtime.FindActor(actorId)) == "NKikimr::NCms::TCms") {
+ CmsTabletActor = actorId;
+ }
+
+ runtime.DefaultRegistrationObserver(runtime, parentId, actorId);
+ });
using namespace NMalloc;
TMallocInfo mallocInfo = MallocInfo();
@@ -571,6 +594,7 @@ TCmsTestEnv::TCmsTestEnv(const TTestEnvOpts &options)
SetupServices(*this, options.Tenants);
Sender = AllocateEdgeActor();
+ ClientId = TActorId();
NKikimrCms::TCmsConfig cmsConfig;
cmsConfig.MutableTenantLimits()->SetDisabledNodesRatioLimit(0);
@@ -640,7 +664,17 @@ void TCmsTestEnv::SendRestartCms()
void TCmsTestEnv::SendToCms(IEventBase *event)
{
- SendToPipe(CmsId, Sender, event, 0, GetPipeConfigWithRetries());
+ SendToPipe(CmsId, Sender, event, 0, GetPipeConfigWithRetries(), ClientId);
+}
+
+void TCmsTestEnv::CreateDefaultCmsPipe()
+{
+ ClientId = ConnectToPipe(CmsId, Sender, 0, GetPipeConfigWithRetries());
+}
+
+void TCmsTestEnv::DestroyDefaultCmsPipe()
+{
+ ClosePipe(ClientId, Sender, 0);
}
NKikimrCms::TCmsConfig TCmsTestEnv::GetCmsConfig()
diff --git a/ydb/core/cms/cms_ut_common.h b/ydb/core/cms/cms_ut_common.h
index 24e5db6b6b5..9d4d81b9e5c 100644
--- a/ydb/core/cms/cms_ut_common.h
+++ b/ydb/core/cms/cms_ut_common.h
@@ -117,6 +117,8 @@ public:
void RestartCms();
void SendRestartCms();
void SendToCms(IEventBase *event);
+ void CreateDefaultCmsPipe();
+ void DestroyDefaultCmsPipe();
NKikimrCms::TCmsConfig GetCmsConfig();
void SendCmsConfig(const NKikimrCms::TCmsConfig &config);
@@ -366,6 +368,7 @@ public:
void EnableNoisyBSCPipe();
const ui64 CmsId;
+ i32 ProcessQueueCount;
private:
void SetupLogging();
@@ -383,6 +386,8 @@ private:
void CheckResetMarker(TAutoPtr<NCms::TEvCms::TEvResetMarkerRequest> req, NKikimrCms::TStatus::ECode code);
TActorId Sender;
+ TActorId ClientId;
+ TActorId CmsTabletActor;
};
}