diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-11-23 14:18:42 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-11-23 14:18:42 +0300 |
commit | 98228b1c88aff21f1fe7e0ee2d2ed0a9c28cc781 (patch) | |
tree | 7bb501736ad18f6fd54f858271c018dc79ee39fc | |
parent | d2cba3927c297cd7e6af4cbbc6e581ec29ed066f (diff) | |
download | ydb-98228b1c88aff21f1fe7e0ee2d2ed0a9c28cc781.tar.gz |
metadata provider extension - add objects modification.
118 files changed, 4294 insertions, 712 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index af59ea23c73..7cbd1868460 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -154,6 +154,8 @@ struct TKikimrEvents : TEvents { ES_TIERING, ES_METADATA_INITIALIZER, ES_YDB_AUDIT_LOG, + ES_METADATA_MANAGER, + ES_METADATA_SECRET }; }; diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 1f1fb531d79..8f5728be861 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -332,14 +332,15 @@ enum EServiceKikimr { // MetadataProvider METADATA_PROVIDER = 1500; + METADATA_INITIALIZER = 1501; + METADATA_MANAGER = 1502; + METADATA_SECRET = 1503; // Tiering TX_TIERING = 1600; // Background tasks BG_TASKS = 1700; - - METADATA_INITIALIZER = 1800; }; message TActivity { diff --git a/ydb/core/testlib/common_helper.cpp b/ydb/core/testlib/common_helper.cpp index 45408b456cc..eb9a9344447 100644 --- a/ydb/core/testlib/common_helper.cpp +++ b/ydb/core/testlib/common_helper.cpp @@ -19,12 +19,35 @@ void THelper::WaitForSchemeOperation(TActorId sender, ui64 txId) { runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(sender); } -void THelper::StartDataRequest(const TString& request) const { +void THelper::StartDataRequest(const TString& request, const bool expectSuccess) const { NYdb::NTable::TTableClient tClient(Server.GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); - tClient.CreateSession().Subscribe([request](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + auto expectation = expectSuccess; + tClient.CreateSession().Subscribe([request, expectation](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { auto session = f.GetValueSync().GetSession(); session.ExecuteDataQuery(request - , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); + , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()) + .Subscribe([expectation](NYdb::NTable::TAsyncDataQueryResult f) + { + TStringStream ss; + f.GetValueSync().GetIssues().PrintTo(ss, false); + Cerr << ss.Str() << Endl; + Y_VERIFY(expectation == f.GetValueSync().IsSuccess()); + }); + }); +} + +void THelper::StartSchemaRequest(const TString& request, const bool expectSuccess) const { + NYdb::NTable::TTableClient tClient(Server.GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); + auto expectation = expectSuccess; + tClient.CreateSession().Subscribe([request, expectation](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + auto session = f.GetValueSync().GetSession(); + session.ExecuteSchemeQuery(request).Subscribe([expectation](NYdb::TAsyncStatus f) + { + TStringStream ss; + f.GetValueSync().GetIssues().PrintTo(ss, false); + Cerr << ss.Str() << Endl; + Y_VERIFY(expectation == f.GetValueSync().IsSuccess()); + }); }); } diff --git a/ydb/core/testlib/common_helper.h b/ydb/core/testlib/common_helper.h index 25ffdf809bc..4218e1f4476 100644 --- a/ydb/core/testlib/common_helper.h +++ b/ydb/core/testlib/common_helper.h @@ -16,6 +16,7 @@ public: void DropTable(const TString& tablePath); - void StartDataRequest(const TString& request) const; + void StartDataRequest(const TString& request, const bool expectSuccess = true) const; + void StartSchemaRequest(const TString& request, const bool expectSuccess = true) const; }; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index aa18ffcd877..3aa13a26915 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -845,7 +845,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u if (regularTtls.empty()) { regularTtls = Ttl.MakeIndexTtlMap(TInstant::Now(), force); } - const bool tiersUsage = regularTtls.empty() && Tiers && Tiers->IsActive(); + const bool tiersUsage = regularTtls.empty() && Tiers; if (tiersUsage) { regularTtls = Tiers->GetTiering(); } @@ -1045,7 +1045,7 @@ void TColumnShard::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) NOlap::TIndexInfo TColumnShard::GetActualIndexInfo(const bool tiersUsage) const { auto indexInfo = PrimaryIndex->GetIndexInfo(); - if (tiersUsage && Tiers && Tiers->IsActive()) { + if (tiersUsage && Tiers) { for (auto&& i : *Tiers) { indexInfo.AddStorageTier(i.second.BuildTierStorage()); } @@ -1055,7 +1055,7 @@ NOlap::TIndexInfo TColumnShard::GetActualIndexInfo(const bool tiersUsage) const void TColumnShard::ActivateTiering(const ui64 pathId, const bool enableTiering) { if (OwnerPath && !Tiers) { - Tiers = std::make_shared<TTiersManager>(TabletID(), SelfId(), OwnerPath); + Tiers = std::make_shared<TTiersManager>(TabletID(), SelfId()); Tiers->Start(Tiers); } if (!!Tiers) { diff --git a/ydb/core/tx/tiering/CMakeLists.txt b/ydb/core/tx/tiering/CMakeLists.txt index 59d6f170428..0e438b65e9d 100644 --- a/ydb/core/tx/tiering/CMakeLists.txt +++ b/ydb/core/tx/tiering/CMakeLists.txt @@ -6,6 +6,9 @@ # original buildsystem will not be accepted. +add_subdirectory(common) +add_subdirectory(rule) +add_subdirectory(tier) add_subdirectory(ut) add_library(core-tx-tiering) @@ -16,6 +19,9 @@ target_link_libraries(core-tx-tiering PUBLIC cpp-json-writer ydb-core-blobstorage ydb-core-protos + tx-tiering-rule + tx-tiering-tier + tx-tiering-common core-tablet_flat-protos ydb-core-wrappers api-protos @@ -27,11 +33,6 @@ target_sources(core-tx-tiering PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier_cleaner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/path_cleaner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/manager.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/decoder.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/external_data.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier_config.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/owner_enrich.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/snapshot_enrich.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/snapshot.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/s3_actor.cpp @@ -45,6 +46,9 @@ target_link_libraries(core-tx-tiering.global PUBLIC cpp-json-writer ydb-core-blobstorage ydb-core-protos + tx-tiering-rule + tx-tiering-tier + tx-tiering-common core-tablet_flat-protos ydb-core-wrappers api-protos @@ -53,4 +57,5 @@ target_link_libraries(core-tx-tiering.global PUBLIC ) target_sources(core-tx-tiering.global PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/cleaner_task.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/external_data.cpp ) diff --git a/ydb/core/tx/tiering/common.h b/ydb/core/tx/tiering/common.h index 536e920a301..906f897dd65 100644 --- a/ydb/core/tx/tiering/common.h +++ b/ydb/core/tx/tiering/common.h @@ -7,26 +7,6 @@ namespace NKikimr::NColumnShard::NTiers { -class TGlobalTierId { -private: - YDB_ACCESSOR_DEF(TString, OwnerPath); - YDB_ACCESSOR_DEF(TString, TierName); -public: - TGlobalTierId(const TString& ownerPath, const TString& tierName) - : OwnerPath(ownerPath) - , TierName(tierName) { - - } - - bool operator<(const TGlobalTierId& item) const { - return std::tie(OwnerPath, TierName) < std::tie(item.OwnerPath, item.TierName); - } - - TString ToString() const { - return OwnerPath + "." + TierName; - } -}; - enum EEvents { EvTierCleared = EventSpaceBegin(TKikimrEvents::ES_TIERING), EvEnd diff --git a/ydb/core/tx/tiering/common/CMakeLists.txt b/ydb/core/tx/tiering/common/CMakeLists.txt new file mode 100644 index 00000000000..898b441f818 --- /dev/null +++ b/ydb/core/tx/tiering/common/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-tiering-common) +target_link_libraries(tx-tiering-common PUBLIC + contrib-libs-cxxsupp + yutil +) +target_sources(tx-tiering-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/common/global_tier_id.cpp +) diff --git a/ydb/core/tx/tiering/common/global_tier_id.cpp b/ydb/core/tx/tiering/common/global_tier_id.cpp new file mode 100644 index 00000000000..3bc06dedbf4 --- /dev/null +++ b/ydb/core/tx/tiering/common/global_tier_id.cpp @@ -0,0 +1,5 @@ +#include "global_tier_id.h" + +namespace NKikimr::NColumnShard::NTiers { + +} diff --git a/ydb/core/tx/tiering/common/global_tier_id.h b/ydb/core/tx/tiering/common/global_tier_id.h new file mode 100644 index 00000000000..3675987f684 --- /dev/null +++ b/ydb/core/tx/tiering/common/global_tier_id.h @@ -0,0 +1,27 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> +#include <util/generic/string.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TGlobalTierId { +private: + YDB_ACCESSOR_DEF(TString, OwnerPath); + YDB_ACCESSOR_DEF(TString, TierName); +public: + TGlobalTierId(const TString& ownerPath, const TString& tierName) + : OwnerPath(ownerPath) + , TierName(tierName) { + + } + + bool operator<(const TGlobalTierId& item) const { + return std::tie(OwnerPath, TierName) < std::tie(item.OwnerPath, item.TierName); + } + + TString ToString() const { + return OwnerPath + "." + TierName; + } +}; + +} diff --git a/ydb/core/tx/tiering/decoder.h b/ydb/core/tx/tiering/decoder.h deleted file mode 100644 index 587f4ae9da8..00000000000 --- a/ydb/core/tx/tiering/decoder.h +++ /dev/null @@ -1,18 +0,0 @@ -#pragma once -#include <ydb/public/api/protos/ydb_value.pb.h> -#include <util/datetime/base.h> - -namespace NKikimr::NInternal { - -class TDecoderBase { -protected: - i32 GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId, const bool verify = true) const; -public: - bool Read(const ui32 columnIdx, TString& result, const Ydb::Value& r) const; - bool ReadDebugProto(const ui32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const; - bool Read(const ui32 columnIdx, TDuration& result, const Ydb::Value& r) const; - bool Read(const ui32 columnIdx, TInstant& result, const Ydb::Value& r) const; - bool Read(const ui32 columnIdx, bool& result, const Ydb::Value& r) const; -}; - -} diff --git a/ydb/core/tx/tiering/external_data.cpp b/ydb/core/tx/tiering/external_data.cpp index 1d9ef8376dd..a280c35c125 100644 --- a/ydb/core/tx/tiering/external_data.cpp +++ b/ydb/core/tx/tiering/external_data.cpp @@ -1,8 +1,9 @@ #include "external_data.h" -#include "owner_enrich.h" #include "snapshot_enrich.h" #include <ydb/core/base/path.h> +#include <ydb/core/tx/tiering/tier/manager.h> +#include <ydb/core/tx/tiering/rule/manager.h> #include <library/cpp/json/writer/json_value.h> #include <library/cpp/protobuf/json/proto2json.h> @@ -19,20 +20,15 @@ void TSnapshotConstructor::EnrichSnapshotData(ISnapshot::TPtr original, NMetadat TActivationContext::AsActorContext().Register(new TActorSnapshotEnrich(original, controller, TablesDecoder)); } -TSnapshotConstructor::TSnapshotConstructor(const TString& ownerPath) - : OwnerPath(ownerPath) +TSnapshotConstructor::TSnapshotConstructor() { - Y_VERIFY(!!OwnerPath); - const TString ownerPathNormal = TFsPath(OwnerPath).Fix().GetPath(); - const TString tenantPathNormal = TFsPath(AppData()->TenantName).Fix().GetPath(); - Y_VERIFY(ownerPathNormal.StartsWith(tenantPathNormal)); - TablePath = "/" + AppData()->TenantName + "/.configs/tiering/"; - Tables.emplace_back(TablePath + "tiers"); - Tables.emplace_back(TablePath + "rules"); } -void TSnapshotConstructor::DoPrepare(NMetadataInitializer::IController::TPtr controller) const { - TActivationContext::AsActorContext().Register(new TActorOwnerEnrich(OwnerPath, controller, Tables)); +std::vector<NKikimr::NMetadata::IOperationsManager::TPtr> TSnapshotConstructor::DoGetManagers() const { + std::vector<NMetadata::IOperationsManager::TPtr> result; + result.emplace_back(std::make_shared<TTiersManager>()); + result.emplace_back(std::make_shared<TTieringRulesManager>()); + return result; } } diff --git a/ydb/core/tx/tiering/external_data.h b/ydb/core/tx/tiering/external_data.h index a04e3caf5df..4524a792dc8 100644 --- a/ydb/core/tx/tiering/external_data.h +++ b/ydb/core/tx/tiering/external_data.h @@ -10,24 +10,18 @@ namespace NKikimr::NColumnShard::NTiers { -class TSnapshotConstructor: public NMetadataProvider::TGenericSnapshotParser<TConfigsSnapshot> { +class TSnapshotConstructor: public NMetadataProvider::TSnapshotsManager<TConfigsSnapshot> { private: using TNavigate = NSchemeCache::TSchemeCacheNavigate; using TBaseActor = TActor<TSnapshotConstructor>; using ISnapshot = NMetadataProvider::ISnapshot; - TVector<TString> Tables; - TString TablePath; - const TString OwnerPath; mutable std::shared_ptr<TTablesDecoderCache> TablesDecoder; protected: - virtual const TVector<TString>& DoGetTables() const override { - return Tables; - } - virtual void DoPrepare(NMetadataInitializer::IController::TPtr controller) const override; + virtual std::vector<NMetadata::IOperationsManager::TPtr> DoGetManagers() const override; public: virtual void EnrichSnapshotData(ISnapshot::TPtr original, NMetadataProvider::ISnapshotAcceptorController::TPtr controller) const override; - TSnapshotConstructor(const TString& ownerPath); + TSnapshotConstructor(); }; } diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp index 985172f6c08..33446f84de6 100644 --- a/ydb/core/tx/tiering/manager.cpp +++ b/ydb/core/tx/tiering/manager.cpp @@ -44,7 +44,7 @@ public: namespace NTiers { -TManager& TManager::Restart(const TTierConfig& config, const bool activity) { +TManager& TManager::Restart(const TTierConfig& config) { if (Config.IsSame(config)) { return *this; } @@ -52,9 +52,7 @@ TManager& TManager::Restart(const TTierConfig& config, const bool activity) { Stop(); } Config = config; - if (activity) { - Start(); - } + Start(); return *this; } @@ -138,7 +136,7 @@ void TTiersManager::TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshotExt) itSelf->second.Stop(); itSelf = Managers.erase(itSelf); } else { - itSelf->second.Restart(it->second, IsActive()); + itSelf->second.Restart(it->second); ++itSelf; } } @@ -148,9 +146,7 @@ void TTiersManager::TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshotExt) } NTiers::TManager localManager(TabletId, TabletActorId, i.second); auto& manager = Managers.emplace(i.second.GetGlobalTierId(), std::move(localManager)).first->second; - if (IsActive()) { - manager.Start(); - } + manager.Start(); } } @@ -169,10 +165,7 @@ TActorId TTiersManager::GetStorageActorId(const NTiers::TGlobalTierId& tierId) { } TTiersManager& TTiersManager::Start(std::shared_ptr<TTiersManager> ownerPtr) { - if (ActiveFlag) { - return *this; - } - ActiveFlag = true; + ALS_ERROR(NKikimrServices::TX_TIERING) << "AAAAAAAAAAAAAA "; Y_VERIFY(!Actor); Actor = new TTiersManager::TActor(ownerPtr); TActivationContext::AsActorContext().RegisterWithSameMailbox(Actor); @@ -183,11 +176,9 @@ TTiersManager& TTiersManager::Start(std::shared_ptr<TTiersManager> ownerPtr) { } TTiersManager& TTiersManager::Stop() { - if (!ActiveFlag) { + if (!Actor) { return *this; } - ActiveFlag = false; - Y_VERIFY(!!Actor); if (TlsActivationContext) { TActivationContext::AsActorContext().Send(Actor->SelfId(), new NActors::TEvents::TEvPoison); } @@ -206,7 +197,7 @@ const NTiers::TManager& TTiersManager::GetManagerVerified(const NTiers::TGlobalT NMetadataProvider::ISnapshotParser::TPtr TTiersManager::GetExternalDataManipulation() const { if (!ExternalDataManipulation) { - ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(OwnerPath); + ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(); } return ExternalDataManipulation; } diff --git a/ydb/core/tx/tiering/manager.h b/ydb/core/tx/tiering/manager.h index 3a716addf2c..f9effb784c0 100644 --- a/ydb/core/tx/tiering/manager.h +++ b/ydb/core/tx/tiering/manager.h @@ -1,6 +1,5 @@ #pragma once #include "external_data.h" -#include "tier_config.h" #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/actor.h> @@ -21,7 +20,7 @@ public: static NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression); NOlap::TStorageTier BuildTierStorage() const; - TManager& Restart(const TTierConfig& config, const bool activity); + TManager& Restart(const TTierConfig& config); bool NeedExport() const { return Config.NeedExport(); } @@ -36,20 +35,17 @@ private: using TManagers = TMap<NTiers::TGlobalTierId, NTiers::TManager>; ui64 TabletId = 0; const TActorId TabletActorId; - TString OwnerPath; TActor* Actor = nullptr; std::unordered_set<ui64> EnabledPathId; YDB_READONLY_DEF(TManagers, Managers); - YDB_READONLY_FLAG(Active, false); NMetadataProvider::ISnapshot::TPtr Snapshot; mutable NMetadataProvider::ISnapshotParser::TPtr ExternalDataManipulation; public: - TTiersManager(const ui64 tabletId, const TActorId& tabletActorId, const TString& ownerPath) + TTiersManager(const ui64 tabletId, const TActorId& tabletActorId) : TabletId(tabletId) , TabletActorId(tabletActorId) - , OwnerPath(ownerPath) { } TActorId GetActorId() const; diff --git a/ydb/core/tx/tiering/owner_enrich.cpp b/ydb/core/tx/tiering/owner_enrich.cpp deleted file mode 100644 index f3d2fa611c2..00000000000 --- a/ydb/core/tx/tiering/owner_enrich.cpp +++ /dev/null @@ -1,111 +0,0 @@ -#include "owner_enrich.h" -#include "snapshot.h" - -#include <ydb/core/base/path.h> - -#include <util/string/join.h> - -namespace NKikimr::NColumnShard::NTiers { - -TVector<NMetadataInitializer::ITableModifier::TPtr> TActorOwnerEnrich::BuildModifiers(const NSchemeCache::TSchemeCacheNavigate::TEntry& ownerEntry) const { - TVector<NMetadataInitializer::ITableModifier::TPtr> result; - { - Ydb::Table::CreateTableRequest request; - request.set_session_id(""); - request.set_path(TableNames[0]); - request.add_primary_key("ownerPath"); - request.add_primary_key("tierName"); - { - auto& column = *request.add_columns(); - column.set_name("ownerPath"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("tierName"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("tierConfig"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request)); - } - { - Ydb::Table::CreateTableRequest request; - request.set_session_id(""); - request.set_path(TableNames[1]); - request.add_primary_key("ownerPath"); - request.add_primary_key("tablePath"); - request.add_primary_key("tierName"); - { - auto& column = *request.add_columns(); - column.set_name("ownerPath"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("tierName"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("tablePath"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("durationForEvict"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("column"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request)); - } - Y_VERIFY(!!ownerEntry.SecurityObject); - return result; -} - -void TActorOwnerEnrich::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { - auto* info = ev->Get(); - const auto& request = info->Request; - auto g = PassAwayGuard(); - if (request->ResultSet.empty()) { - Controller->PreparationProblem("cannot resolve owner path to ids"); - return; - } - - if (request->ResultSet.size() != 1) { - Controller->PreparationProblem("unexpected result set size: " + ::ToString(request->ResultSet.size())); - return; - } - - for (auto&& i : request->ResultSet) { - const TString path = "/" + JoinSeq("/", i.Path); - switch (i.Status) { - case TNavigate::EStatus::Ok: - Controller->PreparationFinished(BuildModifiers(i)); - return; - default: - Controller->PreparationProblem(::ToString(i.Status) + " for '" + path + "'"); - return; - } - } - Controller->PreparationProblem("unexpected"); -} - -void TActorOwnerEnrich::Bootstrap() { - Become(&TThis::StateMain); - auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); - request->DatabaseName = NKikimr::CanonizePath(AppData()->TenantName); - auto& entry = request->ResultSet.emplace_back(); - entry.Operation = TNavigate::OpPath; - entry.Path = NKikimr::SplitPath(OwnerPath); - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release())); -} - -} diff --git a/ydb/core/tx/tiering/owner_enrich.h b/ydb/core/tx/tiering/owner_enrich.h deleted file mode 100644 index 969c5fea28e..00000000000 --- a/ydb/core/tx/tiering/owner_enrich.h +++ /dev/null @@ -1,48 +0,0 @@ -#pragma once -#include "rule.h" -#include "tier_config.h" - -#include <ydb/core/protos/flat_scheme_op.pb.h> -#include <ydb/core/tx/scheme_cache/scheme_cache.h> -#include <ydb/services/metadata/service.h> - -#include <library/cpp/json/writer/json_value.h> - -namespace NKikimr::NColumnShard::NTiers { - -class TActorOwnerEnrich: public TActorBootstrapped<TActorOwnerEnrich> { -private: - using TBase = TActorBootstrapped<TActorOwnerEnrich>; - using TNavigate = NSchemeCache::TSchemeCacheNavigate; - const TString OwnerPath; - NMetadataInitializer::IController::TPtr Controller; - const TVector<TString> TableNames; -private: - TVector<NMetadataInitializer::ITableModifier::TPtr> BuildModifiers(const NSchemeCache::TSchemeCacheNavigate::TEntry& ownerEntry) const; - - void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::TIERING_OWNER_ENRICH; - } - - STATEFN(StateMain) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); - default: - break; - } - } - - TActorOwnerEnrich(const TString& ownerPath, NMetadataInitializer::IController::TPtr controller, const TVector<TString>& tableNames) - : OwnerPath(ownerPath) - , Controller(controller) - , TableNames(tableNames) - { - - } - - void Bootstrap(); -}; - -} diff --git a/ydb/core/tx/tiering/path_cleaner.cpp b/ydb/core/tx/tiering/path_cleaner.cpp index 70e55ad362c..7be50ae766e 100644 --- a/ydb/core/tx/tiering/path_cleaner.cpp +++ b/ydb/core/tx/tiering/path_cleaner.cpp @@ -20,7 +20,7 @@ void TPathCleaner::Handle(TEvTierCleared::TPtr& ev) { NMetadataProvider::ISnapshotParser::TPtr TPathCleaner::GetTieringSnapshotParser() const { if (!ExternalDataManipulation) { - ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(OwnerPath); + ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(); } return ExternalDataManipulation; } diff --git a/ydb/core/tx/tiering/rule.cpp b/ydb/core/tx/tiering/rule.cpp deleted file mode 100644 index 6bcb4fcbfa9..00000000000 --- a/ydb/core/tx/tiering/rule.cpp +++ /dev/null @@ -1,26 +0,0 @@ -#include "rule.h" - -namespace NKikimr::NColumnShard::NTiers { -NJson::TJsonValue TTieringRule::GetDebugJson() const { - NJson::TJsonValue result = NJson::JSON_MAP; - result.InsertValue(TDecoder::TierName, TierName); - result.InsertValue(TDecoder::TablePath, TablePath); - result.InsertValue("tablePathId", TablePathId); - result.InsertValue(TDecoder::Column, Column); - result.InsertValue(TDecoder::DurationForEvict, DurationForEvict.ToString()); - return result; -} - -NJson::TJsonValue TTableTiering::GetDebugJson() const { - NJson::TJsonValue result = NJson::JSON_MAP; - result.InsertValue(TTieringRule::TDecoder::TablePath, TablePath); - result.InsertValue("tablePathId", TablePathId); - result.InsertValue(TTieringRule::TDecoder::Column, Column); - auto&& jsonRules = result.InsertValue("rules", NJson::JSON_ARRAY); - for (auto&& i : Rules) { - jsonRules.AppendValue(i.GetDebugJson()); - } - return result; -} - -} diff --git a/ydb/core/tx/tiering/rule/CMakeLists.txt b/ydb/core/tx/tiering/rule/CMakeLists.txt new file mode 100644 index 00000000000..2efcc188954 --- /dev/null +++ b/ydb/core/tx/tiering/rule/CMakeLists.txt @@ -0,0 +1,31 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-tiering-rule) +target_link_libraries(tx-tiering-rule PUBLIC + contrib-libs-cxxsupp + yutil + services-metadata-initializer + services-metadata-abstract +) +target_sources(tx-tiering-rule PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/object.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/initializer.cpp +) + +add_global_library_for(tx-tiering-rule.global tx-tiering-rule) +target_link_libraries(tx-tiering-rule.global PUBLIC + contrib-libs-cxxsupp + yutil + services-metadata-initializer + services-metadata-abstract +) +target_sources(tx-tiering-rule.global PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/manager.cpp +) diff --git a/ydb/core/tx/tiering/rule/initializer.cpp b/ydb/core/tx/tiering/rule/initializer.cpp new file mode 100644 index 00000000000..be36ce9e9e3 --- /dev/null +++ b/ydb/core/tx/tiering/rule/initializer.cpp @@ -0,0 +1,54 @@ +#include "initializer.h" +#include "object.h" + +namespace NKikimr::NColumnShard::NTiers { + +TVector<NKikimr::NMetadataInitializer::ITableModifier::TPtr> TTierRulesInitializer::BuildModifiers() const { + TVector<NMetadataInitializer::ITableModifier::TPtr> result; + { + Ydb::Table::CreateTableRequest request; + request.set_session_id(""); + request.set_path(TTieringRule::GetStorageTablePath()); + request.add_primary_key("tieringRuleId"); + { + auto& column = *request.add_columns(); + column.set_name("tieringRuleId"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("ownerPath"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("tierName"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("tablePath"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("durationForEvict"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("column"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request, "create_rules")); + auto hRequest = TTieringRule::AddHistoryTableScheme(request); + result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(hRequest, "create_rules_history")); + } + return result; +} + +void TTierRulesInitializer::DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const { + controller->PreparationFinished(BuildModifiers()); +} + +} diff --git a/ydb/core/tx/tiering/rule/initializer.h b/ydb/core/tx/tiering/rule/initializer.h new file mode 100644 index 00000000000..f0d99a25ca8 --- /dev/null +++ b/ydb/core/tx/tiering/rule/initializer.h @@ -0,0 +1,14 @@ +#pragma once +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/services/metadata/initializer/common.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TTierRulesInitializer: public NMetadata::IInitializationBehaviour { +protected: + TVector<NMetadataInitializer::ITableModifier::TPtr> BuildModifiers() const; + virtual void DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const override; +public: +}; + +} diff --git a/ydb/core/tx/tiering/rule/manager.cpp b/ydb/core/tx/tiering/rule/manager.cpp new file mode 100644 index 00000000000..8c08544dc93 --- /dev/null +++ b/ydb/core/tx/tiering/rule/manager.cpp @@ -0,0 +1,12 @@ +#include "manager.h" +#include "initializer.h" + +namespace NKikimr::NColumnShard::NTiers { + +TTieringRulesManager::TFactory::TRegistrator<TTieringRulesManager> TTieringRulesManager::Registrator(TTieringRulesManager::GetTypeIdStatic()); + +NMetadata::IInitializationBehaviour::TPtr TTieringRulesManager::DoGetInitializationBehaviour() const { + return std::make_shared<TTierRulesInitializer>(); +} + +} diff --git a/ydb/core/tx/tiering/rule/manager.h b/ydb/core/tx/tiering/rule/manager.h new file mode 100644 index 00000000000..1705199aebe --- /dev/null +++ b/ydb/core/tx/tiering/rule/manager.h @@ -0,0 +1,15 @@ +#pragma once +#include "object.h" + +#include <ydb/services/metadata/manager/generic_manager.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TTieringRulesManager: public NMetadata::TGenericOperationsManager<TTieringRule> { +private: + static TFactory::TRegistrator<TTieringRulesManager> Registrator; +protected: + virtual NMetadata::IInitializationBehaviour::TPtr DoGetInitializationBehaviour() const override; +}; + +} diff --git a/ydb/core/tx/tiering/rule/object.cpp b/ydb/core/tx/tiering/rule/object.cpp new file mode 100644 index 00000000000..82c72444cdd --- /dev/null +++ b/ydb/core/tx/tiering/rule/object.cpp @@ -0,0 +1,158 @@ +#include "object.h" +#include <ydb/services/metadata/manager/ydb_value_operator.h> + +namespace NKikimr::NColumnShard::NTiers { +NJson::TJsonValue TTieringRule::GetDebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue(TDecoder::TieringRuleId, TieringRuleId); + result.InsertValue(TDecoder::TierName, TierName); + result.InsertValue(TDecoder::TablePath, TablePath); + result.InsertValue("tablePathId", TablePathId); + result.InsertValue(TDecoder::Column, Column); + result.InsertValue(TDecoder::DurationForEvict, DurationForEvict.ToString()); + return result; +} + +TString TTieringRule::GetStorageTablePath() { + return "/" + AppData()->TenantName + "/.metadata/tiering/rules"; +} + +void TTieringRule::AlteringPreparation(std::vector<TTieringRule>&& objects, + NMetadataManager::IAlterPreparationController<TTieringRule>::TPtr controller, + const NMetadata::IOperationsManager::TModificationContext& /*context*/) +{ + controller->PreparationFinished(std::move(objects)); +} + +NKikimr::NMetadataManager::TTableRecord TTieringRule::SerializeToRecord() const { + NMetadataManager::TTableRecord result; + result.SetColumn(TDecoder::TieringRuleId, NMetadataManager::TYDBValue::Bytes(TieringRuleId)); + result.SetColumn(TDecoder::OwnerPath, NMetadataManager::TYDBValue::Bytes(OwnerPath)); + result.SetColumn(TDecoder::TierName, NMetadataManager::TYDBValue::Bytes(TierName)); + result.SetColumn(TDecoder::TablePath, NMetadataManager::TYDBValue::Bytes(TablePath)); + result.SetColumn(TDecoder::DurationForEvict, NMetadataManager::TYDBValue::Bytes(DurationForEvict.ToString())); + result.SetColumn(TDecoder::Column, NMetadataManager::TYDBValue::Bytes(Column)); + return result; +} + +bool TTieringRule::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r) { + if (!decoder.Read(decoder.GetTieringRuleIdIdx(), TieringRuleId, r)) { + return false; + } + if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) { + return false; + } + OwnerPath = TFsPath(OwnerPath).Fix().GetPath(); + if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) { + return false; + } + if (!decoder.Read(decoder.GetTablePathIdx(), TablePath, r)) { + return false; + } + TablePath = TFsPath(TablePath).Fix().GetPath(); + if (!decoder.Read(decoder.GetDurationForEvictIdx(), DurationForEvict, r)) { + return false; + } + if (!decoder.Read(decoder.GetColumnIdx(), Column, r)) { + return false; + } + return true; +} + +NMetadata::TOperationParsingResult TTieringRule::BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings, + const NMetadata::IOperationsManager::TModificationContext& /*context*/) { + NKikimr::NMetadataManager::TTableRecord result; + result.SetColumn(TDecoder::TieringRuleId, NMetadataManager::TYDBValue::Bytes(settings.GetObjectId())); + { + auto it = settings.GetFeatures().find(TDecoder::TablePath); + if (it != settings.GetFeatures().end()) { + result.SetColumn(TDecoder::TablePath, NMetadataManager::TYDBValue::Bytes(it->second)); + } + } + { + auto it = settings.GetFeatures().find(TDecoder::TierName); + if (it != settings.GetFeatures().end()) { + result.SetColumn(TDecoder::TierName, NMetadataManager::TYDBValue::Bytes(it->second)); + } + } + { + auto it = settings.GetFeatures().find(TDecoder::OwnerPath); + if (it != settings.GetFeatures().end()) { + result.SetColumn(TDecoder::OwnerPath, NMetadataManager::TYDBValue::Bytes(it->second)); + } + } + { + auto it = settings.GetFeatures().find(TDecoder::DurationForEvict); + if (it != settings.GetFeatures().end()) { + result.SetColumn(TDecoder::DurationForEvict, NMetadataManager::TYDBValue::Bytes(it->second)); + } + } + { + auto it = settings.GetFeatures().find(TDecoder::Column); + if (it != settings.GetFeatures().end()) { + result.SetColumn(TDecoder::Column, NMetadataManager::TYDBValue::Bytes(it->second)); + } + } + return result; +} + +NJson::TJsonValue TTableTiering::GetDebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue(TTieringRule::TDecoder::TablePath, TablePath); + result.InsertValue("tablePathId", TablePathId); + result.InsertValue(TTieringRule::TDecoder::Column, Column); + auto&& jsonRules = result.InsertValue("rules", NJson::JSON_ARRAY); + for (auto&& i : Rules) { + jsonRules.AppendValue(i.GetDebugJson()); + } + return result; +} + +void TTableTiering::AddRule(TTieringRule&& tr) { + if (Rules.size()) { + Y_VERIFY(Rules.back().GetDurationForEvict() <= tr.GetDurationForEvict()); + if (Column != tr.GetColumn()) { + ALS_ERROR(NKikimrServices::TX_TIERING) << "inconsistency rule column: " << + TablePath << "/" << Column << " != " << tr.GetColumn(); + return; + } + } else { + Column = tr.GetColumn(); + TablePath = tr.GetTablePath(); + TablePathId = tr.GetTablePathId(); + } + Rules.emplace_back(std::move(tr)); +} + +NKikimr::NOlap::TTiersInfo TTableTiering::BuildTiersInfo() const { + NOlap::TTiersInfo result(GetColumn()); + for (auto&& r : Rules) { + result.AddTier(r.GetTierName(), Now() - r.GetDurationForEvict()); + } + return result; +} + +std::vector<Ydb::Column> TTieringRule::TDecoder::GetPKColumns() { + return + { + NMetadataManager::TYDBColumn::Bytes(TieringRuleId) + }; +} + +std::vector<Ydb::Column> TTieringRule::TDecoder::GetColumns() { + return + { + NMetadataManager::TYDBColumn::Bytes(TieringRuleId), + NMetadataManager::TYDBColumn::Bytes(OwnerPath), + NMetadataManager::TYDBColumn::Bytes(TierName), + NMetadataManager::TYDBColumn::Bytes(TablePath), + NMetadataManager::TYDBColumn::Bytes(Column), + NMetadataManager::TYDBColumn::Bytes(DurationForEvict) + }; +} + +std::vector<TString> TTieringRule::TDecoder::GetPKColumnIds() { + return { TieringRuleId }; +} + +} diff --git a/ydb/core/tx/tiering/rule.h b/ydb/core/tx/tiering/rule/object.h index 468ce5ec510..8b71c0e376e 100644 --- a/ydb/core/tx/tiering/rule.h +++ b/ydb/core/tx/tiering/rule/object.h @@ -1,17 +1,19 @@ #pragma once -#include "common.h" -#include "decoder.h" - #include <ydb/core/protos/flat_scheme_op.pb.h> #include <ydb/core/tx/columnshard/engines/column_engine.h> +#include <ydb/core/tx/tiering/common/global_tier_id.h> +#include <ydb/services/metadata/abstract/decoder.h> #include <ydb/services/metadata/service.h> +#include <ydb/services/metadata/manager/object.h> +#include <ydb/services/metadata/manager/preparation_controller.h> #include <library/cpp/json/writer/json_value.h> namespace NKikimr::NColumnShard::NTiers { -class TTieringRule { +class TTieringRule: public NMetadataManager::TObject<TTieringRule> { private: + YDB_ACCESSOR_DEF(TString, TieringRuleId); YDB_ACCESSOR_DEF(TString, OwnerPath); YDB_ACCESSOR_DEF(TString, TierName); YDB_ACCESSOR_DEF(TString, TablePath); @@ -19,31 +21,47 @@ private: YDB_ACCESSOR_DEF(TString, Column); YDB_ACCESSOR_DEF(TDuration, DurationForEvict); public: - bool operator<(const TTieringRule& item) const { - return std::tie(TablePath, TierName, Column, DurationForEvict) - < std::tie(item.TablePath, item.TierName, item.Column, item.DurationForEvict); + static TString GetTypeId() { + return "TIERING_RULE"; } - TGlobalTierId GetGlobalTierId() const { return TGlobalTierId(OwnerPath, TierName); } + static TString GetStorageTablePath(); + static void AlteringPreparation(std::vector<TTieringRule>&& objects, + NMetadataManager::IAlterPreparationController<TTieringRule>::TPtr controller, + const NMetadata::IOperationsManager::TModificationContext& context); + + bool operator<(const TTieringRule& item) const { + return std::tie(TablePath, TierName, Column, DurationForEvict, TieringRuleId) + < std::tie(item.TablePath, item.TierName, item.Column, item.DurationForEvict, item.TieringRuleId); + } + NJson::TJsonValue GetDebugJson() const; + class TDecoder: public NInternal::TDecoderBase { private: + YDB_READONLY(i32, TieringRuleIdIdx, -1); YDB_READONLY(i32, OwnerPathIdx, -1); YDB_READONLY(i32, TablePathIdx, -1); YDB_READONLY(i32, DurationForEvictIdx, -1); YDB_READONLY(i32, TierNameIdx, -1); YDB_READONLY(i32, ColumnIdx, -1); public: + static inline const TString TieringRuleId = "tieringRuleId"; static inline const TString OwnerPath = "ownerPath"; static inline const TString TierName = "tierName"; static inline const TString TablePath = "tablePath"; static inline const TString DurationForEvict = "durationForEvict"; static inline const TString Column = "column"; + static std::vector<Ydb::Column> GetPKColumns(); + static std::vector<Ydb::Column> GetColumns(); + static std::vector<TString> GetPKColumnIds(); + TDecoder(const Ydb::ResultSet& rawData) { + TieringRuleIdIdx = GetFieldIndex(rawData, TieringRuleId); OwnerPathIdx = GetFieldIndex(rawData, OwnerPath); TierNameIdx = GetFieldIndex(rawData, TierName); TablePathIdx = GetFieldIndex(rawData, TablePath); @@ -51,26 +69,10 @@ public: ColumnIdx = GetFieldIndex(rawData, Column); } }; - bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r) { - if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) { - return false; - } - OwnerPath = TFsPath(OwnerPath).Fix().GetPath(); - if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) { - return false; - } - if (!decoder.Read(decoder.GetTablePathIdx(), TablePath, r)) { - return false; - } - TablePath = TFsPath(TablePath).Fix().GetPath(); - if (!decoder.Read(decoder.GetDurationForEvictIdx(), DurationForEvict, r)) { - return false; - } - if (!decoder.Read(decoder.GetColumnIdx(), Column, r)) { - return false; - } - return true; - } + NKikimr::NMetadataManager::TTableRecord SerializeToRecord() const; + bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r); + static NMetadata::TOperationParsingResult BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings, + const NMetadata::IOperationsManager::TModificationContext& context); }; class TTableTiering { @@ -88,29 +90,9 @@ public: } } NJson::TJsonValue GetDebugJson() const; - void AddRule(TTieringRule&& tr) { - if (Rules.size()) { - Y_VERIFY(Rules.back().GetDurationForEvict() <= tr.GetDurationForEvict()); - if (Column != tr.GetColumn()) { - ALS_ERROR(NKikimrServices::TX_TIERING) << "inconsistency rule column: " << - TablePath << "/" << Column << " != " << tr.GetColumn(); - return; - } - } else { - Column = tr.GetColumn(); - TablePath = tr.GetTablePath(); - TablePathId = tr.GetTablePathId(); - } - Rules.emplace_back(std::move(tr)); - } + void AddRule(TTieringRule&& tr); - NOlap::TTiersInfo BuildTiersInfo() const { - NOlap::TTiersInfo result(GetColumn()); - for (auto&& r : Rules) { - result.AddTier(r.GetTierName(), Now() - r.GetDurationForEvict()); - } - return result; - } + NOlap::TTiersInfo BuildTiersInfo() const; }; } diff --git a/ydb/core/tx/tiering/snapshot.h b/ydb/core/tx/tiering/snapshot.h index b913dacbb64..30f19b359d6 100644 --- a/ydb/core/tx/tiering/snapshot.h +++ b/ydb/core/tx/tiering/snapshot.h @@ -1,9 +1,9 @@ #pragma once -#include "rule.h" -#include "tier_config.h" - #include <ydb/core/protos/flat_scheme_op.pb.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/core/tx/tiering/tier/object.h> +#include <ydb/core/tx/tiering/rule/object.h> + #include <ydb/services/metadata/service.h> #include <library/cpp/json/writer/json_value.h> diff --git a/ydb/core/tx/tiering/snapshot_enrich.h b/ydb/core/tx/tiering/snapshot_enrich.h index eca0eb893c4..2a24ba2226a 100644 --- a/ydb/core/tx/tiering/snapshot_enrich.h +++ b/ydb/core/tx/tiering/snapshot_enrich.h @@ -1,6 +1,4 @@ #pragma once -#include "rule.h" -#include "tier_config.h" #include <ydb/core/protos/flat_scheme_op.pb.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> diff --git a/ydb/core/tx/tiering/tier/CMakeLists.txt b/ydb/core/tx/tiering/tier/CMakeLists.txt new file mode 100644 index 00000000000..523f34fb8c2 --- /dev/null +++ b/ydb/core/tx/tiering/tier/CMakeLists.txt @@ -0,0 +1,31 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-tiering-tier) +target_link_libraries(tx-tiering-tier PUBLIC + contrib-libs-cxxsupp + yutil + services-metadata-initializer + services-metadata-abstract +) +target_sources(tx-tiering-tier PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier/object.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier/initializer.cpp +) + +add_global_library_for(tx-tiering-tier.global tx-tiering-tier) +target_link_libraries(tx-tiering-tier.global PUBLIC + contrib-libs-cxxsupp + yutil + services-metadata-initializer + services-metadata-abstract +) +target_sources(tx-tiering-tier.global PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier/manager.cpp +) diff --git a/ydb/core/tx/tiering/tier/initializer.cpp b/ydb/core/tx/tiering/tier/initializer.cpp new file mode 100644 index 00000000000..4567b99fef6 --- /dev/null +++ b/ydb/core/tx/tiering/tier/initializer.cpp @@ -0,0 +1,39 @@ +#include "initializer.h" +#include "object.h" + +namespace NKikimr::NColumnShard::NTiers { + +TVector<NKikimr::NMetadataInitializer::ITableModifier::TPtr> TTiersInitializer::BuildModifiers() const { + TVector<NMetadataInitializer::ITableModifier::TPtr> result; + { + Ydb::Table::CreateTableRequest request; + request.set_session_id(""); + request.set_path(TTierConfig::GetStorageTablePath()); + request.add_primary_key("tierName"); + { + auto& column = *request.add_columns(); + column.set_name("ownerPath"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("tierName"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("tierConfig"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request, "create_tiers")); + auto hRequest = TTierConfig::AddHistoryTableScheme(request); + result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(hRequest, "create_tiers_history")); + } + return result; +} + +void TTiersInitializer::DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const { + controller->PreparationFinished(BuildModifiers()); +} + +} diff --git a/ydb/core/tx/tiering/tier/initializer.h b/ydb/core/tx/tiering/tier/initializer.h new file mode 100644 index 00000000000..3fbcd4ed52e --- /dev/null +++ b/ydb/core/tx/tiering/tier/initializer.h @@ -0,0 +1,14 @@ +#pragma once +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/services/metadata/initializer/common.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TTiersInitializer: public NMetadata::IInitializationBehaviour { +protected: + TVector<NMetadataInitializer::ITableModifier::TPtr> BuildModifiers() const; + virtual void DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const override; +public: +}; + +} diff --git a/ydb/core/tx/tiering/tier/manager.cpp b/ydb/core/tx/tiering/tier/manager.cpp new file mode 100644 index 00000000000..07f662b1f5a --- /dev/null +++ b/ydb/core/tx/tiering/tier/manager.cpp @@ -0,0 +1,12 @@ +#include "manager.h" +#include "initializer.h" + +namespace NKikimr::NColumnShard::NTiers { + +TTiersManager::TFactory::TRegistrator<TTiersManager> TTiersManager::Registrator(TTiersManager::GetTypeIdStatic()); + +NMetadata::IInitializationBehaviour::TPtr TTiersManager::DoGetInitializationBehaviour() const { + return std::make_shared<TTiersInitializer>(); +} + +} diff --git a/ydb/core/tx/tiering/tier/manager.h b/ydb/core/tx/tiering/tier/manager.h new file mode 100644 index 00000000000..95878bcd0fd --- /dev/null +++ b/ydb/core/tx/tiering/tier/manager.h @@ -0,0 +1,16 @@ +#pragma once +#include "object.h" + +#include <ydb/services/metadata/manager/generic_manager.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TTiersManager: public NMetadata::TGenericOperationsManager<TTierConfig> { +private: + static TFactory::TRegistrator<TTiersManager> Registrator; +protected: + virtual NMetadata::IInitializationBehaviour::TPtr DoGetInitializationBehaviour() const override; +public: +}; + +} diff --git a/ydb/core/tx/tiering/tier/object.cpp b/ydb/core/tx/tiering/tier/object.cpp new file mode 100644 index 00000000000..638d2614f7a --- /dev/null +++ b/ydb/core/tx/tiering/tier/object.cpp @@ -0,0 +1,94 @@ +#include "object.h" +#include <ydb/services/metadata/manager/ydb_value_operator.h> + +#include <library/cpp/json/writer/json_value.h> +#include <library/cpp/protobuf/json/proto2json.h> + +namespace NKikimr::NColumnShard::NTiers { + +NJson::TJsonValue TTierConfig::GetDebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue(TDecoder::OwnerPath, OwnerPath); + result.InsertValue(TDecoder::TierName, TierName); + NProtobufJson::Proto2Json(ProtoConfig, result.InsertValue(TDecoder::TierConfig, NJson::JSON_MAP)); + return result; +} + +bool TTierConfig::IsSame(const TTierConfig& item) const { + return TierName == item.TierName && ProtoConfig.SerializeAsString() == item.ProtoConfig.SerializeAsString(); +} + +bool TTierConfig::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r) { + if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) { + return false; + } + OwnerPath = TFsPath(OwnerPath).Fix().GetPath(); + if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) { + return false; + } + if (!decoder.ReadDebugProto(decoder.GetTierConfigIdx(), ProtoConfig, r)) { + return false; + } + return true; +} + +NKikimr::NMetadataManager::TTableRecord TTierConfig::SerializeToRecord() const { + NMetadataManager::TTableRecord result; + result.SetColumn(TDecoder::OwnerPath, NMetadataManager::TYDBValue::Bytes(OwnerPath)); + result.SetColumn(TDecoder::TierName, NMetadataManager::TYDBValue::Bytes(TierName)); + result.SetColumn(TDecoder::TierConfig, NMetadataManager::TYDBValue::Bytes(ProtoConfig.DebugString())); + return result; +} + +TString TTierConfig::GetStorageTablePath() { + return "/" + AppData()->TenantName + "/.metadata/tiering/tiers"; +} + +void TTierConfig::AlteringPreparation(std::vector<TTierConfig>&& objects, + NMetadataManager::IAlterPreparationController<TTierConfig>::TPtr controller, + const NMetadata::IOperationsManager::TModificationContext& /*context*/) { + controller->PreparationFinished(std::move(objects)); +} + +NMetadata::TOperationParsingResult TTierConfig::BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings, + const NMetadata::IOperationsManager::TModificationContext& /*context*/) { + NKikimr::NMetadataManager::TTableRecord result; + result.SetColumn(TDecoder::TierName, NMetadataManager::TYDBValue::Bytes(settings.GetObjectId())); + { + auto it = settings.GetFeatures().find(TDecoder::OwnerPath); + if (it != settings.GetFeatures().end()) { + result.SetColumn(TDecoder::OwnerPath, NMetadataManager::TYDBValue::Bytes(it->second)); + } + } + { + auto it = settings.GetFeatures().find(TDecoder::TierConfig); + if (it != settings.GetFeatures().end()) { + TTierProto proto; + if (!::google::protobuf::TextFormat::ParseFromString(it->second, &proto)) { + return "incorrect proto format"; + } else { + result.SetColumn(TDecoder::TierConfig, NMetadataManager::TYDBValue::Bytes(it->second)); + } + } + } + return result; +} + +std::vector<Ydb::Column> TTierConfig::TDecoder::GetPKColumns() { + return { NMetadataManager::TYDBColumn::Bytes(TierName) }; +} + +std::vector<Ydb::Column> TTierConfig::TDecoder::GetColumns() { + return + { + NMetadataManager::TYDBColumn::Bytes(OwnerPath), + NMetadataManager::TYDBColumn::Bytes(TierName), + NMetadataManager::TYDBColumn::Bytes(TierConfig) + }; +} + +std::vector<TString> TTierConfig::TDecoder::GetPKColumnIds() { + return { TierName }; +} + +} diff --git a/ydb/core/tx/tiering/tier_config.h b/ydb/core/tx/tiering/tier/object.h index 4a36df4ab01..3d0aef8392b 100644 --- a/ydb/core/tx/tiering/tier_config.h +++ b/ydb/core/tx/tiering/tier/object.h @@ -1,18 +1,19 @@ #pragma once -#include "common.h" -#include "decoder.h" - -#include <ydb/services/metadata/service.h> #include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/tx/tiering/common/global_tier_id.h> +#include <ydb/services/metadata/abstract/decoder.h> +#include <ydb/services/metadata/manager/preparation_controller.h> +#include <ydb/services/metadata/manager/table_record.h> +#include <ydb/services/metadata/manager/object.h> +#include <ydb/services/metadata/service.h> #include <library/cpp/json/writer/json_value.h> namespace NKikimr::NColumnShard::NTiers { -class TTierConfig { +class TTierConfig: public NMetadataManager::TObject<TTierConfig> { private: using TTierProto = NKikimrSchemeOp::TStorageTierConfig; - using TS3SettingsProto = NKikimrSchemeOp::TS3Settings; YDB_ACCESSOR_DEF(TString, OwnerPath); YDB_ACCESSOR_DEF(TString, TierName); YDB_ACCESSOR_DEF(TTierProto, ProtoConfig); @@ -35,6 +36,9 @@ public: static inline const TString OwnerPath = "ownerPath"; static inline const TString TierName = "tierName"; static inline const TString TierConfig = "tierConfig"; + static std::vector<Ydb::Column> GetPKColumns(); + static std::vector<Ydb::Column> GetColumns(); + static std::vector<TString> GetPKColumnIds(); TDecoder(const Ydb::ResultSet& rawData) { OwnerPathIdx = GetFieldIndex(rawData, OwnerPath); TierNameIdx = GetFieldIndex(rawData, TierName); @@ -42,6 +46,14 @@ public: } }; bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r); + NMetadataManager::TTableRecord SerializeToRecord() const; + static NMetadata::TOperationParsingResult BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings, + const NMetadata::IOperationsManager::TModificationContext& context); + + static TString GetStorageTablePath(); + static void AlteringPreparation(std::vector<TTierConfig>&& objects, + NMetadataManager::IAlterPreparationController<TTierConfig>::TPtr controller, + const NMetadata::IOperationsManager::TModificationContext& context); TGlobalTierId GetGlobalTierId() const { return TGlobalTierId(OwnerPath, TierName); @@ -52,6 +64,9 @@ public: } bool IsSame(const TTierConfig& item) const; NJson::TJsonValue GetDebugJson() const; + static TString GetTypeId() { + return "TIER"; + } }; } diff --git a/ydb/core/tx/tiering/tier_config.cpp b/ydb/core/tx/tiering/tier_config.cpp deleted file mode 100644 index 1e4b7085d05..00000000000 --- a/ydb/core/tx/tiering/tier_config.cpp +++ /dev/null @@ -1,33 +0,0 @@ -#include "tier_config.h" -#include <library/cpp/json/writer/json_value.h> -#include <library/cpp/protobuf/json/proto2json.h> - -namespace NKikimr::NColumnShard::NTiers { - -NJson::TJsonValue TTierConfig::GetDebugJson() const { - NJson::TJsonValue result = NJson::JSON_MAP; - result.InsertValue(TDecoder::OwnerPath, OwnerPath); - result.InsertValue(TDecoder::TierName, TierName); - NProtobufJson::Proto2Json(ProtoConfig, result.InsertValue(TDecoder::TierConfig, NJson::JSON_MAP)); - return result; -} - -bool TTierConfig::IsSame(const TTierConfig& item) const { - return TierName == item.TierName && ProtoConfig.SerializeAsString() == item.ProtoConfig.SerializeAsString(); -} - -bool TTierConfig::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r) { - if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) { - return false; - } - OwnerPath = TFsPath(OwnerPath).Fix().GetPath(); - if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) { - return false; - } - if (!decoder.ReadDebugProto(decoder.GetTierConfigIdx(), ProtoConfig, r)) { - return false; - } - return true; -} - -} diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp index 04d442bd9fe..3c657d30766 100644 --- a/ydb/core/tx/tiering/ut/ut_tiers.cpp +++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp @@ -8,6 +8,10 @@ #include <ydb/core/wrappers/fake_storage.h> #include <ydb/library/accessor/accessor.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/services/metadata/manager/alter.h> +#include <ydb/services/metadata/manager/common.h> +#include <ydb/services/metadata/manager/table_record.h> +#include <ydb/services/metadata/manager/ydb_value_operator.h> #include <ydb/services/metadata/service.h> #include <library/cpp/actors/core/av_bootstrapped.h> @@ -69,7 +73,8 @@ public: Function: %s Columns: %s } - })", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str())); + } + )", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str())); } }; @@ -114,10 +119,16 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { TActorId ProviderId; TInstant Start; YDB_READONLY_FLAG(Found, false); - YDB_ACCESSOR(ui32, TieringsCount, 1); + YDB_ACCESSOR(ui32, ExpectedTieringsCount, 1); + YDB_ACCESSOR(ui32, ExpectedTiersCount, 1); using TKeyCheckers = TMap<NTiers::TGlobalTierId, TJsonChecker>; YDB_ACCESSOR_DEF(TKeyCheckers, Checkers); public: + void ResetConditions() { + FoundFlag = false; + Checkers.clear(); + } + STATEFN(StateInit) { switch (ev->GetTypeRewrite()) { hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle); @@ -152,12 +163,12 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { auto snapshot = event->GetSnapshotAs<NTiers::TConfigsSnapshot>(); Y_VERIFY(!!snapshot); auto* tInfo = snapshot->GetTableTiering("/Root/olapStore/olapTable"); - if (TieringsCount) { + if (ExpectedTieringsCount) { if (!tInfo) { Cerr << "tiering not found: " << snapshot->SerializeToString() << Endl; return ; } - if (tInfo->GetRules().size() != TieringsCount) { + if (tInfo->GetRules().size() != ExpectedTieringsCount) { Cerr << "TieringsCount incorrect: " << snapshot->SerializeToString() << Endl; return; } @@ -176,6 +187,10 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { Cerr << "PathId: " << tiering.GetTablePathId() << Endl; } } + if (ExpectedTiersCount != snapshot->GetTierConfigs().size()) { + Cerr << "TieringsCount incorrect: " << snapshot->SerializeToString() << Endl; + return; + } for (auto&& i : Checkers) { auto value = snapshot->GetValue(i.first); NJson::TJsonValue jsonData; @@ -195,13 +210,26 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { void Bootstrap() { ProviderId = NMetadataProvider::MakeServiceId(1); - ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>("/Root/olapStore"); + ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(); Become(&TThis::StateInit); Sender<NMetadataProvider::TEvSubscribeExternal>(ExternalDataManipulation).SendTo(ProviderId); Start = Now(); } }; + class TEmulatorAlterController: public NMetadataManager::IAlterController { + private: + YDB_READONLY_FLAG(Finished, false); + public: + virtual void AlterProblem(const TString& errorMessage) override { + Cerr << errorMessage << Endl; + Y_VERIFY(false); + } + virtual void AlterFinished() override { + FinishedFlag = true; + } + }; + Y_UNIT_TEST(DSConfigsStub) { TPortManager pm; @@ -236,15 +264,55 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { runtime.SimulateSleep(TDuration::Seconds(10)); Cerr << "Initialization finished" << Endl; - lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) " - "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')"); - lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) " - "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')"); - const TInstant start = Now(); - while (!emulator->IsFound() && Now() - start < TDuration::Seconds(2000)) { - runtime.SimulateSleep(TDuration::Seconds(1)); + lHelper.StartSchemaRequest("CREATE OBJECT tier1 ( " + "TYPE TIER) WITH (tierConfig = `" + ConfigProtoStr + "`, ownerPath = `/Root/olapStore`)"); + lHelper.StartSchemaRequest("CREATE OBJECT tier1 (" + "TYPE TIERING_RULE) WITH (tierName = tier1, tablePath = `/Root/olapStore/olapTable`, " + "ownerPath = `/Root/olapStore`, column = timestamp, durationForEvict = `10d` )"); + { + const TInstant start = Now(); + while (!emulator->IsFound() && Now() - start < TDuration::Seconds(2000)) { + runtime.SimulateSleep(TDuration::Seconds(1)); + } + Y_VERIFY(emulator->IsFound()); + } + { + lHelper.StartSchemaRequest("ALTER OBJECT tier1 ( " + "TYPE TIER) SET tierConfig = `" + ConfigProtoStr1 + "`"); + + emulator->ResetConditions(); + emulator->MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier1"), TJsonChecker("Name", "abc1")); + { + const TInstant start = Now(); + while (!emulator->IsFound() && Now() - start < TDuration::Seconds(2000)) { + runtime.SimulateSleep(TDuration::Seconds(1)); + } + Y_VERIFY(emulator->IsFound()); + } + } + { + std::vector<NMetadataManager::TTableRecord> patches; + { + NMetadataManager::TTableRecord patch; + patch.SetColumn("ownerPath", NMetadataManager::TYDBValue::Bytes("/Root/olapStore")); + patch.SetColumn("tierName", NMetadataManager::TYDBValue::Bytes("tier1")); + patches.emplace_back(std::move(patch)); + } + + lHelper.StartSchemaRequest("DROP OBJECT tier1(TYPE TIER)"); + lHelper.StartSchemaRequest("DROP OBJECT tier1(TYPE TIERING_RULE)"); + + emulator->ResetConditions(); + emulator->SetExpectedTieringsCount(0); + emulator->SetExpectedTiersCount(0); + { + const TInstant start = Now(); + while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) { + runtime.SimulateSleep(TDuration::Seconds(1)); + } + Y_VERIFY(emulator->IsFound()); + } } - Y_VERIFY(emulator->IsFound()); } } @@ -280,34 +348,47 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { runtime.SimulateSleep(TDuration::Seconds(10)); Cerr << "Initialization finished" << Endl; - lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) " - "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr1 + "')"); - lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) " - "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')"); + lHelper.StartSchemaRequest("CREATE OBJECT tier1 ( " + "TYPE TIER) WITH (tierConfig = `" + ConfigProtoStr1 + "`, ownerPath = `/Root/olapStore`)"); + lHelper.StartSchemaRequest("CREATE OBJECT tier1 (" + "TYPE TIERING_RULE) " + "WITH (ownerPath = `/Root/olapStore`, tierName = tier1, tablePath = `/Root/olapStore/olapTable`, column = timestamp, durationForEvict = `10d` " + ")"); { TTestCSEmulator emulator; emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier1"), TJsonChecker("Name", "abc1")); - emulator.SetTieringsCount(1); + emulator.SetExpectedTieringsCount(1); + emulator.SetExpectedTiersCount(1); emulator.CheckRuntime(runtime); } - lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) " - "VALUES ('/Root/olapStore', 'tier2', '" + ConfigProtoStr2 + "')"); - lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) " - "VALUES ('/Root/olapStore', 'tier2', '/Root/olapStore/olapTable', 'timestamp', '20d')"); + lHelper.StartSchemaRequest("CREATE OBJECT tier2 ( " + "TYPE TIER) WITH (ownerPath = `/Root/olapStore`, tierConfig = `" + ConfigProtoStr2 + "`)"); + lHelper.StartSchemaRequest("CREATE OBJECT tier2 (" + "TYPE TIERING_RULE) WITH (tierName = tier2, tablePath = `/Root/olapStore/olapTable`, ownerPath = `/Root/olapStore`, column = timestamp, durationForEvict = `20d` )"); { TTestCSEmulator emulator; emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier1"), TJsonChecker("Name", "abc1")); emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier2"), TJsonChecker("Name", "abc2")); - emulator.SetTieringsCount(2); + emulator.SetExpectedTieringsCount(2); + emulator.SetExpectedTiersCount(2); emulator.CheckRuntime(runtime); } - lHelper.StartDataRequest("DELETE FROM `/Root/.configs/tiering/tiers`"); - lHelper.StartDataRequest("DELETE FROM `/Root/.configs/tiering/rules`"); + lHelper.StartSchemaRequest("DROP OBJECT tier2 (TYPE TIER)"); + lHelper.StartSchemaRequest("DROP OBJECT tier2 (TYPE TIERING_RULE)"); { TTestCSEmulator emulator; - emulator.SetTieringsCount(0); + emulator.SetExpectedTieringsCount(1); + emulator.SetExpectedTiersCount(1); + emulator.CheckRuntime(runtime); + } + lHelper.StartSchemaRequest("DROP OBJECT tier1 (TYPE TIER)"); + lHelper.StartSchemaRequest("DROP OBJECT tier1 (TYPE TIERING_RULE)"); + { + TTestCSEmulator emulator; + emulator.SetExpectedTieringsCount(0); + emulator.SetExpectedTiersCount(0); emulator.CheckRuntime(runtime); } @@ -381,19 +462,21 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { runtime.SimulateSleep(TDuration::Seconds(20)); Cerr << "Initialization finished" << Endl; - lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) " - "VALUES ('/Root/olapStore', 'fakeTier1', '" + TierConfigProtoStr + "')"); - lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) " - "VALUES ('/Root/olapStore', 'fakeTier2', '" + TierConfigProtoStr + "')"); - lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) " - "VALUES ('/Root/olapStore', 'fakeTier1', '/Root/olapStore/olapTable', 'timestamp', '10d')"); - lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) " - "VALUES ('/Root/olapStore', 'fakeTier2', '/Root/olapStore/olapTable', 'timestamp', '20d')"); + lHelper.StartSchemaRequest("CREATE OBJECT fakeTier1 ( " + "TYPE TIER) WITH (tierConfig = `" + TierConfigProtoStr + "`, ownerPath = `/Root/olapStore`)"); + lHelper.StartSchemaRequest("CREATE OBJECT fakeTier1 (" + "TYPE TIERING_RULE) WITH (tierName = fakeTier1, ownerPath = `/Root/olapStore`, tablePath = `/Root/olapStore/olapTable`, column = timestamp, durationForEvict = `10d` )"); + + lHelper.StartSchemaRequest("CREATE OBJECT fakeTier2 ( " + "TYPE TIER) WITH (tierConfig = `" + TierConfigProtoStr + "`, ownerPath = `/Root/olapStore`)"); + lHelper.StartSchemaRequest("CREATE OBJECT fakeTier2 (" + "TYPE TIERING_RULE) WITH (tierName = fakeTier2, ownerPath = `/Root/olapStore`, tablePath = `/Root/olapStore/olapTable`, column = timestamp, durationForEvict = `20d` )"); { TTestCSEmulator emulator; emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "fakeTier1"), TJsonChecker("Name", "fakeTier")); emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "fakeTier2"), TJsonChecker("ObjectStorage.Endpoint", TierEndpoint)); - emulator.SetTieringsCount(2); + emulator.SetExpectedTieringsCount(2); + emulator.SetExpectedTiersCount(2); emulator.CheckRuntime(runtime); } Cerr << "Insert..." << Endl; diff --git a/ydb/services/bg_tasks/abstract/task.h b/ydb/services/bg_tasks/abstract/task.h index eb87ea5e1b9..a796e671118 100644 --- a/ydb/services/bg_tasks/abstract/task.h +++ b/ydb/services/bg_tasks/abstract/task.h @@ -5,8 +5,8 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/base/events.h> -#include <ydb/core/tx/tiering/decoder.h> #include <ydb/library/accessor/accessor.h> +#include <ydb/services/metadata/abstract/decoder.h> #include <library/cpp/actors/core/events.h> #include <library/cpp/object_factory/object_factory.h> diff --git a/ydb/services/bg_tasks/ds_table/CMakeLists.txt b/ydb/services/bg_tasks/ds_table/CMakeLists.txt index 5fecb1aa1bc..573f38f4356 100644 --- a/ydb/services/bg_tasks/ds_table/CMakeLists.txt +++ b/ydb/services/bg_tasks/ds_table/CMakeLists.txt @@ -32,4 +32,5 @@ target_sources(services-bg_tasks-ds_table PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/add_tasks.cpp ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/task_enabled.cpp ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/lock_pinger.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/bg_tasks/ds_table/initialization.cpp ) diff --git a/ydb/services/bg_tasks/ds_table/executor.cpp b/ydb/services/bg_tasks/ds_table/executor.cpp index 5cae73625fc..c8be62c0f49 100644 --- a/ydb/services/bg_tasks/ds_table/executor.cpp +++ b/ydb/services/bg_tasks/ds_table/executor.cpp @@ -5,82 +5,24 @@ #include "task_executor.h" #include "task_enabled.h" #include "fetch_tasks.h" +#include "initialization.h" -namespace NKikimr::NBackgroundTasks { +#include <ydb/services/metadata/initializer/fetcher.h> +#include <ydb/services/metadata/initializer/manager.h> +#include <ydb/services/metadata/service.h> -TVector<NMetadataInitializer::ITableModifier::TPtr> TExecutor::BuildModifiers() const { - const TString tableName = Config.GetTablePath(); - TVector<NMetadataInitializer::ITableModifier::TPtr> result; - { - Ydb::Table::CreateTableRequest request; - request.set_session_id(""); - request.set_path(tableName); - request.add_primary_key("id"); - { - auto& column = *request.add_columns(); - column.set_name("id"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("enabled"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::BOOL); - } - { - auto& column = *request.add_columns(); - column.set_name("class"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("executorId"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("lastPing"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32); - } - { - auto& column = *request.add_columns(); - column.set_name("startInstant"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32); - } - { - auto& column = *request.add_columns(); - column.set_name("constructInstant"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32); - } - { - auto& column = *request.add_columns(); - column.set_name("activity"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("scheduler"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("state"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request)); - } - return result; -} +namespace NKikimr::NBackgroundTasks { void TExecutor::Handle(TEvStartAssign::TPtr& /*ev*/) { ALS_DEBUG(NKikimrServices::BG_TASKS) << "start assign"; if (Config.GetMaxInFlight() > CurrentTaskIds.size()) { - Register(new TAssignTasksActor(Config.GetMaxInFlight() - CurrentTaskIds.size(), Controller, ExecutorId)); + Register(new TAssignTasksActor(Config.GetMaxInFlight() - CurrentTaskIds.size(), InternalController, ExecutorId)); } } void TExecutor::Handle(TEvAssignFinished::TPtr& /*ev*/) { ALS_DEBUG(NKikimrServices::BG_TASKS) << "assign finished"; - Register(new TFetchTasksActor(CurrentTaskIds, ExecutorId, Controller)); + Register(new TFetchTasksActor(CurrentTaskIds, ExecutorId, InternalController)); } void TExecutor::Handle(TEvFetchingFinished::TPtr& /*ev*/) { @@ -96,7 +38,7 @@ void TExecutor::Handle(TEvLockPingerFinished::TPtr& /*ev*/) { void TExecutor::Handle(TEvLockPingerStart::TPtr& /*ev*/) { ALS_DEBUG(NKikimrServices::BG_TASKS) << "pinger start"; if (CurrentTaskIds.size()) { - Register(new TLockPingerActor(Controller, CurrentTaskIds)); + Register(new TLockPingerActor(InternalController, CurrentTaskIds)); } else { Schedule(Config.GetPingPeriod(), new TEvLockPingerStart); } @@ -105,7 +47,7 @@ void TExecutor::Handle(TEvLockPingerStart::TPtr& /*ev*/) { void TExecutor::Handle(TEvTaskFetched::TPtr& ev) { ALS_DEBUG(NKikimrServices::BG_TASKS) << "task fetched"; if (CurrentTaskIds.emplace(ev->Get()->GetTask().GetId()).second) { - Register(new TTaskExecutor(ev->Get()->GetTask(), Controller)); + Register(new TTaskExecutor(ev->Get()->GetTask(), InternalController)); } } @@ -118,24 +60,37 @@ void TExecutor::Handle(TEvTaskExecutorFinished::TPtr& ev) { void TExecutor::Handle(TEvAddTask::TPtr& ev) { ALS_DEBUG(NKikimrServices::BG_TASKS) << "add task"; - Register(new TAddTasksActor(Controller, ev->Get()->GetTask(), ev->Sender)); + Register(new TAddTasksActor(InternalController, ev->Get()->GetTask(), ev->Sender)); } void TExecutor::Handle(TEvUpdateTaskEnabled::TPtr& ev) { ALS_DEBUG(NKikimrServices::BG_TASKS) << "start task"; - Register(new TUpdateTaskEnabledActor(Controller, ev->Get()->GetTaskId(), ev->Get()->GetEnabled(), ev->Sender)); -} - -void TExecutor::RegisterState() { - Controller = std::make_shared<TExecutorController>(SelfId(), Config); - Become(&TExecutor::StateMain); + Register(new TUpdateTaskEnabledActor(InternalController, ev->Get()->GetTaskId(), ev->Get()->GetEnabled(), ev->Sender)); } -void TExecutor::OnInitialized() { +void TExecutor::Handle(NMetadataInitializer::TEvInitializationFinished::TPtr& /*ev*/) { Sender<TEvStartAssign>().SendTo(SelfId()); Schedule(Config.GetPingPeriod(), new TEvLockPingerStart); } +void TExecutor::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) { + auto snapshot = ev->Get()->GetValidatedSnapshotAs<NMetadataInitializer::TSnapshot>(); + auto b = std::make_shared<TBGTasksInitializer>(Config); + Register(new NMetadataInitializer::TDSAccessorInitialized(Config.GetRequestConfig(), "bg_tasks", b, InternalController, snapshot)); +} + +void TExecutor::Bootstrap() { + InternalController = std::make_shared<TExecutorController>(SelfId(), Config); + Become(&TExecutor::StateMain); + auto manager = std::make_shared<NMetadataInitializer::TFetcher>(); + if (NMetadataProvider::TServiceOperator::IsEnabled()) { + Sender<NMetadataProvider::TEvSubscribeExternal>(manager).SendTo(NMetadataProvider::MakeServiceId(SelfId().NodeId())); + } else { + auto b = std::make_shared<TBGTasksInitializer>(Config); + Register(new NMetadataInitializer::TDSAccessorInitialized(Config.GetRequestConfig(), "bg_tasks", b, InternalController, nullptr)); + } +} + NActors::IActor* CreateService(const TConfig& config) { return new TExecutor(config); } diff --git a/ydb/services/bg_tasks/ds_table/executor.h b/ydb/services/bg_tasks/ds_table/executor.h index b15d25f6c92..a5d334302d2 100644 --- a/ydb/services/bg_tasks/ds_table/executor.h +++ b/ydb/services/bg_tasks/ds_table/executor.h @@ -51,22 +51,16 @@ public: } }; -class TExecutor: public NMetadataInitializer::TDSAccessorInitialized { +class TExecutor: public NActors::TActorBootstrapped<TExecutor> { private: - using TBase = NMetadataInitializer::TDSAccessorInitialized; + using TBase = NActors::TActorBootstrapped<TExecutor>; TString TableName; const TString ExecutorId = TGUID::CreateTimebased().AsUuidString(); const TConfig Config; std::set<TString> CurrentTaskIds; - TExecutorController::TPtr Controller; + TExecutorController::TPtr InternalController; protected: - virtual void RegisterState() override; - virtual void OnInitialized() override; - virtual void Prepare(NMetadataInitializer::IController::TPtr controller) override { - controller->PreparationFinished(BuildModifiers()); - } - TVector<NMetadataInitializer::ITableModifier::TPtr> BuildModifiers() const; - + void Handle(NMetadataInitializer::TEvInitializationFinished::TPtr& ev); void Handle(TEvStartAssign::TPtr& ev); void Handle(TEvAssignFinished::TPtr& ev); void Handle(TEvFetchingFinished::TPtr& ev); @@ -76,9 +70,11 @@ protected: void Handle(TEvUpdateTaskEnabled::TPtr& ev); void Handle(TEvLockPingerStart::TPtr& ev); void Handle(TEvLockPingerFinished::TPtr& ev); + void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev); - STFUNC(StateMain) { + STATEFN(StateMain) { switch (ev->GetTypeRewrite()) { + hFunc(NMetadataInitializer::TEvInitializationFinished, Handle); hFunc(TEvStartAssign, Handle); hFunc(TEvAssignFinished, Handle); hFunc(TEvFetchingFinished, Handle); @@ -87,15 +83,17 @@ protected: hFunc(TEvTaskExecutorFinished, Handle); hFunc(TEvLockPingerStart, Handle); hFunc(TEvLockPingerFinished, Handle); + hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle); default: - TBase::StateMain(ev, ctx); + break; } } public: + void Bootstrap(); + TExecutor(const TConfig& config) - : TBase(config.GetRequestConfig()) - , Config(config) + : Config(config) { TServiceOperator::Register(); } diff --git a/ydb/services/bg_tasks/ds_table/executor_controller.cpp b/ydb/services/bg_tasks/ds_table/executor_controller.cpp index 14f9a9ac691..6d37a5f2fc7 100644 --- a/ydb/services/bg_tasks/ds_table/executor_controller.cpp +++ b/ydb/services/bg_tasks/ds_table/executor_controller.cpp @@ -23,6 +23,10 @@ void TExecutorController::TaskFinished(const TString& taskId) const { ExecutorActorId.Send(ExecutorActorId, new TEvTaskExecutorFinished(taskId)); } +void TExecutorController::InitializationFinished(const TString& id) const { + ExecutorActorId.Send(ExecutorActorId, new NMetadataInitializer::TEvInitializationFinished(id)); +} + void TExecutorController::LockPingerFinished() const { ExecutorActorId.Send(ExecutorActorId, new TEvLockPingerFinished()); } diff --git a/ydb/services/bg_tasks/ds_table/executor_controller.h b/ydb/services/bg_tasks/ds_table/executor_controller.h index 3d3aebb9cd3..cf831712c07 100644 --- a/ydb/services/bg_tasks/ds_table/executor_controller.h +++ b/ydb/services/bg_tasks/ds_table/executor_controller.h @@ -1,6 +1,8 @@ #pragma once #include "config.h" +#include <ydb/services/metadata/initializer/common.h> + #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/actorid.h> @@ -8,7 +10,7 @@ namespace NKikimr::NBackgroundTasks { class TTask; -class TExecutorController { +class TExecutorController: public NMetadataInitializer::IInitializerOutput { private: const NActors::TActorIdentity ExecutorActorId; YDB_READONLY_DEF(TConfig, Config); @@ -29,6 +31,7 @@ public: return Config.GetRequestConfig(); } + virtual void InitializationFinished(const TString& id) const override; void LockPingerFinished() const; void TaskFetched(const TTask& task) const; void TaskFinished(const TString& taskId) const; diff --git a/ydb/services/bg_tasks/ds_table/initialization.cpp b/ydb/services/bg_tasks/ds_table/initialization.cpp new file mode 100644 index 00000000000..efcf2d249ba --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/initialization.cpp @@ -0,0 +1,68 @@ +#include "initialization.h" + +namespace NKikimr::NBackgroundTasks { + +void TBGTasksInitializer::DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const { + const TString tableName = Config.GetTablePath(); + TVector<NMetadataInitializer::ITableModifier::TPtr> result; + { + Ydb::Table::CreateTableRequest request; + request.set_session_id(""); + request.set_path(tableName); + request.add_primary_key("id"); + { + auto& column = *request.add_columns(); + column.set_name("id"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("enabled"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::BOOL); + } + { + auto& column = *request.add_columns(); + column.set_name("class"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("executorId"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("lastPing"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32); + } + { + auto& column = *request.add_columns(); + column.set_name("startInstant"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32); + } + { + auto& column = *request.add_columns(); + column.set_name("constructInstant"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32); + } + { + auto& column = *request.add_columns(); + column.set_name("activity"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("scheduler"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("state"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request, "create")); + } + controller->PreparationFinished(result); +} + +} diff --git a/ydb/services/bg_tasks/ds_table/initialization.h b/ydb/services/bg_tasks/ds_table/initialization.h new file mode 100644 index 00000000000..aeb4ac84ac3 --- /dev/null +++ b/ydb/services/bg_tasks/ds_table/initialization.h @@ -0,0 +1,21 @@ +#pragma once +#include "config.h" + +#include <ydb/services/metadata/abstract/common.h> + +namespace NKikimr::NBackgroundTasks { + +class TBGTasksInitializer: public NMetadata::IInitializationBehaviour { +private: + const TConfig Config; +protected: + virtual void DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const override; +public: + TBGTasksInitializer(const TConfig& config) + : Config(config) + { + + } +}; + +} diff --git a/ydb/services/metadata/CMakeLists.txt b/ydb/services/metadata/CMakeLists.txt index 55a81045b7a..96609b8fa6b 100644 --- a/ydb/services/metadata/CMakeLists.txt +++ b/ydb/services/metadata/CMakeLists.txt @@ -9,7 +9,9 @@ add_subdirectory(abstract) add_subdirectory(ds_table) add_subdirectory(initializer) +add_subdirectory(manager) add_subdirectory(request) +add_subdirectory(secret) add_library(ydb-services-metadata) target_link_libraries(ydb-services-metadata PUBLIC @@ -17,6 +19,7 @@ target_link_libraries(ydb-services-metadata PUBLIC yutil cpp-actors-core services-metadata-abstract + services-metadata-manager ) target_sources(ydb-services-metadata PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/metadata/service.cpp diff --git a/ydb/services/metadata/abstract/CMakeLists.txt b/ydb/services/metadata/abstract/CMakeLists.txt index 51ed2ede54a..4da40e6d1ff 100644 --- a/ydb/services/metadata/abstract/CMakeLists.txt +++ b/ydb/services/metadata/abstract/CMakeLists.txt @@ -19,5 +19,8 @@ target_link_libraries(services-metadata-abstract PUBLIC ) target_sources(services-metadata-abstract PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/metadata/abstract/common.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/abstract/manager.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/abstract/decoder.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/abstract/fetcher.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/abstract/kqp_common.cpp ) diff --git a/ydb/services/metadata/abstract/common.cpp b/ydb/services/metadata/abstract/common.cpp index 7662c8de1c1..fa5484fdbea 100644 --- a/ydb/services/metadata/abstract/common.cpp +++ b/ydb/services/metadata/abstract/common.cpp @@ -1,22 +1,5 @@ #include "common.h" -#include <util/string/join.h> namespace NKikimr::NMetadataProvider { -TString ISnapshotParser::GetSnapshotId() const { - if (!SnapshotId) { - SnapshotId = JoinSeq(",", GetTables()); - } - return *SnapshotId; -} - -ISnapshot::TPtr ISnapshotParser::ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const { - ISnapshot::TPtr result = CreateSnapshot(actuality); - Y_VERIFY(result); - if (!result->DeserializeFromResultSet(rawData)) { - return nullptr; - } - return result; -} - } diff --git a/ydb/services/metadata/abstract/common.h b/ydb/services/metadata/abstract/common.h index 259a384bb7e..f3cabd77350 100644 --- a/ydb/services/metadata/abstract/common.h +++ b/ydb/services/metadata/abstract/common.h @@ -1,14 +1,15 @@ #pragma once +#include "fetcher.h" + #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/actorid.h> #include <library/cpp/actors/core/events.h> #include <library/cpp/actors/core/actor_virtual.h> #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/object_factory/object_factory.h> #include <ydb/core/base/events.h> #include <ydb/library/accessor/accessor.h> -#include <ydb/public/api/protos/ydb_table.pb.h> -#include <ydb/services/metadata/initializer/common.h> namespace NKikimr::NMetadataProvider { @@ -21,80 +22,13 @@ enum EEvSubscribe { EvUnsubscribeLocal, EvSubscribeExternal, EvUnsubscribeExternal, + EvYQLResponse, + EvAlterObjects, EvEnd }; static_assert(EEvSubscribe::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER)"); -class ISnapshot; - -class ISnapshotAcceptorController { -public: - using TPtr = std::shared_ptr<ISnapshotAcceptorController>; - virtual ~ISnapshotAcceptorController() = default; - virtual void Enriched(std::shared_ptr<ISnapshot> enrichedSnapshot) = 0; - virtual void EnrichProblem(const TString& errorMessage) = 0; -}; - -class ISnapshot { -private: - YDB_READONLY_DEF(TInstant, Actuality); -protected: - virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) = 0; - virtual TString DoSerializeToString() const = 0; -public: - using TPtr = std::shared_ptr<ISnapshot>; - ISnapshot(const TInstant actuality) - : Actuality(actuality) { - - } - - bool DeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) { - return DoDeserializeFromResultSet(rawData); - } - - TString SerializeToString() const { - return DoSerializeToString(); - } - - virtual ~ISnapshot() = default; -}; - -class ISnapshotParser { -protected: - virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const = 0; - virtual void DoPrepare(NMetadataInitializer::IController::TPtr controller) const = 0; - virtual const TVector<TString>& DoGetTables() const = 0; - mutable std::optional<TString> SnapshotId; -public: - using TPtr = std::shared_ptr<ISnapshotParser>; - - TString GetSnapshotId() const; - ISnapshot::TPtr ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const; - - void Prepare(NMetadataInitializer::IController::TPtr controller) const { - return DoPrepare(controller); - } - - virtual void EnrichSnapshotData(ISnapshot::TPtr original, ISnapshotAcceptorController::TPtr controller) const { - controller->Enriched(original); - } - - const TVector<TString>& GetTables() const { - return DoGetTables(); - } - - virtual ~ISnapshotParser() = default; -}; - -template <class TSnapshot> -class TGenericSnapshotParser: public ISnapshotParser { -protected: - virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const override { - return std::make_shared<TSnapshot>(actuality); - } -}; - class TEvRefreshSubscriberData: public NActors::TEventLocal<TEvRefreshSubscriberData, EvRefreshSubscriberData> { private: YDB_READONLY_DEF(ISnapshot::TPtr, Snapshot); @@ -108,6 +42,13 @@ public: const TSnapshot* GetSnapshotAs() const { return dynamic_cast<const TSnapshot*>(Snapshot.get()); } + + template <class TSnapshot> + std::shared_ptr<TSnapshot> GetValidatedSnapshotAs() const { + auto result = dynamic_pointer_cast<TSnapshot>(Snapshot); + Y_VERIFY(result); + return result; + } }; } diff --git a/ydb/core/tx/tiering/decoder.cpp b/ydb/services/metadata/abstract/decoder.cpp index f8b0c5a190e..8b6375ffd31 100644 --- a/ydb/core/tx/tiering/decoder.cpp +++ b/ydb/services/metadata/abstract/decoder.cpp @@ -2,10 +2,11 @@ #include <library/cpp/protobuf/json/proto2json.h> #include <contrib/libs/protobuf/src/google/protobuf/text_format.h> #include <library/cpp/actors/core/log.h> +#include <library/cpp/json/json_reader.h> namespace NKikimr::NInternal { -i32 TDecoderBase::GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId, const bool verify /*= true*/) const { +i32 TDecoderBase::GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId) const { i32 idx = 0; for (auto&& i : rawData.columns()) { if (i.name() == columnId) { @@ -13,16 +14,21 @@ i32 TDecoderBase::GetFieldIndex(const Ydb::ResultSet& rawData, const TString& co } ++idx; } - Y_VERIFY(!verify, "incorrect columnId %s", columnId.data()); return -1; } -bool TDecoderBase::Read(const ui32 columnIdx, TString& result, const Ydb::Value& r) const { +bool TDecoderBase::Read(const i32 columnIdx, TString& result, const Ydb::Value& r) const { + if (columnIdx >= (i32)r.items().size() || columnIdx < 0) { + return false; + } result = r.items()[columnIdx].bytes_value(); return true; } -bool TDecoderBase::Read(const ui32 columnIdx, TDuration& result, const Ydb::Value& r) const { +bool TDecoderBase::Read(const i32 columnIdx, TDuration& result, const Ydb::Value& r) const { + if (columnIdx >= (i32)r.items().size() || columnIdx < 0) { + return false; + } const TString& s = r.items()[columnIdx].bytes_value(); if (!TDuration::TryParse(s, result)) { ALS_WARN(0) << "cannot parse duration for tiering: " << s; @@ -31,7 +37,10 @@ bool TDecoderBase::Read(const ui32 columnIdx, TDuration& result, const Ydb::Valu return true; } -bool TDecoderBase::Read(const ui32 columnIdx, bool& result, const Ydb::Value& r) const { +bool TDecoderBase::Read(const i32 columnIdx, bool& result, const Ydb::Value& r) const { + if (columnIdx >= (i32)r.items().size() || columnIdx < 0) { + return false; + } auto& pValue = r.items()[columnIdx]; if (pValue.has_bool_value()) { result = pValue.bool_value(); @@ -42,14 +51,17 @@ bool TDecoderBase::Read(const ui32 columnIdx, bool& result, const Ydb::Value& r) return true; } -bool TDecoderBase::Read(const ui32 columnIdx, TInstant& result, const Ydb::Value& r) const { +bool TDecoderBase::Read(const i32 columnIdx, TInstant& result, const Ydb::Value& r) const { + if (columnIdx >= (i32)r.items().size() || columnIdx < 0) { + return false; + } auto& pValue = r.items()[columnIdx]; if (pValue.has_uint32_value()) { result = TInstant::Seconds(pValue.uint32_value()); } else if (pValue.has_int64_value()) { - result = TInstant::Seconds(pValue.int64_value()); + result = TInstant::MicroSeconds(pValue.int64_value()); } else if (pValue.has_uint64_value()) { - result = TInstant::Seconds(pValue.uint64_value()); + result = TInstant::MicroSeconds(pValue.uint64_value()); } else if (pValue.has_int32_value()) { result = TInstant::Seconds(pValue.int32_value()); } else { @@ -59,7 +71,22 @@ bool TDecoderBase::Read(const ui32 columnIdx, TInstant& result, const Ydb::Value return true; } -bool TDecoderBase::ReadDebugProto(const ui32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const { +bool TDecoderBase::ReadJson(const i32 columnIdx, NJson::TJsonValue& result, const Ydb::Value& r) const { + if (columnIdx >= (i32)r.items().size() || columnIdx < 0) { + return false; + } + const TString& s = r.items()[columnIdx].bytes_value(); + if (!NJson::ReadJsonFastTree(s, &result)) { + ALS_ERROR(0) << "cannot parse json string: " << s; + return false; + } + return true; +} + +bool TDecoderBase::ReadDebugProto(const i32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const { + if (columnIdx >= (i32)r.items().size() || columnIdx < 0) { + return false; + } const TString& s = r.items()[columnIdx].bytes_value(); if (!::google::protobuf::TextFormat::ParseFromString(s, &result)) { ALS_ERROR(0) << "cannot parse proto string: " << s; diff --git a/ydb/services/metadata/abstract/decoder.h b/ydb/services/metadata/abstract/decoder.h new file mode 100644 index 00000000000..39977b9e681 --- /dev/null +++ b/ydb/services/metadata/abstract/decoder.h @@ -0,0 +1,30 @@ +#pragma once +#include <ydb/public/api/protos/ydb_value.pb.h> +#include <ydb/services/metadata/manager/table_record.h> + +#include <library/cpp/json/writer/json_value.h> +#include <util/datetime/base.h> + +namespace NKikimr::NInternal { + +class TDecoderBase { +protected: + i32 GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId) const; +public: + bool Read(const i32 columnIdx, TString& result, const Ydb::Value& r) const; + bool ReadDebugProto(const i32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const; + bool ReadJson(const i32 columnIdx, NJson::TJsonValue& result, const Ydb::Value& r) const; + bool Read(const i32 columnIdx, TDuration& result, const Ydb::Value& r) const; + bool Read(const i32 columnIdx, TInstant& result, const Ydb::Value& r) const; + bool Read(const i32 columnIdx, bool& result, const Ydb::Value& r) const; + + template <class TObject> + static bool DeserializeFromRecord(TObject& object, const NMetadataManager::TTableRecord& tr) { + auto rs = tr.BuildRecordSet(); + Y_VERIFY(rs.rows().size() == 1); + typename TObject::TDecoder decoder(rs); + return object.DeserializeFromRecord(decoder, rs.rows()[0]); + } +}; + +} diff --git a/ydb/services/metadata/abstract/fetcher.cpp b/ydb/services/metadata/abstract/fetcher.cpp new file mode 100644 index 00000000000..d4d1efcd3ee --- /dev/null +++ b/ydb/services/metadata/abstract/fetcher.cpp @@ -0,0 +1,24 @@ +#include "fetcher.h" +#include <util/string/join.h> + +namespace NKikimr::NMetadataProvider { + +ISnapshot::TPtr ISnapshotParser::ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const { + ISnapshot::TPtr result = CreateSnapshot(actuality); + Y_VERIFY(result); + if (!result->DeserializeFromResultSet(rawData)) { + return nullptr; + } + return result; +} + +TString ISnapshotParser::GetComponentId() const { + auto managers = GetManagers(); + std::vector<TString> ids; + for (auto&& i : managers) { + ids.emplace_back(i->GetTablePath()); + } + return JoinSeq("-", ids); +} + +} diff --git a/ydb/services/metadata/abstract/fetcher.h b/ydb/services/metadata/abstract/fetcher.h new file mode 100644 index 00000000000..8e211fed613 --- /dev/null +++ b/ydb/services/metadata/abstract/fetcher.h @@ -0,0 +1,89 @@ +#pragma once +#include "manager.h" +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actorid.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/core/actor_virtual.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/object_factory/object_factory.h> +#include <ydb/core/base/events.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> +#include <ydb/public/api/protos/ydb_table.pb.h> +#include <ydb/services/metadata/initializer/common.h> +#include <ydb/services/metadata/manager/common.h> +#include <ydb/services/metadata/manager/table_record.h> +#include <ydb/services/metadata/manager/alter.h> + +namespace NKikimr::NMetadataProvider { + +class ISnapshot; + +class ISnapshotAcceptorController { +public: + using TPtr = std::shared_ptr<ISnapshotAcceptorController>; + virtual ~ISnapshotAcceptorController() = default; + virtual void Enriched(std::shared_ptr<ISnapshot> enrichedSnapshot) = 0; + virtual void EnrichProblem(const TString& errorMessage) = 0; +}; + +class ISnapshot { +private: + YDB_READONLY_DEF(TInstant, Actuality); +protected: + virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) = 0; + virtual TString DoSerializeToString() const = 0; +public: + using TPtr = std::shared_ptr<ISnapshot>; + ISnapshot(const TInstant actuality) + : Actuality(actuality) { + + } + + bool DeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) { + return DoDeserializeFromResultSet(rawData); + } + + TString SerializeToString() const { + return DoSerializeToString(); + } + + virtual ~ISnapshot() = default; +}; + +class ISnapshotParser { +private: + mutable std::vector<NMetadata::IOperationsManager::TPtr> Managers; +protected: + virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const = 0; + virtual std::vector<NMetadata::IOperationsManager::TPtr> DoGetManagers() const = 0; +public: + using TPtr = std::shared_ptr<ISnapshotParser>; + + TString GetComponentId() const; + ISnapshot::TPtr ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const; + + virtual void EnrichSnapshotData(ISnapshot::TPtr original, ISnapshotAcceptorController::TPtr controller) const { + controller->Enriched(original); + } + + const std::vector<NMetadata::IOperationsManager::TPtr>& GetManagers() const { + if (Managers.empty()) { + Managers = DoGetManagers(); + } + return Managers; + } + + virtual ~ISnapshotParser() = default; +}; + +template <class TSnapshot> +class TSnapshotsManager: public ISnapshotParser { +protected: + virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const override { + return std::make_shared<TSnapshot>(actuality); + } +}; + +} diff --git a/ydb/services/metadata/abstract/manager.cpp b/ydb/services/metadata/abstract/manager.cpp new file mode 100644 index 00000000000..849b69a83ef --- /dev/null +++ b/ydb/services/metadata/abstract/manager.cpp @@ -0,0 +1,5 @@ +#include "manager.h" + +namespace NKikimr::NMetadata { + +} diff --git a/ydb/services/metadata/abstract/manager.h b/ydb/services/metadata/abstract/manager.h new file mode 100644 index 00000000000..a9d83521bd0 --- /dev/null +++ b/ydb/services/metadata/abstract/manager.h @@ -0,0 +1,96 @@ +#pragma once +#include "kqp_common.h" +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actorid.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/core/actor_virtual.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/object_factory/object_factory.h> + +#include <ydb/core/base/events.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> +#include <ydb/public/api/protos/ydb_table.pb.h> +#include <ydb/services/metadata/initializer/common.h> +#include <ydb/services/metadata/manager/common.h> +#include <ydb/services/metadata/manager/table_record.h> + +namespace NKikimr::NMetadata { + +class TOperationParsingResult { +private: + YDB_READONLY_FLAG(Success, false); + YDB_READONLY_DEF(TString, ErrorMessage); + YDB_READONLY_DEF(NMetadataManager::TTableRecord, Record); +public: + TOperationParsingResult(const char* errorMessage) + : SuccessFlag(false) + , ErrorMessage(errorMessage) { + + } + + TOperationParsingResult(const TString& errorMessage) + : SuccessFlag(false) + , ErrorMessage(errorMessage) { + + } + + TOperationParsingResult(NMetadataManager::TTableRecord&& record) + : SuccessFlag(true) + , Record(record) { + + } +}; + +class IInitializationBehaviour { +protected: + virtual void DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const = 0; +public: + using TPtr = std::shared_ptr<IInitializationBehaviour>; + virtual ~IInitializationBehaviour() = default; + void Prepare(NMetadataInitializer::IInitializerInput::TPtr controller) const { + return DoPrepare(controller); + } +}; + +class IAlterCommand { +private: + YDB_READONLY_DEF(std::vector<NMetadataManager::TTableRecord>, Records); + YDB_READONLY_DEF(IOperationsManager::TPtr, Manager); + YDB_READONLY_DEF(NMetadataManager::IAlterController::TPtr, Controller); + YDB_READONLY_DEF(IOperationsManager::TModificationContext, Context); +protected: + virtual void DoExecute() const = 0; +public: + using TPtr = std::shared_ptr<IAlterCommand>; + virtual ~IAlterCommand() = default; + + IAlterCommand(const std::vector<NMetadataManager::TTableRecord>& records, + IOperationsManager::TPtr manager, + NMetadataManager::IAlterController::TPtr controller, + const IOperationsManager::TModificationContext& context) + : Records(records) + , Manager(manager) + , Controller(controller) + , Context(context) { + + } + + IAlterCommand(const NMetadataManager::TTableRecord& record, + IOperationsManager::TPtr manager, + NMetadataManager::IAlterController::TPtr controller, + const IOperationsManager::TModificationContext& context) + : Manager(manager) + , Controller(controller) + , Context(context) { + Records.emplace_back(record); + + } + + void Execute() const { + DoExecute(); + } +}; + +} diff --git a/ydb/services/metadata/ds_table/CMakeLists.txt b/ydb/services/metadata/ds_table/CMakeLists.txt index 137a58c0d35..350ba781800 100644 --- a/ydb/services/metadata/ds_table/CMakeLists.txt +++ b/ydb/services/metadata/ds_table/CMakeLists.txt @@ -17,6 +17,7 @@ target_link_libraries(services-metadata-ds_table PUBLIC core-grpc_services-base ydb-core-grpc_services services-metadata-initializer + services-metadata-secret ) target_sources(services-metadata-ds_table PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_refresh.cpp diff --git a/ydb/services/metadata/ds_table/accessor_refresh.cpp b/ydb/services/metadata/ds_table/accessor_refresh.cpp index e89bd66d28f..aaf15d84a5f 100644 --- a/ydb/services/metadata/ds_table/accessor_refresh.cpp +++ b/ydb/services/metadata/ds_table/accessor_refresh.cpp @@ -9,22 +9,20 @@ namespace NKikimr::NMetadataProvider { using namespace NInternal::NRequest; -void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) { - const TString startString = CurrentSelection.SerializeAsString(); - - auto currentFullReply = ev->Get()->GetResult(); +void TDSAccessorRefresher::Handle(TEvYQLResponse::TPtr& ev) { + auto& currentFullReply = ev->Get()->GetResponse(); Ydb::Table::ExecuteQueryResult qResult; currentFullReply.operation().result().UnpackTo(&qResult); - Y_VERIFY((size_t)qResult.result_sets().size() == SnapshotConstructor->GetTables().size()); - *CurrentSelection.mutable_result_sets() = std::move(*qResult.mutable_result_sets()); - auto parsedSnapshot = SnapshotConstructor->ParseSnapshot(CurrentSelection, RequestedActuality); + Y_VERIFY((size_t)qResult.result_sets().size() == SnapshotConstructor->GetManagers().size()); + *ProposedProto.mutable_result_sets() = std::move(*qResult.mutable_result_sets()); + auto parsedSnapshot = SnapshotConstructor->ParseSnapshot(ProposedProto, RequestedActuality); if (!parsedSnapshot) { ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot parse current snapshot"; } - if (!!parsedSnapshot && CurrentSelection.SerializeAsString() != startString) { - ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << CurrentSelection.DebugString(); - SnapshotConstructor->EnrichSnapshotData(parsedSnapshot, GetController()); + if (!!parsedSnapshot && CurrentSelection.SerializeAsString() != ProposedProto.SerializeAsString()) { + ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << ProposedProto.DebugString(); + SnapshotConstructor->EnrichSnapshotData(parsedSnapshot, InternalController); } else { Schedule(Config.GetRefreshPeriod(), new TEvRefresh()); } @@ -34,6 +32,7 @@ void TDSAccessorRefresher::Handle(TEvEnrichSnapshotResult::TPtr& ev) { RequestedActuality = TInstant::Zero(); Schedule(Config.GetRefreshPeriod(), new TEvRefresh()); CurrentSnapshot = ev->Get()->GetEnrichedSnapshot(); + *CurrentSelection.mutable_result_sets() = std::move(*ProposedProto.mutable_result_sets()); OnSnapshotModified(); } @@ -43,48 +42,27 @@ void TDSAccessorRefresher::Handle(TEvEnrichSnapshotProblem::TPtr& ev) { ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "enrich problem: " << ev->Get()->GetErrorText(); } -void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateSession>::TPtr& ev) { - Ydb::Table::CreateSessionResponse currentFullReply = ev->Get()->GetResult(); - Ydb::Table::CreateSessionResult session; - currentFullReply.operation().result().UnpackTo(&session); - const TString sessionId = session.session_id(); - Y_VERIFY(sessionId); - Ydb::Table::ExecuteDataQueryRequest request; +void TDSAccessorRefresher::Handle(TEvRefresh::TPtr& /*ev*/) { TStringBuilder sb; - auto& tables = SnapshotConstructor->GetTables(); - Y_VERIFY(tables.size()); - for (auto&& i : tables) { - sb << "SELECT * FROM `" + EscapeC(i) + "`;"; + auto& managers = SnapshotConstructor->GetManagers(); + Y_VERIFY(managers.size()); + for (auto&& i : managers) { + sb << "SELECT * FROM `" + EscapeC(i->GetTablePath()) + "`;"; } - - request.mutable_query()->set_yql_text(sb); - request.set_session_id(sessionId); - request.mutable_tx_control()->mutable_begin_tx()->mutable_snapshot_read_only(); - - Register(new TYDBRequest<TDialogSelect>(std::move(request), SelfId(), Config.GetRequestConfig())); -} - -void TDSAccessorRefresher::Handle(TEvRefresh::TPtr& /*ev*/) { - Register(new TYDBRequest<TDialogCreateSession>(TDialogCreateSession::TRequest(), SelfId(), Config.GetRequestConfig())); -} - -void TDSAccessorRefresher::OnInitialized() { - Sender<TEvRefresh>().SendTo(SelfId()); + Register(new TYQLQuerySessionedActor(sb, Config.GetRequestConfig(), InternalController)); } TDSAccessorRefresher::TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor) - : TBase(config.GetRequestConfig()) - , SnapshotConstructor(snapshotConstructor) + : SnapshotConstructor(snapshotConstructor) , Config(config) { } -ISnapshotAcceptorController::TPtr TDSAccessorRefresher::GetController() const { - if (!ControllerImpl) { - ControllerImpl = std::make_shared<TSnapshotAcceptorController>(SelfId()); - } - return ControllerImpl; +void TDSAccessorRefresher::Bootstrap() { + RegisterState(); + InternalController = std::make_shared<TRefreshInternalController>(SelfId()); + Sender<TEvRefresh>().SendTo(SelfId()); } } diff --git a/ydb/services/metadata/ds_table/accessor_refresh.h b/ydb/services/metadata/ds_table/accessor_refresh.h index 920afdbcfb1..11ac1984bc0 100644 --- a/ydb/services/metadata/ds_table/accessor_refresh.h +++ b/ydb/services/metadata/ds_table/accessor_refresh.h @@ -13,6 +13,17 @@ class TEvRefresh: public NActors::TEventLocal<TEvRefresh, EEvSubscribe::EvRefres public: }; +class TEvYQLResponse: public NActors::TEventLocal<TEvYQLResponse, EEvSubscribe::EvYQLResponse> { +private: + YDB_READONLY_DEF(NInternal::NRequest::TDialogYQLRequest::TResponse, Response); +public: + TEvYQLResponse(const NInternal::NRequest::TDialogYQLRequest::TResponse& r) + : Response(r) + { + + } +}; + class TEvEnrichSnapshotResult: public NActors::TEventLocal<TEvEnrichSnapshotResult, EEvSubscribe::EvEnrichSnapshotResult> { private: YDB_READONLY_DEF(ISnapshot::TPtr, EnrichedSnapshot); @@ -33,11 +44,11 @@ public: } }; -class TSnapshotAcceptorController: public ISnapshotAcceptorController { +class TRefreshInternalController: public ISnapshotAcceptorController, public NInternal::NRequest::IQueryOutput { private: const TActorIdentity ActorId; public: - TSnapshotAcceptorController(const TActorIdentity& actorId) + TRefreshInternalController(const TActorIdentity& actorId) : ActorId(actorId) { } @@ -49,39 +60,41 @@ public: virtual void Enriched(ISnapshot::TPtr enrichedSnapshot) override { ActorId.Send(ActorId, new TEvEnrichSnapshotResult(enrichedSnapshot)); } + + virtual void OnReply(const NInternal::NRequest::TDialogYQLRequest::TResponse& response) override { + ActorId.Send(ActorId, new TEvYQLResponse(response)); + } }; -class TDSAccessorRefresher: public NMetadataInitializer::TDSAccessorInitialized { +class TDSAccessorRefresher: public NActors::TActorBootstrapped<TDSAccessorRefresher> { private: - using TBase = NMetadataInitializer::TDSAccessorInitialized; + using TBase = NActors::TActorBootstrapped<TDSAccessorRefresher>; ISnapshotParser::TPtr SnapshotConstructor; + std::shared_ptr<TRefreshInternalController> InternalController; YDB_READONLY_DEF(ISnapshot::TPtr, CurrentSnapshot); YDB_READONLY_DEF(Ydb::Table::ExecuteQueryResult, CurrentSelection); + Ydb::Table::ExecuteQueryResult ProposedProto; TInstant RequestedActuality = TInstant::Zero(); const TConfig Config; - - mutable ISnapshotAcceptorController::TPtr ControllerImpl; - - ISnapshotAcceptorController::TPtr GetController() const; protected: - virtual void Prepare(NMetadataInitializer::IController::TPtr controller) override { - SnapshotConstructor->Prepare(controller); + virtual void RegisterState() { + Become(&TDSAccessorRefresher::StateMain); } bool IsReady() const { return !!CurrentSnapshot; } - virtual void OnInitialized() override; virtual void OnSnapshotModified() = 0; public: - STFUNC(StateMain) { + void Bootstrap(); + + STATEFN(StateMain) { switch (ev->GetTypeRewrite()) { - hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>, Handle); - hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>, Handle); + hFunc(TEvYQLResponse, Handle); hFunc(TEvRefresh, Handle); hFunc(TEvEnrichSnapshotResult, Handle); hFunc(TEvEnrichSnapshotProblem, Handle); default: - TBase::StateMain(ev, ctx); + break; } } @@ -89,8 +102,7 @@ public: void Handle(TEvEnrichSnapshotResult::TPtr& ev); void Handle(TEvEnrichSnapshotProblem::TPtr& ev); - void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>::TPtr& ev); - void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>::TPtr& ev); + void Handle(TEvYQLResponse::TPtr& ev); void Handle(TEvRefresh::TPtr& ev); }; diff --git a/ydb/services/metadata/ds_table/config.h b/ydb/services/metadata/ds_table/config.h index 228b3e637b6..f4344506a40 100644 --- a/ydb/services/metadata/ds_table/config.h +++ b/ydb/services/metadata/ds_table/config.h @@ -10,7 +10,7 @@ class TConfig { private: YDB_READONLY_DEF(NInternal::NRequest::TConfig, RequestConfig); YDB_READONLY(TDuration, RefreshPeriod, TDuration::Seconds(10)); - YDB_READONLY_FLAG(Enabled, true); + YDB_READONLY_FLAG(Enabled, false); public: TConfig() = default; diff --git a/ydb/services/metadata/ds_table/service.cpp b/ydb/services/metadata/ds_table/service.cpp index d75d258450b..c698cb407dd 100644 --- a/ydb/services/metadata/ds_table/service.cpp +++ b/ydb/services/metadata/ds_table/service.cpp @@ -14,20 +14,118 @@ IActor* CreateService(const TConfig& config) { return new TService(config); } +void TService::PrepareManagers(std::vector<NMetadata::IOperationsManager::TPtr> managers, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender) { + TManagersId id(managers); + const bool isInitialization = (managers.size() == 1) && managers.front()->GetTypeId() == NMetadataInitializer::TManager::GetTypeIdStatic(); + if (ActiveFlag || (PreparationFlag && isInitialization)) { + for (auto&& manager : managers) { + Y_VERIFY(!RegisteredManagers.contains(manager->GetTypeId())); + if (!ManagersInRegistration.contains(manager->GetTypeId()) && !RegisteredManagers.contains(manager->GetTypeId())) { + ManagersInRegistration.emplace(manager->GetTypeId(), manager); + Register(new NMetadataInitializer::TDSAccessorInitialized(Config.GetRequestConfig(), + manager->GetTypeId(), manager->GetInitializationBehaviour(), InternalController, InitializationSnapshot)); + } + } + } else if (!PreparationFlag) { + PreparationFlag = true; + InitializationFetcher = std::make_shared<NMetadataInitializer::TFetcher>(); + Send(SelfId(), new TEvSubscribeExternal(InitializationFetcher)); + } + EventsWaiting[id].emplace_back(ev, sender); +} + +void TService::Handle(NMetadataInitializer::TEvInitializationFinished::TPtr& ev) { + const TString& initId = ev->Get()->GetInitializationId(); + + auto it = ManagersInRegistration.find(initId); + Y_VERIFY(it != ManagersInRegistration.end()); + RegisteredManagers.emplace(initId, it->second); + ManagersInRegistration.erase(it); + + std::map<TManagersId, std::deque<TWaitEvent>> movedEvents; + for (auto it = EventsWaiting.begin(); it != EventsWaiting.end(); ) { + auto m = it->first; + if (!m.RemoveId(initId)) { + ++it; + continue; + } + if (m.IsEmpty()) { + for (auto&& i : it->second) { + i.Resend(SelfId()); + } + } else { + auto itNext = EventsWaiting.find(m); + if (itNext == EventsWaiting.end()) { + movedEvents.emplace(m, std::move(it->second)); + } else { + for (auto&& i : it->second) { + itNext->second.emplace_back(std::move(i)); + } + } + } + it = EventsWaiting.erase(it); + } + for (auto&& i : movedEvents) { + EventsWaiting.emplace(i.first, std::move(i.second)); + } +} + void TService::Handle(TEvSubscribeExternal::TPtr& ev) { - auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetSnapshotId()); - if (it == Accessors.end()) { - THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetSnapshotParser()); - it = Accessors.emplace(ev->Get()->GetSnapshotParser()->GetSnapshotId(), Register(actor.Release())).first; + std::vector<NMetadata::IOperationsManager::TPtr> needManagers; + for (auto&& i : ev->Get()->GetFetcher()->GetManagers()) { + if (!RegisteredManagers.contains(i->GetTypeId())) { + needManagers.emplace_back(i); + } + } + if (needManagers.empty()) { + auto it = Accessors.find(ev->Get()->GetFetcher()->GetComponentId()); + if (it == Accessors.end()) { + THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetFetcher()); + it = Accessors.emplace(ev->Get()->GetFetcher()->GetComponentId(), Register(actor.Release())).first; + } + Send<TEvSubscribe>(it->second, ev->Sender); + } else { + PrepareManagers(needManagers, ev->ReleaseBase(), ev->Sender); + } +} + +void TService::Handle(TEvAlterObjects::TPtr& ev) { + auto it = RegisteredManagers.find(ev->Get()->GetCommand()->GetManager()->GetTypeId()); + if (it != RegisteredManagers.end()) { + ev->Get()->GetCommand()->Execute(); + } else { + auto m = ev->Get()->GetCommand()->GetManager(); + PrepareManagers({ m }, ev->ReleaseBase(), ev->Sender); } - Send<TEvSubscribe>(it->second, ev->Sender); } void TService::Handle(TEvUnsubscribeExternal::TPtr& ev) { - auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetSnapshotId()); + auto it = Accessors.find(ev->Get()->GetFetcher()->GetComponentId()); if (it != Accessors.end()) { Send<TEvUnsubscribe>(it->second, ev->Sender); } } +void TService::Handle(TEvRefreshSubscriberData::TPtr& ev) { + auto s = ev->Get()->GetSnapshot(); + InitializationSnapshot = dynamic_pointer_cast<NMetadataInitializer::TSnapshot>(s); + Y_VERIFY(InitializationSnapshot); + if (!ActiveFlag) { + ActiveFlag = true; + for (auto&& i : EventsWaiting) { + i.second.front().Resend(SelfId()); + i.second.pop_front(); + } + } +} + +void TService::Bootstrap(const NActors::TActorContext& /*ctx*/) { + Become(&TService::StateMain); + InternalController = std::make_shared<TServiceInternalController>(SelfId()); +} + +void TServiceInternalController::InitializationFinished(const TString& id) const { + ActorId.Send(ActorId, new NMetadataInitializer::TEvInitializationFinished(id)); +} + } diff --git a/ydb/services/metadata/ds_table/service.h b/ydb/services/metadata/ds_table/service.h index e55d90ac914..7b52685c945 100644 --- a/ydb/services/metadata/ds_table/service.h +++ b/ydb/services/metadata/ds_table/service.h @@ -2,27 +2,125 @@ #include "config.h" #include <ydb/services/metadata/service.h> +#include <ydb/services/metadata/initializer/common.h> +#include <ydb/services/metadata/initializer/events.h> +#include <ydb/services/metadata/initializer/manager.h> +#include <ydb/services/metadata/initializer/snapshot.h> +#include <ydb/services/metadata/initializer/fetcher.h> + #include <library/cpp/actors/core/hfunc.h> namespace NKikimr::NMetadataProvider { +class TServiceInternalController: public NMetadataInitializer::IInitializerOutput { +private: + const NActors::TActorIdentity ActorId; +public: + TServiceInternalController(const NActors::TActorIdentity& actorId) + : ActorId(actorId) + { + + } + + virtual void InitializationFinished(const TString& id) const override; +}; + +class TManagersId { +private: + YDB_READONLY_DEF(std::set<TString>, ManagerIds); +public: + TManagersId(const std::vector<NMetadata::IOperationsManager::TPtr>& managers) { + for (auto&& i : managers) { + ManagerIds.emplace(i->GetTypeId()); + } + } + + bool IsEmpty() const { + return ManagerIds.empty(); + } + + bool RemoveId(const TString& id) { + auto it = ManagerIds.find(id); + if (it == ManagerIds.end()) { + return false; + } + ManagerIds.erase(it); + return true; + } + + bool operator<(const TManagersId& item) const { + if (ManagerIds.size() < item.ManagerIds.size()) { + return true; + } else if (ManagerIds.size() > item.ManagerIds.size()) { + return false; + } else { + auto itSelf = ManagerIds.begin(); + auto itItem = item.ManagerIds.begin(); + while (itSelf != ManagerIds.end()) { + if (*itSelf < *itItem) { + return true; + } + ++itSelf; + ++itItem; + } + return false; + } + } +}; + +class TWaitEvent { +private: + TAutoPtr<IEventBase> Event; + const TActorId Sender; +public: + TWaitEvent(TAutoPtr<IEventBase> ev, const TActorId& sender) + : Event(ev) + , Sender(sender) + { + + } + + void Resend(const TActorIdentity& receiver) { + TActivationContext::Send(new IEventHandle(receiver, Sender, Event.Release())); + } +}; + +class TWaitingWatcher { +private: +}; + class TService: public NActors::TActorBootstrapped<TService> { private: using TBase = NActors::TActor<TService>; + bool ActiveFlag = false; + bool PreparationFlag = false; std::map<TString, NActors::TActorId> Accessors; + std::map<TManagersId, std::deque<TWaitEvent>> EventsWaiting; + std::map<TString, NMetadata::IOperationsManager::TPtr> ManagersInRegistration; + std::map<TString, NMetadata::IOperationsManager::TPtr> RegisteredManagers; + + std::shared_ptr<NMetadataInitializer::TFetcher> InitializationFetcher; + std::shared_ptr<NMetadataInitializer::TSnapshot> InitializationSnapshot; + std::shared_ptr<TServiceInternalController> InternalController; const TConfig Config; -public: + + void Handle(NMetadataInitializer::TEvInitializationFinished::TPtr& ev); + void Handle(TEvRefreshSubscriberData::TPtr& ev); void Handle(TEvSubscribeExternal::TPtr& ev); void Handle(TEvUnsubscribeExternal::TPtr& ev); + void Handle(TEvAlterObjects::TPtr& ev); + void PrepareManagers(std::vector<NMetadata::IOperationsManager::TPtr> manager, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender); +public: - void Bootstrap(const NActors::TActorContext& /*ctx*/) { - Become(&TThis::StateMain); - } + void Bootstrap(const NActors::TActorContext& /*ctx*/); STATEFN(StateMain) { switch (ev->GetTypeRewrite()) { + hFunc(TEvAlterObjects, Handle); + hFunc(TEvRefreshSubscriberData, Handle); hFunc(TEvSubscribeExternal, Handle); hFunc(TEvUnsubscribeExternal, Handle); + hFunc(NMetadataInitializer::TEvInitializationFinished, Handle); default: Y_VERIFY(false); } @@ -31,6 +129,7 @@ public: TService(const TConfig& config) : Config(config) { + TServiceOperator::Register(); } }; diff --git a/ydb/services/metadata/initializer/CMakeLists.txt b/ydb/services/metadata/initializer/CMakeLists.txt index d3b624ff7c1..7abe0ee5bfd 100644 --- a/ydb/services/metadata/initializer/CMakeLists.txt +++ b/ydb/services/metadata/initializer/CMakeLists.txt @@ -23,4 +23,9 @@ target_sources(services-metadata-initializer PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/common.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/controller.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/events.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/manager.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/object.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/snapshot.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/initializer.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/fetcher.cpp ) diff --git a/ydb/services/metadata/initializer/accessor_init.cpp b/ydb/services/metadata/initializer/accessor_init.cpp index 9f03393e60b..ad1dcff2260 100644 --- a/ydb/services/metadata/initializer/accessor_init.cpp +++ b/ydb/services/metadata/initializer/accessor_init.cpp @@ -1,23 +1,32 @@ #include "accessor_init.h" #include "controller.h" +#include "manager.h" #include <ydb/core/grpc_services/local_rpc/local_rpc.h> +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/services/metadata/manager/alter.h> +#include <ydb/services/metadata/service.h> + namespace NKikimr::NMetadataInitializer { void TDSAccessorInitialized::Bootstrap() { - RegisterState(); - Controller = std::make_shared<TInitializerController>(SelfId()); + Become(&TDSAccessorInitialized::StateMain); + InternalController = std::make_shared<TInitializerInput>(SelfId()); Send(SelfId(), new TEvInitializerPreparationStart); } void TDSAccessorInitialized::Handle(TEvInitializerPreparationStart::TPtr& /*ev*/) { - Prepare(Controller); + InitializationBehaviour->Prepare(InternalController); } void TDSAccessorInitialized::Handle(TEvInitializerPreparationFinished::TPtr& ev) { auto modifiers = ev->Get()->GetModifiers(); for (auto&& i : modifiers) { + TDBInitializationKey key(ComponentId, i->GetModificationId()); + if (InitializationSnapshot && InitializationSnapshot->GetObjects().contains(key)) { + continue; + } Modifiers.emplace_back(i); } if (Modifiers.size()) { @@ -25,7 +34,7 @@ void TDSAccessorInitialized::Handle(TEvInitializerPreparationFinished::TPtr& ev) Modifiers.front()->Execute(SelfId(), Config); } else { ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "initialization finished"; - OnInitialized(); + ExternalController->InitializationFinished(ComponentId); } } @@ -34,22 +43,48 @@ void TDSAccessorInitialized::Handle(TEvInitializerPreparationProblem::TPtr& ev) Schedule(TDuration::Seconds(1), new TEvInitializerPreparationStart); } -void TDSAccessorInitialized::Handle(NInternal::NRequest::TEvRequestFinished::TPtr& /*ev*/) { - ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "modifiers count: " << Modifiers.size(); - if (Modifiers.empty()) { - return; - } +void TDSAccessorInitialized::DoNextModifier() { Modifiers.pop_front(); if (Modifiers.size()) { Modifiers.front()->Execute(SelfId(), Config); } else { ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "initialization finished"; - OnInitialized(); + ExternalController->InitializationFinished(ComponentId); + } +} + +void TDSAccessorInitialized::Handle(NInternal::NRequest::TEvRequestFinished::TPtr& /*ev*/) { + ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "modifiers count: " << Modifiers.size(); + Y_VERIFY(Modifiers.size()); + if (NMetadataProvider::TServiceOperator::IsEnabled() && InitializationSnapshot) { + TDBInitialization dbInit(ComponentId, Modifiers.front()->GetModificationId()); + auto manager = std::make_shared<NMetadataInitializer::TManager>(); + auto alterCommand = std::make_shared<NMetadataManager::TAlterCommand<TDBInitialization>>( + dbInit.SerializeToRecord(), manager, InternalController, NMetadata::IOperationsManager::TModificationContext()); + Sender<NMetadataProvider::TEvAlterObjects>(alterCommand) + .SendTo(NMetadataProvider::MakeServiceId(SelfId().NodeId())); + } else { + DoNextModifier(); } } -TDSAccessorInitialized::TDSAccessorInitialized(const NInternal::NRequest::TConfig& config) +void TDSAccessorInitialized::Handle(NMetadataManager::TEvModificationFinished::TPtr& /*ev*/) { + DoNextModifier(); +} + +void TDSAccessorInitialized::Handle(NMetadataManager::TEvModificationProblem::TPtr& /*ev*/) { + Schedule(TDuration::Seconds(1), new NInternal::NRequest::TEvRequestFinished); +} + +TDSAccessorInitialized::TDSAccessorInitialized(const NInternal::NRequest::TConfig& config, + const TString& componentId, + NMetadata::IInitializationBehaviour::TPtr initializationBehaviour, + IInitializerOutput::TPtr controller, std::shared_ptr<NMetadataInitializer::TSnapshot> initializationSnapshot) : Config(config) + , InitializationBehaviour(initializationBehaviour) + , ExternalController(controller) + , InitializationSnapshot(initializationSnapshot) + , ComponentId(componentId) { } diff --git a/ydb/services/metadata/initializer/accessor_init.h b/ydb/services/metadata/initializer/accessor_init.h index c558e05df32..f6fd51ab847 100644 --- a/ydb/services/metadata/initializer/accessor_init.h +++ b/ydb/services/metadata/initializer/accessor_init.h @@ -1,7 +1,10 @@ #pragma once #include "common.h" +#include "controller.h" #include "events.h" +#include "snapshot.h" +#include <ydb/services/metadata/abstract/common.h> #include <ydb/services/metadata/ds_table/config.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -15,23 +18,28 @@ class TDSAccessorInitialized: public NActors::TActorBootstrapped<TDSAccessorInit private: TDeque<ITableModifier::TPtr> Modifiers; const NInternal::NRequest::TConfig Config; - IController::TPtr Controller; - + NMetadata::IInitializationBehaviour::TPtr InitializationBehaviour; + IInitializerOutput::TPtr ExternalController; + TInitializerInput::TPtr InternalController; + std::shared_ptr<NMetadataInitializer::TSnapshot> InitializationSnapshot; + const TString ComponentId; void Handle(TEvInitializerPreparationStart::TPtr& ev); void Handle(TEvInitializerPreparationFinished::TPtr& ev); void Handle(TEvInitializerPreparationProblem::TPtr& ev); void Handle(NInternal::NRequest::TEvRequestFinished::TPtr& ev); -protected: - virtual void RegisterState() = 0; - virtual void OnInitialized() = 0; - virtual void Prepare(IController::TPtr controller) = 0; + void Handle(NMetadataManager::TEvModificationFinished::TPtr& ev); + void Handle(NMetadataManager::TEvModificationProblem::TPtr& ev); + void DoNextModifier(); public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::METADATA_INITIALIZER; } void Bootstrap(); - TDSAccessorInitialized(const NInternal::NRequest::TConfig& config); + TDSAccessorInitialized(const NInternal::NRequest::TConfig& config, + const TString& componentId, + NMetadata::IInitializationBehaviour::TPtr initializationBehaviour, + IInitializerOutput::TPtr controller, std::shared_ptr<NMetadataInitializer::TSnapshot> initializationSnapshot); STATEFN(StateMain) { switch (ev->GetTypeRewrite()) { @@ -39,6 +47,8 @@ public: hFunc(TEvInitializerPreparationStart, Handle); hFunc(TEvInitializerPreparationFinished, Handle); hFunc(TEvInitializerPreparationProblem, Handle); + hFunc(NMetadataManager::TEvModificationFinished, Handle); + hFunc(NMetadataManager::TEvModificationProblem, Handle); default: break; } diff --git a/ydb/services/metadata/initializer/common.h b/ydb/services/metadata/initializer/common.h index 40e8e79ee80..ae693fc5cb0 100644 --- a/ydb/services/metadata/initializer/common.h +++ b/ydb/services/metadata/initializer/common.h @@ -5,11 +5,20 @@ namespace NKikimr::NMetadataInitializer { class ITableModifier { +private: + YDB_READONLY_DEF(TString, ModificationId); protected: virtual bool DoExecute(const TActorId& resultCallbackId, const NInternal::NRequest::TConfig& config) const = 0; public: using TPtr = std::shared_ptr<ITableModifier>; virtual ~ITableModifier() = default; + + ITableModifier(const TString& modificationId) + : ModificationId(modificationId) + { + + } + bool Execute(const TActorId& resultCallbackId, const NInternal::NRequest::TConfig& config) const { return DoExecute(resultCallbackId, config); } @@ -18,6 +27,7 @@ public: template <class TDialogPolicy> class TGenericTableModifier: public ITableModifier { private: + using TBase = ITableModifier; YDB_READONLY_DEF(typename TDialogPolicy::TRequest, Request); protected: virtual bool DoExecute(const TActorId& resultCallbackId, const NInternal::NRequest::TConfig& config) const override { @@ -25,18 +35,27 @@ protected: return true; } public: - TGenericTableModifier(typename TDialogPolicy::TRequest& request) - : Request(request) { + TGenericTableModifier(typename TDialogPolicy::TRequest& request, const TString& modificationId) + : TBase(modificationId) + , Request(request) + { } }; -class IController { +class IInitializerInput { public: - using TPtr = std::shared_ptr<IController>; + using TPtr = std::shared_ptr<IInitializerInput>; virtual void PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const = 0; virtual void PreparationProblem(const TString& errorMessage) const = 0; - virtual ~IController() = default; + virtual ~IInitializerInput() = default; +}; + +class IInitializerOutput { +public: + using TPtr = std::shared_ptr<IInitializerOutput>; + virtual void InitializationFinished(const TString& id) const = 0; + virtual ~IInitializerOutput() = default; }; } diff --git a/ydb/services/metadata/initializer/controller.cpp b/ydb/services/metadata/initializer/controller.cpp index 1481e547336..73aa8a0e748 100644 --- a/ydb/services/metadata/initializer/controller.cpp +++ b/ydb/services/metadata/initializer/controller.cpp @@ -1,14 +1,28 @@ #include "controller.h" #include "events.h" +#include <ydb/services/metadata/manager/modification_controller.h> + namespace NKikimr::NMetadataInitializer { -void TInitializerController::PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const { +void TInitializerInput::PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const { ActorId.Send(ActorId, new TEvInitializerPreparationFinished(modifiers)); } -void TInitializerController::PreparationProblem(const TString& errorMessage) const { +void TInitializerInput::PreparationProblem(const TString& errorMessage) const { ActorId.Send(ActorId, new TEvInitializerPreparationProblem(errorMessage)); } +void TInitializerInput::AlterProblem(const TString& errorMessage) { + ActorId.Send(ActorId, new NMetadataManager::TEvModificationProblem(errorMessage)); +} + +void TInitializerInput::AlterFinished() { + ActorId.Send(ActorId, new NMetadataManager::TEvModificationFinished()); +} + +void TInitializerOutput::InitializationFinished(const TString& id) const { + ActorId.Send(ActorId, new TEvInitializationFinished(id)); +} + } diff --git a/ydb/services/metadata/initializer/controller.h b/ydb/services/metadata/initializer/controller.h index 2d2c2617e8f..63b2dd65ac9 100644 --- a/ydb/services/metadata/initializer/controller.h +++ b/ydb/services/metadata/initializer/controller.h @@ -1,19 +1,36 @@ #pragma once #include "common.h" +#include <ydb/services/metadata/manager/common.h> + namespace NKikimr::NMetadataInitializer { -class TInitializerController: public IController { +class TInitializerInput: public IInitializerInput, public NMetadataManager::IAlterController { private: const TActorIdentity ActorId; public: - TInitializerController(const TActorIdentity& actorId) + using TPtr = std::shared_ptr<TInitializerInput>; + TInitializerInput(const TActorIdentity& actorId) : ActorId(actorId) { } + virtual void AlterProblem(const TString& errorMessage) override; + virtual void AlterFinished() override; virtual void PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const override; virtual void PreparationProblem(const TString& errorMessage) const override; }; +class TInitializerOutput: public IInitializerOutput { +private: + const TActorIdentity ActorId; +public: + TInitializerOutput(const TActorIdentity& actorId) + : ActorId(actorId) { + + } + + virtual void InitializationFinished(const TString& id) const override; +}; + } diff --git a/ydb/services/metadata/initializer/events.h b/ydb/services/metadata/initializer/events.h index 5fe9b136835..e20967d27c5 100644 --- a/ydb/services/metadata/initializer/events.h +++ b/ydb/services/metadata/initializer/events.h @@ -10,20 +10,21 @@ namespace NKikimr::NMetadataInitializer { -enum EInitializerEvents { +enum EEvents { EvInitializerPreparationStart = EventSpaceBegin(TKikimrEvents::ES_METADATA_INITIALIZER), EvInitializerPreparationFinished, EvInitializerPreparationProblem, + EvInitializationFinished, EvEnd }; -static_assert(EInitializerEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_INITIALIZER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_INITIALIZER)"); +static_assert(EEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_INITIALIZER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_INITIALIZER)"); -class TEvInitializerPreparationStart: public TEventLocal<TEvInitializerPreparationStart, EInitializerEvents::EvInitializerPreparationStart> { +class TEvInitializerPreparationStart: public TEventLocal<TEvInitializerPreparationStart, EEvents::EvInitializerPreparationStart> { public: }; -class TEvInitializerPreparationFinished: public TEventLocal<TEvInitializerPreparationFinished, EInitializerEvents::EvInitializerPreparationFinished> { +class TEvInitializerPreparationFinished: public TEventLocal<TEvInitializerPreparationFinished, EEvents::EvInitializerPreparationFinished> { private: YDB_READONLY_DEF(TVector<ITableModifier::TPtr>, Modifiers); public: @@ -33,7 +34,7 @@ public: } }; -class TEvInitializerPreparationProblem: public TEventLocal<TEvInitializerPreparationProblem, EInitializerEvents::EvInitializerPreparationProblem> { +class TEvInitializerPreparationProblem: public TEventLocal<TEvInitializerPreparationProblem, EEvents::EvInitializerPreparationProblem> { private: YDB_READONLY_DEF(TString, ErrorMessage); public: @@ -43,4 +44,14 @@ public: } }; +class TEvInitializationFinished: public TEventLocal<TEvInitializationFinished, EEvents::EvInitializationFinished> { +private: + YDB_READONLY_DEF(TString, InitializationId); +public: + TEvInitializationFinished(const TString& initializationId) + : InitializationId(initializationId) { + + } +}; + } diff --git a/ydb/services/metadata/initializer/fetcher.cpp b/ydb/services/metadata/initializer/fetcher.cpp new file mode 100644 index 00000000000..ac3e6fc55d2 --- /dev/null +++ b/ydb/services/metadata/initializer/fetcher.cpp @@ -0,0 +1,10 @@ +#include "fetcher.h" +#include "manager.h" + +namespace NKikimr::NMetadataInitializer { + +std::vector<NMetadata::IOperationsManager::TPtr> TFetcher::DoGetManagers() const { + return { std::make_shared<TManager>() }; +} + +} diff --git a/ydb/services/metadata/initializer/fetcher.h b/ydb/services/metadata/initializer/fetcher.h new file mode 100644 index 00000000000..ac59e635e6c --- /dev/null +++ b/ydb/services/metadata/initializer/fetcher.h @@ -0,0 +1,16 @@ +#pragma once + +#include "snapshot.h" +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/services/metadata/abstract/manager.h> +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadataInitializer { + +class TFetcher: public NMetadataProvider::TSnapshotsManager<TSnapshot> { +protected: + virtual std::vector<NMetadata::IOperationsManager::TPtr> DoGetManagers() const override; +public: +}; + +} diff --git a/ydb/services/metadata/initializer/initializer.cpp b/ydb/services/metadata/initializer/initializer.cpp new file mode 100644 index 00000000000..b0503002768 --- /dev/null +++ b/ydb/services/metadata/initializer/initializer.cpp @@ -0,0 +1,33 @@ +#include "initializer.h" + +namespace NKikimr::NMetadataInitializer { + +void TInitializer::DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const { + TVector<NMetadataInitializer::ITableModifier::TPtr> result; + { + Ydb::Table::CreateTableRequest request; + request.set_session_id(""); + request.set_path(TDBInitialization::GetStorageTablePath()); + request.add_primary_key(TDBInitialization::TDecoder::ComponentId); + request.add_primary_key(TDBInitialization::TDecoder::ModificationId); + { + auto& column = *request.add_columns(); + column.set_name(TDBInitialization::TDecoder::ComponentId); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name(TDBInitialization::TDecoder::ModificationId); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name(TDBInitialization::TDecoder::Instant); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT32); + } + result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request, "create")); + } + controller->PreparationFinished(result); +} + +} diff --git a/ydb/services/metadata/initializer/initializer.h b/ydb/services/metadata/initializer/initializer.h new file mode 100644 index 00000000000..e6a882eb3f7 --- /dev/null +++ b/ydb/services/metadata/initializer/initializer.h @@ -0,0 +1,15 @@ +#pragma once + +#include "snapshot.h" +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadataInitializer { + +class TInitializer: public NMetadata::IInitializationBehaviour { +protected: + virtual void DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const override; +public: +}; + +} diff --git a/ydb/services/metadata/initializer/manager.cpp b/ydb/services/metadata/initializer/manager.cpp new file mode 100644 index 00000000000..3a61d8b089f --- /dev/null +++ b/ydb/services/metadata/initializer/manager.cpp @@ -0,0 +1,12 @@ +#include "manager.h" +#include "initializer.h" + +namespace NKikimr::NMetadataInitializer { + +TManager::TFactory::TRegistrator<TManager> TManager::Registrator(TManager::GetTypeIdStatic()); + +NMetadata::IInitializationBehaviour::TPtr TManager::DoGetInitializationBehaviour() const { + return std::make_shared<TInitializer>(); +} + +} diff --git a/ydb/services/metadata/initializer/manager.h b/ydb/services/metadata/initializer/manager.h new file mode 100644 index 00000000000..74f8d743c06 --- /dev/null +++ b/ydb/services/metadata/initializer/manager.h @@ -0,0 +1,19 @@ +#pragma once + +#include "object.h" + +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/services/metadata/manager/generic_manager.h> +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadataInitializer { + +class TManager: public NMetadata::TGenericOperationsManager<TDBInitialization> { +private: + static TFactory::TRegistrator<TManager> Registrator; +protected: + virtual NMetadata::IInitializationBehaviour::TPtr DoGetInitializationBehaviour() const override; +public: +}; + +} diff --git a/ydb/services/metadata/initializer/object.cpp b/ydb/services/metadata/initializer/object.cpp new file mode 100644 index 00000000000..7145bd79d00 --- /dev/null +++ b/ydb/services/metadata/initializer/object.cpp @@ -0,0 +1,57 @@ +#include "object.h" +#include <ydb/core/base/appdata.h> +#include <ydb/services/metadata/manager/ydb_value_operator.h> + +namespace NKikimr::NMetadataInitializer { + +TString TDBInitialization::GetStorageTablePath() { + return "/" + AppData()->TenantName + "/.metadata/initializations"; +} + +bool TDBInitialization::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue) { + if (!decoder.Read(decoder.GetComponentIdIdx(), ComponentId, rawValue)) { + return false; + } + if (!decoder.Read(decoder.GetModificationIdIdx(), ModificationId, rawValue)) { + return false; + } + if (!decoder.Read(decoder.GetInstantIdx(), Instant, rawValue)) { + return false; + } + return true; +} + +NKikimr::NMetadataManager::TTableRecord TDBInitialization::SerializeToRecord() const { + NMetadataManager::TTableRecord result; + result.SetColumn(TDecoder::ComponentId, NMetadataManager::TYDBValue::Bytes(ComponentId)); + result.SetColumn(TDecoder::ModificationId, NMetadataManager::TYDBValue::Bytes(ModificationId)); + result.SetColumn(TDecoder::Instant, NMetadataManager::TYDBValue::UInt32(Instant.Seconds())); + return result; +} + +void TDBInitialization::AlteringPreparation(std::vector<TDBInitialization>&& objects, + NMetadataManager::IAlterPreparationController<TDBInitialization>::TPtr controller, + const NMetadata::IOperationsManager::TModificationContext& /*context*/) { + controller->PreparationFinished(std::move(objects)); +} + +std::vector<Ydb::Column> TDBInitialization::TDecoder::GetColumns() { + return { + NMetadataManager::TYDBColumn::Bytes(ComponentId), + NMetadataManager::TYDBColumn::Bytes(ModificationId), + NMetadataManager::TYDBColumn::UInt32(Instant) + }; +} + +std::vector<Ydb::Column> TDBInitialization::TDecoder::GetPKColumns() { + return { + NMetadataManager::TYDBColumn::Bytes(ComponentId), + NMetadataManager::TYDBColumn::Bytes(ModificationId) + }; +} + +std::vector<TString> TDBInitialization::TDecoder::GetPKColumnIds() { + return { ComponentId, ModificationId }; +} + +} diff --git a/ydb/services/metadata/initializer/object.h b/ydb/services/metadata/initializer/object.h new file mode 100644 index 00000000000..2f74c8a7aa0 --- /dev/null +++ b/ydb/services/metadata/initializer/object.h @@ -0,0 +1,76 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/base/appdata.h> + +#include <ydb/services/metadata/abstract/decoder.h> +#include <ydb/services/metadata/abstract/manager.h> +#include <ydb/services/metadata/manager/object.h> +#include <ydb/services/metadata/manager/preparation_controller.h> + +namespace NKikimr::NMetadataInitializer { + +class TDBInitializationKey { +private: + YDB_READONLY_PROTECT_DEF(TString, ComponentId); + YDB_READONLY_PROTECT_DEF(TString, ModificationId); +public: + TDBInitializationKey() = default; + TDBInitializationKey(const TString& componentId, const TString& modificationId) + : ComponentId(componentId) + , ModificationId(modificationId) + { + } + + bool operator<(const TDBInitializationKey& item) const { + return std::tie(ComponentId, ModificationId) < std::tie(item.ComponentId, item.ModificationId); + } +}; + +class TDBInitialization: public TDBInitializationKey, public NMetadataManager::TObject<TDBInitialization> { +private: + using TBase = TDBInitializationKey; + YDB_READONLY(TInstant, Instant, AppData()->TimeProvider->Now()); +public: + static NMetadata::TOperationParsingResult BuildPatchFromSettings(const NYql::TObjectSettingsImpl& /*settings*/, + const NMetadata::IOperationsManager::TModificationContext& /*context*/) { + NMetadataManager::TTableRecord result; + return result; + } + + class TDecoder: public NInternal::TDecoderBase { + private: + YDB_ACCESSOR(i32, ComponentIdIdx, -1); + YDB_ACCESSOR(i32, ModificationIdIdx, -1); + YDB_ACCESSOR(i32, InstantIdx, -1); + public: + static inline const TString ComponentId = "componentId"; + static inline const TString ModificationId = "modificationId"; + static inline const TString Instant = "instant"; + static std::vector<TString> GetPKColumnIds(); + static std::vector<Ydb::Column> GetPKColumns(); + static std::vector<Ydb::Column> GetColumns(); + + TDecoder(const Ydb::ResultSet& rawData) { + ComponentIdIdx = GetFieldIndex(rawData, ComponentId); + ModificationIdIdx = GetFieldIndex(rawData, ModificationId); + InstantIdx = GetFieldIndex(rawData, Instant); + } + }; + + using TBase::TBase; + + static void AlteringPreparation(std::vector<TDBInitialization>&& objects, + NMetadataManager::IAlterPreparationController<TDBInitialization>::TPtr controller, + const NMetadata::IOperationsManager::TModificationContext& context); + static TString GetStorageTablePath(); + static TString GetStorageHistoryTablePath() { + return ""; + } + bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue); + NKikimr::NMetadataManager::TTableRecord SerializeToRecord() const; + static TString GetTypeId() { + return "INITIALIZATION"; + } +}; + +} diff --git a/ydb/services/metadata/initializer/snapshot.cpp b/ydb/services/metadata/initializer/snapshot.cpp new file mode 100644 index 00000000000..93212da6f50 --- /dev/null +++ b/ydb/services/metadata/initializer/snapshot.cpp @@ -0,0 +1,30 @@ +#include "snapshot.h" + +namespace NKikimr::NMetadataInitializer { + +bool TSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) { + Y_VERIFY(rawDataResult.result_sets().size() == 1); + { + auto& rawData = rawDataResult.result_sets()[0]; + TDBInitialization::TDecoder decoder(rawData); + for (auto&& r : rawData.rows()) { + TDBInitialization initObject; + if (!initObject.DeserializeFromRecord(decoder, r)) { + ALS_ERROR(NKikimrServices::METADATA_INITIALIZER) << "cannot parse initialization info for snapshot"; + continue; + } + Objects.emplace(initObject, initObject); + } + } + return true; +} + +TString TSnapshot::DoSerializeToString() const { + TStringBuilder sb; + for (auto&& i : Objects) { + sb << i.first.GetComponentId() << ":" << i.first.GetModificationId() << ";"; + } + return sb; +} + +} diff --git a/ydb/services/metadata/initializer/snapshot.h b/ydb/services/metadata/initializer/snapshot.h new file mode 100644 index 00000000000..73de0083e74 --- /dev/null +++ b/ydb/services/metadata/initializer/snapshot.h @@ -0,0 +1,21 @@ +#pragma once +#include "object.h" + +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadataInitializer { + +class TSnapshot: public NMetadataProvider::ISnapshot { +private: + using TBase = NMetadataProvider::ISnapshot; + using TObjects = std::map<TDBInitializationKey, TDBInitialization>; + YDB_READONLY_DEF(TObjects, Objects); +protected: + virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override; + virtual TString DoSerializeToString() const override; +public: + using TBase::TBase; +}; + +} diff --git a/ydb/services/metadata/manager/CMakeLists.txt b/ydb/services/metadata/manager/CMakeLists.txt new file mode 100644 index 00000000000..019c6f158c2 --- /dev/null +++ b/ydb/services/metadata/manager/CMakeLists.txt @@ -0,0 +1,35 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-metadata-manager) +target_link_libraries(services-metadata-manager PUBLIC + contrib-libs-cxxsupp + yutil + ydb-library-accessor + cpp-actors-core + api-protos + ydb-core-protos + services-bg_tasks-abstract + services-metadata-initializer + ydb-core-base + services-metadata-request +) +target_sources(services-metadata-manager PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/alter.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/table_record.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/restore.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/modification.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/generic_manager.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/preparation_controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/restore_controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/common.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/ydb_value_operator.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/modification_controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/object.cpp +) diff --git a/ydb/services/metadata/manager/alter.cpp b/ydb/services/metadata/manager/alter.cpp new file mode 100644 index 00000000000..79e9f2d485e --- /dev/null +++ b/ydb/services/metadata/manager/alter.cpp @@ -0,0 +1,5 @@ +#include "alter.h" + +namespace NKikimr::NMetadataManager { + +} diff --git a/ydb/services/metadata/manager/alter.h b/ydb/services/metadata/manager/alter.h new file mode 100644 index 00000000000..086ad56ab42 --- /dev/null +++ b/ydb/services/metadata/manager/alter.h @@ -0,0 +1,384 @@ +#pragma once +#include "modification_controller.h" +#include "preparation_controller.h" +#include "restore.h" +#include "modification.h" + +#include <ydb/services/metadata/abstract/manager.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NKikimr::NMetadataManager { + +template <class TObject> +class TProcessingController: + public IRestoreObjectsController<TObject>, + public IModificationObjectsController, + public IAlterPreparationController<TObject> { +private: + const TActorIdentity ActorId; +public: + using TPtr = std::shared_ptr<TProcessingController>; + TProcessingController(const TActorIdentity actorId) + : ActorId(actorId) + { + + } + + virtual void RestoreFinished(std::vector<TObject>&& objects, const TString& transactionId) override { + ActorId.Send(ActorId, new TEvRestoreFinished<TObject>(std::move(objects), transactionId)); + } + virtual void RestoreProblem(const TString& errorMessage) override { + ActorId.Send(ActorId, new TEvRestoreProblem(errorMessage)); + } + virtual void ModificationFinished() override { + ActorId.Send(ActorId, new TEvModificationFinished()); + } + virtual void ModificationProblem(const TString& errorMessage) override { + ActorId.Send(ActorId, new TEvModificationProblem(errorMessage)); + } + virtual void PreparationProblem(const TString& errorMessage) override { + ActorId.Send(ActorId, new TEvAlterPreparationProblem(errorMessage)); + } + virtual void PreparationFinished(std::vector<TObject>&& objects) override { + ActorId.Send(ActorId, new TEvAlterPreparationFinished<TObject>(std::move(objects))); + } + +}; + +template <class TObject> +class TModificationActorImpl: public NActors::TActorBootstrapped<TModificationActorImpl<TObject>> { +private: + using TBase = NActors::TActorBootstrapped<TModificationActorImpl<TObject>>; +protected: + TString SessionId; + TString TransactionId; + typename TProcessingController<TObject>::TPtr InternalController; + IAlterController::TPtr ExternalController; + const NMetadata::IOperationsManager::TModificationContext Context; + std::vector<TTableRecord> Patches; + TTableRecords RestoreObjectIds; + virtual bool PrepareRestoredObjects(std::vector<TObject>& objects) const = 0; + virtual bool ProcessPreparedObjects(TTableRecords&& records) const = 0; + virtual void InitState() = 0; + virtual bool BuildRestoreObjectIds() = 0; +public: + TModificationActorImpl(TTableRecord&& patch, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context) + : ExternalController(controller) + , Context(context) { + Patches.emplace_back(std::move(patch)); + } + + TModificationActorImpl(const TTableRecord& patch, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context) + : ExternalController(controller) + , Context(context) { + Patches.emplace_back(patch); + } + + TModificationActorImpl(std::vector<TTableRecord>&& patches, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context) + : ExternalController(controller) + , Context(context) + , Patches(std::move(patches)) { + + } + + TModificationActorImpl(const std::vector<TTableRecord>& patches, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context) + : ExternalController(controller) + , Context(context) + , Patches(patches) { + + } + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>, Handle); + hFunc(TEvRestoreFinished<TObject>, Handle); + hFunc(TEvAlterPreparationFinished<TObject>, Handle); + hFunc(NInternal::NRequest::TEvRequestFailed, Handle); + hFunc(TEvRestoreProblem, Handle); + hFunc(TEvAlterPreparationProblem, Handle); + default: + break; + } + } + + void Bootstrap() { + InitState(); + if (!Patches.size()) { + ExternalController->AlterProblem("no patches"); + return TBase::PassAway(); + } + if (!BuildRestoreObjectIds()) { + return TBase::PassAway(); + } + + TBase::Register(new NInternal::NRequest::TYDBRequest<NInternal::NRequest::TDialogCreateSession>( + NInternal::NRequest::TDialogCreateSession::TRequest(), TBase::SelfId())); + } + + void Handle(typename NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>::TPtr& ev) { + Ydb::Table::CreateSessionResponse currentFullReply = ev->Get()->GetResult(); + Ydb::Table::CreateSessionResult session; + currentFullReply.operation().result().UnpackTo(&session); + SessionId = session.session_id(); + Y_VERIFY(SessionId); + + InternalController = std::make_shared<TProcessingController<TObject>>(TBase::SelfId()); + TBase::Register(new TRestoreObjectsActor<TObject>(RestoreObjectIds, InternalController, SessionId)); + } + + void Handle(typename TEvRestoreFinished<TObject>::TPtr& ev) { + TransactionId = ev->Get()->GetTransactionId(); + Y_VERIFY(TransactionId); + std::vector<TObject> objects = std::move(ev->Get()->MutableObjects()); + if (!PrepareRestoredObjects(objects)) { + TBase::PassAway(); + } else { + TObject::AlteringPreparation(std::move(objects), InternalController, Context); + } + } + + void Handle(typename TEvAlterPreparationFinished<TObject>::TPtr& ev) { + TTableRecords records; + records.InitColumns(TObject::TDecoder::GetColumns()); + records.ReserveRows(ev->Get()->GetObjects().size()); + for (auto&& i : ev->Get()->GetObjects()) { + if (!records.AddRecordNativeValues(i.SerializeToRecord())) { + ExternalController->AlterProblem("unexpected serialization inconsistency"); + return TBase::PassAway(); + } + } + if (!ProcessPreparedObjects(std::move(records))) { + ExternalController->AlterProblem("cannot process prepared objects"); + return TBase::PassAway(); + } + } + + void Handle(typename NInternal::NRequest::TEvRequestFailed::TPtr& /*ev*/) { + auto g = TBase::PassAwayGuard(); + ExternalController->AlterProblem("cannot initialize session"); + } + + void Handle(TEvAlterPreparationProblem::TPtr& ev) { + auto g = TBase::PassAwayGuard(); + ExternalController->AlterProblem("preparation problem: " + ev->Get()->GetErrorMessage()); + } + + void Handle(TEvRestoreProblem::TPtr& ev) { + auto g = TBase::PassAwayGuard(); + ExternalController->AlterProblem("cannot restore objects: " + ev->Get()->GetErrorMessage()); + } + +}; + +template <class TObject> +class TModificationActor: public TModificationActorImpl<TObject> { +private: + using TBase = TModificationActorImpl<TObject>; +protected: + virtual void InitState() override { + TBase::Become(&TModificationActor<TObject>::StateMain); + } + + virtual bool BuildRestoreObjectIds() override { + TBase::RestoreObjectIds.InitColumns(TObject::TDecoder::GetPKColumns()); + for (auto&& i : TBase::Patches) { + if (!TBase::RestoreObjectIds.AddRecordNativeValues(i)) { + TBase::ExternalController->AlterProblem("no pk columns in patch"); + return false; + } + } + return true; + } + + virtual TString GetModificationType() const = 0; + +public: + using TBase::TBase; + STFUNC(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvModificationFinished, Handle); + hFunc(TEvModificationProblem, Handle); + default: + TBase::StateMain(ev, ctx); + } + } + + virtual bool PrepareRestoredObjects(std::vector<TObject>& objects) const override { + std::vector<bool> realPatches; + realPatches.resize(TBase::Patches.size(), false); + for (auto&& i : objects) { + const TTableRecord* trPatch = nullptr; + TTableRecord trObject = i.SerializeToRecord(); + for (auto&& p : TBase::Patches) { + if (p.CompareColumns(trObject, TObject::TDecoder::GetPKColumnIds())) { + trPatch = &p; + break; + } + } + TObject objectPatched; + if (!trPatch) { + TBase::ExternalController->AlterProblem("cannot found patch for object"); + return false; + } else if (!trObject.TakeValuesFrom(*trPatch)) { + TBase::ExternalController->AlterProblem("cannot patch object"); + return false; + } else if (!TObject::TDecoder::DeserializeFromRecord(objectPatched, trObject)) { + TBase::ExternalController->AlterProblem("cannot parse object after patch"); + return false; + } else { + i = std::move(objectPatched); + } + } + for (auto&& p : TBase::Patches) { + bool found = false; + for (auto&& i : objects) { + if (i.SerializeToRecord().CompareColumns(p, TObject::TDecoder::GetPKColumnIds())) { + found = true; + break; + } + } + if (!found) { + TObject object; + if (!TObject::TDecoder::DeserializeFromRecord(object, p)) { + TBase::ExternalController->AlterProblem("cannot parse new object"); + return false; + } + objects.emplace_back(std::move(object)); + } + } + return true; + } + + void Handle(TEvModificationFinished::TPtr& /*ev*/) { + auto g = TBase::PassAwayGuard(); + TBase::ExternalController->AlterFinished(); + } + + void Handle(TEvModificationProblem::TPtr& ev) { + auto g = TBase::PassAwayGuard(); + TBase::ExternalController->AlterProblem("cannot " + GetModificationType() + " objects: " + ev->Get()->GetErrorMessage()); + } + +}; + +template <class TObject> +class TAlterActor: public TModificationActor<TObject> { +private: + using TBase = TModificationActor<TObject>; +protected: + virtual bool ProcessPreparedObjects(TTableRecords&& records) const override { + TBase::Register(new TUpdateObjectsActor<TObject>(std::move(records), + TBase::InternalController, TBase::SessionId, TBase::TransactionId, TBase::Context.GetUserToken())); + return true; + } + + virtual TString GetModificationType() const override { + return "ALTER"; + } +public: + using TBase::TBase; +}; + +template <class TObject> +class TCreateActor: public TModificationActor<TObject> { +private: + using TBase = TModificationActor<TObject>; +protected: + virtual bool ProcessPreparedObjects(TTableRecords&& records) const override { + TBase::Register(new TInsertObjectsActor<TObject>(std::move(records), + TBase::InternalController, TBase::SessionId, TBase::TransactionId, TBase::Context.GetUserToken())); + return true; + } + + virtual TString GetModificationType() const override { + return "CREATE"; + } +public: + using TBase::TBase; +}; + +template <class TObject> +class TDropActor: public TModificationActor<TObject> { +private: + using TBase = TModificationActor<TObject>; +protected: + virtual void InitState() override { + TBase::Become(&TDropActor<TObject>::StateMain); + } + + virtual bool BuildRestoreObjectIds() override { + auto& first = TBase::Patches.front(); + std::vector<Ydb::Column> columns = first.SelectOwnedColumns(TObject::TDecoder::GetPKColumns()); + if (!columns.size()) { + TBase::ExternalController->AlterProblem("no pk columns in patch"); + return false; + } + if (!columns.size()) { + TBase::ExternalController->AlterProblem("no columns for pk detection"); + return false; + } + TBase::RestoreObjectIds.InitColumns(columns); + for (auto&& i : TBase::Patches) { + if (!TBase::RestoreObjectIds.AddRecordNativeValues(i)) { + TBase::ExternalController->AlterProblem("incorrect pk columns"); + return false; + } + } + return true; + } + virtual TString GetModificationType() const override { + return "DROP"; + } +public: + using TBase::TBase; + + virtual bool ProcessPreparedObjects(TTableRecords&& records) const override { + TBase::Register(new TDeleteObjectsActor<TObject>(std::move(records), + TBase::InternalController, TBase::SessionId, TBase::TransactionId, TBase::Context.GetUserToken())); + return true; + } + + virtual bool PrepareRestoredObjects(std::vector<TObject>& /*objects*/) const override { + return true; + } + +}; + +template <class TObject> +class TCreateCommand: public NMetadata::IAlterCommand { +private: + using TBase = NMetadata::IAlterCommand; +protected: + virtual void DoExecute() const override { + TActivationContext::AsActorContext().Register(new NMetadataManager::TCreateActor<TObject>(GetRecords(), GetController(), GetContext())); + } +public: + using TBase::TBase; +}; + +template <class TObject> +class TAlterCommand: public NMetadata::IAlterCommand { +private: + using TBase = NMetadata::IAlterCommand; +protected: + virtual void DoExecute() const override { + TActivationContext::AsActorContext().Register(new NMetadataManager::TAlterActor<TObject>(GetRecords(), GetController(), GetContext())); + } +public: + using TBase::TBase; +}; + +template <class TObject> +class TDropCommand: public NMetadata::IAlterCommand { +private: + using TBase = NMetadata::IAlterCommand; +protected: + virtual void DoExecute() const override { + TActivationContext::AsActorContext().Register(new NMetadataManager::TDropActor<TObject>(GetRecords(), GetController(), GetContext())); + } +public: + using TBase::TBase; +}; + +} diff --git a/ydb/services/metadata/manager/common.cpp b/ydb/services/metadata/manager/common.cpp new file mode 100644 index 00000000000..4c7a303e26f --- /dev/null +++ b/ydb/services/metadata/manager/common.cpp @@ -0,0 +1,5 @@ +#include "common.h" + +namespace NKikimr::NMetadataManager { + +} diff --git a/ydb/services/metadata/manager/common.h b/ydb/services/metadata/manager/common.h new file mode 100644 index 00000000000..2001262d6ce --- /dev/null +++ b/ydb/services/metadata/manager/common.h @@ -0,0 +1,30 @@ +#pragma once +#include <ydb/core/base/events.h> +#include <library/cpp/actors/core/events.h> + +namespace NKikimr::NMetadataManager { + +class IAlterController { +public: + using TPtr = std::shared_ptr<IAlterController>; + virtual ~IAlterController() = default; + + virtual void AlterProblem(const TString& errorMessage) = 0; + virtual void AlterFinished() = 0; + +}; + +enum EEvents { + EvRestoreFinished = EventSpaceBegin(TKikimrEvents::ES_METADATA_MANAGER), + EvRestoreProblem, + EvModificationFinished, + EvModificationProblem, + EvAlterFinished, + EvAlterProblem, + EvAlterPreparationFinished, + EvAlterPreparationProblem, + EvEnd +}; + +static_assert(EEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_MANAGER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_MANAGER)"); +} diff --git a/ydb/services/metadata/manager/generic_manager.cpp b/ydb/services/metadata/manager/generic_manager.cpp new file mode 100644 index 00000000000..51d4f0ed926 --- /dev/null +++ b/ydb/services/metadata/manager/generic_manager.cpp @@ -0,0 +1,5 @@ +#include "generic_manager.h" + +namespace NKikimr::NColumnShard::NTiers { + +} diff --git a/ydb/services/metadata/manager/generic_manager.h b/ydb/services/metadata/manager/generic_manager.h new file mode 100644 index 00000000000..3d7eda96dc2 --- /dev/null +++ b/ydb/services/metadata/manager/generic_manager.h @@ -0,0 +1,93 @@ +#pragma once +#include <ydb/services/metadata/abstract/manager.h> +#include <ydb/services/metadata/manager/common.h> +#include <ydb/services/metadata/service.h> + +namespace NKikimr::NMetadata { + +class TOperationsController: public NMetadataManager::IAlterController { +private: + YDB_READONLY_DEF(NThreading::TPromise<NMetadata::TObjectOperatorResult>, Promise); +public: + TOperationsController(NThreading::TPromise<NMetadata::TObjectOperatorResult>&& p) + : Promise(std::move(p)) + { + + } + + virtual void AlterProblem(const TString& errorMessage) override { + Promise.SetValue(NMetadata::TObjectOperatorResult(false).SetErrorMessage(errorMessage)); + } + virtual void AlterFinished() override { + Promise.SetValue(NMetadata::TObjectOperatorResult(true)); + } + +}; + +template <class T> +class TGenericOperationsManager: public NMetadata::IOperationsManager { +protected: + virtual NThreading::TFuture<NMetadata::TObjectOperatorResult> DoCreateObject( + const NYql::TCreateObjectSettings& settings, const ui32 nodeId, + NMetadata::IOperationsManager::TPtr manager, const TModificationContext& context) const override + { + NMetadata::TOperationParsingResult patch(T::BuildPatchFromSettings(settings, context)); + if (!patch.IsSuccess()) { + NMetadata::TObjectOperatorResult result(patch.GetErrorMessage()); + return NThreading::MakeFuture<NMetadata::TObjectOperatorResult>(result); + } + auto promise = NThreading::NewPromise<NMetadata::TObjectOperatorResult>(); + auto result = promise.GetFuture(); + auto c = std::make_shared<TOperationsController>(std::move(promise)); + auto command = std::make_shared<NMetadataManager::TCreateCommand<T>>(patch.GetRecord(), manager, c, context); + TActivationContext::Send(new IEventHandle(NMetadataProvider::MakeServiceId(nodeId), {}, + new NMetadataProvider::TEvAlterObjects(command))); + return result; + } + virtual NThreading::TFuture<NMetadata::TObjectOperatorResult> DoAlterObject( + const NYql::TAlterObjectSettings& settings, const ui32 nodeId, + NMetadata::IOperationsManager::TPtr manager, const TModificationContext& context) const override + { + NMetadata::TOperationParsingResult patch(T::BuildPatchFromSettings(settings, context)); + if (!patch.IsSuccess()) { + return NThreading::MakeFuture<NMetadata::TObjectOperatorResult>(NMetadata::TObjectOperatorResult(patch.GetErrorMessage())); + } + auto promise = NThreading::NewPromise<NMetadata::TObjectOperatorResult>(); + auto result = promise.GetFuture(); + auto c = std::make_shared<TOperationsController>(std::move(promise)); + auto command = std::make_shared<NMetadataManager::TAlterCommand<T>>(patch.GetRecord(), manager, c, context); + TActivationContext::Send(new IEventHandle(NMetadataProvider::MakeServiceId(nodeId), {}, + new NMetadataProvider::TEvAlterObjects(command))); + return result; + } + virtual NThreading::TFuture<NMetadata::TObjectOperatorResult> DoDropObject( + const NYql::TDropObjectSettings& settings, const ui32 nodeId, + NMetadata::IOperationsManager::TPtr manager, const TModificationContext& context) const override + { + NMetadata::TOperationParsingResult patch(T::BuildPatchFromSettings(settings, context)); + if (!patch.IsSuccess()) { + return NThreading::MakeFuture<NMetadata::TObjectOperatorResult>(NMetadata::TObjectOperatorResult(patch.GetErrorMessage())); + } + auto promise = NThreading::NewPromise<NMetadata::TObjectOperatorResult>(); + auto result = promise.GetFuture(); + auto c = std::make_shared<TOperationsController>(std::move(promise)); + auto command = std::make_shared<NMetadataManager::TDropCommand<T>>(patch.GetRecord(), manager, c, context); + TActivationContext::Send(new IEventHandle(NMetadataProvider::MakeServiceId(nodeId), {}, + new NMetadataProvider::TEvAlterObjects(command))); + return result; + } +public: + virtual TString GetTablePath() const override { + return T::GetStorageTablePath(); + } + + virtual TString GetTypeId() const override { + return GetTypeIdStatic(); + } + + static TString GetTypeIdStatic() { + return T::GetTypeId(); + } +}; + +} diff --git a/ydb/services/metadata/manager/modification.cpp b/ydb/services/metadata/manager/modification.cpp new file mode 100644 index 00000000000..8ef18dcb38f --- /dev/null +++ b/ydb/services/metadata/manager/modification.cpp @@ -0,0 +1,5 @@ +#include "modification.h" + +namespace NKikimr::NMetadataManager { + +} diff --git a/ydb/services/metadata/manager/modification.h b/ydb/services/metadata/manager/modification.h new file mode 100644 index 00000000000..bac2c54496e --- /dev/null +++ b/ydb/services/metadata/manager/modification.h @@ -0,0 +1,157 @@ +#pragma once +#include "table_record.h" +#include "modification_controller.h" +#include "ydb_value_operator.h" + +#include <ydb/library/aclib/aclib.h> +#include <ydb/services/metadata/request/request_actor.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NKikimr::NMetadataManager { + +template <class TObject> +class TModifyObjectsActor: public NActors::TActorBootstrapped<TModifyObjectsActor<TObject>> { +private: + using TBase = NActors::TActorBootstrapped<TModifyObjectsActor<TObject>>; + IModificationObjectsController::TPtr Controller; + const TString SessionId; + const TString TransactionId; + const TMaybe<NACLib::TUserToken> UserToken; + std::deque<NInternal::NRequest::TDialogYQLRequest::TRequest> Requests; +protected: + TTableRecords Objects; + virtual Ydb::Table::ExecuteDataQueryRequest BuildModifyQuery() const = 0; + virtual TString GetModifyType() const = 0; + + void BuildRequestDirect() { + Ydb::Table::ExecuteDataQueryRequest request = BuildModifyQuery(); + request.set_session_id(SessionId); + request.mutable_tx_control()->set_tx_id(TransactionId); + Requests.emplace_back(std::move(request)); + } + + void BuildRequestHistory() { + if (!TObject::GetStorageHistoryTablePath()) { + return; + } + if (UserToken) { + Objects.AddColumn(TYDBColumn::Bytes("historyUserId"), TYDBValue::Bytes(UserToken->GetUserSID())); + } + Objects.AddColumn(TYDBColumn::UInt64("historyInstant"), TYDBValue::UInt64(TActivationContext::Now().MicroSeconds())); + Objects.AddColumn(TYDBColumn::Bytes("historyAction"), TYDBValue::Bytes(GetModifyType())); + Ydb::Table::ExecuteDataQueryRequest request = Objects.BuildInsertQuery(TObject::GetStorageHistoryTablePath()); + request.set_session_id(SessionId); + request.mutable_tx_control()->set_tx_id(TransactionId); + request.mutable_tx_control()->set_commit_tx(true); + Requests.emplace_back(std::move(request)); + } + + void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogYQLRequest>::TPtr& /*ev*/) { + if (Requests.size()) { + TBase::Register(new NInternal::NRequest::TYDBRequest<NInternal::NRequest::TDialogYQLRequest>(Requests.front(), TBase::SelfId())); + Requests.pop_front(); + } else { + Controller->ModificationFinished(); + TBase::PassAway(); + } + } + + void Handle(NInternal::NRequest::TEvRequestFailed::TPtr& /*ev*/) { + auto g = TBase::PassAwayGuard(); + Controller->ModificationProblem("cannot execute yql request for upsert objects"); + } + +public: + TModifyObjectsActor(TTableRecords&& objects, IModificationObjectsController::TPtr controller, const TString& sessionId, + const TString& transactionId, const TMaybe<NACLib::TUserToken>& userToken) + : Controller(controller) + , SessionId(sessionId) + , TransactionId(transactionId) + , UserToken(userToken) + , Objects(std::move(objects)) + + { + Y_VERIFY(SessionId); + } + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogYQLRequest>, Handle); + hFunc(NInternal::NRequest::TEvRequestFailed, Handle); + default: + break; + } + } + + void Bootstrap() { + TBase::Become(&TModifyObjectsActor::StateMain); + BuildRequestDirect(); + BuildRequestHistory(); + TBase::Register(new NInternal::NRequest::TYDBRequest<NInternal::NRequest::TDialogYQLRequest>(Requests.front(), TBase::SelfId())); + Requests.pop_front(); + } +}; + +template <class TObject> +class TUpsertObjectsActor: public TModifyObjectsActor<TObject> { +private: + using TBase = TModifyObjectsActor<TObject>; +protected: + virtual Ydb::Table::ExecuteDataQueryRequest BuildModifyQuery() const override { + return TBase::Objects.BuildUpsertQuery(TObject::GetStorageTablePath()); + } + virtual TString GetModifyType() const override { + return "upsert"; + } +public: + using TBase::TBase; +}; + +template <class TObject> +class TUpdateObjectsActor: public TModifyObjectsActor<TObject> { +private: + using TBase = TModifyObjectsActor<TObject>; +protected: + virtual Ydb::Table::ExecuteDataQueryRequest BuildModifyQuery() const override { + return TBase::Objects.BuildUpdateQuery(TObject::GetStorageTablePath()); + } + virtual TString GetModifyType() const override { + return "update"; + } +public: + using TBase::TBase; +}; + +template <class TObject> +class TDeleteObjectsActor: public TModifyObjectsActor<TObject> { +private: + using TBase = TModifyObjectsActor<TObject>; +protected: + virtual Ydb::Table::ExecuteDataQueryRequest BuildModifyQuery() const override { + auto objectIds = TBase::Objects.SelectColumns(TObject::TDecoder::GetPKColumnIds()); + return objectIds.BuildDeleteQuery(TObject::GetStorageTablePath()); + } + virtual TString GetModifyType() const override { + return "delete"; + } +public: + using TBase::TBase; +}; + +template <class TObject> +class TInsertObjectsActor: public TModifyObjectsActor<TObject> { +private: + using TBase = TModifyObjectsActor<TObject>; +protected: + virtual Ydb::Table::ExecuteDataQueryRequest BuildModifyQuery() const override { + return TBase::Objects.BuildInsertQuery(TObject::GetStorageTablePath()); + } + virtual TString GetModifyType() const override { + return "insert"; + } +public: + using TBase::TBase; +}; + +} diff --git a/ydb/services/metadata/manager/modification_controller.cpp b/ydb/services/metadata/manager/modification_controller.cpp new file mode 100644 index 00000000000..ee8f24b7c8e --- /dev/null +++ b/ydb/services/metadata/manager/modification_controller.cpp @@ -0,0 +1,5 @@ +#include "modification_controller.h" + +namespace NKikimr::NMetadataManager { + +} diff --git a/ydb/services/metadata/manager/modification_controller.h b/ydb/services/metadata/manager/modification_controller.h new file mode 100644 index 00000000000..f935924d6ee --- /dev/null +++ b/ydb/services/metadata/manager/modification_controller.h @@ -0,0 +1,29 @@ +#pragma once +#include "common.h" +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadataManager { + +class IModificationObjectsController { +public: + using TPtr = std::shared_ptr<IModificationObjectsController>; + virtual ~IModificationObjectsController() = default; + virtual void ModificationProblem(const TString& errorMessage) = 0; + virtual void ModificationFinished() = 0; +}; + +class TEvModificationFinished: public TEventLocal<TEvModificationFinished, EvModificationFinished> { +public: +}; + +class TEvModificationProblem: public TEventLocal<TEvModificationProblem, EvModificationProblem> { +private: + YDB_ACCESSOR_DEF(TString, ErrorMessage); +public: + TEvModificationProblem(const TString& errorMessage) + : ErrorMessage(errorMessage) { + + } +}; + +} diff --git a/ydb/services/metadata/manager/object.cpp b/ydb/services/metadata/manager/object.cpp new file mode 100644 index 00000000000..e981c14049a --- /dev/null +++ b/ydb/services/metadata/manager/object.cpp @@ -0,0 +1,27 @@ +#include "object.h" + +namespace NKikimr::NMetadataManager { + +Ydb::Table::CreateTableRequest TBaseObject::AddHistoryTableScheme(const Ydb::Table::CreateTableRequest& baseScheme, const TString& tableName) { + Ydb::Table::CreateTableRequest result = baseScheme; + result.add_primary_key("historyInstant"); + result.set_path(tableName); + { + auto& column = *result.add_columns(); + column.set_name("historyAction"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *result.add_columns(); + column.set_name("historyUserId"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *result.add_columns(); + column.set_name("historyInstant"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64); + } + return result; +} + +} diff --git a/ydb/services/metadata/manager/object.h b/ydb/services/metadata/manager/object.h new file mode 100644 index 00000000000..ee54b389cc1 --- /dev/null +++ b/ydb/services/metadata/manager/object.h @@ -0,0 +1,24 @@ +#pragma once +#include <ydb/public/api/protos/ydb_table.pb.h> + +namespace NKikimr::NMetadataManager { + +class TBaseObject { +public: + static Ydb::Table::CreateTableRequest AddHistoryTableScheme(const Ydb::Table::CreateTableRequest& baseScheme, const TString& tableName); + +}; + +template <class TDerived> +class TObject: public TBaseObject { +public: + static TString GetStorageHistoryTablePath() { + return TDerived::GetStorageTablePath() + "_history"; + } + + static Ydb::Table::CreateTableRequest AddHistoryTableScheme(const Ydb::Table::CreateTableRequest& baseScheme) { + return TBaseObject::AddHistoryTableScheme(baseScheme, TDerived::GetStorageHistoryTablePath()); + } +}; + +} diff --git a/ydb/services/metadata/manager/preparation_controller.cpp b/ydb/services/metadata/manager/preparation_controller.cpp new file mode 100644 index 00000000000..84a3df1216e --- /dev/null +++ b/ydb/services/metadata/manager/preparation_controller.cpp @@ -0,0 +1,5 @@ +#include "restore_controller.h" + +namespace NKikimr::NMetadataManager { + +} diff --git a/ydb/services/metadata/manager/preparation_controller.h b/ydb/services/metadata/manager/preparation_controller.h new file mode 100644 index 00000000000..658b5a6385f --- /dev/null +++ b/ydb/services/metadata/manager/preparation_controller.h @@ -0,0 +1,40 @@ +#pragma once +#include "common.h" + +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadataManager { + +template <class TObject> +class IAlterPreparationController { +public: + using TPtr = std::shared_ptr<IAlterPreparationController>; + virtual ~IAlterPreparationController() = default; + + virtual void PreparationFinished(std::vector<TObject>&& objects) = 0; + virtual void PreparationProblem(const TString& errorMessage) = 0; +}; + +template <class TObject> +class TEvAlterPreparationFinished: public TEventLocal<TEvAlterPreparationFinished<TObject>, EvAlterPreparationFinished> { +private: + YDB_READONLY_DEF(std::vector<TObject>, Objects); +public: + TEvAlterPreparationFinished(std::vector<TObject>&& objects) + : Objects(std::move(objects)) + { + + } +}; + +class TEvAlterPreparationProblem: public TEventLocal<TEvAlterPreparationProblem, EvAlterPreparationProblem> { +private: + YDB_ACCESSOR_DEF(TString, ErrorMessage); +public: + TEvAlterPreparationProblem(const TString& errorMessage) + : ErrorMessage(errorMessage) { + + } +}; + +} diff --git a/ydb/services/metadata/manager/restore.cpp b/ydb/services/metadata/manager/restore.cpp new file mode 100644 index 00000000000..43fe115ea02 --- /dev/null +++ b/ydb/services/metadata/manager/restore.cpp @@ -0,0 +1,5 @@ +#include "restore.h" + +namespace NKikimr::NMetadataManager { + +} diff --git a/ydb/services/metadata/manager/restore.h b/ydb/services/metadata/manager/restore.h new file mode 100644 index 00000000000..6506e5d6292 --- /dev/null +++ b/ydb/services/metadata/manager/restore.h @@ -0,0 +1,76 @@ +#pragma once +#include "table_record.h" + +#include <ydb/services/metadata/manager/restore_controller.h> +#include <ydb/services/metadata/request/request_actor.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NKikimr::NMetadataManager { + +template <class TObject> +class TRestoreObjectsActor: public NActors::TActorBootstrapped<TRestoreObjectsActor<TObject>> { +private: + using TBase = NActors::TActorBootstrapped<TRestoreObjectsActor<TObject>>; + using IRestoreObjectsController = IRestoreObjectsController<TObject>; + typename IRestoreObjectsController::TPtr Controller; + const TTableRecords ObjectIds; + TString SessionId; + + void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>::TPtr& ev) { + auto g = TBase::PassAwayGuard(); + const NInternal::NRequest::TDialogSelect::TResponse& result = ev->Get()->GetResult(); + Ydb::Table::ExecuteQueryResult qResult; + result.operation().result().UnpackTo(&qResult); + Y_VERIFY((size_t)qResult.result_sets().size() == 1); + + typename TObject::TDecoder decoder(qResult.result_sets()[0]); + std::vector<TObject> objects; + for (auto&& row : qResult.result_sets()[0].rows()) { + TObject object; + if (!object.DeserializeFromRecord(decoder, row)) { + Controller->RestoreProblem("cannot parse exists object"); + return; + } + objects.emplace_back(std::move(object)); + } + Controller->RestoreFinished(std::move(objects), qResult.tx_meta().id()); + } + + void Handle(NInternal::NRequest::TEvRequestFailed::TPtr& /*ev*/) { + auto g = TBase::PassAwayGuard(); + Controller->RestoreProblem("cannot execute yql request"); + } + +public: + TRestoreObjectsActor(const TTableRecords& objectIds, typename IRestoreObjectsController::TPtr controller, const TString& sessionId) + : Controller(controller) + , ObjectIds(objectIds) + , SessionId(sessionId) + { + Y_VERIFY(SessionId); + } + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>, Handle); + hFunc(NInternal::NRequest::TEvRequestFailed, Handle); + default: + break; + } + } + + void Bootstrap() { + if (ObjectIds.empty()) { + Controller->RestoreProblem("no objects for restore"); + TBase::PassAway(); + } + auto request = ObjectIds.BuildSelectQuery(TObject::GetStorageTablePath()); + request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write(); + request.set_session_id(SessionId); + TBase::Become(&TRestoreObjectsActor::StateMain); + TBase::Register(new NInternal::NRequest::TYDBRequest<NInternal::NRequest::TDialogSelect>(request, TBase::SelfId())); + } +}; + +} diff --git a/ydb/services/metadata/manager/restore_controller.cpp b/ydb/services/metadata/manager/restore_controller.cpp new file mode 100644 index 00000000000..84a3df1216e --- /dev/null +++ b/ydb/services/metadata/manager/restore_controller.cpp @@ -0,0 +1,5 @@ +#include "restore_controller.h" + +namespace NKikimr::NMetadataManager { + +} diff --git a/ydb/services/metadata/manager/restore_controller.h b/ydb/services/metadata/manager/restore_controller.h new file mode 100644 index 00000000000..0a81e6eeff2 --- /dev/null +++ b/ydb/services/metadata/manager/restore_controller.h @@ -0,0 +1,42 @@ +#pragma once +#include "common.h" + +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadataManager { + +template <class TObject> +class IRestoreObjectsController { +public: + using TPtr = std::shared_ptr<IRestoreObjectsController>; + virtual ~IRestoreObjectsController() = default; + + virtual void RestoreFinished(std::vector<TObject>&& objects, const TString& transactionId) = 0; + virtual void RestoreProblem(const TString& errorMessage) = 0; +}; + +template <class TObject> +class TEvRestoreFinished: public TEventLocal<TEvRestoreFinished<TObject>, EvRestoreFinished> { +private: + YDB_ACCESSOR_DEF(std::vector<TObject>, Objects); + YDB_READONLY_DEF(TString, TransactionId); +public: + TEvRestoreFinished(std::vector<TObject>&& objects, const TString& transactionId) + : Objects(std::move(objects)) + , TransactionId(transactionId) + { + + } +}; + +class TEvRestoreProblem: public TEventLocal<TEvRestoreProblem, EvRestoreProblem> { +private: + YDB_ACCESSOR_DEF(TString, ErrorMessage); +public: + TEvRestoreProblem(const TString& errorMessage) + : ErrorMessage(errorMessage) { + + } +}; + +} diff --git a/ydb/services/metadata/manager/table_record.cpp b/ydb/services/metadata/manager/table_record.cpp new file mode 100644 index 00000000000..fa2c9eb42d9 --- /dev/null +++ b/ydb/services/metadata/manager/table_record.cpp @@ -0,0 +1,294 @@ +#include "table_record.h" +#include "ydb_value_operator.h" + +#include <ydb/core/protos/services.pb.h> + +#include <library/cpp/actors/core/log.h> +#include <util/string/builder.h> +#include <util/string/join.h> + +namespace NKikimr::NMetadataManager { + +bool TTableRecord::CompareColumns(const TTableRecord& item, const std::vector<TString>& columnIds) const { + for (auto&& i : columnIds) { + auto itSelf = Values.find(i); + auto itItem = item.Values.find(i); + if (itSelf == Values.end()) { + if (itItem == item.Values.end()) { + continue; + } else { + return false; + } + } else if (itItem == item.Values.end()) { + return false; + } else { + if (!TYDBValue::Compare(itSelf->second, itItem->second)) { + return false; + } + } + } + return true; +} + +bool TTableRecord::HasColumns(const std::vector<TString>& columnIds) const { + for (auto&& i : columnIds) { + if (!Values.contains(i)) { + return false; + } + } + return true; +} + +ui32 TTableRecord::CountIntersectColumns(const std::vector<TString>& columnIds) const { + ui32 result = 0; + for (auto&& i : columnIds) { + if (Values.contains(i)) { + ++result; + } + } + return result; +} + +bool TTableRecord::TakeValuesFrom(const TTableRecord& item) { + for (auto&& i : item.Values) { + Values[i.first] = i.second; + } + return true; +} + +const Ydb::Value* TTableRecord::GetValuePtr(const TString& columnId) const { + auto it = Values.find(columnId); + if (it == Values.end()) { + return nullptr; + } + return &it->second; +} + +NKikimr::NMetadataManager::TTableRecord& TTableRecord::SetColumn(const TString& columnId, const Ydb::Value& v) { + Values[columnId] = v; + return *this; +} + +Ydb::ResultSet TTableRecord::BuildRecordSet() const { + Ydb::ResultSet result; + Ydb::Value row; + for (auto&& i : Values) { + Ydb::Column column; + column.set_name(i.first); +// column.set_type(i.second.type()); + *result.add_columns() = column; + *row.add_items() = i.second; + } + *result.add_rows() = row; + return result; +} + +bool TTableRecord::SameColumns(const TTableRecord& item) const { + if (Values.size() != item.Values.size()) { + return false; + } + auto itSelf = Values.begin(); + auto itItem = item.Values.begin(); + for (; itSelf != Values.end() && itItem != Values.end(); ++itSelf, ++itItem) { + if (itSelf->first != itItem->first) { + return false; + } + } + return true; +} + +std::vector<Ydb::Column> TTableRecord::SelectOwnedColumns(const std::vector<Ydb::Column>& columns) const { + std::vector<Ydb::Column> result; + for (auto&& i : columns) { + if (Values.contains(i.name())) { + result.emplace_back(i); + } + } + return result; +} + +ui32 TTableRecords::AddRecordImpl(const TTableRecord& record) { + Y_VERIFY(Columns.size()); + ui32 foundColumns = 0; + Records.resize(Records.size() + 1); + for (ui32 i = 0; i < Columns.size(); ++i) { + const Ydb::Value* v = record.GetValuePtr(Columns[i].name()); + if (v) { + if (!TYDBValue::IsSameType(*v, Columns[i].type())) { + ALS_ERROR(NKikimrServices::METADATA_MANAGER); + Y_VERIFY_DEBUG(false); + continue; + } + ++foundColumns; + *Records.back().add_items() = *v; + } else { + *Records.back().add_items() = TYDBValue::NullValue(); + } + } + return foundColumns; +} + +Ydb::TypedValue TTableRecords::BuildVariableTupleRecords() const { + Ydb::TypedValue result; + if (Columns.size() > 1) { + for (auto&& r : Records) { + *result.mutable_value()->add_items() = r; + } + auto* tuple = result.mutable_type()->mutable_list_type()->mutable_item()->mutable_tuple_type(); + for (auto&& i : Columns) { + *tuple->add_elements() = i.type(); + } + } else if (Columns.size() == 1) { + for (auto&& r : Records) { + *result.mutable_value()->add_items() = r.items()[0]; + } + *result.mutable_type()->mutable_list_type()->mutable_item() = Columns[0].type(); + } + return result; +} + +Ydb::TypedValue TTableRecords::BuildVariableStructRecords() const { + Ydb::TypedValue result; + for (auto&& r : Records) { + *result.mutable_value()->add_items() = r; + } + auto* structDescr = result.mutable_type()->mutable_list_type()->mutable_item()->mutable_struct_type(); + for (auto&& i : Columns) { + auto& m = *structDescr->add_members(); + *m.mutable_type() = i.type(); + m.set_name(i.name()); + } + return result; +} + +TString TTableRecords::BuildColumnsSchemaTuple() const { + TStringBuilder sb; + if (Columns.size() > 1) { + sb << "Tuple<"; + std::vector<TString> types; + for (auto&& i : Columns) { + types.emplace_back(TYDBValue::TypeToString(i.type())); + } + sb << JoinSeq(", ", types) << ">"; + } else if (Columns.size() == 1) { + sb << TYDBValue::TypeToString(Columns[0].type()); + } + return sb; +} + +TString TTableRecords::BuildColumnsSchemaStruct() const { + TStringBuilder sb; + sb << "Struct<"; + std::vector<TString> types; + for (auto&& i : Columns) { + types.emplace_back(i.name() + ":" + TYDBValue::TypeToString(i.type())); + } + sb << JoinSeq(", ", types) << ">"; + return sb; +} + +std::vector<TString> TTableRecords::GetColumnIds() const { + std::vector<TString> result; + for (auto&& i : Columns) { + result.emplace_back(i.name()); + } + return result; +} + +Ydb::Table::ExecuteDataQueryRequest TTableRecords::BuildUpsertQuery(const TString& tablePath) const { + Ydb::Table::ExecuteDataQueryRequest result; + TStringBuilder sb; + sb << "DECLARE $objects AS List<" << BuildColumnsSchemaStruct() << ">;" << Endl; + sb << "UPSERT INTO `" + tablePath + "`" << Endl; + sb << "SELECT " << JoinSeq(",", GetColumnIds()) << " FROM AS_TABLE($objects)" << Endl; + Cerr << sb << Endl; + result.mutable_query()->set_yql_text(sb); + (*result.mutable_parameters())["$objects"] = BuildVariableStructRecords(); + return result; +} + +Ydb::Table::ExecuteDataQueryRequest TTableRecords::BuildInsertQuery(const TString& tablePath) const { + Ydb::Table::ExecuteDataQueryRequest result; + TStringBuilder sb; + sb << "DECLARE $objects AS List<" << BuildColumnsSchemaStruct() << ">;" << Endl; + sb << "INSERT INTO `" + tablePath + "`" << Endl; + sb << "SELECT " << JoinSeq(",", GetColumnIds()) << " FROM AS_TABLE($objects)" << Endl; + Cerr << sb << Endl; + result.mutable_query()->set_yql_text(sb); + (*result.mutable_parameters())["$objects"] = BuildVariableStructRecords(); + return result; +} + +Ydb::Table::ExecuteDataQueryRequest TTableRecords::BuildSelectQuery(const TString& tablePath) const { + Ydb::Table::ExecuteDataQueryRequest result; + TStringBuilder sb; + sb << "DECLARE $ids AS List<" << BuildColumnsSchemaTuple() << ">;" << Endl; + sb << "SELECT * FROM `" + tablePath + "`" << Endl; + if (GetColumnIds().size() > 1) { + sb << "WHERE (" << JoinSeq(", ", GetColumnIds()) << ") IN $ids" << Endl; + } else if (GetColumnIds().size() == 1) { + sb << "WHERE " << GetColumnIds()[0] << " IN $ids" << Endl; + } + Cerr << sb << Endl; + result.mutable_query()->set_yql_text(sb); + (*result.mutable_parameters())["$ids"] = BuildVariableTupleRecords(); + return result; +} + +Ydb::Table::ExecuteDataQueryRequest TTableRecords::BuildDeleteQuery(const TString& tablePath) const { + Ydb::Table::ExecuteDataQueryRequest result; + TStringBuilder sb; + sb << "DECLARE $ids AS List<" << BuildColumnsSchemaTuple() << ">;" << Endl; + sb << "DELETE FROM `" + tablePath + "`" << Endl; + sb << "WHERE (" << JoinSeq(", ", GetColumnIds()) << ") IN $ids" << Endl; + Cerr << sb << Endl; + result.mutable_query()->set_yql_text(sb); + (*result.mutable_parameters())["$ids"] = BuildVariableTupleRecords(); + return result; +} + +Ydb::Table::ExecuteDataQueryRequest TTableRecords::BuildUpdateQuery(const TString& tablePath) const { + Ydb::Table::ExecuteDataQueryRequest result; + TStringBuilder sb; + sb << "DECLARE $objects AS List<" << BuildColumnsSchemaStruct() << ">;" << Endl; + sb << "UPDATE `" + tablePath + "` ON" << Endl; + sb << "SELECT " << JoinSeq(",", GetColumnIds()) << " FROM AS_TABLE($objects)" << Endl; + Cerr << sb << Endl; + result.mutable_query()->set_yql_text(sb); + (*result.mutable_parameters())["$objects"] = BuildVariableStructRecords(); + return result; +} + +void TTableRecords::AddColumn(const Ydb::Column& c, const Ydb::Value& v) { + for (auto&& i : Columns) { + Y_VERIFY(i.name() != c.name()); + } + Columns.emplace_back(c); + for (auto&& i : Records) { + *i.add_items() = v; + } +} + +NKikimr::NMetadataManager::TTableRecords TTableRecords::SelectColumns(const std::vector<TString>& columnIds) const { + std::set<TString> columnIdsSet(columnIds.begin(), columnIds.end()); + TTableRecords result; + std::vector<ui32> idxs; + ui32 idx = 0; + for (auto&& i : Columns) { + if (columnIdsSet.contains(i.name())) { + result.Columns.emplace_back(i); + idxs.emplace_back(idx); + } + ++idx; + } + result.Records.reserve(Records.size()); + for (auto&& i : Records) { + result.Records.emplace_back(Ydb::Value()); + for (auto&& idx : idxs) { + *result.Records.back().add_items() = i.items()[idx]; + } + } + return result; +} + +} diff --git a/ydb/services/metadata/manager/table_record.h b/ydb/services/metadata/manager/table_record.h new file mode 100644 index 00000000000..b9a04f8dc34 --- /dev/null +++ b/ydb/services/metadata/manager/table_record.h @@ -0,0 +1,91 @@ +#pragma once +#include <ydb/public/api/protos/ydb_value.pb.h> +#include <ydb/public/api/protos/ydb_table.pb.h> + +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadataManager { + +class TTableRecord { +private: + using TValues = std::map<TString, Ydb::Value>; + YDB_READONLY_DEF(TValues, Values); +public: + std::vector<Ydb::Column> SelectOwnedColumns(const std::vector<Ydb::Column>& columns) const; + Ydb::ResultSet BuildRecordSet() const; + ui32 GetColumnsCount() const { + return Values.size(); + } + TTableRecord& SetColumn(const TString& columnId, const Ydb::Value& v); + bool CompareColumns(const TTableRecord& item, const std::vector<TString>& columnIds) const; + bool HasColumns(const std::vector<TString>& columnIds) const; + ui32 CountIntersectColumns(const std::vector<TString>& columnIds) const; + bool SameColumns(const TTableRecord& item) const; + bool TakeValuesFrom(const TTableRecord& item); + const Ydb::Value* GetValuePtr(const TString& columnId) const; +}; + +class TTableRecords { +private: + YDB_READONLY_DEF(std::vector<Ydb::Column>, Columns); + std::vector<Ydb::Value> Records; + + ui32 AddRecordImpl(const TTableRecord& record); + Ydb::TypedValue BuildVariableTupleRecords() const; + Ydb::TypedValue BuildVariableStructRecords() const; + TString BuildColumnsSchemaTuple() const; + TString BuildColumnsSchemaStruct() const; + + std::vector<TString> GetColumnIds() const; + + void PopRecord() { + Y_VERIFY(Records.size()); + Records.pop_back(); + } + +public: + TTableRecords SelectColumns(const std::vector<TString>& columnIds) const; + + Ydb::Table::ExecuteDataQueryRequest BuildInsertQuery(const TString& tablePath) const; + Ydb::Table::ExecuteDataQueryRequest BuildUpsertQuery(const TString& tablePath) const; + Ydb::Table::ExecuteDataQueryRequest BuildSelectQuery(const TString& tablePath) const; + Ydb::Table::ExecuteDataQueryRequest BuildDeleteQuery(const TString& tablePath) const; + Ydb::Table::ExecuteDataQueryRequest BuildUpdateQuery(const TString& tablePath) const; + + void AddColumn(const Ydb::Column& c, const Ydb::Value& v); + + bool empty() const { + return Records.empty(); + } + + void InitColumns(const std::vector<Ydb::Column>& columns) { + Y_VERIFY(Columns.empty()); + Columns = columns; + } + void ReserveRows(const ui32 rowsCount) { + Records.reserve(rowsCount); + } + bool AddRecordAllValues(const TTableRecord& record) { + if (AddRecordImpl(record) != record.GetValues().size()) { + PopRecord(); + return false; + } + return true; + } + bool AddRecordNativeValues(const TTableRecord& record) { + if (AddRecordImpl(record) != Columns.size()) { + PopRecord(); + return false; + } + return true; + } + bool AddRecordSomeValues(const TTableRecord& record) { + if (!AddRecordImpl(record)) { + PopRecord(); + return false; + } + return true; + } +}; + +} diff --git a/ydb/services/metadata/manager/ydb_value_operator.cpp b/ydb/services/metadata/manager/ydb_value_operator.cpp new file mode 100644 index 00000000000..f1a4ebce542 --- /dev/null +++ b/ydb/services/metadata/manager/ydb_value_operator.cpp @@ -0,0 +1,132 @@ +#include "ydb_value_operator.h" + +namespace NKikimr::NMetadataManager { + +bool TYDBValue::IsSameType(const Ydb::Value& v, const Ydb::Type& type) { + Y_VERIFY(type.has_type_id()); + if (type.type_id() == Ydb::Type::BOOL) { + return v.has_bool_value(); + } else if (type.type_id() == Ydb::Type::INT32) { + return v.has_int32_value(); + } else if (type.type_id() == Ydb::Type::UINT32) { + return v.has_uint32_value(); + } else if (type.type_id() == Ydb::Type::INT64) { + return v.has_int64_value(); + } else if (type.type_id() == Ydb::Type::UINT64) { + return v.has_uint64_value(); + } else if (type.type_id() == Ydb::Type::STRING) { + return v.has_bytes_value(); + } else if (type.type_id() == Ydb::Type::UTF8) { + return v.has_text_value(); + } + Y_VERIFY(false); +} + +bool TYDBValue::IsSameType(const Ydb::Value& l, const Ydb::Value& r) { + if (l.has_bool_value()) { + return r.has_bool_value(); + } + if (l.has_bytes_value()) { + return r.has_bytes_value(); + } + if (l.has_text_value()) { + return r.has_text_value(); + } + Y_VERIFY(false); +} + +bool TYDBValue::Compare(const Ydb::Value& l, const Ydb::Value& r) { + if (!IsSameType(l, r)) { + return false; + } + if (l.has_bool_value()) { + return l.bool_value() == r.bool_value(); + } + if (l.has_bytes_value()) { + return l.bytes_value() == r.bytes_value(); + } + if (l.has_text_value()) { + return l.text_value() == r.text_value(); + } + Y_VERIFY(false); +} + +TString TYDBValue::TypeToString(const Ydb::Type& type) { + Y_VERIFY(type.has_type_id()); + if (type.type_id() == Ydb::Type::BOOL) { + return "Bool"; + } else if (type.type_id() == Ydb::Type::INT32) { + return "Int32"; + } else if (type.type_id() == Ydb::Type::UINT32) { + return "Uint32"; + } else if (type.type_id() == Ydb::Type::INT64) { + return "Uint64"; + } else if (type.type_id() == Ydb::Type::UINT64) { + return "Uint64"; + } else if (type.type_id() == Ydb::Type::STRING) { + return "String"; + } else if (type.type_id() == Ydb::Type::UTF8) { + return "Utf8"; + } else { + Y_VERIFY(false); + } +} + +Ydb::Value TYDBValue::NullValue() { + Ydb::Value result; + result.set_null_flag_value(::google::protobuf::NULL_VALUE); + return result; +} + +Ydb::Value TYDBValue::Bytes(const TString& value) { + Ydb::Value result; + result.set_bytes_value(value); + return result; +} + +Ydb::Value TYDBValue::Bytes(const TStringBuf& value) { + Ydb::Value result; + result.set_bytes_value(TString(value.data(), value.size())); + return result; +} + +Ydb::Value TYDBValue::Bytes(const char* value) { + Ydb::Value result; + result.set_bytes_value(TString(value)); + return result; +} + +Ydb::Value TYDBValue::UInt64(const ui64 value) { + Ydb::Value result; + result.set_uint64_value(value); + return result; +} + +Ydb::Value TYDBValue::UInt32(const ui32 value) { + Ydb::Value result; + result.set_uint32_value(value); + return result; +} + +Ydb::Column TYDBColumn::Bytes(const TString& columnId) { + Ydb::Column result; + result.set_name(columnId); + result.mutable_type()->set_type_id(Ydb::Type::STRING); + return result; +} + +Ydb::Column TYDBColumn::UInt64(const TString& columnId) { + Ydb::Column result; + result.set_name(columnId); + result.mutable_type()->set_type_id(Ydb::Type::UINT64); + return result; +} + +Ydb::Column TYDBColumn::UInt32(const TString& columnId) { + Ydb::Column result; + result.set_name(columnId); + result.mutable_type()->set_type_id(Ydb::Type::UINT32); + return result; +} + +} diff --git a/ydb/services/metadata/manager/ydb_value_operator.h b/ydb/services/metadata/manager/ydb_value_operator.h new file mode 100644 index 00000000000..0cdcd39c665 --- /dev/null +++ b/ydb/services/metadata/manager/ydb_value_operator.h @@ -0,0 +1,28 @@ +#pragma once +#include <ydb/public/api/protos/ydb_table.pb.h> +#include <ydb/public/api/protos/ydb_value.pb.h> + +namespace NKikimr::NMetadataManager { + +class TYDBColumn { +public: + static Ydb::Column Bytes(const TString& columnId); + static Ydb::Column UInt64(const TString& columnId); + static Ydb::Column UInt32(const TString& columnId); +}; + +class TYDBValue { +public: + static bool IsSameType(const Ydb::Value& l, const Ydb::Type& type); + static bool IsSameType(const Ydb::Value& l, const Ydb::Value& r); + static bool Compare(const Ydb::Value& l, const Ydb::Value& r); + static TString TypeToString(const Ydb::Type& type); + static Ydb::Value NullValue(); + static Ydb::Value Bytes(const char* value); + static Ydb::Value Bytes(const TString& value); + static Ydb::Value Bytes(const TStringBuf& value); + static Ydb::Value UInt64(const ui64 value); + static Ydb::Value UInt32(const ui32 value); +}; + +} diff --git a/ydb/services/metadata/request/request_actor.h b/ydb/services/metadata/request/request_actor.h index 2da9ea8aa93..fee842d47b5 100644 --- a/ydb/services/metadata/request/request_actor.h +++ b/ydb/services/metadata/request/request_actor.h @@ -89,7 +89,7 @@ private: TRequest ProtoRequest; const NActors::TActorId ActorFinishId; const NActors::TActorId ActorRestartId; - const TConfig& Config; + const TConfig Config; ui32 Retry = 0; protected: class TEvRequestInternalResult: public NActors::TEventLocal<TEvRequestInternalResult, TDialogPolicy::EvResultInternal> { @@ -157,8 +157,14 @@ public: : ProtoRequest(request) , ActorFinishId(actorFinishId) , ActorRestartId(actorRestartId) - , Config(config) - { + , Config(config) { + + } + + TYDBRequest(const TRequest& request, const NActors::TActorId actorCallbackId) + : ProtoRequest(request) + , ActorFinishId(actorCallbackId) + , ActorRestartId(actorCallbackId) { } }; @@ -167,7 +173,6 @@ template <class TDialogPolicy> class TSessionedActorImpl: public NActors::TActorBootstrapped<TSessionedActorImpl<TDialogPolicy>> { private: ui32 Retry = 0; - const TActorId FinishedActorId; static_assert(!std::is_same<TDialogPolicy, TDialogCreateSession>()); using TBase = NActors::TActorBootstrapped<TSessionedActorImpl<TDialogPolicy>>; @@ -226,6 +231,40 @@ public: } }; +class IQueryOutput { +public: + using TPtr = std::shared_ptr<IQueryOutput>; + virtual ~IQueryOutput() = default; + + virtual void OnReply(const TDialogYQLRequest::TResponse& response) = 0; +}; + +class TYQLQuerySessionedActor: public TSessionedActorImpl<TDialogYQLRequest> { +private: + using TBase = TSessionedActorImpl<TDialogYQLRequest>; + const TString Query; + IQueryOutput::TPtr Output; +protected: + virtual std::optional<TDialogYQLRequest::TRequest> OnSessionId(const TString& sessionId) override { + Ydb::Table::ExecuteDataQueryRequest request; + request.mutable_query()->set_yql_text(Query); + request.set_session_id(sessionId); + request.mutable_tx_control()->mutable_begin_tx()->mutable_snapshot_read_only(); + return request; + } + virtual void OnResult(const TDialogYQLRequest::TResponse& response) override { + Output->OnReply(response); + } +public: + TYQLQuerySessionedActor(const TString& query, const NInternal::NRequest::TConfig& config, IQueryOutput::TPtr output) + : TBase(config) + , Query(query) + , Output(output) + { + + } +}; + using TSessionedActor = TSessionedActorImpl<TDialogYQLRequest>; } diff --git a/ydb/services/metadata/secret/CMakeLists.txt b/ydb/services/metadata/secret/CMakeLists.txt new file mode 100644 index 00000000000..0de0feb0f0d --- /dev/null +++ b/ydb/services/metadata/secret/CMakeLists.txt @@ -0,0 +1,42 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-metadata-secret) +target_link_libraries(services-metadata-secret PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + core-grpc_services-local_rpc + core-grpc_services-base + ydb-core-grpc_services + services-metadata-request +) +target_sources(services-metadata-secret PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/secret/secret.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/secret/access.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/secret/snapshot.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/secret/initializer.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/secret/fetcher.cpp +) + +add_global_library_for(services-metadata-secret.global services-metadata-secret) +target_link_libraries(services-metadata-secret.global PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + core-grpc_services-local_rpc + core-grpc_services-base + ydb-core-grpc_services + services-metadata-request +) +target_sources(services-metadata-secret.global PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/secret/manager.cpp +) diff --git a/ydb/services/metadata/secret/access.cpp b/ydb/services/metadata/secret/access.cpp new file mode 100644 index 00000000000..4bd991c29e7 --- /dev/null +++ b/ydb/services/metadata/secret/access.cpp @@ -0,0 +1,87 @@ +#include "access.h" +#include <ydb/core/base/appdata.h> +#include <ydb/services/metadata/manager/ydb_value_operator.h> + +namespace NKikimr::NMetadata::NSecret { + +TString TAccess::GetStorageTablePath() { + return "/" + AppData()->TenantName + "/.metadata/secret_access"; +} + +bool TAccess::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue) { + if (!decoder.Read(decoder.GetOwnerUserIdIdx(), OwnerUserId, rawValue)) { + return false; + } + if (!decoder.Read(decoder.GetSecretIdIdx(), SecretId, rawValue)) { + return false; + } + if (!decoder.Read(decoder.GetAccessUserIdIdx(), AccessUserId, rawValue)) { + return false; + } + return true; +} + +NMetadataManager::TTableRecord TAccess::SerializeToRecord() const { + NMetadataManager::TTableRecord result; + result.SetColumn(TDecoder::OwnerUserId, NMetadataManager::TYDBValue::Bytes(OwnerUserId)); + result.SetColumn(TDecoder::SecretId, NMetadataManager::TYDBValue::Bytes(SecretId)); + result.SetColumn(TDecoder::AccessUserId, NMetadataManager::TYDBValue::Bytes(AccessUserId)); + return result; +} + +void TAccess::AlteringPreparation(std::vector<TAccess>&& objects, + NMetadataManager::IAlterPreparationController<TAccess>::TPtr controller, + const NMetadata::IOperationsManager::TModificationContext& context) { + if (!!context.GetUserToken()) { + for (auto&& i : objects) { + if (i.GetOwnerUserId() != context.GetUserToken()->GetUserSID()) { + controller->PreparationProblem("no permissions for modify secret access"); + return; + } + } + } + controller->PreparationFinished(std::move(objects)); +} + +NMetadata::TOperationParsingResult TAccess::BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings, + const NMetadata::IOperationsManager::TModificationContext& context) +{ + NKikimr::NMetadataManager::TTableRecord result; + TStringBuf sb(settings.GetObjectId().data(), settings.GetObjectId().size()); + TStringBuf l; + TStringBuf r; + if (!sb.TrySplit('/', l, r)) { + return "incorrect objectId format (secretId/accessUserId)"; + } + result.SetColumn(TDecoder::SecretId, NMetadataManager::TYDBValue::Bytes(l)); + result.SetColumn(TDecoder::AccessUserId, NMetadataManager::TYDBValue::Bytes(r)); + if (!context.GetUserToken()) { + auto it = settings.GetFeatures().find(TDecoder::OwnerUserId); + if (it != settings.GetFeatures().end()) { + result.SetColumn(TDecoder::OwnerUserId, NMetadataManager::TYDBValue::Bytes(it->second)); + } else { + return "OwnerUserId not defined"; + } + } else { + result.SetColumn(TDecoder::OwnerUserId, NMetadataManager::TYDBValue::Bytes(context.GetUserToken()->GetUserSID())); + } + return result; +} + +std::vector<Ydb::Column> TAccess::TDecoder::GetColumns() { + return { + NMetadataManager::TYDBColumn::Bytes(OwnerUserId), + NMetadataManager::TYDBColumn::Bytes(SecretId), + NMetadataManager::TYDBColumn::Bytes(AccessUserId) + }; +} + +std::vector<Ydb::Column> TAccess::TDecoder::GetPKColumns() { + return GetColumns(); +} + +std::vector<TString> TAccess::TDecoder::GetPKColumnIds() { + return { OwnerUserId, SecretId, AccessUserId }; +} + +} diff --git a/ydb/services/metadata/secret/access.h b/ydb/services/metadata/secret/access.h new file mode 100644 index 00000000000..2ab5882afb5 --- /dev/null +++ b/ydb/services/metadata/secret/access.h @@ -0,0 +1,55 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/base/appdata.h> + +#include <ydb/services/metadata/abstract/decoder.h> +#include <ydb/services/metadata/abstract/manager.h> +#include <ydb/services/metadata/manager/object.h> +#include <ydb/services/metadata/manager/preparation_controller.h> + +namespace NKikimr::NMetadata::NSecret { + +class TAccess: public NMetadataManager::TObject<TAccess> { +private: + using TBase = NMetadataManager::TObject<TAccess>; + YDB_ACCESSOR_DEF(TString, OwnerUserId); + YDB_ACCESSOR_DEF(TString, SecretId); + YDB_ACCESSOR_DEF(TString, AccessUserId); +public: + class TDecoder: public NInternal::TDecoderBase { + private: + YDB_ACCESSOR(i32, OwnerUserIdIdx, -1); + YDB_ACCESSOR(i32, SecretIdIdx, -1); + YDB_ACCESSOR(i32, AccessUserIdIdx, -1); + public: + static inline const TString OwnerUserId = "ownerUserId"; + static inline const TString SecretId = "secretId"; + static inline const TString AccessUserId = "accessUserId"; + static std::vector<TString> GetPKColumnIds(); + static std::vector<Ydb::Column> GetPKColumns(); + static std::vector<Ydb::Column> GetColumns(); + + TDecoder(const Ydb::ResultSet& rawData) { + OwnerUserIdIdx = GetFieldIndex(rawData, OwnerUserId); + SecretIdIdx = GetFieldIndex(rawData, SecretId); + AccessUserIdIdx = GetFieldIndex(rawData, AccessUserId); + } + }; + + using TBase::TBase; + + static void AlteringPreparation(std::vector<TAccess>&& objects, + NMetadataManager::IAlterPreparationController<TAccess>::TPtr controller, + const NMetadata::IOperationsManager::TModificationContext& context); + static TString GetStorageTablePath(); + bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue); + NMetadataManager::TTableRecord SerializeToRecord() const; + + static NMetadata::TOperationParsingResult BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings, + const NMetadata::IOperationsManager::TModificationContext& context); + static TString GetTypeId() { + return "SECRET_ACCESS"; + } +}; + +} diff --git a/ydb/services/metadata/secret/fetcher.cpp b/ydb/services/metadata/secret/fetcher.cpp new file mode 100644 index 00000000000..1c4d3ec3804 --- /dev/null +++ b/ydb/services/metadata/secret/fetcher.cpp @@ -0,0 +1,13 @@ +#include "fetcher.h" +#include "manager.h" + +namespace NKikimr::NMetadata::NSecret { + +std::vector<IOperationsManager::TPtr> TManager::DoGetManagers() const { + return { + std::make_shared<TSecretManager>(), + std::make_shared<TAccessManager>(), + }; +} + +} diff --git a/ydb/services/metadata/secret/fetcher.h b/ydb/services/metadata/secret/fetcher.h new file mode 100644 index 00000000000..66a3903898b --- /dev/null +++ b/ydb/services/metadata/secret/fetcher.h @@ -0,0 +1,15 @@ +#pragma once + +#include "snapshot.h" +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadata::NSecret { + +class TManager: public NMetadataProvider::TSnapshotsManager<TSnapshot> { +protected: + virtual std::vector<IOperationsManager::TPtr> DoGetManagers() const override; +public: +}; + +} diff --git a/ydb/services/metadata/secret/initializer.cpp b/ydb/services/metadata/secret/initializer.cpp new file mode 100644 index 00000000000..d92c03727ec --- /dev/null +++ b/ydb/services/metadata/secret/initializer.cpp @@ -0,0 +1,68 @@ +#include "initializer.h" +#include "secret.h" +#include "access.h" + +namespace NKikimr::NMetadata::NSecret { + +void TSecretInitializer::DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const { + TVector<NMetadataInitializer::ITableModifier::TPtr> result; + { + Ydb::Table::CreateTableRequest request; + request.set_session_id(""); + request.set_path(TSecret::GetStorageTablePath()); + request.add_primary_key(TSecret::TDecoder::OwnerUserId); + request.add_primary_key(TSecret::TDecoder::SecretId); + { + auto& column = *request.add_columns(); + column.set_name(TSecret::TDecoder::OwnerUserId); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name(TSecret::TDecoder::SecretId); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name(TSecret::TDecoder::Value); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request, "create_secret")); + auto hRequest = TSecret::AddHistoryTableScheme(request); + result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(hRequest, "create_secret_history")); + } + controller->PreparationFinished(result); +} + +void TAccessInitializer::DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const { + TVector<NMetadataInitializer::ITableModifier::TPtr> result; + { + Ydb::Table::CreateTableRequest request; + request.set_session_id(""); + request.set_path(TAccess::GetStorageTablePath()); + request.add_primary_key(TAccess::TDecoder::OwnerUserId); + request.add_primary_key(TAccess::TDecoder::SecretId); + request.add_primary_key(TAccess::TDecoder::AccessUserId); + { + auto& column = *request.add_columns(); + column.set_name(TAccess::TDecoder::OwnerUserId); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name(TAccess::TDecoder::SecretId); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name(TAccess::TDecoder::AccessUserId); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request, "create_access")); + auto hRequest = TAccess::AddHistoryTableScheme(request); + result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(hRequest, "create_access_history")); + } + controller->PreparationFinished(result); +} + +} diff --git a/ydb/services/metadata/secret/initializer.h b/ydb/services/metadata/secret/initializer.h new file mode 100644 index 00000000000..f988388214e --- /dev/null +++ b/ydb/services/metadata/secret/initializer.h @@ -0,0 +1,20 @@ +#pragma once + +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadata::NSecret { + +class TSecretInitializer: public IInitializationBehaviour { +protected: + virtual void DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const override; +public: +}; + +class TAccessInitializer: public IInitializationBehaviour { +protected: + virtual void DoPrepare(NMetadataInitializer::IInitializerInput::TPtr controller) const override; +public: +}; + +} diff --git a/ydb/services/metadata/secret/manager.cpp b/ydb/services/metadata/secret/manager.cpp new file mode 100644 index 00000000000..177a2ea17db --- /dev/null +++ b/ydb/services/metadata/secret/manager.cpp @@ -0,0 +1,16 @@ +#include "manager.h" +#include "initializer.h" +namespace NKikimr::NMetadata::NSecret { + +TSecretManager::TFactory::TRegistrator<TSecretManager> TSecretManager::Registrator(TSecretManager::GetTypeIdStatic()); +TAccessManager::TFactory::TRegistrator<TAccessManager> TAccessManager::Registrator(TAccessManager::GetTypeIdStatic()); + +IInitializationBehaviour::TPtr TAccessManager::DoGetInitializationBehaviour() const { + return std::make_shared<TAccessInitializer>(); +} + +IInitializationBehaviour::TPtr TSecretManager::DoGetInitializationBehaviour() const { + return std::make_shared<TSecretInitializer>(); +} + +} diff --git a/ydb/services/metadata/secret/manager.h b/ydb/services/metadata/secret/manager.h new file mode 100644 index 00000000000..7b8f76e19b0 --- /dev/null +++ b/ydb/services/metadata/secret/manager.h @@ -0,0 +1,25 @@ +#pragma once +#include "snapshot.h" +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/services/metadata/manager/generic_manager.h> +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadata::NSecret { + +class TSecretManager: public TGenericOperationsManager<TSecret> { +private: + static TFactory::TRegistrator<TSecretManager> Registrator; +protected: + virtual IInitializationBehaviour::TPtr DoGetInitializationBehaviour() const override; +public: +}; + +class TAccessManager: public TGenericOperationsManager<TAccess> { +private: + static TFactory::TRegistrator<TAccessManager> Registrator; +protected: + virtual IInitializationBehaviour::TPtr DoGetInitializationBehaviour() const override; +public: +}; + +} diff --git a/ydb/services/metadata/secret/secret.cpp b/ydb/services/metadata/secret/secret.cpp new file mode 100644 index 00000000000..ccd45b426e2 --- /dev/null +++ b/ydb/services/metadata/secret/secret.cpp @@ -0,0 +1,90 @@ +#include "secret.h" +#include <ydb/core/base/appdata.h> +#include <ydb/services/metadata/manager/ydb_value_operator.h> + +namespace NKikimr::NMetadata::NSecret { + +TString TSecret::GetStorageTablePath() { + return "/" + AppData()->TenantName + "/.metadata/secrets"; +} + +bool TSecret::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue) { + if (!decoder.Read(decoder.GetOwnerUserIdIdx(), OwnerUserId, rawValue)) { + return false; + } + if (!decoder.Read(decoder.GetSecretIdIdx(), SecretId, rawValue)) { + return false; + } + if (!decoder.Read(decoder.GetValueIdx(), Value, rawValue)) { + return false; + } + return true; +} + +NMetadataManager::TTableRecord TSecret::SerializeToRecord() const { + NMetadataManager::TTableRecord result; + result.SetColumn(TDecoder::OwnerUserId, NMetadataManager::TYDBValue::Bytes(OwnerUserId)); + result.SetColumn(TDecoder::SecretId, NMetadataManager::TYDBValue::Bytes(SecretId)); + result.SetColumn(TDecoder::Value, NMetadataManager::TYDBValue::Bytes(Value)); + return result; +} + +void TSecret::AlteringPreparation(std::vector<TSecret>&& objects, + NMetadataManager::IAlterPreparationController<TSecret>::TPtr controller, + const NMetadata::IOperationsManager::TModificationContext& context) { + if (!!context.GetUserToken()) { + for (auto&& i : objects) { + if (i.GetOwnerUserId() != context.GetUserToken()->GetUserSID()) { + controller->PreparationProblem("no permissions for modify secrets"); + return; + } + } + } + controller->PreparationFinished(std::move(objects)); +} + +NMetadata::TOperationParsingResult TSecret::BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings, + const NMetadata::IOperationsManager::TModificationContext& context) { + NKikimr::NMetadataManager::TTableRecord result; + if (!context.GetUserToken()) { + auto it = settings.GetFeatures().find(TDecoder::OwnerUserId); + if (it != settings.GetFeatures().end()) { + result.SetColumn(TDecoder::OwnerUserId, NMetadataManager::TYDBValue::Bytes(it->second)); + } else { + return "OwnerUserId not defined"; + } + } else { + result.SetColumn(TDecoder::OwnerUserId, NMetadataManager::TYDBValue::Bytes(context.GetUserToken()->GetUserSID())); + } + { + result.SetColumn(TDecoder::SecretId, NMetadataManager::TYDBValue::Bytes(settings.GetObjectId())); + } + { + auto it = settings.GetFeatures().find(TDecoder::Value); + if (it != settings.GetFeatures().end()) { + result.SetColumn(TDecoder::Value, NMetadataManager::TYDBValue::Bytes(it->second)); + } + } + return result; +} + +std::vector<Ydb::Column> TSecret::TDecoder::GetColumns() { + return { + NMetadataManager::TYDBColumn::Bytes(OwnerUserId), + NMetadataManager::TYDBColumn::Bytes(SecretId), + NMetadataManager::TYDBColumn::Bytes(Value) + }; +} + +std::vector<Ydb::Column> TSecret::TDecoder::GetPKColumns() { + return { + NMetadataManager::TYDBColumn::Bytes(OwnerUserId), + NMetadataManager::TYDBColumn::Bytes(SecretId) + }; +} + +std::vector<TString> TSecret::TDecoder::GetPKColumnIds() { + return { OwnerUserId, SecretId }; +} + +} diff --git a/ydb/services/metadata/secret/secret.h b/ydb/services/metadata/secret/secret.h new file mode 100644 index 00000000000..ca6b653f323 --- /dev/null +++ b/ydb/services/metadata/secret/secret.h @@ -0,0 +1,70 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/base/appdata.h> + +#include <ydb/services/metadata/abstract/decoder.h> +#include <ydb/services/metadata/abstract/manager.h> +#include <ydb/services/metadata/manager/object.h> +#include <ydb/services/metadata/manager/preparation_controller.h> + +namespace NKikimr::NMetadata::NSecret { + +class TSecretId { +private: + YDB_READONLY_PROTECT_DEF(TString, OwnerUserId); + YDB_READONLY_PROTECT_DEF(TString, SecretId); +public: + TSecretId() = default; + TSecretId(const TString& ownerUserId, const TString& secretId) + : OwnerUserId(ownerUserId) + , SecretId(secretId) + { + } + + bool operator<(const TSecretId& item) const { + return std::tie(OwnerUserId, SecretId) < std::tie(item.OwnerUserId, item.SecretId); + } +}; + +class TSecret: public TSecretId, public NMetadataManager::TObject<TSecret> { +private: + using TBase = TSecretId; + YDB_ACCESSOR_DEF(TString, Value); +public: + class TDecoder: public NInternal::TDecoderBase { + private: + YDB_ACCESSOR(i32, OwnerUserIdIdx, -1); + YDB_ACCESSOR(i32, SecretIdIdx, -1); + YDB_ACCESSOR(i32, ValueIdx, -1); + public: + static inline const TString OwnerUserId = "ownerUserId"; + static inline const TString SecretId = "secretId"; + static inline const TString Value = "value"; + static std::vector<TString> GetPKColumnIds(); + static std::vector<Ydb::Column> GetPKColumns(); + static std::vector<Ydb::Column> GetColumns(); + + TDecoder(const Ydb::ResultSet& rawData) { + OwnerUserIdIdx = GetFieldIndex(rawData, OwnerUserId); + SecretIdIdx = GetFieldIndex(rawData, SecretId); + ValueIdx = GetFieldIndex(rawData, Value); + } + }; + + using TBase::TBase; + + static void AlteringPreparation(std::vector<TSecret>&& objects, + NMetadataManager::IAlterPreparationController<TSecret>::TPtr controller, + const NMetadata::IOperationsManager::TModificationContext& context); + static TString GetStorageTablePath(); + bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue); + NMetadataManager::TTableRecord SerializeToRecord() const; + static NMetadata::TOperationParsingResult BuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings, + const NMetadata::IOperationsManager::TModificationContext& context); + static TString GetTypeId() { + return "SECRET"; + } + +}; + +} diff --git a/ydb/services/metadata/secret/snapshot.cpp b/ydb/services/metadata/secret/snapshot.cpp new file mode 100644 index 00000000000..f97ba3a3bf4 --- /dev/null +++ b/ydb/services/metadata/secret/snapshot.cpp @@ -0,0 +1,47 @@ +#include "snapshot.h" + +namespace NKikimr::NMetadata::NSecret { + +bool TSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) { + Y_VERIFY(rawDataResult.result_sets().size() == 2); + { + auto& rawData = rawDataResult.result_sets()[0]; + TSecret::TDecoder decoder(rawData); + for (auto&& r : rawData.rows()) { + TSecret object; + if (!object.DeserializeFromRecord(decoder, r)) { + ALS_ERROR(NKikimrServices::METADATA_SECRET) << "cannot parse secret info for snapshot"; + continue; + } + Secrets.emplace(object, object); + } + } + { + auto& rawData = rawDataResult.result_sets()[1]; + TAccess::TDecoder decoder(rawData); + for (auto&& r : rawData.rows()) { + TAccess object; + if (!object.DeserializeFromRecord(decoder, r)) { + ALS_ERROR(NKikimrServices::METADATA_SECRET) << "cannot parse secret info for snapshot"; + continue; + } + Access.emplace_back(object); + } + } + return true; +} + +TString TSnapshot::DoSerializeToString() const { + TStringBuilder sb; + sb << "SECRETS:"; + for (auto&& i : Secrets) { + sb << i.first.GetOwnerUserId() << ":" << i.first.GetSecretId() << ":" << i.second.GetValue() << ";"; + } + sb << "ACCESS:"; + for (auto&& i : Access) { + sb << i.GetOwnerUserId() << ":" << i.GetSecretId() << ":" << i.GetAccessUserId() << ";"; + } + return sb; +} + +} diff --git a/ydb/services/metadata/secret/snapshot.h b/ydb/services/metadata/secret/snapshot.h new file mode 100644 index 00000000000..c25ea23600f --- /dev/null +++ b/ydb/services/metadata/secret/snapshot.h @@ -0,0 +1,23 @@ +#pragma once +#include "secret.h" +#include "access.h" + +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadata::NSecret { + +class TSnapshot: public NMetadataProvider::ISnapshot { +private: + using TBase = NMetadataProvider::ISnapshot; + using TSecrets = std::map<TSecretId, TSecret>; + YDB_READONLY_DEF(TSecrets, Secrets); + YDB_READONLY_DEF(std::vector<TAccess>, Access); +protected: + virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override; + virtual TString DoSerializeToString() const override; +public: + using TBase::TBase; +}; + +} diff --git a/ydb/services/metadata/secret/ut/ut_secret.cpp b/ydb/services/metadata/secret/ut/ut_secret.cpp new file mode 100644 index 00000000000..05895729c54 --- /dev/null +++ b/ydb/services/metadata/secret/ut/ut_secret.cpp @@ -0,0 +1,213 @@ +#include <ydb/core/cms/console/configs_dispatcher.h> +#include <ydb/core/testlib/cs_helper.h> +#include <ydb/core/tx/tiering/external_data.h> +#include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/core/wrappers/ut_helpers/s3_mock.h> +#include <ydb/core/wrappers/s3_wrapper.h> +#include <ydb/core/wrappers/fake_storage.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/services/metadata/manager/alter.h> +#include <ydb/services/metadata/manager/common.h> +#include <ydb/services/metadata/manager/table_record.h> +#include <ydb/services/metadata/manager/ydb_value_operator.h> +#include <ydb/services/metadata/secret/manager.h> +#include <ydb/services/metadata/secret/fetcher.h> +#include <ydb/services/metadata/secret/snapshot.h> +#include <ydb/services/metadata/service.h> + +#include <library/cpp/actors/core/av_bootstrapped.h> +#include <library/cpp/protobuf/json/proto2json.h> +#include <library/cpp/testing/unittest/registar.h> + +#include <util/system/hostname.h> + +namespace NKikimr { + +using namespace NColumnShard; + +Y_UNIT_TEST_SUITE(Secret) { + + class TJsonChecker { + private: + YDB_ACCESSOR_DEF(TString, Path); + YDB_ACCESSOR_DEF(TString, Expectation); + public: + TJsonChecker(const TString& path, const TString& expectation) + : Path(path) + , Expectation(expectation) + { + + } + bool Check(const NJson::TJsonValue& jsonInfo) const { + auto* jsonPathValue = jsonInfo.GetValueByPath(Path); + if (!jsonPathValue) { + return Expectation == "__NULL"; + } + return jsonPathValue->GetStringRobust() == Expectation; + } + + TString GetDebugString() const { + TStringBuilder sb; + sb << "path=" << Path << ";" + << "expectation=" << Expectation << ";"; + return sb; + } + }; + + class TSecretUserEmulator: public NActors::TActorBootstrapped<TSecretUserEmulator> { + private: + using TBase = NActors::TActorBootstrapped<TSecretUserEmulator>; + std::shared_ptr<NMetadata::NSecret::TManager> Manager = std::make_shared<NMetadata::NSecret::TManager>(); + YDB_READONLY_FLAG(Found, false); + YDB_READONLY(TInstant, Start, Now()); + YDB_ACCESSOR(ui32, ExpectedSecretsCount, 1); + YDB_ACCESSOR(ui32, ExpectedAccessCount, 1); + using TKeyCheckers = TMap<NMetadata::NSecret::TSecretId, TJsonChecker>; + YDB_ACCESSOR_DEF(TKeyCheckers, Checkers); + public: + + void ResetConditions() { + FoundFlag = false; + Checkers.clear(); + } + + STATEFN(StateInit) { + switch (ev->GetTypeRewrite()) { + hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle); + default: + Y_VERIFY(false); + } + } + + void CheckRuntime(TTestActorRuntime& runtime) { + const auto pred = [this](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event)->TTestActorRuntimeBase::EEventAction { + if (event->HasBuffer() && !event->HasEvent()) { + } else if (!event->GetBase()) { + } else { + auto ptr = dynamic_cast<NMetadataProvider::TEvRefreshSubscriberData*>(event->GetBase()); + if (ptr) { + CheckFound(ptr); + } + } + return TTestActorRuntimeBase::EEventAction::PROCESS; + }; + + runtime.SetObserverFunc(pred); + + for (const TInstant start = Now(); !IsFound() && Now() - start < TDuration::Seconds(10); ) { + runtime.SimulateSleep(TDuration::Seconds(1)); + } + runtime.SetObserverFunc(TTestActorRuntime::DefaultObserverFunc); + Y_VERIFY(IsFound()); + } + + void CheckFound(NMetadataProvider::TEvRefreshSubscriberData* event) { + auto snapshot = event->GetSnapshotAs<NMetadata::NSecret::TSnapshot>(); + Y_VERIFY(!!snapshot); + if (ExpectedSecretsCount) { + if (snapshot->GetSecrets().size() != ExpectedSecretsCount) { + Cerr << "snapshot->GetSecrets().size() incorrect: " << snapshot->SerializeToString() << Endl; + return; + } + } else if (snapshot->GetSecrets().size()) { + Cerr << "snapshot->GetSecrets().size() incorrect (zero expects): " << snapshot->SerializeToString() << Endl; + return; + } + if (ExpectedAccessCount) { + if (snapshot->GetAccess().size() != ExpectedAccessCount) { + Cerr << "snapshot->GetAccess().size() incorrect: " << snapshot->SerializeToString() << Endl; + return; + } + } else if (snapshot->GetAccess().size()) { + Cerr << "snapshot->GetAccess().size() incorrect (zero expects): " << snapshot->SerializeToString() << Endl; + return; + } + FoundFlag = true; + } + + void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) { + CheckFound(ev->Get()); + } + + void Bootstrap() { + auto manager = std::make_shared<NMetadata::NSecret::TManager>(); + Become(&TThis::StateInit); + Sender<NMetadataProvider::TEvSubscribeExternal>(manager).SendTo(NMetadataProvider::MakeServiceId(SelfId().NodeId())); + Start = Now(); + } + }; + + Y_UNIT_TEST(Simple) { + TPortManager pm; + + ui32 grpcPort = pm.GetPort(); + ui32 msgbPort = pm.GetPort(); + + Tests::TServerSettings serverSettings(msgbPort); + serverSettings.Port = msgbPort; + serverSettings.GrpcPort = grpcPort; + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMetadataProvider(true) + .SetEnableOlapSchemaOperations(true); + ; + + Tests::TServer::TPtr server = new Tests::TServer(serverSettings); + server->EnableGRpc(grpcPort); + // server->SetupDefaultProfiles(); + + Tests::TClient client(serverSettings); + + auto& runtime = *server->GetRuntime(); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + + TSecretUserEmulator* emulator = new TSecretUserEmulator; + runtime.Register(emulator); + { + runtime.SimulateSleep(TDuration::Seconds(10)); + Cerr << "Initialization finished" << Endl; + + Tests::NCS::THelper lHelper(*server); + lHelper.StartSchemaRequest("CREATE OBJECT secret1 (TYPE SECRET WITH value = `100`, ownerUserId = `root@root`)"); + + emulator->SetExpectedSecretsCount(1).SetExpectedAccessCount(0); + { + const TInstant start = Now(); + while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) { + runtime.SimulateSleep(TDuration::Seconds(1)); + } + Y_VERIFY(emulator->IsFound()); + } + + lHelper.StartSchemaRequest("ALTER OBJECT secret1 (TYPE SECRET SET value = `abcde`, ownerUserId = `root@root`)"); + lHelper.StartSchemaRequest("CREATE OBJECT `secret1/test@test1` (TYPE SECRET_ACCESS WITH ownerUserId = `root@root`)"); + + emulator->SetExpectedSecretsCount(1).SetExpectedAccessCount(1); + { + const TInstant start = Now(); + while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) { + runtime.SimulateSleep(TDuration::Seconds(1)); + } + Y_VERIFY(emulator->IsFound()); + } + + lHelper.StartSchemaRequest("DROP OBJECT `secret1/test@test1` (TYPE SECRET_ACCESS WITH ownerUserId = `root@root`)"); + lHelper.StartSchemaRequest("DROP OBJECT `secret1` (TYPE SECRET WITH ownerUserId = `root@root`)"); + + emulator->SetExpectedSecretsCount(0).SetExpectedAccessCount(0); + { + const TInstant start = Now(); + while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) { + runtime.SimulateSleep(TDuration::Seconds(1)); + } + Y_VERIFY(emulator->IsFound()); + } + } + } +} +} diff --git a/ydb/services/metadata/service.cpp b/ydb/services/metadata/service.cpp index 70714c1c43b..059b3fd51e8 100644 --- a/ydb/services/metadata/service.cpp +++ b/ydb/services/metadata/service.cpp @@ -6,4 +6,12 @@ NActors::TActorId MakeServiceId(const ui32 nodeId) { return NActors::TActorId(nodeId, "SrvcMetaData"); } +void TServiceOperator::Register() { + Singleton<TServiceOperator>()->EnabledFlag = true; +} + +bool TServiceOperator::IsEnabled() { + return Singleton<TServiceOperator>()->EnabledFlag; +} + } diff --git a/ydb/services/metadata/service.h b/ydb/services/metadata/service.h index d5761869748..667b9e25f23 100644 --- a/ydb/services/metadata/service.h +++ b/ydb/services/metadata/service.h @@ -1,30 +1,50 @@ #pragma once #include <ydb/services/metadata/abstract/common.h> +#include <ydb/services/metadata/abstract/manager.h> #include <library/cpp/actors/core/event_local.h> namespace NKikimr::NMetadataProvider { +class TEvAlterObjects: public NActors::TEventLocal<TEvAlterObjects, EEvSubscribe::EvAlterObjects> { +private: + YDB_READONLY_DEF(NMetadata::IAlterCommand::TPtr, Command); +public: + TEvAlterObjects(NMetadata::IAlterCommand::TPtr command) + : Command(command) { + + } +}; + class TEvSubscribeExternal: public NActors::TEventLocal<TEvSubscribeExternal, EEvSubscribe::EvSubscribeExternal> { private: - YDB_READONLY_DEF(ISnapshotParser::TPtr, SnapshotParser); + YDB_READONLY_DEF(ISnapshotParser::TPtr, Fetcher); public: - TEvSubscribeExternal(ISnapshotParser::TPtr parser) - : SnapshotParser(parser) + TEvSubscribeExternal(ISnapshotParser::TPtr fetcher) + : Fetcher(fetcher) { - Y_VERIFY(!!SnapshotParser); + Y_VERIFY(!!Fetcher); } }; class TEvUnsubscribeExternal: public NActors::TEventLocal<TEvUnsubscribeExternal, EEvSubscribe::EvUnsubscribeExternal> { private: - YDB_READONLY_DEF(ISnapshotParser::TPtr, SnapshotParser); + YDB_READONLY_DEF(ISnapshotParser::TPtr, Fetcher); public: - TEvUnsubscribeExternal(ISnapshotParser::TPtr parser) - : SnapshotParser(parser) { - Y_VERIFY(!!SnapshotParser); + TEvUnsubscribeExternal(ISnapshotParser::TPtr fetcher) + : Fetcher(fetcher) { + Y_VERIFY(!!Fetcher); } }; NActors::TActorId MakeServiceId(const ui32 node); +class TServiceOperator { +private: + friend class TService; + bool EnabledFlag = false; + static void Register(); +public: + static bool IsEnabled(); +}; + } |