aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorinnokentii <innokentii@yandex-team.com>2022-11-29 19:01:47 +0300
committerinnokentii <innokentii@yandex-team.com>2022-11-29 19:01:47 +0300
commit6074addcd7dca29be21ea14353f957eb8fd5a9a2 (patch)
tree6ac7968d97a7ed41ba555bd055949e8200a2841b
parent94dfc801249f4120d026f7f52e04123af5e887d2 (diff)
downloadydb-6074addcd7dca29be21ea14353f957eb8fd5a9a2.tar.gz
Implement batching in sentinel,
implement batching in sentinel add batching to sentinel
-rw-r--r--ydb/core/cms/cms_ut_common.cpp38
-rw-r--r--ydb/core/cms/cms_ut_common.h15
-rw-r--r--ydb/core/cms/config.h2
-rw-r--r--ydb/core/cms/sentinel.cpp296
-rw-r--r--ydb/core/cms/sentinel.h2
-rw-r--r--ydb/core/cms/sentinel_impl.h13
-rw-r--r--ydb/core/cms/sentinel_ut.cpp97
7 files changed, 245 insertions, 218 deletions
diff --git a/ydb/core/cms/cms_ut_common.cpp b/ydb/core/cms/cms_ut_common.cpp
index 98b6eb3bbe4..5e45421a565 100644
--- a/ydb/core/cms/cms_ut_common.cpp
+++ b/ydb/core/cms/cms_ut_common.cpp
@@ -1,6 +1,7 @@
#include "cms_impl.h"
#include "cms_ut_common.h"
#include "ut_helpers.h"
+#include "sentinel.h"
#include <ydb/core/blobstorage/crypto/default.h>
#include <ydb/core/mind/bscontroller/bsc.h>
@@ -66,9 +67,25 @@ void TFakeNodeWhiteboardService::Handle(TEvBlobStorage::TEvControllerConfigReque
driveStatus.MutableHostKey()->SetIcPort(drive.GetHostKey().GetIcPort());
driveStatus.SetPath(drive.GetPath());
driveStatus.SetStatus(NKikimrBlobStorage::ACTIVE);
- } else if (rec.GetRequest().CommandSize() && rec.GetRequest().GetCommand(0).HasUpdateDriveStatus()) {
- resp->Record.MutableResponse()->AddStatus()->SetSuccess(true);
- resp->Record.MutableResponse()->SetSuccess(true);
+ } else if (rec.GetRequest().CommandSize() && rec.GetRequest().GetCommand(0).HasUpdateDriveStatus()) { // assume that all commands are UpdateDriveStatus
+ if (NoisyBSCPipe && ++NoisyBSCPipeCounter % 3) {
+ ctx.Send(ev->Sender, new TEvSentinel::TEvBSCPipeDisconnected, 0);
+ delete resp;
+ return;
+ }
+ bool success = true;
+ for (ui32 i = 0; i < rec.GetRequest().CommandSize(); ++i) {
+ auto cmd = rec.GetRequest().GetCommand(i).GetUpdateDriveStatus();
+ auto id = NCms::TPDiskID(cmd.GetHostKey().GetNodeId(), cmd.GetPDiskId());
+ if (auto& pattern = BSControllerResponsePatterns[id]; !pattern.empty() && !pattern[0]) {
+ success = false;
+ pattern.erase(pattern.begin());
+ resp->Record.MutableResponse()->AddStatus()->SetSuccess(false);
+ } else {
+ resp->Record.MutableResponse()->AddStatus()->SetSuccess(true);
+ }
+ }
+ resp->Record.MutableResponse()->SetSuccess(success);
}
ctx.Send(ev->Sender, resp, 0, ev->Cookie);
}
@@ -166,11 +183,6 @@ void TFakeNodeWhiteboardService::Handle(TEvWhiteboard::TEvSystemStateRequest::TP
ctx.Send(ev->Sender, response.Release(), 0, ev->Cookie);
}
-NKikimrBlobStorage::TEvControllerConfigResponse TFakeNodeWhiteboardService::Config;
-THashMap<ui32, TFakeNodeInfo> TFakeNodeWhiteboardService::Info;
-TMutex TFakeNodeWhiteboardService::Mutex;
-NKikimrConfig::TBootstrap TFakeNodeWhiteboardService::BootstrapConfig;
-
namespace {
struct TFakeNodeInfo {
@@ -1129,6 +1141,16 @@ NKikimrCms::TGetLogTailResponse TCmsTestEnv::GetLogTail(ui32 type,
return rec;
}
+// Appends `failuresPattern` to the scripted response pattern that the fake BSC keeps
+// for PDisk `id` (TFakeNodeWhiteboardService::BSControllerResponsePatterns).
+// The shared static state is modified while holding TFakeNodeWhiteboardService::Mutex.
+void TCmsTestEnv::AddBSCFailures(const NCms::TPDiskID& id, TVector<bool> failuresPattern) {
+    TGuard<TMutex> lock(TFakeNodeWhiteboardService::Mutex);
+    auto& pattern = TFakeNodeWhiteboardService::BSControllerResponsePatterns[id];
+    for (const bool outcome : failuresPattern) {
+        pattern.push_back(outcome);
+    }
+}
+
+// Sets TFakeNodeWhiteboardService::NoisyBSCPipe under the service mutex.
+// While the flag is set, the fake BSC handler answers an UpdateDriveStatus request with
+// TEvSentinel::TEvBSCPipeDisconnected whenever the incremented NoisyBSCPipeCounter is
+// not a multiple of 3.
+void TCmsTestEnv::EnableNoisyBSCPipe() {
+    TGuard<TMutex> lock(TFakeNodeWhiteboardService::Mutex);
+    TFakeNodeWhiteboardService::NoisyBSCPipe = true;
+}
} // namespace NCmsTest
} // namespace NKikimr
diff --git a/ydb/core/cms/cms_ut_common.h b/ydb/core/cms/cms_ut_common.h
index 5ec1fdaec27..69fcf0fa222 100644
--- a/ydb/core/cms/cms_ut_common.h
+++ b/ydb/core/cms/cms_ut_common.h
@@ -40,10 +40,13 @@ class TFakeNodeWhiteboardService : public TActorBootstrapped<TFakeNodeWhiteboard
public:
using TEvWhiteboard = NNodeWhiteboard::TEvWhiteboard;
- static NKikimrBlobStorage::TEvControllerConfigResponse Config;
- static NKikimrConfig::TBootstrap BootstrapConfig;
- static THashMap<ui32, TFakeNodeInfo> Info;
- static TMutex Mutex;
+ static inline NKikimrBlobStorage::TEvControllerConfigResponse Config;
+ static inline NKikimrConfig::TBootstrap BootstrapConfig;
+ static inline THashMap<ui32, TFakeNodeInfo> Info;
+ static inline THashMap<NCms::TPDiskID, TVector<bool>, NCms::TPDiskIDHash> BSControllerResponsePatterns;
+ static inline bool NoisyBSCPipe = false;
+ static inline ui32 NoisyBSCPipeCounter = 0;
+ static inline TMutex Mutex;
void Bootstrap(const TActorContext &ctx)
{
@@ -382,6 +385,10 @@ public:
ui32 limit = 100,
ui32 offset = 0);
+ void AddBSCFailures(const NCms::TPDiskID& id, TVector<bool> failuresPattern);
+
+ void EnableNoisyBSCPipe();
+
const ui64 CmsId;
private:
diff --git a/ydb/core/cms/config.h b/ydb/core/cms/config.h
index 5e865a91eb2..301a5d20ec2 100644
--- a/ydb/core/cms/config.h
+++ b/ydb/core/cms/config.h
@@ -24,6 +24,8 @@ struct TCmsSentinelConfig {
TDuration RetryChangeStatus;
ui32 ChangeStatusRetries;
+ TDuration BSCBatchTimeout;
+
ui32 DefaultStateLimit;
TMap<EPDiskState, ui32> StateLimits;
diff --git a/ydb/core/cms/sentinel.cpp b/ydb/core/cms/sentinel.cpp
index 7b7492cbf2e..dca83597ec9 100644
--- a/ydb/core/cms/sentinel.cpp
+++ b/ydb/core/cms/sentinel.cpp
@@ -4,7 +4,6 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/counters.h>
-#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/blobstorage/base/blobstorage_events.h>
#include <ydb/core/node_whiteboard/node_whiteboard.h>
#include <ydb/core/protos/services.pb.h>
@@ -280,17 +279,6 @@ TClusterMap::TPDiskIDSet TGuardian::GetAllowedPDisks(const TClusterMap& all, TSt
template <typename TDerived>
class TSentinelChildBase: public TActorBootstrapped<TDerived> {
-protected:
- void ConnectBSC() {
- auto domains = AppData()->DomainsInfo;
- const ui32 domainUid = domains->GetDomainUidByTabletId(CmsState->CmsTabletId);
- const ui64 bscId = MakeBSControllerID(domains->GetDefaultStateStorageGroup(domainUid));
-
- NTabletPipe::TClientConfig config;
- config.RetryPolicy = NTabletPipe::TClientRetryPolicy::WithRetries();
- CmsState->BSControllerPipe = this->Register(NTabletPipe::CreateClient(CmsState->CmsActorId, bscId, config));
- }
-
public:
using TBase = TSentinelChildBase<TDerived>;
@@ -375,7 +363,7 @@ class TConfigUpdater: public TUpdaterBase<TEvSentinel::TEvConfigUpdated, TConfig
<< ": attempt# " << SentinelState->ConfigUpdaterState.BSCAttempt);
if (!CmsState->BSControllerPipe) {
- ConnectBSC();
+ CmsState->BSControllerPipe = this->Register(CreateBSCClientActor(CmsState));
}
auto request = MakeHolder<TEvBlobStorage::TEvControllerConfigRequest>();
@@ -696,115 +684,8 @@ public:
}; // TStateUpdater
-class TStatusChanger: public TSentinelChildBase<TStatusChanger> {
- void Reply(bool success = true) {
- Send(Parent, new TEvSentinel::TEvStatusChanged(Id, success));
- PassAway();
- }
-
- void MaybeRetry() {
- if (Info->StatusChangerState->Attempt++ < Config.ChangeStatusRetries) {
- Schedule(Config.RetryChangeStatus, new TEvSentinel::TEvRetry());
- } else {
- Reply(false);
- }
- }
-
- void RequestStatusChange() {
- LOG_D("Change pdisk status"
- << ": pdiskId# " << Id
- << ", status# " << Info->StatusChangerState->Status
- << ", attempt# " << Info->StatusChangerState->Attempt);
-
- if (!CmsState->BSControllerPipe) {
- ConnectBSC();
- }
-
- auto request = MakeHolder<TEvBlobStorage::TEvControllerConfigRequest>();
- auto& command = *request->Record.MutableRequest()->AddCommand()->MutableUpdateDriveStatus();
- command.MutableHostKey()->SetNodeId(Id.NodeId);
- command.SetPDiskId(Id.DiskId);
- command.SetStatus(Info->StatusChangerState->Status);
- NTabletPipe::SendData(SelfId(), CmsState->BSControllerPipe, request.Release());
- }
-
- void Handle(TEvBlobStorage::TEvControllerConfigResponse::TPtr& ev) {
- const auto& response = ev->Get()->Record.GetResponse();
-
- LOG_D("Handle TEvBlobStorage::TEvControllerConfigResponse"
- << ": response# " << response.ShortDebugString());
-
- if (!response.GetSuccess() || !response.StatusSize() || !response.GetStatus(0).GetSuccess()) {
- TString error = "<no description>";
- if (response.StatusSize()) {
- error = response.GetStatus(0).GetErrorDescription();
- }
-
- LOG_E("Unsuccesful response from BSC"
- << ", size# " << response.StatusSize()
- << ", error# " << error);
- MaybeRetry();
- } else {
- Reply();
- }
- }
-
- void OnPipeDisconnected() {
- LOG_E("Pipe to BSC disconnected");
- MaybeRetry();
- }
-
-public:
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::CMS_SENTINEL_STATUS_CHANGER_ACTOR;
- }
-
- void PassAway() override {
- Info->LastStatusChange = Now();
- Info->PrevStatusChangerState = Info->StatusChangerState;
- Info->StatusChangerState.Reset();
- TActor::PassAway();
- }
-
- static TStringBuf Name() {
- return "StatusChanger"sv;
- }
-
- explicit TStatusChanger(
- const TActorId& parent,
- TCmsStatePtr state,
- const TPDiskID& id,
- TPDiskInfo::TPtr info,
- NKikimrBlobStorage::EDriveStatus status)
- : TBase(parent, state)
- , Id(id)
- , Info(info)
- {
- info->StatusChangerState = new TStatusChangerState(status);
- }
-
- void Bootstrap() {
- RequestStatusChange();
- Become(&TThis::StateWork);
- }
-
- STATEFN(StateWork) {
- switch (ev->GetTypeRewrite()) {
- sFunc(TEvSentinel::TEvRetry, RequestStatusChange);
- sFunc(TEvSentinel::TEvBSCPipeDisconnected, OnPipeDisconnected);
-
- hFunc(TEvBlobStorage::TEvControllerConfigResponse, Handle);
-
- sFunc(TEvents::TEvPoisonPill, PassAway);
- }
- }
-
-private:
- const TPDiskID Id;
- TPDiskInfo::TPtr Info;
-}; // TStatusChanger
-
class TSentinel: public TActorBootstrapped<TSentinel> {
+
struct TCounters {
using TDynamicCounters = ::NMonitoring::TDynamicCounters;
using TDynamicCounterPtr = ::NMonitoring::TDynamicCounterPtr;
@@ -994,6 +875,10 @@ class TSentinel: public TActorBootstrapped<TSentinel> {
TClusterMap::TPDiskIDSet allowed = changed.GetAllowedPDisks(all, issues, disallowed);
std::move(alwaysAllowed.begin(), alwaysAllowed.end(), std::inserter(allowed, allowed.begin()));
+ // we ignore all previous unhandled requests
+ // usually it will never happen with correct config
+ SentinelState->ChangeRequests.clear();
+
for (const auto& id : allowed) {
Y_VERIFY(SentinelState->PDisks.contains(id));
TPDiskInfo::TPtr info = SentinelState->PDisks.at(id);
@@ -1005,10 +890,6 @@ class TSentinel: public TActorBootstrapped<TSentinel> {
continue;
}
- if (info->StatusChanger) {
- continue;
- }
-
const EPDiskStatus status = info->GetStatus();
TString reason;
info->ApplyChanges(reason);
@@ -1023,7 +904,7 @@ class TSentinel: public TActorBootstrapped<TSentinel> {
LogStatusChange(id, status, requiredStatus, reason);
if (!Config.DryRun) {
- info->StatusChanger = RegisterWithSameMailbox(new TStatusChanger(SelfId(), CmsState, id, info, requiredStatus));
+ SentinelState->ChangeRequests.emplace(id, info);
(*Counters->PDisksPendingChange)++;
}
}
@@ -1042,6 +923,30 @@ class TSentinel: public TActorBootstrapped<TSentinel> {
ScheduleUpdate<TEvSentinel::TEvUpdateState, TConfigUpdater>(
StateUpdater, Config.UpdateStateInterval, ConfigUpdater
);
+
+ SendBSCRequests();
+ }
+
+    // Packs every pending entry of SentinelState->ChangeRequests into one
+    // TEvControllerConfigRequest (one UpdateDriveStatus command per PDisk) and sends it
+    // through the BSC pipe, creating the pipe client first if there is none.
+    // The request cookie is the freshly incremented ChangeRequestId.
+    void SendBSCRequests() {
+        const auto& pending = SentinelState->ChangeRequests;
+        if (pending.empty()) {
+            return;
+        }
+
+        if (!CmsState->BSControllerPipe) {
+            CmsState->BSControllerPipe = this->Register(CreateBSCClientActor(CmsState));
+        }
+
+        LOG_D("Change pdisk status"
+            << ": requestsSize# " << pending.size());
+
+        auto request = MakeHolder<TEvBlobStorage::TEvControllerConfigRequest>();
+        auto* commands = request->Record.MutableRequest();
+        for (const auto& [pdiskId, pdiskInfo] : pending) {
+            auto* update = commands->AddCommand()->MutableUpdateDriveStatus();
+            update->MutableHostKey()->SetNodeId(pdiskId.NodeId);
+            update->SetPDiskId(pdiskId.DiskId);
+            update->SetStatus(pdiskInfo->GetStatus());
+        }
+
+        // the cookie lets the response handler tell the latest batch from stale replies
+        const ui64 cookie = ++SentinelState->ChangeRequestId;
+        NTabletPipe::SendData(SelfId(), CmsState->BSControllerPipe, request.Release(), cookie);
     }
void Handle(TEvCms::TEvGetSentinelStateRequest::TPtr& ev) {
@@ -1079,15 +984,10 @@ class TSentinel: public TActorBootstrapped<TSentinel> {
switch(filter) {
case NKikimrCms::TGetSentinelStateRequest::UNHEALTHY:
return info.GetState() != NKikimrBlobStorage::TPDiskState::Normal
- || info.ActualStatus != EPDiskStatus::ACTIVE
- || info.GetStatus() != EPDiskStatus::ACTIVE
- || info.StatusChangeFailed;
+ || info.GetStatus() != EPDiskStatus::ACTIVE;
case NKikimrCms::TGetSentinelStateRequest::SUSPICIOUS:
return info.GetState() != NKikimrBlobStorage::TPDiskState::Normal
- || info.ActualStatus != EPDiskStatus::ACTIVE
|| info.GetStatus() != EPDiskStatus::ACTIVE
- || info.StatusChangeFailed
- || info.StatusChangerState
|| !info.IsTouched()
|| !info.IsChangingAllowed();
default:
@@ -1133,18 +1033,14 @@ class TSentinel: public TActorBootstrapped<TSentinel> {
entry.MutableInfo()->SetStateCounter(info->GetStateCounter());
entry.MutableInfo()->SetStatus(info->ActualStatus);
entry.MutableInfo()->SetDesiredStatus(info->GetStatus());
+ entry.MutableInfo()->SetPrevDesiredStatus(info->PrevStatus);
+ entry.MutableInfo()->SetStatusChangeAttempts(info->StatusChangeAttempt);
+ entry.MutableInfo()->SetPrevStatusChangeAttempts(info->PrevStatusChangeAttempt);
entry.MutableInfo()->SetChangingAllowed(info->IsChangingAllowed());
entry.MutableInfo()->SetTouched(info->IsTouched());
entry.MutableInfo()->SetLastStatusChange(info->LastStatusChange.ToString());
- entry.MutableInfo()->SetStatusChangeFailed(info->StatusChangeFailed);
- if (info->StatusChangerState) {
- entry.MutableInfo()->SetStatusChangeAttempts(info->StatusChangerState->Attempt);
- }
- if (info->PrevStatusChangerState) {
- entry.MutableInfo()->SetPrevDesiredStatus(info->PrevStatusChangerState->Status);
- entry.MutableInfo()->SetPrevStatusChangeAttempts(info->PrevStatusChangerState->Attempt);
- }
entry.MutableInfo()->SetIgnoreReason(info->IgnoreReason);
+ entry.MutableInfo()->SetStatusChangeFailed(info->StatusChangeFailed);
}
}
}
@@ -1152,35 +1048,92 @@ class TSentinel: public TActorBootstrapped<TSentinel> {
Send(ev->Sender, std::move(response));
}
-    void Handle(TEvSentinel::TEvStatusChanged::TPtr& ev) {
-        const TPDiskID& id = ev->Get()->Id;
-        const bool success = ev->Get()->Success;
+    // Handles the BSC reply to the batched UpdateDriveStatus request.
+    // Replies whose cookie differs from the latest ChangeRequestId are ignored, as are
+    // replies that arrive when no change requests are pending.
+    // On overall success every pending request is applied and the batch is cleared.
+    // On failure the per-command statuses are applied one by one (in ChangeRequests order)
+    // when their count matches the batch size; succeeded entries are removed and the rest
+    // is left for MaybeRetry(). If the counts do not match, nothing can be attributed to
+    // a particular PDisk, so the whole batch is marked failed and retried.
+    void Handle(TEvBlobStorage::TEvControllerConfigResponse::TPtr& ev) {
+        const auto& response = ev->Get()->Record.GetResponse();
+
+        LOG_D("Handle TEvBlobStorage::TEvControllerConfigResponse"
+            << ": response# " << response.ShortDebugString());
-        LOG_D("Handle TEvSentinel::TEvStatusChanged"
-            << ": pdiskId# " << id
-            << ", success# " << (success ? "true" : "false"));
+        if (ev->Cookie != SentinelState->ChangeRequestId) {
+            return;
+        }
-        auto it = SentinelState->PDisks.find(id);
-        if (it == SentinelState->PDisks.end()) {
-            LOG_W("Status of unknown pdisk has been changed"
-                << ": pdiskId# " << id);
+        if (SentinelState->ChangeRequests.empty()) {
             return;
         }
-        if (!success) {
-            LOG_C("PDisk status has NOT been changed"
-                << ": pdiskId# " << id);
-            it->second->StatusChangeFailed = true;
-            (*Counters->PDisksNotChanged)++;
-        } else {
+        // records a successfully applied status change in the PDisk info and counters
+        auto onPDiskStatusChanged = [&](const TPDiskID& id, TPDiskInfo& info) {
+            info.StatusChangeFailed = false;
+            info.PrevStatus = info.ActualStatus;
+            info.ActualStatus = info.GetStatus();
+            info.LastStatusChange = Now();
+            info.PrevStatusChangeAttempt = info.StatusChangeAttempt;
+            info.StatusChangeAttempt = SentinelState->StatusChangeAttempt;
+
             LOG_N("PDisk status has been changed"
                 << ": pdiskId# " << id);
-            it->second->ActualStatus = it->second->GetStatus();
-            it->second->StatusChangeFailed = false;
+
             (*Counters->PDisksChanged)++;
+        };
+
+
+        if (!response.GetSuccess() || !response.StatusSize() || !response.GetStatus(0).GetSuccess()) {
+            if (SentinelState->ChangeRequests.size() != response.StatusSize()) {
+                // The condition above admits a reply without any statuses, so a size
+                // mismatch is an expected failure mode and must not abort the process.
+                // Statuses cannot be matched to requests here: keep the batch and retry.
+                TString error = "<no description>";
+                if (response.StatusSize()) {
+                    error = response.GetStatus(0).GetErrorDescription();
+                }
+
+                LOG_E("Unsuccesful response from BSC"
+                    << ", size# " << response.StatusSize()
+                    << ", error# " << error);
+
+                for (auto& [_, info] : SentinelState->ChangeRequests) {
+                    info->StatusChangeFailed = true;
+                    info->StatusChangeAttempt = SentinelState->StatusChangeAttempt;
+                }
+
+                MaybeRetry();
+                return;
+            }
+
+            // statuses are positional: the i-th status belongs to the i-th pending request
+            auto it = SentinelState->ChangeRequests.begin();
+            for (ui32 i = 0; i < response.StatusSize(); ++i) {
+                const auto& status = response.GetStatus(i);
+                if (!status.GetSuccess()) {
+                    LOG_E("Unsuccesful response from BSC"
+                        << ", error# " << status.GetErrorDescription());
+                    it->second->StatusChangeFailed = true;
+                    it->second->StatusChangeAttempt = SentinelState->StatusChangeAttempt;
+
+                    ++it;
+                } else {
+                    onPDiskStatusChanged(it->first, *(it->second));
+
+                    it = SentinelState->ChangeRequests.erase(it);
+                }
+            }
+
+            MaybeRetry();
+        } else {
+            for (auto& [id, info] : SentinelState->ChangeRequests) {
+                onPDiskStatusChanged(id, *info);
+            }
+
+            SentinelState->ChangeRequests.clear();
+            SentinelState->StatusChangeAttempt = 0;
         }
+    }
+
+    // TEvWakeup handler: logs the current attempt number and re-sends the pending batch.
+    void OnRetry() {
+        const auto attempt = SentinelState->StatusChangeAttempt;
+        LOG_D("Retrying"
+            << ", attempt# " << attempt);
+
+        SendBSCRequests();
+    }
- it->second->StatusChanger = TActorId();
+    // Schedules one more delivery attempt (TEvWakeup after Config.RetryChangeStatus) while
+    // attempts remain and there is something to send. Otherwise gives up: resets the
+    // attempt counter, marks every still-pending PDisk as failed and drops the batch.
+    // The attempt counter is incremented on every call, before the decision is taken.
+    void MaybeRetry() {
+        const bool attemptsLeft = SentinelState->StatusChangeAttempt++ < Config.ChangeStatusRetries;
+        if (attemptsLeft && !SentinelState->ChangeRequests.empty()) {
+            Schedule(Config.RetryChangeStatus, new TEvents::TEvWakeup());
+            return;
+        }
+
+        SentinelState->StatusChangeAttempt = 0;
+
+        for (auto& pending : SentinelState->ChangeRequests) {
+            const auto& pdiskId = pending.first;
+            pending.second->StatusChangeFailed = true;
+
+            LOG_C("PDisk status has NOT been changed"
+                << ": pdiskId# " << pdiskId);
+
+            (*Counters->PDisksNotChanged)++;
+        }
+
+        SentinelState->ChangeRequests.clear();
+    }
void OnPipeDisconnected() {
@@ -1188,11 +1141,7 @@ class TSentinel: public TActorBootstrapped<TSentinel> {
Send(actor, new TEvSentinel::TEvBSCPipeDisconnected());
}
- for (const auto& [_, info] : SentinelState->PDisks) {
- if (const TActorId& actor = info->StatusChanger) {
- Send(actor, new TEvSentinel::TEvBSCPipeDisconnected());
- }
- }
+ MaybeRetry();
}
void PassAway() override {
@@ -1204,12 +1153,6 @@ class TSentinel: public TActorBootstrapped<TSentinel> {
Send(actor, new TEvents::TEvPoisonPill());
}
- for (const auto& [_, info] : SentinelState->PDisks) {
- if (const TActorId& actor = info->StatusChanger) {
- Send(actor, new TEvents::TEvPoisonPill());
- }
- }
-
TActor::PassAway();
}
@@ -1245,9 +1188,10 @@ public:
sFunc(TEvSentinel::TEvConfigUpdated, OnConfigUpdated);
sFunc(TEvSentinel::TEvUpdateState, UpdateState);
sFunc(TEvSentinel::TEvStateUpdated, OnStateUpdated);
- hFunc(TEvSentinel::TEvStatusChanged, Handle);
hFunc(TEvCms::TEvGetSentinelStateRequest, Handle);
+ hFunc(TEvBlobStorage::TEvControllerConfigResponse, Handle);
sFunc(TEvSentinel::TEvBSCPipeDisconnected, OnPipeDisconnected);
+ sFunc(TEvents::TEvWakeup, OnRetry);
sFunc(TEvents::TEvPoisonPill, PassAway);
}
@@ -1264,6 +1208,16 @@ private:
}; // TSentinel
+// Builds a tablet pipe client actor to the BS controller of the domain that owns the CMS
+// tablet. The client is created for cmsState->CmsActorId and uses the
+// TClientRetryPolicy::WithRetries() policy. The caller is expected to register the actor.
+IActor* CreateBSCClientActor(const TCmsStatePtr& cmsState) {
+    auto domainsInfo = AppData()->DomainsInfo;
+    const ui32 domainUid = domainsInfo->GetDomainUidByTabletId(cmsState->CmsTabletId);
+    const ui32 stateStorageGroup = domainsInfo->GetDefaultStateStorageGroup(domainUid);
+    const ui64 bscTabletId = MakeBSControllerID(stateStorageGroup);
+
+    NTabletPipe::TClientConfig pipeConfig;
+    pipeConfig.RetryPolicy = NTabletPipe::TClientRetryPolicy::WithRetries();
+
+    return NTabletPipe::CreateClient(cmsState->CmsActorId, bscTabletId, pipeConfig);
+}
+
} // NSentinel
IActor* CreateSentinel(TCmsStatePtr state) {
diff --git a/ydb/core/cms/sentinel.h b/ydb/core/cms/sentinel.h
index ec31096219f..911ad892fe7 100644
--- a/ydb/core/cms/sentinel.h
+++ b/ydb/core/cms/sentinel.h
@@ -17,7 +17,6 @@ struct TEvSentinel {
EvStatusChanged,
- EvRetry,
EvTimeout,
EvBSCPipeDisconnected,
@@ -33,7 +32,6 @@ struct TEvSentinel {
struct TEvUpdateState: public TEventLocal<TEvUpdateState, EvUpdateState> {};
struct TEvStateUpdated: public TEventLocal<TEvStateUpdated, EvStateUpdated> {};
- struct TEvRetry: public TEventLocal<TEvRetry, EvRetry> {};
struct TEvTimeout: public TEventLocal<TEvTimeout, EvTimeout> {};
struct TEvStatusChanged: public TEventLocal<TEvStatusChanged, EvStatusChanged> {
diff --git a/ydb/core/cms/sentinel_impl.h b/ydb/core/cms/sentinel_impl.h
index 55ab90abab4..ea05ebcb2fa 100644
--- a/ydb/core/cms/sentinel_impl.h
+++ b/ydb/core/cms/sentinel_impl.h
@@ -90,12 +90,12 @@ struct TPDiskInfo
using EIgnoreReason = NKikimrCms::TPDiskInfo::EIgnoreReason;
- TActorId StatusChanger;
+ EPDiskStatus ActualStatus = EPDiskStatus::ACTIVE;
+ EPDiskStatus PrevStatus = EPDiskStatus::ACTIVE;
TInstant LastStatusChange;
bool StatusChangeFailed = false;
- EPDiskStatus ActualStatus = EPDiskStatus::ACTIVE;
- TStatusChangerState::TPtr StatusChangerState;
- TStatusChangerState::TPtr PrevStatusChangerState;
+ ui32 StatusChangeAttempt = 0;
+ ui32 PrevStatusChangeAttempt = 0;
EIgnoreReason IgnoreReason = NKikimrCms::TPDiskInfo::NOT_IGNORED;
explicit TPDiskInfo(EPDiskStatus initialStatus, const ui32& defaultStateLimit, const TLimitsMap& stateLimits);
@@ -137,6 +137,9 @@ struct TSentinelState: public TSimpleRefCount<TSentinelState> {
THashSet<ui32> StateUpdaterWaitNodes;
TConfigUpdaterState ConfigUpdaterState;
TConfigUpdaterState PrevConfigUpdaterState;
+ TMap<TPDiskID, TPDiskInfo::TPtr> ChangeRequests;
+ ui32 StatusChangeAttempt = 0;
+ ui32 ChangeRequestId = 0;
};
class TClusterMap {
@@ -177,6 +180,8 @@ private:
const ui32 RackRatio;
}; // TGuardian
+IActor* CreateBSCClientActor(const TCmsStatePtr& cmsState);
+
} // NSentinel
} // NCms
} // NKikimr
diff --git a/ydb/core/cms/sentinel_ut.cpp b/ydb/core/cms/sentinel_ut.cpp
index 1867ef3d032..05410b5a90c 100644
--- a/ydb/core/cms/sentinel_ut.cpp
+++ b/ydb/core/cms/sentinel_ut.cpp
@@ -378,6 +378,7 @@ Y_UNIT_TEST_SUITE(TSentinelTests) {
switch (ev->GetTypeRewrite()) {
case TEvSentinel::TEvUpdateConfig::EventType:
case TEvSentinel::TEvUpdateState::EventType:
+ case TEvents::TEvWakeup::EventType:
return false;
default:
@@ -387,7 +388,9 @@ Y_UNIT_TEST_SUITE(TSentinelTests) {
auto prevObserver = SetObserverFunc(&TTestActorRuntimeBase::DefaultObserverFunc);
SetObserverFunc([this, prevObserver](TTestActorRuntimeBase& runtime,
TAutoPtr<IEventHandle> &event){
- if (event->GetTypeRewrite() == TEvCms::TEvClusterStateRequest::EventType) {
+ switch (event->GetTypeRewrite()) {
+ case TEvCms::TEvClusterStateRequest::EventType:
+ {
TAutoPtr<TEvCms::TEvClusterStateResponse> resp = new TEvCms::TEvClusterStateResponse;
if (State) {
resp->Record.MutableStatus()->SetCode(NKikimrCms::TStatus::OK);
@@ -398,8 +401,9 @@ Y_UNIT_TEST_SUITE(TSentinelTests) {
Send(new IEventHandle(event->Sender, TActorId(), resp.Release()));
return TTestActorRuntime::EEventAction::PROCESS;
}
-
- return prevObserver(runtime, event);
+ default:
+ return prevObserver(runtime, event);
+ }
});
State = new TCmsState;
@@ -464,11 +468,11 @@ Y_UNIT_TEST_SUITE(TSentinelTests) {
struct TPDiskUpdates {
bool UpdateStatusRequested = false;
- bool StatusChanged = false;
+ ui32 IgnoredUpdateRequests = 0;
};
THashMap<TPDiskID, TPDiskUpdates, TPDiskIDHash> pdiskUpdates;
for (const auto& id : pdisks) {
- pdiskUpdates[id] = {};// TPDiskUpdates {false, false});
+ pdiskUpdates[id] = {};// TPDiskUpdates {false, 0});
}
auto check = [&](IEventHandle& ev) {
@@ -479,43 +483,39 @@ Y_UNIT_TEST_SUITE(TSentinelTests) {
case TEvBlobStorage::TEvControllerConfigRequest::EventType:
{
+ TGuard<TMutex> guard(TFakeNodeWhiteboardService::Mutex);
const auto& request = ev.Get<TEvBlobStorage::TEvControllerConfigRequest>()->Record;
UNIT_ASSERT(request.HasRequest());
- if (request.GetRequest().CommandSize() && request.GetRequest().GetCommand(0).HasUpdateDriveStatus()) {
- const auto& update = request.GetRequest().GetCommand(0).GetUpdateDriveStatus();
- TPDiskID id(update.GetHostKey().GetNodeId(), update.GetPDiskId());
- auto it = pdiskUpdates.find(id);
- if (it != pdiskUpdates.end()) {
- it->second.UpdateStatusRequested = (expectedStatus == update.GetStatus());
+ for (ui32 i = 0; i < request.GetRequest().CommandSize(); ++i) {
+ if (request.GetRequest().GetCommand(i).HasUpdateDriveStatus()) {
+ const auto& update = request.GetRequest().GetCommand(i).GetUpdateDriveStatus();
+ TPDiskID id(update.GetHostKey().GetNodeId(), update.GetPDiskId());
+
+ auto it = pdiskUpdates.find(id);
+ if (it != pdiskUpdates.end()) {
+ if (expectedStatus == update.GetStatus()) {
+ auto& vec = TFakeNodeWhiteboardService::BSControllerResponsePatterns[id];
+ if (!(TFakeNodeWhiteboardService::NoisyBSCPipeCounter % 3) && (vec.empty() || *vec.begin())) {
+ it->second.UpdateStatusRequested = true;
+ } else {
+ it->second.IgnoredUpdateRequests++;
+ }
+ }
+ }
}
}
}
break;
-
- case TEvSentinel::TEvStatusChanged::EventType:
- {
- const auto* event = ev.Get<TEvSentinel::TEvStatusChanged>();
-
- auto it = pdiskUpdates.find(event->Id);
- if (it != pdiskUpdates.end()) {
- UNIT_ASSERT(event->Success);
- it->second.StatusChanged = true;
- }
- }
- break;
-
default:
break;
}
- bool allUpdateStatusRequested = true;
- bool allStatusChanged = true;
+ bool allUpdateStatusRequestedOrIgnored = true;
for (const auto& [id, info] : pdiskUpdates) {
- allUpdateStatusRequested &= info.UpdateStatusRequested;
- allStatusChanged &= info.StatusChanged;
+ allUpdateStatusRequestedOrIgnored &= (info.UpdateStatusRequested || info.IgnoredUpdateRequests == 6);
}
- return stateUpdated && pdiskUpdates.size() && allUpdateStatusRequested && allStatusChanged;
+ return stateUpdated && pdiskUpdates.size() && allUpdateStatusRequestedOrIgnored;
};
TDispatchOptions options;
@@ -525,6 +525,7 @@ Y_UNIT_TEST_SUITE(TSentinelTests) {
private:
TCmsStatePtr State;
+ std::atomic<bool> NosiyBlobstoragePipe = false;
TActorId Sentinel;
}; // TTestEnv
@@ -613,6 +614,44 @@ Y_UNIT_TEST_SUITE(TSentinelTests) {
env.SetPDiskState(pdisks, NKikimrBlobStorage::TPDiskState::Normal, EPDiskStatus::ACTIVE);
}
}
+
+    // Runs the FAULTY -> ACTIVE cycle for three random PDisks with the noisy BSC pipe
+    // enabled and per-PDisk failure patterns scripted for id1 and id2 (id3 has none).
+    Y_UNIT_TEST(BSControllerUnresponsive) {
+        TTestEnv env(8, 4);
+
+        env.EnableNoisyBSCPipe();
+
+        const TPDiskID id1 = env.RandomPDiskID();
+        const TPDiskID id2 = env.RandomPDiskID();
+        const TPDiskID id3 = env.RandomPDiskID();
+
+        // one copy of each pattern per error state exercised below
+        const size_t errorStatesCount = sizeof(ErrorStates) / sizeof(ErrorStates[0]);
+        for (size_t round = 0; round < errorStatesCount; ++round) {
+            env.AddBSCFailures(id1, {false, true});
+            env.AddBSCFailures(id2, {false, false, false, false, false, false});
+        }
+
+        for (size_t round = 0; round < errorStatesCount; ++round) {
+            const EPDiskState state = ErrorStates[round];
+            env.SetPDiskState({id1, id2, id3}, state, EPDiskStatus::FAULTY);
+            env.SetPDiskState({id1, id2, id3}, NKikimrBlobStorage::TPDiskState::Normal, EPDiskStatus::ACTIVE);
+        }
+    }
+
+    // Runs the FAULTY -> ACTIVE cycle for three random PDisks while the fake BSC follows
+    // scripted per-PDisk success/failure patterns (the pipe itself is not noisy here).
+    // NOTE(review): the fake handler in cms_ut_common.cpp erases a pattern entry only when
+    // it is `false`; a leading `true` seems to stay at the head forever, so entries after
+    // the first `true` look unreachable - confirm this is intended.
+    Y_UNIT_TEST(BSControllerCantChangeStatus) {
+        TTestEnv env(8, 4);
+
+        const TPDiskID id1 = env.RandomPDiskID();
+        const TPDiskID id2 = env.RandomPDiskID();
+        const TPDiskID id3 = env.RandomPDiskID();
+
+        // one copy of each pattern per error state exercised below
+        const size_t errorStatesCount = sizeof(ErrorStates) / sizeof(ErrorStates[0]);
+        for (size_t round = 0; round < errorStatesCount; ++round) {
+            env.AddBSCFailures(id1, {true, false, false, true, false, false});
+            // will fail for all requests, assuming there are only 5 retries
+            env.AddBSCFailures(id2, {false, false, false, false, false, false});
+            env.AddBSCFailures(id3, {false, true, false, false, true, false});
+        }
+
+        for (size_t round = 0; round < errorStatesCount; ++round) {
+            const EPDiskState state = ErrorStates[round];
+            env.SetPDiskState({id1, id2, id3}, state, EPDiskStatus::FAULTY);
+            env.SetPDiskState({id1, id2, id3}, NKikimrBlobStorage::TPDiskState::Normal, EPDiskStatus::ACTIVE);
+        }
+    }
} // TSentinelTests
} // NCmsTest