aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-01-09 14:01:31 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-01-09 14:01:31 +0300
commitd7316a25ddae54e96b1db61144a2ab8c7255cfe6 (patch)
treecd8cb4bbcceaea7f32a86eae728cc018532b56f2
parentc11c17bd122fa8e9d4fb1bf61352caa0be47235f (diff)
downloadydb-d7316a25ddae54e96b1db61144a2ab8c7255cfe6.tar.gz
decomposite metadata service
reuse snapshots parsing method initialization bg tasks through metadata service
-rw-r--r--ydb/core/persqueue/writer/metadata_initializers.cpp2
-rw-r--r--ydb/core/persqueue/writer/metadata_initializers.h4
-rw-r--r--ydb/core/tx/tiering/rule/behaviour.h4
-rw-r--r--ydb/core/tx/tiering/rule/checker.cpp14
-rw-r--r--ydb/core/tx/tiering/rule/initializer.cpp2
-rw-r--r--ydb/core/tx/tiering/snapshot.cpp31
-rw-r--r--ydb/core/tx/tiering/tier/behaviour.h3
-rw-r--r--ydb/core/tx/tiering/tier/checker.cpp14
-rw-r--r--ydb/core/tx/tiering/tier/initializer.cpp2
-rw-r--r--ydb/core/tx/tiering/ut/ut_tiers.cpp4
-rw-r--r--ydb/services/bg_tasks/ds_table/CMakeLists.darwin.txt1
-rw-r--r--ydb/services/bg_tasks/ds_table/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/bg_tasks/ds_table/CMakeLists.linux.txt1
-rw-r--r--ydb/services/bg_tasks/ds_table/assign_tasks.cpp2
-rw-r--r--ydb/services/bg_tasks/ds_table/behaviour.cpp14
-rw-r--r--ydb/services/bg_tasks/ds_table/behaviour.h27
-rw-r--r--ydb/services/bg_tasks/ds_table/executor.cpp34
-rw-r--r--ydb/services/bg_tasks/ds_table/executor.h35
-rw-r--r--ydb/services/bg_tasks/ds_table/executor_controller.cpp12
-rw-r--r--ydb/services/bg_tasks/ds_table/executor_controller.h12
-rw-r--r--ydb/services/bg_tasks/ds_table/fetch_tasks.cpp4
-rw-r--r--ydb/services/bg_tasks/ds_table/finish_task.cpp2
-rw-r--r--ydb/services/bg_tasks/ds_table/initialization.cpp2
-rw-r--r--ydb/services/bg_tasks/ds_table/interrupt.cpp2
-rw-r--r--ydb/services/bg_tasks/ds_table/lock_pinger.cpp2
-rw-r--r--ydb/services/bg_tasks/ut/ut_tasks.cpp2
-rw-r--r--ydb/services/metadata/abstract/common.h1
-rw-r--r--ydb/services/metadata/abstract/fetcher.h19
-rw-r--r--ydb/services/metadata/abstract/kqp_common.cpp7
-rw-r--r--ydb/services/metadata/abstract/kqp_common.h19
-rw-r--r--ydb/services/metadata/ds_table/CMakeLists.darwin.txt3
-rw-r--r--ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/services/metadata/ds_table/CMakeLists.linux.txt3
-rw-r--r--ydb/services/metadata/ds_table/behaviour_registrator_actor.cpp68
-rw-r--r--ydb/services/metadata/ds_table/behaviour_registrator_actor.h54
-rw-r--r--ydb/services/metadata/ds_table/initializer_actor.cpp29
-rw-r--r--ydb/services/metadata/ds_table/initializer_actor.h59
-rw-r--r--ydb/services/metadata/ds_table/registration.cpp97
-rw-r--r--ydb/services/metadata/ds_table/registration.h143
-rw-r--r--ydb/services/metadata/ds_table/scheme_describe.h28
-rw-r--r--ydb/services/metadata/ds_table/service.cpp169
-rw-r--r--ydb/services/metadata/ds_table/service.h149
-rw-r--r--ydb/services/metadata/initializer/accessor_init.cpp4
-rw-r--r--ydb/services/metadata/initializer/behaviour.h3
-rw-r--r--ydb/services/metadata/initializer/common.h6
-rw-r--r--ydb/services/metadata/initializer/controller.cpp10
-rw-r--r--ydb/services/metadata/initializer/controller.h10
-rw-r--r--ydb/services/metadata/initializer/initializer.cpp2
-rw-r--r--ydb/services/metadata/initializer/manager.cpp2
-rw-r--r--ydb/services/metadata/initializer/snapshot.cpp9
-rw-r--r--ydb/services/metadata/initializer/snapshot.h2
-rw-r--r--ydb/services/metadata/initializer/ut/ut_init.cpp8
-rw-r--r--ydb/services/metadata/manager/abstract.h24
-rw-r--r--ydb/services/metadata/manager/alter.h6
-rw-r--r--ydb/services/metadata/manager/alter_impl.h38
-rw-r--r--ydb/services/metadata/manager/common.h8
-rw-r--r--ydb/services/metadata/manager/generic_manager.h4
-rw-r--r--ydb/services/metadata/manager/modification.h4
-rw-r--r--ydb/services/metadata/manager/modification_controller.h4
-rw-r--r--ydb/services/metadata/manager/restore.h8
-rw-r--r--ydb/services/metadata/manager/restore_controller.h4
-rw-r--r--ydb/services/metadata/secret/access_behaviour.h2
-rw-r--r--ydb/services/metadata/secret/checker_access.cpp4
-rw-r--r--ydb/services/metadata/secret/checker_secret.cpp6
-rw-r--r--ydb/services/metadata/secret/initializer.cpp4
-rw-r--r--ydb/services/metadata/secret/manager.cpp6
-rw-r--r--ydb/services/metadata/secret/secret_behaviour.h2
-rw-r--r--ydb/services/metadata/secret/snapshot.cpp26
68 files changed, 809 insertions, 481 deletions
diff --git a/ydb/core/persqueue/writer/metadata_initializers.cpp b/ydb/core/persqueue/writer/metadata_initializers.cpp
index 651242956d8..563565014a4 100644
--- a/ydb/core/persqueue/writer/metadata_initializers.cpp
+++ b/ydb/core/persqueue/writer/metadata_initializers.cpp
@@ -62,7 +62,7 @@ void TSrcIdMetaInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr cont
result.emplace_back(new NInitializer::TGenericTableModifier<NRequest::TDialogCreateTable>(request, "create"));
}
result.emplace_back(NInitializer::TACLModifierConstructor::GetReadOnlyModifier(tablePath, "acl"));
- controller->PreparationFinished(result);
+ controller->OnPreparationFinished(result);
}
diff --git a/ydb/core/persqueue/writer/metadata_initializers.h b/ydb/core/persqueue/writer/metadata_initializers.h
index aa78cf6c48b..76799fe1628 100644
--- a/ydb/core/persqueue/writer/metadata_initializers.h
+++ b/ydb/core/persqueue/writer/metadata_initializers.h
@@ -31,10 +31,10 @@ protected:
TInitBehaviourPtr ConstructInitializer() const override {
return TSrcIdMetaInitializer::GetInstant();
}
- std::shared_ptr<NMetadata::NModifications::IOperationsManager> ConstructOperationsManager() const override {
+public:
+ std::shared_ptr<NMetadata::NModifications::IOperationsManager> GetOperationsManager() const override {
return nullptr;
}
-public:
static TClassBehaviourPtr GetInstant() {
static TClassBehaviourPtr res{new TSrcIdMetaInitManager()};
return res;
diff --git a/ydb/core/tx/tiering/rule/behaviour.h b/ydb/core/tx/tiering/rule/behaviour.h
index b7e29d1ce24..c10f2f24d73 100644
--- a/ydb/core/tx/tiering/rule/behaviour.h
+++ b/ydb/core/tx/tiering/rule/behaviour.h
@@ -1,11 +1,11 @@
#pragma once
-#include "object.h"
+#include "object.h"
#include <ydb/services/metadata/abstract/kqp_common.h>
namespace NKikimr::NColumnShard::NTiers {
-class TTieringRuleBehaviour: public NMetadata::IClassBehaviour {
+class TTieringRuleBehaviour: public NMetadata::TClassBehaviour<TTieringRule> {
private:
static TFactory::TRegistrator<TTieringRuleBehaviour> Registrator;
protected:
diff --git a/ydb/core/tx/tiering/rule/checker.cpp b/ydb/core/tx/tiering/rule/checker.cpp
index 4b22d14d0e1..9d6cb899567 100644
--- a/ydb/core/tx/tiering/rule/checker.cpp
+++ b/ydb/core/tx/tiering/rule/checker.cpp
@@ -16,7 +16,7 @@ void TRulePreparationActor::StartChecker() {
}
auto g = PassAwayGuard();
if (!SSCheckResult->GetContent().GetOperationAllow()) {
- Controller->PreparationProblem(SSCheckResult->GetContent().GetDenyReason());
+ Controller->OnPreparationProblem(SSCheckResult->GetContent().GetDenyReason());
return;
}
@@ -24,29 +24,29 @@ void TRulePreparationActor::StartChecker() {
for (auto&& interval : tiering.GetIntervals()) {
auto tier = Tierings->GetTierById(interval.GetTierName());
if (!tier) {
- Controller->PreparationProblem("unknown tier usage: " + interval.GetTierName());
+ Controller->OnPreparationProblem("unknown tier usage: " + interval.GetTierName());
return;
} else if (!Secrets->CheckSecretAccess(tier->GetProtoConfig().GetObjectStorage().GetAccessKey(), Context.GetUserToken())) {
- Controller->PreparationProblem("no access for secret: " + tier->GetProtoConfig().GetObjectStorage().GetAccessKey());
+ Controller->OnPreparationProblem("no access for secret: " + tier->GetProtoConfig().GetObjectStorage().GetAccessKey());
return;
} else if (!Secrets->CheckSecretAccess(tier->GetProtoConfig().GetObjectStorage().GetSecretKey(), Context.GetUserToken())) {
- Controller->PreparationProblem("no access for secret: " + tier->GetProtoConfig().GetObjectStorage().GetSecretKey());
+ Controller->OnPreparationProblem("no access for secret: " + tier->GetProtoConfig().GetObjectStorage().GetSecretKey());
return;
}
}
}
- Controller->PreparationFinished(std::move(Objects));
+ Controller->OnPreparationFinished(std::move(Objects));
}
void TRulePreparationActor::Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev) {
auto& proto = ev->Get()->Record;
if (proto.HasError()) {
- Controller->PreparationProblem(proto.GetError().GetErrorMessage());
+ Controller->OnPreparationProblem(proto.GetError().GetErrorMessage());
PassAway();
} else if (proto.HasContent()) {
SSCheckResult = SSFetcher->UnpackResult(ev->Get()->Record.GetContent().GetData());
if (!SSCheckResult) {
- Controller->PreparationProblem("cannot unpack ss-fetcher result for class " + SSFetcher->GetClassName());
+ Controller->OnPreparationProblem("cannot unpack ss-fetcher result for class " + SSFetcher->GetClassName());
PassAway();
} else {
StartChecker();
diff --git a/ydb/core/tx/tiering/rule/initializer.cpp b/ydb/core/tx/tiering/rule/initializer.cpp
index a14b70ce26a..b2f07c8ae45 100644
--- a/ydb/core/tx/tiering/rule/initializer.cpp
+++ b/ydb/core/tx/tiering/rule/initializer.cpp
@@ -35,7 +35,7 @@ TVector<NKikimr::NMetadata::NInitializer::ITableModifier::TPtr> TTierRulesInitia
}
void TTierRulesInitializer::DoPrepare(NMetadata::NInitializer::IInitializerInput::TPtr controller) const {
- controller->PreparationFinished(BuildModifiers());
+ controller->OnPreparationFinished(BuildModifiers());
}
}
diff --git a/ydb/core/tx/tiering/snapshot.cpp b/ydb/core/tx/tiering/snapshot.cpp
index e33421e5a5c..6034aefc492 100644
--- a/ydb/core/tx/tiering/snapshot.cpp
+++ b/ydb/core/tx/tiering/snapshot.cpp
@@ -11,35 +11,8 @@ namespace NKikimr::NColumnShard::NTiers {
bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) {
Y_VERIFY(rawDataResult.result_sets().size() == 2);
- {
- auto& rawData = rawDataResult.result_sets()[0];
- TTierConfig::TDecoder decoder(rawData);
- for (auto&& r : rawData.rows()) {
- TTierConfig config;
- if (!config.DeserializeFromRecord(decoder, r)) {
- ALS_ERROR(NKikimrServices::TX_TIERING) << "cannot parse tier config from snapshot";
- continue;
- }
- TierConfigs.emplace(config.GetTierName(), config);
- }
- }
- {
- auto& rawData = rawDataResult.result_sets()[1];
- TTieringRule::TDecoder decoder(rawData);
- TVector<TTieringRule> rulesLocal;
- rulesLocal.reserve(rawData.rows().size());
- for (auto&& r : rawData.rows()) {
- TTieringRule tr;
- if (!tr.DeserializeFromRecord(decoder, r)) {
- ALS_WARN(NKikimrServices::TX_TIERING) << "cannot parse record for tiering info";
- continue;
- }
- rulesLocal.emplace_back(std::move(tr));
- }
- for (auto&& i : rulesLocal) {
- TableTierings.emplace(i.GetTieringRuleId(), std::move(i));
- }
- }
+ ParseSnapshotObjects<TTierConfig>(rawDataResult.result_sets()[0], [this](TTierConfig&& s) {TierConfigs.emplace(s.GetTierName(), s); });
+ ParseSnapshotObjects<TTieringRule>(rawDataResult.result_sets()[1], [this](TTieringRule&& s) {TableTierings.emplace(s.GetTieringRuleId(), s); });
return true;
}
diff --git a/ydb/core/tx/tiering/tier/behaviour.h b/ydb/core/tx/tiering/tier/behaviour.h
index ba51a201dcb..fd231708fff 100644
--- a/ydb/core/tx/tiering/tier/behaviour.h
+++ b/ydb/core/tx/tiering/tier/behaviour.h
@@ -1,10 +1,11 @@
#pragma once
+#include "object.h"
#include <ydb/services/metadata/abstract/kqp_common.h>
namespace NKikimr::NColumnShard::NTiers {
-class TTierConfigBehaviour: public NMetadata::IClassBehaviour {
+class TTierConfigBehaviour: public NMetadata::TClassBehaviour<TTierConfig> {
private:
static TFactory::TRegistrator<TTierConfigBehaviour> Registrator;
protected:
diff --git a/ydb/core/tx/tiering/tier/checker.cpp b/ydb/core/tx/tiering/tier/checker.cpp
index 3f0156d0934..46e24eeb229 100644
--- a/ydb/core/tx/tiering/tier/checker.cpp
+++ b/ydb/core/tx/tiering/tier/checker.cpp
@@ -12,7 +12,7 @@ void TTierPreparationActor::StartChecker() {
}
auto g = PassAwayGuard();
if (!SSCheckResult->GetContent().GetOperationAllow()) {
- Controller->PreparationProblem(SSCheckResult->GetContent().GetDenyReason());
+ Controller->OnPreparationProblem(SSCheckResult->GetContent().GetDenyReason());
return;
}
for (auto&& tier : Objects) {
@@ -27,30 +27,30 @@ void TTierPreparationActor::StartChecker() {
}
}
if (tieringsWithTiers.size()) {
- Controller->PreparationProblem("tier in usage for tierings: " + JoinSeq(", ", tieringsWithTiers));
+ Controller->OnPreparationProblem("tier in usage for tierings: " + JoinSeq(", ", tieringsWithTiers));
return;
}
}
if (!Secrets->CheckSecretAccess(tier.GetProtoConfig().GetObjectStorage().GetAccessKey(), Context.GetUserToken())) {
- Controller->PreparationProblem("no access for secret: " + tier.GetProtoConfig().GetObjectStorage().GetAccessKey());
+ Controller->OnPreparationProblem("no access for secret: " + tier.GetProtoConfig().GetObjectStorage().GetAccessKey());
return;
} else if (!Secrets->CheckSecretAccess(tier.GetProtoConfig().GetObjectStorage().GetSecretKey(), Context.GetUserToken())) {
- Controller->PreparationProblem("no access for secret: " + tier.GetProtoConfig().GetObjectStorage().GetSecretKey());
+ Controller->OnPreparationProblem("no access for secret: " + tier.GetProtoConfig().GetObjectStorage().GetSecretKey());
return;
}
}
- Controller->PreparationFinished(std::move(Objects));
+ Controller->OnPreparationFinished(std::move(Objects));
}
void TTierPreparationActor::Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev) {
auto& proto = ev->Get()->Record;
if (proto.HasError()) {
- Controller->PreparationProblem(proto.GetError().GetErrorMessage());
+ Controller->OnPreparationProblem(proto.GetError().GetErrorMessage());
PassAway();
} else if (proto.HasContent()) {
SSCheckResult = SSFetcher->UnpackResult(ev->Get()->Record.GetContent().GetData());
if (!SSCheckResult) {
- Controller->PreparationProblem("cannot unpack ss-fetcher result for class " + SSFetcher->GetClassName());
+ Controller->OnPreparationProblem("cannot unpack ss-fetcher result for class " + SSFetcher->GetClassName());
PassAway();
} else {
StartChecker();
diff --git a/ydb/core/tx/tiering/tier/initializer.cpp b/ydb/core/tx/tiering/tier/initializer.cpp
index 11d1cc0e2f9..cbd9599b0fa 100644
--- a/ydb/core/tx/tiering/tier/initializer.cpp
+++ b/ydb/core/tx/tiering/tier/initializer.cpp
@@ -30,7 +30,7 @@ TVector<NKikimr::NMetadata::NInitializer::ITableModifier::TPtr> TTiersInitialize
}
void TTiersInitializer::DoPrepare(NMetadata::NInitializer::IInitializerInput::TPtr controller) const {
- controller->PreparationFinished(BuildModifiers());
+ controller->OnPreparationFinished(BuildModifiers());
}
}
diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp
index 9c3d133ba8d..6376bbec211 100644
--- a/ydb/core/tx/tiering/ut/ut_tiers.cpp
+++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp
@@ -222,11 +222,11 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
private:
YDB_READONLY_FLAG(Finished, false);
public:
- virtual void AlterProblem(const TString& errorMessage) override {
+ virtual void OnAlteringProblem(const TString& errorMessage) override {
Cerr << errorMessage << Endl;
Y_VERIFY(false);
}
- virtual void AlterFinished() override {
+ virtual void OnAlteringFinished() override {
FinishedFlag = true;
}
};
diff --git a/ydb/services/bg_tasks/ds_table/CMakeLists.darwin.txt b/ydb/services/bg_tasks/ds_table/CMakeLists.darwin.txt
index 573f38f4356..a689fcd4138 100644
--- a/ydb/services/bg_tasks/ds_table/CMakeLists.darwin.txt
+++ b/ydb/services/bg_tasks/ds_table/CMakeLists.darwin.txt
@@ -20,6 +20,7 @@ target_link_libraries(services-bg_tasks-ds_table PUBLIC
services-metadata-request
)
target_sources(services-bg_tasks-ds_table PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/behaviour.cpp
${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/executor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/interrupt.cpp
${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/task_executor_controller.cpp
diff --git a/ydb/services/bg_tasks/ds_table/CMakeLists.linux-aarch64.txt b/ydb/services/bg_tasks/ds_table/CMakeLists.linux-aarch64.txt
index be43e3e6080..6dd0b379cc2 100644
--- a/ydb/services/bg_tasks/ds_table/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/bg_tasks/ds_table/CMakeLists.linux-aarch64.txt
@@ -21,6 +21,7 @@ target_link_libraries(services-bg_tasks-ds_table PUBLIC
services-metadata-request
)
target_sources(services-bg_tasks-ds_table PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/behaviour.cpp
${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/executor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/interrupt.cpp
${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/task_executor_controller.cpp
diff --git a/ydb/services/bg_tasks/ds_table/CMakeLists.linux.txt b/ydb/services/bg_tasks/ds_table/CMakeLists.linux.txt
index be43e3e6080..6dd0b379cc2 100644
--- a/ydb/services/bg_tasks/ds_table/CMakeLists.linux.txt
+++ b/ydb/services/bg_tasks/ds_table/CMakeLists.linux.txt
@@ -21,6 +21,7 @@ target_link_libraries(services-bg_tasks-ds_table PUBLIC
services-metadata-request
)
target_sources(services-bg_tasks-ds_table PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/behaviour.cpp
${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/executor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/interrupt.cpp
${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/task_executor_controller.cpp
diff --git a/ydb/services/bg_tasks/ds_table/assign_tasks.cpp b/ydb/services/bg_tasks/ds_table/assign_tasks.cpp
index 2b8fbc490f4..4bff7f55e08 100644
--- a/ydb/services/bg_tasks/ds_table/assign_tasks.cpp
+++ b/ydb/services/bg_tasks/ds_table/assign_tasks.cpp
@@ -39,7 +39,7 @@ std::optional<NMetadata::NRequest::TDialogYQLRequest::TRequest> TAssignTasksActo
}
void TAssignTasksActor::OnResult(const NMetadata::NRequest::TDialogYQLRequest::TResponse& /*result*/) {
- Controller->AssignFinished();
+ Controller->OnAssignFinished();
}
}
diff --git a/ydb/services/bg_tasks/ds_table/behaviour.cpp b/ydb/services/bg_tasks/ds_table/behaviour.cpp
new file mode 100644
index 00000000000..f8fa956ad5a
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/behaviour.cpp
@@ -0,0 +1,14 @@
+#include "behaviour.h"
+#include "initialization.h"
+
+namespace NKikimr::NBackgroundTasks {
+
+std::shared_ptr<NMetadata::NInitializer::IInitializationBehaviour> TBehaviour::ConstructInitializer() const {
+ return std::make_shared<TBGTasksInitializer>(Config);
+}
+
+std::shared_ptr<NMetadata::NModifications::IOperationsManager> TBehaviour::GetOperationsManager() const {
+ return nullptr;
+}
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/behaviour.h b/ydb/services/bg_tasks/ds_table/behaviour.h
new file mode 100644
index 00000000000..60cff56c298
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/behaviour.h
@@ -0,0 +1,27 @@
+#pragma once
+#include "config.h"
+#include <ydb/services/metadata/abstract/kqp_common.h>
+#include <ydb/services/metadata/abstract/initialization.h>
+#include <ydb/services/metadata/manager/abstract.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class TBehaviour: public NMetadata::IClassBehaviour {
+private:
+ const TConfig Config;
+public:
+ virtual TString GetInternalStorageTablePath() const override {
+ return Config.GetTablePath();
+ }
+ virtual std::shared_ptr<NMetadata::NInitializer::IInitializationBehaviour> ConstructInitializer() const override;
+ virtual std::shared_ptr<NMetadata::NModifications::IOperationsManager> GetOperationsManager() const override;
+ TBehaviour(const TConfig& config)
+ : Config(config) {
+
+ }
+ virtual TString GetTypeId() const override {
+ return "bg_tasks";
+ }
+};
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/executor.cpp b/ydb/services/bg_tasks/ds_table/executor.cpp
index 48b8d3edd41..ba56f99197c 100644
--- a/ydb/services/bg_tasks/ds_table/executor.cpp
+++ b/ydb/services/bg_tasks/ds_table/executor.cpp
@@ -59,36 +59,44 @@ void TExecutor::Handle(TEvTaskExecutorFinished::TPtr& ev) {
}
void TExecutor::Handle(TEvAddTask::TPtr& ev) {
- ALS_DEBUG(NKikimrServices::BG_TASKS) << "add task";
- Register(new TAddTasksActor(InternalController, ev->Get()->GetTask(), ev->Sender));
+ if (CheckActivity()) {
+ ALS_DEBUG(NKikimrServices::BG_TASKS) << "add task";
+ Register(new TAddTasksActor(InternalController, ev->Get()->GetTask(), ev->Sender));
+ } else {
+ DeferredEventsOnIntialization.Add(*ev);
+ }
}
void TExecutor::Handle(TEvUpdateTaskEnabled::TPtr& ev) {
- ALS_DEBUG(NKikimrServices::BG_TASKS) << "start task";
- Register(new TUpdateTaskEnabledActor(InternalController, ev->Get()->GetTaskId(), ev->Get()->GetEnabled(), ev->Sender));
+ if (CheckActivity()) {
+ ALS_DEBUG(NKikimrServices::BG_TASKS) << "start task";
+ Register(new TUpdateTaskEnabledActor(InternalController, ev->Get()->GetTaskId(), ev->Get()->GetEnabled(), ev->Sender));
+ } else {
+ DeferredEventsOnIntialization.Add(*ev);
+ }
}
-void TExecutor::Handle(NMetadata::NInitializer::TEvInitializationFinished::TPtr& /*ev*/) {
+void TExecutor::Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr& /*ev*/) {
+ ActivityState = EActivity::Active;
Sender<TEvStartAssign>().SendTo(SelfId());
Schedule(Config.GetPingPeriod(), new TEvLockPingerStart);
+ DeferredEventsOnIntialization.ResendAll(SelfId());
}
void TExecutor::Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) {
auto snapshot = ev->Get()->GetValidatedSnapshotAs<NMetadata::NInitializer::TSnapshot>();
- auto b = std::make_shared<TBGTasksInitializer>(Config);
- Register(new NMetadata::NInitializer::TDSAccessorInitialized(Config.GetRequestConfig(), "bg_tasks", b, InternalController, snapshot));
+ Y_VERIFY(snapshot, "incorrect initialization snapshot from metadata service");
+ if (snapshot->HasComponent("bg_tasks")) {
+ CheckActivity();
+ }
}
void TExecutor::Bootstrap() {
InternalController = std::make_shared<TExecutorController>(SelfId(), Config);
Become(&TExecutor::StateMain);
auto manager = std::make_shared<NMetadata::NInitializer::TFetcher>();
- if (NMetadata::NProvider::TServiceOperator::IsEnabled()) {
- Sender<NMetadata::NProvider::TEvSubscribeExternal>(manager).SendTo(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()));
- } else {
- auto b = std::make_shared<TBGTasksInitializer>(Config);
- Register(new NMetadata::NInitializer::TDSAccessorInitialized(Config.GetRequestConfig(), "bg_tasks", b, InternalController, nullptr));
- }
+ Y_VERIFY(NMetadata::NProvider::TServiceOperator::IsEnabled(), "metadata service not active");
+ Sender<NMetadata::NProvider::TEvSubscribeExternal>(manager).SendTo(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()));
}
NActors::IActor* CreateService(const TConfig& config) {
diff --git a/ydb/services/bg_tasks/ds_table/executor.h b/ydb/services/bg_tasks/ds_table/executor.h
index e0c1a7aeaac..7fa99c2a149 100644
--- a/ydb/services/bg_tasks/ds_table/executor.h
+++ b/ydb/services/bg_tasks/ds_table/executor.h
@@ -1,4 +1,5 @@
#pragma once
+#include "behaviour.h"
#include "config.h"
#include "executor_controller.h"
@@ -6,6 +7,8 @@
#include <ydb/services/bg_tasks/abstract/task.h>
#include <ydb/services/bg_tasks/service.h>
#include <ydb/services/metadata/initializer/accessor_init.h>
+#include <ydb/services/metadata/ds_table/service.h>
+#include <ydb/services/metadata/service.h>
namespace NKikimr::NBackgroundTasks {
@@ -59,8 +62,34 @@ private:
const TConfig Config;
std::set<TString> CurrentTaskIds;
TExecutorController::TPtr InternalController;
+ NMetadata::NProvider::TEventsWaiter DeferredEventsOnIntialization;
+
+ std::shared_ptr<TBehaviour> Behaviour;
+
+ enum class EActivity {
+ Created,
+ Preparation,
+ Active
+ };
+
+ EActivity ActivityState = EActivity::Created;
+
+ bool CheckActivity() {
+ switch (ActivityState) {
+ case EActivity::Created:
+ ActivityState = EActivity::Preparation;
+ Sender<NMetadata::NProvider::TEvPrepareManager>(Behaviour).SendTo(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()));
+ break;
+ case EActivity::Preparation:
+ break;
+ case EActivity::Active:
+ return true;
+ }
+ return false;
+ }
+
protected:
- void Handle(NMetadata::NInitializer::TEvInitializationFinished::TPtr& ev);
+ void Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr& ev);
void Handle(TEvStartAssign::TPtr& ev);
void Handle(TEvAssignFinished::TPtr& ev);
void Handle(TEvFetchingFinished::TPtr& ev);
@@ -74,7 +103,7 @@ protected:
STATEFN(StateMain) {
switch (ev->GetTypeRewrite()) {
- hFunc(NMetadata::NInitializer::TEvInitializationFinished, Handle);
+ hFunc(NMetadata::NProvider::TEvManagerPrepared, Handle);
hFunc(TEvStartAssign, Handle);
hFunc(TEvAssignFinished, Handle);
hFunc(TEvFetchingFinished, Handle);
@@ -94,7 +123,9 @@ public:
TExecutor(const TConfig& config)
: Config(config)
+ , Behaviour(std::make_shared<TBehaviour>(Config))
{
+
TServiceOperator::Register();
}
};
diff --git a/ydb/services/bg_tasks/ds_table/executor_controller.cpp b/ydb/services/bg_tasks/ds_table/executor_controller.cpp
index 03b87684771..05e888de043 100644
--- a/ydb/services/bg_tasks/ds_table/executor_controller.cpp
+++ b/ydb/services/bg_tasks/ds_table/executor_controller.cpp
@@ -7,27 +7,27 @@
namespace NKikimr::NBackgroundTasks {
-void TExecutorController::AssignFinished() const {
+void TExecutorController::OnAssignFinished() const {
ExecutorActorId.Send(ExecutorActorId, new TEvAssignFinished());
}
-void TExecutorController::FetchingFinished() const {
+void TExecutorController::OnFetchingFinished() const {
ExecutorActorId.Send(ExecutorActorId, new TEvFetchingFinished());
}
-void TExecutorController::TaskFetched(const TTask& task) const {
+void TExecutorController::OnTaskFetched(const TTask& task) const {
ExecutorActorId.Send(ExecutorActorId, new TEvTaskFetched(task));
}
-void TExecutorController::TaskFinished(const TString& taskId) const {
+void TExecutorController::OnTaskFinished(const TString& taskId) const {
ExecutorActorId.Send(ExecutorActorId, new TEvTaskExecutorFinished(taskId));
}
-void TExecutorController::InitializationFinished(const TString& id) const {
+void TExecutorController::OnInitializationFinished(const TString& id) const {
ExecutorActorId.Send(ExecutorActorId, new NMetadata::NInitializer::TEvInitializationFinished(id));
}
-void TExecutorController::LockPingerFinished() const {
+void TExecutorController::OnLockPingerFinished() const {
ExecutorActorId.Send(ExecutorActorId, new TEvLockPingerFinished());
}
diff --git a/ydb/services/bg_tasks/ds_table/executor_controller.h b/ydb/services/bg_tasks/ds_table/executor_controller.h
index e0f779cb9d2..69b27ef1252 100644
--- a/ydb/services/bg_tasks/ds_table/executor_controller.h
+++ b/ydb/services/bg_tasks/ds_table/executor_controller.h
@@ -38,12 +38,12 @@ public:
return Config.GetRequestConfig();
}
- virtual void InitializationFinished(const TString& id) const override;
- void LockPingerFinished() const;
- void TaskFetched(const TTask& task) const;
- void TaskFinished(const TString& taskId) const;
- void AssignFinished() const;
- void FetchingFinished() const;
+ virtual void OnInitializationFinished(const TString& id) const override;
+ void OnLockPingerFinished() const;
+ void OnTaskFetched(const TTask& task) const;
+ void OnTaskFinished(const TString& taskId) const;
+ void OnAssignFinished() const;
+ void OnFetchingFinished() const;
};
}
diff --git a/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp b/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp
index 80fb325a218..b254c836c85 100644
--- a/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp
+++ b/ydb/services/bg_tasks/ds_table/fetch_tasks.cpp
@@ -16,9 +16,9 @@ void TFetchTasksActor::OnResult(const NMetadata::NRequest::TDialogYQLRequest::TR
ALS_ERROR(NKikimrServices::BG_TASKS) << "cannot parse task record";
continue;
}
- Controller->TaskFetched(task);
+ Controller->OnTaskFetched(task);
}
- Controller->FetchingFinished();
+ Controller->OnFetchingFinished();
}
std::optional<NMetadata::NRequest::TDialogYQLRequest::TRequest> TFetchTasksActor::OnSessionId(const TString& sessionId) {
diff --git a/ydb/services/bg_tasks/ds_table/finish_task.cpp b/ydb/services/bg_tasks/ds_table/finish_task.cpp
index d311515f9cf..b5dbed8204b 100644
--- a/ydb/services/bg_tasks/ds_table/finish_task.cpp
+++ b/ydb/services/bg_tasks/ds_table/finish_task.cpp
@@ -21,7 +21,7 @@ std::optional<NMetadata::NRequest::TDialogYQLRequest::TRequest> TDropTaskActor::
}
void TDropTaskActor::OnResult(const NMetadata::NRequest::TDialogYQLRequest::TResponse& /*result*/) {
- Controller->TaskFinished(TaskId);
+ Controller->OnTaskFinished(TaskId);
}
}
diff --git a/ydb/services/bg_tasks/ds_table/initialization.cpp b/ydb/services/bg_tasks/ds_table/initialization.cpp
index 818b1ee5185..1951e6a5c2c 100644
--- a/ydb/services/bg_tasks/ds_table/initialization.cpp
+++ b/ydb/services/bg_tasks/ds_table/initialization.cpp
@@ -63,7 +63,7 @@ void TBGTasksInitializer::DoPrepare(NMetadata::NInitializer::IInitializerInput::
}
result.emplace_back(new NMetadata::NInitializer::TGenericTableModifier<NMetadata::NRequest::TDialogCreateTable>(request, "create"));
}
- controller->PreparationFinished(result);
+ controller->OnPreparationFinished(result);
}
}
diff --git a/ydb/services/bg_tasks/ds_table/interrupt.cpp b/ydb/services/bg_tasks/ds_table/interrupt.cpp
index 07be29788db..5c22e3872ff 100644
--- a/ydb/services/bg_tasks/ds_table/interrupt.cpp
+++ b/ydb/services/bg_tasks/ds_table/interrupt.cpp
@@ -38,7 +38,7 @@ std::optional<NMetadata::NRequest::TDialogYQLRequest::TRequest> TInterruptTaskAc
}
void TInterruptTaskActor::OnResult(const NMetadata::NRequest::TDialogYQLRequest::TResponse& /*result*/) {
- ExecutorController->TaskFinished(TaskId);
+ ExecutorController->OnTaskFinished(TaskId);
}
}
diff --git a/ydb/services/bg_tasks/ds_table/lock_pinger.cpp b/ydb/services/bg_tasks/ds_table/lock_pinger.cpp
index e746115b60e..4b8f67a597e 100644
--- a/ydb/services/bg_tasks/ds_table/lock_pinger.cpp
+++ b/ydb/services/bg_tasks/ds_table/lock_pinger.cpp
@@ -36,7 +36,7 @@ std::optional<NMetadata::NRequest::TDialogYQLRequest::TRequest> TLockPingerActor
}
void TLockPingerActor::OnResult(const NMetadata::NRequest::TDialogYQLRequest::TResponse& /*ev*/) {
- ExecutorController->LockPingerFinished();
+ ExecutorController->OnLockPingerFinished();
}
}
diff --git a/ydb/services/bg_tasks/ut/ut_tasks.cpp b/ydb/services/bg_tasks/ut/ut_tasks.cpp
index 5e603467fb1..36e02d125c9 100644
--- a/ydb/services/bg_tasks/ut/ut_tasks.cpp
+++ b/ydb/services/bg_tasks/ut/ut_tasks.cpp
@@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(BGTaskTests) {
serverSettings.GrpcPort = grpcPort;
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
- .SetEnableMetadataProvider(false)
+ .SetEnableMetadataProvider(true)
.SetEnableBackgroundTasks(true)
.SetEnableOlapSchemaOperations(true);
;
diff --git a/ydb/services/metadata/abstract/common.h b/ydb/services/metadata/abstract/common.h
index 90998755b06..fb52622b8c3 100644
--- a/ydb/services/metadata/abstract/common.h
+++ b/ydb/services/metadata/abstract/common.h
@@ -37,6 +37,7 @@ enum EEvents {
EvPathExistsCheckFailed,
EvPathExistsCheckResult,
EvStartMetadataService,
+ EvStartRegistration,
EvEnd
};
diff --git a/ydb/services/metadata/abstract/fetcher.h b/ydb/services/metadata/abstract/fetcher.h
index dca8a0ab0ff..062259c2d59 100644
--- a/ydb/services/metadata/abstract/fetcher.h
+++ b/ydb/services/metadata/abstract/fetcher.h
@@ -14,6 +14,7 @@
#include <ydb/services/metadata/manager/common.h>
#include <ydb/services/metadata/manager/table_record.h>
#include <ydb/services/metadata/manager/alter.h>
+#include <util/system/type_name.h>
namespace NKikimr::NMetadata::NFetcher {
@@ -33,6 +34,24 @@ private:
protected:
virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) = 0;
virtual TString DoSerializeToString() const = 0;
+
+ template <class TObject, class TActor>
+ bool ParseSnapshotObjects(const Ydb::ResultSet& rawData, const TActor& actor, const bool stopOnIncorrectDeserialization = false) {
+ typename TObject::TDecoder decoder(rawData);
+ for (auto&& r : rawData.rows()) {
+ TObject object;
+ if (!object.DeserializeFromRecord(decoder, r)) {
+ ALS_WARN(NKikimrServices::METADATA_PROVIDER) << "cannot parse object: " << TypeName<TObject>();
+ if (stopOnIncorrectDeserialization) {
+ return false;
+ } else {
+ continue;
+ }
+ }
+ actor(std::move(object));
+ }
+ return true;
+ }
public:
using TPtr = std::shared_ptr<ISnapshot>;
ISnapshot(const TInstant actuality)
diff --git a/ydb/services/metadata/abstract/kqp_common.cpp b/ydb/services/metadata/abstract/kqp_common.cpp
index 6dbaac450b5..bd192a9c7c1 100644
--- a/ydb/services/metadata/abstract/kqp_common.cpp
+++ b/ydb/services/metadata/abstract/kqp_common.cpp
@@ -26,13 +26,6 @@ NInitializer::IInitializationBehaviour::TPtr IClassBehaviour::GetInitializer() c
return Initializer;
}
-NModifications::IOperationsManager::TPtr IClassBehaviour::GetOperationsManager() const {
- if (!OperationsManager) {
- OperationsManager = ConstructOperationsManager();
- }
- return OperationsManager;
-}
-
TString IClassBehaviour::GetInternalStorageHistoryTablePath() const {
return GetInternalStorageTablePath() + "_history";
}
diff --git a/ydb/services/metadata/abstract/kqp_common.h b/ydb/services/metadata/abstract/kqp_common.h
index f4501a5bf34..e1ed9140490 100644
--- a/ydb/services/metadata/abstract/kqp_common.h
+++ b/ydb/services/metadata/abstract/kqp_common.h
@@ -17,19 +17,30 @@ public:
using TPtr = std::shared_ptr<IClassBehaviour>;
private:
mutable std::shared_ptr<NInitializer::IInitializationBehaviour> Initializer;
- mutable std::shared_ptr<NModifications::IOperationsManager> OperationsManager;
protected:
+ virtual std::shared_ptr<NInitializer::IInitializationBehaviour> ConstructInitializer() const = 0;
virtual TString GetInternalStorageTablePath() const = 0;
virtual TString GetInternalStorageHistoryTablePath() const;
- virtual std::shared_ptr<NInitializer::IInitializationBehaviour> ConstructInitializer() const = 0;
- virtual std::shared_ptr<NModifications::IOperationsManager> ConstructOperationsManager() const = 0;
public:
virtual ~IClassBehaviour() = default;
TString GetStorageTablePath() const;
TString GetStorageHistoryTablePath() const;
std::shared_ptr<NInitializer::IInitializationBehaviour> GetInitializer() const;
- std::shared_ptr<NModifications::IOperationsManager> GetOperationsManager() const;
+ virtual std::shared_ptr<NModifications::IOperationsManager> GetOperationsManager() const = 0;
virtual TString GetTypeId() const = 0;
};
+
+template <class TObject>
+class TClassBehaviour: public IClassBehaviour {
+private:
+protected:
+ virtual std::shared_ptr<NModifications::IOperationsManager> ConstructOperationsManager() const = 0;
+public:
+ virtual std::shared_ptr<NModifications::IOperationsManager> GetOperationsManager() const override final {
+ static std::shared_ptr<NModifications::IOperationsManager> manager = ConstructOperationsManager();
+ return manager;
+ }
+};
+
}
diff --git a/ydb/services/metadata/ds_table/CMakeLists.darwin.txt b/ydb/services/metadata/ds_table/CMakeLists.darwin.txt
index 34212fd99cc..e9eccb5be5c 100644
--- a/ydb/services/metadata/ds_table/CMakeLists.darwin.txt
+++ b/ydb/services/metadata/ds_table/CMakeLists.darwin.txt
@@ -25,8 +25,11 @@ target_sources(services-metadata-ds_table PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/behaviour_registrator_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/initializer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/scheme_describe.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/table_exists.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/registration.cpp
)
diff --git a/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt b/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt
index 409457cc70b..0931f0b0c99 100644
--- a/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt
@@ -26,8 +26,11 @@ target_sources(services-metadata-ds_table PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/behaviour_registrator_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/initializer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/scheme_describe.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/table_exists.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/registration.cpp
)
diff --git a/ydb/services/metadata/ds_table/CMakeLists.linux.txt b/ydb/services/metadata/ds_table/CMakeLists.linux.txt
index 409457cc70b..0931f0b0c99 100644
--- a/ydb/services/metadata/ds_table/CMakeLists.linux.txt
+++ b/ydb/services/metadata/ds_table/CMakeLists.linux.txt
@@ -26,8 +26,11 @@ target_sources(services-metadata-ds_table PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/behaviour_registrator_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/initializer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/scheme_describe.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/table_exists.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/registration.cpp
)
diff --git a/ydb/services/metadata/ds_table/behaviour_registrator_actor.cpp b/ydb/services/metadata/ds_table/behaviour_registrator_actor.cpp
new file mode 100644
index 00000000000..4f1ef808ebe
--- /dev/null
+++ b/ydb/services/metadata/ds_table/behaviour_registrator_actor.cpp
@@ -0,0 +1,68 @@
+#include "behaviour_registrator_actor.h"
+
+#include "accessor_snapshot_simple.h"
+#include <ydb/services/metadata/initializer/accessor_init.h>
+
+namespace NKikimr::NMetadata::NProvider {
+
+class TBehaviourRegistrator::TInternalController: public NInitializer::IInitializerOutput, public ISchemeDescribeController {
+private:
+ const NActors::TActorIdentity ActorId;
+public:
+ TInternalController(const NActors::TActorIdentity& actorId)
+ : ActorId(actorId) {
+
+ }
+
+ virtual void OnInitializationFinished(const TString& id) const override {
+ ActorId.Send(ActorId, new NInitializer::TEvInitializationFinished(id));
+ }
+ virtual void OnDescriptionFailed(const TString& errorMessage, const TString& requestId) const override {
+ ActorId.Send(ActorId, new TEvTableDescriptionFailed(errorMessage, requestId));
+ }
+ virtual void OnDescriptionSuccess(THashMap<ui32, TSysTables::TTableColumnInfo>&& result, const TString& requestId) const override {
+ ActorId.Send(ActorId, new TEvTableDescriptionSuccess(std::move(result), requestId));
+ }
+};
+
+void TBehaviourRegistrator::Handle(TEvTableDescriptionSuccess::TPtr& ev) {
+ const TString& initId = Behaviour->GetTypeId();
+ Y_VERIFY(initId == ev->Get()->GetRequestId());
+ auto it = RegistrationData->InRegistration.find(initId);
+ Y_VERIFY(it != RegistrationData->InRegistration.end());
+ it->second->GetOperationsManager()->SetActualSchema(ev->Get()->GetSchema());
+ RegistrationData->InitializationFinished(initId);
+}
+
+void TBehaviourRegistrator::Handle(TEvTableDescriptionFailed::TPtr& ev) {
+ const TString& initId = Behaviour->GetTypeId();
+ Y_VERIFY(initId == ev->Get()->GetRequestId());
+ ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "metadata service cannot receive table description for " << initId << Endl;
+ Schedule(TDuration::Seconds(1), new TEvStartRegistration());
+}
+
+void TBehaviourRegistrator::Handle(TEvStartRegistration::TPtr& /*ev*/) {
+ Register(new NInitializer::TDSAccessorInitialized(ReqConfig,
+ Behaviour->GetTypeId(), Behaviour->GetInitializer(), InternalController, RegistrationData->GetInitializationSnapshot()));
+}
+
+void TBehaviourRegistrator::Handle(NInitializer::TEvInitializationFinished::TPtr& ev) {
+ const TString& initId = Behaviour->GetTypeId();
+ Y_VERIFY(initId == ev->Get()->GetInitializationId());
+
+ auto it = RegistrationData->InRegistration.find(initId);
+ Y_VERIFY(it != RegistrationData->InRegistration.end());
+ if (it->second->GetOperationsManager()) {
+ Register(new TSchemeDescriptionActor(InternalController, initId, it->second->GetStorageTablePath()));
+ } else {
+ RegistrationData->InitializationFinished(initId);
+ }
+}
+
+void TBehaviourRegistrator::Bootstrap() {
+ InternalController = std::make_shared<TInternalController>(SelfId());
+ TBase::Become(&TBehaviourRegistrator::StateMain);
+ TBase::Sender<TEvStartRegistration>().SendTo(SelfId());
+}
+
+}
diff --git a/ydb/services/metadata/ds_table/behaviour_registrator_actor.h b/ydb/services/metadata/ds_table/behaviour_registrator_actor.h
new file mode 100644
index 00000000000..ae98bda201d
--- /dev/null
+++ b/ydb/services/metadata/ds_table/behaviour_registrator_actor.h
@@ -0,0 +1,54 @@
+#pragma once
+#include "scheme_describe.h"
+#include "registration.h"
+
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/services/metadata/abstract/kqp_common.h>
+#include <ydb/services/metadata/initializer/common.h>
+#include <ydb/services/metadata/initializer/events.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NMetadata::NProvider {
+
+class TEvStartRegistration: public TEventLocal<TEvStartRegistration, EEvents::EvStartRegistration> {
+};
+
+class TBehaviourRegistrator: public NActors::TActorBootstrapped<TBehaviourRegistrator> {
+private:
+ using TBase = NActors::TActorBootstrapped<TBehaviourRegistrator>;
+ class TInternalController;
+
+ IClassBehaviour::TPtr Behaviour;
+ std::shared_ptr<TRegistrationData> RegistrationData;
+ const NRequest::TConfig ReqConfig;
+ std::shared_ptr<TInternalController> InternalController;
+
+ void Handle(TEvTableDescriptionSuccess::TPtr& ev);
+ void Handle(TEvTableDescriptionFailed::TPtr& ev);
+ void Handle(TEvStartRegistration::TPtr& ev);
+ void Handle(NInitializer::TEvInitializationFinished::TPtr& ev);
+public:
+ TBehaviourRegistrator(IClassBehaviour::TPtr b, std::shared_ptr<TRegistrationData> registrationData, const NRequest::TConfig& reqConfig)
+ : Behaviour(b)
+ , RegistrationData(registrationData)
+ , ReqConfig(reqConfig) {
+
+ }
+
+ void Bootstrap();
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+
+ hFunc(TEvTableDescriptionSuccess, Handle);
+ hFunc(TEvTableDescriptionFailed, Handle);
+
+ hFunc(NInitializer::TEvInitializationFinished, Handle);
+ hFunc(TEvStartRegistration, Handle);
+ default:
+ Y_VERIFY(false);
+ }
+ }
+};
+
+}
diff --git a/ydb/services/metadata/ds_table/initializer_actor.cpp b/ydb/services/metadata/ds_table/initializer_actor.cpp
new file mode 100644
index 00000000000..18f121b6559
--- /dev/null
+++ b/ydb/services/metadata/ds_table/initializer_actor.cpp
@@ -0,0 +1,29 @@
+#include "initializer_actor.h"
+
+namespace NKikimr::NMetadata::NProvider {
+
+void TInitializerSnapshotReader::Handle(TDSAccessorSimple::TEvController::TEvResult::TPtr& ev) {
+ RegistrationData->StartInitialization();
+ RegistrationData->SetInitializationSnapshot(ev->Get()->GetResult());
+}
+
+void TInitializerSnapshotReader::Handle(TEvStartMetadataService::TPtr& /*ev*/) {
+ Register(new TDSAccessorSimple(ReqConfig, InternalController, RegistrationData->GetInitializationFetcher()));
+}
+
+void TInitializerSnapshotReader::Handle(TDSAccessorSimple::TEvController::TEvError::TPtr& ev) {
+ ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot receive initializer snapshot: " << ev->Get()->GetErrorMessage() << Endl;
+ Schedule(TDuration::Seconds(1), new TEvStartMetadataService());
+}
+
+void TInitializerSnapshotReader::Handle(TDSAccessorSimple::TEvController::TEvTableAbsent::TPtr& /*ev*/) {
+ RegistrationData->NoInitializationSnapshot();
+}
+
+void TInitializerSnapshotReader::Bootstrap() {
+ InternalController = std::make_shared<TInitializerSnapshotReaderController>(SelfId());
+ Become(&TInitializerSnapshotReader::StateMain);
+ Sender<TEvStartMetadataService>().SendTo(SelfId());
+}
+
+}
diff --git a/ydb/services/metadata/ds_table/initializer_actor.h b/ydb/services/metadata/ds_table/initializer_actor.h
new file mode 100644
index 00000000000..8181c9df566
--- /dev/null
+++ b/ydb/services/metadata/ds_table/initializer_actor.h
@@ -0,0 +1,59 @@
+#pragma once
+#include "accessor_snapshot_simple.h"
+#include "registration.h"
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <ydb/services/metadata/request/config.h>
+
+namespace NKikimr::NMetadata::NProvider {
+
+class TEvStartMetadataService: public TEventLocal<TEvStartMetadataService, EEvents::EvStartMetadataService> {
+};
+
+class TInitializerSnapshotReaderController: public TDSAccessorSimple::TEvController {
+private:
+ const NActors::TActorIdentity ActorId;
+public:
+ TInitializerSnapshotReaderController(const NActors::TActorIdentity& actorId)
+ : TDSAccessorSimple::TEvController(actorId)
+ , ActorId(actorId) {
+
+ }
+};
+
+class TInitializerSnapshotReader: public NActors::TActorBootstrapped<TInitializerSnapshotReader> {
+private:
+ std::shared_ptr<TRegistrationData> RegistrationData;
+ const NRequest::TConfig ReqConfig;
+ std::shared_ptr<TInitializerSnapshotReaderController> InternalController;
+
+ void Handle(TDSAccessorSimple::TEvController::TEvResult::TPtr& ev);
+ void Handle(TEvStartMetadataService::TPtr& ev);
+ void Handle(TDSAccessorSimple::TEvController::TEvError::TPtr& ev);
+ void Handle(TDSAccessorSimple::TEvController::TEvTableAbsent::TPtr& ev);
+
+public:
+ TInitializerSnapshotReader(std::shared_ptr<TRegistrationData> registrationData, const NRequest::TConfig& reqConfig)
+ : RegistrationData(registrationData)
+ , ReqConfig(reqConfig) {
+
+ }
+
+ void Bootstrap();
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TDSAccessorSimple::TEvController::TEvResult, Handle);
+ hFunc(TDSAccessorSimple::TEvController::TEvError, Handle);
+ hFunc(TDSAccessorSimple::TEvController::TEvTableAbsent, Handle);
+
+ hFunc(TEvStartMetadataService, Handle);
+ default:
+ Y_VERIFY(false);
+ }
+ }
+
+};
+
+}
diff --git a/ydb/services/metadata/ds_table/registration.cpp b/ydb/services/metadata/ds_table/registration.cpp
new file mode 100644
index 00000000000..6912b125a69
--- /dev/null
+++ b/ydb/services/metadata/ds_table/registration.cpp
@@ -0,0 +1,97 @@
+#include "registration.h"
+
+namespace NKikimr::NMetadata::NProvider {
+
+bool TBehavioursId::operator<(const TBehavioursId& item) const {
+ if (BehaviourIds.size() < item.BehaviourIds.size()) {
+ return true;
+ } else if (BehaviourIds.size() > item.BehaviourIds.size()) {
+ return false;
+ } else {
+ auto itSelf = BehaviourIds.begin();
+ auto itItem = item.BehaviourIds.begin();
+ while (itSelf != BehaviourIds.end()) {
+ if (*itSelf < *itItem) {
+ return true;
+ }
+ ++itSelf;
+ ++itItem;
+ }
+ return false;
+ }
+}
+
+bool TBehavioursId::RemoveId(const TString& id) {
+ auto it = BehaviourIds.find(id);
+ if (it == BehaviourIds.end()) {
+ return false;
+ }
+ BehaviourIds.erase(it);
+ return true;
+}
+
+
+void TEventsCollector::Initialized(const TString& initId) {
+ std::map<TBehavioursId, TEventsWaiter> movedEvents;
+ for (auto it = Events.begin(); it != Events.end(); ) {
+ auto m = it->first;
+ if (!m.RemoveId(initId)) {
+ ++it;
+ continue;
+ }
+ if (m.IsEmpty()) {
+ it->second.ResendAll(OwnerId);
+ } else {
+ auto itNext = Events.find(m);
+ if (itNext == Events.end()) {
+ movedEvents.emplace(m, std::move(it->second));
+ } else {
+ itNext->second.Merge(std::move(it->second));
+ }
+ }
+ it = Events.erase(it);
+ }
+ for (auto&& i : movedEvents) {
+ Events.emplace(i.first, std::move(i.second));
+ }
+}
+
+
+void TRegistrationData::InitializationFinished(const TString& initId) {
+ auto it = InRegistration.find(initId);
+ Y_VERIFY(it != InRegistration.end());
+
+ Registered.emplace(initId, it->second);
+ InRegistration.erase(it);
+ EventsWaiting->Initialized(initId);
+}
+
+void TRegistrationData::SetInitializationSnapshot(NFetcher::ISnapshot::TPtr s) {
+ InitializationSnapshot = dynamic_pointer_cast<NInitializer::TSnapshot>(s);
+ Y_VERIFY(InitializationSnapshot);
+ if (Stage == EStage::WaitInitializerInfo) {
+ Stage = EStage::Active;
+ EventsWaiting->TryResendOne();
+ } else if (Stage == EStage::Active) {
+
+ } else if (Stage == EStage::Created) {
+ Y_VERIFY(false, "incorrect stage for method usage");
+ }
+}
+
+void TRegistrationData::StartInitialization() {
+ Y_VERIFY(Stage == EStage::Created);
+ Stage = EStage::WaitInitializerInfo;
+ EventsWaiting->GetOwnerId().Send(EventsWaiting->GetOwnerId(), new TEvSubscribeExternal(InitializationFetcher));
+}
+
+TRegistrationData::TRegistrationData() {
+ InitializationFetcher = std::make_shared<NInitializer::TFetcher>();
+}
+
+void TRegistrationData::NoInitializationSnapshot() {
+ InitializationSnapshot = std::make_shared<NInitializer::TSnapshot>(TInstant::Zero());
+ EventsWaiting->TryResendOne();
+}
+
+}
diff --git a/ydb/services/metadata/ds_table/registration.h b/ydb/services/metadata/ds_table/registration.h
new file mode 100644
index 00000000000..60f63ae2b04
--- /dev/null
+++ b/ydb/services/metadata/ds_table/registration.h
@@ -0,0 +1,143 @@
+#pragma once
+#include <ydb/services/metadata/abstract/kqp_common.h>
+#include <ydb/services/metadata/initializer/fetcher.h>
+#include <ydb/services/metadata/initializer/snapshot.h>
+
+namespace NKikimr::NMetadata::NProvider {
+
+class TBehavioursId {
+private:
+ YDB_READONLY_DEF(std::set<TString>, BehaviourIds);
+public:
+ TBehavioursId(const std::vector<IClassBehaviour::TPtr>& managers) {
+ for (auto&& i : managers) {
+ BehaviourIds.emplace(i->GetTypeId());
+ }
+ }
+
+ bool IsEmpty() const {
+ return BehaviourIds.empty();
+ }
+
+ bool RemoveId(const TString& id);
+
+ bool operator<(const TBehavioursId& item) const;
+};
+
+class TWaitEvent {
+private:
+ TAutoPtr<IEventBase> Event;
+ const TActorId Sender;
+public:
+ TWaitEvent(TAutoPtr<IEventBase> ev, const TActorId& sender)
+ : Event(ev)
+ , Sender(sender) {
+
+ }
+
+ void Resend(const TActorIdentity& receiver) {
+ TActivationContext::Send(new IEventHandle(receiver, Sender, Event.Release()));
+ }
+};
+
+class TEventsWaiter {
+private:
+ std::deque<TWaitEvent> Events;
+public:
+ void Add(TAutoPtr<IEventBase> ev, const TActorId& sender) {
+ if (Events.size() > 10000) {
+ ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "too many events for deferred sending (maybe service cannot start)";
+ return;
+ }
+ Events.emplace_back(ev, sender);
+ }
+
+ template <class T>
+ void Add(TEventHandle<T>& ev) {
+ Add(ev.ReleaseBase(), ev.Sender);
+ }
+
+ bool ResendOne(const TActorIdentity& receiver) {
+ if (Events.empty()) {
+ return false;
+ }
+ Events.front().Resend(receiver);
+ Events.pop_front();
+ return true;
+ }
+
+ void ResendAll(const TActorIdentity& receiver) {
+ while (ResendOne(receiver)) {
+ }
+ }
+
+ bool IsEmpty() const {
+ return Events.empty();
+ }
+
+ void Merge(TEventsWaiter&& source) {
+ Merge(std::move(source.Events));
+ }
+
+ void Merge(std::deque<TWaitEvent>&& source) {
+ for (auto&& i : source) {
+ Events.emplace_back(std::move(i));
+ }
+ }
+};
+
+class TEventsCollector {
+private:
+ const TActorIdentity OwnerId;
+ std::map<TBehavioursId, TEventsWaiter> Events;
+public:
+ TEventsCollector(const TActorIdentity& ownerId)
+ : OwnerId(ownerId) {
+
+ }
+
+ const TActorIdentity& GetOwnerId() const {
+ return OwnerId;
+ }
+
+ void Add(const TBehavioursId& id, TAutoPtr<IEventBase> ev, const TActorId& sender) {
+ Events[id].Add(ev, sender);
+ }
+
+ void TryResendOne() {
+ for (auto&& i : Events) {
+ i.second.ResendOne(OwnerId);
+ }
+ }
+
+ void Initialized(const TString& initId);
+};
+
+class TRegistrationData {
+public:
+ enum class EStage {
+ Created,
+ WaitInitializerInfo,
+ Active
+ };
+private:
+ YDB_READONLY(EStage, Stage, EStage::Created);
+ YDB_READONLY_DEF(std::shared_ptr<NInitializer::TSnapshot>, InitializationSnapshot);
+ YDB_READONLY_DEF(std::shared_ptr<NInitializer::TFetcher>, InitializationFetcher);
+public:
+ TRegistrationData();
+
+ void StartInitialization();
+
+ std::map<TString, IClassBehaviour::TPtr> InRegistration;
+ std::map<TString, IClassBehaviour::TPtr> Registered;
+ std::shared_ptr<TEventsCollector> EventsWaiting;
+
+ void SetInitializationSnapshot(NFetcher::ISnapshot::TPtr s);
+ void NoInitializationSnapshot();
+
+ void InitializationFinished(const TString& initId);
+
+};
+
+}
diff --git a/ydb/services/metadata/ds_table/scheme_describe.h b/ydb/services/metadata/ds_table/scheme_describe.h
index d8c8a87ef7f..3a57dc4d84d 100644
--- a/ydb/services/metadata/ds_table/scheme_describe.h
+++ b/ydb/services/metadata/ds_table/scheme_describe.h
@@ -13,6 +13,34 @@ public:
virtual void OnDescriptionSuccess(THashMap<ui32, TSysTables::TTableColumnInfo>&& result, const TString& requestId) const = 0;
};
+class TEvTableDescriptionFailed: public TEventLocal<TEvTableDescriptionFailed, EEvents::EvTableDescriptionFailed> {
+private:
+ YDB_READONLY_DEF(TString, ErrorMessage);
+ YDB_READONLY_DEF(TString, RequestId);
+public:
+ explicit TEvTableDescriptionFailed(const TString& errorMessage, const TString& reqId)
+ : ErrorMessage(errorMessage)
+ , RequestId(reqId) {
+
+ }
+};
+
+class TEvTableDescriptionSuccess: public TEventLocal<TEvTableDescriptionSuccess, EEvents::EvTableDescriptionSuccess> {
+private:
+ using TDescription = THashMap<ui32, TSysTables::TTableColumnInfo>;
+ YDB_READONLY_DEF(TString, RequestId);
+ YDB_READONLY_DEF(TDescription, Description);
+public:
+ TEvTableDescriptionSuccess(TDescription&& description, const TString& reqId)
+ : RequestId(reqId)
+ , Description(std::move(description)) {
+ }
+
+ NModifications::TTableSchema GetSchema() const {
+ return NModifications::TTableSchema(Description);
+ }
+};
+
class TSchemeDescriptionActor: public NActors::TActorBootstrapped<TSchemeDescriptionActor> {
private:
using TBase = NActors::TActorBootstrapped<TSchemeDescriptionActor>;
diff --git a/ydb/services/metadata/ds_table/service.cpp b/ydb/services/metadata/ds_table/service.cpp
index f327a4cd841..8530111f4b0 100644
--- a/ydb/services/metadata/ds_table/service.cpp
+++ b/ydb/services/metadata/ds_table/service.cpp
@@ -1,6 +1,8 @@
#include "service.h"
#include "accessor_subscribe.h"
+#include "behaviour_registrator_actor.h"
+#include "initializer_actor.h"
#include <ydb/core/base/appdata.h>
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
@@ -15,29 +17,34 @@ IActor* CreateService(const TConfig& config) {
}
void TService::PrepareManagers(std::vector<IClassBehaviour::TPtr> managers, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender) {
- TManagersId id(managers);
- if (InitializationSnapshot) {
+ TBehavioursId id(managers);
+ if (RegistrationData->GetInitializationSnapshot()) {
const bool isInitialization = (managers.size() == 1) && managers.front()->GetTypeId() == NInitializer::TDBInitialization::GetTypeId();
- if (ActiveFlag || (PreparationFlag && isInitialization)) {
- for (auto&& manager : managers) {
- Y_VERIFY(!RegisteredManagers.contains(manager->GetTypeId()));
- if (!ManagersInRegistration.contains(manager->GetTypeId()) && !RegisteredManagers.contains(manager->GetTypeId())) {
- ManagersInRegistration.emplace(manager->GetTypeId(), manager);
- Register(new NInitializer::TDSAccessorInitialized(Config.GetRequestConfig(),
- manager->GetTypeId(), manager->GetInitializer(), InternalController, InitializationSnapshot));
+ switch (RegistrationData->GetStage()) {
+ case TRegistrationData::EStage::Created:
+ RegistrationData->StartInitialization();
+ break;
+ case TRegistrationData::EStage::WaitInitializerInfo:
+ if (!isInitialization) {
+ break;
}
- }
- } else if (!PreparationFlag) {
- PreparationFlag = true;
- Send(SelfId(), new TEvSubscribeExternal(InitializationFetcher));
+ case TRegistrationData::EStage::Active:
+ for (auto&& b : managers) {
+ Y_VERIFY(!RegistrationData->Registered.contains(b->GetTypeId()));
+ if (!RegistrationData->InRegistration.contains(b->GetTypeId()) && !RegistrationData->Registered.contains(b->GetTypeId())) {
+ RegistrationData->InRegistration.emplace(b->GetTypeId(), b);
+ RegisterWithSameMailbox(new TBehaviourRegistrator(b, RegistrationData, Config.GetRequestConfig()));
+ }
+ }
+ break;
}
}
- EventsWaiting[id].emplace_back(ev, sender);
+ RegistrationData->EventsWaiting->Add(id, ev, sender);
}
void TService::Handle(TEvPrepareManager::TPtr& ev) {
- auto it = RegisteredManagers.find(ev->Get()->GetManager()->GetTypeId());
- if (it != RegisteredManagers.end()) {
+ auto it = RegistrationData->Registered.find(ev->Get()->GetManager()->GetTypeId());
+ if (it != RegistrationData->Registered.end()) {
Send(ev->Sender, new TEvManagerPrepared(it->second));
} else {
auto m = ev->Get()->GetManager();
@@ -45,67 +52,6 @@ void TService::Handle(TEvPrepareManager::TPtr& ev) {
}
}
-void TService::InitializationFinished(const TString& initId) {
- auto it = ManagersInRegistration.find(initId);
- Y_VERIFY(it != ManagersInRegistration.end());
-
- RegisteredManagers.emplace(initId, it->second);
- ManagersInRegistration.erase(it);
-
- std::map<TManagersId, std::deque<TWaitEvent>> movedEvents;
- for (auto it = EventsWaiting.begin(); it != EventsWaiting.end(); ) {
- auto m = it->first;
- if (!m.RemoveId(initId)) {
- ++it;
- continue;
- }
- if (m.IsEmpty()) {
- for (auto&& i : it->second) {
- i.Resend(SelfId());
- }
- } else {
- auto itNext = EventsWaiting.find(m);
- if (itNext == EventsWaiting.end()) {
- movedEvents.emplace(m, std::move(it->second));
- } else {
- for (auto&& i : it->second) {
- itNext->second.emplace_back(std::move(i));
- }
- }
- }
- it = EventsWaiting.erase(it);
- }
- for (auto&& i : movedEvents) {
- EventsWaiting.emplace(i.first, std::move(i.second));
- }
-}
-
-void TService::Handle(TEvTableDescriptionSuccess::TPtr& ev) {
- const TString& initId = ev->Get()->GetRequestId();
- auto it = ManagersInRegistration.find(initId);
- Y_VERIFY(it != ManagersInRegistration.end());
- it->second->GetOperationsManager()->SetActualSchema(ev->Get()->GetSchema());
- InitializationFinished(initId);
-}
-
-void TService::Handle(TEvTableDescriptionFailed::TPtr& ev) {
- const TString& initId = ev->Get()->GetRequestId();
- ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "metadata service cannot receive table description for " << initId << Endl;
- Schedule(TDuration::Seconds(1), new NInitializer::TEvInitializationFinished(initId));
-}
-
-void TService::Handle(NInitializer::TEvInitializationFinished::TPtr& ev) {
- const TString& initId = ev->Get()->GetInitializationId();
-
- auto it = ManagersInRegistration.find(initId);
- Y_VERIFY(it != ManagersInRegistration.end());
- if (it->second->GetOperationsManager()) {
- Register(new TSchemeDescriptionActor(InternalController, initId, it->second->GetStorageTablePath()));
- } else {
- InitializationFinished(initId);
- }
-}
-
void TService::Handle(TEvSubscribeExternal::TPtr& ev) {
const TActorId senderId = ev->Sender;
ProcessEventWithFetcher(*ev, ev->Get()->GetFetcher(), [this, senderId](const TActorId& actorId) {
@@ -121,17 +67,17 @@ void TService::Handle(TEvAskSnapshot::TPtr& ev) {
}
void TService::Handle(TEvObjectsOperation::TPtr& ev) {
- if (ev->Get()->GetCommand()->GetManager()->GetTypeId() == NInitializer::TDBInitialization::GetTypeId()) {
- ev->Get()->GetCommand()->SetManager(NInitializer::TDBInitialization::GetBehaviour());
- ev->Get()->GetCommand()->Execute();
+ auto command = ev->Get()->GetCommand();
+ if (command->GetBehaviour()->GetTypeId() == NInitializer::TDBInitialization::GetTypeId()) {
+ command->SetBehaviour(NInitializer::TDBInitialization::GetBehaviour());
+ command->Execute();
} else {
- auto it = RegisteredManagers.find(ev->Get()->GetCommand()->GetManager()->GetTypeId());
- if (it != RegisteredManagers.end()) {
- ev->Get()->GetCommand()->SetManager(it->second);
- ev->Get()->GetCommand()->Execute();
+ auto it = RegistrationData->Registered.find(command->GetBehaviour()->GetTypeId());
+ if (it != RegistrationData->Registered.end()) {
+ command->Execute();
} else {
- auto m = ev->Get()->GetCommand()->GetManager();
- PrepareManagers({ m }, ev->ReleaseBase(), ev->Sender);
+ auto b = command->GetBehaviour();
+ PrepareManagers({ b }, ev->ReleaseBase(), ev->Sender);
}
}
}
@@ -144,60 +90,15 @@ void TService::Handle(TEvUnsubscribeExternal::TPtr& ev) {
}
void TService::Handle(TEvRefreshSubscriberData::TPtr& ev) {
- auto s = ev->Get()->GetSnapshot();
- InitializationSnapshot = dynamic_pointer_cast<NInitializer::TSnapshot>(s);
- Y_VERIFY(InitializationSnapshot);
- if (!ActiveFlag) {
- ActiveFlag = true;
- Activate();
- }
-}
-
-void TService::Activate() {
- for (auto&& i : EventsWaiting) {
- i.second.front().Resend(SelfId());
- i.second.pop_front();
- }
-}
-
-void TService::Handle(TDSAccessorSimple::TEvController::TEvResult::TPtr& ev) {
- InitializationSnapshot = dynamic_pointer_cast<NInitializer::TSnapshot>(ev->Get()->GetResult());
- Y_VERIFY(InitializationSnapshot);
- Activate();
-}
-
-void TService::Handle(TEvStartMetadataService::TPtr& /*ev*/) {
- Register(new TDSAccessorSimple(Config.GetRequestConfig(), InternalController, InitializationFetcher));
-}
-
-void TService::Handle(TDSAccessorSimple::TEvController::TEvError::TPtr& ev) {
- ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot receive initializer snapshot: " << ev->Get()->GetErrorMessage() << Endl;
- Schedule(TDuration::Seconds(1), new TEvStartMetadataService());
-}
-
-void TService::Handle(TDSAccessorSimple::TEvController::TEvTableAbsent::TPtr& /*ev*/) {
- InitializationSnapshot = std::make_shared<NInitializer::TSnapshot>(TInstant::Zero());
- Activate();
+ RegistrationData->SetInitializationSnapshot(ev->Get()->GetSnapshot());
}
void TService::Bootstrap(const NActors::TActorContext& /*ctx*/) {
+ RegistrationData->EventsWaiting = std::make_shared<TEventsCollector>(SelfId());
ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "metadata service started" << Endl;
Become(&TService::StateMain);
- InternalController = std::make_shared<TServiceInternalController>(SelfId());
- InitializationFetcher = std::make_shared<NInitializer::TFetcher>();
- Sender<TEvStartMetadataService>().SendTo(SelfId());
+ RegisterWithSameMailbox(new TInitializerSnapshotReader(RegistrationData, Config.GetRequestConfig()));
}
-void TServiceInternalController::InitializationFinished(const TString& id) const {
- ActorId.Send(ActorId, new NInitializer::TEvInitializationFinished(id));
-}
-
-void TServiceInternalController::OnDescriptionFailed(const TString& errorMessage, const TString& requestId) const {
- ActorId.Send(ActorId, new TEvTableDescriptionFailed(errorMessage, requestId));
-}
-
-void TServiceInternalController::OnDescriptionSuccess(THashMap<ui32, TSysTables::TTableColumnInfo>&& result, const TString& requestId) const {
- ActorId.Send(ActorId, new TEvTableDescriptionSuccess(std::move(result), requestId));
-}
}
diff --git a/ydb/services/metadata/ds_table/service.h b/ydb/services/metadata/ds_table/service.h
index a22597d4a41..9c0746c0092 100644
--- a/ydb/services/metadata/ds_table/service.h
+++ b/ydb/services/metadata/ds_table/service.h
@@ -3,6 +3,7 @@
#include "config.h"
#include "scheme_describe.h"
#include "accessor_snapshot_simple.h"
+#include "registration.h"
#include <ydb/services/metadata/service.h>
#include <ydb/services/metadata/initializer/common.h>
@@ -16,157 +17,28 @@
namespace NKikimr::NMetadata::NProvider {
-class TEvStartMetadataService: public TEventLocal<TEvStartMetadataService, EEvents::EvStartMetadataService> {
-};
-
-class TEvTableDescriptionFailed: public TEventLocal<TEvTableDescriptionFailed, EEvents::EvTableDescriptionFailed> {
-private:
- YDB_READONLY_DEF(TString, ErrorMessage);
- YDB_READONLY_DEF(TString, RequestId);
-public:
- explicit TEvTableDescriptionFailed(const TString& errorMessage, const TString& reqId)
- : ErrorMessage(errorMessage)
- , RequestId(reqId) {
-
- }
-};
-
-class TEvTableDescriptionSuccess: public TEventLocal<TEvTableDescriptionSuccess, EEvents::EvTableDescriptionSuccess> {
-private:
- using TDescription = THashMap<ui32, TSysTables::TTableColumnInfo>;
- YDB_READONLY_DEF(TString, RequestId);
- YDB_READONLY_DEF(TDescription, Description);
-public:
- TEvTableDescriptionSuccess(TDescription&& description, const TString& reqId)
- : RequestId(reqId)
- , Description(std::move(description))
- {
- }
-
- NModifications::TTableSchema GetSchema() const {
- return NModifications::TTableSchema(Description);
- }
-};
-
-class TServiceInternalController: public NInitializer::IInitializerOutput,
- public TDSAccessorSimple::TEvController, public ISchemeDescribeController {
-private:
- const NActors::TActorIdentity ActorId;
-public:
- TServiceInternalController(const NActors::TActorIdentity& actorId)
- : TDSAccessorSimple::TEvController(actorId)
- , ActorId(actorId)
- {
-
- }
-
- virtual void InitializationFinished(const TString& id) const override;
-
- virtual void OnDescriptionFailed(const TString& errorMessage, const TString& requestId) const override;
- virtual void OnDescriptionSuccess(THashMap<ui32, TSysTables::TTableColumnInfo>&& result, const TString& requestId) const override;
-};
-
-class TManagersId {
-private:
- YDB_READONLY_DEF(std::set<TString>, ManagerIds);
-public:
- TManagersId(const std::vector<IClassBehaviour::TPtr>& managers) {
- for (auto&& i : managers) {
- ManagerIds.emplace(i->GetTypeId());
- }
- }
-
- bool IsEmpty() const {
- return ManagerIds.empty();
- }
-
- bool RemoveId(const TString& id) {
- auto it = ManagerIds.find(id);
- if (it == ManagerIds.end()) {
- return false;
- }
- ManagerIds.erase(it);
- return true;
- }
-
- bool operator<(const TManagersId& item) const {
- if (ManagerIds.size() < item.ManagerIds.size()) {
- return true;
- } else if (ManagerIds.size() > item.ManagerIds.size()) {
- return false;
- } else {
- auto itSelf = ManagerIds.begin();
- auto itItem = item.ManagerIds.begin();
- while (itSelf != ManagerIds.end()) {
- if (*itSelf < *itItem) {
- return true;
- }
- ++itSelf;
- ++itItem;
- }
- return false;
- }
- }
-};
-
-class TWaitEvent {
-private:
- TAutoPtr<IEventBase> Event;
- const TActorId Sender;
-public:
- TWaitEvent(TAutoPtr<IEventBase> ev, const TActorId& sender)
- : Event(ev)
- , Sender(sender)
- {
-
- }
-
- void Resend(const TActorIdentity& receiver) {
- TActivationContext::Send(new IEventHandle(receiver, Sender, Event.Release()));
- }
-};
-
class TService: public NActors::TActorBootstrapped<TService> {
private:
using TBase = NActors::TActor<TService>;
- bool ActiveFlag = false;
- bool PreparationFlag = false;
std::map<TString, NActors::TActorId> Accessors;
- std::map<TManagersId, std::deque<TWaitEvent>> EventsWaiting;
- std::map<TString, IClassBehaviour::TPtr> ManagersInRegistration;
- std::map<TString, IClassBehaviour::TPtr> RegisteredManagers;
-
- std::shared_ptr<NInitializer::TFetcher> InitializationFetcher;
- std::shared_ptr<NInitializer::TSnapshot> InitializationSnapshot;
- std::shared_ptr<TServiceInternalController> InternalController;
+ std::shared_ptr<TRegistrationData> RegistrationData = std::make_shared<TRegistrationData>();
const TConfig Config;
- void Handle(TEvStartMetadataService::TPtr& ev);
- void Handle(NInitializer::TEvInitializationFinished::TPtr & ev);
void Handle(TEvRefreshSubscriberData::TPtr& ev);
-
void Handle(TEvAskSnapshot::TPtr& ev);
void Handle(TEvPrepareManager::TPtr& ev);
void Handle(TEvSubscribeExternal::TPtr& ev);
void Handle(TEvUnsubscribeExternal::TPtr& ev);
void Handle(TEvObjectsOperation::TPtr& ev);
- void Handle(TEvTableDescriptionSuccess::TPtr& ev);
- void Handle(TEvTableDescriptionFailed::TPtr& ev);
-
- void Handle(TDSAccessorSimple::TEvController::TEvResult::TPtr& ev);
- void Handle(TDSAccessorSimple::TEvController::TEvError::TPtr& ev);
- void Handle(TDSAccessorSimple::TEvController::TEvTableAbsent::TPtr& ev);
-
- void PrepareManagers(std::vector<IClassBehaviour::TPtr> manager, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender);
- void InitializationFinished(const TString& initId);
+ void PrepareManagers(std::vector<IClassBehaviour::TPtr> managers, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender);
void Activate();
template <class TAction>
void ProcessEventWithFetcher(IEventHandle& ev, NFetcher::ISnapshotsFetcher::TPtr fetcher, TAction action) {
std::vector<IClassBehaviour::TPtr> needManagers;
for (auto&& i : fetcher->GetManagers()) {
- if (!RegisteredManagers.contains(i->GetTypeId())) {
+ if (!RegistrationData->Registered.contains(i->GetTypeId())) {
needManagers.emplace_back(i);
}
}
@@ -188,10 +60,6 @@ public:
STATEFN(StateMain) {
switch (ev->GetTypeRewrite()) {
- hFunc(TDSAccessorSimple::TEvController::TEvResult, Handle);
- hFunc(TDSAccessorSimple::TEvController::TEvError, Handle);
- hFunc(TDSAccessorSimple::TEvController::TEvTableAbsent, Handle);
-
hFunc(TEvObjectsOperation, Handle);
hFunc(TEvRefreshSubscriberData, Handle);
hFunc(TEvAskSnapshot, Handle);
@@ -199,20 +67,13 @@ public:
hFunc(TEvSubscribeExternal, Handle);
hFunc(TEvUnsubscribeExternal, Handle);
- hFunc(TEvTableDescriptionSuccess, Handle);
- hFunc(TEvTableDescriptionFailed, Handle);
-
- hFunc(TEvStartMetadataService, Handle);
-
- hFunc(NInitializer::TEvInitializationFinished, Handle);
default:
Y_VERIFY(false);
}
}
TService(const TConfig& config)
- : Config(config)
- {
+ : Config(config) {
TServiceOperator::Register(Config);
}
};
diff --git a/ydb/services/metadata/initializer/accessor_init.cpp b/ydb/services/metadata/initializer/accessor_init.cpp
index 9ebe20be327..cab2f2d9492 100644
--- a/ydb/services/metadata/initializer/accessor_init.cpp
+++ b/ydb/services/metadata/initializer/accessor_init.cpp
@@ -34,7 +34,7 @@ void TDSAccessorInitialized::Handle(TEvInitializerPreparationFinished::TPtr& ev)
Modifiers.front()->Execute(SelfId(), Config);
} else {
ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "initialization finished";
- ExternalController->InitializationFinished(ComponentId);
+ ExternalController->OnInitializationFinished(ComponentId);
}
}
@@ -49,7 +49,7 @@ void TDSAccessorInitialized::DoNextModifier() {
Modifiers.front()->Execute(SelfId(), Config);
} else {
ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "initialization finished";
- ExternalController->InitializationFinished(ComponentId);
+ ExternalController->OnInitializationFinished(ComponentId);
}
}
diff --git a/ydb/services/metadata/initializer/behaviour.h b/ydb/services/metadata/initializer/behaviour.h
index 9bfc5711e53..1947b7c85a2 100644
--- a/ydb/services/metadata/initializer/behaviour.h
+++ b/ydb/services/metadata/initializer/behaviour.h
@@ -1,12 +1,13 @@
#pragma once
+#include "object.h"
#include <ydb/services/metadata/abstract/initialization.h>
#include <ydb/services/metadata/abstract/kqp_common.h>
#include <ydb/services/metadata/manager/common.h>
namespace NKikimr::NMetadata::NInitializer {
-class TDBObjectBehaviour: public IClassBehaviour {
+class TDBObjectBehaviour: public TClassBehaviour<TDBInitialization> {
protected:
virtual IInitializationBehaviour::TPtr ConstructInitializer() const override;
virtual std::shared_ptr<NModifications::IOperationsManager> ConstructOperationsManager() const override;
diff --git a/ydb/services/metadata/initializer/common.h b/ydb/services/metadata/initializer/common.h
index 4ff8f991693..b5e6206901c 100644
--- a/ydb/services/metadata/initializer/common.h
+++ b/ydb/services/metadata/initializer/common.h
@@ -69,15 +69,15 @@ public:
class IInitializerInput {
public:
using TPtr = std::shared_ptr<IInitializerInput>;
- virtual void PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const = 0;
- virtual void PreparationProblem(const TString& errorMessage) const = 0;
+ virtual void OnPreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const = 0;
+ virtual void OnPreparationProblem(const TString& errorMessage) const = 0;
virtual ~IInitializerInput() = default;
};
class IInitializerOutput {
public:
using TPtr = std::shared_ptr<IInitializerOutput>;
- virtual void InitializationFinished(const TString& id) const = 0;
+ virtual void OnInitializationFinished(const TString& id) const = 0;
virtual ~IInitializerOutput() = default;
};
diff --git a/ydb/services/metadata/initializer/controller.cpp b/ydb/services/metadata/initializer/controller.cpp
index 1aaadb1676a..b3cfd8e64ff 100644
--- a/ydb/services/metadata/initializer/controller.cpp
+++ b/ydb/services/metadata/initializer/controller.cpp
@@ -5,23 +5,23 @@
namespace NKikimr::NMetadata::NInitializer {
-void TInitializerInput::PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const {
+void TInitializerInput::OnPreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const {
ActorId.Send(ActorId, new TEvInitializerPreparationFinished(modifiers));
}
-void TInitializerInput::PreparationProblem(const TString& errorMessage) const {
+void TInitializerInput::OnPreparationProblem(const TString& errorMessage) const {
ActorId.Send(ActorId, new TEvInitializerPreparationProblem(errorMessage));
}
-void TInitializerInput::AlterProblem(const TString& errorMessage) {
+void TInitializerInput::OnAlteringProblem(const TString& errorMessage) {
ActorId.Send(ActorId, new NModifications::TEvModificationProblem(errorMessage));
}
-void TInitializerInput::AlterFinished() {
+void TInitializerInput::OnAlteringFinished() {
ActorId.Send(ActorId, new NModifications::TEvModificationFinished());
}
-void TInitializerOutput::InitializationFinished(const TString& id) const {
+void TInitializerOutput::OnInitializationFinished(const TString& id) const {
ActorId.Send(ActorId, new TEvInitializationFinished(id));
}
diff --git a/ydb/services/metadata/initializer/controller.h b/ydb/services/metadata/initializer/controller.h
index 528b46847c0..062a20b464e 100644
--- a/ydb/services/metadata/initializer/controller.h
+++ b/ydb/services/metadata/initializer/controller.h
@@ -15,10 +15,10 @@ public:
}
- virtual void AlterProblem(const TString& errorMessage) override;
- virtual void AlterFinished() override;
- virtual void PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const override;
- virtual void PreparationProblem(const TString& errorMessage) const override;
+ virtual void OnAlteringProblem(const TString& errorMessage) override;
+ virtual void OnAlteringFinished() override;
+ virtual void OnPreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const override;
+ virtual void OnPreparationProblem(const TString& errorMessage) const override;
};
class TInitializerOutput: public IInitializerOutput {
@@ -30,7 +30,7 @@ public:
}
- virtual void InitializationFinished(const TString& id) const override;
+ virtual void OnInitializationFinished(const TString& id) const override;
};
}
diff --git a/ydb/services/metadata/initializer/initializer.cpp b/ydb/services/metadata/initializer/initializer.cpp
index a5b57890257..26905a30e54 100644
--- a/ydb/services/metadata/initializer/initializer.cpp
+++ b/ydb/services/metadata/initializer/initializer.cpp
@@ -28,7 +28,7 @@ void TInitializer::DoPrepare(IInitializerInput::TPtr controller) const {
result.emplace_back(new TGenericTableModifier<NRequest::TDialogCreateTable>(request, "create"));
}
result.emplace_back(TACLModifierConstructor::GetReadOnlyModifier(TDBInitialization::GetBehaviour()->GetStorageTablePath(), "acl"));
- controller->PreparationFinished(result);
+ controller->OnPreparationFinished(result);
}
}
diff --git a/ydb/services/metadata/initializer/manager.cpp b/ydb/services/metadata/initializer/manager.cpp
index 55859c8b2fc..079e78b216f 100644
--- a/ydb/services/metadata/initializer/manager.cpp
+++ b/ydb/services/metadata/initializer/manager.cpp
@@ -7,7 +7,7 @@ void TManager::DoPrepareObjectsBeforeModification(std::vector<TDBInitialization>
NModifications::IAlterPreparationController<TDBInitialization>::TPtr controller,
const TModificationContext& /*context*/) const
{
- controller->PreparationFinished(std::move(objects));
+ controller->OnPreparationFinished(std::move(objects));
}
NModifications::TOperationParsingResult TManager::DoBuildPatchFromSettings(
diff --git a/ydb/services/metadata/initializer/snapshot.cpp b/ydb/services/metadata/initializer/snapshot.cpp
index 9560ba30daf..60eb064d5a4 100644
--- a/ydb/services/metadata/initializer/snapshot.cpp
+++ b/ydb/services/metadata/initializer/snapshot.cpp
@@ -27,4 +27,13 @@ TString TSnapshot::DoSerializeToString() const {
return sb;
}
+bool TSnapshot::HasComponent(const TString& componentId) const {
+ for (auto&& i : Objects) {
+ if (i.first.GetComponentId() == componentId) {
+ return true;
+ }
+ }
+ return false;
+}
+
}
diff --git a/ydb/services/metadata/initializer/snapshot.h b/ydb/services/metadata/initializer/snapshot.h
index 98193131cc1..b4953955aad 100644
--- a/ydb/services/metadata/initializer/snapshot.h
+++ b/ydb/services/metadata/initializer/snapshot.h
@@ -15,6 +15,8 @@ protected:
virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override;
virtual TString DoSerializeToString() const override;
public:
+ bool HasComponent(const TString& componentId) const;
+
using TBase::TBase;
};
diff --git a/ydb/services/metadata/initializer/ut/ut_init.cpp b/ydb/services/metadata/initializer/ut/ut_init.cpp
index 16ea1a4be10..122c2214198 100644
--- a/ydb/services/metadata/initializer/ut/ut_init.cpp
+++ b/ydb/services/metadata/initializer/ut/ut_init.cpp
@@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(Initializer) {
result.emplace_back(new NMetadata::NInitializer::TGenericTableModifier<NMetadata::NRequest::TDialogCreateTable>(request, "create"));
}
result.emplace_back(NMetadata::NInitializer::TACLModifierConstructor::GetReadOnlyModifier(tablePath, "acl"));
- controller->PreparationFinished(result);
+ controller->OnPreparationFinished(result);
}
public:
};
@@ -63,7 +63,7 @@ Y_UNIT_TEST_SUITE(Initializer) {
virtual std::shared_ptr<NMetadata::NInitializer::IInitializationBehaviour> ConstructInitializer() const override {
return std::make_shared<TTestInitializer>();
}
- virtual std::shared_ptr<NMetadata::NModifications::IOperationsManager> ConstructOperationsManager() const override {
+ virtual std::shared_ptr<NMetadata::NModifications::IOperationsManager> GetOperationsManager() const override {
return nullptr;
}
public:
@@ -71,7 +71,7 @@ Y_UNIT_TEST_SUITE(Initializer) {
return TypeName<TInitBehaviourTest>();
}
- static IClassBehaviour::TPtr GetInstant() {
+ static IClassBehaviour::TPtr GetInstance() {
static std::shared_ptr<TInitBehaviourTest> result = std::make_shared<TInitBehaviourTest>();
return result;
}
@@ -97,7 +97,7 @@ Y_UNIT_TEST_SUITE(Initializer) {
void Bootstrap() {
Become(&TThis::StateWork);
- Sender<NMetadata::NProvider::TEvPrepareManager>(TInitBehaviourTest::GetInstant()).
+ Sender<NMetadata::NProvider::TEvPrepareManager>(TInitBehaviourTest::GetInstance()).
SendTo(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()));
}
};
diff --git a/ydb/services/metadata/manager/abstract.h b/ydb/services/metadata/manager/abstract.h
index 5016eb1d84a..90b21eb213e 100644
--- a/ydb/services/metadata/manager/abstract.h
+++ b/ydb/services/metadata/manager/abstract.h
@@ -194,7 +194,7 @@ public:
class IAlterCommand {
private:
YDB_READONLY_DEF(std::vector<NInternal::TTableRecord>, Records);
- YDB_ACCESSOR_DEF(IClassBehaviour::TPtr, Manager);
+ YDB_ACCESSOR_DEF(IClassBehaviour::TPtr, Behaviour);
YDB_READONLY_DEF(IAlterController::TPtr, Controller);
protected:
mutable IOperationsManager::TModificationContext Context;
@@ -205,7 +205,7 @@ public:
template <class TObject>
std::shared_ptr<IObjectOperationsManager<TObject>> GetOperationsManagerFor() const {
- auto result = std::dynamic_pointer_cast<IObjectOperationsManager<TObject>>(Manager->GetOperationsManager());
+ auto result = std::dynamic_pointer_cast<IObjectOperationsManager<TObject>>(Behaviour->GetOperationsManager());
Y_VERIFY(result);
return result;
}
@@ -215,29 +215,37 @@ public:
}
IAlterCommand(const std::vector<NInternal::TTableRecord>& records,
- IClassBehaviour::TPtr manager,
+ IClassBehaviour::TPtr behaviour,
NModifications::IAlterController::TPtr controller,
const IOperationsManager::TModificationContext& context)
: Records(records)
- , Manager(manager)
+ , Behaviour(behaviour)
, Controller(controller)
, Context(context) {
- Y_VERIFY(Manager->GetOperationsManager());
+ Y_VERIFY(Behaviour->GetOperationsManager());
}
IAlterCommand(const NInternal::TTableRecord& record,
- IClassBehaviour::TPtr manager,
+ IClassBehaviour::TPtr behaviour,
NModifications::IAlterController::TPtr controller,
const IOperationsManager::TModificationContext& context)
- : Manager(manager)
+ : Behaviour(behaviour)
, Controller(controller)
, Context(context) {
- Y_VERIFY(Manager->GetOperationsManager());
+ Y_VERIFY(Behaviour->GetOperationsManager());
Records.emplace_back(record);
}
void Execute() const {
+ if (!Behaviour) {
+ Controller->OnAlteringProblem("behaviour not ready");
+ return;
+ }
+ if (!Behaviour->GetOperationsManager()) {
+ Controller->OnAlteringProblem("behaviour's manager not initialized");
+ return;
+ }
DoExecute();
}
};
diff --git a/ydb/services/metadata/manager/alter.h b/ydb/services/metadata/manager/alter.h
index 191f0c6e7f5..57301997840 100644
--- a/ydb/services/metadata/manager/alter.h
+++ b/ydb/services/metadata/manager/alter.h
@@ -55,17 +55,17 @@ protected:
auto& first = TBase::Patches.front();
std::vector<Ydb::Column> columns = first.SelectOwnedColumns(Manager->GetSchema().GetPKColumns());
if (!columns.size()) {
- TBase::ExternalController->AlterProblem("no pk columns in patch");
+ TBase::ExternalController->OnAlteringProblem("no pk columns in patch");
return false;
}
if (columns.size() != Manager->GetSchema().GetPKColumns().size()) {
- TBase::ExternalController->AlterProblem("no columns for pk detection");
+ TBase::ExternalController->OnAlteringProblem("no columns for pk detection");
return false;
}
TBase::RestoreObjectIds.InitColumns(columns);
for (auto&& i : TBase::Patches) {
if (!TBase::RestoreObjectIds.AddRecordNativeValues(i)) {
- TBase::ExternalController->AlterProblem("incorrect pk columns");
+ TBase::ExternalController->OnAlteringProblem("incorrect pk columns");
return false;
}
}
diff --git a/ydb/services/metadata/manager/alter_impl.h b/ydb/services/metadata/manager/alter_impl.h
index ffc3564601e..685958f8829 100644
--- a/ydb/services/metadata/manager/alter_impl.h
+++ b/ydb/services/metadata/manager/alter_impl.h
@@ -24,22 +24,22 @@ public:
}
- virtual void RestoreFinished(std::vector<TObject>&& objects, const TString& transactionId) override {
+ virtual void OnRestoringFinished(std::vector<TObject>&& objects, const TString& transactionId) override {
ActorId.Send(ActorId, new TEvRestoreFinished<TObject>(std::move(objects), transactionId));
}
- virtual void RestoreProblem(const TString& errorMessage) override {
+ virtual void OnRestoringProblem(const TString& errorMessage) override {
ActorId.Send(ActorId, new TEvRestoreProblem(errorMessage));
}
- virtual void ModificationFinished() override {
+ virtual void OnModificationFinished() override {
ActorId.Send(ActorId, new TEvModificationFinished());
}
- virtual void ModificationProblem(const TString& errorMessage) override {
+ virtual void OnModificationProblem(const TString& errorMessage) override {
ActorId.Send(ActorId, new TEvModificationProblem(errorMessage));
}
- virtual void PreparationProblem(const TString& errorMessage) override {
+ virtual void OnPreparationProblem(const TString& errorMessage) override {
ActorId.Send(ActorId, new TEvAlterPreparationProblem(errorMessage));
}
- virtual void PreparationFinished(std::vector<TObject>&& objects) override {
+ virtual void OnPreparationFinished(std::vector<TObject>&& objects) override {
ActorId.Send(ActorId, new TEvAlterPreparationFinished<TObject>(std::move(objects)));
}
@@ -119,7 +119,7 @@ public:
void Bootstrap() {
InitState();
if (!Patches.size()) {
- ExternalController->AlterProblem("no patches");
+ ExternalController->OnAlteringProblem("no patches");
return TBase::PassAway();
}
if (!BuildRestoreObjectIds()) {
@@ -158,29 +158,29 @@ public:
records.ReserveRows(ev->Get()->GetObjects().size());
for (auto&& i : ev->Get()->GetObjects()) {
if (!records.AddRecordNativeValues(i.SerializeToRecord())) {
- ExternalController->AlterProblem("unexpected serialization inconsistency");
+ ExternalController->OnAlteringProblem("unexpected serialization inconsistency");
return TBase::PassAway();
}
}
if (!ProcessPreparedObjects(std::move(records))) {
- ExternalController->AlterProblem("cannot process prepared objects");
+ ExternalController->OnAlteringProblem("cannot process prepared objects");
return TBase::PassAway();
}
}
void Handle(typename NRequest::TEvRequestFailed::TPtr& /*ev*/) {
auto g = TBase::PassAwayGuard();
- ExternalController->AlterProblem("cannot initialize session");
+ ExternalController->OnAlteringProblem("cannot initialize session");
}
void Handle(TEvAlterPreparationProblem::TPtr& ev) {
auto g = TBase::PassAwayGuard();
- ExternalController->AlterProblem("preparation problem: " + ev->Get()->GetErrorMessage());
+ ExternalController->OnAlteringProblem("preparation problem: " + ev->Get()->GetErrorMessage());
}
void Handle(TEvRestoreProblem::TPtr& ev) {
auto g = TBase::PassAwayGuard();
- ExternalController->AlterProblem("cannot restore objects: " + ev->Get()->GetErrorMessage());
+ ExternalController->OnAlteringProblem("cannot restore objects: " + ev->Get()->GetErrorMessage());
}
};
@@ -199,7 +199,7 @@ protected:
TBase::RestoreObjectIds.InitColumns(Manager->GetSchema().GetPKColumns());
for (auto&& i : TBase::Patches) {
if (!TBase::RestoreObjectIds.AddRecordNativeValues(i)) {
- TBase::ExternalController->AlterProblem("no pk columns in patch");
+ TBase::ExternalController->OnAlteringProblem("no pk columns in patch");
return false;
}
}
@@ -233,13 +233,13 @@ public:
}
TObject objectPatched;
if (!trPatch) {
- TBase::ExternalController->AlterProblem("cannot found patch for object");
+ TBase::ExternalController->OnAlteringProblem("cannot found patch for object");
return false;
} else if (!trObject.TakeValuesFrom(*trPatch)) {
- TBase::ExternalController->AlterProblem("cannot patch object");
+ TBase::ExternalController->OnAlteringProblem("cannot patch object");
return false;
} else if (!TObject::TDecoder::DeserializeFromRecord(objectPatched, trObject)) {
- TBase::ExternalController->AlterProblem("cannot parse object after patch");
+ TBase::ExternalController->OnAlteringProblem("cannot parse object after patch");
return false;
} else {
i = std::move(objectPatched);
@@ -256,7 +256,7 @@ public:
if (!found) {
TObject object;
if (!TObject::TDecoder::DeserializeFromRecord(object, p)) {
- TBase::ExternalController->AlterProblem("cannot parse new object");
+ TBase::ExternalController->OnAlteringProblem("cannot parse new object");
return false;
}
objects.emplace_back(std::move(object));
@@ -267,12 +267,12 @@ public:
void Handle(TEvModificationFinished::TPtr& /*ev*/) {
auto g = TBase::PassAwayGuard();
- TBase::ExternalController->AlterFinished();
+ TBase::ExternalController->OnAlteringFinished();
}
void Handle(TEvModificationProblem::TPtr& ev) {
auto g = TBase::PassAwayGuard();
- TBase::ExternalController->AlterProblem("cannot " + GetModificationType() + " objects: " + ev->Get()->GetErrorMessage());
+ TBase::ExternalController->OnAlteringProblem("cannot " + GetModificationType() + " objects: " + ev->Get()->GetErrorMessage());
}
};
diff --git a/ydb/services/metadata/manager/common.h b/ydb/services/metadata/manager/common.h
index c2dcbc3ea68..38848407d93 100644
--- a/ydb/services/metadata/manager/common.h
+++ b/ydb/services/metadata/manager/common.h
@@ -12,8 +12,8 @@ public:
using TPtr = std::shared_ptr<IAlterPreparationController>;
virtual ~IAlterPreparationController() = default;
- virtual void PreparationFinished(std::vector<TObject>&& objects) = 0;
- virtual void PreparationProblem(const TString& errorMessage) = 0;
+ virtual void OnPreparationFinished(std::vector<TObject>&& objects) = 0;
+ virtual void OnPreparationProblem(const TString& errorMessage) = 0;
};
class IAlterController {
@@ -21,8 +21,8 @@ public:
using TPtr = std::shared_ptr<IAlterController>;
virtual ~IAlterController() = default;
- virtual void AlterProblem(const TString& errorMessage) = 0;
- virtual void AlterFinished() = 0;
+ virtual void OnAlteringProblem(const TString& errorMessage) = 0;
+ virtual void OnAlteringFinished() = 0;
};
diff --git a/ydb/services/metadata/manager/generic_manager.h b/ydb/services/metadata/manager/generic_manager.h
index a88c34dceeb..9ee4fca2cc6 100644
--- a/ydb/services/metadata/manager/generic_manager.h
+++ b/ydb/services/metadata/manager/generic_manager.h
@@ -14,10 +14,10 @@ public:
}
- virtual void AlterProblem(const TString& errorMessage) override {
+ virtual void OnAlteringProblem(const TString& errorMessage) override {
Promise.SetValue(TObjectOperatorResult(false).SetErrorMessage(errorMessage));
}
- virtual void AlterFinished() override {
+ virtual void OnAlteringFinished() override {
Promise.SetValue(TObjectOperatorResult(true));
}
diff --git a/ydb/services/metadata/manager/modification.h b/ydb/services/metadata/manager/modification.h
index 7a85bdb4180..795826390d0 100644
--- a/ydb/services/metadata/manager/modification.h
+++ b/ydb/services/metadata/manager/modification.h
@@ -53,14 +53,14 @@ protected:
Requests.front(), SystemUserToken, TBase::SelfId()));
Requests.pop_front();
} else {
- Controller->ModificationFinished();
+ Controller->OnModificationFinished();
TBase::PassAway();
}
}
void Handle(NRequest::TEvRequestFailed::TPtr& ev) {
auto g = TBase::PassAwayGuard();
- Controller->ModificationProblem("cannot execute yql request for " + GetModifyType() +
+ Controller->OnModificationProblem("cannot execute yql request for " + GetModifyType() +
" objects: " + ev->Get()->GetErrorMessage());
}
diff --git a/ydb/services/metadata/manager/modification_controller.h b/ydb/services/metadata/manager/modification_controller.h
index 0df4a8c2acc..b44c3a3ad0d 100644
--- a/ydb/services/metadata/manager/modification_controller.h
+++ b/ydb/services/metadata/manager/modification_controller.h
@@ -8,8 +8,8 @@ class IModificationObjectsController {
public:
using TPtr = std::shared_ptr<IModificationObjectsController>;
virtual ~IModificationObjectsController() = default;
- virtual void ModificationProblem(const TString& errorMessage) = 0;
- virtual void ModificationFinished() = 0;
+ virtual void OnModificationProblem(const TString& errorMessage) = 0;
+ virtual void OnModificationFinished() = 0;
};
class TEvModificationFinished: public TEventLocal<TEvModificationFinished, EvModificationFinished> {
diff --git a/ydb/services/metadata/manager/restore.h b/ydb/services/metadata/manager/restore.h
index a50caf7de89..5a000669665 100644
--- a/ydb/services/metadata/manager/restore.h
+++ b/ydb/services/metadata/manager/restore.h
@@ -30,17 +30,17 @@ private:
for (auto&& row : qResult.result_sets()[0].rows()) {
TObject object;
if (!object.DeserializeFromRecord(decoder, row)) {
- Controller->RestoreProblem("cannot parse exists object");
+ Controller->OnRestoringProblem("cannot parse exists object");
return;
}
objects.emplace_back(std::move(object));
}
- Controller->RestoreFinished(std::move(objects), qResult.tx_meta().id());
+ Controller->OnRestoringFinished(std::move(objects), qResult.tx_meta().id());
}
void Handle(NRequest::TEvRequestFailed::TPtr& /*ev*/) {
auto g = TBase::PassAwayGuard();
- Controller->RestoreProblem("cannot execute yql request");
+ Controller->OnRestoringProblem("cannot execute yql request");
}
public:
@@ -64,7 +64,7 @@ public:
void Bootstrap() {
if (ObjectIds.empty()) {
- Controller->RestoreProblem("no objects for restore");
+ Controller->OnRestoringProblem("no objects for restore");
TBase::PassAway();
}
auto request = ObjectIds.BuildSelectQuery(TObject::GetBehaviour()->GetStorageTablePath());
diff --git a/ydb/services/metadata/manager/restore_controller.h b/ydb/services/metadata/manager/restore_controller.h
index f0141b1ce8b..cfdecac0fd1 100644
--- a/ydb/services/metadata/manager/restore_controller.h
+++ b/ydb/services/metadata/manager/restore_controller.h
@@ -11,8 +11,8 @@ public:
using TPtr = std::shared_ptr<IRestoreObjectsController>;
virtual ~IRestoreObjectsController() = default;
- virtual void RestoreFinished(std::vector<TObject>&& objects, const TString& transactionId) = 0;
- virtual void RestoreProblem(const TString& errorMessage) = 0;
+ virtual void OnRestoringFinished(std::vector<TObject>&& objects, const TString& transactionId) = 0;
+ virtual void OnRestoringProblem(const TString& errorMessage) = 0;
};
template <class TObject>
diff --git a/ydb/services/metadata/secret/access_behaviour.h b/ydb/services/metadata/secret/access_behaviour.h
index 2d84cc462ec..ced34bfcc0a 100644
--- a/ydb/services/metadata/secret/access_behaviour.h
+++ b/ydb/services/metadata/secret/access_behaviour.h
@@ -8,7 +8,7 @@
namespace NKikimr::NMetadata::NSecret {
-class TAccessBehaviour: public IClassBehaviour {
+class TAccessBehaviour: public TClassBehaviour<TAccess> {
private:
static TFactory::TRegistrator<TAccessBehaviour> Registrator;
protected:
diff --git a/ydb/services/metadata/secret/checker_access.cpp b/ydb/services/metadata/secret/checker_access.cpp
index 54c59a27803..1b522a6ea19 100644
--- a/ydb/services/metadata/secret/checker_access.cpp
+++ b/ydb/services/metadata/secret/checker_access.cpp
@@ -18,12 +18,12 @@ void TAccessPreparationActor::StartChecker() {
}
}
if (!foundSecret) {
- Controller->PreparationProblem("used in access secret " + i.GetSecretId() + " not found");
+ Controller->OnPreparationProblem("used in access secret " + i.GetSecretId() + " not found");
return;
}
}
}
- Controller->PreparationFinished(std::move(Objects));
+ Controller->OnPreparationFinished(std::move(Objects));
}
void TAccessPreparationActor::Handle(NProvider::TEvRefreshSubscriberData::TPtr& ev) {
diff --git a/ydb/services/metadata/secret/checker_secret.cpp b/ydb/services/metadata/secret/checker_secret.cpp
index 3607de1889c..e297a5cd6b6 100644
--- a/ydb/services/metadata/secret/checker_secret.cpp
+++ b/ydb/services/metadata/secret/checker_secret.cpp
@@ -11,20 +11,20 @@ void TSecretPreparationActor::StartChecker() {
for (auto&& i : Objects) {
if (Context.GetActivityType() == NModifications::IOperationsManager::EActivityType::Alter) {
if (!Secrets->GetSecrets().contains(i)) {
- Controller->PreparationProblem("secret " + i.GetSecretId() + " not found for alter");
+ Controller->OnPreparationProblem("secret " + i.GetSecretId() + " not found for alter");
return;
}
}
for (auto&& sa : Secrets->GetAccess()) {
if (Context.GetActivityType() == NModifications::IOperationsManager::EActivityType::Drop) {
if (sa.GetOwnerUserId() == i.GetOwnerUserId() && sa.GetSecretId() == i.GetSecretId()) {
- Controller->PreparationProblem("secret " + i.GetSecretId() + " using in access for " + sa.GetAccessSID());
+ Controller->OnPreparationProblem("secret " + i.GetSecretId() + " using in access for " + sa.GetAccessSID());
return;
}
}
}
}
- Controller->PreparationFinished(std::move(Objects));
+ Controller->OnPreparationFinished(std::move(Objects));
}
void TSecretPreparationActor::Handle(NProvider::TEvRefreshSubscriberData::TPtr& ev) {
diff --git a/ydb/services/metadata/secret/initializer.cpp b/ydb/services/metadata/secret/initializer.cpp
index de06d5037a2..db36f1263f9 100644
--- a/ydb/services/metadata/secret/initializer.cpp
+++ b/ydb/services/metadata/secret/initializer.cpp
@@ -33,7 +33,7 @@ void TSecretInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr control
}
result.emplace_back(NInitializer::TACLModifierConstructor::GetNoAccessModifier(TSecret::GetBehaviour()->GetStorageTablePath(), "acl"));
result.emplace_back(NInitializer::TACLModifierConstructor::GetNoAccessModifier(TSecret::GetBehaviour()->GetStorageHistoryTablePath(), "acl_history"));
- controller->PreparationFinished(result);
+ controller->OnPreparationFinished(result);
}
void TAccessInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr controller) const {
@@ -66,7 +66,7 @@ void TAccessInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr control
}
result.emplace_back(NInitializer::TACLModifierConstructor::GetNoAccessModifier(TAccess::GetBehaviour()->GetStorageTablePath(), "acl"));
result.emplace_back(NInitializer::TACLModifierConstructor::GetNoAccessModifier(TAccess::GetBehaviour()->GetStorageHistoryTablePath(), "acl_history"));
- controller->PreparationFinished(result);
+ controller->OnPreparationFinished(result);
}
}
diff --git a/ydb/services/metadata/secret/manager.cpp b/ydb/services/metadata/secret/manager.cpp
index c1f60a41d00..8642dee62c2 100644
--- a/ydb/services/metadata/secret/manager.cpp
+++ b/ydb/services/metadata/secret/manager.cpp
@@ -7,13 +7,13 @@ namespace NKikimr::NMetadata::NSecret {
void TAccessManager::DoPrepareObjectsBeforeModification(std::vector<TAccess>&& patchedObjects, NModifications::IAlterPreparationController<TAccess>::TPtr controller, const NModifications::IOperationsManager::TModificationContext& context) const {
if (context.GetActivityType() == IOperationsManager::EActivityType::Alter) {
- controller->PreparationProblem("access object cannot be modified");
+ controller->OnPreparationProblem("access object cannot be modified");
return;
}
if (!!context.GetUserToken()) {
for (auto&& i : patchedObjects) {
if (i.GetOwnerUserId() != context.GetUserToken()->GetUserSID()) {
- controller->PreparationProblem("no permissions for modify secret access");
+ controller->OnPreparationProblem("no permissions for modify secret access");
return;
}
}
@@ -87,7 +87,7 @@ void TSecretManager::DoPrepareObjectsBeforeModification(std::vector<TSecret>&& p
if (!!context.GetUserToken()) {
for (auto&& i : patchedObjects) {
if (i.GetOwnerUserId() != context.GetUserToken()->GetUserSID()) {
- controller->PreparationProblem("no permissions for modify secrets");
+ controller->OnPreparationProblem("no permissions for modify secrets");
return;
}
}
diff --git a/ydb/services/metadata/secret/secret_behaviour.h b/ydb/services/metadata/secret/secret_behaviour.h
index ad2fcc739aa..24ea6585c25 100644
--- a/ydb/services/metadata/secret/secret_behaviour.h
+++ b/ydb/services/metadata/secret/secret_behaviour.h
@@ -5,7 +5,7 @@
namespace NKikimr::NMetadata::NSecret {
-class TSecretBehaviour: public IClassBehaviour {
+class TSecretBehaviour: public TClassBehaviour<TSecret> {
private:
static TFactory::TRegistrator<TSecretBehaviour> Registrator;
protected:
diff --git a/ydb/services/metadata/secret/snapshot.cpp b/ydb/services/metadata/secret/snapshot.cpp
index 7e0b9423e6a..8c2e8eeb5ca 100644
--- a/ydb/services/metadata/secret/snapshot.cpp
+++ b/ydb/services/metadata/secret/snapshot.cpp
@@ -4,30 +4,8 @@ namespace NKikimr::NMetadata::NSecret {
bool TSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) {
Y_VERIFY(rawDataResult.result_sets().size() == 2);
- {
- auto& rawData = rawDataResult.result_sets()[0];
- TSecret::TDecoder decoder(rawData);
- for (auto&& r : rawData.rows()) {
- TSecret object;
- if (!object.DeserializeFromRecord(decoder, r)) {
- ALS_ERROR(NKikimrServices::METADATA_SECRET) << "cannot parse secret info for snapshot";
- continue;
- }
- Secrets.emplace(object, object);
- }
- }
- {
- auto& rawData = rawDataResult.result_sets()[1];
- TAccess::TDecoder decoder(rawData);
- for (auto&& r : rawData.rows()) {
- TAccess object;
- if (!object.DeserializeFromRecord(decoder, r)) {
- ALS_ERROR(NKikimrServices::METADATA_SECRET) << "cannot parse secret info for snapshot";
- continue;
- }
- Access.emplace_back(object);
- }
- }
+ ParseSnapshotObjects<TSecret>(rawDataResult.result_sets()[0], [this](TSecret&& s) {Secrets.emplace(s, s); });
+ ParseSnapshotObjects<TAccess>(rawDataResult.result_sets()[1], [this](TAccess&& s) {Access.emplace_back(std::move(s)); });
return true;
}