aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-11-23 14:18:42 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-11-23 14:18:42 +0300
commit98228b1c88aff21f1fe7e0ee2d2ed0a9c28cc781 (patch)
tree7bb501736ad18f6fd54f858271c018dc79ee39fc
parentd2cba3927c297cd7e6af4cbbc6e581ec29ed066f (diff)
downloadydb-98228b1c88aff21f1fe7e0ee2d2ed0a9c28cc781.tar.gz
metadata provider extension - add objects modification.
-rw-r--r--ydb/core/base/events.h2
-rw-r--r--ydb/core/protos/services.proto5
-rw-r--r--ydb/core/testlib/common_helper.cpp29
-rw-r--r--ydb/core/testlib/common_helper.h3
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp6
-rw-r--r--ydb/core/tx/tiering/CMakeLists.txt15
-rw-r--r--ydb/core/tx/tiering/common.h20
-rw-r--r--ydb/core/tx/tiering/common/CMakeLists.txt17
-rw-r--r--ydb/core/tx/tiering/common/global_tier_id.cpp5
-rw-r--r--ydb/core/tx/tiering/common/global_tier_id.h27
-rw-r--r--ydb/core/tx/tiering/decoder.h18
-rw-r--r--ydb/core/tx/tiering/external_data.cpp20
-rw-r--r--ydb/core/tx/tiering/external_data.h12
-rw-r--r--ydb/core/tx/tiering/manager.cpp23
-rw-r--r--ydb/core/tx/tiering/manager.h8
-rw-r--r--ydb/core/tx/tiering/owner_enrich.cpp111
-rw-r--r--ydb/core/tx/tiering/owner_enrich.h48
-rw-r--r--ydb/core/tx/tiering/path_cleaner.cpp2
-rw-r--r--ydb/core/tx/tiering/rule.cpp26
-rw-r--r--ydb/core/tx/tiering/rule/CMakeLists.txt31
-rw-r--r--ydb/core/tx/tiering/rule/initializer.cpp54
-rw-r--r--ydb/core/tx/tiering/rule/initializer.h14
-rw-r--r--ydb/core/tx/tiering/rule/manager.cpp12
-rw-r--r--ydb/core/tx/tiering/rule/manager.h15
-rw-r--r--ydb/core/tx/tiering/rule/object.cpp158
-rw-r--r--ydb/core/tx/tiering/rule/object.h (renamed from ydb/core/tx/tiering/rule.h)82
-rw-r--r--ydb/core/tx/tiering/snapshot.h6
-rw-r--r--ydb/core/tx/tiering/snapshot_enrich.h2
-rw-r--r--ydb/core/tx/tiering/tier/CMakeLists.txt31
-rw-r--r--ydb/core/tx/tiering/tier/initializer.cpp39
-rw-r--r--ydb/core/tx/tiering/tier/initializer.h14
-rw-r--r--ydb/core/tx/tiering/tier/manager.cpp12
-rw-r--r--ydb/core/tx/tiering/tier/manager.h16
-rw-r--r--ydb/core/tx/tiering/tier/object.cpp94
-rw-r--r--ydb/core/tx/tiering/tier/object.h (renamed from ydb/core/tx/tiering/tier_config.h)27
-rw-r--r--ydb/core/tx/tiering/tier_config.cpp33
-rw-r--r--ydb/core/tx/tiering/ut/ut_tiers.cpp153
-rw-r--r--ydb/services/bg_tasks/abstract/task.h2
-rw-r--r--ydb/services/bg_tasks/ds_table/CMakeLists.txt1
-rw-r--r--ydb/services/bg_tasks/ds_table/executor.cpp105
-rw-r--r--ydb/services/bg_tasks/ds_table/executor.h26
-rw-r--r--ydb/services/bg_tasks/ds_table/executor_controller.cpp4
-rw-r--r--ydb/services/bg_tasks/ds_table/executor_controller.h5
-rw-r--r--ydb/services/bg_tasks/ds_table/initialization.cpp68
-rw-r--r--ydb/services/bg_tasks/ds_table/initialization.h21
-rw-r--r--ydb/services/metadata/CMakeLists.txt3
-rw-r--r--ydb/services/metadata/abstract/CMakeLists.txt3
-rw-r--r--ydb/services/metadata/abstract/common.cpp17
-rw-r--r--ydb/services/metadata/abstract/common.h83
-rw-r--r--ydb/services/metadata/abstract/decoder.cpp (renamed from ydb/core/tx/tiering/decoder.cpp)45
-rw-r--r--ydb/services/metadata/abstract/decoder.h30
-rw-r--r--ydb/services/metadata/abstract/fetcher.cpp24
-rw-r--r--ydb/services/metadata/abstract/fetcher.h89
-rw-r--r--ydb/services/metadata/abstract/manager.cpp5
-rw-r--r--ydb/services/metadata/abstract/manager.h96
-rw-r--r--ydb/services/metadata/ds_table/CMakeLists.txt1
-rw-r--r--ydb/services/metadata/ds_table/accessor_refresh.cpp62
-rw-r--r--ydb/services/metadata/ds_table/accessor_refresh.h46
-rw-r--r--ydb/services/metadata/ds_table/config.h2
-rw-r--r--ydb/services/metadata/ds_table/service.cpp110
-rw-r--r--ydb/services/metadata/ds_table/service.h107
-rw-r--r--ydb/services/metadata/initializer/CMakeLists.txt5
-rw-r--r--ydb/services/metadata/initializer/accessor_init.cpp57
-rw-r--r--ydb/services/metadata/initializer/accessor_init.h24
-rw-r--r--ydb/services/metadata/initializer/common.h29
-rw-r--r--ydb/services/metadata/initializer/controller.cpp18
-rw-r--r--ydb/services/metadata/initializer/controller.h21
-rw-r--r--ydb/services/metadata/initializer/events.h21
-rw-r--r--ydb/services/metadata/initializer/fetcher.cpp10
-rw-r--r--ydb/services/metadata/initializer/fetcher.h16
-rw-r--r--ydb/services/metadata/initializer/initializer.cpp33
-rw-r--r--ydb/services/metadata/initializer/initializer.h15
-rw-r--r--ydb/services/metadata/initializer/manager.cpp12
-rw-r--r--ydb/services/metadata/initializer/manager.h19
-rw-r--r--ydb/services/metadata/initializer/object.cpp57
-rw-r--r--ydb/services/metadata/initializer/object.h76
-rw-r--r--ydb/services/metadata/initializer/snapshot.cpp30
-rw-r--r--ydb/services/metadata/initializer/snapshot.h21
-rw-r--r--ydb/services/metadata/manager/CMakeLists.txt35
-rw-r--r--ydb/services/metadata/manager/alter.cpp5
-rw-r--r--ydb/services/metadata/manager/alter.h384
-rw-r--r--ydb/services/metadata/manager/common.cpp5
-rw-r--r--ydb/services/metadata/manager/common.h30
-rw-r--r--ydb/services/metadata/manager/generic_manager.cpp5
-rw-r--r--ydb/services/metadata/manager/generic_manager.h93
-rw-r--r--ydb/services/metadata/manager/modification.cpp5
-rw-r--r--ydb/services/metadata/manager/modification.h157
-rw-r--r--ydb/services/metadata/manager/modification_controller.cpp5
-rw-r--r--ydb/services/metadata/manager/modification_controller.h29
-rw-r--r--ydb/services/metadata/manager/object.cpp27
-rw-r--r--ydb/services/metadata/manager/object.h24
-rw-r--r--ydb/services/metadata/manager/preparation_controller.cpp5
-rw-r--r--ydb/services/metadata/manager/preparation_controller.h40
-rw-r--r--ydb/services/metadata/manager/restore.cpp5
-rw-r--r--ydb/services/metadata/manager/restore.h76
-rw-r--r--ydb/services/metadata/manager/restore_controller.cpp5
-rw-r--r--ydb/services/metadata/manager/restore_controller.h42
-rw-r--r--ydb/services/metadata/manager/table_record.cpp294
-rw-r--r--ydb/services/metadata/manager/table_record.h91
-rw-r--r--ydb/services/metadata/manager/ydb_value_operator.cpp132
-rw-r--r--ydb/services/metadata/manager/ydb_value_operator.h28
-rw-r--r--ydb/services/metadata/request/request_actor.h47
-rw-r--r--ydb/services/metadata/secret/CMakeLists.txt42
-rw-r--r--ydb/services/metadata/secret/access.cpp87
-rw-r--r--ydb/services/metadata/secret/access.h55
-rw-r--r--ydb/services/metadata/secret/fetcher.cpp13
-rw-r--r--ydb/services/metadata/secret/fetcher.h15
-rw-r--r--ydb/services/metadata/secret/initializer.cpp68
-rw-r--r--ydb/services/metadata/secret/initializer.h20
-rw-r--r--ydb/services/metadata/secret/manager.cpp16
-rw-r--r--ydb/services/metadata/secret/manager.h25
-rw-r--r--ydb/services/metadata/secret/secret.cpp90
-rw-r--r--ydb/services/metadata/secret/secret.h70
-rw-r--r--ydb/services/metadata/secret/snapshot.cpp47
-rw-r--r--ydb/services/metadata/secret/snapshot.h23
-rw-r--r--ydb/services/metadata/secret/ut/ut_secret.cpp213
-rw-r--r--ydb/services/metadata/service.cpp8
-rw-r--r--ydb/services/metadata/service.h36
118 files changed, 4294 insertions, 712 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index af59ea23c73..7cbd1868460 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -154,6 +154,8 @@ struct TKikimrEvents : TEvents {
ES_TIERING,
ES_METADATA_INITIALIZER,
ES_YDB_AUDIT_LOG,
+ ES_METADATA_MANAGER,
+ ES_METADATA_SECRET
};
};
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 1f1fb531d79..8f5728be861 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -332,14 +332,15 @@ enum EServiceKikimr {
// MetadataProvider
METADATA_PROVIDER = 1500;
+ METADATA_INITIALIZER = 1501;
+ METADATA_MANAGER = 1502;
+ METADATA_SECRET = 1503;
// Tiering
TX_TIERING = 1600;
// Background tasks
BG_TASKS = 1700;
-
- METADATA_INITIALIZER = 1800;
};
message TActivity {
diff --git a/ydb/core/testlib/common_helper.cpp b/ydb/core/testlib/common_helper.cpp
index 45408b456cc..eb9a9344447 100644
--- a/ydb/core/testlib/common_helper.cpp
+++ b/ydb/core/testlib/common_helper.cpp
@@ -19,12 +19,35 @@ void THelper::WaitForSchemeOperation(TActorId sender, ui64 txId) {
runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(sender);
}
-void THelper::StartDataRequest(const TString& request) const {
+void THelper::StartDataRequest(const TString& request, const bool expectSuccess) const {
NYdb::NTable::TTableClient tClient(Server.GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
- tClient.CreateSession().Subscribe([request](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ auto expectation = expectSuccess;
+ tClient.CreateSession().Subscribe([request, expectation](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
auto session = f.GetValueSync().GetSession();
session.ExecuteDataQuery(request
- , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
+ , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx())
+ .Subscribe([expectation](NYdb::NTable::TAsyncDataQueryResult f)
+ {
+ TStringStream ss;
+ f.GetValueSync().GetIssues().PrintTo(ss, false);
+ Cerr << ss.Str() << Endl;
+ Y_VERIFY(expectation == f.GetValueSync().IsSuccess());
+ });
+ });
+}
+
+void THelper::StartSchemaRequest(const TString& request, const bool expectSuccess) const {
+ NYdb::NTable::TTableClient tClient(Server.GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
+ auto expectation = expectSuccess;
+ tClient.CreateSession().Subscribe([request, expectation](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ auto session = f.GetValueSync().GetSession();
+ session.ExecuteSchemeQuery(request).Subscribe([expectation](NYdb::TAsyncStatus f)
+ {
+ TStringStream ss;
+ f.GetValueSync().GetIssues().PrintTo(ss, false);
+ Cerr << ss.Str() << Endl;
+ Y_VERIFY(expectation == f.GetValueSync().IsSuccess());
+ });
});
}
diff --git a/ydb/core/testlib/common_helper.h b/ydb/core/testlib/common_helper.h
index 25ffdf809bc..4218e1f4476 100644
--- a/ydb/core/testlib/common_helper.h
+++ b/ydb/core/testlib/common_helper.h
@@ -16,6 +16,7 @@ public:
void DropTable(const TString& tablePath);
- void StartDataRequest(const TString& request) const;
+ void StartDataRequest(const TString& request, const bool expectSuccess = true) const;
+ void StartSchemaRequest(const TString& request, const bool expectSuccess = true) const;
};
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index aa18ffcd877..3aa13a26915 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -845,7 +845,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
if (regularTtls.empty()) {
regularTtls = Ttl.MakeIndexTtlMap(TInstant::Now(), force);
}
- const bool tiersUsage = regularTtls.empty() && Tiers && Tiers->IsActive();
+ const bool tiersUsage = regularTtls.empty() && Tiers;
if (tiersUsage) {
regularTtls = Tiers->GetTiering();
}
@@ -1045,7 +1045,7 @@ void TColumnShard::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev)
NOlap::TIndexInfo TColumnShard::GetActualIndexInfo(const bool tiersUsage) const {
auto indexInfo = PrimaryIndex->GetIndexInfo();
- if (tiersUsage && Tiers && Tiers->IsActive()) {
+ if (tiersUsage && Tiers) {
for (auto&& i : *Tiers) {
indexInfo.AddStorageTier(i.second.BuildTierStorage());
}
@@ -1055,7 +1055,7 @@ NOlap::TIndexInfo TColumnShard::GetActualIndexInfo(const bool tiersUsage) const
void TColumnShard::ActivateTiering(const ui64 pathId, const bool enableTiering) {
if (OwnerPath && !Tiers) {
- Tiers = std::make_shared<TTiersManager>(TabletID(), SelfId(), OwnerPath);
+ Tiers = std::make_shared<TTiersManager>(TabletID(), SelfId());
Tiers->Start(Tiers);
}
if (!!Tiers) {
diff --git a/ydb/core/tx/tiering/CMakeLists.txt b/ydb/core/tx/tiering/CMakeLists.txt
index 59d6f170428..0e438b65e9d 100644
--- a/ydb/core/tx/tiering/CMakeLists.txt
+++ b/ydb/core/tx/tiering/CMakeLists.txt
@@ -6,6 +6,9 @@
# original buildsystem will not be accepted.
+add_subdirectory(common)
+add_subdirectory(rule)
+add_subdirectory(tier)
add_subdirectory(ut)
add_library(core-tx-tiering)
@@ -16,6 +19,9 @@ target_link_libraries(core-tx-tiering PUBLIC
cpp-json-writer
ydb-core-blobstorage
ydb-core-protos
+ tx-tiering-rule
+ tx-tiering-tier
+ tx-tiering-common
core-tablet_flat-protos
ydb-core-wrappers
api-protos
@@ -27,11 +33,6 @@ target_sources(core-tx-tiering PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier_cleaner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/path_cleaner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/manager.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/decoder.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/external_data.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier_config.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/owner_enrich.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/snapshot_enrich.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/snapshot.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/s3_actor.cpp
@@ -45,6 +46,9 @@ target_link_libraries(core-tx-tiering.global PUBLIC
cpp-json-writer
ydb-core-blobstorage
ydb-core-protos
+ tx-tiering-rule
+ tx-tiering-tier
+ tx-tiering-common
core-tablet_flat-protos
ydb-core-wrappers
api-protos
@@ -53,4 +57,5 @@ target_link_libraries(core-tx-tiering.global PUBLIC
)
target_sources(core-tx-tiering.global PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/cleaner_task.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/external_data.cpp
)
diff --git a/ydb/core/tx/tiering/common.h b/ydb/core/tx/tiering/common.h
index 536e920a301..906f897dd65 100644
--- a/ydb/core/tx/tiering/common.h
+++ b/ydb/core/tx/tiering/common.h
@@ -7,26 +7,6 @@
namespace NKikimr::NColumnShard::NTiers {
-class TGlobalTierId {
-private:
- YDB_ACCESSOR_DEF(TString, OwnerPath);
- YDB_ACCESSOR_DEF(TString, TierName);
-public:
- TGlobalTierId(const TString& ownerPath, const TString& tierName)
- : OwnerPath(ownerPath)
- , TierName(tierName) {
-
- }
-
- bool operator<(const TGlobalTierId& item) const {
- return std::tie(OwnerPath, TierName) < std::tie(item.OwnerPath, item.TierName);
- }
-
- TString ToString() const {
- return OwnerPath + "." + TierName;
- }
-};
-
enum EEvents {
EvTierCleared = EventSpaceBegin(TKikimrEvents::ES_TIERING),
EvEnd
diff --git a/ydb/core/tx/tiering/common/CMakeLists.txt b/ydb/core/tx/tiering/common/CMakeLists.txt
new file mode 100644
index 00000000000..898b441f818
--- /dev/null
+++ b/ydb/core/tx/tiering/common/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-tiering-common)
+target_link_libraries(tx-tiering-common PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+)
+target_sources(tx-tiering-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/common/global_tier_id.cpp
+)
diff --git a/ydb/core/tx/tiering/common/global_tier_id.cpp b/ydb/core/tx/tiering/common/global_tier_id.cpp
new file mode 100644
index 00000000000..3bc06dedbf4
--- /dev/null
+++ b/ydb/core/tx/tiering/common/global_tier_id.cpp
@@ -0,0 +1,5 @@
+#include "global_tier_id.h"
+
+namespace NKikimr::NColumnShard::NTiers {
+
+}
diff --git a/ydb/core/tx/tiering/common/global_tier_id.h b/ydb/core/tx/tiering/common/global_tier_id.h
new file mode 100644
index 00000000000..3675987f684
--- /dev/null
+++ b/ydb/core/tx/tiering/common/global_tier_id.h
@@ -0,0 +1,27 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <util/generic/string.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TGlobalTierId {
+private:
+ YDB_ACCESSOR_DEF(TString, OwnerPath);
+ YDB_ACCESSOR_DEF(TString, TierName);
+public:
+ TGlobalTierId(const TString& ownerPath, const TString& tierName)
+ : OwnerPath(ownerPath)
+ , TierName(tierName) {
+
+ }
+
+ bool operator<(const TGlobalTierId& item) const {
+ return std::tie(OwnerPath, TierName) < std::tie(item.OwnerPath, item.TierName);
+ }
+
+ TString ToString() const {
+ return OwnerPath + "." + TierName;
+ }
+};
+
+}
diff --git a/ydb/core/tx/tiering/decoder.h b/ydb/core/tx/tiering/decoder.h
deleted file mode 100644
index 587f4ae9da8..00000000000
--- a/ydb/core/tx/tiering/decoder.h
+++ /dev/null
@@ -1,18 +0,0 @@
-#pragma once
-#include <ydb/public/api/protos/ydb_value.pb.h>
-#include <util/datetime/base.h>
-
-namespace NKikimr::NInternal {
-
-class TDecoderBase {
-protected:
- i32 GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId, const bool verify = true) const;
-public:
- bool Read(const ui32 columnIdx, TString& result, const Ydb::Value& r) const;
- bool ReadDebugProto(const ui32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const;
- bool Read(const ui32 columnIdx, TDuration& result, const Ydb::Value& r) const;
- bool Read(const ui32 columnIdx, TInstant& result, const Ydb::Value& r) const;
- bool Read(const ui32 columnIdx, bool& result, const Ydb::Value& r) const;
-};
-
-}
diff --git a/ydb/core/tx/tiering/external_data.cpp b/ydb/core/tx/tiering/external_data.cpp
index 1d9ef8376dd..a280c35c125 100644
--- a/ydb/core/tx/tiering/external_data.cpp
+++ b/ydb/core/tx/tiering/external_data.cpp
@@ -1,8 +1,9 @@
#include "external_data.h"
-#include "owner_enrich.h"
#include "snapshot_enrich.h"
#include <ydb/core/base/path.h>
+#include <ydb/core/tx/tiering/tier/manager.h>
+#include <ydb/core/tx/tiering/rule/manager.h>
#include <library/cpp/json/writer/json_value.h>
#include <library/cpp/protobuf/json/proto2json.h>
@@ -19,20 +20,15 @@ void TSnapshotConstructor::EnrichSnapshotData(ISnapshot::TPtr original, NMetadat
TActivationContext::AsActorContext().Register(new TActorSnapshotEnrich(original, controller, TablesDecoder));
}
-TSnapshotConstructor::TSnapshotConstructor(const TString& ownerPath)
- : OwnerPath(ownerPath)
+TSnapshotConstructor::TSnapshotConstructor()
{
- Y_VERIFY(!!OwnerPath);
- const TString ownerPathNormal = TFsPath(OwnerPath).Fix().GetPath();
- const TString tenantPathNormal = TFsPath(AppData()->TenantName).Fix().GetPath();
- Y_VERIFY(ownerPathNormal.StartsWith(tenantPathNormal));
- TablePath = "/" + AppData()->TenantName + "/.configs/tiering/";
- Tables.emplace_back(TablePath + "tiers");
- Tables.emplace_back(TablePath + "rules");
}
-void TSnapshotConstructor::DoPrepare(NMetadataInitializer::IController::TPtr controller) const {
- TActivationContext::AsActorContext().Register(new TActorOwnerEnrich(OwnerPath, controller, Tables));
+std::vector<NKikimr::NMetadata::IOperationsManager::TPtr> TSnapshotConstructor::DoGetManagers() const {
+ std::vector<NMetadata::IOperationsManager::TPtr> result;
+ result.emplace_back(std::make_shared<TTiersManager>());
+ result.emplace_back(std::make_shared<TTieringRulesManager>());
+ return result;
}
}
diff --git a/ydb/core/tx/tiering/external_data.h b/ydb/core/tx/tiering/external_data.h
index a04e3caf5df..4524a792dc8 100644
--- a/ydb/core/tx/tiering/external_data.h
+++ b/ydb/core/tx/tiering/external_data.h
@@ -10,24 +10,18 @@
namespace NKikimr::NColumnShard::NTiers {
-class TSnapshotConstructor: public NMetadataProvider::TGenericSnapshotParser<TConfigsSnapshot> {
+class TSnapshotConstructor: public NMetadataProvider::TSnapshotsManager<TConfigsSnapshot> {
private:
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
using TBaseActor = TActor<TSnapshotConstructor>;
using ISnapshot = NMetadataProvider::ISnapshot;
- TVector<TString> Tables;
- TString TablePath;
- const TString OwnerPath;
mutable std::shared_ptr<TTablesDecoderCache> TablesDecoder;
protected:
- virtual const TVector<TString>& DoGetTables() const override {
- return Tables;
- }
- virtual void DoPrepare(NMetadataInitializer::IController::TPtr controller) const override;
+ virtual std::vector<NMetadata::IOperationsManager::TPtr> DoGetManagers() const override;
public:
virtual void EnrichSnapshotData(ISnapshot::TPtr original, NMetadataProvider::ISnapshotAcceptorController::TPtr controller) const override;
- TSnapshotConstructor(const TString& ownerPath);
+ TSnapshotConstructor();
};
}
diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp
index 985172f6c08..33446f84de6 100644
--- a/ydb/core/tx/tiering/manager.cpp
+++ b/ydb/core/tx/tiering/manager.cpp
@@ -44,7 +44,7 @@ public:
namespace NTiers {
-TManager& TManager::Restart(const TTierConfig& config, const bool activity) {
+TManager& TManager::Restart(const TTierConfig& config) {
if (Config.IsSame(config)) {
return *this;
}
@@ -52,9 +52,7 @@ TManager& TManager::Restart(const TTierConfig& config, const bool activity) {
Stop();
}
Config = config;
- if (activity) {
- Start();
- }
+ Start();
return *this;
}
@@ -138,7 +136,7 @@ void TTiersManager::TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshotExt)
itSelf->second.Stop();
itSelf = Managers.erase(itSelf);
} else {
- itSelf->second.Restart(it->second, IsActive());
+ itSelf->second.Restart(it->second);
++itSelf;
}
}
@@ -148,9 +146,7 @@ void TTiersManager::TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshotExt)
}
NTiers::TManager localManager(TabletId, TabletActorId, i.second);
auto& manager = Managers.emplace(i.second.GetGlobalTierId(), std::move(localManager)).first->second;
- if (IsActive()) {
- manager.Start();
- }
+ manager.Start();
}
}
@@ -169,10 +165,7 @@ TActorId TTiersManager::GetStorageActorId(const NTiers::TGlobalTierId& tierId) {
}
TTiersManager& TTiersManager::Start(std::shared_ptr<TTiersManager> ownerPtr) {
- if (ActiveFlag) {
- return *this;
- }
- ActiveFlag = true;
+ ALS_ERROR(NKikimrServices::TX_TIERING) << "AAAAAAAAAAAAAA ";
Y_VERIFY(!Actor);
Actor = new TTiersManager::TActor(ownerPtr);
TActivationContext::AsActorContext().RegisterWithSameMailbox(Actor);
@@ -183,11 +176,9 @@ TTiersManager& TTiersManager::Start(std::shared_ptr<TTiersManager> ownerPtr) {
}
TTiersManager& TTiersManager::Stop() {
- if (!ActiveFlag) {
+ if (!Actor) {
return *this;
}
- ActiveFlag = false;
- Y_VERIFY(!!Actor);
if (TlsActivationContext) {
TActivationContext::AsActorContext().Send(Actor->SelfId(), new NActors::TEvents::TEvPoison);
}
@@ -206,7 +197,7 @@ const NTiers::TManager& TTiersManager::GetManagerVerified(const NTiers::TGlobalT
NMetadataProvider::ISnapshotParser::TPtr TTiersManager::GetExternalDataManipulation() const {
if (!ExternalDataManipulation) {
- ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(OwnerPath);
+ ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>();
}
return ExternalDataManipulation;
}
diff --git a/ydb/core/tx/tiering/manager.h b/ydb/core/tx/tiering/manager.h
index 3a716addf2c..f9effb784c0 100644
--- a/ydb/core/tx/tiering/manager.h
+++ b/ydb/core/tx/tiering/manager.h
@@ -1,6 +1,5 @@
#pragma once
#include "external_data.h"
-#include "tier_config.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/actor.h>
@@ -21,7 +20,7 @@ public:
static NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression);
NOlap::TStorageTier BuildTierStorage() const;
- TManager& Restart(const TTierConfig& config, const bool activity);
+ TManager& Restart(const TTierConfig& config);
bool NeedExport() const {
return Config.NeedExport();
}
@@ -36,20 +35,17 @@ private:
using TManagers = TMap<NTiers::TGlobalTierId, NTiers::TManager>;
ui64 TabletId = 0;
const TActorId TabletActorId;
- TString OwnerPath;
TActor* Actor = nullptr;
std::unordered_set<ui64> EnabledPathId;
YDB_READONLY_DEF(TManagers, Managers);
- YDB_READONLY_FLAG(Active, false);
NMetadataProvider::ISnapshot::TPtr Snapshot;
mutable NMetadataProvider::ISnapshotParser::TPtr ExternalDataManipulation;
public:
- TTiersManager(const ui64 tabletId, const TActorId& tabletActorId, const TString& ownerPath)
+ TTiersManager(const ui64 tabletId, const TActorId& tabletActorId)
: TabletId(tabletId)
, TabletActorId(tabletActorId)
- , OwnerPath(ownerPath)
{
}
TActorId GetActorId() const;
diff --git a/ydb/core/tx/tiering/owner_enrich.cpp b/ydb/core/tx/tiering/owner_enrich.cpp
deleted file mode 100644
index f3d2fa611c2..00000000000
--- a/ydb/core/tx/tiering/owner_enrich.cpp
+++ /dev/null
@@ -1,111 +0,0 @@
-#include "owner_enrich.h"
-#include "snapshot.h"
-
-#include <ydb/core/base/path.h>
-
-#include <util/string/join.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-TVector<NMetadataInitializer::ITableModifier::TPtr> TActorOwnerEnrich::BuildModifiers(const NSchemeCache::TSchemeCacheNavigate::TEntry& ownerEntry) const {
- TVector<NMetadataInitializer::ITableModifier::TPtr> result;
- {
- Ydb::Table::CreateTableRequest request;
- request.set_session_id("");
- request.set_path(TableNames[0]);
- request.add_primary_key("ownerPath");
- request.add_primary_key("tierName");
- {
- auto& column = *request.add_columns();
- column.set_name("ownerPath");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("tierName");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("tierConfig");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request));
- }
- {
- Ydb::Table::CreateTableRequest request;
- request.set_session_id("");
- request.set_path(TableNames[1]);
- request.add_primary_key("ownerPath");
- request.add_primary_key("tablePath");
- request.add_primary_key("tierName");
- {
- auto& column = *request.add_columns();
- column.set_name("ownerPath");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("tierName");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("tablePath");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("durationForEvict");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("column");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request));
- }
- Y_VERIFY(!!ownerEntry.SecurityObject);
- return result;
-}
-
-void TActorOwnerEnrich::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
- auto* info = ev->Get();
- const auto& request = info->Request;
- auto g = PassAwayGuard();
- if (request->ResultSet.empty()) {
- Controller->PreparationProblem("cannot resolve owner path to ids");
- return;
- }
-
- if (request->ResultSet.size() != 1) {
- Controller->PreparationProblem("unexpected result set size: " + ::ToString(request->ResultSet.size()));
- return;
- }
-
- for (auto&& i : request->ResultSet) {
- const TString path = "/" + JoinSeq("/", i.Path);
- switch (i.Status) {
- case TNavigate::EStatus::Ok:
- Controller->PreparationFinished(BuildModifiers(i));
- return;
- default:
- Controller->PreparationProblem(::ToString(i.Status) + " for '" + path + "'");
- return;
- }
- }
- Controller->PreparationProblem("unexpected");
-}
-
-void TActorOwnerEnrich::Bootstrap() {
- Become(&TThis::StateMain);
- auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
- request->DatabaseName = NKikimr::CanonizePath(AppData()->TenantName);
- auto& entry = request->ResultSet.emplace_back();
- entry.Operation = TNavigate::OpPath;
- entry.Path = NKikimr::SplitPath(OwnerPath);
- Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
-}
-
-}
diff --git a/ydb/core/tx/tiering/owner_enrich.h b/ydb/core/tx/tiering/owner_enrich.h
deleted file mode 100644
index 969c5fea28e..00000000000
--- a/ydb/core/tx/tiering/owner_enrich.h
+++ /dev/null
@@ -1,48 +0,0 @@
-#pragma once
-#include "rule.h"
-#include "tier_config.h"
-
-#include <ydb/core/protos/flat_scheme_op.pb.h>
-#include <ydb/core/tx/scheme_cache/scheme_cache.h>
-#include <ydb/services/metadata/service.h>
-
-#include <library/cpp/json/writer/json_value.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-class TActorOwnerEnrich: public TActorBootstrapped<TActorOwnerEnrich> {
-private:
- using TBase = TActorBootstrapped<TActorOwnerEnrich>;
- using TNavigate = NSchemeCache::TSchemeCacheNavigate;
- const TString OwnerPath;
- NMetadataInitializer::IController::TPtr Controller;
- const TVector<TString> TableNames;
-private:
- TVector<NMetadataInitializer::ITableModifier::TPtr> BuildModifiers(const NSchemeCache::TSchemeCacheNavigate::TEntry& ownerEntry) const;
-
- void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
-public:
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::TIERING_OWNER_ENRICH;
- }
-
- STATEFN(StateMain) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
- default:
- break;
- }
- }
-
- TActorOwnerEnrich(const TString& ownerPath, NMetadataInitializer::IController::TPtr controller, const TVector<TString>& tableNames)
- : OwnerPath(ownerPath)
- , Controller(controller)
- , TableNames(tableNames)
- {
-
- }
-
- void Bootstrap();
-};
-
-}
diff --git a/ydb/core/tx/tiering/path_cleaner.cpp b/ydb/core/tx/tiering/path_cleaner.cpp
index 70e55ad362c..7be50ae766e 100644
--- a/ydb/core/tx/tiering/path_cleaner.cpp
+++ b/ydb/core/tx/tiering/path_cleaner.cpp
@@ -20,7 +20,7 @@ void TPathCleaner::Handle(TEvTierCleared::TPtr& ev) {
NMetadataProvider::ISnapshotParser::TPtr TPathCleaner::GetTieringSnapshotParser() const {
if (!ExternalDataManipulation) {
- ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(OwnerPath);
+ ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>();
}
return ExternalDataManipulation;
}
diff --git a/ydb/core/tx/tiering/rule.cpp b/ydb/core/tx/tiering/rule.cpp
deleted file mode 100644
index 6bcb4fcbfa9..00000000000
--- a/ydb/core/tx/tiering/rule.cpp
+++ /dev/null
@@ -1,26 +0,0 @@
-#include "rule.h"
-
-namespace NKikimr::NColumnShard::NTiers {
-NJson::TJsonValue TTieringRule::GetDebugJson() const {
- NJson::TJsonValue result = NJson::JSON_MAP;
- result.InsertValue(TDecoder::TierName, TierName);
- result.InsertValue(TDecoder::TablePath, TablePath);
- result.InsertValue("tablePathId", TablePathId);
- result.InsertValue(TDecoder::Column, Column);
- result.InsertValue(TDecoder::DurationForEvict, DurationForEvict.ToString());
- return result;
-}
-
-NJson::TJsonValue TTableTiering::GetDebugJson() const {
- NJson::TJsonValue result = NJson::JSON_MAP;
- result.InsertValue(TTieringRule::TDecoder::TablePath, TablePath);
- result.InsertValue("tablePathId", TablePathId);
- result.InsertValue(TTieringRule::TDecoder::Column, Column);
- auto&& jsonRules = result.InsertValue("rules", NJson::JSON_ARRAY);
- for (auto&& i : Rules) {
- jsonRules.AppendValue(i.GetDebugJson());
- }
- return result;
-}
-
-}
diff --git a/ydb/core/tx/tiering/rule/CMakeLists.txt b/ydb/core/tx/tiering/rule/CMakeLists.txt
new file mode 100644
index 00000000000..2efcc188954
--- /dev/null
+++ b/ydb/core/tx/tiering/rule/CMakeLists.txt
@@ -0,0 +1,31 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-tiering-rule)
+target_link_libraries(tx-tiering-rule PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ services-metadata-initializer
+ services-metadata-abstract
+)
+target_sources(tx-tiering-rule PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/object.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/initializer.cpp
+)
+
+add_global_library_for(tx-tiering-rule.global tx-tiering-rule)
+target_link_libraries(tx-tiering-rule.global PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ services-metadata-initializer
+ services-metadata-abstract
+)
+target_sources(tx-tiering-rule.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/manager.cpp
+)
diff --git a/ydb/core/tx/tiering/rule/initializer.cpp b/ydb/core/tx/tiering/rule/initializer.cpp
new file mode 100644
index 00000000000..be36ce9e9e3
--- /dev/null
+++ b/ydb/core/tx/tiering/rule/initializer.cpp
@@ -0,0 +1,54 @@
+#include "initializer.h"
+#include "object.h"
+
+namespace NKikimr::NColumnShard::NTiers {
+
+TVector<NKikimr::NMetadataInitializer::ITableModifier::TPtr> TTierRulesInitializer::BuildModifiers() const {
+ TVector<NMetadataInitializer::ITableModifier::TPtr> result;
+ {
+ Ydb::Table::CreateTableRequest request;
+ request.set_session_id("");
+ request.set_path(TTieringRule::GetStorageTablePath());
+ request.add_primary_key("tieringRuleId");
+ {
+ auto& column = *request.add_columns();
+ column.set_name("tieringRuleId");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("ownerPath");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("tierName");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("tablePath");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("durationForEvict");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("column");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request, "create_rules"));
+ auto hRequest = TTieringRule::AddHistoryTableScheme(request);
+ result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(hRequest, "create_rules_history"));
+ }
+ return result;
+}
+
+void TTierRulesInitializer::DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const {
+ controller->PreparationFinished(BuildModifiers());
+}
+
+}
diff --git a/ydb/core/tx/tiering/rule/initializer.h b/ydb/core/tx/tiering/rule/initializer.h
new file mode 100644
index 00000000000..f0d99a25ca8
--- /dev/null
+++ b/ydb/core/tx/tiering/rule/initializer.h
@@ -0,0 +1,14 @@
+#pragma once
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/services/metadata/initializer/common.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TTierRulesInitializer: public NMetadata::IInitializationBehaviour {
+protected:
+ TVector<NMetadataInitializer::ITableModifier::TPtr> BuildModifiers() const;
+ virtual void DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const override;
+public:
+};
+
+}
diff --git a/ydb/core/tx/tiering/rule/manager.cpp b/ydb/core/tx/tiering/rule/manager.cpp
new file mode 100644
index 00000000000..8c08544dc93
--- /dev/null
+++ b/ydb/core/tx/tiering/rule/manager.cpp
@@ -0,0 +1,12 @@
+#include "manager.h"
+#include "initializer.h"
+
+namespace NKikimr::NColumnShard::NTiers {
+
+TTieringRulesManager::TFactory::TRegistrator<TTieringRulesManager> TTieringRulesManager::Registrator(TTieringRulesManager::GetTypeIdStatic());
+
+NMetadata::IInitializationBehaviour::TPtr TTieringRulesManager::DoGetInitializationBehaviour() const {
+ return std::make_shared<TTierRulesInitializer>();
+}
+
+}
diff --git a/ydb/core/tx/tiering/rule/manager.h b/ydb/core/tx/tiering/rule/manager.h
new file mode 100644
index 00000000000..1705199aebe
--- /dev/null
+++ b/ydb/core/tx/tiering/rule/manager.h
@@ -0,0 +1,15 @@
+#pragma once
+#include "object.h"
+
+#include <ydb/services/metadata/manager/generic_manager.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TTieringRulesManager: public NMetadata::TGenericOperationsManager<TTieringRule> {
+private:
+ static TFactory::TRegistrator<TTieringRulesManager> Registrator;
+protected:
+ virtual NMetadata::IInitializationBehaviour::TPtr DoGetInitializationBehaviour() const override;
+};
+
+}
diff --git a/ydb/core/tx/tiering/rule/object.cpp b/ydb/core/tx/tiering/rule/object.cpp
new file mode 100644
index 00000000000..82c72444cdd
--- /dev/null
+++ b/ydb/core/tx/tiering/rule/object.cpp
@@ -0,0 +1,158 @@
+#include "object.h"
+#include <ydb/services/metadata/manager/ydb_value_operator.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+NJson::TJsonValue TTieringRule::GetDebugJson() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue(TDecoder::TieringRuleId, TieringRuleId);
+ result.InsertValue(TDecoder::TierName, TierName);
+ result.InsertValue(TDecoder::TablePath, TablePath);
+ result.InsertValue("tablePathId", TablePathId);
+ result.InsertValue(TDecoder::Column, Column);
+ result.InsertValue(TDecoder::DurationForEvict, DurationForEvict.ToString());
+ return result;
+}
+
+TString TTieringRule::GetStorageTablePath() {
+ return "/" + AppData()->TenantName + "/.metadata/tiering/rules";
+}
+
+void TTieringRule::AlteringPreparation(std::vector<TTieringRule>&& objects,
+ NMetadataManager::IAlterPreparationController<TTieringRule>::TPtr controller,
+ const NMetadata::IOperationsManager::TModificationContext& /*context*/)
+{
+ controller->PreparationFinished(std::move(objects));
+}
+
+NKikimr::NMetadataManager::TTableRecord TTieringRule::SerializeToRecord() const {
+ NMetadataManager::TTableRecord result;
+ result.SetColumn(TDecoder::TieringRuleId, NMetadataManager::TYDBValue::Bytes(TieringRuleId));
+ result.SetColumn(TDecoder::OwnerPath, NMetadataManager::TYDBValue::Bytes(OwnerPath));
+ result.SetColumn(TDecoder::TierName, NMetadataManager::TYDBValue::Bytes(TierName));
+ result.SetColumn(TDecoder::TablePath, NMetadataManager::TYDBValue::Bytes(TablePath));
+ result.SetColumn(TDecoder::DurationForEvict, NMetadataManager::TYDBValue::Bytes(DurationForEvict.ToString()));
+ result.SetColumn(TDecoder::Column, NMetadataManager::TYDBValue::Bytes(Column));
+ return result;
+}
+
+bool TTieringRule::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r) {
+ if (!decoder.Read(decoder.GetTieringRuleIdIdx(), TieringRuleId, r)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) {
+ return false;
+ }
+ OwnerPath = TFsPath(OwnerPath).Fix().GetPath();
+ if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetTablePathIdx(), TablePath, r)) {
+ return false;
+ }
+ TablePath = TFsPath(TablePath).Fix().GetPath();
+ if (!decoder.Read(decoder.GetDurationForEvictIdx(), DurationForEvict, r)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetColumnIdx(), Column, r)) {
+ return false;
+ }
+ return true;
+}
+
+NMetadata::TOperationParsingResult TTieringRule::BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings,
+ const NMetadata::IOperationsManager::TModificationContext& /*context*/) {
+ NKikimr::NMetadataManager::TTableRecord result;
+ result.SetColumn(TDecoder::TieringRuleId, NMetadataManager::TYDBValue::Bytes(settings.GetObjectId()));
+ {
+ auto it = settings.GetFeatures().find(TDecoder::TablePath);
+ if (it != settings.GetFeatures().end()) {
+ result.SetColumn(TDecoder::TablePath, NMetadataManager::TYDBValue::Bytes(it->second));
+ }
+ }
+ {
+ auto it = settings.GetFeatures().find(TDecoder::TierName);
+ if (it != settings.GetFeatures().end()) {
+ result.SetColumn(TDecoder::TierName, NMetadataManager::TYDBValue::Bytes(it->second));
+ }
+ }
+ {
+ auto it = settings.GetFeatures().find(TDecoder::OwnerPath);
+ if (it != settings.GetFeatures().end()) {
+ result.SetColumn(TDecoder::OwnerPath, NMetadataManager::TYDBValue::Bytes(it->second));
+ }
+ }
+ {
+ auto it = settings.GetFeatures().find(TDecoder::DurationForEvict);
+ if (it != settings.GetFeatures().end()) {
+ result.SetColumn(TDecoder::DurationForEvict, NMetadataManager::TYDBValue::Bytes(it->second));
+ }
+ }
+ {
+ auto it = settings.GetFeatures().find(TDecoder::Column);
+ if (it != settings.GetFeatures().end()) {
+ result.SetColumn(TDecoder::Column, NMetadataManager::TYDBValue::Bytes(it->second));
+ }
+ }
+ return result;
+}
+
+NJson::TJsonValue TTableTiering::GetDebugJson() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue(TTieringRule::TDecoder::TablePath, TablePath);
+ result.InsertValue("tablePathId", TablePathId);
+ result.InsertValue(TTieringRule::TDecoder::Column, Column);
+ auto&& jsonRules = result.InsertValue("rules", NJson::JSON_ARRAY);
+ for (auto&& i : Rules) {
+ jsonRules.AppendValue(i.GetDebugJson());
+ }
+ return result;
+}
+
+void TTableTiering::AddRule(TTieringRule&& tr) {
+ if (Rules.size()) {
+ Y_VERIFY(Rules.back().GetDurationForEvict() <= tr.GetDurationForEvict());
+ if (Column != tr.GetColumn()) {
+ ALS_ERROR(NKikimrServices::TX_TIERING) << "inconsistency rule column: " <<
+ TablePath << "/" << Column << " != " << tr.GetColumn();
+ return;
+ }
+ } else {
+ Column = tr.GetColumn();
+ TablePath = tr.GetTablePath();
+ TablePathId = tr.GetTablePathId();
+ }
+ Rules.emplace_back(std::move(tr));
+}
+
+NKikimr::NOlap::TTiersInfo TTableTiering::BuildTiersInfo() const {
+ NOlap::TTiersInfo result(GetColumn());
+ for (auto&& r : Rules) {
+ result.AddTier(r.GetTierName(), Now() - r.GetDurationForEvict());
+ }
+ return result;
+}
+
+std::vector<Ydb::Column> TTieringRule::TDecoder::GetPKColumns() {
+ return
+ {
+ NMetadataManager::TYDBColumn::Bytes(TieringRuleId)
+ };
+}
+
+std::vector<Ydb::Column> TTieringRule::TDecoder::GetColumns() {
+ return
+ {
+ NMetadataManager::TYDBColumn::Bytes(TieringRuleId),
+ NMetadataManager::TYDBColumn::Bytes(OwnerPath),
+ NMetadataManager::TYDBColumn::Bytes(TierName),
+ NMetadataManager::TYDBColumn::Bytes(TablePath),
+ NMetadataManager::TYDBColumn::Bytes(Column),
+ NMetadataManager::TYDBColumn::Bytes(DurationForEvict)
+ };
+}
+
+std::vector<TString> TTieringRule::TDecoder::GetPKColumnIds() {
+ return { TieringRuleId };
+}
+
+}
diff --git a/ydb/core/tx/tiering/rule.h b/ydb/core/tx/tiering/rule/object.h
index 468ce5ec510..8b71c0e376e 100644
--- a/ydb/core/tx/tiering/rule.h
+++ b/ydb/core/tx/tiering/rule/object.h
@@ -1,17 +1,19 @@
#pragma once
-#include "common.h"
-#include "decoder.h"
-
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
+#include <ydb/core/tx/tiering/common/global_tier_id.h>
+#include <ydb/services/metadata/abstract/decoder.h>
#include <ydb/services/metadata/service.h>
+#include <ydb/services/metadata/manager/object.h>
+#include <ydb/services/metadata/manager/preparation_controller.h>
#include <library/cpp/json/writer/json_value.h>
namespace NKikimr::NColumnShard::NTiers {
-class TTieringRule {
+class TTieringRule: public NMetadataManager::TObject<TTieringRule> {
private:
+ YDB_ACCESSOR_DEF(TString, TieringRuleId);
YDB_ACCESSOR_DEF(TString, OwnerPath);
YDB_ACCESSOR_DEF(TString, TierName);
YDB_ACCESSOR_DEF(TString, TablePath);
@@ -19,31 +21,47 @@ private:
YDB_ACCESSOR_DEF(TString, Column);
YDB_ACCESSOR_DEF(TDuration, DurationForEvict);
public:
- bool operator<(const TTieringRule& item) const {
- return std::tie(TablePath, TierName, Column, DurationForEvict)
- < std::tie(item.TablePath, item.TierName, item.Column, item.DurationForEvict);
+ static TString GetTypeId() {
+ return "TIERING_RULE";
}
-
TGlobalTierId GetGlobalTierId() const {
return TGlobalTierId(OwnerPath, TierName);
}
+ static TString GetStorageTablePath();
+ static void AlteringPreparation(std::vector<TTieringRule>&& objects,
+ NMetadataManager::IAlterPreparationController<TTieringRule>::TPtr controller,
+ const NMetadata::IOperationsManager::TModificationContext& context);
+
+ bool operator<(const TTieringRule& item) const {
+ return std::tie(TablePath, TierName, Column, DurationForEvict, TieringRuleId)
+ < std::tie(item.TablePath, item.TierName, item.Column, item.DurationForEvict, item.TieringRuleId);
+ }
+
NJson::TJsonValue GetDebugJson() const;
+
class TDecoder: public NInternal::TDecoderBase {
private:
+ YDB_READONLY(i32, TieringRuleIdIdx, -1);
YDB_READONLY(i32, OwnerPathIdx, -1);
YDB_READONLY(i32, TablePathIdx, -1);
YDB_READONLY(i32, DurationForEvictIdx, -1);
YDB_READONLY(i32, TierNameIdx, -1);
YDB_READONLY(i32, ColumnIdx, -1);
public:
+ static inline const TString TieringRuleId = "tieringRuleId";
static inline const TString OwnerPath = "ownerPath";
static inline const TString TierName = "tierName";
static inline const TString TablePath = "tablePath";
static inline const TString DurationForEvict = "durationForEvict";
static inline const TString Column = "column";
+ static std::vector<Ydb::Column> GetPKColumns();
+ static std::vector<Ydb::Column> GetColumns();
+ static std::vector<TString> GetPKColumnIds();
+
TDecoder(const Ydb::ResultSet& rawData) {
+ TieringRuleIdIdx = GetFieldIndex(rawData, TieringRuleId);
OwnerPathIdx = GetFieldIndex(rawData, OwnerPath);
TierNameIdx = GetFieldIndex(rawData, TierName);
TablePathIdx = GetFieldIndex(rawData, TablePath);
@@ -51,26 +69,10 @@ public:
ColumnIdx = GetFieldIndex(rawData, Column);
}
};
- bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r) {
- if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) {
- return false;
- }
- OwnerPath = TFsPath(OwnerPath).Fix().GetPath();
- if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) {
- return false;
- }
- if (!decoder.Read(decoder.GetTablePathIdx(), TablePath, r)) {
- return false;
- }
- TablePath = TFsPath(TablePath).Fix().GetPath();
- if (!decoder.Read(decoder.GetDurationForEvictIdx(), DurationForEvict, r)) {
- return false;
- }
- if (!decoder.Read(decoder.GetColumnIdx(), Column, r)) {
- return false;
- }
- return true;
- }
+ NKikimr::NMetadataManager::TTableRecord SerializeToRecord() const;
+ bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r);
+ static NMetadata::TOperationParsingResult BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings,
+ const NMetadata::IOperationsManager::TModificationContext& context);
};
class TTableTiering {
@@ -88,29 +90,9 @@ public:
}
}
NJson::TJsonValue GetDebugJson() const;
- void AddRule(TTieringRule&& tr) {
- if (Rules.size()) {
- Y_VERIFY(Rules.back().GetDurationForEvict() <= tr.GetDurationForEvict());
- if (Column != tr.GetColumn()) {
- ALS_ERROR(NKikimrServices::TX_TIERING) << "inconsistency rule column: " <<
- TablePath << "/" << Column << " != " << tr.GetColumn();
- return;
- }
- } else {
- Column = tr.GetColumn();
- TablePath = tr.GetTablePath();
- TablePathId = tr.GetTablePathId();
- }
- Rules.emplace_back(std::move(tr));
- }
+ void AddRule(TTieringRule&& tr);
- NOlap::TTiersInfo BuildTiersInfo() const {
- NOlap::TTiersInfo result(GetColumn());
- for (auto&& r : Rules) {
- result.AddTier(r.GetTierName(), Now() - r.GetDurationForEvict());
- }
- return result;
- }
+ NOlap::TTiersInfo BuildTiersInfo() const;
};
}
diff --git a/ydb/core/tx/tiering/snapshot.h b/ydb/core/tx/tiering/snapshot.h
index b913dacbb64..30f19b359d6 100644
--- a/ydb/core/tx/tiering/snapshot.h
+++ b/ydb/core/tx/tiering/snapshot.h
@@ -1,9 +1,9 @@
#pragma once
-#include "rule.h"
-#include "tier_config.h"
-
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/core/tx/tiering/tier/object.h>
+#include <ydb/core/tx/tiering/rule/object.h>
+
#include <ydb/services/metadata/service.h>
#include <library/cpp/json/writer/json_value.h>
diff --git a/ydb/core/tx/tiering/snapshot_enrich.h b/ydb/core/tx/tiering/snapshot_enrich.h
index eca0eb893c4..2a24ba2226a 100644
--- a/ydb/core/tx/tiering/snapshot_enrich.h
+++ b/ydb/core/tx/tiering/snapshot_enrich.h
@@ -1,6 +1,4 @@
#pragma once
-#include "rule.h"
-#include "tier_config.h"
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
diff --git a/ydb/core/tx/tiering/tier/CMakeLists.txt b/ydb/core/tx/tiering/tier/CMakeLists.txt
new file mode 100644
index 00000000000..523f34fb8c2
--- /dev/null
+++ b/ydb/core/tx/tiering/tier/CMakeLists.txt
@@ -0,0 +1,31 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-tiering-tier)
+target_link_libraries(tx-tiering-tier PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ services-metadata-initializer
+ services-metadata-abstract
+)
+target_sources(tx-tiering-tier PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier/object.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier/initializer.cpp
+)
+
+add_global_library_for(tx-tiering-tier.global tx-tiering-tier)
+target_link_libraries(tx-tiering-tier.global PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ services-metadata-initializer
+ services-metadata-abstract
+)
+target_sources(tx-tiering-tier.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier/manager.cpp
+)
diff --git a/ydb/core/tx/tiering/tier/initializer.cpp b/ydb/core/tx/tiering/tier/initializer.cpp
new file mode 100644
index 00000000000..4567b99fef6
--- /dev/null
+++ b/ydb/core/tx/tiering/tier/initializer.cpp
@@ -0,0 +1,39 @@
+#include "initializer.h"
+#include "object.h"
+
+namespace NKikimr::NColumnShard::NTiers {
+
+TVector<NKikimr::NMetadataInitializer::ITableModifier::TPtr> TTiersInitializer::BuildModifiers() const {
+ TVector<NMetadataInitializer::ITableModifier::TPtr> result;
+ {
+ Ydb::Table::CreateTableRequest request;
+ request.set_session_id("");
+ request.set_path(TTierConfig::GetStorageTablePath());
+ request.add_primary_key("tierName");
+ {
+ auto& column = *request.add_columns();
+ column.set_name("ownerPath");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("tierName");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("tierConfig");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request, "create_tiers"));
+ auto hRequest = TTierConfig::AddHistoryTableScheme(request);
+ result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(hRequest, "create_tiers_history"));
+ }
+ return result;
+}
+
+void TTiersInitializer::DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const {
+ controller->PreparationFinished(BuildModifiers());
+}
+
+}
diff --git a/ydb/core/tx/tiering/tier/initializer.h b/ydb/core/tx/tiering/tier/initializer.h
new file mode 100644
index 00000000000..3fbcd4ed52e
--- /dev/null
+++ b/ydb/core/tx/tiering/tier/initializer.h
@@ -0,0 +1,14 @@
+#pragma once
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/services/metadata/initializer/common.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TTiersInitializer: public NMetadata::IInitializationBehaviour {
+protected:
+ TVector<NMetadataInitializer::ITableModifier::TPtr> BuildModifiers() const;
+ virtual void DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const override;
+public:
+};
+
+}
diff --git a/ydb/core/tx/tiering/tier/manager.cpp b/ydb/core/tx/tiering/tier/manager.cpp
new file mode 100644
index 00000000000..07f662b1f5a
--- /dev/null
+++ b/ydb/core/tx/tiering/tier/manager.cpp
@@ -0,0 +1,12 @@
+#include "manager.h"
+#include "initializer.h"
+
+namespace NKikimr::NColumnShard::NTiers {
+
+TTiersManager::TFactory::TRegistrator<TTiersManager> TTiersManager::Registrator(TTiersManager::GetTypeIdStatic());
+
+NMetadata::IInitializationBehaviour::TPtr TTiersManager::DoGetInitializationBehaviour() const {
+ return std::make_shared<TTiersInitializer>();
+}
+
+}
diff --git a/ydb/core/tx/tiering/tier/manager.h b/ydb/core/tx/tiering/tier/manager.h
new file mode 100644
index 00000000000..95878bcd0fd
--- /dev/null
+++ b/ydb/core/tx/tiering/tier/manager.h
@@ -0,0 +1,16 @@
+#pragma once
+#include "object.h"
+
+#include <ydb/services/metadata/manager/generic_manager.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TTiersManager: public NMetadata::TGenericOperationsManager<TTierConfig> {
+private:
+ static TFactory::TRegistrator<TTiersManager> Registrator;
+protected:
+ virtual NMetadata::IInitializationBehaviour::TPtr DoGetInitializationBehaviour() const override;
+public:
+};
+
+}
diff --git a/ydb/core/tx/tiering/tier/object.cpp b/ydb/core/tx/tiering/tier/object.cpp
new file mode 100644
index 00000000000..638d2614f7a
--- /dev/null
+++ b/ydb/core/tx/tiering/tier/object.cpp
@@ -0,0 +1,94 @@
+#include "object.h"
+#include <ydb/services/metadata/manager/ydb_value_operator.h>
+
+#include <library/cpp/json/writer/json_value.h>
+#include <library/cpp/protobuf/json/proto2json.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+NJson::TJsonValue TTierConfig::GetDebugJson() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue(TDecoder::OwnerPath, OwnerPath);
+ result.InsertValue(TDecoder::TierName, TierName);
+ NProtobufJson::Proto2Json(ProtoConfig, result.InsertValue(TDecoder::TierConfig, NJson::JSON_MAP));
+ return result;
+}
+
+bool TTierConfig::IsSame(const TTierConfig& item) const {
+ return TierName == item.TierName && ProtoConfig.SerializeAsString() == item.ProtoConfig.SerializeAsString();
+}
+
+bool TTierConfig::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r) {
+ if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) {
+ return false;
+ }
+ OwnerPath = TFsPath(OwnerPath).Fix().GetPath();
+ if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) {
+ return false;
+ }
+ if (!decoder.ReadDebugProto(decoder.GetTierConfigIdx(), ProtoConfig, r)) {
+ return false;
+ }
+ return true;
+}
+
+NKikimr::NMetadataManager::TTableRecord TTierConfig::SerializeToRecord() const {
+ NMetadataManager::TTableRecord result;
+ result.SetColumn(TDecoder::OwnerPath, NMetadataManager::TYDBValue::Bytes(OwnerPath));
+ result.SetColumn(TDecoder::TierName, NMetadataManager::TYDBValue::Bytes(TierName));
+ result.SetColumn(TDecoder::TierConfig, NMetadataManager::TYDBValue::Bytes(ProtoConfig.DebugString()));
+ return result;
+}
+
+TString TTierConfig::GetStorageTablePath() {
+ return "/" + AppData()->TenantName + "/.metadata/tiering/tiers";
+}
+
+void TTierConfig::AlteringPreparation(std::vector<TTierConfig>&& objects,
+ NMetadataManager::IAlterPreparationController<TTierConfig>::TPtr controller,
+ const NMetadata::IOperationsManager::TModificationContext& /*context*/) {
+ controller->PreparationFinished(std::move(objects));
+}
+
+NMetadata::TOperationParsingResult TTierConfig::BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings,
+ const NMetadata::IOperationsManager::TModificationContext& /*context*/) {
+ NKikimr::NMetadataManager::TTableRecord result;
+ result.SetColumn(TDecoder::TierName, NMetadataManager::TYDBValue::Bytes(settings.GetObjectId()));
+ {
+ auto it = settings.GetFeatures().find(TDecoder::OwnerPath);
+ if (it != settings.GetFeatures().end()) {
+ result.SetColumn(TDecoder::OwnerPath, NMetadataManager::TYDBValue::Bytes(it->second));
+ }
+ }
+ {
+ auto it = settings.GetFeatures().find(TDecoder::TierConfig);
+ if (it != settings.GetFeatures().end()) {
+ TTierProto proto;
+ if (!::google::protobuf::TextFormat::ParseFromString(it->second, &proto)) {
+ return "incorrect proto format";
+ } else {
+ result.SetColumn(TDecoder::TierConfig, NMetadataManager::TYDBValue::Bytes(it->second));
+ }
+ }
+ }
+ return result;
+}
+
+std::vector<Ydb::Column> TTierConfig::TDecoder::GetPKColumns() {
+ return { NMetadataManager::TYDBColumn::Bytes(TierName) };
+}
+
+std::vector<Ydb::Column> TTierConfig::TDecoder::GetColumns() {
+ return
+ {
+ NMetadataManager::TYDBColumn::Bytes(OwnerPath),
+ NMetadataManager::TYDBColumn::Bytes(TierName),
+ NMetadataManager::TYDBColumn::Bytes(TierConfig)
+ };
+}
+
+std::vector<TString> TTierConfig::TDecoder::GetPKColumnIds() {
+ return { TierName };
+}
+
+}
diff --git a/ydb/core/tx/tiering/tier_config.h b/ydb/core/tx/tiering/tier/object.h
index 4a36df4ab01..3d0aef8392b 100644
--- a/ydb/core/tx/tiering/tier_config.h
+++ b/ydb/core/tx/tiering/tier/object.h
@@ -1,18 +1,19 @@
#pragma once
-#include "common.h"
-#include "decoder.h"
-
-#include <ydb/services/metadata/service.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/tx/tiering/common/global_tier_id.h>
+#include <ydb/services/metadata/abstract/decoder.h>
+#include <ydb/services/metadata/manager/preparation_controller.h>
+#include <ydb/services/metadata/manager/table_record.h>
+#include <ydb/services/metadata/manager/object.h>
+#include <ydb/services/metadata/service.h>
#include <library/cpp/json/writer/json_value.h>
namespace NKikimr::NColumnShard::NTiers {
-class TTierConfig {
+class TTierConfig: public NMetadataManager::TObject<TTierConfig> {
private:
using TTierProto = NKikimrSchemeOp::TStorageTierConfig;
- using TS3SettingsProto = NKikimrSchemeOp::TS3Settings;
YDB_ACCESSOR_DEF(TString, OwnerPath);
YDB_ACCESSOR_DEF(TString, TierName);
YDB_ACCESSOR_DEF(TTierProto, ProtoConfig);
@@ -35,6 +36,9 @@ public:
static inline const TString OwnerPath = "ownerPath";
static inline const TString TierName = "tierName";
static inline const TString TierConfig = "tierConfig";
+ static std::vector<Ydb::Column> GetPKColumns();
+ static std::vector<Ydb::Column> GetColumns();
+ static std::vector<TString> GetPKColumnIds();
TDecoder(const Ydb::ResultSet& rawData) {
OwnerPathIdx = GetFieldIndex(rawData, OwnerPath);
TierNameIdx = GetFieldIndex(rawData, TierName);
@@ -42,6 +46,14 @@ public:
}
};
bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r);
+ NMetadataManager::TTableRecord SerializeToRecord() const;
+ static NMetadata::TOperationParsingResult BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings,
+ const NMetadata::IOperationsManager::TModificationContext& context);
+
+ static TString GetStorageTablePath();
+ static void AlteringPreparation(std::vector<TTierConfig>&& objects,
+ NMetadataManager::IAlterPreparationController<TTierConfig>::TPtr controller,
+ const NMetadata::IOperationsManager::TModificationContext& context);
TGlobalTierId GetGlobalTierId() const {
return TGlobalTierId(OwnerPath, TierName);
@@ -52,6 +64,9 @@ public:
}
bool IsSame(const TTierConfig& item) const;
NJson::TJsonValue GetDebugJson() const;
+ static TString GetTypeId() {
+ return "TIER";
+ }
};
}
diff --git a/ydb/core/tx/tiering/tier_config.cpp b/ydb/core/tx/tiering/tier_config.cpp
deleted file mode 100644
index 1e4b7085d05..00000000000
--- a/ydb/core/tx/tiering/tier_config.cpp
+++ /dev/null
@@ -1,33 +0,0 @@
-#include "tier_config.h"
-#include <library/cpp/json/writer/json_value.h>
-#include <library/cpp/protobuf/json/proto2json.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-NJson::TJsonValue TTierConfig::GetDebugJson() const {
- NJson::TJsonValue result = NJson::JSON_MAP;
- result.InsertValue(TDecoder::OwnerPath, OwnerPath);
- result.InsertValue(TDecoder::TierName, TierName);
- NProtobufJson::Proto2Json(ProtoConfig, result.InsertValue(TDecoder::TierConfig, NJson::JSON_MAP));
- return result;
-}
-
-bool TTierConfig::IsSame(const TTierConfig& item) const {
- return TierName == item.TierName && ProtoConfig.SerializeAsString() == item.ProtoConfig.SerializeAsString();
-}
-
-bool TTierConfig::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r) {
- if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) {
- return false;
- }
- OwnerPath = TFsPath(OwnerPath).Fix().GetPath();
- if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) {
- return false;
- }
- if (!decoder.ReadDebugProto(decoder.GetTierConfigIdx(), ProtoConfig, r)) {
- return false;
- }
- return true;
-}
-
-}
diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp
index 04d442bd9fe..3c657d30766 100644
--- a/ydb/core/tx/tiering/ut/ut_tiers.cpp
+++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp
@@ -8,6 +8,10 @@
#include <ydb/core/wrappers/fake_storage.h>
#include <ydb/library/accessor/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/services/metadata/manager/alter.h>
+#include <ydb/services/metadata/manager/common.h>
+#include <ydb/services/metadata/manager/table_record.h>
+#include <ydb/services/metadata/manager/ydb_value_operator.h>
#include <ydb/services/metadata/service.h>
#include <library/cpp/actors/core/av_bootstrapped.h>
@@ -69,7 +73,8 @@ public:
Function: %s
Columns: %s
}
- })", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str()));
+ }
+ )", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str()));
}
};
@@ -114,10 +119,16 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
TActorId ProviderId;
TInstant Start;
YDB_READONLY_FLAG(Found, false);
- YDB_ACCESSOR(ui32, TieringsCount, 1);
+ YDB_ACCESSOR(ui32, ExpectedTieringsCount, 1);
+ YDB_ACCESSOR(ui32, ExpectedTiersCount, 1);
using TKeyCheckers = TMap<NTiers::TGlobalTierId, TJsonChecker>;
YDB_ACCESSOR_DEF(TKeyCheckers, Checkers);
public:
+ void ResetConditions() {
+ FoundFlag = false;
+ Checkers.clear();
+ }
+
STATEFN(StateInit) {
switch (ev->GetTypeRewrite()) {
hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle);
@@ -152,12 +163,12 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
auto snapshot = event->GetSnapshotAs<NTiers::TConfigsSnapshot>();
Y_VERIFY(!!snapshot);
auto* tInfo = snapshot->GetTableTiering("/Root/olapStore/olapTable");
- if (TieringsCount) {
+ if (ExpectedTieringsCount) {
if (!tInfo) {
Cerr << "tiering not found: " << snapshot->SerializeToString() << Endl;
return ;
}
- if (tInfo->GetRules().size() != TieringsCount) {
+ if (tInfo->GetRules().size() != ExpectedTieringsCount) {
Cerr << "TieringsCount incorrect: " << snapshot->SerializeToString() << Endl;
return;
}
@@ -176,6 +187,10 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
Cerr << "PathId: " << tiering.GetTablePathId() << Endl;
}
}
+ if (ExpectedTiersCount != snapshot->GetTierConfigs().size()) {
+ Cerr << "TieringsCount incorrect: " << snapshot->SerializeToString() << Endl;
+ return;
+ }
for (auto&& i : Checkers) {
auto value = snapshot->GetValue(i.first);
NJson::TJsonValue jsonData;
@@ -195,13 +210,26 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
void Bootstrap() {
ProviderId = NMetadataProvider::MakeServiceId(1);
- ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>("/Root/olapStore");
+ ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>();
Become(&TThis::StateInit);
Sender<NMetadataProvider::TEvSubscribeExternal>(ExternalDataManipulation).SendTo(ProviderId);
Start = Now();
}
};
+ class TEmulatorAlterController: public NMetadataManager::IAlterController {
+ private:
+ YDB_READONLY_FLAG(Finished, false);
+ public:
+ virtual void AlterProblem(const TString& errorMessage) override {
+ Cerr << errorMessage << Endl;
+ Y_VERIFY(false);
+ }
+ virtual void AlterFinished() override {
+ FinishedFlag = true;
+ }
+ };
+
Y_UNIT_TEST(DSConfigsStub) {
TPortManager pm;
@@ -236,15 +264,55 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
runtime.SimulateSleep(TDuration::Seconds(10));
Cerr << "Initialization finished" << Endl;
- lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) "
- "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')");
- lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) "
- "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')");
- const TInstant start = Now();
- while (!emulator->IsFound() && Now() - start < TDuration::Seconds(2000)) {
- runtime.SimulateSleep(TDuration::Seconds(1));
+ lHelper.StartSchemaRequest("CREATE OBJECT tier1 ( "
+ "TYPE TIER) WITH (tierConfig = `" + ConfigProtoStr + "`, ownerPath = `/Root/olapStore`)");
+ lHelper.StartSchemaRequest("CREATE OBJECT tier1 ("
+ "TYPE TIERING_RULE) WITH (tierName = tier1, tablePath = `/Root/olapStore/olapTable`, "
+ "ownerPath = `/Root/olapStore`, column = timestamp, durationForEvict = `10d` )");
+ {
+ const TInstant start = Now();
+ while (!emulator->IsFound() && Now() - start < TDuration::Seconds(2000)) {
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ }
+ Y_VERIFY(emulator->IsFound());
+ }
+ {
+ lHelper.StartSchemaRequest("ALTER OBJECT tier1 ( "
+ "TYPE TIER) SET tierConfig = `" + ConfigProtoStr1 + "`");
+
+ emulator->ResetConditions();
+ emulator->MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier1"), TJsonChecker("Name", "abc1"));
+ {
+ const TInstant start = Now();
+ while (!emulator->IsFound() && Now() - start < TDuration::Seconds(2000)) {
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ }
+ Y_VERIFY(emulator->IsFound());
+ }
+ }
+ {
+ std::vector<NMetadataManager::TTableRecord> patches;
+ {
+ NMetadataManager::TTableRecord patch;
+ patch.SetColumn("ownerPath", NMetadataManager::TYDBValue::Bytes("/Root/olapStore"));
+ patch.SetColumn("tierName", NMetadataManager::TYDBValue::Bytes("tier1"));
+ patches.emplace_back(std::move(patch));
+ }
+
+ lHelper.StartSchemaRequest("DROP OBJECT tier1(TYPE TIER)");
+ lHelper.StartSchemaRequest("DROP OBJECT tier1(TYPE TIERING_RULE)");
+
+ emulator->ResetConditions();
+ emulator->SetExpectedTieringsCount(0);
+ emulator->SetExpectedTiersCount(0);
+ {
+ const TInstant start = Now();
+ while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) {
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ }
+ Y_VERIFY(emulator->IsFound());
+ }
}
- Y_VERIFY(emulator->IsFound());
}
}
@@ -280,34 +348,47 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
runtime.SimulateSleep(TDuration::Seconds(10));
Cerr << "Initialization finished" << Endl;
- lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) "
- "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr1 + "')");
- lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) "
- "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')");
+ lHelper.StartSchemaRequest("CREATE OBJECT tier1 ( "
+ "TYPE TIER) WITH (tierConfig = `" + ConfigProtoStr1 + "`, ownerPath = `/Root/olapStore`)");
+ lHelper.StartSchemaRequest("CREATE OBJECT tier1 ("
+ "TYPE TIERING_RULE) "
+ "WITH (ownerPath = `/Root/olapStore`, tierName = tier1, tablePath = `/Root/olapStore/olapTable`, column = timestamp, durationForEvict = `10d` "
+ ")");
{
TTestCSEmulator emulator;
emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier1"), TJsonChecker("Name", "abc1"));
- emulator.SetTieringsCount(1);
+ emulator.SetExpectedTieringsCount(1);
+ emulator.SetExpectedTiersCount(1);
emulator.CheckRuntime(runtime);
}
- lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) "
- "VALUES ('/Root/olapStore', 'tier2', '" + ConfigProtoStr2 + "')");
- lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) "
- "VALUES ('/Root/olapStore', 'tier2', '/Root/olapStore/olapTable', 'timestamp', '20d')");
+ lHelper.StartSchemaRequest("CREATE OBJECT tier2 ( "
+ "TYPE TIER) WITH (ownerPath = `/Root/olapStore`, tierConfig = `" + ConfigProtoStr2 + "`)");
+ lHelper.StartSchemaRequest("CREATE OBJECT tier2 ("
+ "TYPE TIERING_RULE) WITH (tierName = tier2, tablePath = `/Root/olapStore/olapTable`, ownerPath = `/Root/olapStore`, column = timestamp, durationForEvict = `20d` )");
{
TTestCSEmulator emulator;
emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier1"), TJsonChecker("Name", "abc1"));
emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier2"), TJsonChecker("Name", "abc2"));
- emulator.SetTieringsCount(2);
+ emulator.SetExpectedTieringsCount(2);
+ emulator.SetExpectedTiersCount(2);
emulator.CheckRuntime(runtime);
}
- lHelper.StartDataRequest("DELETE FROM `/Root/.configs/tiering/tiers`");
- lHelper.StartDataRequest("DELETE FROM `/Root/.configs/tiering/rules`");
+ lHelper.StartSchemaRequest("DROP OBJECT tier2 (TYPE TIER)");
+ lHelper.StartSchemaRequest("DROP OBJECT tier2 (TYPE TIERING_RULE)");
{
TTestCSEmulator emulator;
- emulator.SetTieringsCount(0);
+ emulator.SetExpectedTieringsCount(1);
+ emulator.SetExpectedTiersCount(1);
+ emulator.CheckRuntime(runtime);
+ }
+ lHelper.StartSchemaRequest("DROP OBJECT tier1 (TYPE TIER)");
+ lHelper.StartSchemaRequest("DROP OBJECT tier1 (TYPE TIERING_RULE)");
+ {
+ TTestCSEmulator emulator;
+ emulator.SetExpectedTieringsCount(0);
+ emulator.SetExpectedTiersCount(0);
emulator.CheckRuntime(runtime);
}
@@ -381,19 +462,21 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
runtime.SimulateSleep(TDuration::Seconds(20));
Cerr << "Initialization finished" << Endl;
- lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) "
- "VALUES ('/Root/olapStore', 'fakeTier1', '" + TierConfigProtoStr + "')");
- lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) "
- "VALUES ('/Root/olapStore', 'fakeTier2', '" + TierConfigProtoStr + "')");
- lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) "
- "VALUES ('/Root/olapStore', 'fakeTier1', '/Root/olapStore/olapTable', 'timestamp', '10d')");
- lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) "
- "VALUES ('/Root/olapStore', 'fakeTier2', '/Root/olapStore/olapTable', 'timestamp', '20d')");
+ lHelper.StartSchemaRequest("CREATE OBJECT fakeTier1 ( "
+ "TYPE TIER) WITH (tierConfig = `" + TierConfigProtoStr + "`, ownerPath = `/Root/olapStore`)");
+ lHelper.StartSchemaRequest("CREATE OBJECT fakeTier1 ("
+ "TYPE TIERING_RULE) WITH (tierName = fakeTier1, ownerPath = `/Root/olapStore`, tablePath = `/Root/olapStore/olapTable`, column = timestamp, durationForEvict = `10d` )");
+
+ lHelper.StartSchemaRequest("CREATE OBJECT fakeTier2 ( "
+ "TYPE TIER) WITH (tierConfig = `" + TierConfigProtoStr + "`, ownerPath = `/Root/olapStore`)");
+ lHelper.StartSchemaRequest("CREATE OBJECT fakeTier2 ("
+ "TYPE TIERING_RULE) WITH (tierName = fakeTier2, ownerPath = `/Root/olapStore`, tablePath = `/Root/olapStore/olapTable`, column = timestamp, durationForEvict = `20d` )");
{
TTestCSEmulator emulator;
emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "fakeTier1"), TJsonChecker("Name", "fakeTier"));
emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "fakeTier2"), TJsonChecker("ObjectStorage.Endpoint", TierEndpoint));
- emulator.SetTieringsCount(2);
+ emulator.SetExpectedTieringsCount(2);
+ emulator.SetExpectedTiersCount(2);
emulator.CheckRuntime(runtime);
}
Cerr << "Insert..." << Endl;
diff --git a/ydb/services/bg_tasks/abstract/task.h b/ydb/services/bg_tasks/abstract/task.h
index eb87ea5e1b9..a796e671118 100644
--- a/ydb/services/bg_tasks/abstract/task.h
+++ b/ydb/services/bg_tasks/abstract/task.h
@@ -5,8 +5,8 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/events.h>
-#include <ydb/core/tx/tiering/decoder.h>
#include <ydb/library/accessor/accessor.h>
+#include <ydb/services/metadata/abstract/decoder.h>
#include <library/cpp/actors/core/events.h>
#include <library/cpp/object_factory/object_factory.h>
diff --git a/ydb/services/bg_tasks/ds_table/CMakeLists.txt b/ydb/services/bg_tasks/ds_table/CMakeLists.txt
index 5fecb1aa1bc..573f38f4356 100644
--- a/ydb/services/bg_tasks/ds_table/CMakeLists.txt
+++ b/ydb/services/bg_tasks/ds_table/CMakeLists.txt
@@ -32,4 +32,5 @@ target_sources(services-bg_tasks-ds_table PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/add_tasks.cpp
${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/task_enabled.cpp
${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/lock_pinger.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/initialization.cpp
)
diff --git a/ydb/services/bg_tasks/ds_table/executor.cpp b/ydb/services/bg_tasks/ds_table/executor.cpp
index 5cae73625fc..c8be62c0f49 100644
--- a/ydb/services/bg_tasks/ds_table/executor.cpp
+++ b/ydb/services/bg_tasks/ds_table/executor.cpp
@@ -5,82 +5,24 @@
#include "task_executor.h"
#include "task_enabled.h"
#include "fetch_tasks.h"
+#include "initialization.h"
-namespace NKikimr::NBackgroundTasks {
+#include <ydb/services/metadata/initializer/fetcher.h>
+#include <ydb/services/metadata/initializer/manager.h>
+#include <ydb/services/metadata/service.h>
-TVector<NMetadataInitializer::ITableModifier::TPtr> TExecutor::BuildModifiers() const {
- const TString tableName = Config.GetTablePath();
- TVector<NMetadataInitializer::ITableModifier::TPtr> result;
- {
- Ydb::Table::CreateTableRequest request;
- request.set_session_id("");
- request.set_path(tableName);
- request.add_primary_key("id");
- {
- auto& column = *request.add_columns();
- column.set_name("id");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("enabled");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::BOOL);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("class");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("executorId");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("lastPing");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("startInstant");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("constructInstant");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("activity");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("scheduler");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("state");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request));
- }
- return result;
-}
+namespace NKikimr::NBackgroundTasks {
void TExecutor::Handle(TEvStartAssign::TPtr& /*ev*/) {
ALS_DEBUG(NKikimrServices::BG_TASKS) << "start assign";
if (Config.GetMaxInFlight() > CurrentTaskIds.size()) {
- Register(new TAssignTasksActor(Config.GetMaxInFlight() - CurrentTaskIds.size(), Controller, ExecutorId));
+ Register(new TAssignTasksActor(Config.GetMaxInFlight() - CurrentTaskIds.size(), InternalController, ExecutorId));
}
}
void TExecutor::Handle(TEvAssignFinished::TPtr& /*ev*/) {
ALS_DEBUG(NKikimrServices::BG_TASKS) << "assign finished";
- Register(new TFetchTasksActor(CurrentTaskIds, ExecutorId, Controller));
+ Register(new TFetchTasksActor(CurrentTaskIds, ExecutorId, InternalController));
}
void TExecutor::Handle(TEvFetchingFinished::TPtr& /*ev*/) {
@@ -96,7 +38,7 @@ void TExecutor::Handle(TEvLockPingerFinished::TPtr& /*ev*/) {
void TExecutor::Handle(TEvLockPingerStart::TPtr& /*ev*/) {
ALS_DEBUG(NKikimrServices::BG_TASKS) << "pinger start";
if (CurrentTaskIds.size()) {
- Register(new TLockPingerActor(Controller, CurrentTaskIds));
+ Register(new TLockPingerActor(InternalController, CurrentTaskIds));
} else {
Schedule(Config.GetPingPeriod(), new TEvLockPingerStart);
}
@@ -105,7 +47,7 @@ void TExecutor::Handle(TEvLockPingerStart::TPtr& /*ev*/) {
void TExecutor::Handle(TEvTaskFetched::TPtr& ev) {
ALS_DEBUG(NKikimrServices::BG_TASKS) << "task fetched";
if (CurrentTaskIds.emplace(ev->Get()->GetTask().GetId()).second) {
- Register(new TTaskExecutor(ev->Get()->GetTask(), Controller));
+ Register(new TTaskExecutor(ev->Get()->GetTask(), InternalController));
}
}
@@ -118,24 +60,37 @@ void TExecutor::Handle(TEvTaskExecutorFinished::TPtr& ev) {
void TExecutor::Handle(TEvAddTask::TPtr& ev) {
ALS_DEBUG(NKikimrServices::BG_TASKS) << "add task";
- Register(new TAddTasksActor(Controller, ev->Get()->GetTask(), ev->Sender));
+ Register(new TAddTasksActor(InternalController, ev->Get()->GetTask(), ev->Sender));
}
void TExecutor::Handle(TEvUpdateTaskEnabled::TPtr& ev) {
ALS_DEBUG(NKikimrServices::BG_TASKS) << "start task";
- Register(new TUpdateTaskEnabledActor(Controller, ev->Get()->GetTaskId(), ev->Get()->GetEnabled(), ev->Sender));
-}
-
-void TExecutor::RegisterState() {
- Controller = std::make_shared<TExecutorController>(SelfId(), Config);
- Become(&TExecutor::StateMain);
+ Register(new TUpdateTaskEnabledActor(InternalController, ev->Get()->GetTaskId(), ev->Get()->GetEnabled(), ev->Sender));
}
-void TExecutor::OnInitialized() {
+void TExecutor::Handle(NMetadataInitializer::TEvInitializationFinished::TPtr& /*ev*/) {
Sender<TEvStartAssign>().SendTo(SelfId());
Schedule(Config.GetPingPeriod(), new TEvLockPingerStart);
}
+void TExecutor::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) {
+ auto snapshot = ev->Get()->GetValidatedSnapshotAs<NMetadataInitializer::TSnapshot>();
+ auto b = std::make_shared<TBGTasksInitializer>(Config);
+ Register(new NMetadataInitializer::TDSAccessorInitialized(Config.GetRequestConfig(), "bg_tasks", b, InternalController, snapshot));
+}
+
+void TExecutor::Bootstrap() {
+ InternalController = std::make_shared<TExecutorController>(SelfId(), Config);
+ Become(&TExecutor::StateMain);
+ auto manager = std::make_shared<NMetadataInitializer::TFetcher>();
+ if (NMetadataProvider::TServiceOperator::IsEnabled()) {
+ Sender<NMetadataProvider::TEvSubscribeExternal>(manager).SendTo(NMetadataProvider::MakeServiceId(SelfId().NodeId()));
+ } else {
+ auto b = std::make_shared<TBGTasksInitializer>(Config);
+ Register(new NMetadataInitializer::TDSAccessorInitialized(Config.GetRequestConfig(), "bg_tasks", b, InternalController, nullptr));
+ }
+}
+
NActors::IActor* CreateService(const TConfig& config) {
return new TExecutor(config);
}
diff --git a/ydb/services/bg_tasks/ds_table/executor.h b/ydb/services/bg_tasks/ds_table/executor.h
index b15d25f6c92..a5d334302d2 100644
--- a/ydb/services/bg_tasks/ds_table/executor.h
+++ b/ydb/services/bg_tasks/ds_table/executor.h
@@ -51,22 +51,16 @@ public:
}
};
-class TExecutor: public NMetadataInitializer::TDSAccessorInitialized {
+class TExecutor: public NActors::TActorBootstrapped<TExecutor> {
private:
- using TBase = NMetadataInitializer::TDSAccessorInitialized;
+ using TBase = NActors::TActorBootstrapped<TExecutor>;
TString TableName;
const TString ExecutorId = TGUID::CreateTimebased().AsUuidString();
const TConfig Config;
std::set<TString> CurrentTaskIds;
- TExecutorController::TPtr Controller;
+ TExecutorController::TPtr InternalController;
protected:
- virtual void RegisterState() override;
- virtual void OnInitialized() override;
- virtual void Prepare(NMetadataInitializer::IController::TPtr controller) override {
- controller->PreparationFinished(BuildModifiers());
- }
- TVector<NMetadataInitializer::ITableModifier::TPtr> BuildModifiers() const;
-
+ void Handle(NMetadataInitializer::TEvInitializationFinished::TPtr& ev);
void Handle(TEvStartAssign::TPtr& ev);
void Handle(TEvAssignFinished::TPtr& ev);
void Handle(TEvFetchingFinished::TPtr& ev);
@@ -76,9 +70,11 @@ protected:
void Handle(TEvUpdateTaskEnabled::TPtr& ev);
void Handle(TEvLockPingerStart::TPtr& ev);
void Handle(TEvLockPingerFinished::TPtr& ev);
+ void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev);
- STFUNC(StateMain) {
+ STATEFN(StateMain) {
switch (ev->GetTypeRewrite()) {
+ hFunc(NMetadataInitializer::TEvInitializationFinished, Handle);
hFunc(TEvStartAssign, Handle);
hFunc(TEvAssignFinished, Handle);
hFunc(TEvFetchingFinished, Handle);
@@ -87,15 +83,17 @@ protected:
hFunc(TEvTaskExecutorFinished, Handle);
hFunc(TEvLockPingerStart, Handle);
hFunc(TEvLockPingerFinished, Handle);
+ hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle);
default:
- TBase::StateMain(ev, ctx);
+ break;
}
}
public:
+ void Bootstrap();
+
TExecutor(const TConfig& config)
- : TBase(config.GetRequestConfig())
- , Config(config)
+ : Config(config)
{
TServiceOperator::Register();
}
diff --git a/ydb/services/bg_tasks/ds_table/executor_controller.cpp b/ydb/services/bg_tasks/ds_table/executor_controller.cpp
index 14f9a9ac691..6d37a5f2fc7 100644
--- a/ydb/services/bg_tasks/ds_table/executor_controller.cpp
+++ b/ydb/services/bg_tasks/ds_table/executor_controller.cpp
@@ -23,6 +23,10 @@ void TExecutorController::TaskFinished(const TString& taskId) const {
ExecutorActorId.Send(ExecutorActorId, new TEvTaskExecutorFinished(taskId));
}
+void TExecutorController::InitializationFinished(const TString& id) const {
+ ExecutorActorId.Send(ExecutorActorId, new NMetadataInitializer::TEvInitializationFinished(id));
+}
+
void TExecutorController::LockPingerFinished() const {
ExecutorActorId.Send(ExecutorActorId, new TEvLockPingerFinished());
}
diff --git a/ydb/services/bg_tasks/ds_table/executor_controller.h b/ydb/services/bg_tasks/ds_table/executor_controller.h
index 3d3aebb9cd3..cf831712c07 100644
--- a/ydb/services/bg_tasks/ds_table/executor_controller.h
+++ b/ydb/services/bg_tasks/ds_table/executor_controller.h
@@ -1,6 +1,8 @@
#pragma once
#include "config.h"
+#include <ydb/services/metadata/initializer/common.h>
+
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/actorid.h>
@@ -8,7 +10,7 @@ namespace NKikimr::NBackgroundTasks {
class TTask;
-class TExecutorController {
+class TExecutorController: public NMetadataInitializer::IInitializerOutput {
private:
const NActors::TActorIdentity ExecutorActorId;
YDB_READONLY_DEF(TConfig, Config);
@@ -29,6 +31,7 @@ public:
return Config.GetRequestConfig();
}
+ virtual void InitializationFinished(const TString& id) const override;
void LockPingerFinished() const;
void TaskFetched(const TTask& task) const;
void TaskFinished(const TString& taskId) const;
diff --git a/ydb/services/bg_tasks/ds_table/initialization.cpp b/ydb/services/bg_tasks/ds_table/initialization.cpp
new file mode 100644
index 00000000000..efcf2d249ba
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/initialization.cpp
@@ -0,0 +1,68 @@
+#include "initialization.h"
+
+namespace NKikimr::NBackgroundTasks {
+
+void TBGTasksInitializer::DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const {
+ const TString tableName = Config.GetTablePath();
+ TVector<NMetadataInitializer::ITableModifier::TPtr> result;
+ {
+ Ydb::Table::CreateTableRequest request;
+ request.set_session_id("");
+ request.set_path(tableName);
+ request.add_primary_key("id");
+ {
+ auto& column = *request.add_columns();
+ column.set_name("id");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("enabled");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::BOOL);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("class");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("executorId");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("lastPing");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("startInstant");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("constructInstant");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("activity");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("scheduler");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("state");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request, "create"));
+ }
+ controller->PreparationFinished(result);
+}
+
+}
diff --git a/ydb/services/bg_tasks/ds_table/initialization.h b/ydb/services/bg_tasks/ds_table/initialization.h
new file mode 100644
index 00000000000..aeb4ac84ac3
--- /dev/null
+++ b/ydb/services/bg_tasks/ds_table/initialization.h
@@ -0,0 +1,21 @@
+#pragma once
+#include "config.h"
+
+#include <ydb/services/metadata/abstract/common.h>
+
+namespace NKikimr::NBackgroundTasks {
+
+class TBGTasksInitializer: public NMetadata::IInitializationBehaviour {
+private:
+ const TConfig Config;
+protected:
+ virtual void DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const override;
+public:
+ TBGTasksInitializer(const TConfig& config)
+ : Config(config)
+ {
+
+ }
+};
+
+}
diff --git a/ydb/services/metadata/CMakeLists.txt b/ydb/services/metadata/CMakeLists.txt
index 55a81045b7a..96609b8fa6b 100644
--- a/ydb/services/metadata/CMakeLists.txt
+++ b/ydb/services/metadata/CMakeLists.txt
@@ -9,7 +9,9 @@
add_subdirectory(abstract)
add_subdirectory(ds_table)
add_subdirectory(initializer)
+add_subdirectory(manager)
add_subdirectory(request)
+add_subdirectory(secret)
add_library(ydb-services-metadata)
target_link_libraries(ydb-services-metadata PUBLIC
@@ -17,6 +19,7 @@ target_link_libraries(ydb-services-metadata PUBLIC
yutil
cpp-actors-core
services-metadata-abstract
+ services-metadata-manager
)
target_sources(ydb-services-metadata PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/metadata/service.cpp
diff --git a/ydb/services/metadata/abstract/CMakeLists.txt b/ydb/services/metadata/abstract/CMakeLists.txt
index 51ed2ede54a..4da40e6d1ff 100644
--- a/ydb/services/metadata/abstract/CMakeLists.txt
+++ b/ydb/services/metadata/abstract/CMakeLists.txt
@@ -19,5 +19,8 @@ target_link_libraries(services-metadata-abstract PUBLIC
)
target_sources(services-metadata-abstract PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/metadata/abstract/common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/abstract/manager.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/abstract/decoder.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/abstract/fetcher.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/abstract/kqp_common.cpp
)
diff --git a/ydb/services/metadata/abstract/common.cpp b/ydb/services/metadata/abstract/common.cpp
index 7662c8de1c1..fa5484fdbea 100644
--- a/ydb/services/metadata/abstract/common.cpp
+++ b/ydb/services/metadata/abstract/common.cpp
@@ -1,22 +1,5 @@
#include "common.h"
-#include <util/string/join.h>
namespace NKikimr::NMetadataProvider {
-TString ISnapshotParser::GetSnapshotId() const {
- if (!SnapshotId) {
- SnapshotId = JoinSeq(",", GetTables());
- }
- return *SnapshotId;
-}
-
-ISnapshot::TPtr ISnapshotParser::ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const {
- ISnapshot::TPtr result = CreateSnapshot(actuality);
- Y_VERIFY(result);
- if (!result->DeserializeFromResultSet(rawData)) {
- return nullptr;
- }
- return result;
-}
-
}
diff --git a/ydb/services/metadata/abstract/common.h b/ydb/services/metadata/abstract/common.h
index 259a384bb7e..f3cabd77350 100644
--- a/ydb/services/metadata/abstract/common.h
+++ b/ydb/services/metadata/abstract/common.h
@@ -1,14 +1,15 @@
#pragma once
+#include "fetcher.h"
+
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/actorid.h>
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/core/actor_virtual.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/object_factory/object_factory.h>
#include <ydb/core/base/events.h>
#include <ydb/library/accessor/accessor.h>
-#include <ydb/public/api/protos/ydb_table.pb.h>
-#include <ydb/services/metadata/initializer/common.h>
namespace NKikimr::NMetadataProvider {
@@ -21,80 +22,13 @@ enum EEvSubscribe {
EvUnsubscribeLocal,
EvSubscribeExternal,
EvUnsubscribeExternal,
+ EvYQLResponse,
+ EvAlterObjects,
EvEnd
};
static_assert(EEvSubscribe::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER)");
-class ISnapshot;
-
-class ISnapshotAcceptorController {
-public:
- using TPtr = std::shared_ptr<ISnapshotAcceptorController>;
- virtual ~ISnapshotAcceptorController() = default;
- virtual void Enriched(std::shared_ptr<ISnapshot> enrichedSnapshot) = 0;
- virtual void EnrichProblem(const TString& errorMessage) = 0;
-};
-
-class ISnapshot {
-private:
- YDB_READONLY_DEF(TInstant, Actuality);
-protected:
- virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) = 0;
- virtual TString DoSerializeToString() const = 0;
-public:
- using TPtr = std::shared_ptr<ISnapshot>;
- ISnapshot(const TInstant actuality)
- : Actuality(actuality) {
-
- }
-
- bool DeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) {
- return DoDeserializeFromResultSet(rawData);
- }
-
- TString SerializeToString() const {
- return DoSerializeToString();
- }
-
- virtual ~ISnapshot() = default;
-};
-
-class ISnapshotParser {
-protected:
- virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const = 0;
- virtual void DoPrepare(NMetadataInitializer::IController::TPtr controller) const = 0;
- virtual const TVector<TString>& DoGetTables() const = 0;
- mutable std::optional<TString> SnapshotId;
-public:
- using TPtr = std::shared_ptr<ISnapshotParser>;
-
- TString GetSnapshotId() const;
- ISnapshot::TPtr ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const;
-
- void Prepare(NMetadataInitializer::IController::TPtr controller) const {
- return DoPrepare(controller);
- }
-
- virtual void EnrichSnapshotData(ISnapshot::TPtr original, ISnapshotAcceptorController::TPtr controller) const {
- controller->Enriched(original);
- }
-
- const TVector<TString>& GetTables() const {
- return DoGetTables();
- }
-
- virtual ~ISnapshotParser() = default;
-};
-
-template <class TSnapshot>
-class TGenericSnapshotParser: public ISnapshotParser {
-protected:
- virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const override {
- return std::make_shared<TSnapshot>(actuality);
- }
-};
-
class TEvRefreshSubscriberData: public NActors::TEventLocal<TEvRefreshSubscriberData, EvRefreshSubscriberData> {
private:
YDB_READONLY_DEF(ISnapshot::TPtr, Snapshot);
@@ -108,6 +42,13 @@ public:
const TSnapshot* GetSnapshotAs() const {
return dynamic_cast<const TSnapshot*>(Snapshot.get());
}
+
+ template <class TSnapshot>
+ std::shared_ptr<TSnapshot> GetValidatedSnapshotAs() const {
+ auto result = dynamic_pointer_cast<TSnapshot>(Snapshot);
+ Y_VERIFY(result);
+ return result;
+ }
};
}
diff --git a/ydb/core/tx/tiering/decoder.cpp b/ydb/services/metadata/abstract/decoder.cpp
index f8b0c5a190e..8b6375ffd31 100644
--- a/ydb/core/tx/tiering/decoder.cpp
+++ b/ydb/services/metadata/abstract/decoder.cpp
@@ -2,10 +2,11 @@
#include <library/cpp/protobuf/json/proto2json.h>
#include <contrib/libs/protobuf/src/google/protobuf/text_format.h>
#include <library/cpp/actors/core/log.h>
+#include <library/cpp/json/json_reader.h>
namespace NKikimr::NInternal {
-i32 TDecoderBase::GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId, const bool verify /*= true*/) const {
+i32 TDecoderBase::GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId) const {
i32 idx = 0;
for (auto&& i : rawData.columns()) {
if (i.name() == columnId) {
@@ -13,16 +14,21 @@ i32 TDecoderBase::GetFieldIndex(const Ydb::ResultSet& rawData, const TString& co
}
++idx;
}
- Y_VERIFY(!verify, "incorrect columnId %s", columnId.data());
return -1;
}
-bool TDecoderBase::Read(const ui32 columnIdx, TString& result, const Ydb::Value& r) const {
+bool TDecoderBase::Read(const i32 columnIdx, TString& result, const Ydb::Value& r) const {
+ if (columnIdx >= (i32)r.items().size() || columnIdx < 0) {
+ return false;
+ }
result = r.items()[columnIdx].bytes_value();
return true;
}
-bool TDecoderBase::Read(const ui32 columnIdx, TDuration& result, const Ydb::Value& r) const {
+bool TDecoderBase::Read(const i32 columnIdx, TDuration& result, const Ydb::Value& r) const {
+ if (columnIdx >= (i32)r.items().size() || columnIdx < 0) {
+ return false;
+ }
const TString& s = r.items()[columnIdx].bytes_value();
if (!TDuration::TryParse(s, result)) {
ALS_WARN(0) << "cannot parse duration for tiering: " << s;
@@ -31,7 +37,10 @@ bool TDecoderBase::Read(const ui32 columnIdx, TDuration& result, const Ydb::Valu
return true;
}
-bool TDecoderBase::Read(const ui32 columnIdx, bool& result, const Ydb::Value& r) const {
+bool TDecoderBase::Read(const i32 columnIdx, bool& result, const Ydb::Value& r) const {
+ if (columnIdx >= (i32)r.items().size() || columnIdx < 0) {
+ return false;
+ }
auto& pValue = r.items()[columnIdx];
if (pValue.has_bool_value()) {
result = pValue.bool_value();
@@ -42,14 +51,17 @@ bool TDecoderBase::Read(const ui32 columnIdx, bool& result, const Ydb::Value& r)
return true;
}
-bool TDecoderBase::Read(const ui32 columnIdx, TInstant& result, const Ydb::Value& r) const {
+bool TDecoderBase::Read(const i32 columnIdx, TInstant& result, const Ydb::Value& r) const {
+ if (columnIdx >= (i32)r.items().size() || columnIdx < 0) {
+ return false;
+ }
auto& pValue = r.items()[columnIdx];
if (pValue.has_uint32_value()) {
result = TInstant::Seconds(pValue.uint32_value());
} else if (pValue.has_int64_value()) {
- result = TInstant::Seconds(pValue.int64_value());
+ result = TInstant::MicroSeconds(pValue.int64_value());
} else if (pValue.has_uint64_value()) {
- result = TInstant::Seconds(pValue.uint64_value());
+ result = TInstant::MicroSeconds(pValue.uint64_value());
} else if (pValue.has_int32_value()) {
result = TInstant::Seconds(pValue.int32_value());
} else {
@@ -59,7 +71,22 @@ bool TDecoderBase::Read(const ui32 columnIdx, TInstant& result, const Ydb::Value
return true;
}
-bool TDecoderBase::ReadDebugProto(const ui32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const {
+bool TDecoderBase::ReadJson(const i32 columnIdx, NJson::TJsonValue& result, const Ydb::Value& r) const {
+ if (columnIdx >= (i32)r.items().size() || columnIdx < 0) {
+ return false;
+ }
+ const TString& s = r.items()[columnIdx].bytes_value();
+ if (!NJson::ReadJsonFastTree(s, &result)) {
+ ALS_ERROR(0) << "cannot parse json string: " << s;
+ return false;
+ }
+ return true;
+}
+
+bool TDecoderBase::ReadDebugProto(const i32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const {
+ if (columnIdx >= (i32)r.items().size() || columnIdx < 0) {
+ return false;
+ }
const TString& s = r.items()[columnIdx].bytes_value();
if (!::google::protobuf::TextFormat::ParseFromString(s, &result)) {
ALS_ERROR(0) << "cannot parse proto string: " << s;
diff --git a/ydb/services/metadata/abstract/decoder.h b/ydb/services/metadata/abstract/decoder.h
new file mode 100644
index 00000000000..39977b9e681
--- /dev/null
+++ b/ydb/services/metadata/abstract/decoder.h
@@ -0,0 +1,30 @@
+#pragma once
+#include <ydb/public/api/protos/ydb_value.pb.h>
+#include <ydb/services/metadata/manager/table_record.h>
+
+#include <library/cpp/json/writer/json_value.h>
+#include <util/datetime/base.h>
+
+namespace NKikimr::NInternal {
+
+class TDecoderBase {
+protected:
+ i32 GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId) const;
+public:
+ bool Read(const i32 columnIdx, TString& result, const Ydb::Value& r) const;
+ bool ReadDebugProto(const i32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const;
+ bool ReadJson(const i32 columnIdx, NJson::TJsonValue& result, const Ydb::Value& r) const;
+ bool Read(const i32 columnIdx, TDuration& result, const Ydb::Value& r) const;
+ bool Read(const i32 columnIdx, TInstant& result, const Ydb::Value& r) const;
+ bool Read(const i32 columnIdx, bool& result, const Ydb::Value& r) const;
+
+ template <class TObject>
+ static bool DeserializeFromRecord(TObject& object, const NMetadataManager::TTableRecord& tr) {
+ auto rs = tr.BuildRecordSet();
+ Y_VERIFY(rs.rows().size() == 1);
+ typename TObject::TDecoder decoder(rs);
+ return object.DeserializeFromRecord(decoder, rs.rows()[0]);
+ }
+};
+
+}
diff --git a/ydb/services/metadata/abstract/fetcher.cpp b/ydb/services/metadata/abstract/fetcher.cpp
new file mode 100644
index 00000000000..d4d1efcd3ee
--- /dev/null
+++ b/ydb/services/metadata/abstract/fetcher.cpp
@@ -0,0 +1,24 @@
+#include "fetcher.h"
+#include <util/string/join.h>
+
+namespace NKikimr::NMetadataProvider {
+
+ISnapshot::TPtr ISnapshotParser::ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const {
+ ISnapshot::TPtr result = CreateSnapshot(actuality);
+ Y_VERIFY(result);
+ if (!result->DeserializeFromResultSet(rawData)) {
+ return nullptr;
+ }
+ return result;
+}
+
+TString ISnapshotParser::GetComponentId() const {
+ auto managers = GetManagers();
+ std::vector<TString> ids;
+ for (auto&& i : managers) {
+ ids.emplace_back(i->GetTablePath());
+ }
+ return JoinSeq("-", ids);
+}
+
+}
diff --git a/ydb/services/metadata/abstract/fetcher.h b/ydb/services/metadata/abstract/fetcher.h
new file mode 100644
index 00000000000..8e211fed613
--- /dev/null
+++ b/ydb/services/metadata/abstract/fetcher.h
@@ -0,0 +1,89 @@
+#pragma once
+#include "manager.h"
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actorid.h>
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/core/actor_virtual.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/object_factory/object_factory.h>
+#include <ydb/core/base/events.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+#include <ydb/public/api/protos/ydb_table.pb.h>
+#include <ydb/services/metadata/initializer/common.h>
+#include <ydb/services/metadata/manager/common.h>
+#include <ydb/services/metadata/manager/table_record.h>
+#include <ydb/services/metadata/manager/alter.h>
+
+namespace NKikimr::NMetadataProvider {
+
+class ISnapshot;
+
+class ISnapshotAcceptorController {
+public:
+ using TPtr = std::shared_ptr<ISnapshotAcceptorController>;
+ virtual ~ISnapshotAcceptorController() = default;
+ virtual void Enriched(std::shared_ptr<ISnapshot> enrichedSnapshot) = 0;
+ virtual void EnrichProblem(const TString& errorMessage) = 0;
+};
+
+class ISnapshot {
+private:
+ YDB_READONLY_DEF(TInstant, Actuality);
+protected:
+ virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) = 0;
+ virtual TString DoSerializeToString() const = 0;
+public:
+ using TPtr = std::shared_ptr<ISnapshot>;
+ ISnapshot(const TInstant actuality)
+ : Actuality(actuality) {
+
+ }
+
+ bool DeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) {
+ return DoDeserializeFromResultSet(rawData);
+ }
+
+ TString SerializeToString() const {
+ return DoSerializeToString();
+ }
+
+ virtual ~ISnapshot() = default;
+};
+
+class ISnapshotParser {
+private:
+ mutable std::vector<NMetadata::IOperationsManager::TPtr> Managers;
+protected:
+ virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const = 0;
+ virtual std::vector<NMetadata::IOperationsManager::TPtr> DoGetManagers() const = 0;
+public:
+ using TPtr = std::shared_ptr<ISnapshotParser>;
+
+ TString GetComponentId() const;
+ ISnapshot::TPtr ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const;
+
+ virtual void EnrichSnapshotData(ISnapshot::TPtr original, ISnapshotAcceptorController::TPtr controller) const {
+ controller->Enriched(original);
+ }
+
+ const std::vector<NMetadata::IOperationsManager::TPtr>& GetManagers() const {
+ if (Managers.empty()) {
+ Managers = DoGetManagers();
+ }
+ return Managers;
+ }
+
+ virtual ~ISnapshotParser() = default;
+};
+
+template <class TSnapshot>
+class TSnapshotsManager: public ISnapshotParser {
+protected:
+ virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const override {
+ return std::make_shared<TSnapshot>(actuality);
+ }
+};
+
+}
diff --git a/ydb/services/metadata/abstract/manager.cpp b/ydb/services/metadata/abstract/manager.cpp
new file mode 100644
index 00000000000..849b69a83ef
--- /dev/null
+++ b/ydb/services/metadata/abstract/manager.cpp
@@ -0,0 +1,5 @@
+#include "manager.h"
+
+namespace NKikimr::NMetadata {
+
+}
diff --git a/ydb/services/metadata/abstract/manager.h b/ydb/services/metadata/abstract/manager.h
new file mode 100644
index 00000000000..a9d83521bd0
--- /dev/null
+++ b/ydb/services/metadata/abstract/manager.h
@@ -0,0 +1,96 @@
+#pragma once
+#include "kqp_common.h"
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actorid.h>
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/core/actor_virtual.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/object_factory/object_factory.h>
+
+#include <ydb/core/base/events.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+#include <ydb/public/api/protos/ydb_table.pb.h>
+#include <ydb/services/metadata/initializer/common.h>
+#include <ydb/services/metadata/manager/common.h>
+#include <ydb/services/metadata/manager/table_record.h>
+
+namespace NKikimr::NMetadata {
+
+class TOperationParsingResult {
+private:
+ YDB_READONLY_FLAG(Success, false);
+ YDB_READONLY_DEF(TString, ErrorMessage);
+ YDB_READONLY_DEF(NMetadataManager::TTableRecord, Record);
+public:
+ TOperationParsingResult(const char* errorMessage)
+ : SuccessFlag(false)
+ , ErrorMessage(errorMessage) {
+
+ }
+
+ TOperationParsingResult(const TString& errorMessage)
+ : SuccessFlag(false)
+ , ErrorMessage(errorMessage) {
+
+ }
+
+ TOperationParsingResult(NMetadataManager::TTableRecord&& record)
+ : SuccessFlag(true)
+ , Record(record) {
+
+ }
+};
+
+class IInitializationBehaviour {
+protected:
+ virtual void DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const = 0;
+public:
+ using TPtr = std::shared_ptr<IInitializationBehaviour>;
+ virtual ~IInitializationBehaviour() = default;
+ void Prepare(NMetadataInitializer::IInitializerInput::TPtr controller) const {
+ return DoPrepare(controller);
+ }
+};
+
+class IAlterCommand {
+private:
+ YDB_READONLY_DEF(std::vector<NMetadataManager::TTableRecord>, Records);
+ YDB_READONLY_DEF(IOperationsManager::TPtr, Manager);
+ YDB_READONLY_DEF(NMetadataManager::IAlterController::TPtr, Controller);
+ YDB_READONLY_DEF(IOperationsManager::TModificationContext, Context);
+protected:
+ virtual void DoExecute() const = 0;
+public:
+ using TPtr = std::shared_ptr<IAlterCommand>;
+ virtual ~IAlterCommand() = default;
+
+ IAlterCommand(const std::vector<NMetadataManager::TTableRecord>& records,
+ IOperationsManager::TPtr manager,
+ NMetadataManager::IAlterController::TPtr controller,
+ const IOperationsManager::TModificationContext& context)
+ : Records(records)
+ , Manager(manager)
+ , Controller(controller)
+ , Context(context) {
+
+ }
+
+ IAlterCommand(const NMetadataManager::TTableRecord& record,
+ IOperationsManager::TPtr manager,
+ NMetadataManager::IAlterController::TPtr controller,
+ const IOperationsManager::TModificationContext& context)
+ : Manager(manager)
+ , Controller(controller)
+ , Context(context) {
+ Records.emplace_back(record);
+
+ }
+
+ void Execute() const {
+ DoExecute();
+ }
+};
+
+}
diff --git a/ydb/services/metadata/ds_table/CMakeLists.txt b/ydb/services/metadata/ds_table/CMakeLists.txt
index 137a58c0d35..350ba781800 100644
--- a/ydb/services/metadata/ds_table/CMakeLists.txt
+++ b/ydb/services/metadata/ds_table/CMakeLists.txt
@@ -17,6 +17,7 @@ target_link_libraries(services-metadata-ds_table PUBLIC
core-grpc_services-base
ydb-core-grpc_services
services-metadata-initializer
+ services-metadata-secret
)
target_sources(services-metadata-ds_table PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_refresh.cpp
diff --git a/ydb/services/metadata/ds_table/accessor_refresh.cpp b/ydb/services/metadata/ds_table/accessor_refresh.cpp
index e89bd66d28f..aaf15d84a5f 100644
--- a/ydb/services/metadata/ds_table/accessor_refresh.cpp
+++ b/ydb/services/metadata/ds_table/accessor_refresh.cpp
@@ -9,22 +9,20 @@
namespace NKikimr::NMetadataProvider {
using namespace NInternal::NRequest;
-void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) {
- const TString startString = CurrentSelection.SerializeAsString();
-
- auto currentFullReply = ev->Get()->GetResult();
+void TDSAccessorRefresher::Handle(TEvYQLResponse::TPtr& ev) {
+ auto& currentFullReply = ev->Get()->GetResponse();
Ydb::Table::ExecuteQueryResult qResult;
currentFullReply.operation().result().UnpackTo(&qResult);
- Y_VERIFY((size_t)qResult.result_sets().size() == SnapshotConstructor->GetTables().size());
- *CurrentSelection.mutable_result_sets() = std::move(*qResult.mutable_result_sets());
- auto parsedSnapshot = SnapshotConstructor->ParseSnapshot(CurrentSelection, RequestedActuality);
+ Y_VERIFY((size_t)qResult.result_sets().size() == SnapshotConstructor->GetManagers().size());
+ *ProposedProto.mutable_result_sets() = std::move(*qResult.mutable_result_sets());
+ auto parsedSnapshot = SnapshotConstructor->ParseSnapshot(ProposedProto, RequestedActuality);
if (!parsedSnapshot) {
ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot parse current snapshot";
}
- if (!!parsedSnapshot && CurrentSelection.SerializeAsString() != startString) {
- ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << CurrentSelection.DebugString();
- SnapshotConstructor->EnrichSnapshotData(parsedSnapshot, GetController());
+ if (!!parsedSnapshot && CurrentSelection.SerializeAsString() != ProposedProto.SerializeAsString()) {
+ ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << ProposedProto.DebugString();
+ SnapshotConstructor->EnrichSnapshotData(parsedSnapshot, InternalController);
} else {
Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
}
@@ -34,6 +32,7 @@ void TDSAccessorRefresher::Handle(TEvEnrichSnapshotResult::TPtr& ev) {
RequestedActuality = TInstant::Zero();
Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
CurrentSnapshot = ev->Get()->GetEnrichedSnapshot();
+ *CurrentSelection.mutable_result_sets() = std::move(*ProposedProto.mutable_result_sets());
OnSnapshotModified();
}
@@ -43,48 +42,27 @@ void TDSAccessorRefresher::Handle(TEvEnrichSnapshotProblem::TPtr& ev) {
ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "enrich problem: " << ev->Get()->GetErrorText();
}
-void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateSession>::TPtr& ev) {
- Ydb::Table::CreateSessionResponse currentFullReply = ev->Get()->GetResult();
- Ydb::Table::CreateSessionResult session;
- currentFullReply.operation().result().UnpackTo(&session);
- const TString sessionId = session.session_id();
- Y_VERIFY(sessionId);
- Ydb::Table::ExecuteDataQueryRequest request;
+void TDSAccessorRefresher::Handle(TEvRefresh::TPtr& /*ev*/) {
TStringBuilder sb;
- auto& tables = SnapshotConstructor->GetTables();
- Y_VERIFY(tables.size());
- for (auto&& i : tables) {
- sb << "SELECT * FROM `" + EscapeC(i) + "`;";
+ auto& managers = SnapshotConstructor->GetManagers();
+ Y_VERIFY(managers.size());
+ for (auto&& i : managers) {
+ sb << "SELECT * FROM `" + EscapeC(i->GetTablePath()) + "`;";
}
-
- request.mutable_query()->set_yql_text(sb);
- request.set_session_id(sessionId);
- request.mutable_tx_control()->mutable_begin_tx()->mutable_snapshot_read_only();
-
- Register(new TYDBRequest<TDialogSelect>(std::move(request), SelfId(), Config.GetRequestConfig()));
-}
-
-void TDSAccessorRefresher::Handle(TEvRefresh::TPtr& /*ev*/) {
- Register(new TYDBRequest<TDialogCreateSession>(TDialogCreateSession::TRequest(), SelfId(), Config.GetRequestConfig()));
-}
-
-void TDSAccessorRefresher::OnInitialized() {
- Sender<TEvRefresh>().SendTo(SelfId());
+ Register(new TYQLQuerySessionedActor(sb, Config.GetRequestConfig(), InternalController));
}
TDSAccessorRefresher::TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor)
- : TBase(config.GetRequestConfig())
- , SnapshotConstructor(snapshotConstructor)
+ : SnapshotConstructor(snapshotConstructor)
, Config(config)
{
}
-ISnapshotAcceptorController::TPtr TDSAccessorRefresher::GetController() const {
- if (!ControllerImpl) {
- ControllerImpl = std::make_shared<TSnapshotAcceptorController>(SelfId());
- }
- return ControllerImpl;
+void TDSAccessorRefresher::Bootstrap() {
+ RegisterState();
+ InternalController = std::make_shared<TRefreshInternalController>(SelfId());
+ Sender<TEvRefresh>().SendTo(SelfId());
}
}
diff --git a/ydb/services/metadata/ds_table/accessor_refresh.h b/ydb/services/metadata/ds_table/accessor_refresh.h
index 920afdbcfb1..11ac1984bc0 100644
--- a/ydb/services/metadata/ds_table/accessor_refresh.h
+++ b/ydb/services/metadata/ds_table/accessor_refresh.h
@@ -13,6 +13,17 @@ class TEvRefresh: public NActors::TEventLocal<TEvRefresh, EEvSubscribe::EvRefres
public:
};
+class TEvYQLResponse: public NActors::TEventLocal<TEvYQLResponse, EEvSubscribe::EvYQLResponse> {
+private:
+ YDB_READONLY_DEF(NInternal::NRequest::TDialogYQLRequest::TResponse, Response);
+public:
+ TEvYQLResponse(const NInternal::NRequest::TDialogYQLRequest::TResponse& r)
+ : Response(r)
+ {
+
+ }
+};
+
class TEvEnrichSnapshotResult: public NActors::TEventLocal<TEvEnrichSnapshotResult, EEvSubscribe::EvEnrichSnapshotResult> {
private:
YDB_READONLY_DEF(ISnapshot::TPtr, EnrichedSnapshot);
@@ -33,11 +44,11 @@ public:
}
};
-class TSnapshotAcceptorController: public ISnapshotAcceptorController {
+class TRefreshInternalController: public ISnapshotAcceptorController, public NInternal::NRequest::IQueryOutput {
private:
const TActorIdentity ActorId;
public:
- TSnapshotAcceptorController(const TActorIdentity& actorId)
+ TRefreshInternalController(const TActorIdentity& actorId)
: ActorId(actorId) {
}
@@ -49,39 +60,41 @@ public:
virtual void Enriched(ISnapshot::TPtr enrichedSnapshot) override {
ActorId.Send(ActorId, new TEvEnrichSnapshotResult(enrichedSnapshot));
}
+
+ virtual void OnReply(const NInternal::NRequest::TDialogYQLRequest::TResponse& response) override {
+ ActorId.Send(ActorId, new TEvYQLResponse(response));
+ }
};
-class TDSAccessorRefresher: public NMetadataInitializer::TDSAccessorInitialized {
+class TDSAccessorRefresher: public NActors::TActorBootstrapped<TDSAccessorRefresher> {
private:
- using TBase = NMetadataInitializer::TDSAccessorInitialized;
+ using TBase = NActors::TActorBootstrapped<TDSAccessorRefresher>;
ISnapshotParser::TPtr SnapshotConstructor;
+ std::shared_ptr<TRefreshInternalController> InternalController;
YDB_READONLY_DEF(ISnapshot::TPtr, CurrentSnapshot);
YDB_READONLY_DEF(Ydb::Table::ExecuteQueryResult, CurrentSelection);
+ Ydb::Table::ExecuteQueryResult ProposedProto;
TInstant RequestedActuality = TInstant::Zero();
const TConfig Config;
-
- mutable ISnapshotAcceptorController::TPtr ControllerImpl;
-
- ISnapshotAcceptorController::TPtr GetController() const;
protected:
- virtual void Prepare(NMetadataInitializer::IController::TPtr controller) override {
- SnapshotConstructor->Prepare(controller);
+ virtual void RegisterState() {
+ Become(&TDSAccessorRefresher::StateMain);
}
bool IsReady() const {
return !!CurrentSnapshot;
}
- virtual void OnInitialized() override;
virtual void OnSnapshotModified() = 0;
public:
- STFUNC(StateMain) {
+ void Bootstrap();
+
+ STATEFN(StateMain) {
switch (ev->GetTypeRewrite()) {
- hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>, Handle);
- hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>, Handle);
+ hFunc(TEvYQLResponse, Handle);
hFunc(TEvRefresh, Handle);
hFunc(TEvEnrichSnapshotResult, Handle);
hFunc(TEvEnrichSnapshotProblem, Handle);
default:
- TBase::StateMain(ev, ctx);
+ break;
}
}
@@ -89,8 +102,7 @@ public:
void Handle(TEvEnrichSnapshotResult::TPtr& ev);
void Handle(TEvEnrichSnapshotProblem::TPtr& ev);
- void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>::TPtr& ev);
- void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>::TPtr& ev);
+ void Handle(TEvYQLResponse::TPtr& ev);
void Handle(TEvRefresh::TPtr& ev);
};
diff --git a/ydb/services/metadata/ds_table/config.h b/ydb/services/metadata/ds_table/config.h
index 228b3e637b6..f4344506a40 100644
--- a/ydb/services/metadata/ds_table/config.h
+++ b/ydb/services/metadata/ds_table/config.h
@@ -10,7 +10,7 @@ class TConfig {
private:
YDB_READONLY_DEF(NInternal::NRequest::TConfig, RequestConfig);
YDB_READONLY(TDuration, RefreshPeriod, TDuration::Seconds(10));
- YDB_READONLY_FLAG(Enabled, true);
+ YDB_READONLY_FLAG(Enabled, false);
public:
TConfig() = default;
diff --git a/ydb/services/metadata/ds_table/service.cpp b/ydb/services/metadata/ds_table/service.cpp
index d75d258450b..c698cb407dd 100644
--- a/ydb/services/metadata/ds_table/service.cpp
+++ b/ydb/services/metadata/ds_table/service.cpp
@@ -14,20 +14,118 @@ IActor* CreateService(const TConfig& config) {
return new TService(config);
}
+void TService::PrepareManagers(std::vector<NMetadata::IOperationsManager::TPtr> managers, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender) {
+ TManagersId id(managers);
+ const bool isInitialization = (managers.size() == 1) && managers.front()->GetTypeId() == NMetadataInitializer::TManager::GetTypeIdStatic();
+ if (ActiveFlag || (PreparationFlag && isInitialization)) {
+ for (auto&& manager : managers) {
+ Y_VERIFY(!RegisteredManagers.contains(manager->GetTypeId()));
+ if (!ManagersInRegistration.contains(manager->GetTypeId()) && !RegisteredManagers.contains(manager->GetTypeId())) {
+ ManagersInRegistration.emplace(manager->GetTypeId(), manager);
+ Register(new NMetadataInitializer::TDSAccessorInitialized(Config.GetRequestConfig(),
+ manager->GetTypeId(), manager->GetInitializationBehaviour(), InternalController, InitializationSnapshot));
+ }
+ }
+ } else if (!PreparationFlag) {
+ PreparationFlag = true;
+ InitializationFetcher = std::make_shared<NMetadataInitializer::TFetcher>();
+ Send(SelfId(), new TEvSubscribeExternal(InitializationFetcher));
+ }
+ EventsWaiting[id].emplace_back(ev, sender);
+}
+
+void TService::Handle(NMetadataInitializer::TEvInitializationFinished::TPtr& ev) {
+ const TString& initId = ev->Get()->GetInitializationId();
+
+ auto it = ManagersInRegistration.find(initId);
+ Y_VERIFY(it != ManagersInRegistration.end());
+ RegisteredManagers.emplace(initId, it->second);
+ ManagersInRegistration.erase(it);
+
+ std::map<TManagersId, std::deque<TWaitEvent>> movedEvents;
+ for (auto it = EventsWaiting.begin(); it != EventsWaiting.end(); ) {
+ auto m = it->first;
+ if (!m.RemoveId(initId)) {
+ ++it;
+ continue;
+ }
+ if (m.IsEmpty()) {
+ for (auto&& i : it->second) {
+ i.Resend(SelfId());
+ }
+ } else {
+ auto itNext = EventsWaiting.find(m);
+ if (itNext == EventsWaiting.end()) {
+ movedEvents.emplace(m, std::move(it->second));
+ } else {
+ for (auto&& i : it->second) {
+ itNext->second.emplace_back(std::move(i));
+ }
+ }
+ }
+ it = EventsWaiting.erase(it);
+ }
+ for (auto&& i : movedEvents) {
+ EventsWaiting.emplace(i.first, std::move(i.second));
+ }
+}
+
void TService::Handle(TEvSubscribeExternal::TPtr& ev) {
- auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetSnapshotId());
- if (it == Accessors.end()) {
- THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetSnapshotParser());
- it = Accessors.emplace(ev->Get()->GetSnapshotParser()->GetSnapshotId(), Register(actor.Release())).first;
+ std::vector<NMetadata::IOperationsManager::TPtr> needManagers;
+ for (auto&& i : ev->Get()->GetFetcher()->GetManagers()) {
+ if (!RegisteredManagers.contains(i->GetTypeId())) {
+ needManagers.emplace_back(i);
+ }
+ }
+ if (needManagers.empty()) {
+ auto it = Accessors.find(ev->Get()->GetFetcher()->GetComponentId());
+ if (it == Accessors.end()) {
+ THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetFetcher());
+ it = Accessors.emplace(ev->Get()->GetFetcher()->GetComponentId(), Register(actor.Release())).first;
+ }
+ Send<TEvSubscribe>(it->second, ev->Sender);
+ } else {
+ PrepareManagers(needManagers, ev->ReleaseBase(), ev->Sender);
+ }
+}
+
+void TService::Handle(TEvAlterObjects::TPtr& ev) {
+ auto it = RegisteredManagers.find(ev->Get()->GetCommand()->GetManager()->GetTypeId());
+ if (it != RegisteredManagers.end()) {
+ ev->Get()->GetCommand()->Execute();
+ } else {
+ auto m = ev->Get()->GetCommand()->GetManager();
+ PrepareManagers({ m }, ev->ReleaseBase(), ev->Sender);
}
- Send<TEvSubscribe>(it->second, ev->Sender);
}
void TService::Handle(TEvUnsubscribeExternal::TPtr& ev) {
- auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetSnapshotId());
+ auto it = Accessors.find(ev->Get()->GetFetcher()->GetComponentId());
if (it != Accessors.end()) {
Send<TEvUnsubscribe>(it->second, ev->Sender);
}
}
+void TService::Handle(TEvRefreshSubscriberData::TPtr& ev) {
+ auto s = ev->Get()->GetSnapshot();
+ InitializationSnapshot = dynamic_pointer_cast<NMetadataInitializer::TSnapshot>(s);
+ Y_VERIFY(InitializationSnapshot);
+ if (!ActiveFlag) {
+ ActiveFlag = true;
+ for (auto&& i : EventsWaiting) {
+ i.second.front().Resend(SelfId());
+ i.second.pop_front();
+ }
+ }
+}
+
+void TService::Bootstrap(const NActors::TActorContext& /*ctx*/) {
+ Become(&TService::StateMain);
+ InternalController = std::make_shared<TServiceInternalController>(SelfId());
+}
+
+void TServiceInternalController::InitializationFinished(const TString& id) const {
+ ActorId.Send(ActorId, new NMetadataInitializer::TEvInitializationFinished(id));
+}
+
}
diff --git a/ydb/services/metadata/ds_table/service.h b/ydb/services/metadata/ds_table/service.h
index e55d90ac914..7b52685c945 100644
--- a/ydb/services/metadata/ds_table/service.h
+++ b/ydb/services/metadata/ds_table/service.h
@@ -2,27 +2,125 @@
#include "config.h"
#include <ydb/services/metadata/service.h>
+#include <ydb/services/metadata/initializer/common.h>
+#include <ydb/services/metadata/initializer/events.h>
+#include <ydb/services/metadata/initializer/manager.h>
+#include <ydb/services/metadata/initializer/snapshot.h>
+#include <ydb/services/metadata/initializer/fetcher.h>
+
#include <library/cpp/actors/core/hfunc.h>
namespace NKikimr::NMetadataProvider {
+class TServiceInternalController: public NMetadataInitializer::IInitializerOutput {
+private:
+ const NActors::TActorIdentity ActorId;
+public:
+ TServiceInternalController(const NActors::TActorIdentity& actorId)
+ : ActorId(actorId)
+ {
+
+ }
+
+ virtual void InitializationFinished(const TString& id) const override;
+};
+
+class TManagersId {
+private:
+ YDB_READONLY_DEF(std::set<TString>, ManagerIds);
+public:
+ TManagersId(const std::vector<NMetadata::IOperationsManager::TPtr>& managers) {
+ for (auto&& i : managers) {
+ ManagerIds.emplace(i->GetTypeId());
+ }
+ }
+
+ bool IsEmpty() const {
+ return ManagerIds.empty();
+ }
+
+ bool RemoveId(const TString& id) {
+ auto it = ManagerIds.find(id);
+ if (it == ManagerIds.end()) {
+ return false;
+ }
+ ManagerIds.erase(it);
+ return true;
+ }
+
+ bool operator<(const TManagersId& item) const {
+ if (ManagerIds.size() < item.ManagerIds.size()) {
+ return true;
+ } else if (ManagerIds.size() > item.ManagerIds.size()) {
+ return false;
+ } else {
+ auto itSelf = ManagerIds.begin();
+ auto itItem = item.ManagerIds.begin();
+ while (itSelf != ManagerIds.end()) {
+ if (*itSelf < *itItem) {
+ return true;
+ }
+ ++itSelf;
+ ++itItem;
+ }
+ return false;
+ }
+ }
+};
+
+class TWaitEvent {
+private:
+ TAutoPtr<IEventBase> Event;
+ const TActorId Sender;
+public:
+ TWaitEvent(TAutoPtr<IEventBase> ev, const TActorId& sender)
+ : Event(ev)
+ , Sender(sender)
+ {
+
+ }
+
+ void Resend(const TActorIdentity& receiver) {
+ TActivationContext::Send(new IEventHandle(receiver, Sender, Event.Release()));
+ }
+};
+
+class TWaitingWatcher {
+private:
+};
+
class TService: public NActors::TActorBootstrapped<TService> {
private:
using TBase = NActors::TActor<TService>;
+ bool ActiveFlag = false;
+ bool PreparationFlag = false;
std::map<TString, NActors::TActorId> Accessors;
+ std::map<TManagersId, std::deque<TWaitEvent>> EventsWaiting;
+ std::map<TString, NMetadata::IOperationsManager::TPtr> ManagersInRegistration;
+ std::map<TString, NMetadata::IOperationsManager::TPtr> RegisteredManagers;
+
+ std::shared_ptr<NMetadataInitializer::TFetcher> InitializationFetcher;
+ std::shared_ptr<NMetadataInitializer::TSnapshot> InitializationSnapshot;
+ std::shared_ptr<TServiceInternalController> InternalController;
const TConfig Config;
-public:
+
+ void Handle(NMetadataInitializer::TEvInitializationFinished::TPtr& ev);
+ void Handle(TEvRefreshSubscriberData::TPtr& ev);
void Handle(TEvSubscribeExternal::TPtr& ev);
void Handle(TEvUnsubscribeExternal::TPtr& ev);
+ void Handle(TEvAlterObjects::TPtr& ev);
+ void PrepareManagers(std::vector<NMetadata::IOperationsManager::TPtr> manager, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender);
+public:
- void Bootstrap(const NActors::TActorContext& /*ctx*/) {
- Become(&TThis::StateMain);
- }
+ void Bootstrap(const NActors::TActorContext& /*ctx*/);
STATEFN(StateMain) {
switch (ev->GetTypeRewrite()) {
+ hFunc(TEvAlterObjects, Handle);
+ hFunc(TEvRefreshSubscriberData, Handle);
hFunc(TEvSubscribeExternal, Handle);
hFunc(TEvUnsubscribeExternal, Handle);
+ hFunc(NMetadataInitializer::TEvInitializationFinished, Handle);
default:
Y_VERIFY(false);
}
@@ -31,6 +129,7 @@ public:
TService(const TConfig& config)
: Config(config)
{
+ TServiceOperator::Register();
}
};
diff --git a/ydb/services/metadata/initializer/CMakeLists.txt b/ydb/services/metadata/initializer/CMakeLists.txt
index d3b624ff7c1..7abe0ee5bfd 100644
--- a/ydb/services/metadata/initializer/CMakeLists.txt
+++ b/ydb/services/metadata/initializer/CMakeLists.txt
@@ -23,4 +23,9 @@ target_sources(services-metadata-initializer PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/common.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/controller.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/manager.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/object.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/snapshot.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/initializer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/fetcher.cpp
)
diff --git a/ydb/services/metadata/initializer/accessor_init.cpp b/ydb/services/metadata/initializer/accessor_init.cpp
index 9f03393e60b..ad1dcff2260 100644
--- a/ydb/services/metadata/initializer/accessor_init.cpp
+++ b/ydb/services/metadata/initializer/accessor_init.cpp
@@ -1,23 +1,32 @@
#include "accessor_init.h"
#include "controller.h"
+#include "manager.h"
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/services/metadata/manager/alter.h>
+#include <ydb/services/metadata/service.h>
+
namespace NKikimr::NMetadataInitializer {
void TDSAccessorInitialized::Bootstrap() {
- RegisterState();
- Controller = std::make_shared<TInitializerController>(SelfId());
+ Become(&TDSAccessorInitialized::StateMain);
+ InternalController = std::make_shared<TInitializerInput>(SelfId());
Send(SelfId(), new TEvInitializerPreparationStart);
}
void TDSAccessorInitialized::Handle(TEvInitializerPreparationStart::TPtr& /*ev*/) {
- Prepare(Controller);
+ InitializationBehaviour->Prepare(InternalController);
}
void TDSAccessorInitialized::Handle(TEvInitializerPreparationFinished::TPtr& ev) {
auto modifiers = ev->Get()->GetModifiers();
for (auto&& i : modifiers) {
+ TDBInitializationKey key(ComponentId, i->GetModificationId());
+ if (InitializationSnapshot && InitializationSnapshot->GetObjects().contains(key)) {
+ continue;
+ }
Modifiers.emplace_back(i);
}
if (Modifiers.size()) {
@@ -25,7 +34,7 @@ void TDSAccessorInitialized::Handle(TEvInitializerPreparationFinished::TPtr& ev)
Modifiers.front()->Execute(SelfId(), Config);
} else {
ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "initialization finished";
- OnInitialized();
+ ExternalController->InitializationFinished(ComponentId);
}
}
@@ -34,22 +43,48 @@ void TDSAccessorInitialized::Handle(TEvInitializerPreparationProblem::TPtr& ev)
Schedule(TDuration::Seconds(1), new TEvInitializerPreparationStart);
}
-void TDSAccessorInitialized::Handle(NInternal::NRequest::TEvRequestFinished::TPtr& /*ev*/) {
- ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "modifiers count: " << Modifiers.size();
- if (Modifiers.empty()) {
- return;
- }
+void TDSAccessorInitialized::DoNextModifier() {
Modifiers.pop_front();
if (Modifiers.size()) {
Modifiers.front()->Execute(SelfId(), Config);
} else {
ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "initialization finished";
- OnInitialized();
+ ExternalController->InitializationFinished(ComponentId);
+ }
+}
+
+void TDSAccessorInitialized::Handle(NInternal::NRequest::TEvRequestFinished::TPtr& /*ev*/) {
+ ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "modifiers count: " << Modifiers.size();
+ Y_VERIFY(Modifiers.size());
+ if (NMetadataProvider::TServiceOperator::IsEnabled() && InitializationSnapshot) {
+ TDBInitialization dbInit(ComponentId, Modifiers.front()->GetModificationId());
+ auto manager = std::make_shared<NMetadataInitializer::TManager>();
+ auto alterCommand = std::make_shared<NMetadataManager::TAlterCommand<TDBInitialization>>(
+ dbInit.SerializeToRecord(), manager, InternalController, NMetadata::IOperationsManager::TModificationContext());
+ Sender<NMetadataProvider::TEvAlterObjects>(alterCommand)
+ .SendTo(NMetadataProvider::MakeServiceId(SelfId().NodeId()));
+ } else {
+ DoNextModifier();
}
}
-TDSAccessorInitialized::TDSAccessorInitialized(const NInternal::NRequest::TConfig& config)
+void TDSAccessorInitialized::Handle(NMetadataManager::TEvModificationFinished::TPtr& /*ev*/) {
+ DoNextModifier();
+}
+
+void TDSAccessorInitialized::Handle(NMetadataManager::TEvModificationProblem::TPtr& /*ev*/) {
+ Schedule(TDuration::Seconds(1), new NInternal::NRequest::TEvRequestFinished);
+}
+
+TDSAccessorInitialized::TDSAccessorInitialized(const NInternal::NRequest::TConfig& config,
+ const TString& componentId,
+ NMetadata::IInitializationBehaviour::TPtr initializationBehaviour,
+ IInitializerOutput::TPtr controller, std::shared_ptr<NMetadataInitializer::TSnapshot> initializationSnapshot)
: Config(config)
+ , InitializationBehaviour(initializationBehaviour)
+ , ExternalController(controller)
+ , InitializationSnapshot(initializationSnapshot)
+ , ComponentId(componentId)
{
}
diff --git a/ydb/services/metadata/initializer/accessor_init.h b/ydb/services/metadata/initializer/accessor_init.h
index c558e05df32..f6fd51ab847 100644
--- a/ydb/services/metadata/initializer/accessor_init.h
+++ b/ydb/services/metadata/initializer/accessor_init.h
@@ -1,7 +1,10 @@
#pragma once
#include "common.h"
+#include "controller.h"
#include "events.h"
+#include "snapshot.h"
+#include <ydb/services/metadata/abstract/common.h>
#include <ydb/services/metadata/ds_table/config.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -15,23 +18,28 @@ class TDSAccessorInitialized: public NActors::TActorBootstrapped<TDSAccessorInit
private:
TDeque<ITableModifier::TPtr> Modifiers;
const NInternal::NRequest::TConfig Config;
- IController::TPtr Controller;
-
+ NMetadata::IInitializationBehaviour::TPtr InitializationBehaviour;
+ IInitializerOutput::TPtr ExternalController;
+ TInitializerInput::TPtr InternalController;
+ std::shared_ptr<NMetadataInitializer::TSnapshot> InitializationSnapshot;
+ const TString ComponentId;
void Handle(TEvInitializerPreparationStart::TPtr& ev);
void Handle(TEvInitializerPreparationFinished::TPtr& ev);
void Handle(TEvInitializerPreparationProblem::TPtr& ev);
void Handle(NInternal::NRequest::TEvRequestFinished::TPtr& ev);
-protected:
- virtual void RegisterState() = 0;
- virtual void OnInitialized() = 0;
- virtual void Prepare(IController::TPtr controller) = 0;
+ void Handle(NMetadataManager::TEvModificationFinished::TPtr& ev);
+ void Handle(NMetadataManager::TEvModificationProblem::TPtr& ev);
+ void DoNextModifier();
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::METADATA_INITIALIZER;
}
void Bootstrap();
- TDSAccessorInitialized(const NInternal::NRequest::TConfig& config);
+ TDSAccessorInitialized(const NInternal::NRequest::TConfig& config,
+ const TString& componentId,
+ NMetadata::IInitializationBehaviour::TPtr initializationBehaviour,
+ IInitializerOutput::TPtr controller, std::shared_ptr<NMetadataInitializer::TSnapshot> initializationSnapshot);
STATEFN(StateMain) {
switch (ev->GetTypeRewrite()) {
@@ -39,6 +47,8 @@ public:
hFunc(TEvInitializerPreparationStart, Handle);
hFunc(TEvInitializerPreparationFinished, Handle);
hFunc(TEvInitializerPreparationProblem, Handle);
+ hFunc(NMetadataManager::TEvModificationFinished, Handle);
+ hFunc(NMetadataManager::TEvModificationProblem, Handle);
default:
break;
}
diff --git a/ydb/services/metadata/initializer/common.h b/ydb/services/metadata/initializer/common.h
index 40e8e79ee80..ae693fc5cb0 100644
--- a/ydb/services/metadata/initializer/common.h
+++ b/ydb/services/metadata/initializer/common.h
@@ -5,11 +5,20 @@
namespace NKikimr::NMetadataInitializer {
class ITableModifier {
+private:
+ YDB_READONLY_DEF(TString, ModificationId);
protected:
virtual bool DoExecute(const TActorId& resultCallbackId, const NInternal::NRequest::TConfig& config) const = 0;
public:
using TPtr = std::shared_ptr<ITableModifier>;
virtual ~ITableModifier() = default;
+
+ ITableModifier(const TString& modificationId)
+ : ModificationId(modificationId)
+ {
+
+ }
+
bool Execute(const TActorId& resultCallbackId, const NInternal::NRequest::TConfig& config) const {
return DoExecute(resultCallbackId, config);
}
@@ -18,6 +27,7 @@ public:
template <class TDialogPolicy>
class TGenericTableModifier: public ITableModifier {
private:
+ using TBase = ITableModifier;
YDB_READONLY_DEF(typename TDialogPolicy::TRequest, Request);
protected:
virtual bool DoExecute(const TActorId& resultCallbackId, const NInternal::NRequest::TConfig& config) const override {
@@ -25,18 +35,27 @@ protected:
return true;
}
public:
- TGenericTableModifier(typename TDialogPolicy::TRequest& request)
- : Request(request) {
+ TGenericTableModifier(typename TDialogPolicy::TRequest& request, const TString& modificationId)
+ : TBase(modificationId)
+ , Request(request)
+ {
}
};
-class IController {
+class IInitializerInput {
public:
- using TPtr = std::shared_ptr<IController>;
+ using TPtr = std::shared_ptr<IInitializerInput>;
virtual void PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const = 0;
virtual void PreparationProblem(const TString& errorMessage) const = 0;
- virtual ~IController() = default;
+ virtual ~IInitializerInput() = default;
+};
+
+class IInitializerOutput {
+public:
+ using TPtr = std::shared_ptr<IInitializerOutput>;
+ virtual void InitializationFinished(const TString& id) const = 0;
+ virtual ~IInitializerOutput() = default;
};
}
diff --git a/ydb/services/metadata/initializer/controller.cpp b/ydb/services/metadata/initializer/controller.cpp
index 1481e547336..73aa8a0e748 100644
--- a/ydb/services/metadata/initializer/controller.cpp
+++ b/ydb/services/metadata/initializer/controller.cpp
@@ -1,14 +1,28 @@
#include "controller.h"
#include "events.h"
+#include <ydb/services/metadata/manager/modification_controller.h>
+
namespace NKikimr::NMetadataInitializer {
-void TInitializerController::PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const {
+void TInitializerInput::PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const {
ActorId.Send(ActorId, new TEvInitializerPreparationFinished(modifiers));
}
-void TInitializerController::PreparationProblem(const TString& errorMessage) const {
+void TInitializerInput::PreparationProblem(const TString& errorMessage) const {
ActorId.Send(ActorId, new TEvInitializerPreparationProblem(errorMessage));
}
+void TInitializerInput::AlterProblem(const TString& errorMessage) {
+ ActorId.Send(ActorId, new NMetadataManager::TEvModificationProblem(errorMessage));
+}
+
+void TInitializerInput::AlterFinished() {
+ ActorId.Send(ActorId, new NMetadataManager::TEvModificationFinished());
+}
+
+void TInitializerOutput::InitializationFinished(const TString& id) const {
+ ActorId.Send(ActorId, new TEvInitializationFinished(id));
+}
+
}
diff --git a/ydb/services/metadata/initializer/controller.h b/ydb/services/metadata/initializer/controller.h
index 2d2c2617e8f..63b2dd65ac9 100644
--- a/ydb/services/metadata/initializer/controller.h
+++ b/ydb/services/metadata/initializer/controller.h
@@ -1,19 +1,36 @@
#pragma once
#include "common.h"
+#include <ydb/services/metadata/manager/common.h>
+
namespace NKikimr::NMetadataInitializer {
-class TInitializerController: public IController {
+class TInitializerInput: public IInitializerInput, public NMetadataManager::IAlterController {
private:
const TActorIdentity ActorId;
public:
- TInitializerController(const TActorIdentity& actorId)
+ using TPtr = std::shared_ptr<TInitializerInput>;
+ TInitializerInput(const TActorIdentity& actorId)
: ActorId(actorId) {
}
+ virtual void AlterProblem(const TString& errorMessage) override;
+ virtual void AlterFinished() override;
virtual void PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const override;
virtual void PreparationProblem(const TString& errorMessage) const override;
};
+class TInitializerOutput: public IInitializerOutput {
+private:
+ const TActorIdentity ActorId;
+public:
+ TInitializerOutput(const TActorIdentity& actorId)
+ : ActorId(actorId) {
+
+ }
+
+ virtual void InitializationFinished(const TString& id) const override;
+};
+
}
diff --git a/ydb/services/metadata/initializer/events.h b/ydb/services/metadata/initializer/events.h
index 5fe9b136835..e20967d27c5 100644
--- a/ydb/services/metadata/initializer/events.h
+++ b/ydb/services/metadata/initializer/events.h
@@ -10,20 +10,21 @@
namespace NKikimr::NMetadataInitializer {
-enum EInitializerEvents {
+enum EEvents {
EvInitializerPreparationStart = EventSpaceBegin(TKikimrEvents::ES_METADATA_INITIALIZER),
EvInitializerPreparationFinished,
EvInitializerPreparationProblem,
+ EvInitializationFinished,
EvEnd
};
-static_assert(EInitializerEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_INITIALIZER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_INITIALIZER)");
+static_assert(EEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_INITIALIZER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_INITIALIZER)");
-class TEvInitializerPreparationStart: public TEventLocal<TEvInitializerPreparationStart, EInitializerEvents::EvInitializerPreparationStart> {
+class TEvInitializerPreparationStart: public TEventLocal<TEvInitializerPreparationStart, EEvents::EvInitializerPreparationStart> {
public:
};
-class TEvInitializerPreparationFinished: public TEventLocal<TEvInitializerPreparationFinished, EInitializerEvents::EvInitializerPreparationFinished> {
+class TEvInitializerPreparationFinished: public TEventLocal<TEvInitializerPreparationFinished, EEvents::EvInitializerPreparationFinished> {
private:
YDB_READONLY_DEF(TVector<ITableModifier::TPtr>, Modifiers);
public:
@@ -33,7 +34,7 @@ public:
}
};
-class TEvInitializerPreparationProblem: public TEventLocal<TEvInitializerPreparationProblem, EInitializerEvents::EvInitializerPreparationProblem> {
+class TEvInitializerPreparationProblem: public TEventLocal<TEvInitializerPreparationProblem, EEvents::EvInitializerPreparationProblem> {
private:
YDB_READONLY_DEF(TString, ErrorMessage);
public:
@@ -43,4 +44,14 @@ public:
}
};
+class TEvInitializationFinished: public TEventLocal<TEvInitializationFinished, EEvents::EvInitializationFinished> {
+private:
+ YDB_READONLY_DEF(TString, InitializationId);
+public:
+ TEvInitializationFinished(const TString& initializationId)
+ : InitializationId(initializationId) {
+
+ }
+};
+
}
diff --git a/ydb/services/metadata/initializer/fetcher.cpp b/ydb/services/metadata/initializer/fetcher.cpp
new file mode 100644
index 00000000000..ac3e6fc55d2
--- /dev/null
+++ b/ydb/services/metadata/initializer/fetcher.cpp
@@ -0,0 +1,10 @@
+#include "fetcher.h"
+#include "manager.h"
+
+namespace NKikimr::NMetadataInitializer {
+
+std::vector<NMetadata::IOperationsManager::TPtr> TFetcher::DoGetManagers() const {
+ return { std::make_shared<TManager>() };
+}
+
+}
diff --git a/ydb/services/metadata/initializer/fetcher.h b/ydb/services/metadata/initializer/fetcher.h
new file mode 100644
index 00000000000..ac59e635e6c
--- /dev/null
+++ b/ydb/services/metadata/initializer/fetcher.h
@@ -0,0 +1,16 @@
+#pragma once
+
+#include "snapshot.h"
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/services/metadata/abstract/manager.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadataInitializer {
+
+class TFetcher: public NMetadataProvider::TSnapshotsManager<TSnapshot> {
+protected:
+ virtual std::vector<NMetadata::IOperationsManager::TPtr> DoGetManagers() const override;
+public:
+};
+
+}
diff --git a/ydb/services/metadata/initializer/initializer.cpp b/ydb/services/metadata/initializer/initializer.cpp
new file mode 100644
index 00000000000..b0503002768
--- /dev/null
+++ b/ydb/services/metadata/initializer/initializer.cpp
@@ -0,0 +1,33 @@
+#include "initializer.h"
+
+namespace NKikimr::NMetadataInitializer {
+
+void TInitializer::DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const {
+ TVector<NMetadataInitializer::ITableModifier::TPtr> result;
+ {
+ Ydb::Table::CreateTableRequest request;
+ request.set_session_id("");
+ request.set_path(TDBInitialization::GetStorageTablePath());
+ request.add_primary_key(TDBInitialization::TDecoder::ComponentId);
+ request.add_primary_key(TDBInitialization::TDecoder::ModificationId);
+ {
+ auto& column = *request.add_columns();
+ column.set_name(TDBInitialization::TDecoder::ComponentId);
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name(TDBInitialization::TDecoder::ModificationId);
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name(TDBInitialization::TDecoder::Instant);
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32);
+ }
+ result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request, "create"));
+ }
+ controller->PreparationFinished(result);
+}
+
+}
diff --git a/ydb/services/metadata/initializer/initializer.h b/ydb/services/metadata/initializer/initializer.h
new file mode 100644
index 00000000000..e6a882eb3f7
--- /dev/null
+++ b/ydb/services/metadata/initializer/initializer.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include "snapshot.h"
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadataInitializer {
+
+class TInitializer: public NMetadata::IInitializationBehaviour {
+protected:
+ virtual void DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const override;
+public:
+};
+
+}
diff --git a/ydb/services/metadata/initializer/manager.cpp b/ydb/services/metadata/initializer/manager.cpp
new file mode 100644
index 00000000000..3a61d8b089f
--- /dev/null
+++ b/ydb/services/metadata/initializer/manager.cpp
@@ -0,0 +1,12 @@
+#include "manager.h"
+#include "initializer.h"
+
+namespace NKikimr::NMetadataInitializer {
+
+TManager::TFactory::TRegistrator<TManager> TManager::Registrator(TManager::GetTypeIdStatic());
+
+NMetadata::IInitializationBehaviour::TPtr TManager::DoGetInitializationBehaviour() const {
+ return std::make_shared<TInitializer>();
+}
+
+}
diff --git a/ydb/services/metadata/initializer/manager.h b/ydb/services/metadata/initializer/manager.h
new file mode 100644
index 00000000000..74f8d743c06
--- /dev/null
+++ b/ydb/services/metadata/initializer/manager.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include "object.h"
+
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/services/metadata/manager/generic_manager.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadataInitializer {
+
+class TManager: public NMetadata::TGenericOperationsManager<TDBInitialization> {
+private:
+ static TFactory::TRegistrator<TManager> Registrator;
+protected:
+ virtual NMetadata::IInitializationBehaviour::TPtr DoGetInitializationBehaviour() const override;
+public:
+};
+
+}
diff --git a/ydb/services/metadata/initializer/object.cpp b/ydb/services/metadata/initializer/object.cpp
new file mode 100644
index 00000000000..7145bd79d00
--- /dev/null
+++ b/ydb/services/metadata/initializer/object.cpp
@@ -0,0 +1,57 @@
+#include "object.h"
+#include <ydb/core/base/appdata.h>
+#include <ydb/services/metadata/manager/ydb_value_operator.h>
+
+namespace NKikimr::NMetadataInitializer {
+
+TString TDBInitialization::GetStorageTablePath() {
+ return "/" + AppData()->TenantName + "/.metadata/initializations";
+}
+
+bool TDBInitialization::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue) {
+ if (!decoder.Read(decoder.GetComponentIdIdx(), ComponentId, rawValue)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetModificationIdIdx(), ModificationId, rawValue)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetInstantIdx(), Instant, rawValue)) {
+ return false;
+ }
+ return true;
+}
+
+NKikimr::NMetadataManager::TTableRecord TDBInitialization::SerializeToRecord() const {
+ NMetadataManager::TTableRecord result;
+ result.SetColumn(TDecoder::ComponentId, NMetadataManager::TYDBValue::Bytes(ComponentId));
+ result.SetColumn(TDecoder::ModificationId, NMetadataManager::TYDBValue::Bytes(ModificationId));
+ result.SetColumn(TDecoder::Instant, NMetadataManager::TYDBValue::UInt32(Instant.Seconds()));
+ return result;
+}
+
+void TDBInitialization::AlteringPreparation(std::vector<TDBInitialization>&& objects,
+ NMetadataManager::IAlterPreparationController<TDBInitialization>::TPtr controller,
+ const NMetadata::IOperationsManager::TModificationContext& /*context*/) {
+ controller->PreparationFinished(std::move(objects));
+}
+
+std::vector<Ydb::Column> TDBInitialization::TDecoder::GetColumns() {
+ return {
+ NMetadataManager::TYDBColumn::Bytes(ComponentId),
+ NMetadataManager::TYDBColumn::Bytes(ModificationId),
+ NMetadataManager::TYDBColumn::UInt32(Instant)
+ };
+}
+
+std::vector<Ydb::Column> TDBInitialization::TDecoder::GetPKColumns() {
+ return {
+ NMetadataManager::TYDBColumn::Bytes(ComponentId),
+ NMetadataManager::TYDBColumn::Bytes(ModificationId)
+ };
+}
+
+std::vector<TString> TDBInitialization::TDecoder::GetPKColumnIds() {
+ return { ComponentId, ModificationId };
+}
+
+}
diff --git a/ydb/services/metadata/initializer/object.h b/ydb/services/metadata/initializer/object.h
new file mode 100644
index 00000000000..2f74c8a7aa0
--- /dev/null
+++ b/ydb/services/metadata/initializer/object.h
@@ -0,0 +1,76 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/base/appdata.h>
+
+#include <ydb/services/metadata/abstract/decoder.h>
+#include <ydb/services/metadata/abstract/manager.h>
+#include <ydb/services/metadata/manager/object.h>
+#include <ydb/services/metadata/manager/preparation_controller.h>
+
+namespace NKikimr::NMetadataInitializer {
+
+class TDBInitializationKey {
+private:
+ YDB_READONLY_PROTECT_DEF(TString, ComponentId);
+ YDB_READONLY_PROTECT_DEF(TString, ModificationId);
+public:
+ TDBInitializationKey() = default;
+ TDBInitializationKey(const TString& componentId, const TString& modificationId)
+ : ComponentId(componentId)
+ , ModificationId(modificationId)
+ {
+ }
+
+ bool operator<(const TDBInitializationKey& item) const {
+ return std::tie(ComponentId, ModificationId) < std::tie(item.ComponentId, item.ModificationId);
+ }
+};
+
+class TDBInitialization: public TDBInitializationKey, public NMetadataManager::TObject<TDBInitialization> {
+private:
+ using TBase = TDBInitializationKey;
+ YDB_READONLY(TInstant, Instant, AppData()->TimeProvider->Now());
+public:
+ static NMetadata::TOperationParsingResult BuildPatchFromSettings(const NYql::TObjectSettingsImpl& /*settings*/,
+ const NMetadata::IOperationsManager::TModificationContext& /*context*/) {
+ NMetadataManager::TTableRecord result;
+ return result;
+ }
+
+ class TDecoder: public NInternal::TDecoderBase {
+ private:
+ YDB_ACCESSOR(i32, ComponentIdIdx, -1);
+ YDB_ACCESSOR(i32, ModificationIdIdx, -1);
+ YDB_ACCESSOR(i32, InstantIdx, -1);
+ public:
+ static inline const TString ComponentId = "componentId";
+ static inline const TString ModificationId = "modificationId";
+ static inline const TString Instant = "instant";
+ static std::vector<TString> GetPKColumnIds();
+ static std::vector<Ydb::Column> GetPKColumns();
+ static std::vector<Ydb::Column> GetColumns();
+
+ TDecoder(const Ydb::ResultSet& rawData) {
+ ComponentIdIdx = GetFieldIndex(rawData, ComponentId);
+ ModificationIdIdx = GetFieldIndex(rawData, ModificationId);
+ InstantIdx = GetFieldIndex(rawData, Instant);
+ }
+ };
+
+ using TBase::TBase;
+
+ static void AlteringPreparation(std::vector<TDBInitialization>&& objects,
+ NMetadataManager::IAlterPreparationController<TDBInitialization>::TPtr controller,
+ const NMetadata::IOperationsManager::TModificationContext& context);
+ static TString GetStorageTablePath();
+ static TString GetStorageHistoryTablePath() {
+ return "";
+ }
+ bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue);
+ NKikimr::NMetadataManager::TTableRecord SerializeToRecord() const;
+ static TString GetTypeId() {
+ return "INITIALIZATION";
+ }
+};
+
+}
diff --git a/ydb/services/metadata/initializer/snapshot.cpp b/ydb/services/metadata/initializer/snapshot.cpp
new file mode 100644
index 00000000000..93212da6f50
--- /dev/null
+++ b/ydb/services/metadata/initializer/snapshot.cpp
@@ -0,0 +1,30 @@
+#include "snapshot.h"
+
+namespace NKikimr::NMetadataInitializer {
+
+bool TSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) {
+ Y_VERIFY(rawDataResult.result_sets().size() == 1);
+ {
+ auto& rawData = rawDataResult.result_sets()[0];
+ TDBInitialization::TDecoder decoder(rawData);
+ for (auto&& r : rawData.rows()) {
+ TDBInitialization initObject;
+ if (!initObject.DeserializeFromRecord(decoder, r)) {
+ ALS_ERROR(NKikimrServices::METADATA_INITIALIZER) << "cannot parse initialization info for snapshot";
+ continue;
+ }
+ Objects.emplace(initObject, initObject);
+ }
+ }
+ return true;
+}
+
+TString TSnapshot::DoSerializeToString() const {
+ TStringBuilder sb;
+ for (auto&& i : Objects) {
+ sb << i.first.GetComponentId() << ":" << i.first.GetModificationId() << ";";
+ }
+ return sb;
+}
+
+}
diff --git a/ydb/services/metadata/initializer/snapshot.h b/ydb/services/metadata/initializer/snapshot.h
new file mode 100644
index 00000000000..73de0083e74
--- /dev/null
+++ b/ydb/services/metadata/initializer/snapshot.h
@@ -0,0 +1,21 @@
+#pragma once
+#include "object.h"
+
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadataInitializer {
+
+class TSnapshot: public NMetadataProvider::ISnapshot {
+private:
+ using TBase = NMetadataProvider::ISnapshot;
+ using TObjects = std::map<TDBInitializationKey, TDBInitialization>;
+ YDB_READONLY_DEF(TObjects, Objects);
+protected:
+ virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override;
+ virtual TString DoSerializeToString() const override;
+public:
+ using TBase::TBase;
+};
+
+}
diff --git a/ydb/services/metadata/manager/CMakeLists.txt b/ydb/services/metadata/manager/CMakeLists.txt
new file mode 100644
index 00000000000..019c6f158c2
--- /dev/null
+++ b/ydb/services/metadata/manager/CMakeLists.txt
@@ -0,0 +1,35 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-metadata-manager)
+target_link_libraries(services-metadata-manager PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-library-accessor
+ cpp-actors-core
+ api-protos
+ ydb-core-protos
+ services-bg_tasks-abstract
+ services-metadata-initializer
+ ydb-core-base
+ services-metadata-request
+)
+target_sources(services-metadata-manager PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/alter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/table_record.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/restore.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/modification.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/generic_manager.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/preparation_controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/restore_controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/ydb_value_operator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/modification_controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/object.cpp
+)
diff --git a/ydb/services/metadata/manager/alter.cpp b/ydb/services/metadata/manager/alter.cpp
new file mode 100644
index 00000000000..79e9f2d485e
--- /dev/null
+++ b/ydb/services/metadata/manager/alter.cpp
@@ -0,0 +1,5 @@
+#include "alter.h"
+
+namespace NKikimr::NMetadataManager {
+
+}
diff --git a/ydb/services/metadata/manager/alter.h b/ydb/services/metadata/manager/alter.h
new file mode 100644
index 00000000000..086ad56ab42
--- /dev/null
+++ b/ydb/services/metadata/manager/alter.h
@@ -0,0 +1,384 @@
+#pragma once
+#include "modification_controller.h"
+#include "preparation_controller.h"
+#include "restore.h"
+#include "modification.h"
+
+#include <ydb/services/metadata/abstract/manager.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NMetadataManager {
+
+template <class TObject>
+class TProcessingController:
+ public IRestoreObjectsController<TObject>,
+ public IModificationObjectsController,
+ public IAlterPreparationController<TObject> {
+private:
+ const TActorIdentity ActorId;
+public:
+ using TPtr = std::shared_ptr<TProcessingController>;
+ TProcessingController(const TActorIdentity actorId)
+ : ActorId(actorId)
+ {
+
+ }
+
+ virtual void RestoreFinished(std::vector<TObject>&& objects, const TString& transactionId) override {
+ ActorId.Send(ActorId, new TEvRestoreFinished<TObject>(std::move(objects), transactionId));
+ }
+ virtual void RestoreProblem(const TString& errorMessage) override {
+ ActorId.Send(ActorId, new TEvRestoreProblem(errorMessage));
+ }
+ virtual void ModificationFinished() override {
+ ActorId.Send(ActorId, new TEvModificationFinished());
+ }
+ virtual void ModificationProblem(const TString& errorMessage) override {
+ ActorId.Send(ActorId, new TEvModificationProblem(errorMessage));
+ }
+ virtual void PreparationProblem(const TString& errorMessage) override {
+ ActorId.Send(ActorId, new TEvAlterPreparationProblem(errorMessage));
+ }
+ virtual void PreparationFinished(std::vector<TObject>&& objects) override {
+ ActorId.Send(ActorId, new TEvAlterPreparationFinished<TObject>(std::move(objects)));
+ }
+
+};
+
+template <class TObject>
+class TModificationActorImpl: public NActors::TActorBootstrapped<TModificationActorImpl<TObject>> {
+private:
+ using TBase = NActors::TActorBootstrapped<TModificationActorImpl<TObject>>;
+protected:
+ TString SessionId;
+ TString TransactionId;
+ typename TProcessingController<TObject>::TPtr InternalController;
+ IAlterController::TPtr ExternalController;
+ const NMetadata::IOperationsManager::TModificationContext Context;
+ std::vector<TTableRecord> Patches;
+ TTableRecords RestoreObjectIds;
+ virtual bool PrepareRestoredObjects(std::vector<TObject>& objects) const = 0;
+ virtual bool ProcessPreparedObjects(TTableRecords&& records) const = 0;
+ virtual void InitState() = 0;
+ virtual bool BuildRestoreObjectIds() = 0;
+public:
+ TModificationActorImpl(TTableRecord&& patch, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context)
+ : ExternalController(controller)
+ , Context(context) {
+ Patches.emplace_back(std::move(patch));
+ }
+
+ TModificationActorImpl(const TTableRecord& patch, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context)
+ : ExternalController(controller)
+ , Context(context) {
+ Patches.emplace_back(patch);
+ }
+
+ TModificationActorImpl(std::vector<TTableRecord>&& patches, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context)
+ : ExternalController(controller)
+ , Context(context)
+ , Patches(std::move(patches)) {
+
+ }
+
+ TModificationActorImpl(const std::vector<TTableRecord>& patches, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context)
+ : ExternalController(controller)
+ , Context(context)
+ , Patches(patches) {
+
+ }
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>, Handle);
+ hFunc(TEvRestoreFinished<TObject>, Handle);
+ hFunc(TEvAlterPreparationFinished<TObject>, Handle);
+ hFunc(NInternal::NRequest::TEvRequestFailed, Handle);
+ hFunc(TEvRestoreProblem, Handle);
+ hFunc(TEvAlterPreparationProblem, Handle);
+ default:
+ break;
+ }
+ }
+
+ void Bootstrap() {
+ InitState();
+ if (!Patches.size()) {
+ ExternalController->AlterProblem("no patches");
+ return TBase::PassAway();
+ }
+ if (!BuildRestoreObjectIds()) {
+ return TBase::PassAway();
+ }
+
+ TBase::Register(new NInternal::NRequest::TYDBRequest<NInternal::NRequest::TDialogCreateSession>(
+ NInternal::NRequest::TDialogCreateSession::TRequest(), TBase::SelfId()));
+ }
+
+ void Handle(typename NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>::TPtr& ev) {
+ Ydb::Table::CreateSessionResponse currentFullReply = ev->Get()->GetResult();
+ Ydb::Table::CreateSessionResult session;
+ currentFullReply.operation().result().UnpackTo(&session);
+ SessionId = session.session_id();
+ Y_VERIFY(SessionId);
+
+ InternalController = std::make_shared<TProcessingController<TObject>>(TBase::SelfId());
+ TBase::Register(new TRestoreObjectsActor<TObject>(RestoreObjectIds, InternalController, SessionId));
+ }
+
+ void Handle(typename TEvRestoreFinished<TObject>::TPtr& ev) {
+ TransactionId = ev->Get()->GetTransactionId();
+ Y_VERIFY(TransactionId);
+ std::vector<TObject> objects = std::move(ev->Get()->MutableObjects());
+ if (!PrepareRestoredObjects(objects)) {
+ TBase::PassAway();
+ } else {
+ TObject::AlteringPreparation(std::move(objects), InternalController, Context);
+ }
+ }
+
+ void Handle(typename TEvAlterPreparationFinished<TObject>::TPtr& ev) {
+ TTableRecords records;
+ records.InitColumns(TObject::TDecoder::GetColumns());
+ records.ReserveRows(ev->Get()->GetObjects().size());
+ for (auto&& i : ev->Get()->GetObjects()) {
+ if (!records.AddRecordNativeValues(i.SerializeToRecord())) {
+ ExternalController->AlterProblem("unexpected serialization inconsistency");
+ return TBase::PassAway();
+ }
+ }
+ if (!ProcessPreparedObjects(std::move(records))) {
+ ExternalController->AlterProblem("cannot process prepared objects");
+ return TBase::PassAway();
+ }
+ }
+
+ void Handle(typename NInternal::NRequest::TEvRequestFailed::TPtr& /*ev*/) {
+ auto g = TBase::PassAwayGuard();
+ ExternalController->AlterProblem("cannot initialize session");
+ }
+
+ void Handle(TEvAlterPreparationProblem::TPtr& ev) {
+ auto g = TBase::PassAwayGuard();
+ ExternalController->AlterProblem("preparation problem: " + ev->Get()->GetErrorMessage());
+ }
+
+ void Handle(TEvRestoreProblem::TPtr& ev) {
+ auto g = TBase::PassAwayGuard();
+ ExternalController->AlterProblem("cannot restore objects: " + ev->Get()->GetErrorMessage());
+ }
+
+};
+
+template <class TObject>
+class TModificationActor: public TModificationActorImpl<TObject> {
+private:
+ using TBase = TModificationActorImpl<TObject>;
+protected:
+ virtual void InitState() override {
+ TBase::Become(&TModificationActor<TObject>::StateMain);
+ }
+
+ virtual bool BuildRestoreObjectIds() override {
+ TBase::RestoreObjectIds.InitColumns(TObject::TDecoder::GetPKColumns());
+ for (auto&& i : TBase::Patches) {
+ if (!TBase::RestoreObjectIds.AddRecordNativeValues(i)) {
+ TBase::ExternalController->AlterProblem("no pk columns in patch");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ virtual TString GetModificationType() const = 0;
+
+public:
+ using TBase::TBase;
+ STFUNC(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvModificationFinished, Handle);
+ hFunc(TEvModificationProblem, Handle);
+ default:
+ TBase::StateMain(ev, ctx);
+ }
+ }
+
+ virtual bool PrepareRestoredObjects(std::vector<TObject>& objects) const override {
+ std::vector<bool> realPatches;
+ realPatches.resize(TBase::Patches.size(), false);
+ for (auto&& i : objects) {
+ const TTableRecord* trPatch = nullptr;
+ TTableRecord trObject = i.SerializeToRecord();
+ for (auto&& p : TBase::Patches) {
+ if (p.CompareColumns(trObject, TObject::TDecoder::GetPKColumnIds())) {
+ trPatch = &p;
+ break;
+ }
+ }
+ TObject objectPatched;
+ if (!trPatch) {
+ TBase::ExternalController->AlterProblem("cannot found patch for object");
+ return false;
+ } else if (!trObject.TakeValuesFrom(*trPatch)) {
+ TBase::ExternalController->AlterProblem("cannot patch object");
+ return false;
+ } else if (!TObject::TDecoder::DeserializeFromRecord(objectPatched, trObject)) {
+ TBase::ExternalController->AlterProblem("cannot parse object after patch");
+ return false;
+ } else {
+ i = std::move(objectPatched);
+ }
+ }
+ for (auto&& p : TBase::Patches) {
+ bool found = false;
+ for (auto&& i : objects) {
+ if (i.SerializeToRecord().CompareColumns(p, TObject::TDecoder::GetPKColumnIds())) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ TObject object;
+ if (!TObject::TDecoder::DeserializeFromRecord(object, p)) {
+ TBase::ExternalController->AlterProblem("cannot parse new object");
+ return false;
+ }
+ objects.emplace_back(std::move(object));
+ }
+ }
+ return true;
+ }
+
+ void Handle(TEvModificationFinished::TPtr& /*ev*/) {
+ auto g = TBase::PassAwayGuard();
+ TBase::ExternalController->AlterFinished();
+ }
+
+ void Handle(TEvModificationProblem::TPtr& ev) {
+ auto g = TBase::PassAwayGuard();
+ TBase::ExternalController->AlterProblem("cannot " + GetModificationType() + " objects: " + ev->Get()->GetErrorMessage());
+ }
+
+};
+
+template <class TObject>
+class TAlterActor: public TModificationActor<TObject> {
+private:
+ using TBase = TModificationActor<TObject>;
+protected:
+ virtual bool ProcessPreparedObjects(TTableRecords&& records) const override {
+ TBase::Register(new TUpdateObjectsActor<TObject>(std::move(records),
+ TBase::InternalController, TBase::SessionId, TBase::TransactionId, TBase::Context.GetUserToken()));
+ return true;
+ }
+
+ virtual TString GetModificationType() const override {
+ return "ALTER";
+ }
+public:
+ using TBase::TBase;
+};
+
+template <class TObject>
+class TCreateActor: public TModificationActor<TObject> {
+private:
+ using TBase = TModificationActor<TObject>;
+protected:
+ virtual bool ProcessPreparedObjects(TTableRecords&& records) const override {
+ TBase::Register(new TInsertObjectsActor<TObject>(std::move(records),
+ TBase::InternalController, TBase::SessionId, TBase::TransactionId, TBase::Context.GetUserToken()));
+ return true;
+ }
+
+ virtual TString GetModificationType() const override {
+ return "CREATE";
+ }
+public:
+ using TBase::TBase;
+};
+
+template <class TObject>
+class TDropActor: public TModificationActor<TObject> {
+private:
+ using TBase = TModificationActor<TObject>;
+protected:
+ virtual void InitState() override {
+ TBase::Become(&TDropActor<TObject>::StateMain);
+ }
+
+ virtual bool BuildRestoreObjectIds() override {
+ auto& first = TBase::Patches.front();
+ std::vector<Ydb::Column> columns = first.SelectOwnedColumns(TObject::TDecoder::GetPKColumns());
+ if (!columns.size()) {
+ TBase::ExternalController->AlterProblem("no pk columns in patch");
+ return false;
+ }
+ if (!columns.size()) {
+ TBase::ExternalController->AlterProblem("no columns for pk detection");
+ return false;
+ }
+ TBase::RestoreObjectIds.InitColumns(columns);
+ for (auto&& i : TBase::Patches) {
+ if (!TBase::RestoreObjectIds.AddRecordNativeValues(i)) {
+ TBase::ExternalController->AlterProblem("incorrect pk columns");
+ return false;
+ }
+ }
+ return true;
+ }
+ virtual TString GetModificationType() const override {
+ return "DROP";
+ }
+public:
+ using TBase::TBase;
+
+ virtual bool ProcessPreparedObjects(TTableRecords&& records) const override {
+ TBase::Register(new TDeleteObjectsActor<TObject>(std::move(records),
+ TBase::InternalController, TBase::SessionId, TBase::TransactionId, TBase::Context.GetUserToken()));
+ return true;
+ }
+
+ virtual bool PrepareRestoredObjects(std::vector<TObject>& /*objects*/) const override {
+ return true;
+ }
+
+};
+
+template <class TObject>
+class TCreateCommand: public NMetadata::IAlterCommand {
+private:
+ using TBase = NMetadata::IAlterCommand;
+protected:
+ virtual void DoExecute() const override {
+ TActivationContext::AsActorContext().Register(new NMetadataManager::TCreateActor<TObject>(GetRecords(), GetController(), GetContext()));
+ }
+public:
+ using TBase::TBase;
+};
+
+template <class TObject>
+class TAlterCommand: public NMetadata::IAlterCommand {
+private:
+ using TBase = NMetadata::IAlterCommand;
+protected:
+ virtual void DoExecute() const override {
+ TActivationContext::AsActorContext().Register(new NMetadataManager::TAlterActor<TObject>(GetRecords(), GetController(), GetContext()));
+ }
+public:
+ using TBase::TBase;
+};
+
+template <class TObject>
+class TDropCommand: public NMetadata::IAlterCommand {
+private:
+ using TBase = NMetadata::IAlterCommand;
+protected:
+ virtual void DoExecute() const override {
+ TActivationContext::AsActorContext().Register(new NMetadataManager::TDropActor<TObject>(GetRecords(), GetController(), GetContext()));
+ }
+public:
+ using TBase::TBase;
+};
+
+}
diff --git a/ydb/services/metadata/manager/common.cpp b/ydb/services/metadata/manager/common.cpp
new file mode 100644
index 00000000000..4c7a303e26f
--- /dev/null
+++ b/ydb/services/metadata/manager/common.cpp
@@ -0,0 +1,5 @@
+#include "common.h"
+
+namespace NKikimr::NMetadataManager {
+
+}
diff --git a/ydb/services/metadata/manager/common.h b/ydb/services/metadata/manager/common.h
new file mode 100644
index 00000000000..2001262d6ce
--- /dev/null
+++ b/ydb/services/metadata/manager/common.h
@@ -0,0 +1,30 @@
+#pragma once
+#include <ydb/core/base/events.h>
+#include <library/cpp/actors/core/events.h>
+
+namespace NKikimr::NMetadataManager {
+
+class IAlterController {
+public:
+ using TPtr = std::shared_ptr<IAlterController>;
+ virtual ~IAlterController() = default;
+
+ virtual void AlterProblem(const TString& errorMessage) = 0;
+ virtual void AlterFinished() = 0;
+
+};
+
+enum EEvents {
+ EvRestoreFinished = EventSpaceBegin(TKikimrEvents::ES_METADATA_MANAGER),
+ EvRestoreProblem,
+ EvModificationFinished,
+ EvModificationProblem,
+ EvAlterFinished,
+ EvAlterProblem,
+ EvAlterPreparationFinished,
+ EvAlterPreparationProblem,
+ EvEnd
+};
+
+static_assert(EEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_MANAGER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_MANAGER)");
+}
diff --git a/ydb/services/metadata/manager/generic_manager.cpp b/ydb/services/metadata/manager/generic_manager.cpp
new file mode 100644
index 00000000000..51d4f0ed926
--- /dev/null
+++ b/ydb/services/metadata/manager/generic_manager.cpp
@@ -0,0 +1,5 @@
+#include "generic_manager.h"
+
+namespace NKikimr::NColumnShard::NTiers {
+
+}
diff --git a/ydb/services/metadata/manager/generic_manager.h b/ydb/services/metadata/manager/generic_manager.h
new file mode 100644
index 00000000000..3d7eda96dc2
--- /dev/null
+++ b/ydb/services/metadata/manager/generic_manager.h
@@ -0,0 +1,93 @@
+#pragma once
+#include <ydb/services/metadata/abstract/manager.h>
+#include <ydb/services/metadata/manager/common.h>
+#include <ydb/services/metadata/service.h>
+
+namespace NKikimr::NMetadata {
+
+class TOperationsController: public NMetadataManager::IAlterController {
+private:
+ YDB_READONLY_DEF(NThreading::TPromise<NMetadata::TObjectOperatorResult>, Promise);
+public:
+ TOperationsController(NThreading::TPromise<NMetadata::TObjectOperatorResult>&& p)
+ : Promise(std::move(p))
+ {
+
+ }
+
+ virtual void AlterProblem(const TString& errorMessage) override {
+ Promise.SetValue(NMetadata::TObjectOperatorResult(false).SetErrorMessage(errorMessage));
+ }
+ virtual void AlterFinished() override {
+ Promise.SetValue(NMetadata::TObjectOperatorResult(true));
+ }
+
+};
+
+template <class T>
+class TGenericOperationsManager: public NMetadata::IOperationsManager {
+protected:
+ virtual NThreading::TFuture<NMetadata::TObjectOperatorResult> DoCreateObject(
+ const NYql::TCreateObjectSettings& settings, const ui32 nodeId,
+ NMetadata::IOperationsManager::TPtr manager, const TModificationContext& context) const override
+ {
+ NMetadata::TOperationParsingResult patch(T::BuildPatchFromSettings(settings, context));
+ if (!patch.IsSuccess()) {
+ NMetadata::TObjectOperatorResult result(patch.GetErrorMessage());
+ return NThreading::MakeFuture<NMetadata::TObjectOperatorResult>(result);
+ }
+ auto promise = NThreading::NewPromise<NMetadata::TObjectOperatorResult>();
+ auto result = promise.GetFuture();
+ auto c = std::make_shared<TOperationsController>(std::move(promise));
+ auto command = std::make_shared<NMetadataManager::TCreateCommand<T>>(patch.GetRecord(), manager, c, context);
+ TActivationContext::Send(new IEventHandle(NMetadataProvider::MakeServiceId(nodeId), {},
+ new NMetadataProvider::TEvAlterObjects(command)));
+ return result;
+ }
+ virtual NThreading::TFuture<NMetadata::TObjectOperatorResult> DoAlterObject(
+ const NYql::TAlterObjectSettings& settings, const ui32 nodeId,
+ NMetadata::IOperationsManager::TPtr manager, const TModificationContext& context) const override
+ {
+ NMetadata::TOperationParsingResult patch(T::BuildPatchFromSettings(settings, context));
+ if (!patch.IsSuccess()) {
+ return NThreading::MakeFuture<NMetadata::TObjectOperatorResult>(NMetadata::TObjectOperatorResult(patch.GetErrorMessage()));
+ }
+ auto promise = NThreading::NewPromise<NMetadata::TObjectOperatorResult>();
+ auto result = promise.GetFuture();
+ auto c = std::make_shared<TOperationsController>(std::move(promise));
+ auto command = std::make_shared<NMetadataManager::TAlterCommand<T>>(patch.GetRecord(), manager, c, context);
+ TActivationContext::Send(new IEventHandle(NMetadataProvider::MakeServiceId(nodeId), {},
+ new NMetadataProvider::TEvAlterObjects(command)));
+ return result;
+ }
+ virtual NThreading::TFuture<NMetadata::TObjectOperatorResult> DoDropObject(
+ const NYql::TDropObjectSettings& settings, const ui32 nodeId,
+ NMetadata::IOperationsManager::TPtr manager, const TModificationContext& context) const override
+ {
+ NMetadata::TOperationParsingResult patch(T::BuildPatchFromSettings(settings, context));
+ if (!patch.IsSuccess()) {
+ return NThreading::MakeFuture<NMetadata::TObjectOperatorResult>(NMetadata::TObjectOperatorResult(patch.GetErrorMessage()));
+ }
+ auto promise = NThreading::NewPromise<NMetadata::TObjectOperatorResult>();
+ auto result = promise.GetFuture();
+ auto c = std::make_shared<TOperationsController>(std::move(promise));
+ auto command = std::make_shared<NMetadataManager::TDropCommand<T>>(patch.GetRecord(), manager, c, context);
+ TActivationContext::Send(new IEventHandle(NMetadataProvider::MakeServiceId(nodeId), {},
+ new NMetadataProvider::TEvAlterObjects(command)));
+ return result;
+ }
+public:
+ virtual TString GetTablePath() const override {
+ return T::GetStorageTablePath();
+ }
+
+ virtual TString GetTypeId() const override {
+ return GetTypeIdStatic();
+ }
+
+ static TString GetTypeIdStatic() {
+ return T::GetTypeId();
+ }
+};
+
+}
diff --git a/ydb/services/metadata/manager/modification.cpp b/ydb/services/metadata/manager/modification.cpp
new file mode 100644
index 00000000000..8ef18dcb38f
--- /dev/null
+++ b/ydb/services/metadata/manager/modification.cpp
@@ -0,0 +1,5 @@
+#include "modification.h"
+
+namespace NKikimr::NMetadataManager {
+
+}
diff --git a/ydb/services/metadata/manager/modification.h b/ydb/services/metadata/manager/modification.h
new file mode 100644
index 00000000000..bac2c54496e
--- /dev/null
+++ b/ydb/services/metadata/manager/modification.h
@@ -0,0 +1,157 @@
+#pragma once
+#include "table_record.h"
+#include "modification_controller.h"
+#include "ydb_value_operator.h"
+
+#include <ydb/library/aclib/aclib.h>
+#include <ydb/services/metadata/request/request_actor.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NMetadataManager {
+
+template <class TObject>
+class TModifyObjectsActor: public NActors::TActorBootstrapped<TModifyObjectsActor<TObject>> {
+private:
+ using TBase = NActors::TActorBootstrapped<TModifyObjectsActor<TObject>>;
+ IModificationObjectsController::TPtr Controller;
+ const TString SessionId;
+ const TString TransactionId;
+ const TMaybe<NACLib::TUserToken> UserToken;
+ std::deque<NInternal::NRequest::TDialogYQLRequest::TRequest> Requests;
+protected:
+ TTableRecords Objects;
+ virtual Ydb::Table::ExecuteDataQueryRequest BuildModifyQuery() const = 0;
+ virtual TString GetModifyType() const = 0;
+
+ void BuildRequestDirect() {
+ Ydb::Table::ExecuteDataQueryRequest request = BuildModifyQuery();
+ request.set_session_id(SessionId);
+ request.mutable_tx_control()->set_tx_id(TransactionId);
+ Requests.emplace_back(std::move(request));
+ }
+
+ void BuildRequestHistory() {
+ if (!TObject::GetStorageHistoryTablePath()) {
+ return;
+ }
+ if (UserToken) {
+ Objects.AddColumn(TYDBColumn::Bytes("historyUserId"), TYDBValue::Bytes(UserToken->GetUserSID()));
+ }
+ Objects.AddColumn(TYDBColumn::UInt64("historyInstant"), TYDBValue::UInt64(TActivationContext::Now().MicroSeconds()));
+ Objects.AddColumn(TYDBColumn::Bytes("historyAction"), TYDBValue::Bytes(GetModifyType()));
+ Ydb::Table::ExecuteDataQueryRequest request = Objects.BuildInsertQuery(TObject::GetStorageHistoryTablePath());
+ request.set_session_id(SessionId);
+ request.mutable_tx_control()->set_tx_id(TransactionId);
+ request.mutable_tx_control()->set_commit_tx(true);
+ Requests.emplace_back(std::move(request));
+ }
+
+ void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogYQLRequest>::TPtr& /*ev*/) {
+ if (Requests.size()) {
+ TBase::Register(new NInternal::NRequest::TYDBRequest<NInternal::NRequest::TDialogYQLRequest>(Requests.front(), TBase::SelfId()));
+ Requests.pop_front();
+ } else {
+ Controller->ModificationFinished();
+ TBase::PassAway();
+ }
+ }
+
+ void Handle(NInternal::NRequest::TEvRequestFailed::TPtr& /*ev*/) {
+ auto g = TBase::PassAwayGuard();
+ Controller->ModificationProblem("cannot execute yql request for upsert objects");
+ }
+
+public:
+ TModifyObjectsActor(TTableRecords&& objects, IModificationObjectsController::TPtr controller, const TString& sessionId,
+ const TString& transactionId, const TMaybe<NACLib::TUserToken>& userToken)
+ : Controller(controller)
+ , SessionId(sessionId)
+ , TransactionId(transactionId)
+ , UserToken(userToken)
+ , Objects(std::move(objects))
+
+ {
+ Y_VERIFY(SessionId);
+ }
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogYQLRequest>, Handle);
+ hFunc(NInternal::NRequest::TEvRequestFailed, Handle);
+ default:
+ break;
+ }
+ }
+
+ void Bootstrap() {
+ TBase::Become(&TModifyObjectsActor::StateMain);
+ BuildRequestDirect();
+ BuildRequestHistory();
+ TBase::Register(new NInternal::NRequest::TYDBRequest<NInternal::NRequest::TDialogYQLRequest>(Requests.front(), TBase::SelfId()));
+ Requests.pop_front();
+ }
+};
+
+template <class TObject>
+class TUpsertObjectsActor: public TModifyObjectsActor<TObject> {
+private:
+ using TBase = TModifyObjectsActor<TObject>;
+protected:
+ virtual Ydb::Table::ExecuteDataQueryRequest BuildModifyQuery() const override {
+ return TBase::Objects.BuildUpsertQuery(TObject::GetStorageTablePath());
+ }
+ virtual TString GetModifyType() const override {
+ return "upsert";
+ }
+public:
+ using TBase::TBase;
+};
+
+template <class TObject>
+class TUpdateObjectsActor: public TModifyObjectsActor<TObject> {
+private:
+ using TBase = TModifyObjectsActor<TObject>;
+protected:
+ virtual Ydb::Table::ExecuteDataQueryRequest BuildModifyQuery() const override {
+ return TBase::Objects.BuildUpdateQuery(TObject::GetStorageTablePath());
+ }
+ virtual TString GetModifyType() const override {
+ return "update";
+ }
+public:
+ using TBase::TBase;
+};
+
+template <class TObject>
+class TDeleteObjectsActor: public TModifyObjectsActor<TObject> {
+private:
+ using TBase = TModifyObjectsActor<TObject>;
+protected:
+ virtual Ydb::Table::ExecuteDataQueryRequest BuildModifyQuery() const override {
+ auto objectIds = TBase::Objects.SelectColumns(TObject::TDecoder::GetPKColumnIds());
+ return objectIds.BuildDeleteQuery(TObject::GetStorageTablePath());
+ }
+ virtual TString GetModifyType() const override {
+ return "delete";
+ }
+public:
+ using TBase::TBase;
+};
+
+template <class TObject>
+class TInsertObjectsActor: public TModifyObjectsActor<TObject> {
+private:
+ using TBase = TModifyObjectsActor<TObject>;
+protected:
+ virtual Ydb::Table::ExecuteDataQueryRequest BuildModifyQuery() const override {
+ return TBase::Objects.BuildInsertQuery(TObject::GetStorageTablePath());
+ }
+ virtual TString GetModifyType() const override {
+ return "insert";
+ }
+public:
+ using TBase::TBase;
+};
+
+}
diff --git a/ydb/services/metadata/manager/modification_controller.cpp b/ydb/services/metadata/manager/modification_controller.cpp
new file mode 100644
index 00000000000..ee8f24b7c8e
--- /dev/null
+++ b/ydb/services/metadata/manager/modification_controller.cpp
@@ -0,0 +1,5 @@
+#include "modification_controller.h"
+
+namespace NKikimr::NMetadataManager {
+
+}
diff --git a/ydb/services/metadata/manager/modification_controller.h b/ydb/services/metadata/manager/modification_controller.h
new file mode 100644
index 00000000000..f935924d6ee
--- /dev/null
+++ b/ydb/services/metadata/manager/modification_controller.h
@@ -0,0 +1,29 @@
+#pragma once
+#include "common.h"
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadataManager {
+
+class IModificationObjectsController {
+public:
+ using TPtr = std::shared_ptr<IModificationObjectsController>;
+ virtual ~IModificationObjectsController() = default;
+ virtual void ModificationProblem(const TString& errorMessage) = 0;
+ virtual void ModificationFinished() = 0;
+};
+
+class TEvModificationFinished: public TEventLocal<TEvModificationFinished, EvModificationFinished> {
+public:
+};
+
+class TEvModificationProblem: public TEventLocal<TEvModificationProblem, EvModificationProblem> {
+private:
+ YDB_ACCESSOR_DEF(TString, ErrorMessage);
+public:
+ TEvModificationProblem(const TString& errorMessage)
+ : ErrorMessage(errorMessage) {
+
+ }
+};
+
+}
diff --git a/ydb/services/metadata/manager/object.cpp b/ydb/services/metadata/manager/object.cpp
new file mode 100644
index 00000000000..e981c14049a
--- /dev/null
+++ b/ydb/services/metadata/manager/object.cpp
@@ -0,0 +1,27 @@
+#include "object.h"
+
+namespace NKikimr::NMetadataManager {
+
+Ydb::Table::CreateTableRequest TBaseObject::AddHistoryTableScheme(const Ydb::Table::CreateTableRequest& baseScheme, const TString& tableName) {
+ Ydb::Table::CreateTableRequest result = baseScheme;
+ result.add_primary_key("historyInstant");
+ result.set_path(tableName);
+ {
+ auto& column = *result.add_columns();
+ column.set_name("historyAction");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *result.add_columns();
+ column.set_name("historyUserId");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *result.add_columns();
+ column.set_name("historyInstant");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64);
+ }
+ return result;
+}
+
+}
diff --git a/ydb/services/metadata/manager/object.h b/ydb/services/metadata/manager/object.h
new file mode 100644
index 00000000000..ee54b389cc1
--- /dev/null
+++ b/ydb/services/metadata/manager/object.h
@@ -0,0 +1,24 @@
+#pragma once
+#include <ydb/public/api/protos/ydb_table.pb.h>
+
+namespace NKikimr::NMetadataManager {
+
+class TBaseObject {
+public:
+ static Ydb::Table::CreateTableRequest AddHistoryTableScheme(const Ydb::Table::CreateTableRequest& baseScheme, const TString& tableName);
+
+};
+
+template <class TDerived>
+class TObject: public TBaseObject {
+public:
+ static TString GetStorageHistoryTablePath() {
+ return TDerived::GetStorageTablePath() + "_history";
+ }
+
+ static Ydb::Table::CreateTableRequest AddHistoryTableScheme(const Ydb::Table::CreateTableRequest& baseScheme) {
+ return TBaseObject::AddHistoryTableScheme(baseScheme, TDerived::GetStorageHistoryTablePath());
+ }
+};
+
+}
diff --git a/ydb/services/metadata/manager/preparation_controller.cpp b/ydb/services/metadata/manager/preparation_controller.cpp
new file mode 100644
index 00000000000..84a3df1216e
--- /dev/null
+++ b/ydb/services/metadata/manager/preparation_controller.cpp
@@ -0,0 +1,5 @@
+#include "restore_controller.h"
+
+namespace NKikimr::NMetadataManager {
+
+}
diff --git a/ydb/services/metadata/manager/preparation_controller.h b/ydb/services/metadata/manager/preparation_controller.h
new file mode 100644
index 00000000000..658b5a6385f
--- /dev/null
+++ b/ydb/services/metadata/manager/preparation_controller.h
@@ -0,0 +1,40 @@
+#pragma once
+#include "common.h"
+
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadataManager {
+
+template <class TObject>
+class IAlterPreparationController {
+public:
+ using TPtr = std::shared_ptr<IAlterPreparationController>;
+ virtual ~IAlterPreparationController() = default;
+
+ virtual void PreparationFinished(std::vector<TObject>&& objects) = 0;
+ virtual void PreparationProblem(const TString& errorMessage) = 0;
+};
+
+template <class TObject>
+class TEvAlterPreparationFinished: public TEventLocal<TEvAlterPreparationFinished<TObject>, EvAlterPreparationFinished> {
+private:
+ YDB_READONLY_DEF(std::vector<TObject>, Objects);
+public:
+ TEvAlterPreparationFinished(std::vector<TObject>&& objects)
+ : Objects(std::move(objects))
+ {
+
+ }
+};
+
+class TEvAlterPreparationProblem: public TEventLocal<TEvAlterPreparationProblem, EvAlterPreparationProblem> {
+private:
+ YDB_ACCESSOR_DEF(TString, ErrorMessage);
+public:
+ TEvAlterPreparationProblem(const TString& errorMessage)
+ : ErrorMessage(errorMessage) {
+
+ }
+};
+
+}
diff --git a/ydb/services/metadata/manager/restore.cpp b/ydb/services/metadata/manager/restore.cpp
new file mode 100644
index 00000000000..43fe115ea02
--- /dev/null
+++ b/ydb/services/metadata/manager/restore.cpp
@@ -0,0 +1,5 @@
+#include "restore.h"
+
+namespace NKikimr::NMetadataManager {
+
+}
diff --git a/ydb/services/metadata/manager/restore.h b/ydb/services/metadata/manager/restore.h
new file mode 100644
index 00000000000..6506e5d6292
--- /dev/null
+++ b/ydb/services/metadata/manager/restore.h
@@ -0,0 +1,76 @@
+#pragma once
+#include "table_record.h"
+
+#include <ydb/services/metadata/manager/restore_controller.h>
+#include <ydb/services/metadata/request/request_actor.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NMetadataManager {
+
+template <class TObject>
+class TRestoreObjectsActor: public NActors::TActorBootstrapped<TRestoreObjectsActor<TObject>> {
+private:
+ using TBase = NActors::TActorBootstrapped<TRestoreObjectsActor<TObject>>;
+ using IRestoreObjectsController = IRestoreObjectsController<TObject>;
+ typename IRestoreObjectsController::TPtr Controller;
+ const TTableRecords ObjectIds;
+ TString SessionId;
+
+ void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>::TPtr& ev) {
+ auto g = TBase::PassAwayGuard();
+ const NInternal::NRequest::TDialogSelect::TResponse& result = ev->Get()->GetResult();
+ Ydb::Table::ExecuteQueryResult qResult;
+ result.operation().result().UnpackTo(&qResult);
+ Y_VERIFY((size_t)qResult.result_sets().size() == 1);
+
+ typename TObject::TDecoder decoder(qResult.result_sets()[0]);
+ std::vector<TObject> objects;
+ for (auto&& row : qResult.result_sets()[0].rows()) {
+ TObject object;
+ if (!object.DeserializeFromRecord(decoder, row)) {
+ Controller->RestoreProblem("cannot parse exists object");
+ return;
+ }
+ objects.emplace_back(std::move(object));
+ }
+ Controller->RestoreFinished(std::move(objects), qResult.tx_meta().id());
+ }
+
+ void Handle(NInternal::NRequest::TEvRequestFailed::TPtr& /*ev*/) {
+ auto g = TBase::PassAwayGuard();
+ Controller->RestoreProblem("cannot execute yql request");
+ }
+
+public:
+ TRestoreObjectsActor(const TTableRecords& objectIds, typename IRestoreObjectsController::TPtr controller, const TString& sessionId)
+ : Controller(controller)
+ , ObjectIds(objectIds)
+ , SessionId(sessionId)
+ {
+ Y_VERIFY(SessionId);
+ }
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>, Handle);
+ hFunc(NInternal::NRequest::TEvRequestFailed, Handle);
+ default:
+ break;
+ }
+ }
+
+ void Bootstrap() {
+ if (ObjectIds.empty()) {
+ Controller->RestoreProblem("no objects for restore");
+ TBase::PassAway();
+ }
+ auto request = ObjectIds.BuildSelectQuery(TObject::GetStorageTablePath());
+ request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write();
+ request.set_session_id(SessionId);
+ TBase::Become(&TRestoreObjectsActor::StateMain);
+ TBase::Register(new NInternal::NRequest::TYDBRequest<NInternal::NRequest::TDialogSelect>(request, TBase::SelfId()));
+ }
+};
+
+}
diff --git a/ydb/services/metadata/manager/restore_controller.cpp b/ydb/services/metadata/manager/restore_controller.cpp
new file mode 100644
index 00000000000..84a3df1216e
--- /dev/null
+++ b/ydb/services/metadata/manager/restore_controller.cpp
@@ -0,0 +1,5 @@
+#include "restore_controller.h"
+
+namespace NKikimr::NMetadataManager {
+
+}
diff --git a/ydb/services/metadata/manager/restore_controller.h b/ydb/services/metadata/manager/restore_controller.h
new file mode 100644
index 00000000000..0a81e6eeff2
--- /dev/null
+++ b/ydb/services/metadata/manager/restore_controller.h
@@ -0,0 +1,42 @@
+#pragma once
+#include "common.h"
+
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadataManager {
+
+template <class TObject>
+class IRestoreObjectsController {
+public:
+ using TPtr = std::shared_ptr<IRestoreObjectsController>;
+ virtual ~IRestoreObjectsController() = default;
+
+ virtual void RestoreFinished(std::vector<TObject>&& objects, const TString& transactionId) = 0;
+ virtual void RestoreProblem(const TString& errorMessage) = 0;
+};
+
+template <class TObject>
+class TEvRestoreFinished: public TEventLocal<TEvRestoreFinished<TObject>, EvRestoreFinished> {
+private:
+ YDB_ACCESSOR_DEF(std::vector<TObject>, Objects);
+ YDB_READONLY_DEF(TString, TransactionId);
+public:
+ TEvRestoreFinished(std::vector<TObject>&& objects, const TString& transactionId)
+ : Objects(std::move(objects))
+ , TransactionId(transactionId)
+ {
+
+ }
+};
+
+class TEvRestoreProblem: public TEventLocal<TEvRestoreProblem, EvRestoreProblem> {
+private:
+ YDB_ACCESSOR_DEF(TString, ErrorMessage);
+public:
+ TEvRestoreProblem(const TString& errorMessage)
+ : ErrorMessage(errorMessage) {
+
+ }
+};
+
+}
diff --git a/ydb/services/metadata/manager/table_record.cpp b/ydb/services/metadata/manager/table_record.cpp
new file mode 100644
index 00000000000..fa2c9eb42d9
--- /dev/null
+++ b/ydb/services/metadata/manager/table_record.cpp
@@ -0,0 +1,294 @@
+#include "table_record.h"
+#include "ydb_value_operator.h"
+
+#include <ydb/core/protos/services.pb.h>
+
+#include <library/cpp/actors/core/log.h>
+#include <util/string/builder.h>
+#include <util/string/join.h>
+
+namespace NKikimr::NMetadataManager {
+
+bool TTableRecord::CompareColumns(const TTableRecord& item, const std::vector<TString>& columnIds) const {
+ for (auto&& i : columnIds) {
+ auto itSelf = Values.find(i);
+ auto itItem = item.Values.find(i);
+ if (itSelf == Values.end()) {
+ if (itItem == item.Values.end()) {
+ continue;
+ } else {
+ return false;
+ }
+ } else if (itItem == item.Values.end()) {
+ return false;
+ } else {
+ if (!TYDBValue::Compare(itSelf->second, itItem->second)) {
+ return false;
+ }
+ }
+ }
+ return true;
+}
+
+bool TTableRecord::HasColumns(const std::vector<TString>& columnIds) const {
+ for (auto&& i : columnIds) {
+ if (!Values.contains(i)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+ui32 TTableRecord::CountIntersectColumns(const std::vector<TString>& columnIds) const {
+ ui32 result = 0;
+ for (auto&& i : columnIds) {
+ if (Values.contains(i)) {
+ ++result;
+ }
+ }
+ return result;
+}
+
+bool TTableRecord::TakeValuesFrom(const TTableRecord& item) {
+ for (auto&& i : item.Values) {
+ Values[i.first] = i.second;
+ }
+ return true;
+}
+
+const Ydb::Value* TTableRecord::GetValuePtr(const TString& columnId) const {
+ auto it = Values.find(columnId);
+ if (it == Values.end()) {
+ return nullptr;
+ }
+ return &it->second;
+}
+
+NKikimr::NMetadataManager::TTableRecord& TTableRecord::SetColumn(const TString& columnId, const Ydb::Value& v) {
+ Values[columnId] = v;
+ return *this;
+}
+
+Ydb::ResultSet TTableRecord::BuildRecordSet() const {
+ Ydb::ResultSet result;
+ Ydb::Value row;
+ for (auto&& i : Values) {
+ Ydb::Column column;
+ column.set_name(i.first);
+// column.set_type(i.second.type());
+ *result.add_columns() = column;
+ *row.add_items() = i.second;
+ }
+ *result.add_rows() = row;
+ return result;
+}
+
+bool TTableRecord::SameColumns(const TTableRecord& item) const {
+ if (Values.size() != item.Values.size()) {
+ return false;
+ }
+ auto itSelf = Values.begin();
+ auto itItem = item.Values.begin();
+ for (; itSelf != Values.end() && itItem != Values.end(); ++itSelf, ++itItem) {
+ if (itSelf->first != itItem->first) {
+ return false;
+ }
+ }
+ return true;
+}
+
+std::vector<Ydb::Column> TTableRecord::SelectOwnedColumns(const std::vector<Ydb::Column>& columns) const {
+ std::vector<Ydb::Column> result;
+ for (auto&& i : columns) {
+ if (Values.contains(i.name())) {
+ result.emplace_back(i);
+ }
+ }
+ return result;
+}
+
+ui32 TTableRecords::AddRecordImpl(const TTableRecord& record) {
+ Y_VERIFY(Columns.size());
+ ui32 foundColumns = 0;
+ Records.resize(Records.size() + 1);
+ for (ui32 i = 0; i < Columns.size(); ++i) {
+ const Ydb::Value* v = record.GetValuePtr(Columns[i].name());
+ if (v) {
+ if (!TYDBValue::IsSameType(*v, Columns[i].type())) {
+ ALS_ERROR(NKikimrServices::METADATA_MANAGER);
+ Y_VERIFY_DEBUG(false);
+ continue;
+ }
+ ++foundColumns;
+ *Records.back().add_items() = *v;
+ } else {
+ *Records.back().add_items() = TYDBValue::NullValue();
+ }
+ }
+ return foundColumns;
+}
+
+Ydb::TypedValue TTableRecords::BuildVariableTupleRecords() const {
+ Ydb::TypedValue result;
+ if (Columns.size() > 1) {
+ for (auto&& r : Records) {
+ *result.mutable_value()->add_items() = r;
+ }
+ auto* tuple = result.mutable_type()->mutable_list_type()->mutable_item()->mutable_tuple_type();
+ for (auto&& i : Columns) {
+ *tuple->add_elements() = i.type();
+ }
+ } else if (Columns.size() == 1) {
+ for (auto&& r : Records) {
+ *result.mutable_value()->add_items() = r.items()[0];
+ }
+ *result.mutable_type()->mutable_list_type()->mutable_item() = Columns[0].type();
+ }
+ return result;
+}
+
+Ydb::TypedValue TTableRecords::BuildVariableStructRecords() const {
+ Ydb::TypedValue result;
+ for (auto&& r : Records) {
+ *result.mutable_value()->add_items() = r;
+ }
+ auto* structDescr = result.mutable_type()->mutable_list_type()->mutable_item()->mutable_struct_type();
+ for (auto&& i : Columns) {
+ auto& m = *structDescr->add_members();
+ *m.mutable_type() = i.type();
+ m.set_name(i.name());
+ }
+ return result;
+}
+
+TString TTableRecords::BuildColumnsSchemaTuple() const {
+ TStringBuilder sb;
+ if (Columns.size() > 1) {
+ sb << "Tuple<";
+ std::vector<TString> types;
+ for (auto&& i : Columns) {
+ types.emplace_back(TYDBValue::TypeToString(i.type()));
+ }
+ sb << JoinSeq(", ", types) << ">";
+ } else if (Columns.size() == 1) {
+ sb << TYDBValue::TypeToString(Columns[0].type());
+ }
+ return sb;
+}
+
+TString TTableRecords::BuildColumnsSchemaStruct() const {
+ TStringBuilder sb;
+ sb << "Struct<";
+ std::vector<TString> types;
+ for (auto&& i : Columns) {
+ types.emplace_back(i.name() + ":" + TYDBValue::TypeToString(i.type()));
+ }
+ sb << JoinSeq(", ", types) << ">";
+ return sb;
+}
+
+std::vector<TString> TTableRecords::GetColumnIds() const {
+ std::vector<TString> result;
+ for (auto&& i : Columns) {
+ result.emplace_back(i.name());
+ }
+ return result;
+}
+
+Ydb::Table::ExecuteDataQueryRequest TTableRecords::BuildUpsertQuery(const TString& tablePath) const {
+ Ydb::Table::ExecuteDataQueryRequest result;
+ TStringBuilder sb;
+ sb << "DECLARE $objects AS List<" << BuildColumnsSchemaStruct() << ">;" << Endl;
+ sb << "UPSERT INTO `" + tablePath + "`" << Endl;
+ sb << "SELECT " << JoinSeq(",", GetColumnIds()) << " FROM AS_TABLE($objects)" << Endl;
+ Cerr << sb << Endl;
+ result.mutable_query()->set_yql_text(sb);
+ (*result.mutable_parameters())["$objects"] = BuildVariableStructRecords();
+ return result;
+}
+
+Ydb::Table::ExecuteDataQueryRequest TTableRecords::BuildInsertQuery(const TString& tablePath) const {
+ Ydb::Table::ExecuteDataQueryRequest result;
+ TStringBuilder sb;
+ sb << "DECLARE $objects AS List<" << BuildColumnsSchemaStruct() << ">;" << Endl;
+ sb << "INSERT INTO `" + tablePath + "`" << Endl;
+ sb << "SELECT " << JoinSeq(",", GetColumnIds()) << " FROM AS_TABLE($objects)" << Endl;
+ Cerr << sb << Endl;
+ result.mutable_query()->set_yql_text(sb);
+ (*result.mutable_parameters())["$objects"] = BuildVariableStructRecords();
+ return result;
+}
+
+Ydb::Table::ExecuteDataQueryRequest TTableRecords::BuildSelectQuery(const TString& tablePath) const {
+ Ydb::Table::ExecuteDataQueryRequest result;
+ TStringBuilder sb;
+ sb << "DECLARE $ids AS List<" << BuildColumnsSchemaTuple() << ">;" << Endl;
+ sb << "SELECT * FROM `" + tablePath + "`" << Endl;
+ if (GetColumnIds().size() > 1) {
+ sb << "WHERE (" << JoinSeq(", ", GetColumnIds()) << ") IN $ids" << Endl;
+ } else if (GetColumnIds().size() == 1) {
+ sb << "WHERE " << GetColumnIds()[0] << " IN $ids" << Endl;
+ }
+ Cerr << sb << Endl;
+ result.mutable_query()->set_yql_text(sb);
+ (*result.mutable_parameters())["$ids"] = BuildVariableTupleRecords();
+ return result;
+}
+
+Ydb::Table::ExecuteDataQueryRequest TTableRecords::BuildDeleteQuery(const TString& tablePath) const {
+ Ydb::Table::ExecuteDataQueryRequest result;
+ TStringBuilder sb;
+ sb << "DECLARE $ids AS List<" << BuildColumnsSchemaTuple() << ">;" << Endl;
+ sb << "DELETE FROM `" + tablePath + "`" << Endl;
+ sb << "WHERE (" << JoinSeq(", ", GetColumnIds()) << ") IN $ids" << Endl;
+ Cerr << sb << Endl;
+ result.mutable_query()->set_yql_text(sb);
+ (*result.mutable_parameters())["$ids"] = BuildVariableTupleRecords();
+ return result;
+}
+
+Ydb::Table::ExecuteDataQueryRequest TTableRecords::BuildUpdateQuery(const TString& tablePath) const {
+ Ydb::Table::ExecuteDataQueryRequest result;
+ TStringBuilder sb;
+ sb << "DECLARE $objects AS List<" << BuildColumnsSchemaStruct() << ">;" << Endl;
+ sb << "UPDATE `" + tablePath + "` ON" << Endl;
+ sb << "SELECT " << JoinSeq(",", GetColumnIds()) << " FROM AS_TABLE($objects)" << Endl;
+ Cerr << sb << Endl;
+ result.mutable_query()->set_yql_text(sb);
+ (*result.mutable_parameters())["$objects"] = BuildVariableStructRecords();
+ return result;
+}
+
+void TTableRecords::AddColumn(const Ydb::Column& c, const Ydb::Value& v) {
+ for (auto&& i : Columns) {
+ Y_VERIFY(i.name() != c.name());
+ }
+ Columns.emplace_back(c);
+ for (auto&& i : Records) {
+ *i.add_items() = v;
+ }
+}
+
+NKikimr::NMetadataManager::TTableRecords TTableRecords::SelectColumns(const std::vector<TString>& columnIds) const {
+ std::set<TString> columnIdsSet(columnIds.begin(), columnIds.end());
+ TTableRecords result;
+ std::vector<ui32> idxs;
+ ui32 idx = 0;
+ for (auto&& i : Columns) {
+ if (columnIdsSet.contains(i.name())) {
+ result.Columns.emplace_back(i);
+ idxs.emplace_back(idx);
+ }
+ ++idx;
+ }
+ result.Records.reserve(Records.size());
+ for (auto&& i : Records) {
+ result.Records.emplace_back(Ydb::Value());
+ for (auto&& idx : idxs) {
+ *result.Records.back().add_items() = i.items()[idx];
+ }
+ }
+ return result;
+}
+
+}
diff --git a/ydb/services/metadata/manager/table_record.h b/ydb/services/metadata/manager/table_record.h
new file mode 100644
index 00000000000..b9a04f8dc34
--- /dev/null
+++ b/ydb/services/metadata/manager/table_record.h
@@ -0,0 +1,91 @@
+#pragma once
+#include <ydb/public/api/protos/ydb_value.pb.h>
+#include <ydb/public/api/protos/ydb_table.pb.h>
+
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadataManager {
+
+class TTableRecord {
+private:
+ using TValues = std::map<TString, Ydb::Value>;
+ YDB_READONLY_DEF(TValues, Values);
+public:
+ std::vector<Ydb::Column> SelectOwnedColumns(const std::vector<Ydb::Column>& columns) const;
+ Ydb::ResultSet BuildRecordSet() const;
+ ui32 GetColumnsCount() const {
+ return Values.size();
+ }
+ TTableRecord& SetColumn(const TString& columnId, const Ydb::Value& v);
+ bool CompareColumns(const TTableRecord& item, const std::vector<TString>& columnIds) const;
+ bool HasColumns(const std::vector<TString>& columnIds) const;
+ ui32 CountIntersectColumns(const std::vector<TString>& columnIds) const;
+ bool SameColumns(const TTableRecord& item) const;
+ bool TakeValuesFrom(const TTableRecord& item);
+ const Ydb::Value* GetValuePtr(const TString& columnId) const;
+};
+
+class TTableRecords {
+private:
+ YDB_READONLY_DEF(std::vector<Ydb::Column>, Columns);
+ std::vector<Ydb::Value> Records;
+
+ ui32 AddRecordImpl(const TTableRecord& record);
+ Ydb::TypedValue BuildVariableTupleRecords() const;
+ Ydb::TypedValue BuildVariableStructRecords() const;
+ TString BuildColumnsSchemaTuple() const;
+ TString BuildColumnsSchemaStruct() const;
+
+ std::vector<TString> GetColumnIds() const;
+
+ void PopRecord() {
+ Y_VERIFY(Records.size());
+ Records.pop_back();
+ }
+
+public:
+ TTableRecords SelectColumns(const std::vector<TString>& columnIds) const;
+
+ Ydb::Table::ExecuteDataQueryRequest BuildInsertQuery(const TString& tablePath) const;
+ Ydb::Table::ExecuteDataQueryRequest BuildUpsertQuery(const TString& tablePath) const;
+ Ydb::Table::ExecuteDataQueryRequest BuildSelectQuery(const TString& tablePath) const;
+ Ydb::Table::ExecuteDataQueryRequest BuildDeleteQuery(const TString& tablePath) const;
+ Ydb::Table::ExecuteDataQueryRequest BuildUpdateQuery(const TString& tablePath) const;
+
+ void AddColumn(const Ydb::Column& c, const Ydb::Value& v);
+
+ bool empty() const {
+ return Records.empty();
+ }
+
+ void InitColumns(const std::vector<Ydb::Column>& columns) {
+ Y_VERIFY(Columns.empty());
+ Columns = columns;
+ }
+ void ReserveRows(const ui32 rowsCount) {
+ Records.reserve(rowsCount);
+ }
+ bool AddRecordAllValues(const TTableRecord& record) {
+ if (AddRecordImpl(record) != record.GetValues().size()) {
+ PopRecord();
+ return false;
+ }
+ return true;
+ }
+ bool AddRecordNativeValues(const TTableRecord& record) {
+ if (AddRecordImpl(record) != Columns.size()) {
+ PopRecord();
+ return false;
+ }
+ return true;
+ }
+ bool AddRecordSomeValues(const TTableRecord& record) {
+ if (!AddRecordImpl(record)) {
+ PopRecord();
+ return false;
+ }
+ return true;
+ }
+};
+
+}
diff --git a/ydb/services/metadata/manager/ydb_value_operator.cpp b/ydb/services/metadata/manager/ydb_value_operator.cpp
new file mode 100644
index 00000000000..f1a4ebce542
--- /dev/null
+++ b/ydb/services/metadata/manager/ydb_value_operator.cpp
@@ -0,0 +1,132 @@
+#include "ydb_value_operator.h"
+
+namespace NKikimr::NMetadataManager {
+
+bool TYDBValue::IsSameType(const Ydb::Value& v, const Ydb::Type& type) {
+ Y_VERIFY(type.has_type_id());
+ if (type.type_id() == Ydb::Type::BOOL) {
+ return v.has_bool_value();
+ } else if (type.type_id() == Ydb::Type::INT32) {
+ return v.has_int32_value();
+ } else if (type.type_id() == Ydb::Type::UINT32) {
+ return v.has_uint32_value();
+ } else if (type.type_id() == Ydb::Type::INT64) {
+ return v.has_int64_value();
+ } else if (type.type_id() == Ydb::Type::UINT64) {
+ return v.has_uint64_value();
+ } else if (type.type_id() == Ydb::Type::STRING) {
+ return v.has_bytes_value();
+ } else if (type.type_id() == Ydb::Type::UTF8) {
+ return v.has_text_value();
+ }
+ Y_VERIFY(false);
+}
+
+bool TYDBValue::IsSameType(const Ydb::Value& l, const Ydb::Value& r) {
+ if (l.has_bool_value()) {
+ return r.has_bool_value();
+ }
+ if (l.has_bytes_value()) {
+ return r.has_bytes_value();
+ }
+ if (l.has_text_value()) {
+ return r.has_text_value();
+ }
+ Y_VERIFY(false);
+}
+
+bool TYDBValue::Compare(const Ydb::Value& l, const Ydb::Value& r) {
+ if (!IsSameType(l, r)) {
+ return false;
+ }
+ if (l.has_bool_value()) {
+ return l.bool_value() == r.bool_value();
+ }
+ if (l.has_bytes_value()) {
+ return l.bytes_value() == r.bytes_value();
+ }
+ if (l.has_text_value()) {
+ return l.text_value() == r.text_value();
+ }
+ Y_VERIFY(false);
+}
+
+TString TYDBValue::TypeToString(const Ydb::Type& type) {
+ Y_VERIFY(type.has_type_id());
+ if (type.type_id() == Ydb::Type::BOOL) {
+ return "Bool";
+ } else if (type.type_id() == Ydb::Type::INT32) {
+ return "Int32";
+ } else if (type.type_id() == Ydb::Type::UINT32) {
+ return "Uint32";
+ } else if (type.type_id() == Ydb::Type::INT64) {
+ return "Uint64";
+ } else if (type.type_id() == Ydb::Type::UINT64) {
+ return "Uint64";
+ } else if (type.type_id() == Ydb::Type::STRING) {
+ return "String";
+ } else if (type.type_id() == Ydb::Type::UTF8) {
+ return "Utf8";
+ } else {
+ Y_VERIFY(false);
+ }
+}
+
+Ydb::Value TYDBValue::NullValue() {
+ Ydb::Value result;
+ result.set_null_flag_value(::google::protobuf::NULL_VALUE);
+ return result;
+}
+
+Ydb::Value TYDBValue::Bytes(const TString& value) {
+ Ydb::Value result;
+ result.set_bytes_value(value);
+ return result;
+}
+
+Ydb::Value TYDBValue::Bytes(const TStringBuf& value) {
+ Ydb::Value result;
+ result.set_bytes_value(TString(value.data(), value.size()));
+ return result;
+}
+
+Ydb::Value TYDBValue::Bytes(const char* value) {
+ Ydb::Value result;
+ result.set_bytes_value(TString(value));
+ return result;
+}
+
+Ydb::Value TYDBValue::UInt64(const ui64 value) {
+ Ydb::Value result;
+ result.set_uint64_value(value);
+ return result;
+}
+
+Ydb::Value TYDBValue::UInt32(const ui32 value) {
+ Ydb::Value result;
+ result.set_uint32_value(value);
+ return result;
+}
+
+Ydb::Column TYDBColumn::Bytes(const TString& columnId) {
+ Ydb::Column result;
+ result.set_name(columnId);
+ result.mutable_type()->set_type_id(Ydb::Type::STRING);
+ return result;
+}
+
+Ydb::Column TYDBColumn::UInt64(const TString& columnId) {
+ Ydb::Column result;
+ result.set_name(columnId);
+ result.mutable_type()->set_type_id(Ydb::Type::UINT64);
+ return result;
+}
+
+Ydb::Column TYDBColumn::UInt32(const TString& columnId) {
+ Ydb::Column result;
+ result.set_name(columnId);
+ result.mutable_type()->set_type_id(Ydb::Type::UINT32);
+ return result;
+}
+
+}
diff --git a/ydb/services/metadata/manager/ydb_value_operator.h b/ydb/services/metadata/manager/ydb_value_operator.h
new file mode 100644
index 00000000000..0cdcd39c665
--- /dev/null
+++ b/ydb/services/metadata/manager/ydb_value_operator.h
@@ -0,0 +1,28 @@
+#pragma once
+#include <ydb/public/api/protos/ydb_table.pb.h>
+#include <ydb/public/api/protos/ydb_value.pb.h>
+
+namespace NKikimr::NMetadataManager {
+
+class TYDBColumn {
+public:
+ static Ydb::Column Bytes(const TString& columnId);
+ static Ydb::Column UInt64(const TString& columnId);
+ static Ydb::Column UInt32(const TString& columnId);
+};
+
+class TYDBValue {
+public:
+ static bool IsSameType(const Ydb::Value& l, const Ydb::Type& type);
+ static bool IsSameType(const Ydb::Value& l, const Ydb::Value& r);
+ static bool Compare(const Ydb::Value& l, const Ydb::Value& r);
+ static TString TypeToString(const Ydb::Type& type);
+ static Ydb::Value NullValue();
+ static Ydb::Value Bytes(const char* value);
+ static Ydb::Value Bytes(const TString& value);
+ static Ydb::Value Bytes(const TStringBuf& value);
+ static Ydb::Value UInt64(const ui64 value);
+ static Ydb::Value UInt32(const ui32 value);
+};
+
+}
diff --git a/ydb/services/metadata/request/request_actor.h b/ydb/services/metadata/request/request_actor.h
index 2da9ea8aa93..fee842d47b5 100644
--- a/ydb/services/metadata/request/request_actor.h
+++ b/ydb/services/metadata/request/request_actor.h
@@ -89,7 +89,7 @@ private:
TRequest ProtoRequest;
const NActors::TActorId ActorFinishId;
const NActors::TActorId ActorRestartId;
- const TConfig& Config;
+ const TConfig Config;
ui32 Retry = 0;
protected:
class TEvRequestInternalResult: public NActors::TEventLocal<TEvRequestInternalResult, TDialogPolicy::EvResultInternal> {
@@ -157,8 +157,14 @@ public:
: ProtoRequest(request)
, ActorFinishId(actorFinishId)
, ActorRestartId(actorRestartId)
- , Config(config)
- {
+ , Config(config) {
+
+ }
+
+ TYDBRequest(const TRequest& request, const NActors::TActorId actorCallbackId)
+ : ProtoRequest(request)
+ , ActorFinishId(actorCallbackId)
+ , ActorRestartId(actorCallbackId) {
}
};
@@ -167,7 +173,6 @@ template <class TDialogPolicy>
class TSessionedActorImpl: public NActors::TActorBootstrapped<TSessionedActorImpl<TDialogPolicy>> {
private:
ui32 Retry = 0;
- const TActorId FinishedActorId;
static_assert(!std::is_same<TDialogPolicy, TDialogCreateSession>());
using TBase = NActors::TActorBootstrapped<TSessionedActorImpl<TDialogPolicy>>;
@@ -226,6 +231,40 @@ public:
}
};
+class IQueryOutput {
+public:
+ using TPtr = std::shared_ptr<IQueryOutput>;
+ virtual ~IQueryOutput() = default;
+
+ virtual void OnReply(const TDialogYQLRequest::TResponse& response) = 0;
+};
+
+class TYQLQuerySessionedActor: public TSessionedActorImpl<TDialogYQLRequest> {
+private:
+ using TBase = TSessionedActorImpl<TDialogYQLRequest>;
+ const TString Query;
+ IQueryOutput::TPtr Output;
+protected:
+ virtual std::optional<TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override {
+ Ydb::Table::ExecuteDataQueryRequest request;
+ request.mutable_query()->set_yql_text(Query);
+ request.set_session_id(sessionId);
+ request.mutable_tx_control()->mutable_begin_tx()->mutable_snapshot_read_only();
+ return request;
+ }
+ virtual void OnResult(const TDialogYQLRequest::TResponse& response) override {
+ Output->OnReply(response);
+ }
+public:
+ TYQLQuerySessionedActor(const TString& query, const NInternal::NRequest::TConfig& config, IQueryOutput::TPtr output)
+ : TBase(config)
+ , Query(query)
+ , Output(output)
+ {
+
+ }
+};
+
using TSessionedActor = TSessionedActorImpl<TDialogYQLRequest>;
}
diff --git a/ydb/services/metadata/secret/CMakeLists.txt b/ydb/services/metadata/secret/CMakeLists.txt
new file mode 100644
index 00000000000..0de0feb0f0d
--- /dev/null
+++ b/ydb/services/metadata/secret/CMakeLists.txt
@@ -0,0 +1,42 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-metadata-secret)
+target_link_libraries(services-metadata-secret PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ core-grpc_services-local_rpc
+ core-grpc_services-base
+ ydb-core-grpc_services
+ services-metadata-request
+)
+target_sources(services-metadata-secret PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/secret/secret.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/secret/access.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/secret/snapshot.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/secret/initializer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/secret/fetcher.cpp
+)
+
+add_global_library_for(services-metadata-secret.global services-metadata-secret)
+target_link_libraries(services-metadata-secret.global PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ core-grpc_services-local_rpc
+ core-grpc_services-base
+ ydb-core-grpc_services
+ services-metadata-request
+)
+target_sources(services-metadata-secret.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/secret/manager.cpp
+)
diff --git a/ydb/services/metadata/secret/access.cpp b/ydb/services/metadata/secret/access.cpp
new file mode 100644
index 00000000000..4bd991c29e7
--- /dev/null
+++ b/ydb/services/metadata/secret/access.cpp
@@ -0,0 +1,87 @@
+#include "access.h"
+#include <ydb/core/base/appdata.h>
+#include <ydb/services/metadata/manager/ydb_value_operator.h>
+
+namespace NKikimr::NMetadata::NSecret {
+
+TString TAccess::GetStorageTablePath() {
+ return "/" + AppData()->TenantName + "/.metadata/secret_access";
+}
+
+bool TAccess::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue) {
+ if (!decoder.Read(decoder.GetOwnerUserIdIdx(), OwnerUserId, rawValue)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetSecretIdIdx(), SecretId, rawValue)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetAccessUserIdIdx(), AccessUserId, rawValue)) {
+ return false;
+ }
+ return true;
+}
+
+NMetadataManager::TTableRecord TAccess::SerializeToRecord() const {
+ NMetadataManager::TTableRecord result;
+ result.SetColumn(TDecoder::OwnerUserId, NMetadataManager::TYDBValue::Bytes(OwnerUserId));
+ result.SetColumn(TDecoder::SecretId, NMetadataManager::TYDBValue::Bytes(SecretId));
+ result.SetColumn(TDecoder::AccessUserId, NMetadataManager::TYDBValue::Bytes(AccessUserId));
+ return result;
+}
+
+void TAccess::AlteringPreparation(std::vector<TAccess>&& objects,
+ NMetadataManager::IAlterPreparationController<TAccess>::TPtr controller,
+ const NMetadata::IOperationsManager::TModificationContext& context) {
+ if (!!context.GetUserToken()) {
+ for (auto&& i : objects) {
+ if (i.GetOwnerUserId() != context.GetUserToken()->GetUserSID()) {
+ controller->PreparationProblem("no permissions for modify secret access");
+ return;
+ }
+ }
+ }
+ controller->PreparationFinished(std::move(objects));
+}
+
+NMetadata::TOperationParsingResult TAccess::BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings,
+ const NMetadata::IOperationsManager::TModificationContext& context)
+{
+ NKikimr::NMetadataManager::TTableRecord result;
+ TStringBuf sb(settings.GetObjectId().data(), settings.GetObjectId().size());
+ TStringBuf l;
+ TStringBuf r;
+ if (!sb.TrySplit('/', l, r)) {
+ return "incorrect objectId format (secretId/accessUserId)";
+ }
+ result.SetColumn(TDecoder::SecretId, NMetadataManager::TYDBValue::Bytes(l));
+ result.SetColumn(TDecoder::AccessUserId, NMetadataManager::TYDBValue::Bytes(r));
+ if (!context.GetUserToken()) {
+ auto it = settings.GetFeatures().find(TDecoder::OwnerUserId);
+ if (it != settings.GetFeatures().end()) {
+ result.SetColumn(TDecoder::OwnerUserId, NMetadataManager::TYDBValue::Bytes(it->second));
+ } else {
+ return "OwnerUserId not defined";
+ }
+ } else {
+ result.SetColumn(TDecoder::OwnerUserId, NMetadataManager::TYDBValue::Bytes(context.GetUserToken()->GetUserSID()));
+ }
+ return result;
+}
+
+std::vector<Ydb::Column> TAccess::TDecoder::GetColumns() {
+ return {
+ NMetadataManager::TYDBColumn::Bytes(OwnerUserId),
+ NMetadataManager::TYDBColumn::Bytes(SecretId),
+ NMetadataManager::TYDBColumn::Bytes(AccessUserId)
+ };
+}
+
+std::vector<Ydb::Column> TAccess::TDecoder::GetPKColumns() {
+ return GetColumns();
+}
+
+std::vector<TString> TAccess::TDecoder::GetPKColumnIds() {
+ return { OwnerUserId, SecretId, AccessUserId };
+}
+
+}
diff --git a/ydb/services/metadata/secret/access.h b/ydb/services/metadata/secret/access.h
new file mode 100644
index 00000000000..2ab5882afb5
--- /dev/null
+++ b/ydb/services/metadata/secret/access.h
@@ -0,0 +1,55 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/base/appdata.h>
+
+#include <ydb/services/metadata/abstract/decoder.h>
+#include <ydb/services/metadata/abstract/manager.h>
+#include <ydb/services/metadata/manager/object.h>
+#include <ydb/services/metadata/manager/preparation_controller.h>
+
+namespace NKikimr::NMetadata::NSecret {
+
+class TAccess: public NMetadataManager::TObject<TAccess> {
+private:
+ using TBase = NMetadataManager::TObject<TAccess>;
+ YDB_ACCESSOR_DEF(TString, OwnerUserId);
+ YDB_ACCESSOR_DEF(TString, SecretId);
+ YDB_ACCESSOR_DEF(TString, AccessUserId);
+public:
+ class TDecoder: public NInternal::TDecoderBase {
+ private:
+ YDB_ACCESSOR(i32, OwnerUserIdIdx, -1);
+ YDB_ACCESSOR(i32, SecretIdIdx, -1);
+ YDB_ACCESSOR(i32, AccessUserIdIdx, -1);
+ public:
+ static inline const TString OwnerUserId = "ownerUserId";
+ static inline const TString SecretId = "secretId";
+ static inline const TString AccessUserId = "accessUserId";
+ static std::vector<TString> GetPKColumnIds();
+ static std::vector<Ydb::Column> GetPKColumns();
+ static std::vector<Ydb::Column> GetColumns();
+
+ TDecoder(const Ydb::ResultSet& rawData) {
+ OwnerUserIdIdx = GetFieldIndex(rawData, OwnerUserId);
+ SecretIdIdx = GetFieldIndex(rawData, SecretId);
+ AccessUserIdIdx = GetFieldIndex(rawData, AccessUserId);
+ }
+ };
+
+ using TBase::TBase;
+
+ static void AlteringPreparation(std::vector<TAccess>&& objects,
+ NMetadataManager::IAlterPreparationController<TAccess>::TPtr controller,
+ const NMetadata::IOperationsManager::TModificationContext& context);
+ static TString GetStorageTablePath();
+ bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue);
+ NMetadataManager::TTableRecord SerializeToRecord() const;
+
+ static NMetadata::TOperationParsingResult BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings,
+ const NMetadata::IOperationsManager::TModificationContext& context);
+ static TString GetTypeId() {
+ return "SECRET_ACCESS";
+ }
+};
+
+}
diff --git a/ydb/services/metadata/secret/fetcher.cpp b/ydb/services/metadata/secret/fetcher.cpp
new file mode 100644
index 00000000000..1c4d3ec3804
--- /dev/null
+++ b/ydb/services/metadata/secret/fetcher.cpp
@@ -0,0 +1,13 @@
+#include "fetcher.h"
+#include "manager.h"
+
+namespace NKikimr::NMetadata::NSecret {
+
+std::vector<IOperationsManager::TPtr> TManager::DoGetManagers() const {
+ return {
+ std::make_shared<TSecretManager>(),
+ std::make_shared<TAccessManager>(),
+ };
+}
+
+}
diff --git a/ydb/services/metadata/secret/fetcher.h b/ydb/services/metadata/secret/fetcher.h
new file mode 100644
index 00000000000..66a3903898b
--- /dev/null
+++ b/ydb/services/metadata/secret/fetcher.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include "snapshot.h"
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadata::NSecret {
+
+class TManager: public NMetadataProvider::TSnapshotsManager<TSnapshot> {
+protected:
+ virtual std::vector<IOperationsManager::TPtr> DoGetManagers() const override;
+public:
+};
+
+}
diff --git a/ydb/services/metadata/secret/initializer.cpp b/ydb/services/metadata/secret/initializer.cpp
new file mode 100644
index 00000000000..d92c03727ec
--- /dev/null
+++ b/ydb/services/metadata/secret/initializer.cpp
@@ -0,0 +1,68 @@
+#include "initializer.h"
+#include "secret.h"
+#include "access.h"
+
+namespace NKikimr::NMetadata::NSecret {
+
+void TSecretInitializer::DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const {
+ TVector<NMetadataInitializer::ITableModifier::TPtr> result;
+ {
+ Ydb::Table::CreateTableRequest request;
+ request.set_session_id("");
+ request.set_path(TSecret::GetStorageTablePath());
+ request.add_primary_key(TSecret::TDecoder::OwnerUserId);
+ request.add_primary_key(TSecret::TDecoder::SecretId);
+ {
+ auto& column = *request.add_columns();
+ column.set_name(TSecret::TDecoder::OwnerUserId);
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name(TSecret::TDecoder::SecretId);
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name(TSecret::TDecoder::Value);
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request, "create_secret"));
+ auto hRequest = TSecret::AddHistoryTableScheme(request);
+ result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(hRequest, "create_secret_history"));
+ }
+ controller->PreparationFinished(result);
+}
+
+void TAccessInitializer::DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const {
+ TVector<NMetadataInitializer::ITableModifier::TPtr> result;
+ {
+ Ydb::Table::CreateTableRequest request;
+ request.set_session_id("");
+ request.set_path(TAccess::GetStorageTablePath());
+ request.add_primary_key(TAccess::TDecoder::OwnerUserId);
+ request.add_primary_key(TAccess::TDecoder::SecretId);
+ request.add_primary_key(TAccess::TDecoder::AccessUserId);
+ {
+ auto& column = *request.add_columns();
+ column.set_name(TAccess::TDecoder::OwnerUserId);
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name(TAccess::TDecoder::SecretId);
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name(TAccess::TDecoder::AccessUserId);
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request, "create_access"));
+ auto hRequest = TAccess::AddHistoryTableScheme(request);
+ result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(hRequest, "create_access_history"));
+ }
+ controller->PreparationFinished(result);
+}
+
+}
diff --git a/ydb/services/metadata/secret/initializer.h b/ydb/services/metadata/secret/initializer.h
new file mode 100644
index 00000000000..f988388214e
--- /dev/null
+++ b/ydb/services/metadata/secret/initializer.h
@@ -0,0 +1,20 @@
+#pragma once
+
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadata::NSecret {
+
+class TSecretInitializer: public IInitializationBehaviour {
+protected:
+ virtual void DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const override;
+public:
+};
+
+class TAccessInitializer: public IInitializationBehaviour {
+protected:
+ virtual void DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const override;
+public:
+};
+
+}
diff --git a/ydb/services/metadata/secret/manager.cpp b/ydb/services/metadata/secret/manager.cpp
new file mode 100644
index 00000000000..177a2ea17db
--- /dev/null
+++ b/ydb/services/metadata/secret/manager.cpp
@@ -0,0 +1,16 @@
+#include "manager.h"
+#include "initializer.h"
+namespace NKikimr::NMetadata::NSecret {
+
+TSecretManager::TFactory::TRegistrator<TSecretManager> TSecretManager::Registrator(TSecretManager::GetTypeIdStatic());
+TAccessManager::TFactory::TRegistrator<TAccessManager> TAccessManager::Registrator(TAccessManager::GetTypeIdStatic());
+
+IInitializationBehaviour::TPtr TAccessManager::DoGetInitializationBehaviour() const {
+ return std::make_shared<TAccessInitializer>();
+}
+
+IInitializationBehaviour::TPtr TSecretManager::DoGetInitializationBehaviour() const {
+ return std::make_shared<TSecretInitializer>();
+}
+
+}
diff --git a/ydb/services/metadata/secret/manager.h b/ydb/services/metadata/secret/manager.h
new file mode 100644
index 00000000000..7b8f76e19b0
--- /dev/null
+++ b/ydb/services/metadata/secret/manager.h
@@ -0,0 +1,25 @@
+#pragma once
+#include "snapshot.h"
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/services/metadata/manager/generic_manager.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadata::NSecret {
+
+class TSecretManager: public TGenericOperationsManager<TSecret> {
+private:
+ static TFactory::TRegistrator<TSecretManager> Registrator;
+protected:
+ virtual IInitializationBehaviour::TPtr DoGetInitializationBehaviour() const override;
+public:
+};
+
+class TAccessManager: public TGenericOperationsManager<TAccess> {
+private:
+ static TFactory::TRegistrator<TAccessManager> Registrator;
+protected:
+ virtual IInitializationBehaviour::TPtr DoGetInitializationBehaviour() const override;
+public:
+};
+
+}
diff --git a/ydb/services/metadata/secret/secret.cpp b/ydb/services/metadata/secret/secret.cpp
new file mode 100644
index 00000000000..ccd45b426e2
--- /dev/null
+++ b/ydb/services/metadata/secret/secret.cpp
@@ -0,0 +1,90 @@
+#include "secret.h"
+#include <ydb/core/base/appdata.h>
+#include <ydb/services/metadata/manager/ydb_value_operator.h>
+
+namespace NKikimr::NMetadata::NSecret {
+
+TString TSecret::GetStorageTablePath() {
+ return "/" + AppData()->TenantName + "/.metadata/secrets";
+}
+
+bool TSecret::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue) {
+ if (!decoder.Read(decoder.GetOwnerUserIdIdx(), OwnerUserId, rawValue)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetSecretIdIdx(), SecretId, rawValue)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetValueIdx(), Value, rawValue)) {
+ return false;
+ }
+ return true;
+}
+
+NMetadataManager::TTableRecord TSecret::SerializeToRecord() const {
+ NMetadataManager::TTableRecord result;
+ result.SetColumn(TDecoder::OwnerUserId, NMetadataManager::TYDBValue::Bytes(OwnerUserId));
+ result.SetColumn(TDecoder::SecretId, NMetadataManager::TYDBValue::Bytes(SecretId));
+ result.SetColumn(TDecoder::Value, NMetadataManager::TYDBValue::Bytes(Value));
+ return result;
+}
+
+void TSecret::AlteringPreparation(std::vector<TSecret>&& objects,
+ NMetadataManager::IAlterPreparationController<TSecret>::TPtr controller,
+ const NMetadata::IOperationsManager::TModificationContext& context) {
+ if (!!context.GetUserToken()) {
+ for (auto&& i : objects) {
+ if (i.GetOwnerUserId() != context.GetUserToken()->GetUserSID()) {
+ controller->PreparationProblem("no permissions for modify secrets");
+ return;
+ }
+ }
+ }
+ controller->PreparationFinished(std::move(objects));
+}
+
+NMetadata::TOperationParsingResult TSecret::BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings,
+ const NMetadata::IOperationsManager::TModificationContext& context) {
+ NKikimr::NMetadataManager::TTableRecord result;
+ if (!context.GetUserToken()) {
+ auto it = settings.GetFeatures().find(TDecoder::OwnerUserId);
+ if (it != settings.GetFeatures().end()) {
+ result.SetColumn(TDecoder::OwnerUserId, NMetadataManager::TYDBValue::Bytes(it->second));
+ } else {
+ return "OwnerUserId not defined";
+ }
+ } else {
+ result.SetColumn(TDecoder::OwnerUserId, NMetadataManager::TYDBValue::Bytes(context.GetUserToken()->GetUserSID()));
+ }
+ {
+ result.SetColumn(TDecoder::SecretId, NMetadataManager::TYDBValue::Bytes(settings.GetObjectId()));
+ }
+ {
+ auto it = settings.GetFeatures().find(TDecoder::Value);
+ if (it != settings.GetFeatures().end()) {
+ result.SetColumn(TDecoder::Value, NMetadataManager::TYDBValue::Bytes(it->second));
+ }
+ }
+ return result;
+}
+
+std::vector<Ydb::Column> TSecret::TDecoder::GetColumns() {
+ return {
+ NMetadataManager::TYDBColumn::Bytes(OwnerUserId),
+ NMetadataManager::TYDBColumn::Bytes(SecretId),
+ NMetadataManager::TYDBColumn::Bytes(Value)
+ };
+}
+
+std::vector<Ydb::Column> TSecret::TDecoder::GetPKColumns() {
+ return {
+ NMetadataManager::TYDBColumn::Bytes(OwnerUserId),
+ NMetadataManager::TYDBColumn::Bytes(SecretId)
+ };
+}
+
+std::vector<TString> TSecret::TDecoder::GetPKColumnIds() {
+ return { OwnerUserId, SecretId };
+}
+
+}
diff --git a/ydb/services/metadata/secret/secret.h b/ydb/services/metadata/secret/secret.h
new file mode 100644
index 00000000000..ca6b653f323
--- /dev/null
+++ b/ydb/services/metadata/secret/secret.h
@@ -0,0 +1,70 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/base/appdata.h>
+
+#include <ydb/services/metadata/abstract/decoder.h>
+#include <ydb/services/metadata/abstract/manager.h>
+#include <ydb/services/metadata/manager/object.h>
+#include <ydb/services/metadata/manager/preparation_controller.h>
+
+namespace NKikimr::NMetadata::NSecret {
+
+class TSecretId {
+private:
+ YDB_READONLY_PROTECT_DEF(TString, OwnerUserId);
+ YDB_READONLY_PROTECT_DEF(TString, SecretId);
+public:
+ TSecretId() = default;
+ TSecretId(const TString& ownerUserId, const TString& secretId)
+ : OwnerUserId(ownerUserId)
+ , SecretId(secretId)
+ {
+ }
+
+ bool operator<(const TSecretId& item) const {
+ return std::tie(OwnerUserId, SecretId) < std::tie(item.OwnerUserId, item.SecretId);
+ }
+};
+
+class TSecret: public TSecretId, public NMetadataManager::TObject<TSecret> {
+private:
+ using TBase = TSecretId;
+ YDB_ACCESSOR_DEF(TString, Value);
+public:
+ class TDecoder: public NInternal::TDecoderBase {
+ private:
+ YDB_ACCESSOR(i32, OwnerUserIdIdx, -1);
+ YDB_ACCESSOR(i32, SecretIdIdx, -1);
+ YDB_ACCESSOR(i32, ValueIdx, -1);
+ public:
+ static inline const TString OwnerUserId = "ownerUserId";
+ static inline const TString SecretId = "secretId";
+ static inline const TString Value = "value";
+ static std::vector<TString> GetPKColumnIds();
+ static std::vector<Ydb::Column> GetPKColumns();
+ static std::vector<Ydb::Column> GetColumns();
+
+ TDecoder(const Ydb::ResultSet& rawData) {
+ OwnerUserIdIdx = GetFieldIndex(rawData, OwnerUserId);
+ SecretIdIdx = GetFieldIndex(rawData, SecretId);
+ ValueIdx = GetFieldIndex(rawData, Value);
+ }
+ };
+
+ using TBase::TBase;
+
+ static void AlteringPreparation(std::vector<TSecret>&& objects,
+ NMetadataManager::IAlterPreparationController<TSecret>::TPtr controller,
+ const NMetadata::IOperationsManager::TModificationContext& context);
+ static TString GetStorageTablePath();
+ bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue);
+ NMetadataManager::TTableRecord SerializeToRecord() const;
+ static NMetadata::TOperationParsingResult BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings,
+ const NMetadata::IOperationsManager::TModificationContext& context);
+ static TString GetTypeId() {
+ return "SECRET";
+ }
+
+};
+
+}
diff --git a/ydb/services/metadata/secret/snapshot.cpp b/ydb/services/metadata/secret/snapshot.cpp
new file mode 100644
index 00000000000..f97ba3a3bf4
--- /dev/null
+++ b/ydb/services/metadata/secret/snapshot.cpp
@@ -0,0 +1,47 @@
+#include "snapshot.h"
+
+namespace NKikimr::NMetadata::NSecret {
+
+bool TSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) {
+ Y_VERIFY(rawDataResult.result_sets().size() == 2);
+ {
+ auto& rawData = rawDataResult.result_sets()[0];
+ TSecret::TDecoder decoder(rawData);
+ for (auto&& r : rawData.rows()) {
+ TSecret object;
+ if (!object.DeserializeFromRecord(decoder, r)) {
+ ALS_ERROR(NKikimrServices::METADATA_SECRET) << "cannot parse secret info for snapshot";
+ continue;
+ }
+ Secrets.emplace(object, object);
+ }
+ }
+ {
+ auto& rawData = rawDataResult.result_sets()[1];
+ TAccess::TDecoder decoder(rawData);
+ for (auto&& r : rawData.rows()) {
+ TAccess object;
+ if (!object.DeserializeFromRecord(decoder, r)) {
+ ALS_ERROR(NKikimrServices::METADATA_SECRET) << "cannot parse secret info for snapshot";
+ continue;
+ }
+ Access.emplace_back(object);
+ }
+ }
+ return true;
+}
+
+TString TSnapshot::DoSerializeToString() const {
+ TStringBuilder sb;
+ sb << "SECRETS:";
+ for (auto&& i : Secrets) {
+ sb << i.first.GetOwnerUserId() << ":" << i.first.GetSecretId() << ":" << i.second.GetValue() << ";";
+ }
+ sb << "ACCESS:";
+ for (auto&& i : Access) {
+ sb << i.GetOwnerUserId() << ":" << i.GetSecretId() << ":" << i.GetAccessUserId() << ";";
+ }
+ return sb;
+}
+
+}
diff --git a/ydb/services/metadata/secret/snapshot.h b/ydb/services/metadata/secret/snapshot.h
new file mode 100644
index 00000000000..c25ea23600f
--- /dev/null
+++ b/ydb/services/metadata/secret/snapshot.h
@@ -0,0 +1,23 @@
+#pragma once
+#include "secret.h"
+#include "access.h"
+
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadata::NSecret {
+
+class TSnapshot: public NMetadataProvider::ISnapshot {
+private:
+ using TBase = NMetadataProvider::ISnapshot;
+ using TSecrets = std::map<TSecretId, TSecret>;
+ YDB_READONLY_DEF(TSecrets, Secrets);
+ YDB_READONLY_DEF(std::vector<TAccess>, Access);
+protected:
+ virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override;
+ virtual TString DoSerializeToString() const override;
+public:
+ using TBase::TBase;
+};
+
+}
diff --git a/ydb/services/metadata/secret/ut/ut_secret.cpp b/ydb/services/metadata/secret/ut/ut_secret.cpp
new file mode 100644
index 00000000000..05895729c54
--- /dev/null
+++ b/ydb/services/metadata/secret/ut/ut_secret.cpp
@@ -0,0 +1,213 @@
+#include <ydb/core/cms/console/configs_dispatcher.h>
+#include <ydb/core/testlib/cs_helper.h>
+#include <ydb/core/tx/tiering/external_data.h>
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
+#include <ydb/core/wrappers/s3_wrapper.h>
+#include <ydb/core/wrappers/fake_storage.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/services/metadata/manager/alter.h>
+#include <ydb/services/metadata/manager/common.h>
+#include <ydb/services/metadata/manager/table_record.h>
+#include <ydb/services/metadata/manager/ydb_value_operator.h>
+#include <ydb/services/metadata/secret/manager.h>
+#include <ydb/services/metadata/secret/fetcher.h>
+#include <ydb/services/metadata/secret/snapshot.h>
+#include <ydb/services/metadata/service.h>
+
+#include <library/cpp/actors/core/av_bootstrapped.h>
+#include <library/cpp/protobuf/json/proto2json.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/system/hostname.h>
+
+namespace NKikimr {
+
+using namespace NColumnShard;
+
+Y_UNIT_TEST_SUITE(Secret) {
+
+ class TJsonChecker {
+ private:
+ YDB_ACCESSOR_DEF(TString, Path);
+ YDB_ACCESSOR_DEF(TString, Expectation);
+ public:
+ TJsonChecker(const TString& path, const TString& expectation)
+ : Path(path)
+ , Expectation(expectation)
+ {
+
+ }
+ bool Check(const NJson::TJsonValue& jsonInfo) const {
+ auto* jsonPathValue = jsonInfo.GetValueByPath(Path);
+ if (!jsonPathValue) {
+ return Expectation == "__NULL";
+ }
+ return jsonPathValue->GetStringRobust() == Expectation;
+ }
+
+ TString GetDebugString() const {
+ TStringBuilder sb;
+ sb << "path=" << Path << ";"
+ << "expectation=" << Expectation << ";";
+ return sb;
+ }
+ };
+
+ class TSecretUserEmulator: public NActors::TActorBootstrapped<TSecretUserEmulator> {
+ private:
+ using TBase = NActors::TActorBootstrapped<TSecretUserEmulator>;
+ std::shared_ptr<NMetadata::NSecret::TManager> Manager = std::make_shared<NMetadata::NSecret::TManager>();
+ YDB_READONLY_FLAG(Found, false);
+ YDB_READONLY(TInstant, Start, Now());
+ YDB_ACCESSOR(ui32, ExpectedSecretsCount, 1);
+ YDB_ACCESSOR(ui32, ExpectedAccessCount, 1);
+ using TKeyCheckers = TMap<NMetadata::NSecret::TSecretId, TJsonChecker>;
+ YDB_ACCESSOR_DEF(TKeyCheckers, Checkers);
+ public:
+
+ void ResetConditions() {
+ FoundFlag = false;
+ Checkers.clear();
+ }
+
+ STATEFN(StateInit) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle);
+ default:
+ Y_VERIFY(false);
+ }
+ }
+
+ void CheckRuntime(TTestActorRuntime& runtime) {
+ const auto pred = [this](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event)->TTestActorRuntimeBase::EEventAction {
+ if (event->HasBuffer() && !event->HasEvent()) {
+ } else if (!event->GetBase()) {
+ } else {
+ auto ptr = dynamic_cast<NMetadataProvider::TEvRefreshSubscriberData*>(event->GetBase());
+ if (ptr) {
+ CheckFound(ptr);
+ }
+ }
+ return TTestActorRuntimeBase::EEventAction::PROCESS;
+ };
+
+ runtime.SetObserverFunc(pred);
+
+ for (const TInstant start = Now(); !IsFound() && Now() - start < TDuration::Seconds(10); ) {
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ }
+ runtime.SetObserverFunc(TTestActorRuntime::DefaultObserverFunc);
+ Y_VERIFY(IsFound());
+ }
+
+ void CheckFound(NMetadataProvider::TEvRefreshSubscriberData* event) {
+ auto snapshot = event->GetSnapshotAs<NMetadata::NSecret::TSnapshot>();
+ Y_VERIFY(!!snapshot);
+ if (ExpectedSecretsCount) {
+ if (snapshot->GetSecrets().size() != ExpectedSecretsCount) {
+ Cerr << "snapshot->GetSecrets().size() incorrect: " << snapshot->SerializeToString() << Endl;
+ return;
+ }
+ } else if (snapshot->GetSecrets().size()) {
+ Cerr << "snapshot->GetSecrets().size() incorrect (zero expects): " << snapshot->SerializeToString() << Endl;
+ return;
+ }
+ if (ExpectedAccessCount) {
+ if (snapshot->GetAccess().size() != ExpectedAccessCount) {
+ Cerr << "snapshot->GetAccess().size() incorrect: " << snapshot->SerializeToString() << Endl;
+ return;
+ }
+ } else if (snapshot->GetAccess().size()) {
+ Cerr << "snapshot->GetAccess().size() incorrect (zero expects): " << snapshot->SerializeToString() << Endl;
+ return;
+ }
+ FoundFlag = true;
+ }
+
+ void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) {
+ CheckFound(ev->Get());
+ }
+
+ void Bootstrap() {
+ auto manager = std::make_shared<NMetadata::NSecret::TManager>();
+ Become(&TThis::StateInit);
+ Sender<NMetadataProvider::TEvSubscribeExternal>(manager).SendTo(NMetadataProvider::MakeServiceId(SelfId().NodeId()));
+ Start = Now();
+ }
+ };
+
+ Y_UNIT_TEST(Simple) {
+ TPortManager pm;
+
+ ui32 grpcPort = pm.GetPort();
+ ui32 msgbPort = pm.GetPort();
+
+ Tests::TServerSettings serverSettings(msgbPort);
+ serverSettings.Port = msgbPort;
+ serverSettings.GrpcPort = grpcPort;
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMetadataProvider(true)
+ .SetEnableOlapSchemaOperations(true);
+ ;
+
+ Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
+ server->EnableGRpc(grpcPort);
+ // server->SetupDefaultProfiles();
+
+ Tests::TClient client(serverSettings);
+
+ auto& runtime = *server->GetRuntime();
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+
+ TSecretUserEmulator* emulator = new TSecretUserEmulator;
+ runtime.Register(emulator);
+ {
+ runtime.SimulateSleep(TDuration::Seconds(10));
+ Cerr << "Initialization finished" << Endl;
+
+ Tests::NCS::THelper lHelper(*server);
+ lHelper.StartSchemaRequest("CREATE OBJECT secret1 (TYPE SECRET WITH value = `100`, ownerUserId = `root@root`)");
+
+ emulator->SetExpectedSecretsCount(1).SetExpectedAccessCount(0);
+ {
+ const TInstant start = Now();
+ while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) {
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ }
+ Y_VERIFY(emulator->IsFound());
+ }
+
+ lHelper.StartSchemaRequest("ALTER OBJECT secret1 (TYPE SECRET SET value = `abcde`, ownerUserId = `root@root`)");
+ lHelper.StartSchemaRequest("CREATE OBJECT `secret1/test@test1` (TYPE SECRET_ACCESS WITH ownerUserId = `root@root`)");
+
+ emulator->SetExpectedSecretsCount(1).SetExpectedAccessCount(1);
+ {
+ const TInstant start = Now();
+ while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) {
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ }
+ Y_VERIFY(emulator->IsFound());
+ }
+
+ lHelper.StartSchemaRequest("DROP OBJECT `secret1/test@test1` (TYPE SECRET_ACCESS WITH ownerUserId = `root@root`)");
+ lHelper.StartSchemaRequest("DROP OBJECT `secret1` (TYPE SECRET WITH ownerUserId = `root@root`)");
+
+ emulator->SetExpectedSecretsCount(0).SetExpectedAccessCount(0);
+ {
+ const TInstant start = Now();
+ while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) {
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ }
+ Y_VERIFY(emulator->IsFound());
+ }
+ }
+ }
+}
+}
diff --git a/ydb/services/metadata/service.cpp b/ydb/services/metadata/service.cpp
index 70714c1c43b..059b3fd51e8 100644
--- a/ydb/services/metadata/service.cpp
+++ b/ydb/services/metadata/service.cpp
@@ -6,4 +6,12 @@ NActors::TActorId MakeServiceId(const ui32 nodeId) {
return NActors::TActorId(nodeId, "SrvcMetaData");
}
+void TServiceOperator::Register() {
+ Singleton<TServiceOperator>()->EnabledFlag = true;
+}
+
+bool TServiceOperator::IsEnabled() {
+ return Singleton<TServiceOperator>()->EnabledFlag;
+}
+
}
diff --git a/ydb/services/metadata/service.h b/ydb/services/metadata/service.h
index d5761869748..667b9e25f23 100644
--- a/ydb/services/metadata/service.h
+++ b/ydb/services/metadata/service.h
@@ -1,30 +1,50 @@
#pragma once
#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/services/metadata/abstract/manager.h>
#include <library/cpp/actors/core/event_local.h>
namespace NKikimr::NMetadataProvider {
+class TEvAlterObjects: public NActors::TEventLocal<TEvAlterObjects, EEvSubscribe::EvAlterObjects> {
+private:
+ YDB_READONLY_DEF(NMetadata::IAlterCommand::TPtr, Command);
+public:
+ TEvAlterObjects(NMetadata::IAlterCommand::TPtr command)
+ : Command(command) {
+
+ }
+};
+
class TEvSubscribeExternal: public NActors::TEventLocal<TEvSubscribeExternal, EEvSubscribe::EvSubscribeExternal> {
private:
- YDB_READONLY_DEF(ISnapshotParser::TPtr, SnapshotParser);
+ YDB_READONLY_DEF(ISnapshotParser::TPtr, Fetcher);
public:
- TEvSubscribeExternal(ISnapshotParser::TPtr parser)
- : SnapshotParser(parser)
+ TEvSubscribeExternal(ISnapshotParser::TPtr fetcher)
+ : Fetcher(fetcher)
{
- Y_VERIFY(!!SnapshotParser);
+ Y_VERIFY(!!Fetcher);
}
};
class TEvUnsubscribeExternal: public NActors::TEventLocal<TEvUnsubscribeExternal, EEvSubscribe::EvUnsubscribeExternal> {
private:
- YDB_READONLY_DEF(ISnapshotParser::TPtr, SnapshotParser);
+ YDB_READONLY_DEF(ISnapshotParser::TPtr, Fetcher);
public:
- TEvUnsubscribeExternal(ISnapshotParser::TPtr parser)
- : SnapshotParser(parser) {
- Y_VERIFY(!!SnapshotParser);
+ TEvUnsubscribeExternal(ISnapshotParser::TPtr fetcher)
+ : Fetcher(fetcher) {
+ Y_VERIFY(!!Fetcher);
}
};
NActors::TActorId MakeServiceId(const ui32 node);
+class TServiceOperator {
+private:
+ friend class TService;
+ bool EnabledFlag = false;
+ static void Register();
+public:
+ static bool IsEnabled();
+};
+
}