diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-01-09 14:01:31 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-01-09 14:01:31 +0300 |
commit | d7316a25ddae54e96b1db61144a2ab8c7255cfe6 (patch) | |
tree | cd8cb4bbcceaea7f32a86eae728cc018532b56f2 | |
parent | c11c17bd122fa8e9d4fb1bf61352caa0be47235f (diff) | |
download | ydb-d7316a25ddae54e96b1db61144a2ab8c7255cfe6.tar.gz |
decomposite metadata service
reuse snapshots parsing method
initialization bg tasks through metadata service
68 files changed, 809 insertions, 481 deletions
diff --git a/ydb/core/persqueue/writer/metadata_initializers.cpp b/ydb/core/persqueue/writer/metadata_initializers.cpp index 651242956d8..563565014a4 100644 --- a/ydb/core/persqueue/writer/metadata_initializers.cpp +++ b/ydb/core/persqueue/writer/metadata_initializers.cpp @@ -62,7 +62,7 @@ void TSrcIdMetaInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr cont result.emplace_back(new NInitializer::TGenericTableModifier<NRequest::TDialogCreateTable>(request, "create")); } result.emplace_back(NInitializer::TACLModifierConstructor::GetReadOnlyModifier(tablePath, "acl")); - controller->PreparationFinished(result); + controller->OnPreparationFinished(result); } diff --git a/ydb/core/persqueue/writer/metadata_initializers.h b/ydb/core/persqueue/writer/metadata_initializers.h index aa78cf6c48b..76799fe1628 100644 --- a/ydb/core/persqueue/writer/metadata_initializers.h +++ b/ydb/core/persqueue/writer/metadata_initializers.h @@ -31,10 +31,10 @@ protected: TInitBehaviourPtr ConstructInitializer() const override { return TSrcIdMetaInitializer::GetInstant(); } - std::shared_ptr<NMetadata::NModifications::IOperationsManager> ConstructOperationsManager() const override { +public: + std::shared_ptr<NMetadata::NModifications::IOperationsManager> GetOperationsManager() const override { return nullptr; } -public: static TClassBehaviourPtr GetInstant() { static TClassBehaviourPtr res{new TSrcIdMetaInitManager()}; return res; diff --git a/ydb/core/tx/tiering/rule/behaviour.h b/ydb/core/tx/tiering/rule/behaviour.h index b7e29d1ce24..c10f2f24d73 100644 --- a/ydb/core/tx/tiering/rule/behaviour.h +++ b/ydb/core/tx/tiering/rule/behaviour.h @@ -1,11 +1,11 @@ #pragma once -#include "object.h" +#include "object.h" #include <ydb/services/metadata/abstract/kqp_common.h> namespace NKikimr::NColumnShard::NTiers { -class TTieringRuleBehaviour: public NMetadata::IClassBehaviour { +class TTieringRuleBehaviour: public NMetadata::TClassBehaviour<TTieringRule> { private: static TFactory::TRegistrator<TTieringRuleBehaviour> Registrator; protected: diff --git a/ydb/core/tx/tiering/rule/checker.cpp b/ydb/core/tx/tiering/rule/checker.cpp index 4b22d14d0e1..9d6cb899567 100644 --- a/ydb/core/tx/tiering/rule/checker.cpp +++ b/ydb/core/tx/tiering/rule/checker.cpp @@ -16,7 +16,7 @@ void TRulePreparationActor::StartChecker() { } auto g = PassAwayGuard(); if (!SSCheckResult->GetContent().GetOperationAllow()) { - Controller->PreparationProblem(SSCheckResult->GetContent().GetDenyReason()); + Controller->OnPreparationProblem(SSCheckResult->GetContent().GetDenyReason()); return; } @@ -24,29 +24,29 @@ void TRulePreparationActor::StartChecker() { for (auto&& interval : tiering.GetIntervals()) { auto tier = Tierings->GetTierById(interval.GetTierName()); if (!tier) { - Controller->PreparationProblem("unknown tier usage: " + interval.GetTierName()); + Controller->OnPreparationProblem("unknown tier usage: " + interval.GetTierName()); return; } else if (!Secrets->CheckSecretAccess(tier->GetProtoConfig().GetObjectStorage().GetAccessKey(), Context.GetUserToken())) { - Controller->PreparationProblem("no access for secret: " + tier->GetProtoConfig().GetObjectStorage().GetAccessKey()); + Controller->OnPreparationProblem("no access for secret: " + tier->GetProtoConfig().GetObjectStorage().GetAccessKey()); return; } else if (!Secrets->CheckSecretAccess(tier->GetProtoConfig().GetObjectStorage().GetSecretKey(), Context.GetUserToken())) { - Controller->PreparationProblem("no access for secret: " + tier->GetProtoConfig().GetObjectStorage().GetSecretKey()); + Controller->OnPreparationProblem("no access for secret: " + tier->GetProtoConfig().GetObjectStorage().GetSecretKey()); return; } } } - Controller->PreparationFinished(std::move(Objects)); + Controller->OnPreparationFinished(std::move(Objects)); } void TRulePreparationActor::Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev) { auto& proto = ev->Get()->Record; if (proto.HasError()) { - Controller->PreparationProblem(proto.GetError().GetErrorMessage()); + Controller->OnPreparationProblem(proto.GetError().GetErrorMessage()); PassAway(); } else if (proto.HasContent()) { SSCheckResult = SSFetcher->UnpackResult(ev->Get()->Record.GetContent().GetData()); if (!SSCheckResult) { - Controller->PreparationProblem("cannot unpack ss-fetcher result for class " + SSFetcher->GetClassName()); + Controller->OnPreparationProblem("cannot unpack ss-fetcher result for class " + SSFetcher->GetClassName()); PassAway(); } else { StartChecker(); diff --git a/ydb/core/tx/tiering/rule/initializer.cpp b/ydb/core/tx/tiering/rule/initializer.cpp index a14b70ce26a..b2f07c8ae45 100644 --- a/ydb/core/tx/tiering/rule/initializer.cpp +++ b/ydb/core/tx/tiering/rule/initializer.cpp @@ -35,7 +35,7 @@ TVector<NKikimr::NMetadata::NInitializer::ITableModifier::TPtr> TTierRulesInitia } void TTierRulesInitializer::DoPrepare(NMetadata::NInitializer::IInitializerInput::TPtr controller) const { - controller->PreparationFinished(BuildModifiers()); + controller->OnPreparationFinished(BuildModifiers()); } } diff --git a/ydb/core/tx/tiering/snapshot.cpp b/ydb/core/tx/tiering/snapshot.cpp index e33421e5a5c..6034aefc492 100644 --- a/ydb/core/tx/tiering/snapshot.cpp +++ b/ydb/core/tx/tiering/snapshot.cpp @@ -11,35 +11,8 @@ namespace NKikimr::NColumnShard::NTiers { bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) { Y_VERIFY(rawDataResult.result_sets().size() == 2); - { - auto& rawData = rawDataResult.result_sets()[0]; - TTierConfig::TDecoder decoder(rawData); - for (auto&& r : rawData.rows()) { - TTierConfig config; - if (!config.DeserializeFromRecord(decoder, r)) { - ALS_ERROR(NKikimrServices::TX_TIERING) << "cannot parse tier config from snapshot"; - continue; - } - TierConfigs.emplace(config.GetTierName(), config); - } - } - { - auto& rawData = rawDataResult.result_sets()[1]; - TTieringRule::TDecoder decoder(rawData); - TVector<TTieringRule> rulesLocal; - rulesLocal.reserve(rawData.rows().size()); - for (auto&& r : rawData.rows()) { - TTieringRule tr; - if (!tr.DeserializeFromRecord(decoder, r)) { - ALS_WARN(NKikimrServices::TX_TIERING) << "cannot parse record for tiering info"; - continue; - } - rulesLocal.emplace_back(std::move(tr)); - } - for (auto&& i : rulesLocal) { - TableTierings.emplace(i.GetTieringRuleId(), std::move(i)); - } - } + ParseSnapshotObjects<TTierConfig>(rawDataResult.result_sets()[0], [this](TTierConfig&& s) {TierConfigs.emplace(s.GetTierName(), s); }); + ParseSnapshotObjects<TTieringRule>(rawDataResult.result_sets()[1], [this](TTieringRule&& s) {TableTierings.emplace(s.GetTieringRuleId(), s); }); return true; } diff --git a/ydb/core/tx/tiering/tier/behaviour.h b/ydb/core/tx/tiering/tier/behaviour.h index ba51a201dcb..fd231708fff 100644 --- a/ydb/core/tx/tiering/tier/behaviour.h +++ b/ydb/core/tx/tiering/tier/behaviour.h @@ -1,10 +1,11 @@ #pragma once +#include "object.h" #include <ydb/services/metadata/abstract/kqp_common.h> namespace NKikimr::NColumnShard::NTiers { -class TTierConfigBehaviour: public NMetadata::IClassBehaviour { +class TTierConfigBehaviour: public NMetadata::TClassBehaviour<TTierConfig> { private: static TFactory::TRegistrator<TTierConfigBehaviour> Registrator; protected: diff --git a/ydb/core/tx/tiering/tier/checker.cpp b/ydb/core/tx/tiering/tier/checker.cpp index 3f0156d0934..46e24eeb229 100644 --- a/ydb/core/tx/tiering/tier/checker.cpp +++ b/ydb/core/tx/tiering/tier/checker.cpp @@ -12,7 +12,7 @@ void TTierPreparationActor::StartChecker() { } auto g = PassAwayGuard(); if (!SSCheckResult->GetContent().GetOperationAllow()) { - Controller->PreparationProblem(SSCheckResult->GetContent().GetDenyReason()); + Controller->OnPreparationProblem(SSCheckResult->GetContent().GetDenyReason()); return; } for (auto&& tier : Objects) { @@ -27,30 +27,30 @@ void TTierPreparationActor::StartChecker() { } } if (tieringsWithTiers.size()) { - Controller->PreparationProblem("tier in usage for tierings: " + JoinSeq(", ", tieringsWithTiers)); + Controller->OnPreparationProblem("tier in usage for tierings: " + JoinSeq(", ", tieringsWithTiers)); return; } } if (!Secrets->CheckSecretAccess(tier.GetProtoConfig().GetObjectStorage().GetAccessKey(), Context.GetUserToken())) { - Controller->PreparationProblem("no access for secret: " + tier.GetProtoConfig().GetObjectStorage().GetAccessKey()); + Controller->OnPreparationProblem("no access for secret: " + tier.GetProtoConfig().GetObjectStorage().GetAccessKey()); return; } else if (!Secrets->CheckSecretAccess(tier.GetProtoConfig().GetObjectStorage().GetSecretKey(), Context.GetUserToken())) { - Controller->PreparationProblem("no access for secret: " + tier.GetProtoConfig().GetObjectStorage().GetSecretKey()); + Controller->OnPreparationProblem("no access for secret: " + tier.GetProtoConfig().GetObjectStorage().GetSecretKey()); return; } } - Controller->PreparationFinished(std::move(Objects)); + Controller->OnPreparationFinished(std::move(Objects)); } void TTierPreparationActor::Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev) { auto& proto = ev->Get()->Record; if (proto.HasError()) { - Controller->PreparationProblem(proto.GetError().GetErrorMessage()); + Controller->OnPreparationProblem(proto.GetError().GetErrorMessage()); PassAway(); } else if (proto.HasContent()) { SSCheckResult = SSFetcher->UnpackResult(ev->Get()->Record.GetContent().GetData()); if (!SSCheckResult) { - Controller->PreparationProblem("cannot unpack ss-fetcher result for class " + SSFetcher->GetClassName()); + Controller->OnPreparationProblem("cannot unpack ss-fetcher result for class " + SSFetcher->GetClassName()); PassAway(); } else { StartChecker(); diff --git a/ydb/core/tx/tiering/tier/initializer.cpp b/ydb/core/tx/tiering/tier/initializer.cpp index 11d1cc0e2f9..cbd9599b0fa 100644 --- a/ydb/core/tx/tiering/tier/initializer.cpp +++ b/ydb/core/tx/tiering/tier/initializer.cpp @@ -30,7 +30,7 @@ TVector<NKikimr::NMetadata::NInitializer::ITableModifier::TPtr> TTiersInitialize } void TTiersInitializer::DoPrepare(NMetadata::NInitializer::IInitializerInput::TPtr controller) const { - controller->PreparationFinished(BuildModifiers()); + controller->OnPreparationFinished(BuildModifiers()); } } diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp index 9c3d133ba8d..6376bbec211 100644 --- a/ydb/core/tx/tiering/ut/ut_tiers.cpp +++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp @@ -222,11 +222,11 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { private: YDB_READONLY_FLAG(Finished, false); public: - virtual void AlterProblem(const TString& errorMessage) override { + virtual void OnAlteringProblem(const TString& errorMessage) override { Cerr << errorMessage << Endl; Y_VERIFY(false); } - virtual void AlterFinished() override { + virtual void OnAlteringFinished() override { FinishedFlag = true; } }; diff --git a/ydb/services/bg_tasks/ds_table/CMakeLists.darwin.txt b/ydb/services/bg_tasks/ds_table/CMakeLists.darwin.txt index 573f38f4356..a689fcd4138 100644 --- a/ydb/services/bg_tasks/ds_table/CMakeLists.darwin.txt +++ b/ydb/services/bg_tasks/ds_table/CMakeLists.darwin.txt @@ -20,6 +20,7 @@ target_link_libraries(services-bg_tasks-ds_table PUBLIC services-metadata-request ) target_sources(services-bg_tasks-ds_table PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/behaviour.cpp ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/executor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/interrupt.cpp ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/task_executor_controller.cpp diff --git a/ydb/services/bg_tasks/ds_table/CMakeLists.linux-aarch64.txt b/ydb/services/bg_tasks/ds_table/CMakeLists.linux-aarch64.txt index be43e3e6080..6dd0b379cc2 100644 --- a/ydb/services/bg_tasks/ds_table/CMakeLists.linux-aarch64.txt +++ b/ydb/services/bg_tasks/ds_table/CMakeLists.linux-aarch64.txt @@ -21,6 +21,7 @@ target_link_libraries(services-bg_tasks-ds_table PUBLIC services-metadata-request ) target_sources(services-bg_tasks-ds_table PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/behaviour.cpp ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/executor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/interrupt.cpp ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/task_executor_controller.cpp diff --git a/ydb/services/bg_tasks/ds_table/CMakeLists.linux.txt b/ydb/services/bg_tasks/ds_table/CMakeLists.linux.txt index be43e3e6080..6dd0b379cc2 100644 --- a/ydb/services/bg_tasks/ds_table/CMakeLists.linux.txt +++ b/ydb/services/bg_tasks/ds_table/CMakeLists.linux.txt @@ -21,6 +21,7 @@ target_link_libraries(services-bg_tasks-ds_table PUBLIC services-metadata-request ) target_sources(services-bg_tasks-ds_table PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/behaviour.cpp ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/executor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/interrupt.cpp ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/task_executor_controller.cpp diff --git a/ydb/services/bg_tasks/ds_table/assign_tasks.cpp b/ydb/services/bg_tasks/ds_table/assign_tasks.cpp index 2b8fbc490f4..4bff7f55e08 100644 --- a/ydb/services/bg_tasks/ds_table/assign_tasks.cpp +++ b/ydb/services/bg_tasks/ds_table/assign_tasks.cpp @@ -39,7 +39,7 @@ std::optional<NMetadata::NRequest::TDialogYQLRequest::TRequest> TAssignTasksActo } void TAssignTasksActor::OnResult(const NMetadata::NRequest::TDialogYQLRequest::TResponse& /*result*/) { - Controller->AssignFinished(); + Controller->OnAssignFinished(); } } diff --git a/ydb/services/bg_tasks/ds_table/behaviour.cpp b/ydb/services/bg_tasks/ds_table/behaviour.cpp new file mode 100644 index 00000000000..f8fa956ad5a --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/behaviour.cpp @@ -0,0 +1,14 @@ +#include "behaviour.h" +#include "initialization.h" + +namespace NKikimr::NBackgroundTasks { + +std::shared_ptr<NMetadata::NInitializer::IInitializationBehaviour> TBehaviour::ConstructInitializer() const { + return std::make_shared<TBGTasksInitializer>(Config); +} + +std::shared_ptr<NMetadata::NModifications::IOperationsManager> TBehaviour::GetOperationsManager() const { + return nullptr; +} + +} diff --git a/ydb/services/bg_tasks/ds_table/behaviour.h b/ydb/services/bg_tasks/ds_table/behaviour.h new file mode 100644 index 00000000000..60cff56c298 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/behaviour.h @@ -0,0 +1,27 @@ +#pragma once +#include "config.h" +#include <ydb/services/metadata/abstract/kqp_common.h> +#include <ydb/services/metadata/abstract/initialization.h> +#include <ydb/services/metadata/manager/abstract.h> + +namespace NKikimr::NBackgroundTasks { + +class TBehaviour: public NMetadata::IClassBehaviour { +private: + const TConfig Config; +public: + virtual TString GetInternalStorageTablePath() const override { + return Config.GetTablePath(); + } + virtual std::shared_ptr<NMetadata::NInitializer::IInitializationBehaviour> ConstructInitializer() const override; + virtual std::shared_ptr<NMetadata::NModifications::IOperationsManager> GetOperationsManager() const override; + TBehaviour(const TConfig& config) + : Config(config) { + + } + virtual TString GetTypeId() const override { + return "bg_tasks"; + } +}; + +} diff --git a/ydb/services/bg_tasks/ds_table/executor.cpp b/ydb/services/bg_tasks/ds_table/executor.cpp index 48b8d3edd41..ba56f99197c 100644 --- a/ydb/services/bg_tasks/ds_table/executor.cpp +++ b/ydb/services/bg_tasks/ds_table/executor.cpp @@ -59,36 +59,44 @@ void TExecutor::Handle(TEvTaskExecutorFinished::TPtr& ev) { } void TExecutor::Handle(TEvAddTask::TPtr& ev) { - ALS_DEBUG(NKikimrServices::BG_TASKS) << "add task"; - Register(new TAddTasksActor(InternalController, ev->Get()->GetTask(), ev->Sender)); + if (CheckActivity()) { + ALS_DEBUG(NKikimrServices::BG_TASKS) << "add task"; + Register(new TAddTasksActor(InternalController, ev->Get()->GetTask(), ev->Sender)); + } else { + DeferredEventsOnIntialization.Add(*ev); + } } void TExecutor::Handle(TEvUpdateTaskEnabled::TPtr& ev) { - ALS_DEBUG(NKikimrServices::BG_TASKS) << "start task"; - Register(new TUpdateTaskEnabledActor(InternalController, ev->Get()->GetTaskId(), ev->Get()->GetEnabled(), ev->Sender)); + if (CheckActivity()) { + ALS_DEBUG(NKikimrServices::BG_TASKS) << "start task"; + Register(new TUpdateTaskEnabledActor(InternalController, ev->Get()->GetTaskId(), ev->Get()->GetEnabled(), ev->Sender)); + } else { + DeferredEventsOnIntialization.Add(*ev); + } } -void TExecutor::Handle(NMetadata::NInitializer::TEvInitializationFinished::TPtr& /*ev*/) { +void TExecutor::Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr& /*ev*/) { + ActivityState = EActivity::Active; Sender<TEvStartAssign>().SendTo(SelfId()); Schedule(Config.GetPingPeriod(), new TEvLockPingerStart); + DeferredEventsOnIntialization.ResendAll(SelfId()); } void TExecutor::Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) { auto snapshot = ev->Get()->GetValidatedSnapshotAs<NMetadata::NInitializer::TSnapshot>(); - auto b = std::make_shared<TBGTasksInitializer>(Config); - Register(new NMetadata::NInitializer::TDSAccessorInitialized(Config.GetRequestConfig(), "bg_tasks", b, InternalController, snapshot)); + Y_VERIFY(snapshot, "incorrect initialization snapshot from metadata service"); + if (snapshot->HasComponent("bg_tasks")) { + CheckActivity(); + } } void TExecutor::Bootstrap() { InternalController = std::make_shared<TExecutorController>(SelfId(), Config); Become(&TExecutor::StateMain); auto manager = std::make_shared<NMetadata::NInitializer::TFetcher>(); - if (NMetadata::NProvider::TServiceOperator::IsEnabled()) { - Sender<NMetadata::NProvider::TEvSubscribeExternal>(manager).SendTo(NMetadata::NProvider::MakeServiceId(SelfId().NodeId())); - } else { - auto b = std::make_shared<TBGTasksInitializer>(Config); - Register(new NMetadata::NInitializer::TDSAccessorInitialized(Config.GetRequestConfig(), "bg_tasks", b, InternalController, nullptr)); - } + Y_VERIFY(NMetadata::NProvider::TServiceOperator::IsEnabled(), "metadata service not active"); + Sender<NMetadata::NProvider::TEvSubscribeExternal>(manager).SendTo(NMetadata::NProvider::MakeServiceId(SelfId().NodeId())); } NActors::IActor* CreateService(const TConfig& config) { diff --git a/ydb/services/bg_tasks/ds_table/executor.h b/ydb/services/bg_tasks/ds_table/executor.h index e0c1a7aeaac..7fa99c2a149 100644 --- a/ydb/services/bg_tasks/ds_table/executor.h +++ b/ydb/services/bg_tasks/ds_table/executor.h @@ -1,4 +1,5 @@ #pragma once +#include "behaviour.h" #include "config.h" #include "executor_controller.h" @@ -6,6 +7,8 @@ #include <ydb/services/bg_tasks/abstract/task.h> #include <ydb/services/bg_tasks/service.h> #include <ydb/services/metadata/initializer/accessor_init.h> +#include <ydb/services/metadata/ds_table/service.h> +#include <ydb/services/metadata/service.h> namespace NKikimr::NBackgroundTasks { @@ -59,8 +62,34 @@ private: const TConfig Config; std::set<TString> CurrentTaskIds; TExecutorController::TPtr InternalController; + NMetadata::NProvider::TEventsWaiter DeferredEventsOnIntialization; + + std::shared_ptr<TBehaviour> Behaviour; + + enum class EActivity { + Created, + Preparation, + Active + }; + + EActivity ActivityState = EActivity::Created; + + bool CheckActivity() { + switch (ActivityState) { + case EActivity::Created: + ActivityState = EActivity::Preparation; + Sender<NMetadata::NProvider::TEvPrepareManager>(Behaviour).SendTo(NMetadata::NProvider::MakeServiceId(SelfId().NodeId())); + break; + case EActivity::Preparation: + break; + case EActivity::Active: + return true; + } + return false; + } + protected: - void Handle(NMetadata::NInitializer::TEvInitializationFinished::TPtr& ev); + void Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr& ev); void Handle(TEvStartAssign::TPtr& ev); void Handle(TEvAssignFinished::TPtr& ev); void Handle(TEvFetchingFinished::TPtr& ev); @@ -74,7 +103,7 @@ protected: STATEFN(StateMain) { switch (ev->GetTypeRewrite()) { - hFunc(NMetadata::NInitializer::TEvInitializationFinished, Handle); + hFunc(NMetadata::NProvider::TEvManagerPrepared, Handle); hFunc(TEvStartAssign, Handle); hFunc(TEvAssignFinished, Handle); hFunc(TEvFetchingFinished, Handle); @@ -94,7 +123,9 @@ public: TExecutor(const TConfig& config) : Config(config) + , Behaviour(std::make_shared<TBehaviour>(Config)) { + TServiceOperator::Register(); } }; diff --git a/ydb/services/bg_tasks/ds_table/executor_controller.cpp b/ydb/services/bg_tasks/ds_table/executor_controller.cpp index 03b87684771..05e888de043 100644 --- a/ydb/services/bg_tasks/ds_table/executor_controller.cpp +++ b/ydb/services/bg_tasks/ds_table/executor_controller.cpp @@ -7,27 +7,27 @@ namespace NKikimr::NBackgroundTasks { -void TExecutorController::AssignFinished() const { +void TExecutorController::OnAssignFinished() const { ExecutorActorId.Send(ExecutorActorId, new TEvAssignFinished()); } -void TExecutorController::FetchingFinished() const { +void TExecutorController::OnFetchingFinished() const { ExecutorActorId.Send(ExecutorActorId, new TEvFetchingFinished()); } -void TExecutorController::TaskFetched(const TTask& task) const { +void TExecutorController::OnTaskFetched(const TTask& task) const { ExecutorActorId.Send(ExecutorActorId, new TEvTaskFetched(task)); } -void TExecutorController::TaskFinished(const TString& taskId) const { +void TExecutorController::OnTaskFinished(const TString& taskId) const { ExecutorActorId.Send(ExecutorActorId, new TEvTaskExecutorFinished(taskId)); } -void TExecutorController::InitializationFinished(const TString& id) const { +void TExecutorController::OnInitializationFinished(const TString& id) const { ExecutorActorId.Send(ExecutorActorId, new NMetadata::NInitializer::TEvInitializationFinished(id)); } -void TExecutorController::LockPingerFinished() const { +void TExecutorController::OnLockPingerFinished() const { ExecutorActorId.Send(ExecutorActorId, new TEvLockPingerFinished()); } diff --git a/ydb/services/bg_tasks/ds_table/executor_controller.h b/ydb/services/bg_tasks/ds_table/executor_controller.h index e0f779cb9d2..69b27ef1252 100644 --- a/ydb/services/bg_tasks/ds_table/executor_controller.h +++ b/ydb/services/bg_tasks/ds_table/executor_controller.h @@ -38,12 +38,12 @@ public: return Config.GetRequestConfig(); } - virtual void InitializationFinished(const TString& id) const override; - void LockPingerFinished() const; - void TaskFetched(const TTask& task) const; - void TaskFinished(const TString& taskId) const; - void AssignFinished() const; - void FetchingFinished() const; + virtual void OnInitializationFinished(const TString& id) const override; + void OnLockPingerFinished() const; + void OnTaskFetched(const TTask& task) const; + void OnTaskFinished(const TString& taskId) const; + void OnAssignFinished() const; + void OnFetchingFinished() const; }; } diff --git a/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp b/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp index 80fb325a218..b254c836c85 100644 --- a/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp +++ b/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp @@ -16,9 +16,9 @@ void TFetchTasksActor::OnResult(const NMetadata::NRequest::TDialogYQLRequest::TR ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse task record"; continue; } - Controller->TaskFetched(task); + Controller->OnTaskFetched(task); } - Controller->FetchingFinished(); + Controller->OnFetchingFinished(); } std::optional<NMetadata::NRequest::TDialogYQLRequest::TRequest> TFetchTasksActor::OnSessionId(const TString& sessionId) { diff --git a/ydb/services/bg_tasks/ds_table/finish_task.cpp b/ydb/services/bg_tasks/ds_table/finish_task.cpp index d311515f9cf..b5dbed8204b 100644 --- a/ydb/services/bg_tasks/ds_table/finish_task.cpp +++ b/ydb/services/bg_tasks/ds_table/finish_task.cpp @@ -21,7 +21,7 @@ std::optional<NMetadata::NRequest::TDialogYQLRequest::TRequest> TDropTaskActor:: } void TDropTaskActor::OnResult(const NMetadata::NRequest::TDialogYQLRequest::TResponse& /*result*/) { - Controller->TaskFinished(TaskId); + Controller->OnTaskFinished(TaskId); } } diff --git a/ydb/services/bg_tasks/ds_table/initialization.cpp b/ydb/services/bg_tasks/ds_table/initialization.cpp index 818b1ee5185..1951e6a5c2c 100644 --- a/ydb/services/bg_tasks/ds_table/initialization.cpp +++ b/ydb/services/bg_tasks/ds_table/initialization.cpp @@ -63,7 +63,7 @@ void TBGTasksInitializer::DoPrepare(NMetadata::NInitializer::IInitializerInput:: } result.emplace_back(new NMetadata::NInitializer::TGenericTableModifier<NMetadata::NRequest::TDialogCreateTable>(request, "create")); } - controller->PreparationFinished(result); + controller->OnPreparationFinished(result); } } diff --git a/ydb/services/bg_tasks/ds_table/interrupt.cpp b/ydb/services/bg_tasks/ds_table/interrupt.cpp index 07be29788db..5c22e3872ff 100644 --- a/ydb/services/bg_tasks/ds_table/interrupt.cpp +++ b/ydb/services/bg_tasks/ds_table/interrupt.cpp @@ -38,7 +38,7 @@ std::optional<NMetadata::NRequest::TDialogYQLRequest::TRequest> TInterruptTaskAc } void TInterruptTaskActor::OnResult(const NMetadata::NRequest::TDialogYQLRequest::TResponse& /*result*/) { - ExecutorController->TaskFinished(TaskId); + ExecutorController->OnTaskFinished(TaskId); } } diff --git a/ydb/services/bg_tasks/ds_table/lock_pinger.cpp b/ydb/services/bg_tasks/ds_table/lock_pinger.cpp index e746115b60e..4b8f67a597e 100644 --- a/ydb/services/bg_tasks/ds_table/lock_pinger.cpp +++ b/ydb/services/bg_tasks/ds_table/lock_pinger.cpp @@ -36,7 +36,7 @@ std::optional<NMetadata::NRequest::TDialogYQLRequest::TRequest> TLockPingerActor } void TLockPingerActor::OnResult(const NMetadata::NRequest::TDialogYQLRequest::TResponse& /*ev*/) { - ExecutorController->LockPingerFinished(); + ExecutorController->OnLockPingerFinished(); } } diff --git a/ydb/services/bg_tasks/ut/ut_tasks.cpp b/ydb/services/bg_tasks/ut/ut_tasks.cpp index 5e603467fb1..36e02d125c9 100644 --- a/ydb/services/bg_tasks/ut/ut_tasks.cpp +++ b/ydb/services/bg_tasks/ut/ut_tasks.cpp @@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(BGTaskTests) { serverSettings.GrpcPort = grpcPort; serverSettings.SetDomainName("Root") .SetUseRealThreads(false) - .SetEnableMetadataProvider(false) + .SetEnableMetadataProvider(true) .SetEnableBackgroundTasks(true) .SetEnableOlapSchemaOperations(true); ; diff --git a/ydb/services/metadata/abstract/common.h b/ydb/services/metadata/abstract/common.h index 90998755b06..fb52622b8c3 100644 --- a/ydb/services/metadata/abstract/common.h +++ b/ydb/services/metadata/abstract/common.h @@ -37,6 +37,7 @@ enum EEvents { EvPathExistsCheckFailed, EvPathExistsCheckResult, EvStartMetadataService, + EvStartRegistration, EvEnd }; diff --git a/ydb/services/metadata/abstract/fetcher.h b/ydb/services/metadata/abstract/fetcher.h index dca8a0ab0ff..062259c2d59 100644 --- a/ydb/services/metadata/abstract/fetcher.h +++ b/ydb/services/metadata/abstract/fetcher.h @@ -14,6 +14,7 @@ #include <ydb/services/metadata/manager/common.h> #include <ydb/services/metadata/manager/table_record.h> #include <ydb/services/metadata/manager/alter.h> +#include <util/system/type_name.h> namespace NKikimr::NMetadata::NFetcher { @@ -33,6 +34,24 @@ private: protected: virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) = 0; virtual TString DoSerializeToString() const = 0; + + template <class TObject, class TActor> + bool ParseSnapshotObjects(const Ydb::ResultSet& rawData, const TActor& actor, const bool stopOnIncorrectDeserialization = false) { + typename TObject::TDecoder decoder(rawData); + for (auto&& r : rawData.rows()) { + TObject object; + if (!object.DeserializeFromRecord(decoder, r)) { + ALS_WARN(NKikimrServices::METADATA_PROVIDER) << "cannot parse object: " << TypeName<TObject>(); + if (stopOnIncorrectDeserialization) { + return false; + } else { + continue; + } + } + actor(std::move(object)); + } + return true; + } public: using TPtr = std::shared_ptr<ISnapshot>; ISnapshot(const TInstant actuality) diff --git a/ydb/services/metadata/abstract/kqp_common.cpp b/ydb/services/metadata/abstract/kqp_common.cpp index 6dbaac450b5..bd192a9c7c1 100644 --- a/ydb/services/metadata/abstract/kqp_common.cpp +++ b/ydb/services/metadata/abstract/kqp_common.cpp @@ -26,13 +26,6 @@ NInitializer::IInitializationBehaviour::TPtr IClassBehaviour::GetInitializer() c return Initializer; } -NModifications::IOperationsManager::TPtr IClassBehaviour::GetOperationsManager() const { - if (!OperationsManager) { - OperationsManager = ConstructOperationsManager(); - } - return OperationsManager; -} - TString IClassBehaviour::GetInternalStorageHistoryTablePath() const { return GetInternalStorageTablePath() + "_history"; } diff --git a/ydb/services/metadata/abstract/kqp_common.h b/ydb/services/metadata/abstract/kqp_common.h index f4501a5bf34..e1ed9140490 100644 --- a/ydb/services/metadata/abstract/kqp_common.h +++ b/ydb/services/metadata/abstract/kqp_common.h @@ -17,19 +17,30 @@ public: using TPtr = std::shared_ptr<IClassBehaviour>; private: mutable std::shared_ptr<NInitializer::IInitializationBehaviour> Initializer; - mutable std::shared_ptr<NModifications::IOperationsManager> OperationsManager; protected: + virtual std::shared_ptr<NInitializer::IInitializationBehaviour> ConstructInitializer() const = 0; virtual TString GetInternalStorageTablePath() const = 0; virtual TString GetInternalStorageHistoryTablePath() const; - virtual std::shared_ptr<NInitializer::IInitializationBehaviour> ConstructInitializer() const = 0; - virtual std::shared_ptr<NModifications::IOperationsManager> ConstructOperationsManager() const = 0; public: virtual ~IClassBehaviour() = default; TString GetStorageTablePath() const; TString GetStorageHistoryTablePath() const; std::shared_ptr<NInitializer::IInitializationBehaviour> GetInitializer() const; - std::shared_ptr<NModifications::IOperationsManager> GetOperationsManager() const; + virtual std::shared_ptr<NModifications::IOperationsManager> GetOperationsManager() const = 0; virtual TString GetTypeId() const = 0; }; + +template <class TObject> +class TClassBehaviour: public IClassBehaviour { +private: +protected: + virtual std::shared_ptr<NModifications::IOperationsManager> ConstructOperationsManager() const = 0; +public: + virtual std::shared_ptr<NModifications::IOperationsManager> GetOperationsManager() const override final { + static std::shared_ptr<NModifications::IOperationsManager> manager = ConstructOperationsManager(); + return manager; + } +}; + } diff --git a/ydb/services/metadata/ds_table/CMakeLists.darwin.txt b/ydb/services/metadata/ds_table/CMakeLists.darwin.txt index 34212fd99cc..e9eccb5be5c 100644 --- a/ydb/services/metadata/ds_table/CMakeLists.darwin.txt +++ b/ydb/services/metadata/ds_table/CMakeLists.darwin.txt @@ -25,8 +25,11 @@ target_sources(services-metadata-ds_table PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/behaviour_registrator_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/initializer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/scheme_describe.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/table_exists.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/registration.cpp ) diff --git a/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt b/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt index 409457cc70b..0931f0b0c99 100644 --- a/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt +++ b/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt @@ -26,8 +26,11 @@ target_sources(services-metadata-ds_table PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/behaviour_registrator_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/initializer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/scheme_describe.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/table_exists.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/registration.cpp ) diff --git a/ydb/services/metadata/ds_table/CMakeLists.linux.txt b/ydb/services/metadata/ds_table/CMakeLists.linux.txt index 409457cc70b..0931f0b0c99 100644 --- a/ydb/services/metadata/ds_table/CMakeLists.linux.txt +++ b/ydb/services/metadata/ds_table/CMakeLists.linux.txt @@ -26,8 +26,11 @@ target_sources(services-metadata-ds_table PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/behaviour_registrator_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/initializer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/scheme_describe.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/table_exists.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/registration.cpp ) diff --git a/ydb/services/metadata/ds_table/behaviour_registrator_actor.cpp b/ydb/services/metadata/ds_table/behaviour_registrator_actor.cpp new file mode 100644 index 00000000000..4f1ef808ebe --- /dev/null +++ b/ydb/services/metadata/ds_table/behaviour_registrator_actor.cpp @@ -0,0 +1,68 @@ +#include "behaviour_registrator_actor.h" + +#include "accessor_snapshot_simple.h" +#include <ydb/services/metadata/initializer/accessor_init.h> + +namespace NKikimr::NMetadata::NProvider { + +class TBehaviourRegistrator::TInternalController: public NInitializer::IInitializerOutput, public ISchemeDescribeController { +private: + const NActors::TActorIdentity ActorId; +public: + TInternalController(const NActors::TActorIdentity& actorId) + : ActorId(actorId) { + + } + + virtual void OnInitializationFinished(const TString& id) const override { + ActorId.Send(ActorId, new NInitializer::TEvInitializationFinished(id)); + } + virtual void OnDescriptionFailed(const TString& errorMessage, const TString& requestId) const override { + ActorId.Send(ActorId, new TEvTableDescriptionFailed(errorMessage, requestId)); + } + virtual void OnDescriptionSuccess(THashMap<ui32, TSysTables::TTableColumnInfo>&& result, const TString& requestId) const override { + ActorId.Send(ActorId, new TEvTableDescriptionSuccess(std::move(result), requestId)); + } +}; + +void TBehaviourRegistrator::Handle(TEvTableDescriptionSuccess::TPtr& ev) { + const TString& initId = Behaviour->GetTypeId(); + Y_VERIFY(initId == ev->Get()->GetRequestId()); + auto it = RegistrationData->InRegistration.find(initId); + Y_VERIFY(it != RegistrationData->InRegistration.end()); + it->second->GetOperationsManager()->SetActualSchema(ev->Get()->GetSchema()); + RegistrationData->InitializationFinished(initId); +} + +void TBehaviourRegistrator::Handle(TEvTableDescriptionFailed::TPtr& ev) { + const TString& initId = Behaviour->GetTypeId(); + Y_VERIFY(initId == ev->Get()->GetRequestId()); + ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "metadata service cannot receive table description for " << initId << Endl; + Schedule(TDuration::Seconds(1), new TEvStartRegistration()); +} + +void TBehaviourRegistrator::Handle(TEvStartRegistration::TPtr& /*ev*/) { + Register(new NInitializer::TDSAccessorInitialized(ReqConfig, + Behaviour->GetTypeId(), Behaviour->GetInitializer(), InternalController, RegistrationData->GetInitializationSnapshot())); +} + +void TBehaviourRegistrator::Handle(NInitializer::TEvInitializationFinished::TPtr& ev) { + const TString& initId = Behaviour->GetTypeId(); + Y_VERIFY(initId == ev->Get()->GetInitializationId()); + + auto it = RegistrationData->InRegistration.find(initId); + Y_VERIFY(it != RegistrationData->InRegistration.end()); + if (it->second->GetOperationsManager()) { + Register(new TSchemeDescriptionActor(InternalController, initId, it->second->GetStorageTablePath())); + } else { + RegistrationData->InitializationFinished(initId); + } +} + +void TBehaviourRegistrator::Bootstrap() { + InternalController = std::make_shared<TInternalController>(SelfId()); + TBase::Become(&TBehaviourRegistrator::StateMain); + TBase::Sender<TEvStartRegistration>().SendTo(SelfId()); +} + +} diff --git a/ydb/services/metadata/ds_table/behaviour_registrator_actor.h b/ydb/services/metadata/ds_table/behaviour_registrator_actor.h new file mode 100644 index 00000000000..ae98bda201d --- /dev/null +++ b/ydb/services/metadata/ds_table/behaviour_registrator_actor.h @@ -0,0 +1,54 @@ +#pragma once +#include "scheme_describe.h" +#include "registration.h" + +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/services/metadata/abstract/kqp_common.h> +#include <ydb/services/metadata/initializer/common.h> +#include <ydb/services/metadata/initializer/events.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NKikimr::NMetadata::NProvider { + +class TEvStartRegistration: public TEventLocal<TEvStartRegistration, EEvents::EvStartRegistration> { +}; + +class TBehaviourRegistrator: public NActors::TActorBootstrapped<TBehaviourRegistrator> { +private: + using TBase = NActors::TActorBootstrapped<TBehaviourRegistrator>; + class TInternalController; + + IClassBehaviour::TPtr Behaviour; + std::shared_ptr<TRegistrationData> RegistrationData; + const NRequest::TConfig ReqConfig; + std::shared_ptr<TInternalController> InternalController; + + void Handle(TEvTableDescriptionSuccess::TPtr& ev); + void Handle(TEvTableDescriptionFailed::TPtr& ev); + void Handle(TEvStartRegistration::TPtr& ev); + void Handle(NInitializer::TEvInitializationFinished::TPtr& ev); +public: + TBehaviourRegistrator(IClassBehaviour::TPtr b, std::shared_ptr<TRegistrationData> registrationData, const NRequest::TConfig& reqConfig) + : Behaviour(b) + , RegistrationData(registrationData) + , ReqConfig(reqConfig) { + + } + + void Bootstrap(); + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + + hFunc(TEvTableDescriptionSuccess, Handle); + hFunc(TEvTableDescriptionFailed, Handle); + + hFunc(NInitializer::TEvInitializationFinished, Handle); + hFunc(TEvStartRegistration, Handle); + default: + Y_VERIFY(false); + } + } +}; + +} diff --git a/ydb/services/metadata/ds_table/initializer_actor.cpp b/ydb/services/metadata/ds_table/initializer_actor.cpp new file mode 100644 index 00000000000..18f121b6559 --- /dev/null +++ b/ydb/services/metadata/ds_table/initializer_actor.cpp @@ -0,0 +1,29 @@ +#include "initializer_actor.h" + +namespace NKikimr::NMetadata::NProvider { + +void TInitializerSnapshotReader::Handle(TDSAccessorSimple::TEvController::TEvResult::TPtr& ev) { + RegistrationData->StartInitialization(); + RegistrationData->SetInitializationSnapshot(ev->Get()->GetResult()); +} + +void TInitializerSnapshotReader::Handle(TEvStartMetadataService::TPtr& /*ev*/) { + Register(new TDSAccessorSimple(ReqConfig, InternalController, RegistrationData->GetInitializationFetcher())); +} + +void TInitializerSnapshotReader::Handle(TDSAccessorSimple::TEvController::TEvError::TPtr& ev) { + ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot receive initializer snapshot: " << ev->Get()->GetErrorMessage() << Endl; + Schedule(TDuration::Seconds(1), new TEvStartMetadataService()); +} + +void TInitializerSnapshotReader::Handle(TDSAccessorSimple::TEvController::TEvTableAbsent::TPtr& /*ev*/) { + RegistrationData->NoInitializationSnapshot(); +} + +void TInitializerSnapshotReader::Bootstrap() { + InternalController = std::make_shared<TInitializerSnapshotReaderController>(SelfId()); + Become(&TInitializerSnapshotReader::StateMain); + Sender<TEvStartMetadataService>().SendTo(SelfId()); +} + +} diff --git a/ydb/services/metadata/ds_table/initializer_actor.h b/ydb/services/metadata/ds_table/initializer_actor.h new file mode 100644 index 00000000000..8181c9df566 --- /dev/null +++ b/ydb/services/metadata/ds_table/initializer_actor.h @@ -0,0 +1,59 @@ +#pragma once +#include "accessor_snapshot_simple.h" +#include "registration.h" + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <ydb/services/metadata/request/config.h> + +namespace NKikimr::NMetadata::NProvider { + +class TEvStartMetadataService: public TEventLocal<TEvStartMetadataService, EEvents::EvStartMetadataService> { +}; + +class TInitializerSnapshotReaderController: public TDSAccessorSimple::TEvController { +private: + const NActors::TActorIdentity ActorId; +public: + TInitializerSnapshotReaderController(const NActors::TActorIdentity& actorId) + : TDSAccessorSimple::TEvController(actorId) + , ActorId(actorId) { + + } +}; + +class TInitializerSnapshotReader: public NActors::TActorBootstrapped<TInitializerSnapshotReader> { +private: + std::shared_ptr<TRegistrationData> RegistrationData; + const NRequest::TConfig ReqConfig; + std::shared_ptr<TInitializerSnapshotReaderController> InternalController; + + void Handle(TDSAccessorSimple::TEvController::TEvResult::TPtr& ev); + void Handle(TEvStartMetadataService::TPtr& ev); + void Handle(TDSAccessorSimple::TEvController::TEvError::TPtr& ev); + void Handle(TDSAccessorSimple::TEvController::TEvTableAbsent::TPtr& ev); + +public: + TInitializerSnapshotReader(std::shared_ptr<TRegistrationData> registrationData, const NRequest::TConfig& reqConfig) + : RegistrationData(registrationData) + , ReqConfig(reqConfig) { + + } + + void Bootstrap(); + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TDSAccessorSimple::TEvController::TEvResult, Handle); + hFunc(TDSAccessorSimple::TEvController::TEvError, Handle); + hFunc(TDSAccessorSimple::TEvController::TEvTableAbsent, Handle); + + hFunc(TEvStartMetadataService, Handle); + default: + Y_VERIFY(false); + } + } + +}; + +} diff --git a/ydb/services/metadata/ds_table/registration.cpp b/ydb/services/metadata/ds_table/registration.cpp new file mode 100644 index 00000000000..6912b125a69 --- /dev/null +++ b/ydb/services/metadata/ds_table/registration.cpp @@ -0,0 +1,97 @@ +#include "registration.h" + +namespace NKikimr::NMetadata::NProvider { + +bool TBehavioursId::operator<(const TBehavioursId& item) const { + if (BehaviourIds.size() < item.BehaviourIds.size()) { + return true; + } else if (BehaviourIds.size() > item.BehaviourIds.size()) { + return false; + } else { + auto itSelf = BehaviourIds.begin(); + auto itItem = item.BehaviourIds.begin(); + while (itSelf != BehaviourIds.end()) { + if (*itSelf < *itItem) { + return true; + } + ++itSelf; + ++itItem; + } + return false; + } +} + +bool TBehavioursId::RemoveId(const TString& id) { + auto it = BehaviourIds.find(id); + if (it == BehaviourIds.end()) { + return false; + } + BehaviourIds.erase(it); + return true; +} + + +void TEventsCollector::Initialized(const TString& initId) { + std::map<TBehavioursId, TEventsWaiter> movedEvents; + for (auto it = Events.begin(); it != Events.end(); ) { + auto m = it->first; + if (!m.RemoveId(initId)) { + ++it; + continue; + } + if (m.IsEmpty()) { + it->second.ResendAll(OwnerId); + } else { + auto itNext = Events.find(m); + if (itNext == Events.end()) { + movedEvents.emplace(m, std::move(it->second)); + } else { + itNext->second.Merge(std::move(it->second)); + } + } + it = Events.erase(it); + } + for (auto&& i : movedEvents) { + Events.emplace(i.first, std::move(i.second)); + } +} + + +void TRegistrationData::InitializationFinished(const TString& initId) { + auto it = InRegistration.find(initId); + Y_VERIFY(it != InRegistration.end()); + + Registered.emplace(initId, it->second); + InRegistration.erase(it); + EventsWaiting->Initialized(initId); +} + +void TRegistrationData::SetInitializationSnapshot(NFetcher::ISnapshot::TPtr s) { + InitializationSnapshot = dynamic_pointer_cast<NInitializer::TSnapshot>(s); + Y_VERIFY(InitializationSnapshot); + if (Stage == EStage::WaitInitializerInfo) { + Stage = EStage::Active; + EventsWaiting->TryResendOne(); + } else if (Stage == EStage::Active) { + + } else if (Stage == EStage::Created) { + Y_VERIFY(false, "incorrect stage for method usage"); + } +} + +void TRegistrationData::StartInitialization() { + Y_VERIFY(Stage == EStage::Created); + Stage = EStage::WaitInitializerInfo; + EventsWaiting->GetOwnerId().Send(EventsWaiting->GetOwnerId(), new TEvSubscribeExternal(InitializationFetcher)); +} + +TRegistrationData::TRegistrationData() { + InitializationFetcher = std::make_shared<NInitializer::TFetcher>(); +} + +void TRegistrationData::NoInitializationSnapshot() { + InitializationSnapshot = std::make_shared<NInitializer::TSnapshot>(TInstant::Zero()); + EventsWaiting->TryResendOne(); +} + +} diff --git a/ydb/services/metadata/ds_table/registration.h b/ydb/services/metadata/ds_table/registration.h new file mode 100644 index 00000000000..60f63ae2b04 --- /dev/null +++ b/ydb/services/metadata/ds_table/registration.h @@ -0,0 +1,143 @@ +#pragma once +#include <ydb/services/metadata/abstract/kqp_common.h> +#include <ydb/services/metadata/initializer/fetcher.h> +#include <ydb/services/metadata/initializer/snapshot.h> + +namespace NKikimr::NMetadata::NProvider { + +class TBehavioursId { +private: + YDB_READONLY_DEF(std::set<TString>, BehaviourIds); +public: + TBehavioursId(const std::vector<IClassBehaviour::TPtr>& managers) { + for (auto&& i : managers) { + BehaviourIds.emplace(i->GetTypeId()); + } + } + + bool IsEmpty() const { + return BehaviourIds.empty(); + } + + bool RemoveId(const TString& id); + + bool operator<(const TBehavioursId& item) const; +}; + +class TWaitEvent { +private: + TAutoPtr<IEventBase> Event; + const TActorId Sender; +public: + TWaitEvent(TAutoPtr<IEventBase> ev, const TActorId& sender) + : Event(ev) + , Sender(sender) { + + } + + void Resend(const TActorIdentity& receiver) { + TActivationContext::Send(new IEventHandle(receiver, Sender, Event.Release())); + } +}; + +class TEventsWaiter { +private: + std::deque<TWaitEvent> Events; +public: + void Add(TAutoPtr<IEventBase> ev, const TActorId& sender) { + if (Events.size() > 10000) { + ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "too many events for deferred sending (maybe service cannot start)"; + return; + } + Events.emplace_back(ev, sender); + } + + template <class T> + void Add(TEventHandle<T>& ev) { + Add(ev.ReleaseBase(), ev.Sender); + } + + bool ResendOne(const TActorIdentity& receiver) { + if (Events.empty()) { + return false; + } + Events.front().Resend(receiver); + Events.pop_front(); + return true; + } + + void ResendAll(const TActorIdentity& receiver) { + while (ResendOne(receiver)) { + } + } + + bool IsEmpty() const { + return Events.empty(); + } + + void Merge(TEventsWaiter&& source) { + Merge(std::move(source.Events)); + } + + void Merge(std::deque<TWaitEvent>&& source) { + for (auto&& i : source) { + Events.emplace_back(std::move(i)); + } + } +}; + +class TEventsCollector { +private: + const TActorIdentity OwnerId; + std::map<TBehavioursId, TEventsWaiter> Events; +public: + TEventsCollector(const TActorIdentity& ownerId) + : OwnerId(ownerId) { + + } + + const TActorIdentity& GetOwnerId() const { + return OwnerId; + } + + void Add(const TBehavioursId& id, TAutoPtr<IEventBase> ev, const TActorId& sender) { + Events[id].Add(ev, sender); + } + + void TryResendOne() { + for (auto&& i : Events) { + i.second.ResendOne(OwnerId); + } + } + + void Initialized(const TString& initId); +}; + +class TRegistrationData { +public: + enum class EStage { + Created, + WaitInitializerInfo, + Active + }; +private: + YDB_READONLY(EStage, Stage, EStage::Created); + YDB_READONLY_DEF(std::shared_ptr<NInitializer::TSnapshot>, InitializationSnapshot); + YDB_READONLY_DEF(std::shared_ptr<NInitializer::TFetcher>, InitializationFetcher); +public: + TRegistrationData(); + + void StartInitialization(); + + std::map<TString, IClassBehaviour::TPtr> InRegistration; + std::map<TString, IClassBehaviour::TPtr> Registered; + std::shared_ptr<TEventsCollector> EventsWaiting; + + void SetInitializationSnapshot(NFetcher::ISnapshot::TPtr s); + void NoInitializationSnapshot(); + + void InitializationFinished(const TString& initId); + +}; + +} diff --git a/ydb/services/metadata/ds_table/scheme_describe.h b/ydb/services/metadata/ds_table/scheme_describe.h index d8c8a87ef7f..3a57dc4d84d 100644 --- a/ydb/services/metadata/ds_table/scheme_describe.h +++ b/ydb/services/metadata/ds_table/scheme_describe.h @@ -13,6 +13,34 @@ public: virtual void OnDescriptionSuccess(THashMap<ui32, TSysTables::TTableColumnInfo>&& result, const TString& requestId) const = 0; }; +class TEvTableDescriptionFailed: public TEventLocal<TEvTableDescriptionFailed, EEvents::EvTableDescriptionFailed> { +private: + YDB_READONLY_DEF(TString, ErrorMessage); + YDB_READONLY_DEF(TString, RequestId); +public: + explicit TEvTableDescriptionFailed(const TString& errorMessage, const TString& reqId) + : ErrorMessage(errorMessage) + , RequestId(reqId) { + + } +}; + +class TEvTableDescriptionSuccess: public TEventLocal<TEvTableDescriptionSuccess, EEvents::EvTableDescriptionSuccess> { +private: + using TDescription = THashMap<ui32, TSysTables::TTableColumnInfo>; + YDB_READONLY_DEF(TString, RequestId); + YDB_READONLY_DEF(TDescription, Description); +public: + TEvTableDescriptionSuccess(TDescription&& description, const TString& reqId) + : RequestId(reqId) + , Description(std::move(description)) { + } + + NModifications::TTableSchema GetSchema() const { + return NModifications::TTableSchema(Description); + } +}; + class TSchemeDescriptionActor: public NActors::TActorBootstrapped<TSchemeDescriptionActor> { private: using TBase = NActors::TActorBootstrapped<TSchemeDescriptionActor>; diff --git a/ydb/services/metadata/ds_table/service.cpp b/ydb/services/metadata/ds_table/service.cpp index f327a4cd841..8530111f4b0 100644 --- a/ydb/services/metadata/ds_table/service.cpp +++ b/ydb/services/metadata/ds_table/service.cpp @@ -1,6 +1,8 @@ #include "service.h" #include "accessor_subscribe.h" +#include "behaviour_registrator_actor.h" +#include "initializer_actor.h" #include <ydb/core/base/appdata.h> #include <ydb/core/grpc_services/local_rpc/local_rpc.h> @@ -15,29 +17,34 @@ IActor* CreateService(const TConfig& config) { } void TService::PrepareManagers(std::vector<IClassBehaviour::TPtr> managers, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender) { - TManagersId id(managers); - if (InitializationSnapshot) { + TBehavioursId id(managers); + if (RegistrationData->GetInitializationSnapshot()) { const bool isInitialization = (managers.size() == 1) && managers.front()->GetTypeId() == NInitializer::TDBInitialization::GetTypeId(); - if (ActiveFlag || (PreparationFlag && isInitialization)) { - for (auto&& manager : managers) { - Y_VERIFY(!RegisteredManagers.contains(manager->GetTypeId())); - if (!ManagersInRegistration.contains(manager->GetTypeId()) && !RegisteredManagers.contains(manager->GetTypeId())) { - ManagersInRegistration.emplace(manager->GetTypeId(), manager); - Register(new NInitializer::TDSAccessorInitialized(Config.GetRequestConfig(), - manager->GetTypeId(), manager->GetInitializer(), InternalController, InitializationSnapshot)); + switch (RegistrationData->GetStage()) { + case TRegistrationData::EStage::Created: + RegistrationData->StartInitialization(); + break; + case TRegistrationData::EStage::WaitInitializerInfo: + if (!isInitialization) { + break; } - } - } else if (!PreparationFlag) { - PreparationFlag = true; - Send(SelfId(), new TEvSubscribeExternal(InitializationFetcher)); + case TRegistrationData::EStage::Active: + for (auto&& b : managers) { + Y_VERIFY(!RegistrationData->Registered.contains(b->GetTypeId())); + if (!RegistrationData->InRegistration.contains(b->GetTypeId()) && !RegistrationData->Registered.contains(b->GetTypeId())) { + RegistrationData->InRegistration.emplace(b->GetTypeId(), b); + RegisterWithSameMailbox(new TBehaviourRegistrator(b, RegistrationData, Config.GetRequestConfig())); + } + } + break; } } - EventsWaiting[id].emplace_back(ev, sender); + RegistrationData->EventsWaiting->Add(id, ev, sender); } void TService::Handle(TEvPrepareManager::TPtr& ev) { - auto it = RegisteredManagers.find(ev->Get()->GetManager()->GetTypeId()); - if (it != RegisteredManagers.end()) { + auto it = RegistrationData->Registered.find(ev->Get()->GetManager()->GetTypeId()); + if (it != RegistrationData->Registered.end()) { Send(ev->Sender, new TEvManagerPrepared(it->second)); } else { auto m = ev->Get()->GetManager(); @@ -45,67 +52,6 @@ void TService::Handle(TEvPrepareManager::TPtr& ev) { } } -void TService::InitializationFinished(const TString& initId) { - auto it = ManagersInRegistration.find(initId); - Y_VERIFY(it != ManagersInRegistration.end()); - - RegisteredManagers.emplace(initId, it->second); - ManagersInRegistration.erase(it); - - std::map<TManagersId, std::deque<TWaitEvent>> movedEvents; - for (auto it = EventsWaiting.begin(); it != EventsWaiting.end(); ) { - auto m = it->first; - if (!m.RemoveId(initId)) { - ++it; - continue; - } - if (m.IsEmpty()) { - for (auto&& i : it->second) { - i.Resend(SelfId()); - } - } else { - auto itNext = EventsWaiting.find(m); - if (itNext == EventsWaiting.end()) { - movedEvents.emplace(m, std::move(it->second)); - } else { - for (auto&& i : it->second) { - itNext->second.emplace_back(std::move(i)); - } - } - } - it = EventsWaiting.erase(it); - } - for (auto&& i : movedEvents) { - EventsWaiting.emplace(i.first, std::move(i.second)); - } -} - -void TService::Handle(TEvTableDescriptionSuccess::TPtr& ev) { - const TString& initId = ev->Get()->GetRequestId(); - auto it = ManagersInRegistration.find(initId); - Y_VERIFY(it != ManagersInRegistration.end()); - it->second->GetOperationsManager()->SetActualSchema(ev->Get()->GetSchema()); - InitializationFinished(initId); -} - -void TService::Handle(TEvTableDescriptionFailed::TPtr& ev) { - const TString& initId = ev->Get()->GetRequestId(); - ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "metadata service cannot receive table description for " << initId << Endl; - Schedule(TDuration::Seconds(1), new NInitializer::TEvInitializationFinished(initId)); -} - -void TService::Handle(NInitializer::TEvInitializationFinished::TPtr& ev) { - const TString& initId = ev->Get()->GetInitializationId(); - - auto it = ManagersInRegistration.find(initId); - Y_VERIFY(it != ManagersInRegistration.end()); - if (it->second->GetOperationsManager()) { - Register(new TSchemeDescriptionActor(InternalController, initId, it->second->GetStorageTablePath())); - } else { - InitializationFinished(initId); - } -} - void TService::Handle(TEvSubscribeExternal::TPtr& ev) { const TActorId senderId = ev->Sender; ProcessEventWithFetcher(*ev, ev->Get()->GetFetcher(), [this, senderId](const TActorId& actorId) { @@ -121,17 +67,17 @@ void TService::Handle(TEvAskSnapshot::TPtr& ev) { } void TService::Handle(TEvObjectsOperation::TPtr& ev) { - if (ev->Get()->GetCommand()->GetManager()->GetTypeId() == NInitializer::TDBInitialization::GetTypeId()) { - ev->Get()->GetCommand()->SetManager(NInitializer::TDBInitialization::GetBehaviour()); - ev->Get()->GetCommand()->Execute(); + auto command = ev->Get()->GetCommand(); + if (command->GetBehaviour()->GetTypeId() == NInitializer::TDBInitialization::GetTypeId()) { + command->SetBehaviour(NInitializer::TDBInitialization::GetBehaviour()); + command->Execute(); } else { - auto it = RegisteredManagers.find(ev->Get()->GetCommand()->GetManager()->GetTypeId()); - if (it != RegisteredManagers.end()) { - ev->Get()->GetCommand()->SetManager(it->second); - ev->Get()->GetCommand()->Execute(); + auto it = RegistrationData->Registered.find(command->GetBehaviour()->GetTypeId()); + if (it != RegistrationData->Registered.end()) { + command->Execute(); } else { - auto m = ev->Get()->GetCommand()->GetManager(); - PrepareManagers({ m }, ev->ReleaseBase(), ev->Sender); + auto b = command->GetBehaviour(); + PrepareManagers({ b }, ev->ReleaseBase(), ev->Sender); } } } @@ -144,60 +90,15 @@ void TService::Handle(TEvUnsubscribeExternal::TPtr& ev) { } void TService::Handle(TEvRefreshSubscriberData::TPtr& ev) { - auto s = ev->Get()->GetSnapshot(); - InitializationSnapshot = dynamic_pointer_cast<NInitializer::TSnapshot>(s); - Y_VERIFY(InitializationSnapshot); - if (!ActiveFlag) { - ActiveFlag = true; - Activate(); - } -} - -void TService::Activate() { - for (auto&& i : EventsWaiting) { - i.second.front().Resend(SelfId()); - i.second.pop_front(); - } -} - -void TService::Handle(TDSAccessorSimple::TEvController::TEvResult::TPtr& ev) { - InitializationSnapshot = dynamic_pointer_cast<NInitializer::TSnapshot>(ev->Get()->GetResult()); - Y_VERIFY(InitializationSnapshot); - Activate(); -} - -void TService::Handle(TEvStartMetadataService::TPtr& /*ev*/) { - Register(new TDSAccessorSimple(Config.GetRequestConfig(), InternalController, InitializationFetcher)); -} - -void TService::Handle(TDSAccessorSimple::TEvController::TEvError::TPtr& ev) { - ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot receive initializer snapshot: " << ev->Get()->GetErrorMessage() << Endl; - Schedule(TDuration::Seconds(1), new TEvStartMetadataService()); -} - -void TService::Handle(TDSAccessorSimple::TEvController::TEvTableAbsent::TPtr& /*ev*/) { - InitializationSnapshot = std::make_shared<NInitializer::TSnapshot>(TInstant::Zero()); - Activate(); + RegistrationData->SetInitializationSnapshot(ev->Get()->GetSnapshot()); } void TService::Bootstrap(const NActors::TActorContext& /*ctx*/) { + RegistrationData->EventsWaiting = std::make_shared<TEventsCollector>(SelfId()); ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "metadata service started" << Endl; Become(&TService::StateMain); - InternalController = std::make_shared<TServiceInternalController>(SelfId()); - InitializationFetcher = std::make_shared<NInitializer::TFetcher>(); - Sender<TEvStartMetadataService>().SendTo(SelfId()); + RegisterWithSameMailbox(new TInitializerSnapshotReader(RegistrationData, Config.GetRequestConfig())); } -void TServiceInternalController::InitializationFinished(const TString& id) const { - ActorId.Send(ActorId, new NInitializer::TEvInitializationFinished(id)); -} - -void TServiceInternalController::OnDescriptionFailed(const TString& errorMessage, const TString& requestId) const { - ActorId.Send(ActorId, new TEvTableDescriptionFailed(errorMessage, requestId)); -} - -void TServiceInternalController::OnDescriptionSuccess(THashMap<ui32, TSysTables::TTableColumnInfo>&& result, const TString& requestId) const { - ActorId.Send(ActorId, new TEvTableDescriptionSuccess(std::move(result), requestId)); -} } diff --git a/ydb/services/metadata/ds_table/service.h b/ydb/services/metadata/ds_table/service.h index a22597d4a41..9c0746c0092 100644 --- a/ydb/services/metadata/ds_table/service.h +++ b/ydb/services/metadata/ds_table/service.h @@ -3,6 +3,7 @@ #include "config.h" #include "scheme_describe.h" #include "accessor_snapshot_simple.h" +#include "registration.h" #include <ydb/services/metadata/service.h> #include <ydb/services/metadata/initializer/common.h> @@ -16,157 +17,28 @@ namespace NKikimr::NMetadata::NProvider { -class TEvStartMetadataService: public TEventLocal<TEvStartMetadataService, EEvents::EvStartMetadataService> { -}; - -class TEvTableDescriptionFailed: public TEventLocal<TEvTableDescriptionFailed, EEvents::EvTableDescriptionFailed> { -private: - YDB_READONLY_DEF(TString, ErrorMessage); - YDB_READONLY_DEF(TString, RequestId); -public: - explicit TEvTableDescriptionFailed(const TString& errorMessage, const TString& reqId) - : ErrorMessage(errorMessage) - , RequestId(reqId) { - - } -}; - -class TEvTableDescriptionSuccess: public TEventLocal<TEvTableDescriptionSuccess, EEvents::EvTableDescriptionSuccess> { -private: - using TDescription = THashMap<ui32, TSysTables::TTableColumnInfo>; - YDB_READONLY_DEF(TString, RequestId); - YDB_READONLY_DEF(TDescription, Description); -public: - TEvTableDescriptionSuccess(TDescription&& description, const TString& reqId) - : RequestId(reqId) - , Description(std::move(description)) - { - } - - NModifications::TTableSchema GetSchema() const { - return NModifications::TTableSchema(Description); - } -}; - -class TServiceInternalController: public NInitializer::IInitializerOutput, - public TDSAccessorSimple::TEvController, public ISchemeDescribeController { -private: - const NActors::TActorIdentity ActorId; -public: - TServiceInternalController(const NActors::TActorIdentity& actorId) - : TDSAccessorSimple::TEvController(actorId) - , ActorId(actorId) - { - - } - - virtual void InitializationFinished(const TString& id) const override; - - virtual void OnDescriptionFailed(const TString& errorMessage, const TString& requestId) const override; - virtual void OnDescriptionSuccess(THashMap<ui32, TSysTables::TTableColumnInfo>&& result, const TString& requestId) const override; -}; - -class TManagersId { -private: - YDB_READONLY_DEF(std::set<TString>, ManagerIds); -public: - TManagersId(const std::vector<IClassBehaviour::TPtr>& managers) { - for (auto&& i : managers) { - ManagerIds.emplace(i->GetTypeId()); - } - } - - bool IsEmpty() const { - return ManagerIds.empty(); - } - - bool RemoveId(const TString& id) { - auto it = ManagerIds.find(id); - if (it == ManagerIds.end()) { - return false; - } - ManagerIds.erase(it); - return true; - } - - bool operator<(const TManagersId& item) const { - if (ManagerIds.size() < item.ManagerIds.size()) { - return true; - } else if (ManagerIds.size() > item.ManagerIds.size()) { - return false; - } else { - auto itSelf = ManagerIds.begin(); - auto itItem = item.ManagerIds.begin(); - while (itSelf != ManagerIds.end()) { - if (*itSelf < *itItem) { - return true; - } - ++itSelf; - ++itItem; - } - return false; - } - } -}; - -class TWaitEvent { -private: - TAutoPtr<IEventBase> Event; - const TActorId Sender; -public: - TWaitEvent(TAutoPtr<IEventBase> ev, const TActorId& sender) - : Event(ev) - , Sender(sender) - { - - } - - void Resend(const TActorIdentity& receiver) { - TActivationContext::Send(new IEventHandle(receiver, Sender, Event.Release())); - } -}; - class TService: public NActors::TActorBootstrapped<TService> { private: using TBase = NActors::TActor<TService>; - bool ActiveFlag = false; - bool PreparationFlag = false; std::map<TString, NActors::TActorId> Accessors; - std::map<TManagersId, std::deque<TWaitEvent>> EventsWaiting; - std::map<TString, IClassBehaviour::TPtr> ManagersInRegistration; - std::map<TString, IClassBehaviour::TPtr> RegisteredManagers; - - std::shared_ptr<NInitializer::TFetcher> InitializationFetcher; - std::shared_ptr<NInitializer::TSnapshot> InitializationSnapshot; - std::shared_ptr<TServiceInternalController> InternalController; + std::shared_ptr<TRegistrationData> RegistrationData = std::make_shared<TRegistrationData>(); const TConfig Config; - void Handle(TEvStartMetadataService::TPtr& ev); - void Handle(NInitializer::TEvInitializationFinished::TPtr & ev); void Handle(TEvRefreshSubscriberData::TPtr& ev); - void Handle(TEvAskSnapshot::TPtr& ev); void Handle(TEvPrepareManager::TPtr& ev); void Handle(TEvSubscribeExternal::TPtr& ev); void Handle(TEvUnsubscribeExternal::TPtr& ev); void Handle(TEvObjectsOperation::TPtr& ev); - void Handle(TEvTableDescriptionSuccess::TPtr& ev); - void Handle(TEvTableDescriptionFailed::TPtr& ev); - - void Handle(TDSAccessorSimple::TEvController::TEvResult::TPtr& ev); - void Handle(TDSAccessorSimple::TEvController::TEvError::TPtr& ev); - void Handle(TDSAccessorSimple::TEvController::TEvTableAbsent::TPtr& ev); - - void PrepareManagers(std::vector<IClassBehaviour::TPtr> manager, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender); - void InitializationFinished(const TString& initId); + void PrepareManagers(std::vector<IClassBehaviour::TPtr> managers, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender); void Activate(); template <class TAction> void ProcessEventWithFetcher(IEventHandle& ev, NFetcher::ISnapshotsFetcher::TPtr fetcher, TAction action) { std::vector<IClassBehaviour::TPtr> needManagers; for (auto&& i : fetcher->GetManagers()) { - if (!RegisteredManagers.contains(i->GetTypeId())) { + if (!RegistrationData->Registered.contains(i->GetTypeId())) { needManagers.emplace_back(i); } } @@ -188,10 +60,6 @@ public: STATEFN(StateMain) { switch (ev->GetTypeRewrite()) { - hFunc(TDSAccessorSimple::TEvController::TEvResult, Handle); - hFunc(TDSAccessorSimple::TEvController::TEvError, Handle); - hFunc(TDSAccessorSimple::TEvController::TEvTableAbsent, Handle); - hFunc(TEvObjectsOperation, Handle); hFunc(TEvRefreshSubscriberData, Handle); hFunc(TEvAskSnapshot, Handle); @@ -199,20 +67,13 @@ public: hFunc(TEvSubscribeExternal, Handle); hFunc(TEvUnsubscribeExternal, Handle); - hFunc(TEvTableDescriptionSuccess, Handle); - hFunc(TEvTableDescriptionFailed, Handle); - - hFunc(TEvStartMetadataService, Handle); - - hFunc(NInitializer::TEvInitializationFinished, Handle); default: Y_VERIFY(false); } } TService(const TConfig& config) - : Config(config) - { + : Config(config) { TServiceOperator::Register(Config); } }; diff --git a/ydb/services/metadata/initializer/accessor_init.cpp b/ydb/services/metadata/initializer/accessor_init.cpp index 9ebe20be327..cab2f2d9492 100644 --- a/ydb/services/metadata/initializer/accessor_init.cpp +++ b/ydb/services/metadata/initializer/accessor_init.cpp @@ -34,7 +34,7 @@ void TDSAccessorInitialized::Handle(TEvInitializerPreparationFinished::TPtr& ev) Modifiers.front()->Execute(SelfId(), Config); } else { ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "initialization finished"; - ExternalController->InitializationFinished(ComponentId); + ExternalController->OnInitializationFinished(ComponentId); } } @@ -49,7 +49,7 @@ void TDSAccessorInitialized::DoNextModifier() { Modifiers.front()->Execute(SelfId(), Config); } else { ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "initialization finished"; - ExternalController->InitializationFinished(ComponentId); + ExternalController->OnInitializationFinished(ComponentId); } } diff --git a/ydb/services/metadata/initializer/behaviour.h b/ydb/services/metadata/initializer/behaviour.h index 9bfc5711e53..1947b7c85a2 100644 --- a/ydb/services/metadata/initializer/behaviour.h +++ b/ydb/services/metadata/initializer/behaviour.h @@ -1,12 +1,13 @@ #pragma once +#include "object.h" #include <ydb/services/metadata/abstract/initialization.h> #include <ydb/services/metadata/abstract/kqp_common.h> #include <ydb/services/metadata/manager/common.h> namespace NKikimr::NMetadata::NInitializer { -class TDBObjectBehaviour: public IClassBehaviour { +class TDBObjectBehaviour: public TClassBehaviour<TDBInitialization> { protected: virtual IInitializationBehaviour::TPtr ConstructInitializer() const override; virtual std::shared_ptr<NModifications::IOperationsManager> ConstructOperationsManager() const override; diff --git a/ydb/services/metadata/initializer/common.h b/ydb/services/metadata/initializer/common.h index 4ff8f991693..b5e6206901c 100644 --- a/ydb/services/metadata/initializer/common.h +++ b/ydb/services/metadata/initializer/common.h @@ -69,15 +69,15 @@ public: class IInitializerInput { public: using TPtr = std::shared_ptr<IInitializerInput>; - virtual void PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const = 0; - virtual void PreparationProblem(const TString& errorMessage) const = 0; + virtual void OnPreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const = 0; + virtual void OnPreparationProblem(const TString& errorMessage) const = 0; virtual ~IInitializerInput() = default; }; class IInitializerOutput { public: using TPtr = std::shared_ptr<IInitializerOutput>; - virtual void InitializationFinished(const TString& id) const = 0; + virtual void OnInitializationFinished(const TString& id) const = 0; virtual ~IInitializerOutput() = default; }; diff --git a/ydb/services/metadata/initializer/controller.cpp b/ydb/services/metadata/initializer/controller.cpp index 1aaadb1676a..b3cfd8e64ff 100644 --- a/ydb/services/metadata/initializer/controller.cpp +++ b/ydb/services/metadata/initializer/controller.cpp @@ -5,23 +5,23 @@ namespace NKikimr::NMetadata::NInitializer { -void TInitializerInput::PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const { +void TInitializerInput::OnPreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const { ActorId.Send(ActorId, new TEvInitializerPreparationFinished(modifiers)); } -void TInitializerInput::PreparationProblem(const TString& errorMessage) const { +void TInitializerInput::OnPreparationProblem(const TString& errorMessage) const { ActorId.Send(ActorId, new TEvInitializerPreparationProblem(errorMessage)); } -void TInitializerInput::AlterProblem(const TString& errorMessage) { +void TInitializerInput::OnAlteringProblem(const TString& errorMessage) { ActorId.Send(ActorId, new NModifications::TEvModificationProblem(errorMessage)); } -void TInitializerInput::AlterFinished() { +void TInitializerInput::OnAlteringFinished() { ActorId.Send(ActorId, new NModifications::TEvModificationFinished()); } -void TInitializerOutput::InitializationFinished(const TString& id) const { +void TInitializerOutput::OnInitializationFinished(const TString& id) const { ActorId.Send(ActorId, new TEvInitializationFinished(id)); } diff --git a/ydb/services/metadata/initializer/controller.h b/ydb/services/metadata/initializer/controller.h index 528b46847c0..062a20b464e 100644 --- a/ydb/services/metadata/initializer/controller.h +++ b/ydb/services/metadata/initializer/controller.h @@ -15,10 +15,10 @@ public: } - virtual void AlterProblem(const TString& errorMessage) override; - virtual void AlterFinished() override; - virtual void PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const override; - virtual void PreparationProblem(const TString& errorMessage) const override; + virtual void OnAlteringProblem(const TString& errorMessage) override; + virtual void OnAlteringFinished() override; + virtual void OnPreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const override; + virtual void OnPreparationProblem(const TString& errorMessage) const override; }; class TInitializerOutput: public IInitializerOutput { @@ -30,7 +30,7 @@ public: } - virtual void InitializationFinished(const TString& id) const override; + virtual void OnInitializationFinished(const TString& id) const override; }; } diff --git a/ydb/services/metadata/initializer/initializer.cpp b/ydb/services/metadata/initializer/initializer.cpp index a5b57890257..26905a30e54 100644 --- a/ydb/services/metadata/initializer/initializer.cpp +++ b/ydb/services/metadata/initializer/initializer.cpp @@ -28,7 +28,7 @@ void TInitializer::DoPrepare(IInitializerInput::TPtr controller) const { result.emplace_back(new TGenericTableModifier<NRequest::TDialogCreateTable>(request, "create")); } result.emplace_back(TACLModifierConstructor::GetReadOnlyModifier(TDBInitialization::GetBehaviour()->GetStorageTablePath(), "acl")); - controller->PreparationFinished(result); + controller->OnPreparationFinished(result); } } diff --git a/ydb/services/metadata/initializer/manager.cpp b/ydb/services/metadata/initializer/manager.cpp index 55859c8b2fc..079e78b216f 100644 --- a/ydb/services/metadata/initializer/manager.cpp +++ b/ydb/services/metadata/initializer/manager.cpp @@ -7,7 +7,7 @@ void TManager::DoPrepareObjectsBeforeModification(std::vector<TDBInitialization> NModifications::IAlterPreparationController<TDBInitialization>::TPtr controller, const TModificationContext& /*context*/) const { - controller->PreparationFinished(std::move(objects)); + controller->OnPreparationFinished(std::move(objects)); } NModifications::TOperationParsingResult TManager::DoBuildPatchFromSettings( diff --git a/ydb/services/metadata/initializer/snapshot.cpp b/ydb/services/metadata/initializer/snapshot.cpp index 9560ba30daf..60eb064d5a4 100644 --- a/ydb/services/metadata/initializer/snapshot.cpp +++ b/ydb/services/metadata/initializer/snapshot.cpp @@ -27,4 +27,13 @@ TString TSnapshot::DoSerializeToString() const { return sb; } +bool TSnapshot::HasComponent(const TString& componentId) const { + for (auto&& i : Objects) { + if (i.first.GetComponentId() == componentId) { + return true; + } + } + return false; +} + } diff --git a/ydb/services/metadata/initializer/snapshot.h b/ydb/services/metadata/initializer/snapshot.h index 98193131cc1..b4953955aad 100644 --- a/ydb/services/metadata/initializer/snapshot.h +++ b/ydb/services/metadata/initializer/snapshot.h @@ -15,6 +15,8 @@ protected: virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override; virtual TString DoSerializeToString() const override; public: + bool HasComponent(const TString& componentId) const; + using TBase::TBase; }; diff --git a/ydb/services/metadata/initializer/ut/ut_init.cpp b/ydb/services/metadata/initializer/ut/ut_init.cpp index 16ea1a4be10..122c2214198 100644 --- a/ydb/services/metadata/initializer/ut/ut_init.cpp +++ b/ydb/services/metadata/initializer/ut/ut_init.cpp @@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(Initializer) { result.emplace_back(new NMetadata::NInitializer::TGenericTableModifier<NMetadata::NRequest::TDialogCreateTable>(request, "create")); } result.emplace_back(NMetadata::NInitializer::TACLModifierConstructor::GetReadOnlyModifier(tablePath, "acl")); - controller->PreparationFinished(result); + controller->OnPreparationFinished(result); } public: }; @@ -63,7 +63,7 @@ Y_UNIT_TEST_SUITE(Initializer) { virtual std::shared_ptr<NMetadata::NInitializer::IInitializationBehaviour> ConstructInitializer() const override { return std::make_shared<TTestInitializer>(); } - virtual std::shared_ptr<NMetadata::NModifications::IOperationsManager> ConstructOperationsManager() const override { + virtual std::shared_ptr<NMetadata::NModifications::IOperationsManager> GetOperationsManager() const override { return nullptr; } public: @@ -71,7 +71,7 @@ Y_UNIT_TEST_SUITE(Initializer) { return TypeName<TInitBehaviourTest>(); } - static IClassBehaviour::TPtr GetInstant() { + static IClassBehaviour::TPtr GetInstance() { static std::shared_ptr<TInitBehaviourTest> result = std::make_shared<TInitBehaviourTest>(); return result; } @@ -97,7 +97,7 @@ Y_UNIT_TEST_SUITE(Initializer) { void Bootstrap() { Become(&TThis::StateWork); - Sender<NMetadata::NProvider::TEvPrepareManager>(TInitBehaviourTest::GetInstant()). + Sender<NMetadata::NProvider::TEvPrepareManager>(TInitBehaviourTest::GetInstance()). SendTo(NMetadata::NProvider::MakeServiceId(SelfId().NodeId())); } }; diff --git a/ydb/services/metadata/manager/abstract.h b/ydb/services/metadata/manager/abstract.h index 5016eb1d84a..90b21eb213e 100644 --- a/ydb/services/metadata/manager/abstract.h +++ b/ydb/services/metadata/manager/abstract.h @@ -194,7 +194,7 @@ public: class IAlterCommand { private: YDB_READONLY_DEF(std::vector<NInternal::TTableRecord>, Records); - YDB_ACCESSOR_DEF(IClassBehaviour::TPtr, Manager); + YDB_ACCESSOR_DEF(IClassBehaviour::TPtr, Behaviour); YDB_READONLY_DEF(IAlterController::TPtr, Controller); protected: mutable IOperationsManager::TModificationContext Context; @@ -205,7 +205,7 @@ public: template <class TObject> std::shared_ptr<IObjectOperationsManager<TObject>> GetOperationsManagerFor() const { - auto result = std::dynamic_pointer_cast<IObjectOperationsManager<TObject>>(Manager->GetOperationsManager()); + auto result = std::dynamic_pointer_cast<IObjectOperationsManager<TObject>>(Behaviour->GetOperationsManager()); Y_VERIFY(result); return result; } @@ -215,29 +215,37 @@ public: } IAlterCommand(const std::vector<NInternal::TTableRecord>& records, - IClassBehaviour::TPtr manager, + IClassBehaviour::TPtr behaviour, NModifications::IAlterController::TPtr controller, const IOperationsManager::TModificationContext& context) : Records(records) - , Manager(manager) + , Behaviour(behaviour) , Controller(controller) , Context(context) { - Y_VERIFY(Manager->GetOperationsManager()); + Y_VERIFY(Behaviour->GetOperationsManager()); } IAlterCommand(const NInternal::TTableRecord& record, - IClassBehaviour::TPtr manager, + IClassBehaviour::TPtr behaviour, NModifications::IAlterController::TPtr controller, const IOperationsManager::TModificationContext& context) - : Manager(manager) + : Behaviour(behaviour) , Controller(controller) , Context(context) { - Y_VERIFY(Manager->GetOperationsManager()); + Y_VERIFY(Behaviour->GetOperationsManager()); Records.emplace_back(record); } void Execute() const { + if (!Behaviour) { + Controller->OnAlteringProblem("behaviour not ready"); + return; + } + if (!Behaviour->GetOperationsManager()) { + Controller->OnAlteringProblem("behaviour's manager not initialized"); + return; + } DoExecute(); } }; diff --git a/ydb/services/metadata/manager/alter.h b/ydb/services/metadata/manager/alter.h index 191f0c6e7f5..57301997840 100644 --- a/ydb/services/metadata/manager/alter.h +++ b/ydb/services/metadata/manager/alter.h @@ -55,17 +55,17 @@ protected: auto& first = TBase::Patches.front(); std::vector<Ydb::Column> columns = first.SelectOwnedColumns(Manager->GetSchema().GetPKColumns()); if (!columns.size()) { - TBase::ExternalController->AlterProblem("no pk columns in patch"); + TBase::ExternalController->OnAlteringProblem("no pk columns in patch"); return false; } if (columns.size() != Manager->GetSchema().GetPKColumns().size()) { - TBase::ExternalController->AlterProblem("no columns for pk detection"); + TBase::ExternalController->OnAlteringProblem("no columns for pk detection"); return false; } TBase::RestoreObjectIds.InitColumns(columns); for (auto&& i : TBase::Patches) { if (!TBase::RestoreObjectIds.AddRecordNativeValues(i)) { - TBase::ExternalController->AlterProblem("incorrect pk columns"); + TBase::ExternalController->OnAlteringProblem("incorrect pk columns"); return false; } } diff --git a/ydb/services/metadata/manager/alter_impl.h b/ydb/services/metadata/manager/alter_impl.h index ffc3564601e..685958f8829 100644 --- a/ydb/services/metadata/manager/alter_impl.h +++ b/ydb/services/metadata/manager/alter_impl.h @@ -24,22 +24,22 @@ public: } - virtual void RestoreFinished(std::vector<TObject>&& objects, const TString& transactionId) override { + virtual void OnRestoringFinished(std::vector<TObject>&& objects, const TString& transactionId) override { ActorId.Send(ActorId, new TEvRestoreFinished<TObject>(std::move(objects), transactionId)); } - virtual void RestoreProblem(const TString& errorMessage) override { + virtual void OnRestoringProblem(const TString& errorMessage) override { ActorId.Send(ActorId, new TEvRestoreProblem(errorMessage)); } - virtual void ModificationFinished() override { + virtual void OnModificationFinished() override { ActorId.Send(ActorId, new TEvModificationFinished()); } - virtual void ModificationProblem(const TString& errorMessage) override { + virtual void OnModificationProblem(const TString& errorMessage) override { ActorId.Send(ActorId, new TEvModificationProblem(errorMessage)); } - virtual void PreparationProblem(const TString& errorMessage) override { + virtual void OnPreparationProblem(const TString& errorMessage) override { ActorId.Send(ActorId, new TEvAlterPreparationProblem(errorMessage)); } - virtual void PreparationFinished(std::vector<TObject>&& objects) override { + virtual void OnPreparationFinished(std::vector<TObject>&& objects) override { ActorId.Send(ActorId, new TEvAlterPreparationFinished<TObject>(std::move(objects))); } @@ -119,7 +119,7 @@ public: void Bootstrap() { InitState(); if (!Patches.size()) { - ExternalController->AlterProblem("no patches"); + ExternalController->OnAlteringProblem("no patches"); return TBase::PassAway(); } if (!BuildRestoreObjectIds()) { @@ -158,29 +158,29 @@ public: records.ReserveRows(ev->Get()->GetObjects().size()); for (auto&& i : ev->Get()->GetObjects()) { if (!records.AddRecordNativeValues(i.SerializeToRecord())) { - ExternalController->AlterProblem("unexpected serialization inconsistency"); + ExternalController->OnAlteringProblem("unexpected serialization inconsistency"); return TBase::PassAway(); } } if (!ProcessPreparedObjects(std::move(records))) { - ExternalController->AlterProblem("cannot process prepared objects"); + ExternalController->OnAlteringProblem("cannot process prepared objects"); return TBase::PassAway(); } } void Handle(typename NRequest::TEvRequestFailed::TPtr& /*ev*/) { auto g = TBase::PassAwayGuard(); - ExternalController->AlterProblem("cannot initialize session"); + ExternalController->OnAlteringProblem("cannot initialize session"); } void Handle(TEvAlterPreparationProblem::TPtr& ev) { auto g = TBase::PassAwayGuard(); - ExternalController->AlterProblem("preparation problem: " + ev->Get()->GetErrorMessage()); + ExternalController->OnAlteringProblem("preparation problem: " + ev->Get()->GetErrorMessage()); } void Handle(TEvRestoreProblem::TPtr& ev) { auto g = TBase::PassAwayGuard(); - ExternalController->AlterProblem("cannot restore objects: " + ev->Get()->GetErrorMessage()); + ExternalController->OnAlteringProblem("cannot restore objects: " + ev->Get()->GetErrorMessage()); } }; @@ -199,7 +199,7 @@ protected: TBase::RestoreObjectIds.InitColumns(Manager->GetSchema().GetPKColumns()); for (auto&& i : TBase::Patches) { if (!TBase::RestoreObjectIds.AddRecordNativeValues(i)) { - TBase::ExternalController->AlterProblem("no pk columns in patch"); + TBase::ExternalController->OnAlteringProblem("no pk columns in patch"); return false; } } @@ -233,13 +233,13 @@ public: } TObject objectPatched; if (!trPatch) { - TBase::ExternalController->AlterProblem("cannot found patch for object"); + TBase::ExternalController->OnAlteringProblem("cannot found patch for object"); return false; } else if (!trObject.TakeValuesFrom(*trPatch)) { - TBase::ExternalController->AlterProblem("cannot patch object"); + TBase::ExternalController->OnAlteringProblem("cannot patch object"); return false; } else if (!TObject::TDecoder::DeserializeFromRecord(objectPatched, trObject)) { - TBase::ExternalController->AlterProblem("cannot parse object after patch"); + TBase::ExternalController->OnAlteringProblem("cannot parse object after patch"); return false; } else { i = std::move(objectPatched); @@ -256,7 +256,7 @@ public: if (!found) { TObject object; if (!TObject::TDecoder::DeserializeFromRecord(object, p)) { - TBase::ExternalController->AlterProblem("cannot parse new object"); + TBase::ExternalController->OnAlteringProblem("cannot parse new object"); return false; } objects.emplace_back(std::move(object)); @@ -267,12 +267,12 @@ public: void Handle(TEvModificationFinished::TPtr& /*ev*/) { auto g = TBase::PassAwayGuard(); - TBase::ExternalController->AlterFinished(); + TBase::ExternalController->OnAlteringFinished(); } void Handle(TEvModificationProblem::TPtr& ev) { auto g = TBase::PassAwayGuard(); - TBase::ExternalController->AlterProblem("cannot " + GetModificationType() + " objects: " + ev->Get()->GetErrorMessage()); + TBase::ExternalController->OnAlteringProblem("cannot " + GetModificationType() + " objects: " + ev->Get()->GetErrorMessage()); } }; diff --git a/ydb/services/metadata/manager/common.h b/ydb/services/metadata/manager/common.h index c2dcbc3ea68..38848407d93 100644 --- a/ydb/services/metadata/manager/common.h +++ b/ydb/services/metadata/manager/common.h @@ -12,8 +12,8 @@ public: using TPtr = std::shared_ptr<IAlterPreparationController>; virtual ~IAlterPreparationController() = default; - virtual void PreparationFinished(std::vector<TObject>&& objects) = 0; - virtual void PreparationProblem(const TString& errorMessage) = 0; + virtual void OnPreparationFinished(std::vector<TObject>&& objects) = 0; + virtual void OnPreparationProblem(const TString& errorMessage) = 0; }; class IAlterController { @@ -21,8 +21,8 @@ public: using TPtr = std::shared_ptr<IAlterController>; virtual ~IAlterController() = default; - virtual void AlterProblem(const TString& errorMessage) = 0; - virtual void AlterFinished() = 0; + virtual void OnAlteringProblem(const TString& errorMessage) = 0; + virtual void OnAlteringFinished() = 0; }; diff --git a/ydb/services/metadata/manager/generic_manager.h b/ydb/services/metadata/manager/generic_manager.h index a88c34dceeb..9ee4fca2cc6 100644 --- a/ydb/services/metadata/manager/generic_manager.h +++ b/ydb/services/metadata/manager/generic_manager.h @@ -14,10 +14,10 @@ public: } - virtual void AlterProblem(const TString& errorMessage) override { + virtual void OnAlteringProblem(const TString& errorMessage) override { Promise.SetValue(TObjectOperatorResult(false).SetErrorMessage(errorMessage)); } - virtual void AlterFinished() override { + virtual void OnAlteringFinished() override { Promise.SetValue(TObjectOperatorResult(true)); } diff --git a/ydb/services/metadata/manager/modification.h b/ydb/services/metadata/manager/modification.h index 7a85bdb4180..795826390d0 100644 --- a/ydb/services/metadata/manager/modification.h +++ b/ydb/services/metadata/manager/modification.h @@ -53,14 +53,14 @@ protected: Requests.front(), SystemUserToken, TBase::SelfId())); Requests.pop_front(); } else { - Controller->ModificationFinished(); + Controller->OnModificationFinished(); TBase::PassAway(); } } void Handle(NRequest::TEvRequestFailed::TPtr& ev) { auto g = TBase::PassAwayGuard(); - Controller->ModificationProblem("cannot execute yql request for " + GetModifyType() + + Controller->OnModificationProblem("cannot execute yql request for " + GetModifyType() + " objects: " + ev->Get()->GetErrorMessage()); } diff --git a/ydb/services/metadata/manager/modification_controller.h b/ydb/services/metadata/manager/modification_controller.h index 0df4a8c2acc..b44c3a3ad0d 100644 --- a/ydb/services/metadata/manager/modification_controller.h +++ b/ydb/services/metadata/manager/modification_controller.h @@ -8,8 +8,8 @@ class IModificationObjectsController { public: using TPtr = std::shared_ptr<IModificationObjectsController>; virtual ~IModificationObjectsController() = default; - virtual void ModificationProblem(const TString& errorMessage) = 0; - virtual void ModificationFinished() = 0; + virtual void OnModificationProblem(const TString& errorMessage) = 0; + virtual void OnModificationFinished() = 0; }; class TEvModificationFinished: public TEventLocal<TEvModificationFinished, EvModificationFinished> { diff --git a/ydb/services/metadata/manager/restore.h b/ydb/services/metadata/manager/restore.h index a50caf7de89..5a000669665 100644 --- a/ydb/services/metadata/manager/restore.h +++ b/ydb/services/metadata/manager/restore.h @@ -30,17 +30,17 @@ private: for (auto&& row : qResult.result_sets()[0].rows()) { TObject object; if (!object.DeserializeFromRecord(decoder, row)) { - Controller->RestoreProblem("cannot parse exists object"); + Controller->OnRestoringProblem("cannot parse exists object"); return; } objects.emplace_back(std::move(object)); } - Controller->RestoreFinished(std::move(objects), qResult.tx_meta().id()); + Controller->OnRestoringFinished(std::move(objects), qResult.tx_meta().id()); } void Handle(NRequest::TEvRequestFailed::TPtr& /*ev*/) { auto g = TBase::PassAwayGuard(); - Controller->RestoreProblem("cannot execute yql request"); + Controller->OnRestoringProblem("cannot execute yql request"); } public: @@ -64,7 +64,7 @@ public: void Bootstrap() { if (ObjectIds.empty()) { - Controller->RestoreProblem("no objects for restore"); + Controller->OnRestoringProblem("no objects for restore"); TBase::PassAway(); } auto request = ObjectIds.BuildSelectQuery(TObject::GetBehaviour()->GetStorageTablePath()); diff --git a/ydb/services/metadata/manager/restore_controller.h b/ydb/services/metadata/manager/restore_controller.h index f0141b1ce8b..cfdecac0fd1 100644 --- a/ydb/services/metadata/manager/restore_controller.h +++ b/ydb/services/metadata/manager/restore_controller.h @@ -11,8 +11,8 @@ public: using TPtr = std::shared_ptr<IRestoreObjectsController>; virtual ~IRestoreObjectsController() = default; - virtual void RestoreFinished(std::vector<TObject>&& objects, const TString& transactionId) = 0; - virtual void RestoreProblem(const TString& errorMessage) = 0; + virtual void OnRestoringFinished(std::vector<TObject>&& objects, const TString& transactionId) = 0; + virtual void OnRestoringProblem(const TString& errorMessage) = 0; }; template <class TObject> diff --git a/ydb/services/metadata/secret/access_behaviour.h b/ydb/services/metadata/secret/access_behaviour.h index 2d84cc462ec..ced34bfcc0a 100644 --- a/ydb/services/metadata/secret/access_behaviour.h +++ b/ydb/services/metadata/secret/access_behaviour.h @@ -8,7 +8,7 @@ namespace NKikimr::NMetadata::NSecret { -class TAccessBehaviour: public IClassBehaviour { +class TAccessBehaviour: public TClassBehaviour<TAccess> { private: static TFactory::TRegistrator<TAccessBehaviour> Registrator; protected: diff --git a/ydb/services/metadata/secret/checker_access.cpp b/ydb/services/metadata/secret/checker_access.cpp index 54c59a27803..1b522a6ea19 100644 --- a/ydb/services/metadata/secret/checker_access.cpp +++ b/ydb/services/metadata/secret/checker_access.cpp @@ -18,12 +18,12 @@ void TAccessPreparationActor::StartChecker() { } } if (!foundSecret) { - Controller->PreparationProblem("used in access secret " + i.GetSecretId() + " not found"); + Controller->OnPreparationProblem("used in access secret " + i.GetSecretId() + " not found"); return; } } } - Controller->PreparationFinished(std::move(Objects)); + Controller->OnPreparationFinished(std::move(Objects)); } void TAccessPreparationActor::Handle(NProvider::TEvRefreshSubscriberData::TPtr& ev) { diff --git a/ydb/services/metadata/secret/checker_secret.cpp b/ydb/services/metadata/secret/checker_secret.cpp index 3607de1889c..e297a5cd6b6 100644 --- a/ydb/services/metadata/secret/checker_secret.cpp +++ b/ydb/services/metadata/secret/checker_secret.cpp @@ -11,20 +11,20 @@ void TSecretPreparationActor::StartChecker() { for (auto&& i : Objects) { if (Context.GetActivityType() == NModifications::IOperationsManager::EActivityType::Alter) { if (!Secrets->GetSecrets().contains(i)) { - Controller->PreparationProblem("secret " + i.GetSecretId() + " not found for alter"); + Controller->OnPreparationProblem("secret " + i.GetSecretId() + " not found for alter"); return; } } for (auto&& sa : Secrets->GetAccess()) { if (Context.GetActivityType() == NModifications::IOperationsManager::EActivityType::Drop) { if (sa.GetOwnerUserId() == i.GetOwnerUserId() && sa.GetSecretId() == i.GetSecretId()) { - Controller->PreparationProblem("secret " + i.GetSecretId() + " using in access for " + sa.GetAccessSID()); + Controller->OnPreparationProblem("secret " + i.GetSecretId() + " using in access for " + sa.GetAccessSID()); return; } } } } - Controller->PreparationFinished(std::move(Objects)); + Controller->OnPreparationFinished(std::move(Objects)); } void TSecretPreparationActor::Handle(NProvider::TEvRefreshSubscriberData::TPtr& ev) { diff --git a/ydb/services/metadata/secret/initializer.cpp b/ydb/services/metadata/secret/initializer.cpp index de06d5037a2..db36f1263f9 100644 --- a/ydb/services/metadata/secret/initializer.cpp +++ b/ydb/services/metadata/secret/initializer.cpp @@ -33,7 +33,7 @@ void TSecretInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr control } result.emplace_back(NInitializer::TACLModifierConstructor::GetNoAccessModifier(TSecret::GetBehaviour()->GetStorageTablePath(), "acl")); result.emplace_back(NInitializer::TACLModifierConstructor::GetNoAccessModifier(TSecret::GetBehaviour()->GetStorageHistoryTablePath(), "acl_history")); - controller->PreparationFinished(result); + controller->OnPreparationFinished(result); } void TAccessInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr controller) const { @@ -66,7 +66,7 @@ void TAccessInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr control } result.emplace_back(NInitializer::TACLModifierConstructor::GetNoAccessModifier(TAccess::GetBehaviour()->GetStorageTablePath(), "acl")); result.emplace_back(NInitializer::TACLModifierConstructor::GetNoAccessModifier(TAccess::GetBehaviour()->GetStorageHistoryTablePath(), "acl_history")); - controller->PreparationFinished(result); + controller->OnPreparationFinished(result); } } diff --git a/ydb/services/metadata/secret/manager.cpp b/ydb/services/metadata/secret/manager.cpp index c1f60a41d00..8642dee62c2 100644 --- a/ydb/services/metadata/secret/manager.cpp +++ b/ydb/services/metadata/secret/manager.cpp @@ -7,13 +7,13 @@ namespace NKikimr::NMetadata::NSecret { void TAccessManager::DoPrepareObjectsBeforeModification(std::vector<TAccess>&& patchedObjects, NModifications::IAlterPreparationController<TAccess>::TPtr controller, const NModifications::IOperationsManager::TModificationContext& context) const { if (context.GetActivityType() == IOperationsManager::EActivityType::Alter) { - controller->PreparationProblem("access object cannot be modified"); + controller->OnPreparationProblem("access object cannot be modified"); return; } if (!!context.GetUserToken()) { for (auto&& i : patchedObjects) { if (i.GetOwnerUserId() != context.GetUserToken()->GetUserSID()) { - controller->PreparationProblem("no permissions for modify secret access"); + controller->OnPreparationProblem("no permissions for modify secret access"); return; } } @@ -87,7 +87,7 @@ void TSecretManager::DoPrepareObjectsBeforeModification(std::vector<TSecret>&& p if (!!context.GetUserToken()) { for (auto&& i : patchedObjects) { if (i.GetOwnerUserId() != context.GetUserToken()->GetUserSID()) { - controller->PreparationProblem("no permissions for modify secrets"); + controller->OnPreparationProblem("no permissions for modify secrets"); return; } } diff --git a/ydb/services/metadata/secret/secret_behaviour.h b/ydb/services/metadata/secret/secret_behaviour.h index ad2fcc739aa..24ea6585c25 100644 --- a/ydb/services/metadata/secret/secret_behaviour.h +++ b/ydb/services/metadata/secret/secret_behaviour.h @@ -5,7 +5,7 @@ namespace NKikimr::NMetadata::NSecret { -class TSecretBehaviour: public IClassBehaviour { +class TSecretBehaviour: public TClassBehaviour<TSecret> { private: static TFactory::TRegistrator<TSecretBehaviour> Registrator; protected: diff --git a/ydb/services/metadata/secret/snapshot.cpp b/ydb/services/metadata/secret/snapshot.cpp index 7e0b9423e6a..8c2e8eeb5ca 100644 --- a/ydb/services/metadata/secret/snapshot.cpp +++ b/ydb/services/metadata/secret/snapshot.cpp @@ -4,30 +4,8 @@ namespace NKikimr::NMetadata::NSecret { bool TSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) { Y_VERIFY(rawDataResult.result_sets().size() == 2); - { - auto& rawData = rawDataResult.result_sets()[0]; - TSecret::TDecoder decoder(rawData); - for (auto&& r : rawData.rows()) { - TSecret object; - if (!object.DeserializeFromRecord(decoder, r)) { - ALS_ERROR(NKikimrServices::METADATA_SECRET) << "cannot parse secret info for snapshot"; - continue; - } - Secrets.emplace(object, object); - } - } - { - auto& rawData = rawDataResult.result_sets()[1]; - TAccess::TDecoder decoder(rawData); - for (auto&& r : rawData.rows()) { - TAccess object; - if (!object.DeserializeFromRecord(decoder, r)) { - ALS_ERROR(NKikimrServices::METADATA_SECRET) << "cannot parse secret info for snapshot"; - continue; - } - Access.emplace_back(object); - } - } + ParseSnapshotObjects<TSecret>(rawDataResult.result_sets()[0], [this](TSecret&& s) {Secrets.emplace(s, s); }); + ParseSnapshotObjects<TAccess>(rawDataResult.result_sets()[1], [this](TAccess&& s) {Access.emplace_back(std::move(s)); }); return true; } |