diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-11-09 15:31:05 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-11-09 15:31:05 +0300 |
commit | 6735fd34dc275e7c3a2183764ce285ba247df821 (patch) | |
tree | b3f792b9b651578eec6ddc221f911d507cf51340 | |
parent | ce105bf8bc2d9c7a79cbeed15f61c1fe5b98a3db (diff) | |
download | ydb-6735fd34dc275e7c3a2183764ce285ba247df821.tar.gz |
separate initializer and use async mode for prepare table modifications
47 files changed, 802 insertions, 482 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 1b3782b5d4..4f4ec8e3f5 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -151,7 +151,8 @@ struct TKikimrEvents : TEvents { ES_METADATA_PROVIDER, ES_INTERNAL_REQUEST, ES_BACKGROUND_TASKS, - ES_TIERING + ES_TIERING, + ES_METADATA_INITIALIZER }; }; diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index bdfeab0a08..f1f1f5b218 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -892,6 +892,7 @@ message TS3Settings { message TTaskCleaner { optional uint64 PathId = 1; optional TS3Settings StorageSettings = 2; + optional string OwnerPath = 3; } message TBackupTask { diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index f8aa3decad..484681dd89 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -337,6 +337,7 @@ enum EServiceKikimr { // Background tasks BG_TASKS = 1700; + METADATA_INITIALIZER = 1800; }; message TActivity { @@ -928,5 +929,8 @@ message TActivity { TEST_SHARD_VALIDATION_ACTOR = 583; TX_TIERING_TIER_CLEANER = 584; TX_TIERING_PATH_CLEANER = 585; + METADATA_INITIALIZER = 586; + TIERING_OWNER_ENRICH = 587; + TIERING_SNAPSHOT_ENRICH = 588; }; }; diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index e6c88ef4cd..22ec60a381 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -137,7 +137,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) } // Mark exported blobs - auto& tManager = Self->GetTierManagerVerified(tierName); + auto& tManager = Self->GetTierManagerVerified(NTiers::TGlobalTierId(Self->OwnerPath, tierName)); if (tManager.NeedExport()) { for (auto& rec : portionInfo.Records) { auto& blobId = rec.BlobRange.BlobId; diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 0cbfb6079e..f702f8b383 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -998,14 +998,14 @@ void TColumnShard::MapExternBlobs(const TActorContext& /*ctx*/, NOlap::TReadMeta void TColumnShard::ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, TEvPrivate::TEvExport::TBlobDataMap&& blobsInfo) const { - if (auto s3 = GetS3ActorForTier(tierName)) { + if (auto s3 = GetS3ActorForTier(NTiers::TGlobalTierId(OwnerPath, tierName))) { auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, tierName, s3, std::move(blobsInfo)); ctx.Register(CreateExportActor(TabletID(), ctx.SelfID, event.release())); } } void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs) const { - if (auto s3 = GetS3ActorForTier(tierName)) { + if (auto s3 = GetS3ActorForTier(NTiers::TGlobalTierId(OwnerPath, tierName))) { auto forget = std::make_unique<TEvPrivate::TEvForget>(); forget->Evicted = std::move(blobs); ctx.Send(s3, forget.release()); @@ -1014,7 +1014,7 @@ void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName bool TColumnShard::GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName, NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges) { - if (auto s3 = GetS3ActorForTier(tierName)) { + if (auto s3 = GetS3ActorForTier(NTiers::TGlobalTierId(OwnerPath, tierName))) { auto get = std::make_unique<TEvPrivate::TEvGetExported>(); get->DstActor = dst; get->DstCookie = cookie; @@ -1036,17 +1036,15 @@ void TColumnShard::Die(const TActorContext& ctx) { return IActor::Die(ctx); } -TActorId TColumnShard::GetS3ActorForTier(const TString& tierName) const { +TActorId TColumnShard::GetS3ActorForTier(const NTiers::TGlobalTierId& tierId) const { if (!Tiers) { return {}; } - return Tiers->GetStorageActorId(tierName); + return Tiers->GetStorageActorId(tierId); } void TColumnShard::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) { - if (!Tiers) { - Tiers = std::make_shared<TTiersManager>(TabletID(), SelfId(), OwnerPath); - } + Y_VERIFY(Tiers); Tiers->TakeConfigs(ev->Get()->GetSnapshot()); if (EnableTiering) { Tiers->Start(Tiers); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 1595cbda2d..6929177f5a 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -139,9 +139,9 @@ class TColumnShard Y_UNUSED(ctx); } - const NTiers::TManager& GetTierManagerVerified(const TString& tierName) const { + const NTiers::TManager& GetTierManagerVerified(const NTiers::TGlobalTierId& tierId) const { Y_VERIFY(!!Tiers); - return Tiers->GetManagerVerified(tierName); + return Tiers->GetManagerVerified(tierId); } void Die(const TActorContext& ctx) override; @@ -447,7 +447,7 @@ private: NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema); void MapExternBlobs(const TActorContext& ctx, NOlap::TReadMetadata& metadata); - TActorId GetS3ActorForTier(const TString& tierName) const; + TActorId GetS3ActorForTier(const NTiers::TGlobalTierId& tierId) const; void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, TEvPrivate::TEvExport::TBlobDataMap&& blobsInfo) const; void ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs) const; diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index af407457cb..18f08925e9 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -296,7 +296,7 @@ NMetadataProvider::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpecia for (auto&& tier : specials.Tiers) { { NColumnShard::NTiers::TTierConfig tConfig; - tConfig.SetOwnerPath("/Root"); + tConfig.SetOwnerPath("/Root/olapStore"); tConfig.SetTierName(tier.Name); tConfig.MutableProtoConfig().SetName(tier.Name); auto& cProto = tConfig.MutableProtoConfig(); @@ -309,11 +309,11 @@ NMetadataProvider::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpecia if (tier.CompressionLevel) { cProto.MutableCompression()->SetCompressionLevel(*tier.CompressionLevel); } - cs->MutableTierConfigs().emplace(tConfig.GetConfigId(), tConfig); + cs->MutableTierConfigs().emplace(tConfig.GetGlobalTierId(), tConfig); } { NColumnShard::NTiers::TTieringRule tRule; - tRule.SetOwnerPath("/Root"); + tRule.SetOwnerPath("/Root/olapStore"); tRule.SetDurationForEvict(TDuration::Seconds(tier.GetEvictAfterSecondsUnsafe())); tRule.SetTablePath(tablePath).SetTablePathId(tablePathId); tRule.SetTierName(tier.Name); diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index 3c4829c6c2..f9e6b18c8f 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -207,8 +207,8 @@ struct TTestSchema { } static TString CreateTableTxBody(ui64 pathId, const TVector<std::pair<TString, TTypeInfo>>& columns, - const TVector<std::pair<TString, TTypeInfo>>& pk, - const TTableSpecials& specials = {}) { + const TVector<std::pair<TString, TTypeInfo>>& pk, + const TTableSpecials& specials = {}) { NKikimrTxColumnShard::TSchemaTxBody tx; auto* table = tx.MutableEnsureTables()->AddTables(); table->SetPathId(pathId); @@ -231,6 +231,24 @@ struct TTestSchema { return out; } + static TString CreateInitShardTxBody(ui64 pathId, const TVector<std::pair<TString, TTypeInfo>>& columns, + const TVector<std::pair<TString, TTypeInfo>>& pk, + const TTableSpecials& specials = {}, const TString& ownerPath = "/Root/olap") { + NKikimrTxColumnShard::TSchemaTxBody tx; + auto* table = tx.MutableInitShard()->AddTables(); + tx.MutableInitShard()->SetOwnerPath(ownerPath); + table->SetPathId(pathId); + + InitSchema(columns, pk, specials, table->MutableSchema()); + if (specials.HasTtl()) { + InitTtl(specials, table->MutableTtlSettings()); + } + + TString out; + Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&out); + return out; + } + static TString CreateStandaloneTableTxBody(ui64 pathId, const TVector<std::pair<TString, TTypeInfo>>& columns, const TVector<std::pair<TString, TTypeInfo>>& pk, const TTableSpecials& specials = {}) { diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp index 7dd77b12ed..e9d2db69bb 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp @@ -161,11 +161,13 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, spec.SetEvictAfterSeconds(ttlSec); } bool ok = ProposeSchemaTx(runtime, sender, - TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk, spec), + TTestSchema::CreateInitShardTxBody(tableId, testYdbSchema, testYdbPk, spec, "/Root/olapStore"), {++planStep, ++txId}); UNIT_ASSERT(ok); PlanSchemaTx(runtime, sender, {planStep, txId}); - ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec, "test", tableId)); + if (spec.HasTiers()) { + ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec, "test", tableId)); + } // ui32 portionSize = 80 * 1000; @@ -228,7 +230,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, {++planStep, ++txId}); UNIT_ASSERT(ok); PlanSchemaTx(runtime, sender, {planStep, txId}); - ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec, "test", tableId)); + if (spec.HasTiers()) { + ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec, "test", tableId)); + } if (internal) { TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.GetTtlColumn()); @@ -259,7 +263,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, TTestSchema::AlterTableTxBody(tableId, 3, TTestSchema::TTableSpecials()), {++planStep, ++txId}); UNIT_ASSERT(ok); - ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(TTestSchema::TTableSpecials(), "test", tableId)); + if (spec.HasTiers()) { + ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(TTestSchema::TTableSpecials(), "test", tableId)); + } PlanSchemaTx(runtime, sender, {planStep, txId}); UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, blobs[0])); @@ -427,7 +433,7 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe UNIT_ASSERT(specs.size() > 0); { const bool ok = ProposeSchemaTx(runtime, sender, - TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk, specs[0]), + TTestSchema::CreateInitShardTxBody(tableId, testYdbSchema, testYdbPk, specs[0], "/Root/olapStore"), { ++planStep, ++txId }); UNIT_ASSERT(ok); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp index 46cde5886d..7c825b98e1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp @@ -314,7 +314,11 @@ public: << ", at schemeshard: " << ssId); if (NBackgroundTasks::TServiceOperator::IsEnabled()) { - NBackgroundTasks::TTask task(std::make_shared<NColumnShard::NTiers::TTaskCleanerActivity>(txState->TargetPathId.LocalPathId), nullptr); + NSchemeShard::TPath path = NSchemeShard::TPath::Init(txState->TargetPathId, context.SS); + TColumnTableInfo::TPtr tableInfo = context.SS->ColumnTables.at(path.Base()->PathId); + NSchemeShard::TPath ownerPath = tableInfo->IsStandalone() ? path : path.FindOlapStore(); + Y_VERIFY(path.IsResolved()); + NBackgroundTasks::TTask task(std::make_shared<NColumnShard::NTiers::TTaskCleanerActivity>(txState->TargetPathId.LocalPathId, ownerPath.PathString()), nullptr); task.SetId(OperationId.SerializeToString()); context.SS->SelfId().Send(NBackgroundTasks::MakeServiceId(context.SS->SelfId().NodeId()), new NBackgroundTasks::TEvAddTask(std::move(task))); return false; diff --git a/ydb/core/tx/tiering/CMakeLists.txt b/ydb/core/tx/tiering/CMakeLists.txt index f6662658a3..59d6f17042 100644 --- a/ydb/core/tx/tiering/CMakeLists.txt +++ b/ydb/core/tx/tiering/CMakeLists.txt @@ -31,6 +31,9 @@ target_sources(core-tx-tiering PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/external_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier_config.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/owner_enrich.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/snapshot_enrich.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/snapshot.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/s3_actor.cpp ) diff --git a/ydb/core/tx/tiering/cleaner_task.cpp b/ydb/core/tx/tiering/cleaner_task.cpp index a1ab4b2042..4973c31bdc 100644 --- a/ydb/core/tx/tiering/cleaner_task.cpp +++ b/ydb/core/tx/tiering/cleaner_task.cpp @@ -8,11 +8,13 @@ TTaskCleanerActivity::TFactory::TRegistrator<TTaskCleanerActivity> TTaskCleanerA NKikimrSchemeOp::TTaskCleaner TTaskCleanerActivity::DoSerializeToProto() const { NKikimrSchemeOp::TTaskCleaner result; result.SetPathId(PathId); + result.SetOwnerPath(OwnerPath); return result; } bool TTaskCleanerActivity::DoDeserializeFromProto(const NKikimrSchemeOp::TTaskCleaner& protoData) { PathId = protoData.GetPathId(); + OwnerPath = protoData.GetOwnerPath(); return true; } @@ -20,7 +22,7 @@ void TTaskCleanerActivity::DoExecute(NBackgroundTasks::ITaskExecutorController:: const NBackgroundTasks::TTaskStateContainer& /*state*/) { #ifndef KIKIMR_DISABLE_S3_OPS - TActivationContext::AsActorContext().Register(new TPathCleaner(PathId, controller)); + TActivationContext::AsActorContext().Register(new TPathCleaner(PathId, OwnerPath, controller)); #else controller->TaskFinished(); #endif diff --git a/ydb/core/tx/tiering/cleaner_task.h b/ydb/core/tx/tiering/cleaner_task.h index d036fc8aeb..12e11bb822 100644 --- a/ydb/core/tx/tiering/cleaner_task.h +++ b/ydb/core/tx/tiering/cleaner_task.h @@ -8,6 +8,7 @@ namespace NKikimr::NColumnShard::NTiers { class TTaskCleanerActivity: public NBackgroundTasks::IProtoStringSerializable<NKikimrSchemeOp::TTaskCleaner, NBackgroundTasks::ITaskActivity> { private: + YDB_READONLY_DEF(TString, OwnerPath); YDB_READONLY(ui64, PathId, 0); static TFactory::TRegistrator<TTaskCleanerActivity> Registrator; protected: @@ -19,8 +20,9 @@ protected: public: TTaskCleanerActivity() = default; - TTaskCleanerActivity(const ui64 pathId) - : PathId(pathId) + TTaskCleanerActivity(const ui64 pathId, const TString& ownerPath) + : OwnerPath(ownerPath) + , PathId(pathId) { } diff --git a/ydb/core/tx/tiering/common.h b/ydb/core/tx/tiering/common.h index a891f6acf8..536e920a30 100644 --- a/ydb/core/tx/tiering/common.h +++ b/ydb/core/tx/tiering/common.h @@ -1,10 +1,32 @@ #pragma once #include <ydb/core/base/events.h> +#include <ydb/library/accessor/accessor.h> + #include <library/cpp/actors/core/events.h> namespace NKikimr::NColumnShard::NTiers { +class TGlobalTierId { +private: + YDB_ACCESSOR_DEF(TString, OwnerPath); + YDB_ACCESSOR_DEF(TString, TierName); +public: + TGlobalTierId(const TString& ownerPath, const TString& tierName) + : OwnerPath(ownerPath) + , TierName(tierName) { + + } + + bool operator<(const TGlobalTierId& item) const { + return std::tie(OwnerPath, TierName) < std::tie(item.OwnerPath, item.TierName); + } + + TString ToString() const { + return OwnerPath + "." + TierName; + } +}; + enum EEvents { EvTierCleared = EventSpaceBegin(TKikimrEvents::ES_TIERING), EvEnd diff --git a/ydb/core/tx/tiering/external_data.cpp b/ydb/core/tx/tiering/external_data.cpp index 8c508dde91..1d9ef8376d 100644 --- a/ydb/core/tx/tiering/external_data.cpp +++ b/ydb/core/tx/tiering/external_data.cpp @@ -1,4 +1,6 @@ #include "external_data.h" +#include "owner_enrich.h" +#include "snapshot_enrich.h" #include <ydb/core/base/path.h> @@ -9,260 +11,28 @@ namespace NKikimr::NColumnShard::NTiers { -bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) { - Y_VERIFY(rawDataResult.result_sets().size() == 2); - { - auto& rawData = rawDataResult.result_sets()[0]; - TTierConfig::TDecoder decoder(rawData); - for (auto&& r : rawData.rows()) { - TTierConfig config; - if (!config.DeserializeFromRecord(decoder, r)) { - ALS_ERROR(NKikimrServices::TX_TIERING) << "cannot parse tier config from snapshot"; - continue; - } - TierConfigs.emplace(config.GetConfigId(), config); - } +void TSnapshotConstructor::EnrichSnapshotData(ISnapshot::TPtr original, NMetadataProvider::ISnapshotAcceptorController::TPtr controller) const +{ + if (!TablesDecoder) { + TablesDecoder = std::make_shared<TTablesDecoderCache>(); } - { - auto& rawData = rawDataResult.result_sets()[1]; - TTieringRule::TDecoder decoder(rawData); - TVector<TTieringRule> rulesLocal; - rulesLocal.reserve(rawData.rows().size()); - for (auto&& r : rawData.rows()) { - TTieringRule tr; - if (!tr.DeserializeFromRecord(decoder, r)) { - ALS_WARN(NKikimrServices::TX_TIERING) << "cannot parse record for tiering info"; - continue; - } - rulesLocal.emplace_back(std::move(tr)); - } - std::sort(rulesLocal.begin(), rulesLocal.end()); - for (auto&& i : rulesLocal) { - const TString tablePath = i.GetTablePath(); - TableTierings[tablePath].AddRule(std::move(i)); - } - } - return true; -} - -std::optional<TTierConfig> TConfigsSnapshot::GetValue(const TString& key) const { - auto it = TierConfigs.find(key); - if (it == TierConfigs.end()) { - return {}; - } else { - return it->second; - } -} - -void TConfigsSnapshot::RemapTablePathToId(const TString& path, const ui64 pathId) { - auto it = TableTierings.find(path); - Y_VERIFY(it != TableTierings.end()); - it->second.SetTablePathId(pathId); + TActivationContext::AsActorContext().Register(new TActorSnapshotEnrich(original, controller, TablesDecoder)); } -const TTableTiering* TConfigsSnapshot::GetTableTiering(const TString& tablePath) const { - auto it = TableTierings.find(tablePath); - if (it == TableTierings.end()) { - return nullptr; - } else { - return &it->second; - } -} - -std::vector<NKikimr::NColumnShard::NTiers::TTierConfig> TConfigsSnapshot::GetTiersForPathId(const ui64 pathId) const { - std::vector<TTierConfig> result; - std::set<TString> readyIds; - for (auto&& i : TableTierings) { - for (auto&& r : i.second.GetRules()) { - if (r.GetTablePathId() == pathId) { - auto it = TierConfigs.find(r.GetTierId()); - if (it == TierConfigs.end()) { - ALS_ERROR(NKikimrServices::TX_TIERING) << "inconsistency tiering for " << r.GetTierId(); - continue; - } else if (readyIds.emplace(r.GetTierId()).second) { - result.emplace_back(it->second); - } - } - } - } - return result; +TSnapshotConstructor::TSnapshotConstructor(const TString& ownerPath) + : OwnerPath(ownerPath) +{ + Y_VERIFY(!!OwnerPath); + const TString ownerPathNormal = TFsPath(OwnerPath).Fix().GetPath(); + const TString tenantPathNormal = TFsPath(AppData()->TenantName).Fix().GetPath(); + Y_VERIFY(ownerPathNormal.StartsWith(tenantPathNormal)); + TablePath = "/" + AppData()->TenantName + "/.configs/tiering/"; + Tables.emplace_back(TablePath + "tiers"); + Tables.emplace_back(TablePath + "rules"); } -TVector<NMetadataProvider::ITableModifier::TPtr> TSnapshotConstructor::DoGetTableSchema() const { - TVector<NMetadataProvider::ITableModifier::TPtr> result; - { - Ydb::Table::CreateTableRequest request; - request.set_session_id(""); - request.set_path(Tables[0]); - request.add_primary_key("ownerPath"); - request.add_primary_key("tierName"); - { - auto& column = *request.add_columns(); - column.set_name("tierName"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("ownerPath"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("tierConfig"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - result.emplace_back(new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request)); - } - { - Ydb::Table::CreateTableRequest request; - request.set_session_id(""); - request.set_path(Tables[1]); - request.add_primary_key("ownerPath"); - request.add_primary_key("tierName"); - request.add_primary_key("tablePath"); - { - auto& column = *request.add_columns(); - column.set_name("tierName"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("ownerPath"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("tablePath"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("durationForEvict"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("column"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - result.emplace_back(new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request)); - } - for (auto&& t: Tables) { - Ydb::Scheme::ModifyPermissionsRequest request; - request.set_path(t); - request.set_clear_permissions(true); - auto* permission = request.add_actions(); - permission->mutable_grant()->add_permission_names("ydb.tables.modify"); - permission->mutable_grant()->add_permission_names("ydb.tables.read"); - result.emplace_back(new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogModifyPermissions>(request)); - } - return result; -} - -NThreading::TFuture<NMetadataProvider::ISnapshot::TPtr> TSnapshotConstructor::EnrichSnapshotData(ISnapshot::TPtr original) const { - if (!Actor) { - return NThreading::MakeErrorFuture<NMetadataProvider::ISnapshot::TPtr>( - std::make_exception_ptr(std::runtime_error("actor finished for enrich process"))); - } - auto current = std::dynamic_pointer_cast<TConfigsSnapshot>(original); - Y_VERIFY(current); - auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); - request->DatabaseName = NKikimr::CanonizePath(AppData()->TenantName); - for (auto&& i : current->GetTableTierings()) { - auto it = TablesRemapper.find(i.second.GetTablePath()); - if (it != TablesRemapper.end()) { - current->RemapTablePathToId(it->first, it->second); - } else { - auto& entry = request->ResultSet.emplace_back(); - entry.Operation = TNavigate::OpTable; - entry.Path = NKikimr::SplitPath(i.second.GetTablePath()); - } - } - if (request->ResultSet.empty()) { - return NThreading::MakeFuture(original); - } else { - WaitPromise = NThreading::NewPromise<ISnapshot::TPtr>(); - WaitSnapshot = current; - Actor->ProvideEvent(new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()), MakeSchemeCacheID()); - return WaitPromise.GetFuture(); - } -} - -void TSnapshotConstructorAgent::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { - Owner->ResolveInfo(ev->Get()); -} - -void TSnapshotConstructorAgent::ProvideEvent(IEventBase* event, const TActorId& recipient) { - Send(recipient, event); -} - -TSnapshotConstructorAgent::~TSnapshotConstructorAgent() { - Owner->ActorStopped(); -} - -void TSnapshotConstructor::ResolveInfo(const TEvTxProxySchemeCache::TEvNavigateKeySetResult* info) { - const auto& request = info->Request; - - if (request->ResultSet.empty()) { - WaitPromise.SetException("cannot resolve table path to ids"); - WaitSnapshot = nullptr; - return; - } - - for (auto&& i : request->ResultSet) { - const TString path = "/" + JoinSeq("/", i.Path); - switch (i.Status) { - case TNavigate::EStatus::Ok: - TablesRemapper.emplace(path, i.TableId.PathId.LocalPathId); - break; - case TNavigate::EStatus::RootUnknown: - case TNavigate::EStatus::PathErrorUnknown: - WaitPromise.SetException("path not exists " + path); - break; - case TNavigate::EStatus::LookupError: - case TNavigate::EStatus::RedirectLookupError: - WaitPromise.SetException("RESOLVE_LOOKUP_ERROR" + path); - break; - default: - WaitPromise.SetException("GENERIC_RESOLVE_ERROR" + path); - break; - } - } - if (WaitPromise.HasException()) { - WaitSnapshot = nullptr; - } else { - for (auto&& i : WaitSnapshot->GetTableTierings()) { - auto it = TablesRemapper.find(i.second.GetTablePath()); - if (it != TablesRemapper.end()) { - WaitSnapshot->RemapTablePathToId(it->first, it->second); - } else { - WaitPromise.SetException("undecoded path " + i.second.GetTablePath()); - WaitSnapshot = nullptr; - return; - } - } - WaitPromise.SetValue(WaitSnapshot); - WaitSnapshot = nullptr; - } -} - -TSnapshotConstructor::TSnapshotConstructor() { - TablePath = "/" + AppData()->TenantName + "/.external_data"; - Tables.emplace_back(TablePath + "/tiers"); - Tables.emplace_back(TablePath + "/tiering"); -} - -TString NTiers::TConfigsSnapshot::DoSerializeToString() const { - NJson::TJsonValue result = NJson::JSON_MAP; - auto& jsonTiers = result.InsertValue("tiers", NJson::JSON_MAP); - for (auto&& i : TierConfigs) { - jsonTiers.InsertValue(i.first, i.second.GetDebugJson()); - } - auto& jsonTiering = result.InsertValue("tiering", NJson::JSON_MAP); - for (auto&& i : TableTierings) { - jsonTiering.InsertValue(i.first, i.second.GetDebugJson()); - } - return result.GetStringRobust(); +void TSnapshotConstructor::DoPrepare(NMetadataInitializer::IController::TPtr controller) const { + TActivationContext::AsActorContext().Register(new TActorOwnerEnrich(OwnerPath, controller, Tables)); } } diff --git a/ydb/core/tx/tiering/external_data.h b/ydb/core/tx/tiering/external_data.h index 3a01c87c48..a04e3caf5d 100644 --- a/ydb/core/tx/tiering/external_data.h +++ b/ydb/core/tx/tiering/external_data.h @@ -1,6 +1,6 @@ #pragma once -#include "rule.h" -#include "tier_config.h" +#include "snapshot.h" +#include "snapshot_enrich.h" #include <ydb/core/protos/flat_scheme_op.pb.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> @@ -10,97 +10,24 @@ namespace NKikimr::NColumnShard::NTiers { -class TConfigsSnapshot: public NMetadataProvider::ISnapshot { -private: - using TBase = NMetadataProvider::ISnapshot; - using TConfigsMap = TMap<TString, TTierConfig>; - YDB_ACCESSOR_DEF(TConfigsMap, TierConfigs); - using TTieringMap = TMap<TString, TTableTiering>; - YDB_ACCESSOR_DEF(TTieringMap, TableTierings); -protected: - virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override; - virtual TString DoSerializeToString() const override; -public: - std::vector<TTierConfig> GetTiersForPathId(const ui64 pathId) const; - const TTableTiering* GetTableTiering(const TString& tablePath) const; - void RemapTablePathToId(const TString& path, const ui64 pathId); - std::optional<TTierConfig> GetValue(const TString& key) const; - using TBase::TBase; -}; - -class TSnapshotConstructor; - -class TSnapshotConstructorAgent: public TActor<TSnapshotConstructorAgent> { -private: - using TBase = TActor<TSnapshotConstructorAgent>; - std::shared_ptr<TSnapshotConstructor> Owner; -private: - void Handle(NActors::TEvents::TEvPoison::TPtr& /*ev*/) { - PassAway(); - } - void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); -public: - STATEFN(StateMain) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); - hFunc(NActors::TEvents::TEvPoison, Handle); - default: - break; - } - } - - TSnapshotConstructorAgent(std::shared_ptr<TSnapshotConstructor> owner) - : TBase(&TThis::StateMain) - , Owner(owner) - { - - } - - ~TSnapshotConstructorAgent(); - - void ProvideEvent(IEventBase* event, const TActorId& recipient); -}; - class TSnapshotConstructor: public NMetadataProvider::TGenericSnapshotParser<TConfigsSnapshot> { private: using TNavigate = NSchemeCache::TSchemeCacheNavigate; using TBaseActor = TActor<TSnapshotConstructor>; using ISnapshot = NMetadataProvider::ISnapshot; TVector<TString> Tables; - TMap<TString, ui64> TablesRemapper; - TSnapshotConstructorAgent* Actor = nullptr; TString TablePath; - mutable std::shared_ptr<TConfigsSnapshot> WaitSnapshot; - mutable NThreading::TPromise<NMetadataProvider::ISnapshot::TPtr> WaitPromise; + const TString OwnerPath; + mutable std::shared_ptr<TTablesDecoderCache> TablesDecoder; protected: - virtual TVector<NMetadataProvider::ITableModifier::TPtr> DoGetTableSchema() const override; virtual const TVector<TString>& DoGetTables() const override { return Tables; } + virtual void DoPrepare(NMetadataInitializer::IController::TPtr controller) const override; public: - void ResolveInfo(const TEvTxProxySchemeCache::TEvNavigateKeySetResult* info); - virtual NThreading::TFuture<ISnapshot::TPtr> EnrichSnapshotData(ISnapshot::TPtr original) const override; - - TSnapshotConstructor(); + virtual void EnrichSnapshotData(ISnapshot::TPtr original, NMetadataProvider::ISnapshotAcceptorController::TPtr controller) const override; - void Start(std::shared_ptr<TSnapshotConstructor> ownerPtr) { - Y_VERIFY(!Actor); - Actor = new TSnapshotConstructorAgent(ownerPtr); - TActivationContext::AsActorContext().RegisterWithSameMailbox(Actor); - } - - void ActorStopped() { - Actor = nullptr; - } - - void Stop() { - if (Actor && TlsActivationContext) { - TActivationContext::AsActorContext().Send(Actor->SelfId(), new NActors::TEvents::TEvPoison); - } - } - - ~TSnapshotConstructor() { - } + TSnapshotConstructor(const TString& ownerPath); }; } diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp index a9cd24a2c7..e015d9c3ba 100644 --- a/ydb/core/tx/tiering/manager.cpp +++ b/ydb/core/tx/tiering/manager.cpp @@ -30,7 +30,7 @@ public: } void Bootstrap() { Become(&TThis::StateMain); - Send(GetExternalDataActorId(), new NMetadataProvider::TEvSubscribeExternal(Owner->GetExternalDataManipulation(), SelfId())); + Send(GetExternalDataActorId(), new NMetadataProvider::TEvSubscribeExternal(Owner->GetExternalDataManipulation())); } void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) { auto snapshot = ev->Get()->GetSnapshot(); @@ -133,7 +133,7 @@ void TTiersManager::TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshotExt) Snapshot = snapshotExt; auto& snapshot = *snapshotPtr; for (auto itSelf = Managers.begin(); itSelf != Managers.end(); ) { - auto it = snapshot.GetTierConfigs().find(OwnerPath + "." + itSelf->first); + auto it = snapshot.GetTierConfigs().find(itSelf->first); if (it == snapshot.GetTierConfigs().end()) { itSelf->second.Stop(); itSelf = Managers.erase(itSelf); @@ -143,29 +143,26 @@ void TTiersManager::TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshotExt) } } for (auto&& i : snapshot.GetTierConfigs()) { - if (i.second.GetOwnerPath() != OwnerPath && !!OwnerPath) { - continue; - } - if (Managers.contains(i.second.GetTierName())) { + if (Managers.contains(i.second.GetGlobalTierId())) { continue; } NTiers::TManager localManager(TabletId, TabletActorId, i.second); - auto& manager = Managers.emplace(i.second.GetTierName(), std::move(localManager)).first->second; + auto& manager = Managers.emplace(i.second.GetGlobalTierId(), std::move(localManager)).first->second; if (IsActive()) { manager.Start(); } } } -TActorId TTiersManager::GetStorageActorId(const TString& tierName) { - auto it = Managers.find(tierName); +TActorId TTiersManager::GetStorageActorId(const NTiers::TGlobalTierId& tierId) { + auto it = Managers.find(tierId); if (it == Managers.end()) { - ALS_ERROR(NKikimrServices::TX_TIERING) << "No S3 actor for tier '" << tierName << "' at tablet " << TabletId; + ALS_ERROR(NKikimrServices::TX_TIERING) << "No S3 actor for tier '" << tierId.ToString() << "' at tablet " << TabletId; return {}; } auto actorId = it->second.GetStorageActorId(); if (!actorId) { - ALS_ERROR(NKikimrServices::TX_TIERING) << "Not started storage actor for tier '" << tierName << "' at tablet " << TabletId; + ALS_ERROR(NKikimrServices::TX_TIERING) << "Not started storage actor for tier '" << tierId.ToString() << "' at tablet " << TabletId; return {}; } return actorId; @@ -201,7 +198,7 @@ TTiersManager& TTiersManager::Stop() { return *this; } -const NKikimr::NColumnShard::NTiers::TManager& TTiersManager::GetManagerVerified(const TString& tierId) const { +const NTiers::TManager& TTiersManager::GetManagerVerified(const NTiers::TGlobalTierId& tierId) const { auto it = Managers.find(tierId); Y_VERIFY(it != Managers.end()); return it->second; @@ -209,9 +206,7 @@ const NKikimr::NColumnShard::NTiers::TManager& TTiersManager::GetManagerVerified NMetadataProvider::ISnapshotParser::TPtr TTiersManager::GetExternalDataManipulation() const { if (!ExternalDataManipulation) { - ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(); - auto edmPtr = std::dynamic_pointer_cast<NTiers::TSnapshotConstructor>(ExternalDataManipulation); - edmPtr->Start(edmPtr); + ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(OwnerPath); } return ExternalDataManipulation; } @@ -229,13 +224,6 @@ THashMap<ui64, NKikimr::NOlap::TTiersInfo> TTiersManager::GetTiering() const { return result; } -TTiersManager::~TTiersManager() { - auto cs = std::dynamic_pointer_cast<NTiers::TSnapshotConstructor>(ExternalDataManipulation); - if (!!cs) { - cs->Stop(); - } -} - TActorId TTiersManager::GetActorId() const { if (Actor) { return Actor->SelfId(); diff --git a/ydb/core/tx/tiering/manager.h b/ydb/core/tx/tiering/manager.h index 008682a891..8595ccc7c8 100644 --- a/ydb/core/tx/tiering/manager.h +++ b/ydb/core/tx/tiering/manager.h @@ -33,7 +33,7 @@ public: class TTiersManager { private: class TActor; - using TManagers = TMap<TString, NTiers::TManager>; + using TManagers = TMap<NTiers::TGlobalTierId, NTiers::TManager>; ui64 TabletId = 0; const TActorId TabletActorId; TString OwnerPath; @@ -52,13 +52,12 @@ public: { } TActorId GetActorId() const; - ~TTiersManager(); THashMap<ui64, NOlap::TTiersInfo> GetTiering() const; void TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshot); TTiersManager& Start(std::shared_ptr<TTiersManager> ownerPtr); TTiersManager& Stop(); - TActorId GetStorageActorId(const TString& tierName); - const NTiers::TManager& GetManagerVerified(const TString& tierId) const; + TActorId GetStorageActorId(const NTiers::TGlobalTierId& tierId); + const NTiers::TManager& GetManagerVerified(const NTiers::TGlobalTierId& tierId) const; NMetadataProvider::ISnapshotParser::TPtr GetExternalDataManipulation() const; TManagers::const_iterator begin() const { diff --git a/ydb/core/tx/tiering/owner_enrich.cpp b/ydb/core/tx/tiering/owner_enrich.cpp new file mode 100644 index 0000000000..f3d2fa611c --- /dev/null +++ b/ydb/core/tx/tiering/owner_enrich.cpp @@ -0,0 +1,111 @@ +#include "owner_enrich.h" +#include "snapshot.h" + +#include <ydb/core/base/path.h> + +#include <util/string/join.h> + +namespace NKikimr::NColumnShard::NTiers { + +TVector<NMetadataInitializer::ITableModifier::TPtr> TActorOwnerEnrich::BuildModifiers(const NSchemeCache::TSchemeCacheNavigate::TEntry& ownerEntry) const { + TVector<NMetadataInitializer::ITableModifier::TPtr> result; + { + Ydb::Table::CreateTableRequest request; + request.set_session_id(""); + request.set_path(TableNames[0]); + request.add_primary_key("ownerPath"); + request.add_primary_key("tierName"); + { + auto& column = *request.add_columns(); + column.set_name("ownerPath"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("tierName"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("tierConfig"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request)); + } + { + Ydb::Table::CreateTableRequest request; + request.set_session_id(""); + request.set_path(TableNames[1]); + request.add_primary_key("ownerPath"); + request.add_primary_key("tablePath"); + request.add_primary_key("tierName"); + { + auto& column = *request.add_columns(); + column.set_name("ownerPath"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("tierName"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("tablePath"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("durationForEvict"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("column"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request)); + } + Y_VERIFY(!!ownerEntry.SecurityObject); + return result; +} + +void TActorOwnerEnrich::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + auto* info = ev->Get(); + const auto& request = info->Request; + auto g = PassAwayGuard(); + if (request->ResultSet.empty()) { + Controller->PreparationProblem("cannot resolve owner path to ids"); + return; + } + + if (request->ResultSet.size() != 1) { + Controller->PreparationProblem("unexpected result set size: " + ::ToString(request->ResultSet.size())); + return; + } + + for (auto&& i : request->ResultSet) { + const TString path = "/" + JoinSeq("/", i.Path); + switch (i.Status) { + case TNavigate::EStatus::Ok: + Controller->PreparationFinished(BuildModifiers(i)); + return; + default: + Controller->PreparationProblem(::ToString(i.Status) + " for '" + path + "'"); + return; + } + } + Controller->PreparationProblem("unexpected"); +} + +void TActorOwnerEnrich::Bootstrap() { + Become(&TThis::StateMain); + auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); + request->DatabaseName = NKikimr::CanonizePath(AppData()->TenantName); + auto& entry = request->ResultSet.emplace_back(); + entry.Operation = TNavigate::OpPath; + entry.Path = NKikimr::SplitPath(OwnerPath); + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release())); +} + +} diff --git a/ydb/core/tx/tiering/owner_enrich.h b/ydb/core/tx/tiering/owner_enrich.h new file mode 100644 index 0000000000..969c5fea28 --- /dev/null +++ b/ydb/core/tx/tiering/owner_enrich.h @@ -0,0 +1,48 @@ +#pragma once +#include "rule.h" +#include "tier_config.h" + +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/services/metadata/service.h> + +#include <library/cpp/json/writer/json_value.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TActorOwnerEnrich: public TActorBootstrapped<TActorOwnerEnrich> { +private: + using TBase = TActorBootstrapped<TActorOwnerEnrich>; + using TNavigate = NSchemeCache::TSchemeCacheNavigate; + const TString OwnerPath; + NMetadataInitializer::IController::TPtr Controller; + const TVector<TString> TableNames; +private: + TVector<NMetadataInitializer::ITableModifier::TPtr> BuildModifiers(const NSchemeCache::TSchemeCacheNavigate::TEntry& ownerEntry) const; + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::TIERING_OWNER_ENRICH; + } + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + default: + break; + } + } + + TActorOwnerEnrich(const TString& ownerPath, NMetadataInitializer::IController::TPtr controller, const TVector<TString>& tableNames) + : OwnerPath(ownerPath) + , Controller(controller) + , TableNames(tableNames) + { + + } + + void Bootstrap(); +}; + +} diff --git a/ydb/core/tx/tiering/path_cleaner.cpp b/ydb/core/tx/tiering/path_cleaner.cpp index df2a360be8..70e55ad362 100644 --- a/ydb/core/tx/tiering/path_cleaner.cpp +++ b/ydb/core/tx/tiering/path_cleaner.cpp @@ -20,9 +20,7 @@ void TPathCleaner::Handle(TEvTierCleared::TPtr& ev) { NMetadataProvider::ISnapshotParser::TPtr TPathCleaner::GetTieringSnapshotParser() const { if (!ExternalDataManipulation) { - ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(); - auto edmPtr = std::dynamic_pointer_cast<NTiers::TSnapshotConstructor>(ExternalDataManipulation); - edmPtr->Start(edmPtr); + ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(OwnerPath); } return ExternalDataManipulation; } @@ -48,8 +46,9 @@ void TPathCleaner::Bootstrap() { Send(NMetadataProvider::MakeServiceId(SelfId().NodeId()), new NMetadataProvider::TEvSubscribeExternal(GetTieringSnapshotParser())); } -TPathCleaner::TPathCleaner(const ui64 pathId, NBackgroundTasks::ITaskExecutorController::TPtr controller) +TPathCleaner::TPathCleaner(const ui64 pathId, const TString& ownerPath, NBackgroundTasks::ITaskExecutorController::TPtr controller) : PathId(pathId) + , OwnerPath(ownerPath) , Controller(controller) { } diff --git a/ydb/core/tx/tiering/path_cleaner.h b/ydb/core/tx/tiering/path_cleaner.h index d938fcb76b..a83cfb479d 100644 --- a/ydb/core/tx/tiering/path_cleaner.h +++ b/ydb/core/tx/tiering/path_cleaner.h @@ -16,6 +16,7 @@ namespace NKikimr::NColumnShard::NTiers { class TPathCleaner: public TActorBootstrapped<TPathCleaner> { private: YDB_READONLY(ui64, PathId, 0); + YDB_READONLY_DEF(TString, OwnerPath); bool Truncated = false; std::set<TString> TiersWait; NBackgroundTasks::ITaskExecutorController::TPtr Controller; @@ -25,7 +26,7 @@ protected: void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev); void Handle(TEvTierCleared::TPtr& ev); public: - TPathCleaner(const ui64 pathId, NBackgroundTasks::ITaskExecutorController::TPtr controller); + TPathCleaner(const ui64 pathId, const TString& ownerPath, NBackgroundTasks::ITaskExecutorController::TPtr controller); static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::TX_TIERING_PATH_CLEANER; diff --git a/ydb/core/tx/tiering/rule.cpp b/ydb/core/tx/tiering/rule.cpp index 617c665a34..6bcb4fcbfa 100644 --- a/ydb/core/tx/tiering/rule.cpp +++ b/ydb/core/tx/tiering/rule.cpp @@ -3,7 +3,6 @@ namespace NKikimr::NColumnShard::NTiers { NJson::TJsonValue TTieringRule::GetDebugJson() const { NJson::TJsonValue result = NJson::JSON_MAP; - result.InsertValue(TDecoder::OwnerPath, OwnerPath); result.InsertValue(TDecoder::TierName, TierName); result.InsertValue(TDecoder::TablePath, TablePath); result.InsertValue("tablePathId", TablePathId); @@ -15,6 +14,7 @@ NJson::TJsonValue TTieringRule::GetDebugJson() const { NJson::TJsonValue TTableTiering::GetDebugJson() const { NJson::TJsonValue result = NJson::JSON_MAP; result.InsertValue(TTieringRule::TDecoder::TablePath, TablePath); + result.InsertValue("tablePathId", TablePathId); result.InsertValue(TTieringRule::TDecoder::Column, Column); auto&& jsonRules = result.InsertValue("rules", NJson::JSON_ARRAY); for (auto&& i : Rules) { diff --git a/ydb/core/tx/tiering/rule.h b/ydb/core/tx/tiering/rule.h index 1187efc8ff..468ce5ec51 100644 --- a/ydb/core/tx/tiering/rule.h +++ b/ydb/core/tx/tiering/rule.h @@ -1,4 +1,5 @@ #pragma once +#include "common.h" #include "decoder.h" #include <ydb/core/protos/flat_scheme_op.pb.h> @@ -19,14 +20,15 @@ private: YDB_ACCESSOR_DEF(TDuration, DurationForEvict); public: bool operator<(const TTieringRule& item) const { - return std::tie(OwnerPath, TierName, TablePath, Column, DurationForEvict) - < std::tie(item.OwnerPath, item.TierName, item.TablePath, item.Column, item.DurationForEvict); + return std::tie(TablePath, TierName, Column, DurationForEvict) + < std::tie(item.TablePath, item.TierName, item.Column, item.DurationForEvict); } - NJson::TJsonValue GetDebugJson() const; - TString GetTierId() const { - return OwnerPath + "." + TierName; + TGlobalTierId GetGlobalTierId() const { + return TGlobalTierId(OwnerPath, TierName); } + + NJson::TJsonValue GetDebugJson() const; class TDecoder: public NInternal::TDecoderBase { private: YDB_READONLY(i32, OwnerPathIdx, -1); @@ -53,6 +55,7 @@ public: if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) { return false; } + OwnerPath = TFsPath(OwnerPath).Fix().GetPath(); if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) { return false; } diff --git a/ydb/core/tx/tiering/snapshot.cpp b/ydb/core/tx/tiering/snapshot.cpp new file mode 100644 index 0000000000..25a1abf346 --- /dev/null +++ b/ydb/core/tx/tiering/snapshot.cpp @@ -0,0 +1,103 @@ +#include "snapshot.h" + +#include <ydb/core/base/path.h> + +#include <library/cpp/json/writer/json_value.h> +#include <library/cpp/protobuf/json/proto2json.h> + +#include <util/string/join.h> + +namespace NKikimr::NColumnShard::NTiers { + +bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) { + Y_VERIFY(rawDataResult.result_sets().size() == 2); + { + auto& rawData = rawDataResult.result_sets()[0]; + TTierConfig::TDecoder decoder(rawData); + for (auto&& r : rawData.rows()) { + TTierConfig config; + if (!config.DeserializeFromRecord(decoder, r)) { + ALS_ERROR(NKikimrServices::TX_TIERING) << "cannot parse tier config from snapshot"; + continue; + } + TierConfigs.emplace(config.GetGlobalTierId(), config); + } + } + { + auto& rawData = rawDataResult.result_sets()[1]; + TTieringRule::TDecoder decoder(rawData); + TVector<TTieringRule> rulesLocal; + rulesLocal.reserve(rawData.rows().size()); + for (auto&& r : rawData.rows()) { + TTieringRule tr; + if (!tr.DeserializeFromRecord(decoder, r)) { + ALS_WARN(NKikimrServices::TX_TIERING) << "cannot parse record for tiering info"; + continue; + } + rulesLocal.emplace_back(std::move(tr)); + } + std::sort(rulesLocal.begin(), rulesLocal.end()); + for (auto&& i : rulesLocal) { + TableTierings[i.GetTablePath()].AddRule(std::move(i)); + } + } + return true; +} + +std::optional<TTierConfig> TConfigsSnapshot::GetValue(const TGlobalTierId& key) const { + auto it = TierConfigs.find(key); + if (it == TierConfigs.end()) { + return {}; + } else { + return it->second; + } +} + +void TConfigsSnapshot::RemapTablePathToId(const TString& path, const ui64 pathId) { + auto it = TableTierings.find(path); + Y_VERIFY(it != TableTierings.end()); + it->second.SetTablePathId(pathId); +} + +const TTableTiering* TConfigsSnapshot::GetTableTiering(const TString& tablePath) const { + auto it = TableTierings.find(tablePath); + if (it == TableTierings.end()) { + return nullptr; + } else { + return &it->second; + } +} + +std::vector<NKikimr::NColumnShard::NTiers::TTierConfig> TConfigsSnapshot::GetTiersForPathId(const ui64 pathId) const { + std::vector<TTierConfig> result; + std::set<TGlobalTierId> readyIds; + for (auto&& i : TableTierings) { + for (auto&& r : i.second.GetRules()) { + if (r.GetTablePathId() == pathId) { + auto it = TierConfigs.find(r.GetGlobalTierId()); + if (it == TierConfigs.end()) { + ALS_ERROR(NKikimrServices::TX_TIERING) << "inconsistency tiering for " << r.GetGlobalTierId().ToString(); + continue; + } else if (readyIds.emplace(r.GetGlobalTierId()).second) { + result.emplace_back(it->second); + } + } + } + } + return result; +} + +TString NTiers::TConfigsSnapshot::DoSerializeToString() const { + NJson::TJsonValue result = NJson::JSON_MAP; + auto& jsonTiers = result.InsertValue("tiers", NJson::JSON_MAP); + for (auto&& i : TierConfigs) { + jsonTiers.InsertValue(i.first.ToString(), i.second.GetDebugJson()); + } + auto& jsonTiering = result.InsertValue("rules", NJson::JSON_MAP); + for (auto&& i : TableTierings) { + jsonTiering.InsertValue(i.first, i.second.GetDebugJson()); + } + return result.GetStringRobust(); +} + +} diff --git a/ydb/core/tx/tiering/snapshot.h b/ydb/core/tx/tiering/snapshot.h new file mode 100644 index 0000000000..b913dacbb6 --- /dev/null +++ b/ydb/core/tx/tiering/snapshot.h @@ -0,0 +1,31 @@ +#pragma once +#include "rule.h" +#include "tier_config.h" + +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/services/metadata/service.h> + +#include <library/cpp/json/writer/json_value.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TConfigsSnapshot: public NMetadataProvider::ISnapshot { +private: + using TBase = NMetadataProvider::ISnapshot; + using TConfigsMap = TMap<TGlobalTierId, TTierConfig>; + YDB_ACCESSOR_DEF(TConfigsMap, TierConfigs); + using TTieringMap = TMap<TString, TTableTiering>; + YDB_ACCESSOR_DEF(TTieringMap, TableTierings); +protected: + virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override; + virtual TString DoSerializeToString() const override; +public: + std::vector<TTierConfig> GetTiersForPathId(const ui64 pathId) const; + const TTableTiering* GetTableTiering(const TString& tablePath) const; + void RemapTablePathToId(const TString& path, const ui64 pathId); + std::optional<TTierConfig> GetValue(const TGlobalTierId& key) const; + using TBase::TBase; +}; + +} diff --git a/ydb/core/tx/tiering/snapshot_enrich.cpp b/ydb/core/tx/tiering/snapshot_enrich.cpp new file mode 100644 index 0000000000..3f7a0809f0 --- /dev/null +++ b/ydb/core/tx/tiering/snapshot_enrich.cpp @@ -0,0 +1,65 @@ +#include "snapshot_enrich.h" +#include "snapshot.h" + +#include <ydb/core/base/path.h> + +#include <util/string/join.h> + +namespace NKikimr::NColumnShard::NTiers { + +void TActorSnapshotEnrich::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + auto current = std::dynamic_pointer_cast<TConfigsSnapshot>(Original); + Y_VERIFY(current); + auto g = PassAwayGuard(); + + auto* info = ev->Get(); + const auto& request = info->Request; + + if (request->ResultSet.empty()) { + Controller->EnrichProblem("cannot resolve table path to ids"); + return; + } + + for (auto&& i : request->ResultSet) { + const TString path = "/" + JoinSeq("/", i.Path); + switch (i.Status) { + case TNavigate::EStatus::Ok: + Cache->MutableRemapper().emplace(path, i.TableId.PathId.LocalPathId); + current->RemapTablePathToId(path, i.TableId.PathId.LocalPathId); + break; + default: + Controller->EnrichProblem(::ToString(i.Status) + " for '" + path + "'"); + return; + } + } + for (auto&& i : current->GetTableTierings()) { + Y_VERIFY(i.second.GetTablePathId()); + } + Controller->Enriched(Original); +} + +void TActorSnapshotEnrich::Bootstrap() { + Become(&TThis::StateMain); + auto current = std::dynamic_pointer_cast<TConfigsSnapshot>(Original); + Y_VERIFY(current); + const auto& remapper = Cache->GetRemapper(); + auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); + request->DatabaseName = NKikimr::CanonizePath(AppData()->TenantName); + for (auto&& i : current->GetTableTierings()) { + auto it = remapper.find(i.second.GetTablePath()); + if (it != remapper.end()) { + current->RemapTablePathToId(it->first, it->second); + } else { + auto& entry = request->ResultSet.emplace_back(); + entry.Operation = TNavigate::OpTable; + entry.Path = NKikimr::SplitPath(i.second.GetTablePath()); + } + } + if (request->ResultSet.empty()) { + Controller->Enriched(Original); + } else { + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release())); + } +} + +} diff --git a/ydb/core/tx/tiering/snapshot_enrich.h b/ydb/core/tx/tiering/snapshot_enrich.h new file mode 100644 index 0000000000..eca0eb893c --- /dev/null +++ b/ydb/core/tx/tiering/snapshot_enrich.h @@ -0,0 +1,55 @@ +#pragma once +#include "rule.h" +#include "tier_config.h" + +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/services/metadata/service.h> + +#include <library/cpp/json/writer/json_value.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TTablesDecoderCache { +private: + using TRemapper = TMap<TString, ui64>; + YDB_ACCESSOR_DEF(TRemapper, Remapper); +public: + using TPtr = std::shared_ptr<TTablesDecoderCache>; +}; + +class TActorSnapshotEnrich: public TActorBootstrapped<TActorSnapshotEnrich> { +private: + using TBase = TActorBootstrapped<TActorSnapshotEnrich>; + using TNavigate = NSchemeCache::TSchemeCacheNavigate; + NMetadataProvider::ISnapshot::TPtr Original; + NMetadataProvider::ISnapshotAcceptorController::TPtr Controller; + TTablesDecoderCache::TPtr Cache; +private: + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::TIERING_SNAPSHOT_ENRICH; + } + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + default: + break; + } + } + + TActorSnapshotEnrich(NMetadataProvider::ISnapshot::TPtr original, + NMetadataProvider::ISnapshotAcceptorController::TPtr controller, TTablesDecoderCache::TPtr cache) + : Original(original) + , Controller(controller) + , Cache(cache) + { + + } + + void Bootstrap(); +}; + +} diff --git a/ydb/core/tx/tiering/tier_config.cpp b/ydb/core/tx/tiering/tier_config.cpp index 19e5af26bb..1e4b7085d0 100644 --- a/ydb/core/tx/tiering/tier_config.cpp +++ b/ydb/core/tx/tiering/tier_config.cpp @@ -20,6 +20,7 @@ bool TTierConfig::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Valu if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) { return false; } + OwnerPath = TFsPath(OwnerPath).Fix().GetPath(); if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) { return false; } diff --git a/ydb/core/tx/tiering/tier_config.h b/ydb/core/tx/tiering/tier_config.h index 78175ff980..4a36df4ab0 100644 --- a/ydb/core/tx/tiering/tier_config.h +++ b/ydb/core/tx/tiering/tier_config.h @@ -1,5 +1,7 @@ #pragma once +#include "common.h" #include "decoder.h" + #include <ydb/services/metadata/service.h> #include <ydb/core/protos/flat_scheme_op.pb.h> @@ -41,8 +43,8 @@ public: }; bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r); - TString GetConfigId() const { - return OwnerPath + "." + TierName; + TGlobalTierId GetGlobalTierId() const { + return TGlobalTierId(OwnerPath, TierName); } bool NeedExport() const { diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp index d8183fa43a..87657e20eb 100644 --- a/ydb/core/tx/tiering/ut/ut_tiers.cpp +++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp @@ -73,6 +73,8 @@ public: Y_UNIT_TEST_SUITE(ColumnShardTiers) { const TString ConfigProtoStr = "Name : \"abc\""; + const TString ConfigProtoStr1 = "Name : \"abc1\""; + const TString ConfigProtoStr2 = "Name : \"abc2\""; class TJsonChecker { private: @@ -109,7 +111,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { TInstant Start; YDB_READONLY_FLAG(Found, false); YDB_ACCESSOR(ui32, TieringsCount, 1); - using TKeyCheckers = TMap<TString, TJsonChecker>; + using TKeyCheckers = TMap<NTiers::TGlobalTierId, TJsonChecker>; YDB_ACCESSOR_DEF(TKeyCheckers, Checkers); public: STATEFN(StateInit) { @@ -175,7 +177,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { NJson::TJsonValue jsonData; NProtobufJson::Proto2Json(value->GetProtoConfig(), jsonData); if (!i.second.Check(jsonData)) { - Cerr << "config value incorrect:" << snapshot->SerializeToString() << ";snapshot_check_path=" << i.first << Endl; + Cerr << "config value incorrect:" << snapshot->SerializeToString() << ";snapshot_check_path=" << i.first.ToString() << Endl; Cerr << "json path incorrect:" << jsonData << ";" << i.second.GetDebugString() << Endl; return; } @@ -189,8 +191,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { void Bootstrap() { ProviderId = NMetadataProvider::MakeServiceId(1); - ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(); - ExternalDataManipulation->Start(ExternalDataManipulation); + ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>("/Root/olapStore"); Become(&TThis::StateInit); Sender<NMetadataProvider::TEvSubscribeExternal>(ExternalDataManipulation).SendTo(ProviderId); Start = Now(); @@ -226,17 +227,17 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { lHelper.CreateTestOlapTable(); { TTestCSEmulator* emulator = new TTestCSEmulator; - emulator->MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc")); + emulator->MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier1"), TJsonChecker("Name", "abc")); runtime.Register(emulator); runtime.SimulateSleep(TDuration::Seconds(10)); Cerr << "Initialization finished" << Endl; - lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " + lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) " "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')"); - lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " + lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) " "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')"); const TInstant start = Now(); - while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) { + while (!emulator->IsFound() && Now() - start < TDuration::Seconds(2000)) { runtime.SimulateSleep(TDuration::Seconds(1)); } Y_VERIFY(emulator->IsFound()); @@ -275,31 +276,31 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { runtime.SimulateSleep(TDuration::Seconds(10)); Cerr << "Initialization finished" << Endl; - lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " - "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')"); - lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " + lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) " + "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr1 + "')"); + lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) " "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')"); { TTestCSEmulator emulator; - emulator.MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc")); + emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier1"), TJsonChecker("Name", "abc1")); emulator.SetTieringsCount(1); emulator.CheckRuntime(runtime); } - lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " - "VALUES ('/Root/olapStore', 'tier2', '" + ConfigProtoStr + "')"); - lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " + lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) " + "VALUES ('/Root/olapStore', 'tier2', '" + ConfigProtoStr2 + "')"); + lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) " "VALUES ('/Root/olapStore', 'tier2', '/Root/olapStore/olapTable', 'timestamp', '20d')"); { TTestCSEmulator emulator; - emulator.MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc")); - emulator.MutableCheckers().emplace("/Root/olapStore.tier2", TJsonChecker("Name", "abc")); + emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier1"), TJsonChecker("Name", "abc1")); + emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier2"), TJsonChecker("Name", "abc2")); emulator.SetTieringsCount(2); emulator.CheckRuntime(runtime); } - lHelper.StartDataRequest("DELETE FROM `/Root/.external_data/tiers`"); - lHelper.StartDataRequest("DELETE FROM `/Root/.external_data/tiering`"); + lHelper.StartDataRequest("DELETE FROM `/Root/.configs/tiering/tiers`"); + lHelper.StartDataRequest("DELETE FROM `/Root/.configs/tiering/rules`"); { TTestCSEmulator emulator; emulator.SetTieringsCount(0); @@ -376,18 +377,18 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { runtime.SimulateSleep(TDuration::Seconds(20)); Cerr << "Initialization finished" << Endl; - lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " + lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) " "VALUES ('/Root/olapStore', 'fakeTier1', '" + TierConfigProtoStr + "')"); - lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " + lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) " "VALUES ('/Root/olapStore', 'fakeTier2', '" + TierConfigProtoStr + "')"); - lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " + lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) " "VALUES ('/Root/olapStore', 'fakeTier1', '/Root/olapStore/olapTable', 'timestamp', '10d')"); - lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " + lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) " "VALUES ('/Root/olapStore', 'fakeTier2', '/Root/olapStore/olapTable', 'timestamp', '20d')"); { TTestCSEmulator emulator; - emulator.MutableCheckers().emplace("/Root/olapStore.fakeTier1", TJsonChecker("Name", "fakeTier")); - emulator.MutableCheckers().emplace("/Root/olapStore.fakeTier2", TJsonChecker("ObjectStorage.Endpoint", TierEndpoint)); + emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "fakeTier1"), TJsonChecker("Name", "fakeTier")); + emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "fakeTier2"), TJsonChecker("ObjectStorage.Endpoint", TierEndpoint)); emulator.SetTieringsCount(2); emulator.CheckRuntime(runtime); } diff --git a/ydb/services/bg_tasks/ds_table/executor.cpp b/ydb/services/bg_tasks/ds_table/executor.cpp index 252e8c7952..5cae73625f 100644 --- a/ydb/services/bg_tasks/ds_table/executor.cpp +++ b/ydb/services/bg_tasks/ds_table/executor.cpp @@ -8,9 +8,9 @@ namespace NKikimr::NBackgroundTasks { -TVector<NKikimr::NMetadataProvider::ITableModifier::TPtr> TExecutor::BuildModifiers() const { +TVector<NMetadataInitializer::ITableModifier::TPtr> TExecutor::BuildModifiers() const { const TString tableName = Config.GetTablePath(); - TVector<NMetadataProvider::ITableModifier::TPtr> result; + TVector<NMetadataInitializer::ITableModifier::TPtr> result; { Ydb::Table::CreateTableRequest request; request.set_session_id(""); @@ -66,7 +66,7 @@ TVector<NKikimr::NMetadataProvider::ITableModifier::TPtr> TExecutor::BuildModifi column.set_name("state"); column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); } - result.emplace_back(new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request)); + result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request)); } return result; } diff --git a/ydb/services/bg_tasks/ds_table/executor.h b/ydb/services/bg_tasks/ds_table/executor.h index 4b84cd589e..b15d25f6c9 100644 --- a/ydb/services/bg_tasks/ds_table/executor.h +++ b/ydb/services/bg_tasks/ds_table/executor.h @@ -51,9 +51,9 @@ public: } }; -class TExecutor: public NMetadataProvider::TDSAccessorInitialized { +class TExecutor: public NMetadataInitializer::TDSAccessorInitialized { private: - using TBase = NMetadataProvider::TDSAccessorInitialized; + using TBase = NMetadataInitializer::TDSAccessorInitialized; TString TableName; const TString ExecutorId = TGUID::CreateTimebased().AsUuidString(); const TConfig Config; @@ -62,7 +62,10 @@ private: protected: virtual void RegisterState() override; virtual void OnInitialized() override; - virtual TVector<NMetadataProvider::ITableModifier::TPtr> BuildModifiers() const override; + virtual void Prepare(NMetadataInitializer::IController::TPtr controller) override { + controller->PreparationFinished(BuildModifiers()); + } + TVector<NMetadataInitializer::ITableModifier::TPtr> BuildModifiers() const; void Handle(TEvStartAssign::TPtr& ev); void Handle(TEvAssignFinished::TPtr& ev); diff --git a/ydb/services/metadata/abstract/common.h b/ydb/services/metadata/abstract/common.h index 7da8f03317..259a384bb7 100644 --- a/ydb/services/metadata/abstract/common.h +++ b/ydb/services/metadata/abstract/common.h @@ -15,7 +15,8 @@ namespace NKikimr::NMetadataProvider { enum EEvSubscribe { EvRefreshSubscriberData = EventSpaceBegin(TKikimrEvents::ES_METADATA_PROVIDER), EvRefresh, - EvEnrichSnapshot, + EvEnrichSnapshotResult, + EvEnrichSnapshotProblem, EvSubscribeLocal, EvUnsubscribeLocal, EvSubscribeExternal, @@ -25,6 +26,16 @@ enum EEvSubscribe { static_assert(EEvSubscribe::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER)"); +class ISnapshot; + +class ISnapshotAcceptorController { +public: + using TPtr = std::shared_ptr<ISnapshotAcceptorController>; + virtual ~ISnapshotAcceptorController() = default; + virtual void Enriched(std::shared_ptr<ISnapshot> enrichedSnapshot) = 0; + virtual void EnrichProblem(const TString& errorMessage) = 0; +}; + class ISnapshot { private: YDB_READONLY_DEF(TInstant, Actuality); @@ -52,7 +63,7 @@ public: class ISnapshotParser { protected: virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const = 0; - virtual TVector<ITableModifier::TPtr> DoGetTableSchema() const = 0; + virtual void DoPrepare(NMetadataInitializer::IController::TPtr controller) const = 0; virtual const TVector<TString>& DoGetTables() const = 0; mutable std::optional<TString> SnapshotId; public: @@ -60,12 +71,13 @@ public: TString GetSnapshotId() const; ISnapshot::TPtr ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const; - TVector<ITableModifier::TPtr> GetTableSchema() const { - return DoGetTableSchema(); + + void Prepare(NMetadataInitializer::IController::TPtr controller) const { + return DoPrepare(controller); } - virtual NThreading::TFuture<ISnapshot::TPtr> EnrichSnapshotData(ISnapshot::TPtr original) const { - return NThreading::MakeFuture(original); + virtual void EnrichSnapshotData(ISnapshot::TPtr original, ISnapshotAcceptorController::TPtr controller) const { + controller->Enriched(original); } const TVector<TString>& GetTables() const { diff --git a/ydb/services/metadata/ds_table/accessor_refresh.cpp b/ydb/services/metadata/ds_table/accessor_refresh.cpp index 195c700d41..e89bd66d28 100644 --- a/ydb/services/metadata/ds_table/accessor_refresh.cpp +++ b/ydb/services/metadata/ds_table/accessor_refresh.cpp @@ -24,16 +24,7 @@ void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) { if (!!parsedSnapshot && CurrentSelection.SerializeAsString() != startString) { ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << CurrentSelection.DebugString(); - NActors::TActorIdentity actorId = SelfId(); - SnapshotConstructor->EnrichSnapshotData(parsedSnapshot).Subscribe( - [actorId](NThreading::TFuture<ISnapshot::TPtr> f) { - if (f.HasValue() && !f.HasException()) { - actorId.Send(actorId, new TEvEnrichSnapshotResult(f.GetValueSync())); - } else { - actorId.Send(actorId, new TEvEnrichSnapshotResult("cannot enrich snapshot")); - } - } - ); + SnapshotConstructor->EnrichSnapshotData(parsedSnapshot, GetController()); } else { Schedule(Config.GetRefreshPeriod(), new TEvRefresh()); } @@ -42,12 +33,14 @@ void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) { void TDSAccessorRefresher::Handle(TEvEnrichSnapshotResult::TPtr& ev) { RequestedActuality = TInstant::Zero(); Schedule(Config.GetRefreshPeriod(), new TEvRefresh()); - if (ev->Get()->IsSuccess()) { - CurrentSnapshot = ev->Get()->GetEnrichedSnapshot(); - OnSnapshotModified(); - } else { - ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "enrich problem: " << ev->Get()->GetErrorText(); - } + CurrentSnapshot = ev->Get()->GetEnrichedSnapshot(); + OnSnapshotModified(); +} + +void TDSAccessorRefresher::Handle(TEvEnrichSnapshotProblem::TPtr& ev) { + RequestedActuality = TInstant::Zero(); + Schedule(Config.GetRefreshPeriod(), new TEvRefresh()); + ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "enrich problem: " << ev->Get()->GetErrorText(); } void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateSession>::TPtr& ev) { @@ -84,10 +77,14 @@ TDSAccessorRefresher::TDSAccessorRefresher(const TConfig& config, ISnapshotParse , SnapshotConstructor(snapshotConstructor) , Config(config) { + } -TVector<NKikimr::NMetadataProvider::ITableModifier::TPtr> TDSAccessorRefresher::BuildModifiers() const { - return SnapshotConstructor->GetTableSchema(); +ISnapshotAcceptorController::TPtr TDSAccessorRefresher::GetController() const { + if (!ControllerImpl) { + ControllerImpl = std::make_shared<TSnapshotAcceptorController>(SelfId()); + } + return ControllerImpl; } } diff --git a/ydb/services/metadata/ds_table/accessor_refresh.h b/ydb/services/metadata/ds_table/accessor_refresh.h index 39da63b99a..920afdbcfb 100644 --- a/ydb/services/metadata/ds_table/accessor_refresh.h +++ b/ydb/services/metadata/ds_table/accessor_refresh.h @@ -13,49 +13,73 @@ class TEvRefresh: public NActors::TEventLocal<TEvRefresh, EEvSubscribe::EvRefres public: }; -class TEvEnrichSnapshotResult: public NActors::TEventLocal<TEvEnrichSnapshotResult, EEvSubscribe::EvEnrichSnapshot> { +class TEvEnrichSnapshotResult: public NActors::TEventLocal<TEvEnrichSnapshotResult, EEvSubscribe::EvEnrichSnapshotResult> { private: - YDB_READONLY_FLAG(Success, false); - YDB_READONLY_DEF(TString, ErrorText); YDB_READONLY_DEF(ISnapshot::TPtr, EnrichedSnapshot); public: - TEvEnrichSnapshotResult(const TString& errorText) + TEvEnrichSnapshotResult(ISnapshot::TPtr snapshot) + : EnrichedSnapshot(snapshot) { + + } +}; + +class TEvEnrichSnapshotProblem: public NActors::TEventLocal<TEvEnrichSnapshotProblem, EEvSubscribe::EvEnrichSnapshotProblem> { +private: + YDB_READONLY_DEF(TString, ErrorText); +public: + TEvEnrichSnapshotProblem(const TString& errorText) : ErrorText(errorText) { } +}; - TEvEnrichSnapshotResult(ISnapshot::TPtr snapshot) - : SuccessFlag(true) - , EnrichedSnapshot(snapshot) - { +class TSnapshotAcceptorController: public ISnapshotAcceptorController { +private: + const TActorIdentity ActorId; +public: + TSnapshotAcceptorController(const TActorIdentity& actorId) + : ActorId(actorId) { + + } + + virtual void EnrichProblem(const TString& errorMessage) override { + ActorId.Send(ActorId, new TEvEnrichSnapshotProblem(errorMessage)); + } + virtual void Enriched(ISnapshot::TPtr enrichedSnapshot) override { + ActorId.Send(ActorId, new TEvEnrichSnapshotResult(enrichedSnapshot)); } }; -class TDSAccessorRefresher: public TDSAccessorInitialized { +class TDSAccessorRefresher: public NMetadataInitializer::TDSAccessorInitialized { private: - using TBase = TDSAccessorInitialized; + using TBase = NMetadataInitializer::TDSAccessorInitialized; ISnapshotParser::TPtr SnapshotConstructor; YDB_READONLY_DEF(ISnapshot::TPtr, CurrentSnapshot); YDB_READONLY_DEF(Ydb::Table::ExecuteQueryResult, CurrentSelection); TInstant RequestedActuality = TInstant::Zero(); const TConfig Config; + + mutable ISnapshotAcceptorController::TPtr ControllerImpl; + + ISnapshotAcceptorController::TPtr GetController() const; protected: + virtual void Prepare(NMetadataInitializer::IController::TPtr controller) override { + SnapshotConstructor->Prepare(controller); + } bool IsReady() const { return !!CurrentSnapshot; } virtual void OnInitialized() override; virtual void OnSnapshotModified() = 0; - virtual TVector<NMetadataProvider::ITableModifier::TPtr> BuildModifiers() const override; public: - using TBase::Handle; - STFUNC(StateMain) { switch (ev->GetTypeRewrite()) { hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>, Handle); hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>, Handle); hFunc(TEvRefresh, Handle); hFunc(TEvEnrichSnapshotResult, Handle); + hFunc(TEvEnrichSnapshotProblem, Handle); default: TBase::StateMain(ev, ctx); } @@ -64,6 +88,7 @@ public: TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor); void Handle(TEvEnrichSnapshotResult::TPtr& ev); + void Handle(TEvEnrichSnapshotProblem::TPtr& ev); void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>::TPtr& ev); void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>::TPtr& ev); void Handle(TEvRefresh::TPtr& ev); diff --git a/ydb/services/metadata/ds_table/service.cpp b/ydb/services/metadata/ds_table/service.cpp index faa76a1587..d75d258450 100644 --- a/ydb/services/metadata/ds_table/service.cpp +++ b/ydb/services/metadata/ds_table/service.cpp @@ -20,11 +20,7 @@ void TService::Handle(TEvSubscribeExternal::TPtr& ev) { THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetSnapshotParser()); it = Accessors.emplace(ev->Get()->GetSnapshotParser()->GetSnapshotId(), Register(actor.Release())).first; } - if (!!ev->Get()->GetSubscriberId()) { - Send<TEvSubscribe>(it->second, ev->Get()->GetSubscriberId()); - } else { - Send<TEvSubscribe>(it->second, ev->Sender); - } + Send<TEvSubscribe>(it->second, ev->Sender); } void TService::Handle(TEvUnsubscribeExternal::TPtr& ev) { diff --git a/ydb/services/metadata/initializer/CMakeLists.txt b/ydb/services/metadata/initializer/CMakeLists.txt index 55e553e783..d3b624ff7c 100644 --- a/ydb/services/metadata/initializer/CMakeLists.txt +++ b/ydb/services/metadata/initializer/CMakeLists.txt @@ -21,4 +21,6 @@ target_link_libraries(services-metadata-initializer PUBLIC target_sources(services-metadata-initializer PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/accessor_init.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/common.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/events.cpp ) diff --git a/ydb/services/metadata/initializer/accessor_init.cpp b/ydb/services/metadata/initializer/accessor_init.cpp index 7536ff85f7..9f03393e60 100644 --- a/ydb/services/metadata/initializer/accessor_init.cpp +++ b/ydb/services/metadata/initializer/accessor_init.cpp @@ -1,25 +1,41 @@ #include "accessor_init.h" +#include "controller.h" + #include <ydb/core/grpc_services/local_rpc/local_rpc.h> -namespace NKikimr::NMetadataProvider { +namespace NKikimr::NMetadataInitializer { void TDSAccessorInitialized::Bootstrap() { RegisterState(); - auto modifiers = BuildModifiers(); + Controller = std::make_shared<TInitializerController>(SelfId()); + Send(SelfId(), new TEvInitializerPreparationStart); +} + +void TDSAccessorInitialized::Handle(TEvInitializerPreparationStart::TPtr& /*ev*/) { + Prepare(Controller); +} + +void TDSAccessorInitialized::Handle(TEvInitializerPreparationFinished::TPtr& ev) { + auto modifiers = ev->Get()->GetModifiers(); for (auto&& i : modifiers) { Modifiers.emplace_back(i); } if (Modifiers.size()) { - ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "modifiers count: " << Modifiers.size(); + ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "modifiers count: " << Modifiers.size(); Modifiers.front()->Execute(SelfId(), Config); } else { - ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "initialization finished"; + ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "initialization finished"; OnInitialized(); } } +void TDSAccessorInitialized::Handle(TEvInitializerPreparationProblem::TPtr& ev) { + ALS_ERROR(NKikimrServices::METADATA_INITIALIZER) << "preparation problems: " << ev->Get()->GetErrorMessage(); + Schedule(TDuration::Seconds(1), new TEvInitializerPreparationStart); +} + void TDSAccessorInitialized::Handle(NInternal::NRequest::TEvRequestFinished::TPtr& /*ev*/) { - ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "modifiers count: " << Modifiers.size(); + ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "modifiers count: " << Modifiers.size(); if (Modifiers.empty()) { return; } @@ -27,7 +43,7 @@ void TDSAccessorInitialized::Handle(NInternal::NRequest::TEvRequestFinished::TPt if (Modifiers.size()) { Modifiers.front()->Execute(SelfId(), Config); } else { - ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "initialization finished"; + ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "initialization finished"; OnInitialized(); } } diff --git a/ydb/services/metadata/initializer/accessor_init.h b/ydb/services/metadata/initializer/accessor_init.h index b7086c12fe..c558e05df3 100644 --- a/ydb/services/metadata/initializer/accessor_init.h +++ b/ydb/services/metadata/initializer/accessor_init.h @@ -1,5 +1,6 @@ #pragma once #include "common.h" +#include "events.h" #include <ydb/services/metadata/ds_table/config.h> @@ -8,24 +9,36 @@ #include <library/cpp/threading/future/core/future.h> #include <library/cpp/actors/core/av_bootstrapped.h> -namespace NKikimr::NMetadataProvider { +namespace NKikimr::NMetadataInitializer { class TDSAccessorInitialized: public NActors::TActorBootstrapped<TDSAccessorInitialized> { private: TDeque<ITableModifier::TPtr> Modifiers; const NInternal::NRequest::TConfig Config; + IController::TPtr Controller; + + void Handle(TEvInitializerPreparationStart::TPtr& ev); + void Handle(TEvInitializerPreparationFinished::TPtr& ev); + void Handle(TEvInitializerPreparationProblem::TPtr& ev); + void Handle(NInternal::NRequest::TEvRequestFinished::TPtr& ev); protected: virtual void RegisterState() = 0; virtual void OnInitialized() = 0; - virtual TVector<NMetadataProvider::ITableModifier::TPtr> BuildModifiers() const = 0; + virtual void Prepare(IController::TPtr controller) = 0; public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::METADATA_INITIALIZER; + } + void Bootstrap(); TDSAccessorInitialized(const NInternal::NRequest::TConfig& config); - void Handle(NInternal::NRequest::TEvRequestFinished::TPtr& ev); STATEFN(StateMain) { switch (ev->GetTypeRewrite()) { hFunc(NInternal::NRequest::TEvRequestFinished, Handle); + hFunc(TEvInitializerPreparationStart, Handle); + hFunc(TEvInitializerPreparationFinished, Handle); + hFunc(TEvInitializerPreparationProblem, Handle); default: break; } diff --git a/ydb/services/metadata/initializer/common.cpp b/ydb/services/metadata/initializer/common.cpp index fa5484fdbe..e9b8208654 100644 --- a/ydb/services/metadata/initializer/common.cpp +++ b/ydb/services/metadata/initializer/common.cpp @@ -1,5 +1,5 @@ #include "common.h" -namespace NKikimr::NMetadataProvider { +namespace NKikimr::NMetadataInitializer { } diff --git a/ydb/services/metadata/initializer/common.h b/ydb/services/metadata/initializer/common.h index f9fac85278..40e8e79ee8 100644 --- a/ydb/services/metadata/initializer/common.h +++ b/ydb/services/metadata/initializer/common.h @@ -2,7 +2,7 @@ #include <ydb/services/metadata/request/config.h> #include <ydb/services/metadata/request/request_actor.h> -namespace NKikimr::NMetadataProvider { +namespace NKikimr::NMetadataInitializer { class ITableModifier { protected: @@ -31,4 +31,12 @@ public: } }; +class IController { +public: + using TPtr = std::shared_ptr<IController>; + virtual void PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const = 0; + virtual void PreparationProblem(const TString& errorMessage) const = 0; + virtual ~IController() = default; +}; + } diff --git a/ydb/services/metadata/initializer/controller.cpp b/ydb/services/metadata/initializer/controller.cpp new file mode 100644 index 0000000000..1481e54733 --- /dev/null +++ b/ydb/services/metadata/initializer/controller.cpp @@ -0,0 +1,14 @@ +#include "controller.h" +#include "events.h" + +namespace NKikimr::NMetadataInitializer { + +void TInitializerController::PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const { + ActorId.Send(ActorId, new TEvInitializerPreparationFinished(modifiers)); +} + +void TInitializerController::PreparationProblem(const TString& errorMessage) const { + ActorId.Send(ActorId, new TEvInitializerPreparationProblem(errorMessage)); +} + +} diff --git a/ydb/services/metadata/initializer/controller.h b/ydb/services/metadata/initializer/controller.h new file mode 100644 index 0000000000..2d2c2617e8 --- /dev/null +++ b/ydb/services/metadata/initializer/controller.h @@ -0,0 +1,19 @@ +#pragma once +#include "common.h" + +namespace NKikimr::NMetadataInitializer { + +class TInitializerController: public IController { +private: + const TActorIdentity ActorId; +public: + TInitializerController(const TActorIdentity& actorId) + : ActorId(actorId) { + + } + + virtual void PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const override; + virtual void PreparationProblem(const TString& errorMessage) const override; +}; + +} diff --git a/ydb/services/metadata/initializer/events.cpp b/ydb/services/metadata/initializer/events.cpp new file mode 100644 index 0000000000..f4c4d34515 --- /dev/null +++ b/ydb/services/metadata/initializer/events.cpp @@ -0,0 +1,6 @@ +#include "events.h" +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> + +namespace NKikimr::NMetadataInitializer { + +} diff --git a/ydb/services/metadata/initializer/events.h b/ydb/services/metadata/initializer/events.h new file mode 100644 index 0000000000..5fe9b13683 --- /dev/null +++ b/ydb/services/metadata/initializer/events.h @@ -0,0 +1,46 @@ +#pragma once +#include "common.h" + +#include <ydb/services/metadata/ds_table/config.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/event_local.h> +#include <library/cpp/threading/future/core/future.h> +#include <library/cpp/actors/core/av_bootstrapped.h> + +namespace NKikimr::NMetadataInitializer { + +enum EInitializerEvents { + EvInitializerPreparationStart = EventSpaceBegin(TKikimrEvents::ES_METADATA_INITIALIZER), + EvInitializerPreparationFinished, + EvInitializerPreparationProblem, + EvEnd +}; + +static_assert(EInitializerEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_INITIALIZER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_INITIALIZER)"); + +class TEvInitializerPreparationStart: public TEventLocal<TEvInitializerPreparationStart, EInitializerEvents::EvInitializerPreparationStart> { +public: +}; + +class TEvInitializerPreparationFinished: public TEventLocal<TEvInitializerPreparationFinished, EInitializerEvents::EvInitializerPreparationFinished> { +private: + YDB_READONLY_DEF(TVector<ITableModifier::TPtr>, Modifiers); +public: + TEvInitializerPreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) + : Modifiers(modifiers) { + + } +}; + +class TEvInitializerPreparationProblem: public TEventLocal<TEvInitializerPreparationProblem, EInitializerEvents::EvInitializerPreparationProblem> { +private: + YDB_READONLY_DEF(TString, ErrorMessage); +public: + TEvInitializerPreparationProblem(const TString& errorMessage) + : ErrorMessage(errorMessage) { + + } +}; + +} diff --git a/ydb/services/metadata/service.h b/ydb/services/metadata/service.h index 858986f779..d576186974 100644 --- a/ydb/services/metadata/service.h +++ b/ydb/services/metadata/service.h @@ -7,11 +7,9 @@ namespace NKikimr::NMetadataProvider { class TEvSubscribeExternal: public NActors::TEventLocal<TEvSubscribeExternal, EEvSubscribe::EvSubscribeExternal> { private: YDB_READONLY_DEF(ISnapshotParser::TPtr, SnapshotParser); - YDB_READONLY_DEF(NActors::TActorId, SubscriberId); public: - TEvSubscribeExternal(ISnapshotParser::TPtr parser, const NActors::TActorId& subscriberId = {}) + TEvSubscribeExternal(ISnapshotParser::TPtr parser) : SnapshotParser(parser) - , SubscriberId(subscriberId) { Y_VERIFY(!!SnapshotParser); } |