summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <[email protected]>2025-03-11 18:15:20 +0300
committerGitHub <[email protected]>2025-03-11 18:15:20 +0300
commit033f7f056c4a1de2b2f53e078a05dc28e7ae4a73 (patch)
tree4772e91515bf39877f53ae3a9d07a6409130f316
parente6c8de41f78268c5dbdf6ddf4c6731ae43687dbb (diff)
Correct distconf enabling sequence (#15578)
-rw-r--r--ydb/core/blobstorage/base/blobstorage_console_events.h3
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf.h9
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf_console.cpp17
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf_fsm.cpp50
-rw-r--r--ydb/core/blobstorage/nodewarden/distconf_invoke.cpp139
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_impl.cpp2
-rw-r--r--ydb/core/cms/console/console_handshake.cpp33
-rw-r--r--ydb/core/cms/console/console_impl.h3
-rw-r--r--ydb/core/grpc_services/rpc_config_base.h16
-rw-r--r--ydb/core/protos/blobstorage.proto1
-rw-r--r--ydb/core/protos/blobstorage_distributed_config.proto6
11 files changed, 191 insertions, 88 deletions
diff --git a/ydb/core/blobstorage/base/blobstorage_console_events.h b/ydb/core/blobstorage/base/blobstorage_console_events.h
index 47d3932fa2a..809c1059c0e 100644
--- a/ydb/core/blobstorage/base/blobstorage_console_events.h
+++ b/ydb/core/blobstorage/base/blobstorage_console_events.h
@@ -10,9 +10,10 @@ namespace NKikimr {
NKikimrBlobStorage::TEvControllerProposeConfigRequest, EvControllerProposeConfigRequest> {
TEvControllerProposeConfigRequest() = default;
- TEvControllerProposeConfigRequest(ui64 configHash, ui64 configVersion) {
+ TEvControllerProposeConfigRequest(ui64 configHash, ui64 configVersion, bool distconf) {
Record.SetConfigHash(configHash);
Record.SetConfigVersion(configVersion);
+ Record.SetDistconf(distconf);
}
TString ToString() const override {
diff --git a/ydb/core/blobstorage/nodewarden/distconf.h b/ydb/core/blobstorage/nodewarden/distconf.h
index 6ed32afa36f..c9ce2ce5fec 100644
--- a/ydb/core/blobstorage/nodewarden/distconf.h
+++ b/ydb/core/blobstorage/nodewarden/distconf.h
@@ -351,8 +351,13 @@ namespace NKikimr::NStorage {
bool HasQuorum() const;
void ProcessCollectConfigs(TEvGather::TCollectConfigs *res);
- using TProcessCollectConfigsResult = std::variant<std::monostate, TString, NKikimrBlobStorage::TStorageConfig>;
- TProcessCollectConfigsResult ProcessCollectConfigs(TEvGather::TCollectConfigs *res, const TString *selfAssemblyUUID);
+ struct TProcessCollectConfigsResult {
+ std::variant<std::monostate, TString, NKikimrBlobStorage::TStorageConfig> Outcome;
+ bool IsDistconfDisabledQuorum = false;
+ };
+ TProcessCollectConfigsResult ProcessCollectConfigs(TEvGather::TCollectConfigs *res,
+ std::optional<TStringBuf> selfAssemblyUUID);
+
std::optional<TString> ProcessProposeStorageConfig(TEvGather::TProposeStorageConfig *res);
struct TExConfigError : yexception {};
diff --git a/ydb/core/blobstorage/nodewarden/distconf_console.cpp b/ydb/core/blobstorage/nodewarden/distconf_console.cpp
index ee125a8a53b..a02ab98404d 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_console.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_console.cpp
@@ -35,8 +35,14 @@ namespace NKikimr::NStorage {
return; // still waiting for previous one
}
+ ProposeRequestInFlight = true;
+
if (!StorageConfig || !StorageConfig->HasConfigComposite()) {
- return; // no config yet
+ // send empty proposition just to connect to console
+ auto ev = std::make_unique<TEvBlobStorage::TEvControllerProposeConfigRequest>();
+ ev->Record.SetDistconf(true);
+ NTabletPipe::SendData(SelfId(), ConsolePipeId, ev.release(), ++ProposeRequestCookie);
+ return;
}
Y_ABORT_UNLESS(MainConfigYamlVersion);
@@ -51,11 +57,14 @@ namespace NKikimr::NStorage {
MainConfigFetchYamlHash, *MainConfigYamlVersion));
ProposedConfigHashVersion.emplace(MainConfigFetchYamlHash, *MainConfigYamlVersion);
NTabletPipe::SendData(SelfId(), ConsolePipeId, new TEvBlobStorage::TEvControllerProposeConfigRequest(
- MainConfigFetchYamlHash, *MainConfigYamlVersion), ++ProposeRequestCookie);
- ProposeRequestInFlight = true;
+ MainConfigFetchYamlHash, *MainConfigYamlVersion, true), ++ProposeRequestCookie);
}
void TDistributedConfigKeeper::Handle(TEvBlobStorage::TEvControllerValidateConfigResponse::TPtr ev) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC10, "received TEvControllerValidateConfigResponse",
+ (Sender, ev->Sender), (Cookie, ev->Cookie), (Record, ev->Get()->Record),
+ (ConsoleConfigValidationQ.size, ConsoleConfigValidationQ.size()));
+
auto& q = ConsoleConfigValidationQ;
auto pred = [&](const auto& item) {
const auto& [actorId, yaml, cookie] = item;
@@ -114,7 +123,7 @@ namespace NKikimr::NStorage {
break;
case NKikimrBlobStorage::TEvControllerProposeConfigResponse::ReverseCommit:
- Y_DEBUG_ABORT();
+ // just do nothing, we didn't have the config in distconf, possibly it is being enabled
break;
}
}
diff --git a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
index 1400e22ab7d..4411d5aeca0 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
@@ -110,16 +110,17 @@ namespace NKikimr::NStorage {
}
void TDistributedConfigKeeper::ProcessCollectConfigs(TEvGather::TCollectConfigs *res) {
- TOverloaded handler{
- [&](std::monostate&&) {
+ auto r = ProcessCollectConfigs(res, std::nullopt);
+ std::visit(TOverloaded{
+ [&](std::monostate&) {
STLOG(PRI_DEBUG, BS_NODE, NWDC61, "ProcessCollectConfigs: monostate");
RootState = ERootState::RELAX;
},
- [&](TString&& error) {
+ [&](TString& error) {
STLOG(PRI_DEBUG, BS_NODE, NWDC63, "ProcessCollectConfigs: error", (Error, error));
SwitchToError(error);
},
- [&](NKikimrBlobStorage::TStorageConfig&& proposedConfig) {
+ [&](NKikimrBlobStorage::TStorageConfig& proposedConfig) {
STLOG(PRI_DEBUG, BS_NODE, NWDC64, "ProcessCollectConfigs: proposed new config",
(ProposedConfig, proposedConfig));
TEvScatter task;
@@ -130,12 +131,11 @@ namespace NKikimr::NStorage {
propose->MutableConfig()->Swap(&proposedConfig);
IssueScatterTask(TActorId(), std::move(task));
}
- };
- std::visit(handler, ProcessCollectConfigs(res, nullptr));
+ }, r.Outcome);
}
TDistributedConfigKeeper::TProcessCollectConfigsResult TDistributedConfigKeeper::ProcessCollectConfigs(
- TEvGather::TCollectConfigs *res, const TString *selfAssemblyUUID) {
+ TEvGather::TCollectConfigs *res, std::optional<TStringBuf> selfAssemblyUUID) {
auto generateSuccessful = [&](auto&& callback) {
for (const auto& item : res->GetNodes()) {
for (const auto& node : item.GetNodeIds()) {
@@ -171,10 +171,24 @@ namespace NKikimr::NStorage {
if (nodeQuorum && !configQuorum) {
// check if there is quorum of no-distconf config along the cluster
+ auto generateNodesWithoutDistconf = [&](auto&& callback) {
+ for (const auto& item : res->GetNodes()) {
+ if (item.GetBaseConfig().GetSelfManagementConfig().GetEnabled()) {
+ continue;
+ }
+ for (const auto& node : item.GetNodeIds()) {
+ callback(node);
+ }
+ }
+ };
+ if (HasNodeQuorum(*StorageConfig, generateNodesWithoutDistconf)) {
+ // yes, distconf is disabled on the majority of the nodes, so we can't do anything about it
+ return {.IsDistconfDisabledQuorum = true};
+ }
}
if (!nodeQuorum || !configQuorum) {
- return "no quorum for CollectConfigs";
+ return {"no quorum for CollectConfigs"};
}
// TODO: validate self-assembly UUID
@@ -223,7 +237,7 @@ namespace NKikimr::NStorage {
(BaseConfigs.size, baseConfigs.size()));
Y_DEBUG_ABORT("Multiple nonintersecting node sets have quorum of BaseConfig");
Halt();
- return "Multiple nonintersecting node sets have quorum of BaseConfig";
+ return {"Multiple nonintersecting node sets have quorum of BaseConfig"};
}
NKikimrBlobStorage::TStorageConfig *baseConfig = nullptr;
for (auto& [meta, info] : baseConfigs) {
@@ -289,13 +303,13 @@ namespace NKikimr::NStorage {
(Generation, generation), (Configs, configs));
Y_DEBUG_ABORT("Multiple nonintersecting node sets have quorum of persistent config");
Halt();
- return "Multiple nonintersecting node sets have quorum of persistent config";
+ return {"Multiple nonintersecting node sets have quorum of persistent config"};
}
Y_ABORT_UNLESS(configs.size() == 1);
persistedConfig = configs.front();
}
if (maxSeenGeneration && (!persistedConfig || persistedConfig->GetGeneration() < maxSeenGeneration)) {
- return "couldn't obtain quorum for configuration that was seen in effect";
+ return {"couldn't obtain quorum for configuration that was seen in effect"};
}
// let's try to find possibly proposed config, but without a quorum, and try to reconstruct it
@@ -311,7 +325,7 @@ namespace NKikimr::NStorage {
(PersistentConfig, *persistedConfig), (ProposedConfig, config));
Y_DEBUG_ABORT("persistently proposed config has too big generation");
Halt();
- return "persistently proposed config has too big generation";
+ return {"persistently proposed config has too big generation"};
}
}
if (proposedConfig && (proposedConfig->GetGeneration() != config.GetGeneration() ||
@@ -361,11 +375,11 @@ namespace NKikimr::NStorage {
if (!CurrentSelfAssemblyUUID) {
CurrentSelfAssemblyUUID.emplace(CreateGuidAsString());
}
- selfAssemblyUUID = &CurrentSelfAssemblyUUID.value();
+ selfAssemblyUUID.emplace(CurrentSelfAssemblyUUID.value());
}
propositionBase.emplace(*baseConfig);
- if (auto error = GenerateFirstConfig(baseConfig, *selfAssemblyUUID)) {
- return *error;
+ if (auto error = GenerateFirstConfig(baseConfig, TString(*selfAssemblyUUID))) {
+ return {*error};
}
configToPropose = baseConfig;
}
@@ -392,12 +406,12 @@ namespace NKikimr::NStorage {
if (error) {
Y_DEBUG_ABORT("incorrect config proposition");
- return "incorrect config proposition";
+ return {"incorrect config proposition"};
}
if (propositionBase) {
if (auto error = ValidateConfig(*propositionBase)) {
- return TStringBuilder() << "failed to propose configuration, base config contains errors: " << *error;
+ return {TStringBuilder() << "failed to propose configuration, base config contains errors: " << *error};
}
if (auto error = ValidateConfigUpdate(*propositionBase, *configToPropose)) {
Y_FAIL_S("incorrect config proposed: " << *error);
@@ -408,7 +422,7 @@ namespace NKikimr::NStorage {
}
}
- return std::move(*configToPropose);
+ return {std::move(*configToPropose)};
}
return {};
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp
index 4c0eca0139c..c5c63449b85 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp
@@ -30,6 +30,7 @@ namespace NKikimr::NStorage {
NKikimrBlobStorage::TStorageConfig ProposedStorageConfig;
std::optional<TString> NewYaml;
+ std::optional<TString> VersionError;
public:
TInvokeRequestHandlerActor(TDistributedConfigKeeper *self, std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>>&& ev)
@@ -707,6 +708,9 @@ namespace NKikimr::NStorage {
newExpectedStorageYamlVersion.emplace(config.GetExpectedStorageYamlVersion());
}
+ std::optional<ui64> storageYamlVersion;
+ std::optional<ui64> mainYamlVersion;
+
try {
auto load = [&](const TString& yaml, ui64& version, const char *expectedKind) {
state = TStringBuilder() << "loading " << expectedKind << " YAML";
@@ -736,29 +740,13 @@ namespace NKikimr::NStorage {
const NJson::TJsonValue *effective = nullptr;
if (newStorageYaml) {
- ui64 version = 0;
- storage = load(*newStorageYaml, version, "StorageConfig");
- if (const ui64 expected = Self->StorageConfig->GetExpectedStorageYamlVersion(); version != expected) {
- return FinishWithError(TResult::ERROR, TStringBuilder()
- << "storage config version must be increasing by one"
- << " new version# " << version
- << " expected version# " << expected);
- }
-
- newExpectedStorageYamlVersion = version + 1;
+ storage = load(*newStorageYaml, storageYamlVersion.emplace(), "StorageConfig");
+ newExpectedStorageYamlVersion = *storageYamlVersion + 1;
effective = &storage;
}
if (NewYaml) {
- ui64 version = 0;
- main = load(*NewYaml, version, "MainConfig");
- if (const ui64 expected = *Self->MainConfigYamlVersion + 1; version != expected) {
- return FinishWithError(TResult::ERROR, TStringBuilder()
- << "main config version must be increasing by one"
- << " new version# " << version
- << " expected version# " << expected);
- }
-
+ main = load(*NewYaml, mainYamlVersion.emplace(), "MainConfig");
if (!effective && !Self->StorageConfigYaml) {
effective = &main;
}
@@ -780,6 +768,29 @@ namespace NKikimr::NStorage {
<< ": " << ex.what());
}
+ if (storageYamlVersion) {
+ const ui64 expected = Self->StorageConfig->GetExpectedStorageYamlVersion();
+ if (*storageYamlVersion != expected) {
+ VersionError = TStringBuilder()
+ << "storage config version must be increasing by one"
+ << " new version# " << *storageYamlVersion
+ << " expected version# " << expected;
+ }
+ }
+
+ if (!VersionError && mainYamlVersion && Self->MainConfigYamlVersion) {
+ // TODO(alexvru): we have to check version when migrating to self-managed mode
+ const ui64 expected = Self->MainConfigYamlVersion
+ ? *Self->MainConfigYamlVersion + 1
+ : 0;
+ if (*mainYamlVersion != expected) {
+ VersionError = TStringBuilder()
+ << "main config version must be increasing by one"
+ << " new version# " << *mainYamlVersion
+ << " expected version# " << expected;
+ }
+ }
+
if (NewYaml) {
if (const auto& error = UpdateConfigComposite(config, *NewYaml, std::nullopt)) {
return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to update config yaml: " << *error);
@@ -813,22 +824,73 @@ namespace NKikimr::NStorage {
<< "ReplaceStorageConfig config validation failed: " << *error);
}
- if (request.GetSkipConsoleValidation() || !NewYaml) {
- return StartProposition(&config);
- }
+ // whether we are enabling distconf right now (by this operation)
+ ProposedStorageConfig = std::move(config);
- // whether we are enabling distconf right now
- const bool enablingDistconf = Self->BaseConfig.GetSelfManagementConfig().GetEnabled() &&
- !Self->SelfManagementEnabled &&
- config.GetSelfManagementConfig().GetEnabled();
+ if (!Self->SelfManagementEnabled && ProposedStorageConfig.GetSelfManagementConfig().GetEnabled()) {
+ TryEnableDistconf();
+ } else {
+ IssueQueryToConsole(false);
+ }
+ }
- if (!Self->EnqueueConsoleConfigValidation(SelfId(), enablingDistconf, *NewYaml)) {
+ void IssueQueryToConsole(bool enablingDistconf) {
+ if (VersionError) {
+ FinishWithError(TResult::ERROR, *VersionError);
+ } else if (Event->Get()->Record.GetReplaceStorageConfig().GetSkipConsoleValidation() || !NewYaml) {
+ StartProposition(&ProposedStorageConfig);
+ } else if (!Self->EnqueueConsoleConfigValidation(SelfId(), enablingDistconf, *NewYaml)) {
FinishWithError(TResult::ERROR, "console pipe is not available");
- } else {
- ProposedStorageConfig = std::move(config);
}
}
+ void TryEnableDistconf() {
+ const ERootState prevState = std::exchange(Self->RootState, ERootState::IN_PROGRESS);
+ Y_ABORT_UNLESS(prevState == ERootState::RELAX);
+
+ TEvScatter task;
+ task.MutableCollectConfigs();
+ IssueScatterTask(std::move(task), [this](TEvGather *res) -> std::optional<TString> {
+ Y_ABORT_UNLESS(Self->StorageConfig); // it can't just disappear
+
+ const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX);
+ Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS);
+
+ if (!res->HasCollectConfigs()) {
+ return "incorrect CollectConfigs response";
+ } else if (Self->CurrentProposedStorageConfig) {
+ FinishWithError(TResult::RACE, "config proposition request in flight");
+ } else if (Scepter.expired()) {
+ return "scepter lost during query execution";
+ } else {
+ auto r = Self->ProcessCollectConfigs(res->MutableCollectConfigs(), std::nullopt);
+ return std::visit<std::optional<TString>>(TOverloaded{
+ [&](std::monostate&) -> std::optional<TString> {
+ if (r.IsDistconfDisabledQuorum) {
+ // distconf is disabled on the majority of nodes; we have just to replace configs
+ // and then to restart these nodes in order to enable it in future
+ auto ev = PrepareResult(TResult::CONTINUE_BSC, "proceed with BSC");
+ ev->Record.MutableReplaceStorageConfig()->SetAllowEnablingDistconf(true);
+ Finish(Sender, SelfId(), ev.release(), 0, Cookie);
+ } else {
+ // we can actually enable distconf with this query, so do it
+ IssueQueryToConsole(true);
+ }
+ return std::nullopt;
+ },
+ [&](TString& error) {
+ return std::move(error);
+ },
+ [&](NKikimrBlobStorage::TStorageConfig& /*proposedConfig*/) {
+ return "unexpected config proposition";
+ }
+ }, r.Outcome);
+ }
+
+ return std::nullopt; // no error or it is already processed
+ });
+ }
+
void Handle(TEvBlobStorage::TEvControllerValidateConfigResponse::TPtr ev) {
const auto& record = ev->Get()->Record;
STLOG(PRI_DEBUG, BS_NODE, NWDC77, "received TEvControllerValidateConfigResponse", (SelfId, SelfId()),
@@ -843,7 +905,7 @@ namespace NKikimr::NStorage {
case NKikimrBlobStorage::TEvControllerValidateConfigResponse::IdPipeServerMismatch:
Self->DisconnectFromConsole();
Self->ConnectToConsole();
- return FinishWithError(TResult::ERROR, TStringBuilder() << "console connection race detected");
+ return FinishWithError(TResult::ERROR, TStringBuilder() << "console connection race detected: " << record.GetErrorReason());
case NKikimrBlobStorage::TEvControllerValidateConfigResponse::ConfigNotValid:
return FinishWithError(TResult::ERROR, TStringBuilder() << "console config validation failed: "
@@ -884,25 +946,24 @@ namespace NKikimr::NStorage {
FinishWithError(TResult::RACE, "storage config generation regenerated while collecting configs");
return std::nullopt;
}
- TOverloaded handler{
- [&](std::monostate&&) {
+ auto r = Self->ProcessCollectConfigs(res->MutableCollectConfigs(), selfAssemblyUUID);
+ return std::visit<std::optional<TString>>(TOverloaded{
+ [&](std::monostate&) {
const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX);
Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS);
Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie);
return std::nullopt;
},
- [&](TString&& error) {
+ [&](TString& error) {
const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX);
Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS);
return error;
},
- [&](NKikimrBlobStorage::TStorageConfig&& proposedConfig) {
+ [&](NKikimrBlobStorage::TStorageConfig& proposedConfig) {
StartProposition(&proposedConfig, false);
return std::nullopt;
}
- };
- return std::visit<std::optional<TString>>(handler,
- Self->ProcessCollectConfigs(res->MutableCollectConfigs(), &selfAssemblyUUID));
+ }, r.Outcome);
};
TEvScatter task;
@@ -980,12 +1041,12 @@ namespace NKikimr::NStorage {
}
std::unique_ptr<TEvNodeConfigInvokeOnRootResult> PrepareResult(TResult::EStatus status,
- std::optional<std::reference_wrapper<const TString>> errorReason) {
+ std::optional<TStringBuf> errorReason) {
auto ev = std::make_unique<TEvNodeConfigInvokeOnRootResult>();
auto *record = &ev->Record;
record->SetStatus(status);
if (errorReason) {
- record->SetErrorReason(*errorReason);
+ record->SetErrorReason(errorReason->data(), errorReason->size());
}
if (auto scepter = Scepter.lock()) {
auto *s = record->MutableScepter();
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
index 0bb0982eed5..6fe9683e99e 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
@@ -705,7 +705,7 @@ void TNodeWarden::PersistConfig(std::optional<TString> mainYaml, ui64 mainYamlVe
}
if (!NFs::Rename(tempPath, configPath)) {
- STLOG(PRI_ERROR, BS_NODE, NW92, "Failed to rename temporary file", (Error, LastSystemErrorText()));
+ STLOG(PRI_ERROR, BS_NODE, NW53, "Failed to rename temporary file", (Error, LastSystemErrorText()));
success = false;
return false;
}
diff --git a/ydb/core/cms/console/console_handshake.cpp b/ydb/core/cms/console/console_handshake.cpp
index 0f31f2c140b..c486ff1b405 100644
--- a/ydb/core/cms/console/console_handshake.cpp
+++ b/ydb/core/cms/console/console_handshake.cpp
@@ -97,15 +97,22 @@ private:
};
template <typename TRequestEvent, typename TResponse>
-bool TConfigsManager::CheckSession(TEventHandle<TRequestEvent>& ev, std::unique_ptr<TResponse>& failEvent, typename TResponse::ProtoRecordType::EStatus status) {
- if (Self.CurrentSenderId != ev.Sender) {
- failEvent->Record.SetStatus(status);
- SendInReply(ev.Sender, ev.InterconnectSession, std::move(failEvent));
- return false;
- } else if (Self.CurrentPipeServerId != ev.Recipient) {
- return false;
+bool TConfigsManager::CheckSession(TEventHandle<TRequestEvent>& ev, std::unique_ptr<TResponse>& failEvent,
+ typename TResponse::ProtoRecordType::EStatus status) {
+ for (const auto& [senderId, pipeServerId] : Self.ConfigClients) {
+ if (senderId == ev.Sender && pipeServerId == ev.Recipient) { // same sender and pipe
+ return true;
+ } else if (senderId == ev.Sender) { // pipe differs, same sender (obsolete pipe)
+ return false;
+ } else if (pipeServerId == ev.Recipient) { // different sender, same pipe?
+ Y_DEBUG_ABORT();
+ return false;
+ }
}
- return true;
+ // no matching pair found, obsolete event
+ failEvent->Record.SetStatus(status);
+ SendInReply(ev.Sender, ev.InterconnectSession, std::move(failEvent), ev.Cookie);
+ return false;
}
void TConfigsManager::Handle(TEvBlobStorage::TEvControllerProposeConfigRequest::TPtr &ev, const TActorContext &ctx) {
@@ -113,12 +120,14 @@ void TConfigsManager::Handle(TEvBlobStorage::TEvControllerProposeConfigRequest::
const auto& proposedConfigHash = record.GetConfigHash();
const auto& proposedConfigVersion = record.GetConfigVersion();
ui64 currentConfigHash = NKikimr::NYaml::GetConfigHash(MainYamlConfig);
- if (Self.CurrentSenderId != ev->Sender) {
- NTabletPipe::CloseServer(Self.SelfId(), Self.CurrentPipeServerId);
+
+ auto& [senderId, pipeServerId] = Self.ConfigClients[ev->Get()->Record.GetDistconf()];
+ if (pipeServerId != ev->Recipient) {
+ NTabletPipe::CloseServer(Self.SelfId(), pipeServerId);
}
- Self.CurrentSenderId = ev->Sender;
- Self.CurrentPipeServerId = ev->Recipient;
+ senderId = ev->Sender;
+ pipeServerId = ev->Recipient;
auto response = std::make_unique<TEvBlobStorage::TEvControllerProposeConfigResponse>();
auto& responseRecord = response->Record;
diff --git a/ydb/core/cms/console/console_impl.h b/ydb/core/cms/console/console_impl.h
index 1f1a47afbcb..8deb77a5210 100644
--- a/ydb/core/cms/console/console_impl.h
+++ b/ydb/core/cms/console/console_impl.h
@@ -196,8 +196,7 @@ private:
TActorId NetClassifierUpdaterId;
// For handshake with BSController/distconf
- TActorId CurrentSenderId;
- TActorId CurrentPipeServerId;
+ std::array<std::tuple<TActorId, TActorId>, 2> ConfigClients; // distconf -> SenderId, PipeServerId
};
} // namespace NKikimr::NConsole
diff --git a/ydb/core/grpc_services/rpc_config_base.h b/ydb/core/grpc_services/rpc_config_base.h
index 20f858d0508..f65cfeed6cc 100644
--- a/ydb/core/grpc_services/rpc_config_base.h
+++ b/ydb/core/grpc_services/rpc_config_base.h
@@ -183,21 +183,22 @@ protected:
void Handle(TEvNodeWardenStorageConfig::TPtr ev) {
auto *self = Self();
- if (ev->Get()->Config->GetGeneration() || self->IsDistconfEnableQuery()) { // distconf (will be) enabled
+ if (ev->Get()->SelfManagementEnabled || self->IsDistconfEnableQuery()) { // distconf (will be) enabled
auto ev = std::make_unique<NStorage::TEvNodeConfigInvokeOnRoot>();
self->FillDistconfQuery(*ev);
self->Send(MakeBlobStorageNodeWardenID(self->SelfId().NodeId()), ev.release());
} else { // classic BSC
- BSCTabletId = MakeBSControllerID();
CreatePipe();
- SendGetInterfaceVersion();
}
}
void Handle(NStorage::TEvNodeConfigInvokeOnRootResult::TPtr ev) {
auto *self = Self();
auto& record = ev->Get()->Record;
- if (record.GetStatus() != NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::OK) {
+ if (auto status = record.GetStatus(); status == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::CONTINUE_BSC) {
+ // continue with BSC
+ CreatePipe();
+ } else if (status != NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::OK) {
self->Reply(Ydb::StatusIds::INTERNAL_ERROR, record.GetErrorReason(), NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
self->ActorContext());
} else {
@@ -217,15 +218,13 @@ protected:
void CreatePipe() {
auto *self = Self();
- BSCPipeClient = self->Register(NTabletPipe::CreateClient(self->SelfId(), BSCTabletId, GetPipeConfig()));
- }
+ BSCPipeClient = self->Register(NTabletPipe::CreateClient(self->SelfId(), MakeBSControllerID(), GetPipeConfig()));
- void SendGetInterfaceVersion() {
auto req = std::make_unique<TEvBlobStorage::TEvControllerConfigRequest>();
auto& record = req->Record;
auto *request = record.MutableRequest();
request->AddCommand()->MutableGetInterfaceVersion();
- NTabletPipe::SendData(Self()->SelfId(), BSCPipeClient, req.release(), 0, TBase::Span_.GetTraceId());
+ NTabletPipe::SendData(self->SelfId(), BSCPipeClient, req.release(), 0, TBase::Span_.GetTraceId());
State = EState::GET_INTERFACE_VERSION;
}
@@ -397,7 +396,6 @@ private:
};
EState State = EState::UNKNOWN;
- ui64 BSCTabletId = 0;
TActorId BSCPipeClient;
ui32 InterfaceVersion = 0;
};
diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto
index df0815c5712..70e3b0c3709 100644
--- a/ydb/core/protos/blobstorage.proto
+++ b/ydb/core/protos/blobstorage.proto
@@ -1407,6 +1407,7 @@ message TEvNodeWardenGroupInfo {
message TEvControllerProposeConfigRequest {
optional uint64 ConfigHash = 1;
optional uint64 ConfigVersion = 2;
+ optional bool Distconf = 3; // is the request originated from distconf?
}
message TEvControllerProposeConfigResponse {
diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto
index 6d3756306b5..a186d1b710a 100644
--- a/ydb/core/protos/blobstorage_distributed_config.proto
+++ b/ydb/core/protos/blobstorage_distributed_config.proto
@@ -230,6 +230,7 @@ message TEvNodeConfigInvokeOnRootResult {
NO_QUORUM = 1; // root node did not have quorum of following nodes
ERROR = 2; // failure during request execution
RACE = 3; // race in requests with other entities
+ CONTINUE_BSC = 4; // process this request with BSC
}
message TScepter {
@@ -247,6 +248,10 @@ message TEvNodeConfigInvokeOnRootResult {
optional string StorageYAML = 2;
}
+ message TReplaceStorageConfig {
+ optional bool AllowEnablingDistconf = 1;
+ }
+
EStatus Status = 1;
optional string ErrorReason = 2;
TScepter Scepter = 3;
@@ -256,6 +261,7 @@ message TEvNodeConfigInvokeOnRootResult {
oneof Response {
TQueryConfig QueryConfig = 5;
TFetchStorageConfig FetchStorageConfig = 10;
+ TReplaceStorageConfig ReplaceStorageConfig = 11;
}
}