aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-11-09 15:31:05 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-11-09 15:31:05 +0300
commit6735fd34dc275e7c3a2183764ce285ba247df821 (patch)
treeb3f792b9b651578eec6ddc221f911d507cf51340
parentce105bf8bc2d9c7a79cbeed15f61c1fe5b98a3db (diff)
downloadydb-6735fd34dc275e7c3a2183764ce285ba247df821.tar.gz
separate initializer and use async mode for prepare table modifications
-rw-r--r--ydb/core/base/events.h3
-rw-r--r--ydb/core/protos/flat_scheme_op.proto1
-rw-r--r--ydb/core/protos/services.proto4
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp14
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h6
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.cpp6
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h22
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_schema.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp6
-rw-r--r--ydb/core/tx/tiering/CMakeLists.txt3
-rw-r--r--ydb/core/tx/tiering/cleaner_task.cpp4
-rw-r--r--ydb/core/tx/tiering/cleaner_task.h6
-rw-r--r--ydb/core/tx/tiering/common.h22
-rw-r--r--ydb/core/tx/tiering/external_data.cpp268
-rw-r--r--ydb/core/tx/tiering/external_data.h87
-rw-r--r--ydb/core/tx/tiering/manager.cpp32
-rw-r--r--ydb/core/tx/tiering/manager.h7
-rw-r--r--ydb/core/tx/tiering/owner_enrich.cpp111
-rw-r--r--ydb/core/tx/tiering/owner_enrich.h48
-rw-r--r--ydb/core/tx/tiering/path_cleaner.cpp7
-rw-r--r--ydb/core/tx/tiering/path_cleaner.h3
-rw-r--r--ydb/core/tx/tiering/rule.cpp2
-rw-r--r--ydb/core/tx/tiering/rule.h13
-rw-r--r--ydb/core/tx/tiering/snapshot.cpp103
-rw-r--r--ydb/core/tx/tiering/snapshot.h31
-rw-r--r--ydb/core/tx/tiering/snapshot_enrich.cpp65
-rw-r--r--ydb/core/tx/tiering/snapshot_enrich.h55
-rw-r--r--ydb/core/tx/tiering/tier_config.cpp1
-rw-r--r--ydb/core/tx/tiering/tier_config.h6
-rw-r--r--ydb/core/tx/tiering/ut/ut_tiers.cpp51
-rw-r--r--ydb/services/bg_tasks/ds_table/executor.cpp6
-rw-r--r--ydb/services/bg_tasks/ds_table/executor.h9
-rw-r--r--ydb/services/metadata/abstract/common.h24
-rw-r--r--ydb/services/metadata/ds_table/accessor_refresh.cpp33
-rw-r--r--ydb/services/metadata/ds_table/accessor_refresh.h51
-rw-r--r--ydb/services/metadata/ds_table/service.cpp6
-rw-r--r--ydb/services/metadata/initializer/CMakeLists.txt2
-rw-r--r--ydb/services/metadata/initializer/accessor_init.cpp28
-rw-r--r--ydb/services/metadata/initializer/accessor_init.h19
-rw-r--r--ydb/services/metadata/initializer/common.cpp2
-rw-r--r--ydb/services/metadata/initializer/common.h10
-rw-r--r--ydb/services/metadata/initializer/controller.cpp14
-rw-r--r--ydb/services/metadata/initializer/controller.h19
-rw-r--r--ydb/services/metadata/initializer/events.cpp6
-rw-r--r--ydb/services/metadata/initializer/events.h46
-rw-r--r--ydb/services/metadata/service.h4
47 files changed, 802 insertions, 482 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index 1b3782b5d4..4f4ec8e3f5 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -151,7 +151,8 @@ struct TKikimrEvents : TEvents {
ES_METADATA_PROVIDER,
ES_INTERNAL_REQUEST,
ES_BACKGROUND_TASKS,
- ES_TIERING
+ ES_TIERING,
+ ES_METADATA_INITIALIZER
};
};
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index bdfeab0a08..f1f1f5b218 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -892,6 +892,7 @@ message TS3Settings {
message TTaskCleaner {
optional uint64 PathId = 1;
optional TS3Settings StorageSettings = 2;
+ optional string OwnerPath = 3;
}
message TBackupTask {
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index f8aa3decad..484681dd89 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -337,6 +337,7 @@ enum EServiceKikimr {
// Background tasks
BG_TASKS = 1700;
+ METADATA_INITIALIZER = 1800;
};
message TActivity {
@@ -928,5 +929,8 @@ message TActivity {
TEST_SHARD_VALIDATION_ACTOR = 583;
TX_TIERING_TIER_CLEANER = 584;
TX_TIERING_PATH_CLEANER = 585;
+ METADATA_INITIALIZER = 586;
+ TIERING_OWNER_ENRICH = 587;
+ TIERING_SNAPSHOT_ENRICH = 588;
};
};
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index e6c88ef4cd..22ec60a381 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -137,7 +137,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
}
// Mark exported blobs
- auto& tManager = Self->GetTierManagerVerified(tierName);
+ auto& tManager = Self->GetTierManagerVerified(NTiers::TGlobalTierId(Self->OwnerPath, tierName));
if (tManager.NeedExport()) {
for (auto& rec : portionInfo.Records) {
auto& blobId = rec.BlobRange.BlobId;
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 0cbfb6079e..f702f8b383 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -998,14 +998,14 @@ void TColumnShard::MapExternBlobs(const TActorContext& /*ctx*/, NOlap::TReadMeta
void TColumnShard::ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName,
TEvPrivate::TEvExport::TBlobDataMap&& blobsInfo) const {
- if (auto s3 = GetS3ActorForTier(tierName)) {
+ if (auto s3 = GetS3ActorForTier(NTiers::TGlobalTierId(OwnerPath, tierName))) {
auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, tierName, s3, std::move(blobsInfo));
ctx.Register(CreateExportActor(TabletID(), ctx.SelfID, event.release()));
}
}
void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs) const {
- if (auto s3 = GetS3ActorForTier(tierName)) {
+ if (auto s3 = GetS3ActorForTier(NTiers::TGlobalTierId(OwnerPath, tierName))) {
auto forget = std::make_unique<TEvPrivate::TEvForget>();
forget->Evicted = std::move(blobs);
ctx.Send(s3, forget.release());
@@ -1014,7 +1014,7 @@ void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName
bool TColumnShard::GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName,
NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges) {
- if (auto s3 = GetS3ActorForTier(tierName)) {
+ if (auto s3 = GetS3ActorForTier(NTiers::TGlobalTierId(OwnerPath, tierName))) {
auto get = std::make_unique<TEvPrivate::TEvGetExported>();
get->DstActor = dst;
get->DstCookie = cookie;
@@ -1036,17 +1036,15 @@ void TColumnShard::Die(const TActorContext& ctx) {
return IActor::Die(ctx);
}
-TActorId TColumnShard::GetS3ActorForTier(const TString& tierName) const {
+TActorId TColumnShard::GetS3ActorForTier(const NTiers::TGlobalTierId& tierId) const {
if (!Tiers) {
return {};
}
- return Tiers->GetStorageActorId(tierName);
+ return Tiers->GetStorageActorId(tierId);
}
void TColumnShard::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) {
- if (!Tiers) {
- Tiers = std::make_shared<TTiersManager>(TabletID(), SelfId(), OwnerPath);
- }
+ Y_VERIFY(Tiers);
Tiers->TakeConfigs(ev->Get()->GetSnapshot());
if (EnableTiering) {
Tiers->Start(Tiers);
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 1595cbda2d..6929177f5a 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -139,9 +139,9 @@ class TColumnShard
Y_UNUSED(ctx);
}
- const NTiers::TManager& GetTierManagerVerified(const TString& tierName) const {
+ const NTiers::TManager& GetTierManagerVerified(const NTiers::TGlobalTierId& tierId) const {
Y_VERIFY(!!Tiers);
- return Tiers->GetManagerVerified(tierName);
+ return Tiers->GetManagerVerified(tierId);
}
void Die(const TActorContext& ctx) override;
@@ -447,7 +447,7 @@ private:
NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema);
void MapExternBlobs(const TActorContext& ctx, NOlap::TReadMetadata& metadata);
- TActorId GetS3ActorForTier(const TString& tierName) const;
+ TActorId GetS3ActorForTier(const NTiers::TGlobalTierId& tierId) const;
void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName,
TEvPrivate::TEvExport::TBlobDataMap&& blobsInfo) const;
void ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs) const;
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index af407457cb..18f08925e9 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -296,7 +296,7 @@ NMetadataProvider::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpecia
for (auto&& tier : specials.Tiers) {
{
NColumnShard::NTiers::TTierConfig tConfig;
- tConfig.SetOwnerPath("/Root");
+ tConfig.SetOwnerPath("/Root/olapStore");
tConfig.SetTierName(tier.Name);
tConfig.MutableProtoConfig().SetName(tier.Name);
auto& cProto = tConfig.MutableProtoConfig();
@@ -309,11 +309,11 @@ NMetadataProvider::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpecia
if (tier.CompressionLevel) {
cProto.MutableCompression()->SetCompressionLevel(*tier.CompressionLevel);
}
- cs->MutableTierConfigs().emplace(tConfig.GetConfigId(), tConfig);
+ cs->MutableTierConfigs().emplace(tConfig.GetGlobalTierId(), tConfig);
}
{
NColumnShard::NTiers::TTieringRule tRule;
- tRule.SetOwnerPath("/Root");
+ tRule.SetOwnerPath("/Root/olapStore");
tRule.SetDurationForEvict(TDuration::Seconds(tier.GetEvictAfterSecondsUnsafe()));
tRule.SetTablePath(tablePath).SetTablePathId(tablePathId);
tRule.SetTierName(tier.Name);
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 3c4829c6c2..f9e6b18c8f 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -207,8 +207,8 @@ struct TTestSchema {
}
static TString CreateTableTxBody(ui64 pathId, const TVector<std::pair<TString, TTypeInfo>>& columns,
- const TVector<std::pair<TString, TTypeInfo>>& pk,
- const TTableSpecials& specials = {}) {
+ const TVector<std::pair<TString, TTypeInfo>>& pk,
+ const TTableSpecials& specials = {}) {
NKikimrTxColumnShard::TSchemaTxBody tx;
auto* table = tx.MutableEnsureTables()->AddTables();
table->SetPathId(pathId);
@@ -231,6 +231,24 @@ struct TTestSchema {
return out;
}
+ static TString CreateInitShardTxBody(ui64 pathId, const TVector<std::pair<TString, TTypeInfo>>& columns,
+ const TVector<std::pair<TString, TTypeInfo>>& pk,
+ const TTableSpecials& specials = {}, const TString& ownerPath = "/Root/olap") {
+ NKikimrTxColumnShard::TSchemaTxBody tx;
+ auto* table = tx.MutableInitShard()->AddTables();
+ tx.MutableInitShard()->SetOwnerPath(ownerPath);
+ table->SetPathId(pathId);
+
+ InitSchema(columns, pk, specials, table->MutableSchema());
+ if (specials.HasTtl()) {
+ InitTtl(specials, table->MutableTtlSettings());
+ }
+
+ TString out;
+ Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&out);
+ return out;
+ }
+
static TString CreateStandaloneTableTxBody(ui64 pathId, const TVector<std::pair<TString, TTypeInfo>>& columns,
const TVector<std::pair<TString, TTypeInfo>>& pk,
const TTableSpecials& specials = {}) {
diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
index 7dd77b12ed..e9d2db69bb 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
@@ -161,11 +161,13 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
spec.SetEvictAfterSeconds(ttlSec);
}
bool ok = ProposeSchemaTx(runtime, sender,
- TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk, spec),
+ TTestSchema::CreateInitShardTxBody(tableId, testYdbSchema, testYdbPk, spec, "/Root/olapStore"),
{++planStep, ++txId});
UNIT_ASSERT(ok);
PlanSchemaTx(runtime, sender, {planStep, txId});
- ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec, "test", tableId));
+ if (spec.HasTiers()) {
+ ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec, "test", tableId));
+ }
//
ui32 portionSize = 80 * 1000;
@@ -228,7 +230,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
{++planStep, ++txId});
UNIT_ASSERT(ok);
PlanSchemaTx(runtime, sender, {planStep, txId});
- ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec, "test", tableId));
+ if (spec.HasTiers()) {
+ ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec, "test", tableId));
+ }
if (internal) {
TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.GetTtlColumn());
@@ -259,7 +263,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
TTestSchema::AlterTableTxBody(tableId, 3, TTestSchema::TTableSpecials()),
{++planStep, ++txId});
UNIT_ASSERT(ok);
- ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(TTestSchema::TTableSpecials(), "test", tableId));
+ if (spec.HasTiers()) {
+ ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(TTestSchema::TTableSpecials(), "test", tableId));
+ }
PlanSchemaTx(runtime, sender, {planStep, txId});
UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, blobs[0]));
@@ -427,7 +433,7 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
UNIT_ASSERT(specs.size() > 0);
{
const bool ok = ProposeSchemaTx(runtime, sender,
- TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk, specs[0]),
+ TTestSchema::CreateInitShardTxBody(tableId, testYdbSchema, testYdbPk, specs[0], "/Root/olapStore"),
{ ++planStep, ++txId });
UNIT_ASSERT(ok);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
index 46cde5886d..7c825b98e1 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
@@ -314,7 +314,11 @@ public:
<< ", at schemeshard: " << ssId);
if (NBackgroundTasks::TServiceOperator::IsEnabled()) {
- NBackgroundTasks::TTask task(std::make_shared<NColumnShard::NTiers::TTaskCleanerActivity>(txState->TargetPathId.LocalPathId), nullptr);
+ NSchemeShard::TPath path = NSchemeShard::TPath::Init(txState->TargetPathId, context.SS);
+ TColumnTableInfo::TPtr tableInfo = context.SS->ColumnTables.at(path.Base()->PathId);
+ NSchemeShard::TPath ownerPath = tableInfo->IsStandalone() ? path : path.FindOlapStore();
+ Y_VERIFY(path.IsResolved());
+ NBackgroundTasks::TTask task(std::make_shared<NColumnShard::NTiers::TTaskCleanerActivity>(txState->TargetPathId.LocalPathId, ownerPath.PathString()), nullptr);
task.SetId(OperationId.SerializeToString());
context.SS->SelfId().Send(NBackgroundTasks::MakeServiceId(context.SS->SelfId().NodeId()), new NBackgroundTasks::TEvAddTask(std::move(task)));
return false;
diff --git a/ydb/core/tx/tiering/CMakeLists.txt b/ydb/core/tx/tiering/CMakeLists.txt
index f6662658a3..59d6f17042 100644
--- a/ydb/core/tx/tiering/CMakeLists.txt
+++ b/ydb/core/tx/tiering/CMakeLists.txt
@@ -31,6 +31,9 @@ target_sources(core-tx-tiering PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/external_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier_config.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/owner_enrich.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/snapshot_enrich.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/snapshot.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/s3_actor.cpp
)
diff --git a/ydb/core/tx/tiering/cleaner_task.cpp b/ydb/core/tx/tiering/cleaner_task.cpp
index a1ab4b2042..4973c31bdc 100644
--- a/ydb/core/tx/tiering/cleaner_task.cpp
+++ b/ydb/core/tx/tiering/cleaner_task.cpp
@@ -8,11 +8,13 @@ TTaskCleanerActivity::TFactory::TRegistrator<TTaskCleanerActivity> TTaskCleanerA
NKikimrSchemeOp::TTaskCleaner TTaskCleanerActivity::DoSerializeToProto() const {
NKikimrSchemeOp::TTaskCleaner result;
result.SetPathId(PathId);
+ result.SetOwnerPath(OwnerPath);
return result;
}
bool TTaskCleanerActivity::DoDeserializeFromProto(const NKikimrSchemeOp::TTaskCleaner& protoData) {
PathId = protoData.GetPathId();
+ OwnerPath = protoData.GetOwnerPath();
return true;
}
@@ -20,7 +22,7 @@ void TTaskCleanerActivity::DoExecute(NBackgroundTasks::ITaskExecutorController::
const NBackgroundTasks::TTaskStateContainer& /*state*/)
{
#ifndef KIKIMR_DISABLE_S3_OPS
- TActivationContext::AsActorContext().Register(new TPathCleaner(PathId, controller));
+ TActivationContext::AsActorContext().Register(new TPathCleaner(PathId, OwnerPath, controller));
#else
controller->TaskFinished();
#endif
diff --git a/ydb/core/tx/tiering/cleaner_task.h b/ydb/core/tx/tiering/cleaner_task.h
index d036fc8aeb..12e11bb822 100644
--- a/ydb/core/tx/tiering/cleaner_task.h
+++ b/ydb/core/tx/tiering/cleaner_task.h
@@ -8,6 +8,7 @@ namespace NKikimr::NColumnShard::NTiers {
class TTaskCleanerActivity: public NBackgroundTasks::IProtoStringSerializable<NKikimrSchemeOp::TTaskCleaner, NBackgroundTasks::ITaskActivity> {
private:
+ YDB_READONLY_DEF(TString, OwnerPath);
YDB_READONLY(ui64, PathId, 0);
static TFactory::TRegistrator<TTaskCleanerActivity> Registrator;
protected:
@@ -19,8 +20,9 @@ protected:
public:
TTaskCleanerActivity() = default;
- TTaskCleanerActivity(const ui64 pathId)
- : PathId(pathId)
+ TTaskCleanerActivity(const ui64 pathId, const TString& ownerPath)
+ : OwnerPath(ownerPath)
+ , PathId(pathId)
{
}
diff --git a/ydb/core/tx/tiering/common.h b/ydb/core/tx/tiering/common.h
index a891f6acf8..536e920a30 100644
--- a/ydb/core/tx/tiering/common.h
+++ b/ydb/core/tx/tiering/common.h
@@ -1,10 +1,32 @@
#pragma once
#include <ydb/core/base/events.h>
+#include <ydb/library/accessor/accessor.h>
+
#include <library/cpp/actors/core/events.h>
namespace NKikimr::NColumnShard::NTiers {
+class TGlobalTierId {
+private:
+ YDB_ACCESSOR_DEF(TString, OwnerPath);
+ YDB_ACCESSOR_DEF(TString, TierName);
+public:
+ TGlobalTierId(const TString& ownerPath, const TString& tierName)
+ : OwnerPath(ownerPath)
+ , TierName(tierName) {
+
+ }
+
+ bool operator<(const TGlobalTierId& item) const {
+ return std::tie(OwnerPath, TierName) < std::tie(item.OwnerPath, item.TierName);
+ }
+
+ TString ToString() const {
+ return OwnerPath + "." + TierName;
+ }
+};
+
enum EEvents {
EvTierCleared = EventSpaceBegin(TKikimrEvents::ES_TIERING),
EvEnd
diff --git a/ydb/core/tx/tiering/external_data.cpp b/ydb/core/tx/tiering/external_data.cpp
index 8c508dde91..1d9ef8376d 100644
--- a/ydb/core/tx/tiering/external_data.cpp
+++ b/ydb/core/tx/tiering/external_data.cpp
@@ -1,4 +1,6 @@
#include "external_data.h"
+#include "owner_enrich.h"
+#include "snapshot_enrich.h"
#include <ydb/core/base/path.h>
@@ -9,260 +11,28 @@
namespace NKikimr::NColumnShard::NTiers {
-bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) {
- Y_VERIFY(rawDataResult.result_sets().size() == 2);
- {
- auto& rawData = rawDataResult.result_sets()[0];
- TTierConfig::TDecoder decoder(rawData);
- for (auto&& r : rawData.rows()) {
- TTierConfig config;
- if (!config.DeserializeFromRecord(decoder, r)) {
- ALS_ERROR(NKikimrServices::TX_TIERING) << "cannot parse tier config from snapshot";
- continue;
- }
- TierConfigs.emplace(config.GetConfigId(), config);
- }
+void TSnapshotConstructor::EnrichSnapshotData(ISnapshot::TPtr original, NMetadataProvider::ISnapshotAcceptorController::TPtr controller) const
+{
+ if (!TablesDecoder) {
+ TablesDecoder = std::make_shared<TTablesDecoderCache>();
}
- {
- auto& rawData = rawDataResult.result_sets()[1];
- TTieringRule::TDecoder decoder(rawData);
- TVector<TTieringRule> rulesLocal;
- rulesLocal.reserve(rawData.rows().size());
- for (auto&& r : rawData.rows()) {
- TTieringRule tr;
- if (!tr.DeserializeFromRecord(decoder, r)) {
- ALS_WARN(NKikimrServices::TX_TIERING) << "cannot parse record for tiering info";
- continue;
- }
- rulesLocal.emplace_back(std::move(tr));
- }
- std::sort(rulesLocal.begin(), rulesLocal.end());
- for (auto&& i : rulesLocal) {
- const TString tablePath = i.GetTablePath();
- TableTierings[tablePath].AddRule(std::move(i));
- }
- }
- return true;
-}
-
-std::optional<TTierConfig> TConfigsSnapshot::GetValue(const TString& key) const {
- auto it = TierConfigs.find(key);
- if (it == TierConfigs.end()) {
- return {};
- } else {
- return it->second;
- }
-}
-
-void TConfigsSnapshot::RemapTablePathToId(const TString& path, const ui64 pathId) {
- auto it = TableTierings.find(path);
- Y_VERIFY(it != TableTierings.end());
- it->second.SetTablePathId(pathId);
+ TActivationContext::AsActorContext().Register(new TActorSnapshotEnrich(original, controller, TablesDecoder));
}
-const TTableTiering* TConfigsSnapshot::GetTableTiering(const TString& tablePath) const {
- auto it = TableTierings.find(tablePath);
- if (it == TableTierings.end()) {
- return nullptr;
- } else {
- return &it->second;
- }
-}
-
-std::vector<NKikimr::NColumnShard::NTiers::TTierConfig> TConfigsSnapshot::GetTiersForPathId(const ui64 pathId) const {
- std::vector<TTierConfig> result;
- std::set<TString> readyIds;
- for (auto&& i : TableTierings) {
- for (auto&& r : i.second.GetRules()) {
- if (r.GetTablePathId() == pathId) {
- auto it = TierConfigs.find(r.GetTierId());
- if (it == TierConfigs.end()) {
- ALS_ERROR(NKikimrServices::TX_TIERING) << "inconsistency tiering for " << r.GetTierId();
- continue;
- } else if (readyIds.emplace(r.GetTierId()).second) {
- result.emplace_back(it->second);
- }
- }
- }
- }
- return result;
+TSnapshotConstructor::TSnapshotConstructor(const TString& ownerPath)
+ : OwnerPath(ownerPath)
+{
+ Y_VERIFY(!!OwnerPath);
+ const TString ownerPathNormal = TFsPath(OwnerPath).Fix().GetPath();
+ const TString tenantPathNormal = TFsPath(AppData()->TenantName).Fix().GetPath();
+ Y_VERIFY(ownerPathNormal.StartsWith(tenantPathNormal));
+ TablePath = "/" + AppData()->TenantName + "/.configs/tiering/";
+ Tables.emplace_back(TablePath + "tiers");
+ Tables.emplace_back(TablePath + "rules");
}
-TVector<NMetadataProvider::ITableModifier::TPtr> TSnapshotConstructor::DoGetTableSchema() const {
- TVector<NMetadataProvider::ITableModifier::TPtr> result;
- {
- Ydb::Table::CreateTableRequest request;
- request.set_session_id("");
- request.set_path(Tables[0]);
- request.add_primary_key("ownerPath");
- request.add_primary_key("tierName");
- {
- auto& column = *request.add_columns();
- column.set_name("tierName");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("ownerPath");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("tierConfig");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- result.emplace_back(new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request));
- }
- {
- Ydb::Table::CreateTableRequest request;
- request.set_session_id("");
- request.set_path(Tables[1]);
- request.add_primary_key("ownerPath");
- request.add_primary_key("tierName");
- request.add_primary_key("tablePath");
- {
- auto& column = *request.add_columns();
- column.set_name("tierName");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("ownerPath");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("tablePath");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("durationForEvict");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("column");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- result.emplace_back(new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request));
- }
- for (auto&& t: Tables) {
- Ydb::Scheme::ModifyPermissionsRequest request;
- request.set_path(t);
- request.set_clear_permissions(true);
- auto* permission = request.add_actions();
- permission->mutable_grant()->add_permission_names("ydb.tables.modify");
- permission->mutable_grant()->add_permission_names("ydb.tables.read");
- result.emplace_back(new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogModifyPermissions>(request));
- }
- return result;
-}
-
-NThreading::TFuture<NMetadataProvider::ISnapshot::TPtr> TSnapshotConstructor::EnrichSnapshotData(ISnapshot::TPtr original) const {
- if (!Actor) {
- return NThreading::MakeErrorFuture<NMetadataProvider::ISnapshot::TPtr>(
- std::make_exception_ptr(std::runtime_error("actor finished for enrich process")));
- }
- auto current = std::dynamic_pointer_cast<TConfigsSnapshot>(original);
- Y_VERIFY(current);
- auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
- request->DatabaseName = NKikimr::CanonizePath(AppData()->TenantName);
- for (auto&& i : current->GetTableTierings()) {
- auto it = TablesRemapper.find(i.second.GetTablePath());
- if (it != TablesRemapper.end()) {
- current->RemapTablePathToId(it->first, it->second);
- } else {
- auto& entry = request->ResultSet.emplace_back();
- entry.Operation = TNavigate::OpTable;
- entry.Path = NKikimr::SplitPath(i.second.GetTablePath());
- }
- }
- if (request->ResultSet.empty()) {
- return NThreading::MakeFuture(original);
- } else {
- WaitPromise = NThreading::NewPromise<ISnapshot::TPtr>();
- WaitSnapshot = current;
- Actor->ProvideEvent(new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()), MakeSchemeCacheID());
- return WaitPromise.GetFuture();
- }
-}
-
-void TSnapshotConstructorAgent::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
- Owner->ResolveInfo(ev->Get());
-}
-
-void TSnapshotConstructorAgent::ProvideEvent(IEventBase* event, const TActorId& recipient) {
- Send(recipient, event);
-}
-
-TSnapshotConstructorAgent::~TSnapshotConstructorAgent() {
- Owner->ActorStopped();
-}
-
-void TSnapshotConstructor::ResolveInfo(const TEvTxProxySchemeCache::TEvNavigateKeySetResult* info) {
- const auto& request = info->Request;
-
- if (request->ResultSet.empty()) {
- WaitPromise.SetException("cannot resolve table path to ids");
- WaitSnapshot = nullptr;
- return;
- }
-
- for (auto&& i : request->ResultSet) {
- const TString path = "/" + JoinSeq("/", i.Path);
- switch (i.Status) {
- case TNavigate::EStatus::Ok:
- TablesRemapper.emplace(path, i.TableId.PathId.LocalPathId);
- break;
- case TNavigate::EStatus::RootUnknown:
- case TNavigate::EStatus::PathErrorUnknown:
- WaitPromise.SetException("path not exists " + path);
- break;
- case TNavigate::EStatus::LookupError:
- case TNavigate::EStatus::RedirectLookupError:
- WaitPromise.SetException("RESOLVE_LOOKUP_ERROR" + path);
- break;
- default:
- WaitPromise.SetException("GENERIC_RESOLVE_ERROR" + path);
- break;
- }
- }
- if (WaitPromise.HasException()) {
- WaitSnapshot = nullptr;
- } else {
- for (auto&& i : WaitSnapshot->GetTableTierings()) {
- auto it = TablesRemapper.find(i.second.GetTablePath());
- if (it != TablesRemapper.end()) {
- WaitSnapshot->RemapTablePathToId(it->first, it->second);
- } else {
- WaitPromise.SetException("undecoded path " + i.second.GetTablePath());
- WaitSnapshot = nullptr;
- return;
- }
- }
- WaitPromise.SetValue(WaitSnapshot);
- WaitSnapshot = nullptr;
- }
-}
-
-TSnapshotConstructor::TSnapshotConstructor() {
- TablePath = "/" + AppData()->TenantName + "/.external_data";
- Tables.emplace_back(TablePath + "/tiers");
- Tables.emplace_back(TablePath + "/tiering");
-}
-
-TString NTiers::TConfigsSnapshot::DoSerializeToString() const {
- NJson::TJsonValue result = NJson::JSON_MAP;
- auto& jsonTiers = result.InsertValue("tiers", NJson::JSON_MAP);
- for (auto&& i : TierConfigs) {
- jsonTiers.InsertValue(i.first, i.second.GetDebugJson());
- }
- auto& jsonTiering = result.InsertValue("tiering", NJson::JSON_MAP);
- for (auto&& i : TableTierings) {
- jsonTiering.InsertValue(i.first, i.second.GetDebugJson());
- }
- return result.GetStringRobust();
+void TSnapshotConstructor::DoPrepare(NMetadataInitializer::IController::TPtr controller) const {
+ TActivationContext::AsActorContext().Register(new TActorOwnerEnrich(OwnerPath, controller, Tables));
}
}
diff --git a/ydb/core/tx/tiering/external_data.h b/ydb/core/tx/tiering/external_data.h
index 3a01c87c48..a04e3caf5d 100644
--- a/ydb/core/tx/tiering/external_data.h
+++ b/ydb/core/tx/tiering/external_data.h
@@ -1,6 +1,6 @@
#pragma once
-#include "rule.h"
-#include "tier_config.h"
+#include "snapshot.h"
+#include "snapshot_enrich.h"
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
@@ -10,97 +10,24 @@
namespace NKikimr::NColumnShard::NTiers {
-class TConfigsSnapshot: public NMetadataProvider::ISnapshot {
-private:
- using TBase = NMetadataProvider::ISnapshot;
- using TConfigsMap = TMap<TString, TTierConfig>;
- YDB_ACCESSOR_DEF(TConfigsMap, TierConfigs);
- using TTieringMap = TMap<TString, TTableTiering>;
- YDB_ACCESSOR_DEF(TTieringMap, TableTierings);
-protected:
- virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override;
- virtual TString DoSerializeToString() const override;
-public:
- std::vector<TTierConfig> GetTiersForPathId(const ui64 pathId) const;
- const TTableTiering* GetTableTiering(const TString& tablePath) const;
- void RemapTablePathToId(const TString& path, const ui64 pathId);
- std::optional<TTierConfig> GetValue(const TString& key) const;
- using TBase::TBase;
-};
-
-class TSnapshotConstructor;
-
-class TSnapshotConstructorAgent: public TActor<TSnapshotConstructorAgent> {
-private:
- using TBase = TActor<TSnapshotConstructorAgent>;
- std::shared_ptr<TSnapshotConstructor> Owner;
-private:
- void Handle(NActors::TEvents::TEvPoison::TPtr& /*ev*/) {
- PassAway();
- }
- void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
-public:
- STATEFN(StateMain) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
- hFunc(NActors::TEvents::TEvPoison, Handle);
- default:
- break;
- }
- }
-
- TSnapshotConstructorAgent(std::shared_ptr<TSnapshotConstructor> owner)
- : TBase(&TThis::StateMain)
- , Owner(owner)
- {
-
- }
-
- ~TSnapshotConstructorAgent();
-
- void ProvideEvent(IEventBase* event, const TActorId& recipient);
-};
-
class TSnapshotConstructor: public NMetadataProvider::TGenericSnapshotParser<TConfigsSnapshot> {
private:
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
using TBaseActor = TActor<TSnapshotConstructor>;
using ISnapshot = NMetadataProvider::ISnapshot;
TVector<TString> Tables;
- TMap<TString, ui64> TablesRemapper;
- TSnapshotConstructorAgent* Actor = nullptr;
TString TablePath;
- mutable std::shared_ptr<TConfigsSnapshot> WaitSnapshot;
- mutable NThreading::TPromise<NMetadataProvider::ISnapshot::TPtr> WaitPromise;
+ const TString OwnerPath;
+ mutable std::shared_ptr<TTablesDecoderCache> TablesDecoder;
protected:
- virtual TVector<NMetadataProvider::ITableModifier::TPtr> DoGetTableSchema() const override;
virtual const TVector<TString>& DoGetTables() const override {
return Tables;
}
+ virtual void DoPrepare(NMetadataInitializer::IController::TPtr controller) const override;
public:
- void ResolveInfo(const TEvTxProxySchemeCache::TEvNavigateKeySetResult* info);
- virtual NThreading::TFuture<ISnapshot::TPtr> EnrichSnapshotData(ISnapshot::TPtr original) const override;
-
- TSnapshotConstructor();
+ virtual void EnrichSnapshotData(ISnapshot::TPtr original, NMetadataProvider::ISnapshotAcceptorController::TPtr controller) const override;
- void Start(std::shared_ptr<TSnapshotConstructor> ownerPtr) {
- Y_VERIFY(!Actor);
- Actor = new TSnapshotConstructorAgent(ownerPtr);
- TActivationContext::AsActorContext().RegisterWithSameMailbox(Actor);
- }
-
- void ActorStopped() {
- Actor = nullptr;
- }
-
- void Stop() {
- if (Actor && TlsActivationContext) {
- TActivationContext::AsActorContext().Send(Actor->SelfId(), new NActors::TEvents::TEvPoison);
- }
- }
-
- ~TSnapshotConstructor() {
- }
+ TSnapshotConstructor(const TString& ownerPath);
};
}
diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp
index a9cd24a2c7..e015d9c3ba 100644
--- a/ydb/core/tx/tiering/manager.cpp
+++ b/ydb/core/tx/tiering/manager.cpp
@@ -30,7 +30,7 @@ public:
}
void Bootstrap() {
Become(&TThis::StateMain);
- Send(GetExternalDataActorId(), new NMetadataProvider::TEvSubscribeExternal(Owner->GetExternalDataManipulation(), SelfId()));
+ Send(GetExternalDataActorId(), new NMetadataProvider::TEvSubscribeExternal(Owner->GetExternalDataManipulation()));
}
void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) {
auto snapshot = ev->Get()->GetSnapshot();
@@ -133,7 +133,7 @@ void TTiersManager::TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshotExt)
Snapshot = snapshotExt;
auto& snapshot = *snapshotPtr;
for (auto itSelf = Managers.begin(); itSelf != Managers.end(); ) {
- auto it = snapshot.GetTierConfigs().find(OwnerPath + "." + itSelf->first);
+ auto it = snapshot.GetTierConfigs().find(itSelf->first);
if (it == snapshot.GetTierConfigs().end()) {
itSelf->second.Stop();
itSelf = Managers.erase(itSelf);
@@ -143,29 +143,26 @@ void TTiersManager::TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshotExt)
}
}
for (auto&& i : snapshot.GetTierConfigs()) {
- if (i.second.GetOwnerPath() != OwnerPath && !!OwnerPath) {
- continue;
- }
- if (Managers.contains(i.second.GetTierName())) {
+ if (Managers.contains(i.second.GetGlobalTierId())) {
continue;
}
NTiers::TManager localManager(TabletId, TabletActorId, i.second);
- auto& manager = Managers.emplace(i.second.GetTierName(), std::move(localManager)).first->second;
+ auto& manager = Managers.emplace(i.second.GetGlobalTierId(), std::move(localManager)).first->second;
if (IsActive()) {
manager.Start();
}
}
}
-TActorId TTiersManager::GetStorageActorId(const TString& tierName) {
- auto it = Managers.find(tierName);
+TActorId TTiersManager::GetStorageActorId(const NTiers::TGlobalTierId& tierId) {
+ auto it = Managers.find(tierId);
if (it == Managers.end()) {
- ALS_ERROR(NKikimrServices::TX_TIERING) << "No S3 actor for tier '" << tierName << "' at tablet " << TabletId;
+ ALS_ERROR(NKikimrServices::TX_TIERING) << "No S3 actor for tier '" << tierId.ToString() << "' at tablet " << TabletId;
return {};
}
auto actorId = it->second.GetStorageActorId();
if (!actorId) {
- ALS_ERROR(NKikimrServices::TX_TIERING) << "Not started storage actor for tier '" << tierName << "' at tablet " << TabletId;
+ ALS_ERROR(NKikimrServices::TX_TIERING) << "Not started storage actor for tier '" << tierId.ToString() << "' at tablet " << TabletId;
return {};
}
return actorId;
@@ -201,7 +198,7 @@ TTiersManager& TTiersManager::Stop() {
return *this;
}
-const NKikimr::NColumnShard::NTiers::TManager& TTiersManager::GetManagerVerified(const TString& tierId) const {
+const NTiers::TManager& TTiersManager::GetManagerVerified(const NTiers::TGlobalTierId& tierId) const {
auto it = Managers.find(tierId);
Y_VERIFY(it != Managers.end());
return it->second;
@@ -209,9 +206,7 @@ const NKikimr::NColumnShard::NTiers::TManager& TTiersManager::GetManagerVerified
NMetadataProvider::ISnapshotParser::TPtr TTiersManager::GetExternalDataManipulation() const {
if (!ExternalDataManipulation) {
- ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>();
- auto edmPtr = std::dynamic_pointer_cast<NTiers::TSnapshotConstructor>(ExternalDataManipulation);
- edmPtr->Start(edmPtr);
+ ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(OwnerPath);
}
return ExternalDataManipulation;
}
@@ -229,13 +224,6 @@ THashMap<ui64, NKikimr::NOlap::TTiersInfo> TTiersManager::GetTiering() const {
return result;
}
-TTiersManager::~TTiersManager() {
- auto cs = std::dynamic_pointer_cast<NTiers::TSnapshotConstructor>(ExternalDataManipulation);
- if (!!cs) {
- cs->Stop();
- }
-}
-
TActorId TTiersManager::GetActorId() const {
if (Actor) {
return Actor->SelfId();
diff --git a/ydb/core/tx/tiering/manager.h b/ydb/core/tx/tiering/manager.h
index 008682a891..8595ccc7c8 100644
--- a/ydb/core/tx/tiering/manager.h
+++ b/ydb/core/tx/tiering/manager.h
@@ -33,7 +33,7 @@ public:
class TTiersManager {
private:
class TActor;
- using TManagers = TMap<TString, NTiers::TManager>;
+ using TManagers = TMap<NTiers::TGlobalTierId, NTiers::TManager>;
ui64 TabletId = 0;
const TActorId TabletActorId;
TString OwnerPath;
@@ -52,13 +52,12 @@ public:
{
}
TActorId GetActorId() const;
- ~TTiersManager();
THashMap<ui64, NOlap::TTiersInfo> GetTiering() const;
void TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshot);
TTiersManager& Start(std::shared_ptr<TTiersManager> ownerPtr);
TTiersManager& Stop();
- TActorId GetStorageActorId(const TString& tierName);
- const NTiers::TManager& GetManagerVerified(const TString& tierId) const;
+ TActorId GetStorageActorId(const NTiers::TGlobalTierId& tierId);
+ const NTiers::TManager& GetManagerVerified(const NTiers::TGlobalTierId& tierId) const;
NMetadataProvider::ISnapshotParser::TPtr GetExternalDataManipulation() const;
TManagers::const_iterator begin() const {
diff --git a/ydb/core/tx/tiering/owner_enrich.cpp b/ydb/core/tx/tiering/owner_enrich.cpp
new file mode 100644
index 0000000000..f3d2fa611c
--- /dev/null
+++ b/ydb/core/tx/tiering/owner_enrich.cpp
@@ -0,0 +1,111 @@
+#include "owner_enrich.h"
+#include "snapshot.h"
+
+#include <ydb/core/base/path.h>
+
+#include <util/string/join.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+TVector<NMetadataInitializer::ITableModifier::TPtr> TActorOwnerEnrich::BuildModifiers(const NSchemeCache::TSchemeCacheNavigate::TEntry& ownerEntry) const {
+ TVector<NMetadataInitializer::ITableModifier::TPtr> result;
+ {
+ Ydb::Table::CreateTableRequest request;
+ request.set_session_id("");
+ request.set_path(TableNames[0]);
+ request.add_primary_key("ownerPath");
+ request.add_primary_key("tierName");
+ {
+ auto& column = *request.add_columns();
+ column.set_name("ownerPath");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("tierName");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("tierConfig");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request));
+ }
+ {
+ Ydb::Table::CreateTableRequest request;
+ request.set_session_id("");
+ request.set_path(TableNames[1]);
+ request.add_primary_key("ownerPath");
+ request.add_primary_key("tablePath");
+ request.add_primary_key("tierName");
+ {
+ auto& column = *request.add_columns();
+ column.set_name("ownerPath");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("tierName");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("tablePath");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("durationForEvict");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("column");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request));
+ }
+ Y_VERIFY(!!ownerEntry.SecurityObject);
+ return result;
+}
+
+void TActorOwnerEnrich::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ auto* info = ev->Get();
+ const auto& request = info->Request;
+ auto g = PassAwayGuard();
+ if (request->ResultSet.empty()) {
+ Controller->PreparationProblem("cannot resolve owner path to ids");
+ return;
+ }
+
+ if (request->ResultSet.size() != 1) {
+ Controller->PreparationProblem("unexpected result set size: " + ::ToString(request->ResultSet.size()));
+ return;
+ }
+
+ for (auto&& i : request->ResultSet) {
+ const TString path = "/" + JoinSeq("/", i.Path);
+ switch (i.Status) {
+ case TNavigate::EStatus::Ok:
+ Controller->PreparationFinished(BuildModifiers(i));
+ return;
+ default:
+ Controller->PreparationProblem(::ToString(i.Status) + " for '" + path + "'");
+ return;
+ }
+ }
+ Controller->PreparationProblem("unexpected");
+}
+
+void TActorOwnerEnrich::Bootstrap() {
+ Become(&TThis::StateMain);
+ auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
+ request->DatabaseName = NKikimr::CanonizePath(AppData()->TenantName);
+ auto& entry = request->ResultSet.emplace_back();
+ entry.Operation = TNavigate::OpPath;
+ entry.Path = NKikimr::SplitPath(OwnerPath);
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
+}
+
+}
diff --git a/ydb/core/tx/tiering/owner_enrich.h b/ydb/core/tx/tiering/owner_enrich.h
new file mode 100644
index 0000000000..969c5fea28
--- /dev/null
+++ b/ydb/core/tx/tiering/owner_enrich.h
@@ -0,0 +1,48 @@
+#pragma once
+#include "rule.h"
+#include "tier_config.h"
+
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/services/metadata/service.h>
+
+#include <library/cpp/json/writer/json_value.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TActorOwnerEnrich: public TActorBootstrapped<TActorOwnerEnrich> {
+private:
+ using TBase = TActorBootstrapped<TActorOwnerEnrich>;
+ using TNavigate = NSchemeCache::TSchemeCacheNavigate;
+ const TString OwnerPath;
+ NMetadataInitializer::IController::TPtr Controller;
+ const TVector<TString> TableNames;
+private:
+ TVector<NMetadataInitializer::ITableModifier::TPtr> BuildModifiers(const NSchemeCache::TSchemeCacheNavigate::TEntry& ownerEntry) const;
+
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::TIERING_OWNER_ENRICH;
+ }
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ default:
+ break;
+ }
+ }
+
+ TActorOwnerEnrich(const TString& ownerPath, NMetadataInitializer::IController::TPtr controller, const TVector<TString>& tableNames)
+ : OwnerPath(ownerPath)
+ , Controller(controller)
+ , TableNames(tableNames)
+ {
+
+ }
+
+ void Bootstrap();
+};
+
+}
diff --git a/ydb/core/tx/tiering/path_cleaner.cpp b/ydb/core/tx/tiering/path_cleaner.cpp
index df2a360be8..70e55ad362 100644
--- a/ydb/core/tx/tiering/path_cleaner.cpp
+++ b/ydb/core/tx/tiering/path_cleaner.cpp
@@ -20,9 +20,7 @@ void TPathCleaner::Handle(TEvTierCleared::TPtr& ev) {
NMetadataProvider::ISnapshotParser::TPtr TPathCleaner::GetTieringSnapshotParser() const {
if (!ExternalDataManipulation) {
- ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>();
- auto edmPtr = std::dynamic_pointer_cast<NTiers::TSnapshotConstructor>(ExternalDataManipulation);
- edmPtr->Start(edmPtr);
+ ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(OwnerPath);
}
return ExternalDataManipulation;
}
@@ -48,8 +46,9 @@ void TPathCleaner::Bootstrap() {
Send(NMetadataProvider::MakeServiceId(SelfId().NodeId()), new NMetadataProvider::TEvSubscribeExternal(GetTieringSnapshotParser()));
}
-TPathCleaner::TPathCleaner(const ui64 pathId, NBackgroundTasks::ITaskExecutorController::TPtr controller)
+TPathCleaner::TPathCleaner(const ui64 pathId, const TString& ownerPath, NBackgroundTasks::ITaskExecutorController::TPtr controller)
: PathId(pathId)
+ , OwnerPath(ownerPath)
, Controller(controller) {
}
diff --git a/ydb/core/tx/tiering/path_cleaner.h b/ydb/core/tx/tiering/path_cleaner.h
index d938fcb76b..a83cfb479d 100644
--- a/ydb/core/tx/tiering/path_cleaner.h
+++ b/ydb/core/tx/tiering/path_cleaner.h
@@ -16,6 +16,7 @@ namespace NKikimr::NColumnShard::NTiers {
class TPathCleaner: public TActorBootstrapped<TPathCleaner> {
private:
YDB_READONLY(ui64, PathId, 0);
+ YDB_READONLY_DEF(TString, OwnerPath);
bool Truncated = false;
std::set<TString> TiersWait;
NBackgroundTasks::ITaskExecutorController::TPtr Controller;
@@ -25,7 +26,7 @@ protected:
void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev);
void Handle(TEvTierCleared::TPtr& ev);
public:
- TPathCleaner(const ui64 pathId, NBackgroundTasks::ITaskExecutorController::TPtr controller);
+ TPathCleaner(const ui64 pathId, const TString& ownerPath, NBackgroundTasks::ITaskExecutorController::TPtr controller);
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::TX_TIERING_PATH_CLEANER;
diff --git a/ydb/core/tx/tiering/rule.cpp b/ydb/core/tx/tiering/rule.cpp
index 617c665a34..6bcb4fcbfa 100644
--- a/ydb/core/tx/tiering/rule.cpp
+++ b/ydb/core/tx/tiering/rule.cpp
@@ -3,7 +3,6 @@
namespace NKikimr::NColumnShard::NTiers {
NJson::TJsonValue TTieringRule::GetDebugJson() const {
NJson::TJsonValue result = NJson::JSON_MAP;
- result.InsertValue(TDecoder::OwnerPath, OwnerPath);
result.InsertValue(TDecoder::TierName, TierName);
result.InsertValue(TDecoder::TablePath, TablePath);
result.InsertValue("tablePathId", TablePathId);
@@ -15,6 +14,7 @@ NJson::TJsonValue TTieringRule::GetDebugJson() const {
NJson::TJsonValue TTableTiering::GetDebugJson() const {
NJson::TJsonValue result = NJson::JSON_MAP;
result.InsertValue(TTieringRule::TDecoder::TablePath, TablePath);
+ result.InsertValue("tablePathId", TablePathId);
result.InsertValue(TTieringRule::TDecoder::Column, Column);
auto&& jsonRules = result.InsertValue("rules", NJson::JSON_ARRAY);
for (auto&& i : Rules) {
diff --git a/ydb/core/tx/tiering/rule.h b/ydb/core/tx/tiering/rule.h
index 1187efc8ff..468ce5ec51 100644
--- a/ydb/core/tx/tiering/rule.h
+++ b/ydb/core/tx/tiering/rule.h
@@ -1,4 +1,5 @@
#pragma once
+#include "common.h"
#include "decoder.h"
#include <ydb/core/protos/flat_scheme_op.pb.h>
@@ -19,14 +20,15 @@ private:
YDB_ACCESSOR_DEF(TDuration, DurationForEvict);
public:
bool operator<(const TTieringRule& item) const {
- return std::tie(OwnerPath, TierName, TablePath, Column, DurationForEvict)
- < std::tie(item.OwnerPath, item.TierName, item.TablePath, item.Column, item.DurationForEvict);
+ return std::tie(TablePath, TierName, Column, DurationForEvict)
+ < std::tie(item.TablePath, item.TierName, item.Column, item.DurationForEvict);
}
- NJson::TJsonValue GetDebugJson() const;
- TString GetTierId() const {
- return OwnerPath + "." + TierName;
+ TGlobalTierId GetGlobalTierId() const {
+ return TGlobalTierId(OwnerPath, TierName);
}
+
+ NJson::TJsonValue GetDebugJson() const;
class TDecoder: public NInternal::TDecoderBase {
private:
YDB_READONLY(i32, OwnerPathIdx, -1);
@@ -53,6 +55,7 @@ public:
if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) {
return false;
}
+ OwnerPath = TFsPath(OwnerPath).Fix().GetPath();
if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) {
return false;
}
diff --git a/ydb/core/tx/tiering/snapshot.cpp b/ydb/core/tx/tiering/snapshot.cpp
new file mode 100644
index 0000000000..25a1abf346
--- /dev/null
+++ b/ydb/core/tx/tiering/snapshot.cpp
@@ -0,0 +1,103 @@
+#include "snapshot.h"
+
+#include <ydb/core/base/path.h>
+
+#include <library/cpp/json/writer/json_value.h>
+#include <library/cpp/protobuf/json/proto2json.h>
+
+#include <util/string/join.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) {
+ Y_VERIFY(rawDataResult.result_sets().size() == 2);
+ {
+ auto& rawData = rawDataResult.result_sets()[0];
+ TTierConfig::TDecoder decoder(rawData);
+ for (auto&& r : rawData.rows()) {
+ TTierConfig config;
+ if (!config.DeserializeFromRecord(decoder, r)) {
+ ALS_ERROR(NKikimrServices::TX_TIERING) << "cannot parse tier config from snapshot";
+ continue;
+ }
+ TierConfigs.emplace(config.GetGlobalTierId(), config);
+ }
+ }
+ {
+ auto& rawData = rawDataResult.result_sets()[1];
+ TTieringRule::TDecoder decoder(rawData);
+ TVector<TTieringRule> rulesLocal;
+ rulesLocal.reserve(rawData.rows().size());
+ for (auto&& r : rawData.rows()) {
+ TTieringRule tr;
+ if (!tr.DeserializeFromRecord(decoder, r)) {
+ ALS_WARN(NKikimrServices::TX_TIERING) << "cannot parse record for tiering info";
+ continue;
+ }
+ rulesLocal.emplace_back(std::move(tr));
+ }
+ std::sort(rulesLocal.begin(), rulesLocal.end());
+ for (auto&& i : rulesLocal) {
+ TableTierings[i.GetTablePath()].AddRule(std::move(i));
+ }
+ }
+ return true;
+}
+
+std::optional<TTierConfig> TConfigsSnapshot::GetValue(const TGlobalTierId& key) const {
+ auto it = TierConfigs.find(key);
+ if (it == TierConfigs.end()) {
+ return {};
+ } else {
+ return it->second;
+ }
+}
+
+void TConfigsSnapshot::RemapTablePathToId(const TString& path, const ui64 pathId) {
+ auto it = TableTierings.find(path);
+ Y_VERIFY(it != TableTierings.end());
+ it->second.SetTablePathId(pathId);
+}
+
+const TTableTiering* TConfigsSnapshot::GetTableTiering(const TString& tablePath) const {
+ auto it = TableTierings.find(tablePath);
+ if (it == TableTierings.end()) {
+ return nullptr;
+ } else {
+ return &it->second;
+ }
+}
+
+std::vector<NKikimr::NColumnShard::NTiers::TTierConfig> TConfigsSnapshot::GetTiersForPathId(const ui64 pathId) const {
+ std::vector<TTierConfig> result;
+ std::set<TGlobalTierId> readyIds;
+ for (auto&& i : TableTierings) {
+ for (auto&& r : i.second.GetRules()) {
+ if (r.GetTablePathId() == pathId) {
+ auto it = TierConfigs.find(r.GetGlobalTierId());
+ if (it == TierConfigs.end()) {
+ ALS_ERROR(NKikimrServices::TX_TIERING) << "inconsistency tiering for " << r.GetGlobalTierId().ToString();
+ continue;
+ } else if (readyIds.emplace(r.GetGlobalTierId()).second) {
+ result.emplace_back(it->second);
+ }
+ }
+ }
+ }
+ return result;
+}
+
+TString NTiers::TConfigsSnapshot::DoSerializeToString() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ auto& jsonTiers = result.InsertValue("tiers", NJson::JSON_MAP);
+ for (auto&& i : TierConfigs) {
+ jsonTiers.InsertValue(i.first.ToString(), i.second.GetDebugJson());
+ }
+ auto& jsonTiering = result.InsertValue("rules", NJson::JSON_MAP);
+ for (auto&& i : TableTierings) {
+ jsonTiering.InsertValue(i.first, i.second.GetDebugJson());
+ }
+ return result.GetStringRobust();
+}
+
+}
diff --git a/ydb/core/tx/tiering/snapshot.h b/ydb/core/tx/tiering/snapshot.h
new file mode 100644
index 0000000000..b913dacbb6
--- /dev/null
+++ b/ydb/core/tx/tiering/snapshot.h
@@ -0,0 +1,31 @@
+#pragma once
+#include "rule.h"
+#include "tier_config.h"
+
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/services/metadata/service.h>
+
+#include <library/cpp/json/writer/json_value.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TConfigsSnapshot: public NMetadataProvider::ISnapshot {
+private:
+ using TBase = NMetadataProvider::ISnapshot;
+ using TConfigsMap = TMap<TGlobalTierId, TTierConfig>;
+ YDB_ACCESSOR_DEF(TConfigsMap, TierConfigs);
+ using TTieringMap = TMap<TString, TTableTiering>;
+ YDB_ACCESSOR_DEF(TTieringMap, TableTierings);
+protected:
+ virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override;
+ virtual TString DoSerializeToString() const override;
+public:
+ std::vector<TTierConfig> GetTiersForPathId(const ui64 pathId) const;
+ const TTableTiering* GetTableTiering(const TString& tablePath) const;
+ void RemapTablePathToId(const TString& path, const ui64 pathId);
+ std::optional<TTierConfig> GetValue(const TGlobalTierId& key) const;
+ using TBase::TBase;
+};
+
+}
diff --git a/ydb/core/tx/tiering/snapshot_enrich.cpp b/ydb/core/tx/tiering/snapshot_enrich.cpp
new file mode 100644
index 0000000000..3f7a0809f0
--- /dev/null
+++ b/ydb/core/tx/tiering/snapshot_enrich.cpp
@@ -0,0 +1,65 @@
+#include "snapshot_enrich.h"
+#include "snapshot.h"
+
+#include <ydb/core/base/path.h>
+
+#include <util/string/join.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+void TActorSnapshotEnrich::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ auto current = std::dynamic_pointer_cast<TConfigsSnapshot>(Original);
+ Y_VERIFY(current);
+ auto g = PassAwayGuard();
+
+ auto* info = ev->Get();
+ const auto& request = info->Request;
+
+ if (request->ResultSet.empty()) {
+ Controller->EnrichProblem("cannot resolve table path to ids");
+ return;
+ }
+
+ for (auto&& i : request->ResultSet) {
+ const TString path = "/" + JoinSeq("/", i.Path);
+ switch (i.Status) {
+ case TNavigate::EStatus::Ok:
+ Cache->MutableRemapper().emplace(path, i.TableId.PathId.LocalPathId);
+ current->RemapTablePathToId(path, i.TableId.PathId.LocalPathId);
+ break;
+ default:
+ Controller->EnrichProblem(::ToString(i.Status) + " for '" + path + "'");
+ return;
+ }
+ }
+ for (auto&& i : current->GetTableTierings()) {
+ Y_VERIFY(i.second.GetTablePathId());
+ }
+ Controller->Enriched(Original);
+}
+
+void TActorSnapshotEnrich::Bootstrap() {
+ Become(&TThis::StateMain);
+ auto current = std::dynamic_pointer_cast<TConfigsSnapshot>(Original);
+ Y_VERIFY(current);
+ const auto& remapper = Cache->GetRemapper();
+ auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
+ request->DatabaseName = NKikimr::CanonizePath(AppData()->TenantName);
+ for (auto&& i : current->GetTableTierings()) {
+ auto it = remapper.find(i.second.GetTablePath());
+ if (it != remapper.end()) {
+ current->RemapTablePathToId(it->first, it->second);
+ } else {
+ auto& entry = request->ResultSet.emplace_back();
+ entry.Operation = TNavigate::OpTable;
+ entry.Path = NKikimr::SplitPath(i.second.GetTablePath());
+ }
+ }
+ if (request->ResultSet.empty()) {
+ Controller->Enriched(Original);
+ } else {
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
+ }
+}
+
+}
diff --git a/ydb/core/tx/tiering/snapshot_enrich.h b/ydb/core/tx/tiering/snapshot_enrich.h
new file mode 100644
index 0000000000..eca0eb893c
--- /dev/null
+++ b/ydb/core/tx/tiering/snapshot_enrich.h
@@ -0,0 +1,55 @@
+#pragma once
+#include "rule.h"
+#include "tier_config.h"
+
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/services/metadata/service.h>
+
+#include <library/cpp/json/writer/json_value.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TTablesDecoderCache {
+private:
+ using TRemapper = TMap<TString, ui64>;
+ YDB_ACCESSOR_DEF(TRemapper, Remapper);
+public:
+ using TPtr = std::shared_ptr<TTablesDecoderCache>;
+};
+
+class TActorSnapshotEnrich: public TActorBootstrapped<TActorSnapshotEnrich> {
+private:
+ using TBase = TActorBootstrapped<TActorSnapshotEnrich>;
+ using TNavigate = NSchemeCache::TSchemeCacheNavigate;
+ NMetadataProvider::ISnapshot::TPtr Original;
+ NMetadataProvider::ISnapshotAcceptorController::TPtr Controller;
+ TTablesDecoderCache::TPtr Cache;
+private:
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::TIERING_SNAPSHOT_ENRICH;
+ }
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ default:
+ break;
+ }
+ }
+
+ TActorSnapshotEnrich(NMetadataProvider::ISnapshot::TPtr original,
+ NMetadataProvider::ISnapshotAcceptorController::TPtr controller, TTablesDecoderCache::TPtr cache)
+ : Original(original)
+ , Controller(controller)
+ , Cache(cache)
+ {
+
+ }
+
+ void Bootstrap();
+};
+
+}
diff --git a/ydb/core/tx/tiering/tier_config.cpp b/ydb/core/tx/tiering/tier_config.cpp
index 19e5af26bb..1e4b7085d0 100644
--- a/ydb/core/tx/tiering/tier_config.cpp
+++ b/ydb/core/tx/tiering/tier_config.cpp
@@ -20,6 +20,7 @@ bool TTierConfig::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Valu
if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) {
return false;
}
+ OwnerPath = TFsPath(OwnerPath).Fix().GetPath();
if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) {
return false;
}
diff --git a/ydb/core/tx/tiering/tier_config.h b/ydb/core/tx/tiering/tier_config.h
index 78175ff980..4a36df4ab0 100644
--- a/ydb/core/tx/tiering/tier_config.h
+++ b/ydb/core/tx/tiering/tier_config.h
@@ -1,5 +1,7 @@
#pragma once
+#include "common.h"
#include "decoder.h"
+
#include <ydb/services/metadata/service.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
@@ -41,8 +43,8 @@ public:
};
bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r);
- TString GetConfigId() const {
- return OwnerPath + "." + TierName;
+ TGlobalTierId GetGlobalTierId() const {
+ return TGlobalTierId(OwnerPath, TierName);
}
bool NeedExport() const {
diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp
index d8183fa43a..87657e20eb 100644
--- a/ydb/core/tx/tiering/ut/ut_tiers.cpp
+++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp
@@ -73,6 +73,8 @@ public:
Y_UNIT_TEST_SUITE(ColumnShardTiers) {
const TString ConfigProtoStr = "Name : \"abc\"";
+ const TString ConfigProtoStr1 = "Name : \"abc1\"";
+ const TString ConfigProtoStr2 = "Name : \"abc2\"";
class TJsonChecker {
private:
@@ -109,7 +111,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
TInstant Start;
YDB_READONLY_FLAG(Found, false);
YDB_ACCESSOR(ui32, TieringsCount, 1);
- using TKeyCheckers = TMap<TString, TJsonChecker>;
+ using TKeyCheckers = TMap<NTiers::TGlobalTierId, TJsonChecker>;
YDB_ACCESSOR_DEF(TKeyCheckers, Checkers);
public:
STATEFN(StateInit) {
@@ -175,7 +177,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
NJson::TJsonValue jsonData;
NProtobufJson::Proto2Json(value->GetProtoConfig(), jsonData);
if (!i.second.Check(jsonData)) {
- Cerr << "config value incorrect:" << snapshot->SerializeToString() << ";snapshot_check_path=" << i.first << Endl;
+ Cerr << "config value incorrect:" << snapshot->SerializeToString() << ";snapshot_check_path=" << i.first.ToString() << Endl;
Cerr << "json path incorrect:" << jsonData << ";" << i.second.GetDebugString() << Endl;
return;
}
@@ -189,8 +191,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
void Bootstrap() {
ProviderId = NMetadataProvider::MakeServiceId(1);
- ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>();
- ExternalDataManipulation->Start(ExternalDataManipulation);
+ ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>("/Root/olapStore");
Become(&TThis::StateInit);
Sender<NMetadataProvider::TEvSubscribeExternal>(ExternalDataManipulation).SendTo(ProviderId);
Start = Now();
@@ -226,17 +227,17 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
lHelper.CreateTestOlapTable();
{
TTestCSEmulator* emulator = new TTestCSEmulator;
- emulator->MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc"));
+ emulator->MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier1"), TJsonChecker("Name", "abc"));
runtime.Register(emulator);
runtime.SimulateSleep(TDuration::Seconds(10));
Cerr << "Initialization finished" << Endl;
- lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
+ lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) "
"VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')");
- lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
+ lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) "
"VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')");
const TInstant start = Now();
- while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) {
+ while (!emulator->IsFound() && Now() - start < TDuration::Seconds(2000)) {
runtime.SimulateSleep(TDuration::Seconds(1));
}
Y_VERIFY(emulator->IsFound());
@@ -275,31 +276,31 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
runtime.SimulateSleep(TDuration::Seconds(10));
Cerr << "Initialization finished" << Endl;
- lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
- "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')");
- lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
+ lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) "
+ "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr1 + "')");
+ lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) "
"VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')");
{
TTestCSEmulator emulator;
- emulator.MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc"));
+ emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier1"), TJsonChecker("Name", "abc1"));
emulator.SetTieringsCount(1);
emulator.CheckRuntime(runtime);
}
- lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
- "VALUES ('/Root/olapStore', 'tier2', '" + ConfigProtoStr + "')");
- lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
+ lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) "
+ "VALUES ('/Root/olapStore', 'tier2', '" + ConfigProtoStr2 + "')");
+ lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) "
"VALUES ('/Root/olapStore', 'tier2', '/Root/olapStore/olapTable', 'timestamp', '20d')");
{
TTestCSEmulator emulator;
- emulator.MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc"));
- emulator.MutableCheckers().emplace("/Root/olapStore.tier2", TJsonChecker("Name", "abc"));
+ emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier1"), TJsonChecker("Name", "abc1"));
+ emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "tier2"), TJsonChecker("Name", "abc2"));
emulator.SetTieringsCount(2);
emulator.CheckRuntime(runtime);
}
- lHelper.StartDataRequest("DELETE FROM `/Root/.external_data/tiers`");
- lHelper.StartDataRequest("DELETE FROM `/Root/.external_data/tiering`");
+ lHelper.StartDataRequest("DELETE FROM `/Root/.configs/tiering/tiers`");
+ lHelper.StartDataRequest("DELETE FROM `/Root/.configs/tiering/rules`");
{
TTestCSEmulator emulator;
emulator.SetTieringsCount(0);
@@ -376,18 +377,18 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
runtime.SimulateSleep(TDuration::Seconds(20));
Cerr << "Initialization finished" << Endl;
- lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
+ lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) "
"VALUES ('/Root/olapStore', 'fakeTier1', '" + TierConfigProtoStr + "')");
- lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
+ lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/tiers` (ownerPath, tierName, tierConfig) "
"VALUES ('/Root/olapStore', 'fakeTier2', '" + TierConfigProtoStr + "')");
- lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
+ lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) "
"VALUES ('/Root/olapStore', 'fakeTier1', '/Root/olapStore/olapTable', 'timestamp', '10d')");
- lHelper.StartDataRequest("INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
+ lHelper.StartDataRequest("INSERT INTO `/Root/.configs/tiering/rules` (ownerPath, tierName, tablePath, column, durationForEvict) "
"VALUES ('/Root/olapStore', 'fakeTier2', '/Root/olapStore/olapTable', 'timestamp', '20d')");
{
TTestCSEmulator emulator;
- emulator.MutableCheckers().emplace("/Root/olapStore.fakeTier1", TJsonChecker("Name", "fakeTier"));
- emulator.MutableCheckers().emplace("/Root/olapStore.fakeTier2", TJsonChecker("ObjectStorage.Endpoint", TierEndpoint));
+ emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "fakeTier1"), TJsonChecker("Name", "fakeTier"));
+ emulator.MutableCheckers().emplace(NTiers::TGlobalTierId("/Root/olapStore", "fakeTier2"), TJsonChecker("ObjectStorage.Endpoint", TierEndpoint));
emulator.SetTieringsCount(2);
emulator.CheckRuntime(runtime);
}
diff --git a/ydb/services/bg_tasks/ds_table/executor.cpp b/ydb/services/bg_tasks/ds_table/executor.cpp
index 252e8c7952..5cae73625f 100644
--- a/ydb/services/bg_tasks/ds_table/executor.cpp
+++ b/ydb/services/bg_tasks/ds_table/executor.cpp
@@ -8,9 +8,9 @@
namespace NKikimr::NBackgroundTasks {
-TVector<NKikimr::NMetadataProvider::ITableModifier::TPtr> TExecutor::BuildModifiers() const {
+TVector<NMetadataInitializer::ITableModifier::TPtr> TExecutor::BuildModifiers() const {
const TString tableName = Config.GetTablePath();
- TVector<NMetadataProvider::ITableModifier::TPtr> result;
+ TVector<NMetadataInitializer::ITableModifier::TPtr> result;
{
Ydb::Table::CreateTableRequest request;
request.set_session_id("");
@@ -66,7 +66,7 @@ TVector<NKikimr::NMetadataProvider::ITableModifier::TPtr> TExecutor::BuildModifi
column.set_name("state");
column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
}
- result.emplace_back(new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request));
+ result.emplace_back(new NMetadataInitializer::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request));
}
return result;
}
diff --git a/ydb/services/bg_tasks/ds_table/executor.h b/ydb/services/bg_tasks/ds_table/executor.h
index 4b84cd589e..b15d25f6c9 100644
--- a/ydb/services/bg_tasks/ds_table/executor.h
+++ b/ydb/services/bg_tasks/ds_table/executor.h
@@ -51,9 +51,9 @@ public:
}
};
-class TExecutor: public NMetadataProvider::TDSAccessorInitialized {
+class TExecutor: public NMetadataInitializer::TDSAccessorInitialized {
private:
- using TBase = NMetadataProvider::TDSAccessorInitialized;
+ using TBase = NMetadataInitializer::TDSAccessorInitialized;
TString TableName;
const TString ExecutorId = TGUID::CreateTimebased().AsUuidString();
const TConfig Config;
@@ -62,7 +62,10 @@ private:
protected:
virtual void RegisterState() override;
virtual void OnInitialized() override;
- virtual TVector<NMetadataProvider::ITableModifier::TPtr> BuildModifiers() const override;
+ virtual void Prepare(NMetadataInitializer::IController::TPtr controller) override {
+ controller->PreparationFinished(BuildModifiers());
+ }
+ TVector<NMetadataInitializer::ITableModifier::TPtr> BuildModifiers() const;
void Handle(TEvStartAssign::TPtr& ev);
void Handle(TEvAssignFinished::TPtr& ev);
diff --git a/ydb/services/metadata/abstract/common.h b/ydb/services/metadata/abstract/common.h
index 7da8f03317..259a384bb7 100644
--- a/ydb/services/metadata/abstract/common.h
+++ b/ydb/services/metadata/abstract/common.h
@@ -15,7 +15,8 @@ namespace NKikimr::NMetadataProvider {
enum EEvSubscribe {
EvRefreshSubscriberData = EventSpaceBegin(TKikimrEvents::ES_METADATA_PROVIDER),
EvRefresh,
- EvEnrichSnapshot,
+ EvEnrichSnapshotResult,
+ EvEnrichSnapshotProblem,
EvSubscribeLocal,
EvUnsubscribeLocal,
EvSubscribeExternal,
@@ -25,6 +26,16 @@ enum EEvSubscribe {
static_assert(EEvSubscribe::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER)");
+class ISnapshot;
+
+class ISnapshotAcceptorController {
+public:
+ using TPtr = std::shared_ptr<ISnapshotAcceptorController>;
+ virtual ~ISnapshotAcceptorController() = default;
+ virtual void Enriched(std::shared_ptr<ISnapshot> enrichedSnapshot) = 0;
+ virtual void EnrichProblem(const TString& errorMessage) = 0;
+};
+
class ISnapshot {
private:
YDB_READONLY_DEF(TInstant, Actuality);
@@ -52,7 +63,7 @@ public:
class ISnapshotParser {
protected:
virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const = 0;
- virtual TVector<ITableModifier::TPtr> DoGetTableSchema() const = 0;
+ virtual void DoPrepare(NMetadataInitializer::IController::TPtr controller) const = 0;
virtual const TVector<TString>& DoGetTables() const = 0;
mutable std::optional<TString> SnapshotId;
public:
@@ -60,12 +71,13 @@ public:
TString GetSnapshotId() const;
ISnapshot::TPtr ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const;
- TVector<ITableModifier::TPtr> GetTableSchema() const {
- return DoGetTableSchema();
+
+ void Prepare(NMetadataInitializer::IController::TPtr controller) const {
+ return DoPrepare(controller);
}
- virtual NThreading::TFuture<ISnapshot::TPtr> EnrichSnapshotData(ISnapshot::TPtr original) const {
- return NThreading::MakeFuture(original);
+ virtual void EnrichSnapshotData(ISnapshot::TPtr original, ISnapshotAcceptorController::TPtr controller) const {
+ controller->Enriched(original);
}
const TVector<TString>& GetTables() const {
diff --git a/ydb/services/metadata/ds_table/accessor_refresh.cpp b/ydb/services/metadata/ds_table/accessor_refresh.cpp
index 195c700d41..e89bd66d28 100644
--- a/ydb/services/metadata/ds_table/accessor_refresh.cpp
+++ b/ydb/services/metadata/ds_table/accessor_refresh.cpp
@@ -24,16 +24,7 @@ void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) {
if (!!parsedSnapshot && CurrentSelection.SerializeAsString() != startString) {
ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << CurrentSelection.DebugString();
- NActors::TActorIdentity actorId = SelfId();
- SnapshotConstructor->EnrichSnapshotData(parsedSnapshot).Subscribe(
- [actorId](NThreading::TFuture<ISnapshot::TPtr> f) {
- if (f.HasValue() && !f.HasException()) {
- actorId.Send(actorId, new TEvEnrichSnapshotResult(f.GetValueSync()));
- } else {
- actorId.Send(actorId, new TEvEnrichSnapshotResult("cannot enrich snapshot"));
- }
- }
- );
+ SnapshotConstructor->EnrichSnapshotData(parsedSnapshot, GetController());
} else {
Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
}
@@ -42,12 +33,14 @@ void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) {
void TDSAccessorRefresher::Handle(TEvEnrichSnapshotResult::TPtr& ev) {
RequestedActuality = TInstant::Zero();
Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
- if (ev->Get()->IsSuccess()) {
- CurrentSnapshot = ev->Get()->GetEnrichedSnapshot();
- OnSnapshotModified();
- } else {
- ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "enrich problem: " << ev->Get()->GetErrorText();
- }
+ CurrentSnapshot = ev->Get()->GetEnrichedSnapshot();
+ OnSnapshotModified();
+}
+
+void TDSAccessorRefresher::Handle(TEvEnrichSnapshotProblem::TPtr& ev) {
+ RequestedActuality = TInstant::Zero();
+ Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
+ ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "enrich problem: " << ev->Get()->GetErrorText();
}
void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateSession>::TPtr& ev) {
@@ -84,10 +77,14 @@ TDSAccessorRefresher::TDSAccessorRefresher(const TConfig& config, ISnapshotParse
, SnapshotConstructor(snapshotConstructor)
, Config(config)
{
+
}
-TVector<NKikimr::NMetadataProvider::ITableModifier::TPtr> TDSAccessorRefresher::BuildModifiers() const {
- return SnapshotConstructor->GetTableSchema();
+ISnapshotAcceptorController::TPtr TDSAccessorRefresher::GetController() const {
+ if (!ControllerImpl) {
+ ControllerImpl = std::make_shared<TSnapshotAcceptorController>(SelfId());
+ }
+ return ControllerImpl;
}
}
diff --git a/ydb/services/metadata/ds_table/accessor_refresh.h b/ydb/services/metadata/ds_table/accessor_refresh.h
index 39da63b99a..920afdbcfb 100644
--- a/ydb/services/metadata/ds_table/accessor_refresh.h
+++ b/ydb/services/metadata/ds_table/accessor_refresh.h
@@ -13,49 +13,73 @@ class TEvRefresh: public NActors::TEventLocal<TEvRefresh, EEvSubscribe::EvRefres
public:
};
-class TEvEnrichSnapshotResult: public NActors::TEventLocal<TEvEnrichSnapshotResult, EEvSubscribe::EvEnrichSnapshot> {
+class TEvEnrichSnapshotResult: public NActors::TEventLocal<TEvEnrichSnapshotResult, EEvSubscribe::EvEnrichSnapshotResult> {
private:
- YDB_READONLY_FLAG(Success, false);
- YDB_READONLY_DEF(TString, ErrorText);
YDB_READONLY_DEF(ISnapshot::TPtr, EnrichedSnapshot);
public:
- TEvEnrichSnapshotResult(const TString& errorText)
+ TEvEnrichSnapshotResult(ISnapshot::TPtr snapshot)
+ : EnrichedSnapshot(snapshot) {
+
+ }
+};
+
+class TEvEnrichSnapshotProblem: public NActors::TEventLocal<TEvEnrichSnapshotProblem, EEvSubscribe::EvEnrichSnapshotProblem> {
+private:
+ YDB_READONLY_DEF(TString, ErrorText);
+public:
+ TEvEnrichSnapshotProblem(const TString& errorText)
: ErrorText(errorText) {
}
+};
- TEvEnrichSnapshotResult(ISnapshot::TPtr snapshot)
- : SuccessFlag(true)
- , EnrichedSnapshot(snapshot)
- {
+class TSnapshotAcceptorController: public ISnapshotAcceptorController {
+private:
+ const TActorIdentity ActorId;
+public:
+ TSnapshotAcceptorController(const TActorIdentity& actorId)
+ : ActorId(actorId) {
+
+ }
+
+ virtual void EnrichProblem(const TString& errorMessage) override {
+ ActorId.Send(ActorId, new TEvEnrichSnapshotProblem(errorMessage));
+ }
+ virtual void Enriched(ISnapshot::TPtr enrichedSnapshot) override {
+ ActorId.Send(ActorId, new TEvEnrichSnapshotResult(enrichedSnapshot));
}
};
-class TDSAccessorRefresher: public TDSAccessorInitialized {
+class TDSAccessorRefresher: public NMetadataInitializer::TDSAccessorInitialized {
private:
- using TBase = TDSAccessorInitialized;
+ using TBase = NMetadataInitializer::TDSAccessorInitialized;
ISnapshotParser::TPtr SnapshotConstructor;
YDB_READONLY_DEF(ISnapshot::TPtr, CurrentSnapshot);
YDB_READONLY_DEF(Ydb::Table::ExecuteQueryResult, CurrentSelection);
TInstant RequestedActuality = TInstant::Zero();
const TConfig Config;
+
+ mutable ISnapshotAcceptorController::TPtr ControllerImpl;
+
+ ISnapshotAcceptorController::TPtr GetController() const;
protected:
+ virtual void Prepare(NMetadataInitializer::IController::TPtr controller) override {
+ SnapshotConstructor->Prepare(controller);
+ }
bool IsReady() const {
return !!CurrentSnapshot;
}
virtual void OnInitialized() override;
virtual void OnSnapshotModified() = 0;
- virtual TVector<NMetadataProvider::ITableModifier::TPtr> BuildModifiers() const override;
public:
- using TBase::Handle;
-
STFUNC(StateMain) {
switch (ev->GetTypeRewrite()) {
hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>, Handle);
hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>, Handle);
hFunc(TEvRefresh, Handle);
hFunc(TEvEnrichSnapshotResult, Handle);
+ hFunc(TEvEnrichSnapshotProblem, Handle);
default:
TBase::StateMain(ev, ctx);
}
@@ -64,6 +88,7 @@ public:
TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor);
void Handle(TEvEnrichSnapshotResult::TPtr& ev);
+ void Handle(TEvEnrichSnapshotProblem::TPtr& ev);
void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>::TPtr& ev);
void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>::TPtr& ev);
void Handle(TEvRefresh::TPtr& ev);
diff --git a/ydb/services/metadata/ds_table/service.cpp b/ydb/services/metadata/ds_table/service.cpp
index faa76a1587..d75d258450 100644
--- a/ydb/services/metadata/ds_table/service.cpp
+++ b/ydb/services/metadata/ds_table/service.cpp
@@ -20,11 +20,7 @@ void TService::Handle(TEvSubscribeExternal::TPtr& ev) {
THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetSnapshotParser());
it = Accessors.emplace(ev->Get()->GetSnapshotParser()->GetSnapshotId(), Register(actor.Release())).first;
}
- if (!!ev->Get()->GetSubscriberId()) {
- Send<TEvSubscribe>(it->second, ev->Get()->GetSubscriberId());
- } else {
- Send<TEvSubscribe>(it->second, ev->Sender);
- }
+ Send<TEvSubscribe>(it->second, ev->Sender);
}
void TService::Handle(TEvUnsubscribeExternal::TPtr& ev) {
diff --git a/ydb/services/metadata/initializer/CMakeLists.txt b/ydb/services/metadata/initializer/CMakeLists.txt
index 55e553e783..d3b624ff7c 100644
--- a/ydb/services/metadata/initializer/CMakeLists.txt
+++ b/ydb/services/metadata/initializer/CMakeLists.txt
@@ -21,4 +21,6 @@ target_link_libraries(services-metadata-initializer PUBLIC
target_sources(services-metadata-initializer PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/accessor_init.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/common.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/events.cpp
)
diff --git a/ydb/services/metadata/initializer/accessor_init.cpp b/ydb/services/metadata/initializer/accessor_init.cpp
index 7536ff85f7..9f03393e60 100644
--- a/ydb/services/metadata/initializer/accessor_init.cpp
+++ b/ydb/services/metadata/initializer/accessor_init.cpp
@@ -1,25 +1,41 @@
#include "accessor_init.h"
+#include "controller.h"
+
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
-namespace NKikimr::NMetadataProvider {
+namespace NKikimr::NMetadataInitializer {
void TDSAccessorInitialized::Bootstrap() {
RegisterState();
- auto modifiers = BuildModifiers();
+ Controller = std::make_shared<TInitializerController>(SelfId());
+ Send(SelfId(), new TEvInitializerPreparationStart);
+}
+
+void TDSAccessorInitialized::Handle(TEvInitializerPreparationStart::TPtr& /*ev*/) {
+ Prepare(Controller);
+}
+
+void TDSAccessorInitialized::Handle(TEvInitializerPreparationFinished::TPtr& ev) {
+ auto modifiers = ev->Get()->GetModifiers();
for (auto&& i : modifiers) {
Modifiers.emplace_back(i);
}
if (Modifiers.size()) {
- ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "modifiers count: " << Modifiers.size();
+ ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "modifiers count: " << Modifiers.size();
Modifiers.front()->Execute(SelfId(), Config);
} else {
- ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "initialization finished";
+ ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "initialization finished";
OnInitialized();
}
}
+void TDSAccessorInitialized::Handle(TEvInitializerPreparationProblem::TPtr& ev) {
+ ALS_ERROR(NKikimrServices::METADATA_INITIALIZER) << "preparation problems: " << ev->Get()->GetErrorMessage();
+ Schedule(TDuration::Seconds(1), new TEvInitializerPreparationStart);
+}
+
void TDSAccessorInitialized::Handle(NInternal::NRequest::TEvRequestFinished::TPtr& /*ev*/) {
- ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "modifiers count: " << Modifiers.size();
+ ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "modifiers count: " << Modifiers.size();
if (Modifiers.empty()) {
return;
}
@@ -27,7 +43,7 @@ void TDSAccessorInitialized::Handle(NInternal::NRequest::TEvRequestFinished::TPt
if (Modifiers.size()) {
Modifiers.front()->Execute(SelfId(), Config);
} else {
- ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "initialization finished";
+ ALS_INFO(NKikimrServices::METADATA_INITIALIZER) << "initialization finished";
OnInitialized();
}
}
diff --git a/ydb/services/metadata/initializer/accessor_init.h b/ydb/services/metadata/initializer/accessor_init.h
index b7086c12fe..c558e05df3 100644
--- a/ydb/services/metadata/initializer/accessor_init.h
+++ b/ydb/services/metadata/initializer/accessor_init.h
@@ -1,5 +1,6 @@
#pragma once
#include "common.h"
+#include "events.h"
#include <ydb/services/metadata/ds_table/config.h>
@@ -8,24 +9,36 @@
#include <library/cpp/threading/future/core/future.h>
#include <library/cpp/actors/core/av_bootstrapped.h>
-namespace NKikimr::NMetadataProvider {
+namespace NKikimr::NMetadataInitializer {
class TDSAccessorInitialized: public NActors::TActorBootstrapped<TDSAccessorInitialized> {
private:
TDeque<ITableModifier::TPtr> Modifiers;
const NInternal::NRequest::TConfig Config;
+ IController::TPtr Controller;
+
+ void Handle(TEvInitializerPreparationStart::TPtr& ev);
+ void Handle(TEvInitializerPreparationFinished::TPtr& ev);
+ void Handle(TEvInitializerPreparationProblem::TPtr& ev);
+ void Handle(NInternal::NRequest::TEvRequestFinished::TPtr& ev);
protected:
virtual void RegisterState() = 0;
virtual void OnInitialized() = 0;
- virtual TVector<NMetadataProvider::ITableModifier::TPtr> BuildModifiers() const = 0;
+ virtual void Prepare(IController::TPtr controller) = 0;
public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::METADATA_INITIALIZER;
+ }
+
void Bootstrap();
TDSAccessorInitialized(const NInternal::NRequest::TConfig& config);
- void Handle(NInternal::NRequest::TEvRequestFinished::TPtr& ev);
STATEFN(StateMain) {
switch (ev->GetTypeRewrite()) {
hFunc(NInternal::NRequest::TEvRequestFinished, Handle);
+ hFunc(TEvInitializerPreparationStart, Handle);
+ hFunc(TEvInitializerPreparationFinished, Handle);
+ hFunc(TEvInitializerPreparationProblem, Handle);
default:
break;
}
diff --git a/ydb/services/metadata/initializer/common.cpp b/ydb/services/metadata/initializer/common.cpp
index fa5484fdbe..e9b8208654 100644
--- a/ydb/services/metadata/initializer/common.cpp
+++ b/ydb/services/metadata/initializer/common.cpp
@@ -1,5 +1,5 @@
#include "common.h"
-namespace NKikimr::NMetadataProvider {
+namespace NKikimr::NMetadataInitializer {
}
diff --git a/ydb/services/metadata/initializer/common.h b/ydb/services/metadata/initializer/common.h
index f9fac85278..40e8e79ee8 100644
--- a/ydb/services/metadata/initializer/common.h
+++ b/ydb/services/metadata/initializer/common.h
@@ -2,7 +2,7 @@
#include <ydb/services/metadata/request/config.h>
#include <ydb/services/metadata/request/request_actor.h>
-namespace NKikimr::NMetadataProvider {
+namespace NKikimr::NMetadataInitializer {
class ITableModifier {
protected:
@@ -31,4 +31,12 @@ public:
}
};
+class IController {
+public:
+ using TPtr = std::shared_ptr<IController>;
+ virtual void PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const = 0;
+ virtual void PreparationProblem(const TString& errorMessage) const = 0;
+ virtual ~IController() = default;
+};
+
}
diff --git a/ydb/services/metadata/initializer/controller.cpp b/ydb/services/metadata/initializer/controller.cpp
new file mode 100644
index 0000000000..1481e54733
--- /dev/null
+++ b/ydb/services/metadata/initializer/controller.cpp
@@ -0,0 +1,14 @@
+#include "controller.h"
+#include "events.h"
+
+namespace NKikimr::NMetadataInitializer {
+
+void TInitializerController::PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const {
+ ActorId.Send(ActorId, new TEvInitializerPreparationFinished(modifiers));
+}
+
+void TInitializerController::PreparationProblem(const TString& errorMessage) const {
+ ActorId.Send(ActorId, new TEvInitializerPreparationProblem(errorMessage));
+}
+
+}
diff --git a/ydb/services/metadata/initializer/controller.h b/ydb/services/metadata/initializer/controller.h
new file mode 100644
index 0000000000..2d2c2617e8
--- /dev/null
+++ b/ydb/services/metadata/initializer/controller.h
@@ -0,0 +1,19 @@
+#pragma once
+#include "common.h"
+
+namespace NKikimr::NMetadataInitializer {
+
+class TInitializerController: public IController {
+private:
+ const TActorIdentity ActorId;
+public:
+ TInitializerController(const TActorIdentity& actorId)
+ : ActorId(actorId) {
+
+ }
+
+ virtual void PreparationFinished(const TVector<ITableModifier::TPtr>& modifiers) const override;
+ virtual void PreparationProblem(const TString& errorMessage) const override;
+};
+
+}
diff --git a/ydb/services/metadata/initializer/events.cpp b/ydb/services/metadata/initializer/events.cpp
new file mode 100644
index 0000000000..f4c4d34515
--- /dev/null
+++ b/ydb/services/metadata/initializer/events.cpp
@@ -0,0 +1,6 @@
+#include "events.h"
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
+
+namespace NKikimr::NMetadataInitializer {
+
+}
diff --git a/ydb/services/metadata/initializer/events.h b/ydb/services/metadata/initializer/events.h
new file mode 100644
index 0000000000..5fe9b13683
--- /dev/null
+++ b/ydb/services/metadata/initializer/events.h
@@ -0,0 +1,46 @@
+#pragma once
+#include "common.h"
+
+#include <ydb/services/metadata/ds_table/config.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/event_local.h>
+#include <library/cpp/threading/future/core/future.h>
+#include <library/cpp/actors/core/av_bootstrapped.h>
+
+namespace NKikimr::NMetadataInitializer {
+
+enum EInitializerEvents {
+ EvInitializerPreparationStart = EventSpaceBegin(TKikimrEvents::ES_METADATA_INITIALIZER),
+ EvInitializerPreparationFinished,
+ EvInitializerPreparationProblem,
+ EvEnd
+};
+
+static_assert(EInitializerEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_INITIALIZER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_INITIALIZER)");
+
+class TEvInitializerPreparationStart: public TEventLocal<TEvInitializerPreparationStart, EInitializerEvents::EvInitializerPreparationStart> {
+public:
+};
+
+class TEvInitializerPreparationFinished: public TEventLocal<TEvInitializerPreparationFinished, EInitializerEvents::EvInitializerPreparationFinished> {
+private:
+ YDB_READONLY_DEF(TVector<ITableModifier::TPtr>, Modifiers);
+public:
+ TEvInitializerPreparationFinished(const TVector<ITableModifier::TPtr>& modifiers)
+ : Modifiers(modifiers) {
+
+ }
+};
+
+class TEvInitializerPreparationProblem: public TEventLocal<TEvInitializerPreparationProblem, EInitializerEvents::EvInitializerPreparationProblem> {
+private:
+ YDB_READONLY_DEF(TString, ErrorMessage);
+public:
+ TEvInitializerPreparationProblem(const TString& errorMessage)
+ : ErrorMessage(errorMessage) {
+
+ }
+};
+
+}
diff --git a/ydb/services/metadata/service.h b/ydb/services/metadata/service.h
index 858986f779..d576186974 100644
--- a/ydb/services/metadata/service.h
+++ b/ydb/services/metadata/service.h
@@ -7,11 +7,9 @@ namespace NKikimr::NMetadataProvider {
class TEvSubscribeExternal: public NActors::TEventLocal<TEvSubscribeExternal, EEvSubscribe::EvSubscribeExternal> {
private:
YDB_READONLY_DEF(ISnapshotParser::TPtr, SnapshotParser);
- YDB_READONLY_DEF(NActors::TActorId, SubscriberId);
public:
- TEvSubscribeExternal(ISnapshotParser::TPtr parser, const NActors::TActorId& subscriberId = {})
+ TEvSubscribeExternal(ISnapshotParser::TPtr parser)
: SnapshotParser(parser)
- , SubscriberId(subscriberId)
{
Y_VERIFY(!!SnapshotParser);
}