diff options
| author | Alexander Rutkovsky <[email protected]> | 2025-03-11 18:15:20 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-03-11 18:15:20 +0300 |
| commit | 033f7f056c4a1de2b2f53e078a05dc28e7ae4a73 (patch) | |
| tree | 4772e91515bf39877f53ae3a9d07a6409130f316 | |
| parent | e6c8de41f78268c5dbdf6ddf4c6731ae43687dbb (diff) | |
Correct distconf enabling sequence (#15578)
| -rw-r--r-- | ydb/core/blobstorage/base/blobstorage_console_events.h | 3 | ||||
| -rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf.h | 9 | ||||
| -rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf_console.cpp | 17 | ||||
| -rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf_fsm.cpp | 50 | ||||
| -rw-r--r-- | ydb/core/blobstorage/nodewarden/distconf_invoke.cpp | 139 | ||||
| -rw-r--r-- | ydb/core/blobstorage/nodewarden/node_warden_impl.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/cms/console/console_handshake.cpp | 33 | ||||
| -rw-r--r-- | ydb/core/cms/console/console_impl.h | 3 | ||||
| -rw-r--r-- | ydb/core/grpc_services/rpc_config_base.h | 16 | ||||
| -rw-r--r-- | ydb/core/protos/blobstorage.proto | 1 | ||||
| -rw-r--r-- | ydb/core/protos/blobstorage_distributed_config.proto | 6 |
11 files changed, 191 insertions, 88 deletions
diff --git a/ydb/core/blobstorage/base/blobstorage_console_events.h b/ydb/core/blobstorage/base/blobstorage_console_events.h index 47d3932fa2a..809c1059c0e 100644 --- a/ydb/core/blobstorage/base/blobstorage_console_events.h +++ b/ydb/core/blobstorage/base/blobstorage_console_events.h @@ -10,9 +10,10 @@ namespace NKikimr { NKikimrBlobStorage::TEvControllerProposeConfigRequest, EvControllerProposeConfigRequest> { TEvControllerProposeConfigRequest() = default; - TEvControllerProposeConfigRequest(ui64 configHash, ui64 configVersion) { + TEvControllerProposeConfigRequest(ui64 configHash, ui64 configVersion, bool distconf) { Record.SetConfigHash(configHash); Record.SetConfigVersion(configVersion); + Record.SetDistconf(distconf); } TString ToString() const override { diff --git a/ydb/core/blobstorage/nodewarden/distconf.h b/ydb/core/blobstorage/nodewarden/distconf.h index 6ed32afa36f..c9ce2ce5fec 100644 --- a/ydb/core/blobstorage/nodewarden/distconf.h +++ b/ydb/core/blobstorage/nodewarden/distconf.h @@ -351,8 +351,13 @@ namespace NKikimr::NStorage { bool HasQuorum() const; void ProcessCollectConfigs(TEvGather::TCollectConfigs *res); - using TProcessCollectConfigsResult = std::variant<std::monostate, TString, NKikimrBlobStorage::TStorageConfig>; - TProcessCollectConfigsResult ProcessCollectConfigs(TEvGather::TCollectConfigs *res, const TString *selfAssemblyUUID); + struct TProcessCollectConfigsResult { + std::variant<std::monostate, TString, NKikimrBlobStorage::TStorageConfig> Outcome; + bool IsDistconfDisabledQuorum = false; + }; + TProcessCollectConfigsResult ProcessCollectConfigs(TEvGather::TCollectConfigs *res, + std::optional<TStringBuf> selfAssemblyUUID); + std::optional<TString> ProcessProposeStorageConfig(TEvGather::TProposeStorageConfig *res); struct TExConfigError : yexception {}; diff --git a/ydb/core/blobstorage/nodewarden/distconf_console.cpp b/ydb/core/blobstorage/nodewarden/distconf_console.cpp index ee125a8a53b..a02ab98404d 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_console.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_console.cpp @@ -35,8 +35,14 @@ namespace NKikimr::NStorage { return; // still waiting for previous one } + ProposeRequestInFlight = true; + if (!StorageConfig || !StorageConfig->HasConfigComposite()) { - return; // no config yet + // send empty proposition just to connect to console + auto ev = std::make_unique<TEvBlobStorage::TEvControllerProposeConfigRequest>(); + ev->Record.SetDistconf(true); + NTabletPipe::SendData(SelfId(), ConsolePipeId, ev.release(), ++ProposeRequestCookie); + return; } Y_ABORT_UNLESS(MainConfigYamlVersion); @@ -51,11 +57,14 @@ namespace NKikimr::NStorage { MainConfigFetchYamlHash, *MainConfigYamlVersion)); ProposedConfigHashVersion.emplace(MainConfigFetchYamlHash, *MainConfigYamlVersion); NTabletPipe::SendData(SelfId(), ConsolePipeId, new TEvBlobStorage::TEvControllerProposeConfigRequest( - MainConfigFetchYamlHash, *MainConfigYamlVersion), ++ProposeRequestCookie); - ProposeRequestInFlight = true; + MainConfigFetchYamlHash, *MainConfigYamlVersion, true), ++ProposeRequestCookie); } void TDistributedConfigKeeper::Handle(TEvBlobStorage::TEvControllerValidateConfigResponse::TPtr ev) { + STLOG(PRI_DEBUG, BS_NODE, NWDC10, "received TEvControllerValidateConfigResponse", + (Sender, ev->Sender), (Cookie, ev->Cookie), (Record, ev->Get()->Record), + (ConsoleConfigValidationQ.size, ConsoleConfigValidationQ.size())); + auto& q = ConsoleConfigValidationQ; auto pred = [&](const auto& item) { const auto& [actorId, yaml, cookie] = item; @@ -114,7 +123,7 @@ namespace NKikimr::NStorage { break; case NKikimrBlobStorage::TEvControllerProposeConfigResponse::ReverseCommit: - Y_DEBUG_ABORT(); + // just do nothing, we didn't have the config in distconf, possibly it is being enabled break; } } diff --git a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp index 1400e22ab7d..4411d5aeca0 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp @@ -110,16 +110,17 @@ namespace NKikimr::NStorage { } void TDistributedConfigKeeper::ProcessCollectConfigs(TEvGather::TCollectConfigs *res) { - TOverloaded handler{ - [&](std::monostate&&) { + auto r = ProcessCollectConfigs(res, std::nullopt); + std::visit(TOverloaded{ + [&](std::monostate&) { STLOG(PRI_DEBUG, BS_NODE, NWDC61, "ProcessCollectConfigs: monostate"); RootState = ERootState::RELAX; }, - [&](TString&& error) { + [&](TString& error) { STLOG(PRI_DEBUG, BS_NODE, NWDC63, "ProcessCollectConfigs: error", (Error, error)); SwitchToError(error); }, - [&](NKikimrBlobStorage::TStorageConfig&& proposedConfig) { + [&](NKikimrBlobStorage::TStorageConfig& proposedConfig) { STLOG(PRI_DEBUG, BS_NODE, NWDC64, "ProcessCollectConfigs: proposed new config", (ProposedConfig, proposedConfig)); TEvScatter task; @@ -130,12 +131,11 @@ namespace NKikimr::NStorage { propose->MutableConfig()->Swap(&proposedConfig); IssueScatterTask(TActorId(), std::move(task)); } - }; - std::visit(handler, ProcessCollectConfigs(res, nullptr)); + }, r.Outcome); } TDistributedConfigKeeper::TProcessCollectConfigsResult TDistributedConfigKeeper::ProcessCollectConfigs( - TEvGather::TCollectConfigs *res, const TString *selfAssemblyUUID) { + TEvGather::TCollectConfigs *res, std::optional<TStringBuf> selfAssemblyUUID) { auto generateSuccessful = [&](auto&& callback) { for (const auto& item : res->GetNodes()) { for (const auto& node : item.GetNodeIds()) { @@ -171,10 +171,24 @@ namespace NKikimr::NStorage { if (nodeQuorum && !configQuorum) { // check if there is quorum of no-distconf config along the cluster + auto generateNodesWithoutDistconf = [&](auto&& callback) { + for (const auto& item : res->GetNodes()) { + if (item.GetBaseConfig().GetSelfManagementConfig().GetEnabled()) { + continue; + } + for (const auto& node : item.GetNodeIds()) { + callback(node); + } + } + }; + if (HasNodeQuorum(*StorageConfig, generateNodesWithoutDistconf)) { + // yes, distconf is disabled on the majority of the nodes, so we can't do anything about it + return {.IsDistconfDisabledQuorum = true}; + } } if (!nodeQuorum || !configQuorum) { - return "no quorum for CollectConfigs"; + return {"no quorum for CollectConfigs"}; } // TODO: validate self-assembly UUID @@ -223,7 +237,7 @@ namespace NKikimr::NStorage { (BaseConfigs.size, baseConfigs.size())); Y_DEBUG_ABORT("Multiple nonintersecting node sets have quorum of BaseConfig"); Halt(); - return "Multiple nonintersecting node sets have quorum of BaseConfig"; + return {"Multiple nonintersecting node sets have quorum of BaseConfig"}; } NKikimrBlobStorage::TStorageConfig *baseConfig = nullptr; for (auto& [meta, info] : baseConfigs) { @@ -289,13 +303,13 @@ namespace NKikimr::NStorage { (Generation, generation), (Configs, configs)); Y_DEBUG_ABORT("Multiple nonintersecting node sets have quorum of persistent config"); Halt(); - return "Multiple nonintersecting node sets have quorum of persistent config"; + return {"Multiple nonintersecting node sets have quorum of persistent config"}; } Y_ABORT_UNLESS(configs.size() == 1); persistedConfig = configs.front(); } if (maxSeenGeneration && (!persistedConfig || persistedConfig->GetGeneration() < maxSeenGeneration)) { - return "couldn't obtain quorum for configuration that was seen in effect"; + return {"couldn't obtain quorum for configuration that was seen in effect"}; } // let's try to find possibly proposed config, but without a quorum, and try to reconstruct it @@ -311,7 +325,7 @@ namespace NKikimr::NStorage { (PersistentConfig, *persistedConfig), (ProposedConfig, config)); Y_DEBUG_ABORT("persistently proposed config has too big generation"); Halt(); - return "persistently proposed config has too big generation"; + return {"persistently proposed config has too big generation"}; } } if (proposedConfig && (proposedConfig->GetGeneration() != config.GetGeneration() || @@ -361,11 +375,11 @@ namespace NKikimr::NStorage { if (!CurrentSelfAssemblyUUID) { CurrentSelfAssemblyUUID.emplace(CreateGuidAsString()); } - selfAssemblyUUID = &CurrentSelfAssemblyUUID.value(); + selfAssemblyUUID.emplace(CurrentSelfAssemblyUUID.value()); } propositionBase.emplace(*baseConfig); - if (auto error = GenerateFirstConfig(baseConfig, *selfAssemblyUUID)) { - return *error; + if (auto error = GenerateFirstConfig(baseConfig, TString(*selfAssemblyUUID))) { + return {*error}; } configToPropose = baseConfig; } @@ -392,12 +406,12 @@ namespace NKikimr::NStorage { if (error) { Y_DEBUG_ABORT("incorrect config proposition"); - return "incorrect config proposition"; + return {"incorrect config proposition"}; } if (propositionBase) { if (auto error = ValidateConfig(*propositionBase)) { - return TStringBuilder() << "failed to propose configuration, base config contains errors: " << *error; + return {TStringBuilder() << "failed to propose configuration, base config contains errors: " << *error}; } if (auto error = ValidateConfigUpdate(*propositionBase, *configToPropose)) { Y_FAIL_S("incorrect config proposed: " << *error); @@ -408,7 +422,7 @@ namespace NKikimr::NStorage { } } - return std::move(*configToPropose); + return {std::move(*configToPropose)}; } return {}; diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp index 4c0eca0139c..c5c63449b85 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp @@ -30,6 +30,7 @@ namespace NKikimr::NStorage { NKikimrBlobStorage::TStorageConfig ProposedStorageConfig; std::optional<TString> NewYaml; + std::optional<TString> VersionError; public: TInvokeRequestHandlerActor(TDistributedConfigKeeper *self, std::unique_ptr<TEventHandle<TEvNodeConfigInvokeOnRoot>>&& ev) @@ -707,6 +708,9 @@ namespace NKikimr::NStorage { newExpectedStorageYamlVersion.emplace(config.GetExpectedStorageYamlVersion()); } + std::optional<ui64> storageYamlVersion; + std::optional<ui64> mainYamlVersion; + try { auto load = [&](const TString& yaml, ui64& version, const char *expectedKind) { state = TStringBuilder() << "loading " << expectedKind << " YAML"; @@ -736,29 +740,13 @@ namespace NKikimr::NStorage { const NJson::TJsonValue *effective = nullptr; if (newStorageYaml) { - ui64 version = 0; - storage = load(*newStorageYaml, version, "StorageConfig"); - if (const ui64 expected = Self->StorageConfig->GetExpectedStorageYamlVersion(); version != expected) { - return FinishWithError(TResult::ERROR, TStringBuilder() - << "storage config version must be increasing by one" - << " new version# " << version - << " expected version# " << expected); - } - - newExpectedStorageYamlVersion = version + 1; + storage = load(*newStorageYaml, storageYamlVersion.emplace(), "StorageConfig"); + newExpectedStorageYamlVersion = *storageYamlVersion + 1; effective = &storage; } if (NewYaml) { - ui64 version = 0; - main = load(*NewYaml, version, "MainConfig"); - if (const ui64 expected = *Self->MainConfigYamlVersion + 1; version != expected) { - return FinishWithError(TResult::ERROR, TStringBuilder() - << "main config version must be increasing by one" - << " new version# " << version - << " expected version# " << expected); - } - + main = load(*NewYaml, mainYamlVersion.emplace(), "MainConfig"); if (!effective && !Self->StorageConfigYaml) { effective = &main; } @@ -780,6 +768,29 @@ namespace NKikimr::NStorage { << ": " << ex.what()); } + if (storageYamlVersion) { + const ui64 expected = Self->StorageConfig->GetExpectedStorageYamlVersion(); + if (*storageYamlVersion != expected) { + VersionError = TStringBuilder() + << "storage config version must be increasing by one" + << " new version# " << *storageYamlVersion + << " expected version# " << expected; + } + } + + if (!VersionError && mainYamlVersion && Self->MainConfigYamlVersion) { + // TODO(alexvru): we have to check version when migrating to self-managed mode + const ui64 expected = Self->MainConfigYamlVersion + ? *Self->MainConfigYamlVersion + 1 + : 0; + if (*mainYamlVersion != expected) { + VersionError = TStringBuilder() + << "main config version must be increasing by one" + << " new version# " << *mainYamlVersion + << " expected version# " << expected; + } + } + if (NewYaml) { if (const auto& error = UpdateConfigComposite(config, *NewYaml, std::nullopt)) { return FinishWithError(TResult::ERROR, TStringBuilder() << "failed to update config yaml: " << *error); @@ -813,22 +824,73 @@ namespace NKikimr::NStorage { << "ReplaceStorageConfig config validation failed: " << *error); } - if (request.GetSkipConsoleValidation() || !NewYaml) { - return StartProposition(&config); - } + // whether we are enabling distconf right now (by this operation) + ProposedStorageConfig = std::move(config); - // whether we are enabling distconf right now - const bool enablingDistconf = Self->BaseConfig.GetSelfManagementConfig().GetEnabled() && - !Self->SelfManagementEnabled && - config.GetSelfManagementConfig().GetEnabled(); + if (!Self->SelfManagementEnabled && ProposedStorageConfig.GetSelfManagementConfig().GetEnabled()) { + TryEnableDistconf(); + } else { + IssueQueryToConsole(false); + } + } - if (!Self->EnqueueConsoleConfigValidation(SelfId(), enablingDistconf, *NewYaml)) { + void IssueQueryToConsole(bool enablingDistconf) { + if (VersionError) { + FinishWithError(TResult::ERROR, *VersionError); + } else if (Event->Get()->Record.GetReplaceStorageConfig().GetSkipConsoleValidation() || !NewYaml) { + StartProposition(&ProposedStorageConfig); + } else if (!Self->EnqueueConsoleConfigValidation(SelfId(), enablingDistconf, *NewYaml)) { FinishWithError(TResult::ERROR, "console pipe is not available"); - } else { - ProposedStorageConfig = std::move(config); } } + void TryEnableDistconf() { + const ERootState prevState = std::exchange(Self->RootState, ERootState::IN_PROGRESS); + Y_ABORT_UNLESS(prevState == ERootState::RELAX); + + TEvScatter task; + task.MutableCollectConfigs(); + IssueScatterTask(std::move(task), [this](TEvGather *res) -> std::optional<TString> { + Y_ABORT_UNLESS(Self->StorageConfig); // it can't just disappear + + const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX); + Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS); + + if (!res->HasCollectConfigs()) { + return "incorrect CollectConfigs response"; + } else if (Self->CurrentProposedStorageConfig) { + FinishWithError(TResult::RACE, "config proposition request in flight"); + } else if (Scepter.expired()) { + return "scepter lost during query execution"; + } else { + auto r = Self->ProcessCollectConfigs(res->MutableCollectConfigs(), std::nullopt); + return std::visit<std::optional<TString>>(TOverloaded{ + [&](std::monostate&) -> std::optional<TString> { + if (r.IsDistconfDisabledQuorum) { + // distconf is disabled on the majority of nodes; we have just to replace configs + // and then to restart these nodes in order to enable it in future + auto ev = PrepareResult(TResult::CONTINUE_BSC, "proceed with BSC"); + ev->Record.MutableReplaceStorageConfig()->SetAllowEnablingDistconf(true); + Finish(Sender, SelfId(), ev.release(), 0, Cookie); + } else { + // we can actually enable distconf with this query, so do it + IssueQueryToConsole(true); + } + return std::nullopt; + }, + [&](TString& error) { + return std::move(error); + }, + [&](NKikimrBlobStorage::TStorageConfig& /*proposedConfig*/) { + return "unexpected config proposition"; + } + }, r.Outcome); + } + + return std::nullopt; // no error or it is already processed + }); + } + void Handle(TEvBlobStorage::TEvControllerValidateConfigResponse::TPtr ev) { const auto& record = ev->Get()->Record; STLOG(PRI_DEBUG, BS_NODE, NWDC77, "received TEvControllerValidateConfigResponse", (SelfId, SelfId()), @@ -843,7 +905,7 @@ namespace NKikimr::NStorage { case NKikimrBlobStorage::TEvControllerValidateConfigResponse::IdPipeServerMismatch: Self->DisconnectFromConsole(); Self->ConnectToConsole(); - return FinishWithError(TResult::ERROR, TStringBuilder() << "console connection race detected"); + return FinishWithError(TResult::ERROR, TStringBuilder() << "console connection race detected: " << record.GetErrorReason()); case NKikimrBlobStorage::TEvControllerValidateConfigResponse::ConfigNotValid: return FinishWithError(TResult::ERROR, TStringBuilder() << "console config validation failed: " @@ -884,25 +946,24 @@ namespace NKikimr::NStorage { FinishWithError(TResult::RACE, "storage config generation regenerated while collecting configs"); return std::nullopt; } - TOverloaded handler{ - [&](std::monostate&&) { + auto r = Self->ProcessCollectConfigs(res->MutableCollectConfigs(), selfAssemblyUUID); + return std::visit<std::optional<TString>>(TOverloaded{ + [&](std::monostate&) { const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX); Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS); Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie); return std::nullopt; }, - [&](TString&& error) { + [&](TString& error) { const ERootState prevState = std::exchange(Self->RootState, ERootState::RELAX); Y_ABORT_UNLESS(prevState == ERootState::IN_PROGRESS); return error; }, - [&](NKikimrBlobStorage::TStorageConfig&& proposedConfig) { + [&](NKikimrBlobStorage::TStorageConfig& proposedConfig) { StartProposition(&proposedConfig, false); return std::nullopt; } - }; - return std::visit<std::optional<TString>>(handler, - Self->ProcessCollectConfigs(res->MutableCollectConfigs(), &selfAssemblyUUID)); + }, r.Outcome); }; TEvScatter task; @@ -980,12 +1041,12 @@ namespace NKikimr::NStorage { } std::unique_ptr<TEvNodeConfigInvokeOnRootResult> PrepareResult(TResult::EStatus status, - std::optional<std::reference_wrapper<const TString>> errorReason) { + std::optional<TStringBuf> errorReason) { auto ev = std::make_unique<TEvNodeConfigInvokeOnRootResult>(); auto *record = &ev->Record; record->SetStatus(status); if (errorReason) { - record->SetErrorReason(*errorReason); + record->SetErrorReason(errorReason->data(), errorReason->size()); } if (auto scepter = Scepter.lock()) { auto *s = record->MutableScepter(); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp index 0bb0982eed5..6fe9683e99e 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp @@ -705,7 +705,7 @@ void TNodeWarden::PersistConfig(std::optional<TString> mainYaml, ui64 mainYamlVe } if (!NFs::Rename(tempPath, configPath)) { - STLOG(PRI_ERROR, BS_NODE, NW92, "Failed to rename temporary file", (Error, LastSystemErrorText())); + STLOG(PRI_ERROR, BS_NODE, NW53, "Failed to rename temporary file", (Error, LastSystemErrorText())); success = false; return false; } diff --git a/ydb/core/cms/console/console_handshake.cpp b/ydb/core/cms/console/console_handshake.cpp index 0f31f2c140b..c486ff1b405 100644 --- a/ydb/core/cms/console/console_handshake.cpp +++ b/ydb/core/cms/console/console_handshake.cpp @@ -97,15 +97,22 @@ private: }; template <typename TRequestEvent, typename TResponse> -bool TConfigsManager::CheckSession(TEventHandle<TRequestEvent>& ev, std::unique_ptr<TResponse>& failEvent, typename TResponse::ProtoRecordType::EStatus status) { - if (Self.CurrentSenderId != ev.Sender) { - failEvent->Record.SetStatus(status); - SendInReply(ev.Sender, ev.InterconnectSession, std::move(failEvent)); - return false; - } else if (Self.CurrentPipeServerId != ev.Recipient) { - return false; +bool TConfigsManager::CheckSession(TEventHandle<TRequestEvent>& ev, std::unique_ptr<TResponse>& failEvent, + typename TResponse::ProtoRecordType::EStatus status) { + for (const auto& [senderId, pipeServerId] : Self.ConfigClients) { + if (senderId == ev.Sender && pipeServerId == ev.Recipient) { // same sender and pipe + return true; + } else if (senderId == ev.Sender) { // pipe differs, same sender (obsolete pipe) + return false; + } else if (pipeServerId == ev.Recipient) { // different sender, same pipe? + Y_DEBUG_ABORT(); + return false; + } } - return true; + // no matching pair found, obsolete event + failEvent->Record.SetStatus(status); + SendInReply(ev.Sender, ev.InterconnectSession, std::move(failEvent), ev.Cookie); + return false; } void TConfigsManager::Handle(TEvBlobStorage::TEvControllerProposeConfigRequest::TPtr &ev, const TActorContext &ctx) { @@ -113,12 +120,14 @@ void TConfigsManager::Handle(TEvBlobStorage::TEvControllerProposeConfigRequest:: const auto& proposedConfigHash = record.GetConfigHash(); const auto& proposedConfigVersion = record.GetConfigVersion(); ui64 currentConfigHash = NKikimr::NYaml::GetConfigHash(MainYamlConfig); - if (Self.CurrentSenderId != ev->Sender) { - NTabletPipe::CloseServer(Self.SelfId(), Self.CurrentPipeServerId); + + auto& [senderId, pipeServerId] = Self.ConfigClients[ev->Get()->Record.GetDistconf()]; + if (pipeServerId != ev->Recipient) { + NTabletPipe::CloseServer(Self.SelfId(), pipeServerId); } - Self.CurrentSenderId = ev->Sender; - Self.CurrentPipeServerId = ev->Recipient; + senderId = ev->Sender; + pipeServerId = ev->Recipient; auto response = std::make_unique<TEvBlobStorage::TEvControllerProposeConfigResponse>(); auto& responseRecord = response->Record; diff --git a/ydb/core/cms/console/console_impl.h b/ydb/core/cms/console/console_impl.h index 1f1a47afbcb..8deb77a5210 100644 --- a/ydb/core/cms/console/console_impl.h +++ b/ydb/core/cms/console/console_impl.h @@ -196,8 +196,7 @@ private: TActorId NetClassifierUpdaterId; // For handshake with BSController/distconf - TActorId CurrentSenderId; - TActorId CurrentPipeServerId; + std::array<std::tuple<TActorId, TActorId>, 2> ConfigClients; // distconf -> SenderId, PipeServerId }; } // namespace NKikimr::NConsole diff --git a/ydb/core/grpc_services/rpc_config_base.h b/ydb/core/grpc_services/rpc_config_base.h index 20f858d0508..f65cfeed6cc 100644 --- a/ydb/core/grpc_services/rpc_config_base.h +++ b/ydb/core/grpc_services/rpc_config_base.h @@ -183,21 +183,22 @@ protected: void Handle(TEvNodeWardenStorageConfig::TPtr ev) { auto *self = Self(); - if (ev->Get()->Config->GetGeneration() || self->IsDistconfEnableQuery()) { // distconf (will be) enabled + if (ev->Get()->SelfManagementEnabled || self->IsDistconfEnableQuery()) { // distconf (will be) enabled auto ev = std::make_unique<NStorage::TEvNodeConfigInvokeOnRoot>(); self->FillDistconfQuery(*ev); self->Send(MakeBlobStorageNodeWardenID(self->SelfId().NodeId()), ev.release()); } else { // classic BSC - BSCTabletId = MakeBSControllerID(); CreatePipe(); - SendGetInterfaceVersion(); } } void Handle(NStorage::TEvNodeConfigInvokeOnRootResult::TPtr ev) { auto *self = Self(); auto& record = ev->Get()->Record; - if (record.GetStatus() != NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::OK) { + if (auto status = record.GetStatus(); status == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::CONTINUE_BSC) { + // continue with BSC + CreatePipe(); + } else if (status != NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::OK) { self->Reply(Ydb::StatusIds::INTERNAL_ERROR, record.GetErrorReason(), NKikimrIssues::TIssuesIds::DEFAULT_ERROR, self->ActorContext()); } else { @@ -217,15 +218,13 @@ protected: void CreatePipe() { auto *self = Self(); - BSCPipeClient = self->Register(NTabletPipe::CreateClient(self->SelfId(), BSCTabletId, GetPipeConfig())); - } + BSCPipeClient = self->Register(NTabletPipe::CreateClient(self->SelfId(), MakeBSControllerID(), GetPipeConfig())); - void SendGetInterfaceVersion() { auto req = std::make_unique<TEvBlobStorage::TEvControllerConfigRequest>(); auto& record = req->Record; auto *request = record.MutableRequest(); request->AddCommand()->MutableGetInterfaceVersion(); - NTabletPipe::SendData(Self()->SelfId(), BSCPipeClient, req.release(), 0, TBase::Span_.GetTraceId()); + NTabletPipe::SendData(self->SelfId(), BSCPipeClient, req.release(), 0, TBase::Span_.GetTraceId()); State = EState::GET_INTERFACE_VERSION; } @@ -397,7 +396,6 @@ private: }; EState State = EState::UNKNOWN; - ui64 BSCTabletId = 0; TActorId BSCPipeClient; ui32 InterfaceVersion = 0; }; diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto index df0815c5712..70e3b0c3709 100644 --- a/ydb/core/protos/blobstorage.proto +++ b/ydb/core/protos/blobstorage.proto @@ -1407,6 +1407,7 @@ message TEvNodeWardenGroupInfo { message TEvControllerProposeConfigRequest { optional uint64 ConfigHash = 1; optional uint64 ConfigVersion = 2; + optional bool Distconf = 3; // is the request originated from distconf? } message TEvControllerProposeConfigResponse { diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto index 6d3756306b5..a186d1b710a 100644 --- a/ydb/core/protos/blobstorage_distributed_config.proto +++ b/ydb/core/protos/blobstorage_distributed_config.proto @@ -230,6 +230,7 @@ message TEvNodeConfigInvokeOnRootResult { NO_QUORUM = 1; // root node did not have quorum of following nodes ERROR = 2; // failure during request execution RACE = 3; // race in requests with other entities + CONTINUE_BSC = 4; // process this request with BSC } message TScepter { @@ -247,6 +248,10 @@ message TEvNodeConfigInvokeOnRootResult { optional string StorageYAML = 2; } + message TReplaceStorageConfig { + optional bool AllowEnablingDistconf = 1; + } + EStatus Status = 1; optional string ErrorReason = 2; TScepter Scepter = 3; @@ -256,6 +261,7 @@ message TEvNodeConfigInvokeOnRootResult { oneof Response { TQueryConfig QueryConfig = 5; TFetchStorageConfig FetchStorageConfig = 10; + TReplaceStorageConfig ReplaceStorageConfig = 11; } } |
