summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/kqp/ut/common/columnshard.cpp40
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h6
-rw-r--r--ydb/core/kqp/ut/olap/tiering_ut.cpp12
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp18
-rw-r--r--ydb/core/tx/columnshard/blobs_action/abstract/storage.h6
-rw-r--r--ydb/core/tx/columnshard/blobs_action/bs/storage.h4
-rw-r--r--ydb/core/tx/columnshard/blobs_action/local/storage.h3
-rw-r--r--ydb/core/tx/columnshard/blobs_action/tier/storage.cpp19
-rw-r--r--ydb/core/tx/columnshard/blobs_action/tier/storage.h3
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp39
-rw-r--r--ydb/core/tx/columnshard/columnshard__init.cpp12
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp25
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h10
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/tiering/tier_info.h31
-rw-r--r--ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/storage/actualizer/index/index.h3
-rw-r--r--ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp7
-rw-r--r--ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h11
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule/granule.cpp2
-rw-r--r--ydb/core/tx/columnshard/hooks/abstract/abstract.h10
-rw-r--r--ydb/core/tx/columnshard/hooks/abstract/ya.make5
-rw-r--r--ydb/core/tx/columnshard/tables_manager.cpp15
-rw-r--r--ydb/core/tx/columnshard/tables_manager.h6
-rw-r--r--ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp38
-rw-r--r--ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h8
-rw-r--r--ydb/core/tx/columnshard/test_helper/controllers.cpp13
-rw-r--r--ydb/core/tx/columnshard/test_helper/controllers.h16
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_backup.cpp2
-rw-r--r--ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp20
-rw-r--r--ydb/core/tx/schemeshard/olap/manager/manager.cpp42
-rw-r--r--ydb/core/tx/schemeshard/olap/manager/manager.h1
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/alter/common/update.cpp21
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/alter/in_store/schema/update.cpp2
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/alter/standalone/update.cpp2
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/alter_store.cpp2
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/create_table.cpp13
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/drop_table.cpp9
-rw-r--r--ydb/core/tx/schemeshard/olap/schema/schema.cpp5
-rw-r--r--ydb/core/tx/schemeshard/olap/schema/schema.h6
-rw-r--r--ydb/core/tx/schemeshard/olap/table/table.h10
-rw-r--r--ydb/core/tx/schemeshard/olap/ttl/validator.cpp36
-rw-r--r--ydb/core/tx/schemeshard/olap/ttl/validator.h6
-rw-r--r--ydb/core/tx/schemeshard/olap/ttl/ya.make1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_external_data_source.cpp24
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp25
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h9
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_describer.cpp7
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_schema.h2
-rw-r--r--ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp34
-rw-r--r--ydb/core/tx/schemeshard/ya.make1
-rw-r--r--ydb/core/tx/tiering/common.h5
-rw-r--r--ydb/core/tx/tiering/external_data.cpp24
-rw-r--r--ydb/core/tx/tiering/external_data.h25
-rw-r--r--ydb/core/tx/tiering/fetcher.cpp3
-rw-r--r--ydb/core/tx/tiering/fetcher.h198
-rw-r--r--ydb/core/tx/tiering/manager.cpp318
-rw-r--r--ydb/core/tx/tiering/manager.h92
-rw-r--r--ydb/core/tx/tiering/snapshot.cpp36
-rw-r--r--ydb/core/tx/tiering/snapshot.h25
-rw-r--r--ydb/core/tx/tiering/tier/behaviour.cpp33
-rw-r--r--ydb/core/tx/tiering/tier/behaviour.h20
-rw-r--r--ydb/core/tx/tiering/tier/checker.cpp52
-rw-r--r--ydb/core/tx/tiering/tier/checker.h38
-rw-r--r--ydb/core/tx/tiering/tier/initializer.cpp36
-rw-r--r--ydb/core/tx/tiering/tier/initializer.h15
-rw-r--r--ydb/core/tx/tiering/tier/manager.cpp77
-rw-r--r--ydb/core/tx/tiering/tier/manager.h19
-rw-r--r--ydb/core/tx/tiering/tier/object.cpp114
-rw-r--r--ydb/core/tx/tiering/tier/object.h77
-rw-r--r--ydb/core/tx/tiering/tier/ss_checker.cpp26
-rw-r--r--ydb/core/tx/tiering/tier/ss_checker.h68
-rw-r--r--ydb/core/tx/tiering/tier/ss_fetcher.cpp77
-rw-r--r--ydb/core/tx/tiering/tier/ss_fetcher.h79
-rw-r--r--ydb/core/tx/tiering/tier/ya.make12
-rw-r--r--ydb/core/tx/tiering/ut/ut_tiers.cpp312
-rw-r--r--ydb/core/tx/tiering/ya.make6
-rw-r--r--ydb/core/wrappers/abstract.cpp2
-rw-r--r--ydb/core/wrappers/fake_storage.h2
-rw-r--r--ydb/services/metadata/secret/accessor/secret_id.cpp32
-rw-r--r--ydb/services/metadata/secret/accessor/secret_id.h223
-rw-r--r--ydb/services/metadata/secret/accessor/snapshot.h18
-rw-r--r--ydb/services/metadata/secret/accessor/ya.make13
-rw-r--r--ydb/services/metadata/secret/secret.cpp24
-rw-r--r--ydb/services/metadata/secret/secret.h216
-rw-r--r--ydb/services/metadata/secret/snapshot.h11
-rw-r--r--ydb/services/metadata/secret/ya.make1
87 files changed, 1373 insertions, 1605 deletions
diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp
index 28ffe5f3e7d..016440eff63 100644
--- a/ydb/core/kqp/ut/common/columnshard.cpp
+++ b/ydb/core/kqp/ut/common/columnshard.cpp
@@ -11,27 +11,6 @@ extern "C" {
namespace NKikimr {
namespace NKqp {
-
- TString GetConfigProtoWithName(const TString & tierName) {
- return TStringBuilder() << "Name : \"" << tierName << "\"\n" <<
- R"(
- ObjectStorage : {
- Endpoint: "fake"
- Bucket: "fake"
- SecretableAccessKey: {
- Value: {
- Data: "secretAccessKey"
- }
- }
- SecretableSecretKey: {
- Value: {
- Data: "fakeSecret"
- }
- }
- }
- )";
- }
-
using namespace NYdb;
TTestHelper::TTestHelper(const TKikimrSettings& settings) {
@@ -39,9 +18,13 @@ namespace NKqp {
if (!kikimrSettings.FeatureFlags.HasEnableTieringInColumnShard()) {
kikimrSettings.SetEnableTieringInColumnShard(true);
}
+ if (!kikimrSettings.FeatureFlags.HasEnableExternalDataSources()) {
+ kikimrSettings.SetEnableExternalDataSources(true);
+ }
Kikimr = std::make_unique<TKikimrRunner>(kikimrSettings);
- TableClient = std::make_unique<NYdb::NTable::TTableClient>(Kikimr->GetTableClient());
+ TableClient =
+ std::make_unique<NYdb::NTable::TTableClient>(Kikimr->GetTableClient(NYdb::NTable::TClientSettings().AuthToken("root@builtin")));
Session = std::make_unique<NYdb::NTable::TSession>(TableClient->CreateSession().GetValueSync().GetSession());
}
@@ -64,7 +47,18 @@ namespace NKqp {
}
void TTestHelper::CreateTier(const TString& tierName) {
- auto result = GetSession().ExecuteSchemeQuery("CREATE OBJECT " + tierName + " (TYPE TIER) WITH tierConfig = `" + GetConfigProtoWithName(tierName) + "`").GetValueSync();
+ auto result = GetSession().ExecuteSchemeQuery(R"(
+ UPSERT OBJECT `accessKey` (TYPE SECRET) WITH (value = `secretAccessKey`);
+ UPSERT OBJECT `secretKey` (TYPE SECRET) WITH (value = `fakeSecret`);
+ CREATE EXTERNAL DATA SOURCE `)" + tierName + R"(` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="http://fake.fake/fake",
+ AUTH_METHOD="AWS",
+ AWS_ACCESS_KEY_ID_SECRET_NAME="accessKey",
+ AWS_SECRET_ACCESS_KEY_SECRET_NAME="secretKey",
+ AWS_REGION="ru-central1"
+ );
+ )").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index 637ca91929c..5352993653e 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -166,9 +166,9 @@ public:
NYdb::TDriverConfig GetDriverConfig() const { return DriverConfig; }
- NYdb::NTable::TTableClient GetTableClient() const {
- return NYdb::NTable::TTableClient(*Driver, NYdb::NTable::TClientSettings()
- .UseQueryCache(false));
+ NYdb::NTable::TTableClient GetTableClient(
+ NYdb::NTable::TClientSettings settings = NYdb::NTable::TClientSettings()) const {
+ return NYdb::NTable::TTableClient(*Driver, settings.UseQueryCache(false));
}
NYdb::NQuery::TQueryClient GetQueryClient(
diff --git a/ydb/core/kqp/ut/olap/tiering_ut.cpp b/ydb/core/kqp/ut/olap/tiering_ut.cpp
index f14fa26a325..847091530c4 100644
--- a/ydb/core/kqp/ut/olap/tiering_ut.cpp
+++ b/ydb/core/kqp/ut/olap/tiering_ut.cpp
@@ -68,7 +68,7 @@ public:
UNIT_ASSERT_GT(columnRawBytes, 0);
}
- TestHelper->SetTiering("/Root/olapStore/olapTable", "tier1", "timestamp");
+ TestHelper->SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
csController->WaitActualization(TDuration::Seconds(5));
{
@@ -82,7 +82,7 @@ public:
auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("TierName")), "tier1");
+ UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("TierName")), "/Root/tier1");
UNIT_ASSERT_VALUES_EQUAL_C(GetUint64(rows[0].at("RawBytes")), columnRawBytes,
TStringBuilder() << "RawBytes changed after eviction: before=" << columnRawBytes
<< " after=" << GetUint64(rows[0].at("RawBytes")));
@@ -121,7 +121,7 @@ class TTestEvictionResetTiering : public TTestEvictionBase {
class TTestEvictionIncreaseDuration : public TTestEvictionBase {
private:
void UnevictAll() {
- const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P30000D") TO EXTERNAL DATA SOURCE tier1 ON timestamp)";
+ const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P30000D") TO EXTERNAL DATA SOURCE `/Root/tier1` ON timestamp)";
auto result = TestHelper->GetSession().ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}
@@ -152,18 +152,18 @@ Y_UNIT_TEST_SUITE(KqpOlapTiering) {
testHelper.CreateTier("tier1");
{
- const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE tier1 ON unknown_column;)";
+ const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE `/Root/tier1` ON unknown_column;)";
auto result = testHelper.GetSession().ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
}
{
- const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE tier1 ON uid;)";
+ const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE `/Root/tier1` ON uid;)";
auto result = testHelper.GetSession().ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
}
- testHelper.SetTiering("/Root/olapStore/olapTable", "tier1", "timestamp");
+ testHelper.SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
}
}
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index f200071b8eb..fed37645c47 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -5468,7 +5468,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
WITH (
STORE = COLUMN,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10,
- TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE tier1 ON Key
+ TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE `/Root/tier1` ON Key
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
@@ -5479,12 +5479,12 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT(desc.GetTableDescription().GetTtlSettings());
auto ttl = desc.GetTableDescription().GetTtlSettings();
UNIT_ASSERT_VALUES_EQUAL(ttl->GetTiers().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).GetStorage(), "tier1");
+ UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).GetStorage(), "/Root/tier1");
UNIT_ASSERT_VALUES_EQUAL(std::get<TDateTypeColumnModeSettings>(ttl->GetTiers()[0].GetExpression()).GetExpireAfter(), TDuration::Seconds(10));
}
auto query2 = TStringBuilder() << R"(
--!syntax_v1
- ALTER TABLE `)" << tableName << R"(` SET (TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE tier2 ON Key);)";
+ ALTER TABLE `)" << tableName << R"(` SET (TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE `/Root/tier2` ON Key);)";
result = session.ExecuteSchemeQuery(query2).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
@@ -5495,7 +5495,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT(desc.GetTableDescription().GetTtlSettings());
auto ttl = desc.GetTableDescription().GetTtlSettings();
UNIT_ASSERT_VALUES_EQUAL(ttl->GetTiers().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).GetStorage(), "tier2");
+ UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).GetStorage(), "/Root/tier2");
UNIT_ASSERT_VALUES_EQUAL(std::get<TDateTypeColumnModeSettings>(ttl->GetTiers()[0].GetExpression()).GetExpireAfter(), TDuration::Seconds(10));
}
@@ -5515,7 +5515,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
auto query4 = TStringBuilder() << R"(
--!syntax_v1
- ALTER TABLE `)" << tableName << R"(` SET (TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE tier1 ON Key);)";
+ ALTER TABLE `)" << tableName << R"(` SET (TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE `/Root/tier1` ON Key);)";
result = session.ExecuteSchemeQuery(query4).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
@@ -5526,7 +5526,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT(desc.GetTableDescription().GetTtlSettings());
auto ttl = desc.GetTableDescription().GetTtlSettings();
UNIT_ASSERT_VALUES_EQUAL(ttl->GetTiers().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).GetStorage(), "tier1");
+ UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).GetStorage(), "/Root/tier1");
UNIT_ASSERT_VALUES_EQUAL(std::get<TDateTypeColumnModeSettings>(ttl->GetTiers()[0].GetExpression()).GetExpireAfter(), TDuration::Seconds(10));
}
@@ -8441,7 +8441,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testHelper.BulkUpsert(testTable, tableInserter);
}
- testHelper.SetTiering(tableName, "tier1", "created_at");
+ testHelper.SetTiering(tableName, "/Root/tier1", "created_at");
while (csController->GetTieringUpdates().Val() == 0) {
Cout << "Wait tiering..." << Endl;
@@ -8509,7 +8509,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
UNIT_ASSERT_VALUES_EQUAL(description.GetTtlSettings()->GetDateTypeColumn().GetExpireAfter(), TDuration::Hours(1));
}
{
- auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`SET (TTL = Interval(\"PT10S\") TO EXTERNAL DATA SOURCE tier1, Interval(\"PT1H\") DELETE ON created_at);";
+ auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`SET (TTL = Interval(\"PT10S\") TO EXTERNAL DATA SOURCE `/Root/tier1`, Interval(\"PT1H\") DELETE ON created_at);";
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
@@ -8524,7 +8524,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
UNIT_ASSERT_VALUES_EQUAL(ttl->GetTiers().size(), 2);
auto evictTier = ttl->GetTiers()[0];
UNIT_ASSERT(std::holds_alternative<TTtlEvictToExternalStorageAction>(evictTier.GetAction()));
- UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(evictTier.GetAction()).GetStorage(), "tier1");
+ UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(evictTier.GetAction()).GetStorage(), "/Root/tier1");
UNIT_ASSERT_VALUES_EQUAL(std::get<TDateTypeColumnModeSettings>(evictTier.GetExpression()).GetExpireAfter(), TDuration::Seconds(10));
auto deleteTier = ttl->GetTiers()[1];
UNIT_ASSERT(std::holds_alternative<TTtlDeleteAction>(deleteTier.GetAction()));
diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/storage.h b/ydb/core/tx/columnshard/blobs_action/abstract/storage.h
index 3e392a21398..6263e66f151 100644
--- a/ydb/core/tx/columnshard/blobs_action/abstract/storage.h
+++ b/ydb/core/tx/columnshard/blobs_action/abstract/storage.h
@@ -54,6 +54,7 @@ protected:
virtual void DoStartGCAction(const std::shared_ptr<IBlobsGCAction>& counters) const = 0;
void StartGCAction(const std::shared_ptr<IBlobsGCAction>& action) const {
+ AFL_VERIFY(IsReady());
return DoStartGCAction(action);
}
@@ -96,14 +97,17 @@ public:
}
std::shared_ptr<IBlobsDeclareRemovingAction> StartDeclareRemovingAction(const NBlobOperations::EConsumer consumerId) {
+ AFL_VERIFY(IsReady());
return DoStartDeclareRemovingAction(Counters->GetConsumerCounter(consumerId)->GetRemoveDeclareCounters());
}
std::shared_ptr<IBlobsWritingAction> StartWritingAction(const NBlobOperations::EConsumer consumerId) {
+ AFL_VERIFY(IsReady());
auto result = DoStartWritingAction();
result->SetCounters(Counters->GetConsumerCounter(consumerId)->GetWriteCounters());
return result;
}
std::shared_ptr<IBlobsReadingAction> StartReadingAction(const NBlobOperations::EConsumer consumerId) {
+ AFL_VERIFY(IsReady());
auto result = DoStartReadingAction();
result->SetCounters(Counters->GetConsumerCounter(consumerId)->GetReadCounters());
return result;
@@ -129,6 +133,8 @@ public:
CurrentGCAction = task;
return CurrentGCAction;
}
+
+ virtual bool IsReady() const = 0;
};
}
diff --git a/ydb/core/tx/columnshard/blobs_action/bs/storage.h b/ydb/core/tx/columnshard/blobs_action/bs/storage.h
index fd5c21eb309..8cdc80868e0 100644
--- a/ydb/core/tx/columnshard/blobs_action/bs/storage.h
+++ b/ydb/core/tx/columnshard/blobs_action/bs/storage.h
@@ -41,6 +41,10 @@ public:
virtual std::shared_ptr<IBlobInUseTracker> GetBlobsTracker() const override {
return Manager;
}
+
+ virtual bool IsReady() const override {
+ return true;
+ }
};
}
diff --git a/ydb/core/tx/columnshard/blobs_action/local/storage.h b/ydb/core/tx/columnshard/blobs_action/local/storage.h
index beb5c4286ca..142c0700f0b 100644
--- a/ydb/core/tx/columnshard/blobs_action/local/storage.h
+++ b/ydb/core/tx/columnshard/blobs_action/local/storage.h
@@ -48,6 +48,9 @@ public:
return false;
}
+ virtual bool IsReady() const override {
+ return true;
+ }
};
}
diff --git a/ydb/core/tx/columnshard/blobs_action/tier/storage.cpp b/ydb/core/tx/columnshard/blobs_action/tier/storage.cpp
index cf842edbd41..2cb28089eb8 100644
--- a/ydb/core/tx/columnshard/blobs_action/tier/storage.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/tier/storage.cpp
@@ -54,12 +54,14 @@ void TOperator::DoStartGCAction(const std::shared_ptr<IBlobsGCAction>& action) c
}
void TOperator::InitNewExternalOperator(const NColumnShard::NTiers::TManager* tierManager) {
- NKikimrSchemeOp::TS3Settings settings;
- if (tierManager) {
- settings = tierManager->GetS3Settings();
- } else {
- settings.SetEndpoint("nowhere");
+ if (!tierManager || !tierManager->IsReady()) {
+ TGuard<TSpinLock> changeLock(ChangeOperatorLock);
+ CurrentS3Settings.reset();
+ ExternalStorageOperator = nullptr;
+ return;
}
+
+ NKikimrSchemeOp::TS3Settings settings = tierManager->GetS3Settings();
{
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
if (CurrentS3Settings && CurrentS3Settings->SerializeAsString() == settings.SerializeAsString()) {
@@ -103,12 +105,7 @@ TOperator::TOperator(const TString& storageId, const TActorId& shardActorId, con
void TOperator::DoOnTieringModified(const std::shared_ptr<NColumnShard::ITiersManager>& tiers) {
auto* tierManager = tiers->GetManagerOptional(TBase::GetStorageId());
- if (tierManager) {
- InitNewExternalOperator(tierManager);
- } else {
- TGuard<TSpinLock> changeLock(ChangeOperatorLock);
- ExternalStorageOperator = nullptr;
- }
+ InitNewExternalOperator(tierManager);
}
bool TOperator::DoLoad(IBlobManagerDb& dbBlobs) {
diff --git a/ydb/core/tx/columnshard/blobs_action/tier/storage.h b/ydb/core/tx/columnshard/blobs_action/tier/storage.h
index db188f1be71..7495014b12a 100644
--- a/ydb/core/tx/columnshard/blobs_action/tier/storage.h
+++ b/ydb/core/tx/columnshard/blobs_action/tier/storage.h
@@ -56,6 +56,9 @@ public:
return GCInfo->HasToDelete(blobId, tabletId);
}
+ virtual bool IsReady() const override {
+ return !!ExternalStorageOperator;
+ }
};
}
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index 7d53b61bfd3..f0782462de4 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -58,25 +58,27 @@ void TColumnShard::BecomeBroken(const TActorContext& ctx) {
CleanupActors(ctx);
}
-void TColumnShard::SwitchToWork(const TActorContext& ctx) {
+void TColumnShard::TrySwitchToWork(const TActorContext& ctx) {
+ if (!Tiers->AreConfigsComplete()) {
+ AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "skip_switch_to_work")("reason", "tiering_metadata_not_ready");
+ return;
+ }
+ if (!IsTxInitFinished) {
+ AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "skip_switch_to_work")("reason", "db_reading_not_finished");
+ return;
+ }
+
+ ProgressTxController->OnTabletInit();
{
const TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("self_id", SelfId())("process", "SwitchToWork");
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "SwitchToWork");
-
- for (const auto& [pathId, tiering] : TablesManager.GetTtl()) {
- THashSet<TString> tiers;
- for (const auto& [name, config] : tiering.GetTierByName()) {
- tiers.emplace(name);
- }
- ActivateTiering(pathId, tiers);
- }
-
Become(&TThis::StateWork);
SignalTabletActive(ctx);
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "SignalTabletActive");
TryRegisterMediatorTimeCast();
EnqueueProgressTx(ctx, std::nullopt);
+ OnTieringModified();
}
Counters.GetCSCounters().OnIndexMetadataLimit(NOlap::IColumnEngine::GetMetadataLimit());
EnqueueBackgroundActivities();
@@ -87,6 +89,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
NYDBTest::TControllers::GetColumnShardController()->OnSwitchToWork(TabletID());
AFL_VERIFY(!!StartInstant);
Counters.GetCSCounters().Initialization.OnSwitchToWork(TMonotonic::Now() - *StartInstant, TMonotonic::Now() - CreateInstant);
+ NYDBTest::TControllers::GetColumnShardController()->OnTabletInitCompleted(*this);
}
void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
@@ -104,8 +107,10 @@ void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
ctx.Send(selfActorId, new TEvPrivate::TEvTieringModified);
});
Tiers->Start(Tiers);
- if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) {
- Tiers->TakeConfigs(NYDBTest::TControllers::GetColumnShardController()->GetFallbackTiersSnapshot(), nullptr);
+ if (const auto& tiersSnapshot = NYDBTest::TControllers::GetColumnShardController()->GetOverrideTierConfigs(); !tiersSnapshot.empty()) {
+ for (const auto& [id, tier] : tiersSnapshot) {
+ Tiers->UpdateTierConfig(tier, CanonizePath(id), false);
+ }
}
BackgroundSessionsManager = std::make_shared<NOlap::NBackground::TSessionsManager>(
std::make_shared<NBackground::TAdapter>(selfActorId, (NOlap::TTabletId)TabletID(), *this));
@@ -124,10 +129,20 @@ void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
}
void TColumnShard::Handle(TEvPrivate::TEvTieringModified::TPtr& /*ev*/, const TActorContext& /*ctx*/) {
+ if (const auto& tiersSnapshot = NYDBTest::TControllers::GetColumnShardController()->GetOverrideTierConfigs(); !tiersSnapshot.empty()) {
+ for (const auto& [id, tier] : tiersSnapshot) {
+ Tiers->UpdateTierConfig(tier, CanonizePath(id), false);
+ }
+ }
+
OnTieringModified();
NYDBTest::TControllers::GetColumnShardController()->OnTieringModified(Tiers);
}
+void TColumnShard::HandleInit(TEvPrivate::TEvTieringModified::TPtr& /*ev*/, const TActorContext& ctx) {
+ TrySwitchToWork(ctx);
+}
+
void TColumnShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&) {
auto tabletId = ev->Get()->TabletId;
auto clientId = ev->Get()->ClientId;
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index 852c0544410..f8743854a38 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -16,6 +16,7 @@
#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/tx/columnshard/operations/write.h>
#include <ydb/core/tx/columnshard/transactions/locks_db.h>
+#include <ydb/core/tx/tiering/manager.h>
namespace NKikimr::NColumnShard {
@@ -103,9 +104,14 @@ bool TTxInit::Execute(TTransactionContext& txc, const TActorContext& ctx) {
void TTxInit::Complete(const TActorContext& ctx) {
Self->Counters.GetCSCounters().Initialization.OnTxInitFinished(TMonotonic::Now() - StartInstant);
- Self->ProgressTxController->OnTabletInit();
- Self->SwitchToWork(ctx);
- NYDBTest::TControllers::GetColumnShardController()->OnTabletInitCompleted(*Self);
+ AFL_VERIFY(!Self->IsTxInitFinished);
+ Self->IsTxInitFinished = true;
+
+ for (const auto& [pathId, tiering] : Self->TablesManager.GetTtl()) {
+ Self->Tiers->EnablePathId(pathId, tiering.GetUsedTiers());
+ }
+
+ Self->TrySwitchToWork(ctx);
}
class TTxUpdateSchema: public TTransactionBase<TColumnShard> {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 033705facc0..dd74874db49 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -46,7 +46,6 @@
#include <ydb/core/tx/priorities/usage/abstract.h>
#include <ydb/core/tx/priorities/usage/events.h>
#include <ydb/core/tx/priorities/usage/service.h>
-#include <ydb/core/tx/tiering/external_data.h>
#include <ydb/core/tx/tiering/manager.h>
#include <ydb/services/metadata/service.h>
@@ -398,7 +397,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
tableVerProto.SetSchemaPresetId(preset.GetId());
if (TablesManager.RegisterSchemaPreset(preset, db)) {
- TablesManager.AddSchemaVersion(tableProto.GetSchemaPreset().GetId(), version, tableProto.GetSchemaPreset().GetSchema(), db, Tiers);
+ TablesManager.AddSchemaVersion(tableProto.GetSchemaPreset().GetId(), version, tableProto.GetSchemaPreset().GetSchema(), db);
}
} else {
Y_ABORT_UNLESS(tableProto.HasSchema(), "Tables has either schema or preset");
@@ -423,7 +422,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj());
- TablesManager.AddTableVersion(pathId, version, tableVerProto, schema, db, Tiers);
+ TablesManager.AddTableVersion(pathId, version, tableVerProto, schema, db);
InsertTable->RegisterPathInfo(pathId);
Counters.GetTabletCounters()->SetCounter(COUNTER_TABLES, TablesManager.GetTables().size());
@@ -447,7 +446,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
std::optional<NKikimrSchemeOp::TColumnTableSchema> schema;
if (alterProto.HasSchemaPreset()) {
tableVerProto.SetSchemaPresetId(alterProto.GetSchemaPreset().GetId());
- TablesManager.AddSchemaVersion(alterProto.GetSchemaPreset().GetId(), version, alterProto.GetSchemaPreset().GetSchema(), db, Tiers);
+ TablesManager.AddSchemaVersion(alterProto.GetSchemaPreset().GetId(), version, alterProto.GetSchemaPreset().GetSchema(), db);
} else if (alterProto.HasSchema()) {
schema = alterProto.GetSchema();
}
@@ -464,7 +463,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
ActivateTiering(pathId, usedTiers);
tableVerProto.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj());
- TablesManager.AddTableVersion(pathId, version, tableVerProto, schema, db, Tiers);
+ TablesManager.AddTableVersion(pathId, version, tableVerProto, schema, db);
}
void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const NOlap::TSnapshot& version,
@@ -501,7 +500,7 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto,
if (!TablesManager.HasPreset(presetProto.GetId())) {
continue; // we don't update presets that we don't use
}
- TablesManager.AddSchemaVersion(presetProto.GetId(), version, presetProto.GetSchema(), db, Tiers);
+ TablesManager.AddSchemaVersion(presetProto.GetId(), version, presetProto.GetSchema(), db);
}
}
@@ -1582,18 +1581,10 @@ void TColumnShard::Handle(NOlap::NBlobOperations::NEvents::TEvDeleteSharedBlobs:
Execute(new TTxRemoveSharedBlobs(this, blobs, NActors::ActorIdFromProto(ev->Get()->Record.GetSourceActorId()), ev->Get()->Record.GetStorageId()), ctx);
}
-void TColumnShard::Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) {
- Y_ABORT_UNLESS(Tiers);
- AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TEvRefreshSubscriberData")("snapshot", ev->Get()->GetSnapshot()->SerializeToString());
- Tiers->TakeConfigs(ev->Get()->GetSnapshot(), nullptr);
-}
-
void TColumnShard::ActivateTiering(const ui64 pathId, const THashSet<TString>& usedTiers) {
AFL_VERIFY(Tiers);
if (!usedTiers.empty()) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "activate_tiering")("path_id", pathId)("tiers", JoinStrings(usedTiers.begin(), usedTiers.end(), ","));
- }
- if (!usedTiers.empty()) {
Tiers->EnablePathId(pathId, usedTiers);
} else {
Tiers->DisablePathId(pathId);
@@ -1605,7 +1596,7 @@ void TColumnShard::Enqueue(STFUNC_SIG) {
const TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())(
"self_id", SelfId())("process", "Enqueue")("ev", ev->GetTypeName());
switch (ev->GetTypeRewrite()) {
- HFunc(TEvPrivate::TEvTieringModified, Handle);
+ HFunc(TEvPrivate::TEvTieringModified, HandleInit);
HFunc(TEvPrivate::TEvNormalizerResult, Handle);
HFunc(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors, Handle);
default:
@@ -1616,10 +1607,6 @@ void TColumnShard::Enqueue(STFUNC_SIG) {
void TColumnShard::OnTieringModified(const std::optional<ui64> pathId) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified")("path_id", pathId);
- if (!Tiers->IsReady()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_reload_tiering")("reason", "manager_not_ready")("path_id", pathId);
- return;
- }
StoragesManager->OnTieringModified(Tiers);
if (TablesManager.HasPrimaryIndex()) {
if (pathId) {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 2926121bd1e..ddbb00c45d8 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -261,7 +261,6 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
void Handle(TEvPrivate::TEvPingSnapshotsUsage::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx);
- void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev);
void Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvWriteDraft::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvGarbageCollectionFinished::TPtr& ev, const TActorContext& ctx);
@@ -290,6 +289,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
void Handle(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors::TPtr& ev, const TActorContext& ctx);
+ void HandleInit(TEvPrivate::TEvTieringModified::TPtr& ev, const TActorContext&);
+
ITransaction* CreateTxInitSchema();
void OnActivateExecutor(const TActorContext& ctx) override;
@@ -308,7 +309,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
void CleanupActors(const TActorContext& ctx);
void BecomeBroken(const TActorContext& ctx);
- void SwitchToWork(const TActorContext& ctx);
+ void TrySwitchToWork(const TActorContext& ctx);
bool IsAnyChannelYellowStop() const {
return Executor()->GetStats().IsAnyChannelYellowStop;
@@ -395,8 +396,6 @@ protected:
"self_id", SelfId())("ev", ev->GetTypeName());
TRACE_EVENT(NKikimrServices::TX_COLUMNSHARD);
switch (ev->GetTypeRewrite()) {
- hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle);
-
HFunc(TEvTxProcessing::TEvReadSet, Handle);
HFunc(TEvTxProcessing::TEvReadSetAck, Handle);
@@ -478,6 +477,7 @@ private:
const TMonotonic CreateInstant = TMonotonic::Now();
std::optional<TMonotonic> StartInstant;
+ bool IsTxInitFinished = false;
struct TLongTxWriteInfo {
TInsertWriteId InsertWriteId;
@@ -600,8 +600,6 @@ private:
void FillColumnTableStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev);
public:
- std::shared_ptr<ITxReader> StartReader;
-
ui64 TabletTxCounter = 0;
const TTablesManager& GetTablesManager() const {
diff --git a/ydb/core/tx/columnshard/engines/scheme/tiering/tier_info.h b/ydb/core/tx/columnshard/engines/scheme/tiering/tier_info.h
index 81a04ece79c..6097f85ee0c 100644
--- a/ydb/core/tx/columnshard/engines/scheme/tiering/tier_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/tiering/tier_info.h
@@ -1,13 +1,16 @@
#pragma once
#include "common.h"
+#include <ydb/core/base/path.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
-#include <ydb/library/formats/arrow/common/validation.h>
#include <ydb/core/formats/arrow/serializer/abstract.h>
#include <ydb/core/tx/columnshard/common/scalars.h>
+
+#include <ydb/library/formats/arrow/common/validation.h>
+
#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>
-#include <util/generic/set.h>
#include <util/generic/hash_set.h>
+#include <util/generic/set.h>
namespace NKikimr::NOlap {
@@ -28,8 +31,7 @@ public:
: Name(tierName)
, EvictColumnName(column)
, EvictDuration(evictDuration)
- , TtlUnitsInSecond(unitsInSecond)
- {
+ , TtlUnitsInSecond(unitsInSecond) {
Y_ABORT_UNLESS(!!Name);
Y_ABORT_UNLESS(!!EvictColumnName);
}
@@ -242,7 +244,7 @@ public:
tierInfo = TTierInfo::MakeTtl(TDuration::Seconds(tier.GetApplyAfterSeconds()), ttlColumnName, unitsInSecond);
break;
case NKikimrSchemeOp::TTTLSettings_TTier::kEvictToExternalStorage:
- tierInfo = std::make_shared<TTierInfo>(tier.GetEvictToExternalStorage().GetStorage(),
+ tierInfo = std::make_shared<TTierInfo>(CanonizePath(tier.GetEvictToExternalStorage().GetStorage()),
TDuration::Seconds(tier.GetApplyAfterSeconds()), ttlColumnName, unitsInSecond);
break;
case NKikimrSchemeOp::TTTLSettings_TTier::ACTION_NOT_SET:
@@ -268,16 +270,21 @@ public:
return sb;
}
+ THashSet<TString> GetUsedTiers() const {
+ THashSet<TString> tiers;
+ for (const auto& [name, info] : TierByName) {
+ if (name != NTiering::NCommon::DeleteTierName) {
+ tiers.emplace(name);
+ }
+ }
+ return tiers;
+ }
+
static THashSet<TString> GetUsedTiers(const TProto& ttlSettings) {
THashSet<TString> usedTiers;
for (const auto& tier : ttlSettings.GetTiers()) {
- switch (tier.GetActionCase()) {
- case NKikimrSchemeOp::TTTLSettings_TTier::kEvictToExternalStorage:
- usedTiers.emplace(tier.GetEvictToExternalStorage().GetStorage());
- break;
- case NKikimrSchemeOp::TTTLSettings_TTier::kDelete:
- case NKikimrSchemeOp::TTTLSettings_TTier::ACTION_NOT_SET:
- break;
+ if (tier.HasEvictToExternalStorage()) {
+ usedTiers.emplace(CanonizePath(tier.GetEvictToExternalStorage().GetStorage()));
}
}
return usedTiers;
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp
index b1c060f40a9..47a1e4510d8 100644
--- a/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp
@@ -36,16 +36,17 @@ void TGranuleActualizationIndex::RefreshScheme(const TAddExternalContext& contex
NYDBTest::TControllers::GetColumnShardController()->OnActualizationRefreshScheme();
}
-TGranuleActualizationIndex::TGranuleActualizationIndex(const ui64 pathId, const TVersionedIndex& versionedIndex)
+TGranuleActualizationIndex::TGranuleActualizationIndex(const ui64 pathId, const TVersionedIndex& versionedIndex, const std::shared_ptr<IStoragesManager>& storagesManager)
: PathId(pathId)
, VersionedIndex(versionedIndex)
+ , StoragesManager(storagesManager)
{
Y_UNUSED(PathId);
}
void TGranuleActualizationIndex::Start() {
AFL_VERIFY(Actualizers.empty());
- TieringActualizer = std::make_shared<TTieringActualizer>(PathId, VersionedIndex);
+ TieringActualizer = std::make_shared<TTieringActualizer>(PathId, VersionedIndex, StoragesManager);
SchemeActualizer = std::make_shared<TSchemeActualizer>(PathId, VersionedIndex);
Actualizers.emplace_back(TieringActualizer);
Actualizers.emplace_back(SchemeActualizer);
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.h b/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.h
index 49785e2f8a7..df3c0768d22 100644
--- a/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.h
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.h
@@ -22,6 +22,7 @@ private:
const ui64 PathId;
const TVersionedIndex& VersionedIndex;
+ std::shared_ptr<IStoragesManager> StoragesManager;
public:
std::vector<TCSMetadataRequest> CollectMetadataRequests(const THashMap<ui64, TPortionInfo::TPtr>& portions);
@@ -31,7 +32,7 @@ public:
}
void Start();
- TGranuleActualizationIndex(const ui64 pathId, const TVersionedIndex& versionedIndex);
+ TGranuleActualizationIndex(const ui64 pathId, const TVersionedIndex& versionedIndex, const std::shared_ptr<IStoragesManager>& storagesManager);
void ExtractActualizationTasks(TTieringProcessContext& tasksContext, const NActualizer::TExternalTasksContext& externalContext) const;
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp
index ab0e0990f9f..25bb2f71113 100644
--- a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp
@@ -57,6 +57,13 @@ std::optional<TTieringActualizer::TFullActualizationInfo> TTieringActualizer::Bu
targetTierName = tieringInfo.GetNextTierNameVerified();
}
if (d) {
+ if (targetTierName != NTiering::NCommon::DeleteTierName) {
+ if (const auto op = StoragesManager->GetOperatorOptional(targetTierName); !op || !op->IsReady()) {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_eviction")("reason", "storage_not_ready")("tier", targetTierName)(
+ "portion", portion.GetPortionId());
+ return std::nullopt;
+ }
+ }
// if (currentTierName == "deploy_logs_s3" && targetTierName == IStoragesManager::DefaultStorageId) {
// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("tiering_info", tieringInfo.DebugString())("max", max->ToString())("now", now.ToString())("d", *d)("tiering", Tiering->GetDebugString())("pathId", PathId);
// AFL_VERIFY(false)("tiering_info", tieringInfo.DebugString())("max", max->ToString())("now", now.ToString())("d", *d)("tiering", Tiering->GetDebugString())("pathId", PathId);
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h
index 3f3e6aca9d6..83b4cd71933 100644
--- a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h
@@ -1,9 +1,11 @@
#pragma once
#include "counters.h"
+
+#include <ydb/core/tx/columnshard/engines/scheme/tiering/tier_info.h>
+#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>
#include <ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.h>
#include <ydb/core/tx/columnshard/engines/storage/actualizer/common/address.h>
-#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>
-#include <ydb/core/tx/columnshard/engines/scheme/tiering/tier_info.h>
+#include <ydb/core/tx/tiering/manager.h>
namespace NKikimr::NOlap {
class TTiering;
@@ -116,6 +118,7 @@ private:
std::shared_ptr<ISnapshotSchema> TargetCriticalSchema;
const ui64 PathId;
const TVersionedIndex& VersionedIndex;
+ const std::shared_ptr<IStoragesManager>& StoragesManager;
THashMap<TRWAddress, TRWAddressPortionsInfo> PortionIdByWaitDuration;
THashMap<ui64, TFindActualizationInfo> PortionsInfo;
@@ -138,11 +141,13 @@ public:
void Refresh(const std::optional<TTiering>& info, const TAddExternalContext& externalContext);
- TTieringActualizer(const ui64 pathId, const TVersionedIndex& versionedIndex)
+ TTieringActualizer(const ui64 pathId, const TVersionedIndex& versionedIndex, const std::shared_ptr<IStoragesManager>& storagesManager)
: PathId(pathId)
, VersionedIndex(versionedIndex)
+ , StoragesManager(storagesManager)
{
Y_UNUSED(PathId);
+ AFL_VERIFY(StoragesManager);
}
};
diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp
index 32fa3ec7085..7dbb009ccf7 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp
@@ -146,7 +146,7 @@ TGranuleMeta::TGranuleMeta(
NDataAccessorControl::TManagerConstructionContext mmContext(DataAccessorsManager->GetTabletActorId(), false);
ResetAccessorsManager(versionedIndex.GetLastSchema()->GetIndexInfo().GetMetadataManagerConstructor(), mmContext);
AFL_VERIFY(!!OptimizerPlanner);
- ActualizationIndex = std::make_unique<NActualizer::TGranuleActualizationIndex>(PathId, versionedIndex);
+ ActualizationIndex = std::make_unique<NActualizer::TGranuleActualizationIndex>(PathId, versionedIndex, StoragesManager);
}
void TGranuleMeta::UpsertPortionOnLoad(const std::shared_ptr<TPortionInfo>& portion) {
diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
index 34786258719..46ee0ba1c33 100644
--- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
@@ -1,10 +1,10 @@
#pragma once
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
+#include <ydb/core/tx/columnshard/common/limits.h>
#include <ydb/core/tx/columnshard/common/snapshot.h>
#include <ydb/core/tx/columnshard/engines/writer/write_controller.h>
-#include <ydb/core/tx/tiering/snapshot.h>
-#include <ydb/core/tx/columnshard/common/limits.h>
+#include <ydb/core/tx/tiering/tier/object.h>
#include <ydb/library/accessor/accessor.h>
#include <ydb/services/metadata/abstract/fetcher.h>
@@ -308,10 +308,8 @@ public:
return nullptr;
}
- virtual NMetadata::NFetcher::ISnapshot::TPtr GetFallbackTiersSnapshot() const {
- static std::shared_ptr<NColumnShard::NTiers::TTiersSnapshot> result =
- std::make_shared<NColumnShard::NTiers::TTiersSnapshot>(TInstant::Now());
- return result;
+ virtual THashMap<TString, NColumnShard::NTiers::TTierConfig> GetOverrideTierConfigs() const {
+ return {};
}
virtual void OnSwitchToWork(const ui64 tabletId) {
diff --git a/ydb/core/tx/columnshard/hooks/abstract/ya.make b/ydb/core/tx/columnshard/hooks/abstract/ya.make
index 1fc805cb1b4..33386775bd3 100644
--- a/ydb/core/tx/columnshard/hooks/abstract/ya.make
+++ b/ydb/core/tx/columnshard/hooks/abstract/ya.make
@@ -5,7 +5,10 @@ SRCS(
)
PEERDIR(
- ydb/core/tx/tiering
+ ydb/core/tx/tiering/tier
+ ydb/core/tx/columnshard/blobs_action/protos
+ ydb/core/tx/columnshard/data_sharing/protos
+ yql/essentials/core/expr_nodes
)
END()
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
index 410c3ca0b10..a1aee2a6a42 100644
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -270,8 +270,8 @@ bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIc
return true;
}
-void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema,
- NIceDb::TNiceDb& db, std::shared_ptr<TTiersManager>& manager) {
+void TTablesManager::AddSchemaVersion(
+ const ui32 presetId, const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db) {
Y_ABORT_UNLESS(SchemaPresetsIds.contains(presetId));
TSchemaPreset::TSchemaPresetVersionInfo versionInfo;
@@ -295,9 +295,7 @@ void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapsho
for (auto&& i : Tables) {
PrimaryIndex->RegisterTable(i.first);
}
- if (manager->IsReady()) {
- PrimaryIndex->OnTieringModified(Ttl);
- }
+ PrimaryIndex->OnTieringModified(Ttl);
} else {
PrimaryIndex->RegisterSchemaVersion(version, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo));
}
@@ -309,8 +307,7 @@ std::unique_ptr<NTabletFlatExecutor::ITransaction> TTablesManager::CreateAddShar
}
void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& version,
- const NKikimrTxColumnShard::TTableVersionInfo& versionInfo, const std::optional<NKikimrSchemeOp::TColumnTableSchema>& schema,
- NIceDb::TNiceDb& db, std::shared_ptr<TTiersManager>& manager) {
+ const NKikimrTxColumnShard::TTableVersionInfo& versionInfo, const std::optional<NKikimrSchemeOp::TColumnTableSchema>& schema, NIceDb::TNiceDb& db) {
auto it = Tables.find(pathId);
AFL_VERIFY(it != Tables.end());
auto& table = it->second;
@@ -338,11 +335,11 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot&
} else {
Y_ABORT_UNLESS(SchemaPresetsIds.contains(fakePreset.GetId()));
}
- AddSchemaVersion(fakePreset.GetId(), version, *schema, db, manager);
+ AddSchemaVersion(fakePreset.GetId(), version, *schema, db);
}
if (isTtlModified) {
- if (PrimaryIndex && manager->IsReady()) {
+ if (PrimaryIndex) {
if (auto findTtl = Ttl.FindPtr(pathId)) {
PrimaryIndex->OnTieringModified(*findTtl, pathId);
} else {
diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h
index f44ca4c872c..05f1872c923 100644
--- a/ydb/core/tx/columnshard/tables_manager.h
+++ b/ydb/core/tx/columnshard/tables_manager.h
@@ -244,10 +244,10 @@ public:
void RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db);
bool RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db);
- void AddSchemaVersion(const ui32 presetId, const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema,
- NIceDb::TNiceDb& db, std::shared_ptr<TTiersManager>& manager);
+ void AddSchemaVersion(
+ const ui32 presetId, const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db);
void AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& version, const NKikimrTxColumnShard::TTableVersionInfo& versionInfo,
- const std::optional<NKikimrSchemeOp::TColumnTableSchema>& schema, NIceDb::TNiceDb& db, std::shared_ptr<TTiersManager>& manager);
+ const std::optional<NKikimrSchemeOp::TColumnTableSchema>& schema, NIceDb::TNiceDb& db);
bool FillMonitoringReport(NTabletFlatExecutor::TTransactionContext& txc, NJson::TJsonValue& json);
[[nodiscard]] std::unique_ptr<NTabletFlatExecutor::ITransaction> CreateAddShardingInfoTx(TColumnShard& owner, const ui64 pathId,
diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
index ae6f4bbb0ca..8bda3b49327 100644
--- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
@@ -1,18 +1,18 @@
#include "columnshard_ut_common.h"
#include "shard_reader.h"
-#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
+#include <ydb/core/base/tablet.h>
+#include <ydb/core/base/tablet_resolver.h>
+#include <ydb/core/protos/data_events.pb.h>
+#include <ydb/core/scheme/scheme_types_proto.h>
#include <ydb/core/tx/columnshard/engines/reader/sys_view/portions/portions.h>
#include <ydb/core/tx/columnshard/engines/storage/indexes/max/meta.h>
+#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/core/tx/data_events/common/modification_type.h>
#include <ydb/core/tx/data_events/payload_helper.h>
-#include <ydb/core/protos/data_events.pb.h>
-
-#include <ydb/core/base/tablet.h>
-#include <ydb/core/base/tablet_resolver.h>
-#include <ydb/core/scheme/scheme_types_proto.h>
-#include <ydb/core/tx/tiering/snapshot.h>
+#include <ydb/core/tx/tiering/manager.h>
#include <ydb/core/tx/tiering/tier/object.h>
+
#include <library/cpp/testing/unittest/registar.h>
namespace NKikimr::NTxUT {
@@ -48,8 +48,8 @@ void TTester::Setup(TTestActorRuntime& runtime) {
runtime.UpdateCurrentTime(TInstant::Now());
}
-void ProvideTieringSnapshot(TTestBasicRuntime& runtime, const TActorId& sender, NMetadata::NFetcher::ISnapshot::TPtr snapshot) {
- auto event = std::make_unique<NMetadata::NProvider::TEvRefreshSubscriberData>(snapshot);
+void RefreshTiering(TTestBasicRuntime& runtime, const TActorId& sender) {
+ auto event = std::make_unique<TEvPrivate::TEvTieringModified>();
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, event.release());
}
@@ -371,27 +371,25 @@ TSerializedTableRange MakeTestRange(std::pair<ui64, ui64> range, bool inclusiveF
TConstArrayRef<TCell>(cellsTo), inclusiveTo);
}
-NMetadata::NFetcher::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpecials& specials) {
- std::unique_ptr<NColumnShard::NTiers::TTiersSnapshot> cs(new NColumnShard::NTiers::TTiersSnapshot(Now()));
+THashMap<TString, NColumnShard::NTiers::TTierConfig> TTestSchema::BuildSnapshot(const TTableSpecials& specials) {
if (specials.Tiers.empty()) {
- return cs;
+ return {};
}
+ THashMap<TString, NColumnShard::NTiers::TTierConfig> tiers;
for (auto&& tier : specials.Tiers) {
{
- NKikimrSchemeOp::TStorageTierConfig cProto;
- cProto.SetName(tier.Name);
- *cProto.MutableObjectStorage() = tier.S3;
+ NKikimrSchemeOp::TCompressionOptions compressionProto;
if (tier.Codec) {
- cProto.MutableCompression()->SetCodec(tier.GetCodecId());
+ compressionProto.SetCodec(tier.GetCodecId());
}
if (tier.CompressionLevel) {
- cProto.MutableCompression()->SetLevel(*tier.CompressionLevel);
+ compressionProto.SetLevel(*tier.CompressionLevel);
}
- NColumnShard::NTiers::TTierConfig tConfig(tier.Name, cProto);
- cs->MutableTierConfigs().emplace(tConfig.GetTierName(), tConfig);
+ NColumnShard::NTiers::TTierConfig tConfig(tier.S3, compressionProto);
+ tiers.emplace(tier.Name, tConfig);
}
}
- return cs;
+ return tiers;
}
void TTestSchema::InitSchema(const std::vector<NArrow::NTest::TTestColumn>& columns, const std::vector<NArrow::NTest::TTestColumn>& pk,
diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
index 5d0eecffe35..611fc1e2a28 100644
--- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
@@ -11,6 +11,7 @@
#include <ydb/core/tx/columnshard/test_helper/helper.h>
#include <ydb/core/tx/data_events/common/modification_type.h>
#include <ydb/core/tx/long_tx_service/public/types.h>
+#include <ydb/core/tx/tiering/manager.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
#include <ydb/services/metadata/abstract/fetcher.h>
@@ -107,7 +108,7 @@ struct TTestSchema {
s3Config.SetProxyPort(8080);
s3Config.SetProxyScheme(NKikimrSchemeOp::TS3Settings::HTTP);
#else
- s3Config.SetEndpoint("fake");
+ s3Config.SetEndpoint("fake.fake");
s3Config.SetSecretKey("fakeSecret");
#endif
s3Config.SetRequestTimeoutMs(10000);
@@ -359,7 +360,7 @@ struct TTestSchema {
return out;
}
- static NMetadata::NFetcher::ISnapshot::TPtr BuildSnapshot(const TTableSpecials& specials);
+ static THashMap<TString, NColumnShard::NTiers::TTierConfig> BuildSnapshot(const TTableSpecials& specials);
static TString CommitTxBody(ui64, const std::vector<ui64>& writeIds) {
NKikimrTxColumnShard::TCommitTxBody proto;
@@ -404,8 +405,9 @@ struct TTestSchema {
}
};
+void RefreshTiering(TTestBasicRuntime& runtime, const TActorId& sender);
+
bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap);
-void ProvideTieringSnapshot(TTestBasicRuntime& runtime, const TActorId& sender, NMetadata::NFetcher::ISnapshot::TPtr snapshot);
void PlanSchemaTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap);
void PlanWriteTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true);
diff --git a/ydb/core/tx/columnshard/test_helper/controllers.cpp b/ydb/core/tx/columnshard/test_helper/controllers.cpp
index 997a700d901..a9f1a877a13 100644
--- a/ydb/core/tx/columnshard/test_helper/controllers.cpp
+++ b/ydb/core/tx/columnshard/test_helper/controllers.cpp
@@ -1,7 +1,8 @@
#include "columnshard_ut_common.h"
#include "controllers.h"
-#include <ydb/core/tx/columnshard/engines/changes/ttl.h>
+
#include <ydb/core/tx/columnshard/engines/changes/indexation.h>
+#include <ydb/core/tx/columnshard/engines/changes/ttl.h>
namespace NKikimr::NOlap {
@@ -10,13 +11,13 @@ void TWaitCompactionController::OnTieringModified(const std::shared_ptr<NKikimr:
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified")("count", TiersModificationsCount);
}
-void TWaitCompactionController::SetTiersSnapshot(TTestBasicRuntime& runtime, const TActorId& tabletActorId, const NMetadata::NFetcher::ISnapshot::TPtr& snapshot) {
- CurrentConfig = snapshot;
+void TWaitCompactionController::OverrideTierConfigs(
+ TTestBasicRuntime& runtime, const TActorId& tabletActorId, THashMap<TString, NColumnShard::NTiers::TTierConfig> tiers) {
+ OverrideTiers = std::move(tiers);
ui32 startCount = TiersModificationsCount;
- NTxUT::ProvideTieringSnapshot(runtime, tabletActorId, snapshot);
+ NTxUT::RefreshTiering(runtime, tabletActorId);
while (TiersModificationsCount == startCount) {
runtime.SimulateSleep(TDuration::Seconds(1));
}
}
-
-} \ No newline at end of file
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/test_helper/controllers.h b/ydb/core/tx/columnshard/test_helper/controllers.h
index f4516d59478..281058322ac 100644
--- a/ydb/core/tx/columnshard/test_helper/controllers.h
+++ b/ydb/core/tx/columnshard/test_helper/controllers.h
@@ -1,6 +1,7 @@
#pragma once
-#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/core/testlib/basics/runtime.h>
+#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
+#include <ydb/core/tx/tiering/manager.h>
namespace NKikimr::NOlap {
@@ -8,7 +9,7 @@ class TWaitCompactionController: public NYDBTest::NColumnShard::TController {
private:
using TBase = NKikimr::NYDBTest::ICSController;
TAtomicCounter ExportsFinishedCount = 0;
- NMetadata::NFetcher::ISnapshot::TPtr CurrentConfig;
+ THashMap<TString, NColumnShard::NTiers::TTierConfig> OverrideTiers;
ui32 TiersModificationsCount = 0;
YDB_READONLY(TAtomicCounter, TieringMetadataActualizationCount, 0);
YDB_READONLY(TAtomicCounter, StatisticsUsageCount, 0);
@@ -63,14 +64,11 @@ public:
virtual void OnMaxValueUsage() override {
MaxValueUsageCount.Inc();
}
- void SetTiersSnapshot(TTestBasicRuntime& runtime, const TActorId& tabletActorId, const NMetadata::NFetcher::ISnapshot::TPtr& snapshot);
+ void OverrideTierConfigs(
+ TTestBasicRuntime& runtime, const TActorId& tabletActorId, THashMap<TString, NColumnShard::NTiers::TTierConfig> tiers);
- virtual NMetadata::NFetcher::ISnapshot::TPtr GetFallbackTiersSnapshot() const override {
- if (CurrentConfig) {
- return CurrentConfig;
- } else {
- return TBase::GetFallbackTiersSnapshot();
- }
+ THashMap<TString, NColumnShard::NTiers::TTierConfig> GetOverrideTierConfigs() const override {
+ return OverrideTiers;
}
};
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp b/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp
index 887780e4da7..8ca1b5a8724 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp
@@ -98,7 +98,7 @@ Y_UNIT_TEST_SUITE(Backup) {
txBody.MutableBackupTask()->SetTableId(tableId);
txBody.MutableBackupTask()->SetSnapshotStep(backupSnapshot.GetPlanStep());
txBody.MutableBackupTask()->SetSnapshotTxId(backupSnapshot.GetTxId());
- txBody.MutableBackupTask()->MutableS3Settings()->SetEndpoint("fake");
+ txBody.MutableBackupTask()->MutableS3Settings()->SetEndpoint("fake.fake");
txBody.MutableBackupTask()->MutableS3Settings()->SetSecretKey("fakeSecret");
AFL_VERIFY(csControllerGuard->GetFinishedExportsCount() == 0);
UNIT_ASSERT(ProposeTx(runtime, sender, NKikimrTxColumnShard::TX_KIND_BACKUP, txBody.SerializeAsString(), ++txId));
diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
index 24aa3d39ad1..205286404ee 100644
--- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -239,7 +239,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
TTestSchema::CreateInitShardTxBody(tableId, ydbSchema, testYdbPk, spec, "/Root/olapStore"),
NOlap::TSnapshot(++planStep, ++txId));
if (spec.HasTiers()) {
- csControllerGuard->SetTiersSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec));
+ csControllerGuard->OverrideTierConfigs(runtime, sender, TTestSchema::BuildSnapshot(spec));
}
//
@@ -293,7 +293,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
TTestSchema::AlterTableTxBody(tableId, 2, spec),
NOlap::TSnapshot(++planStep, ++txId));
if (spec.HasTiers()) {
- csControllerGuard->SetTiersSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec));
+ csControllerGuard->OverrideTierConfigs(runtime, sender, TTestSchema::BuildSnapshot(spec));
}
if (internal) {
@@ -320,7 +320,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
NOlap::TSnapshot(++planStep, ++txId));
UNIT_ASSERT(ok);
if (spec.HasTiers()) {
- csControllerGuard->SetTiersSnapshot(runtime, sender, TTestSchema::BuildSnapshot(TTestSchema::TTableSpecials()));
+ csControllerGuard->OverrideTierConfigs(runtime, sender, TTestSchema::BuildSnapshot(TTestSchema::TTableSpecials()));
}
PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId));
@@ -583,7 +583,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
TTestSchema::CreateInitShardTxBody(tableId, testYdbSchema, testYdbPk, specs[0], "/Root/olapStore"),
NOlap::TSnapshot(++planStep, ++txId));
if (specs[0].Tiers.size()) {
- csControllerGuard->SetTiersSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[0]));
+ csControllerGuard->OverrideTierConfigs(runtime, sender, TTestSchema::BuildSnapshot(specs[0]));
}
for (auto& data : blobs) {
@@ -617,14 +617,14 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
TString originalEndpoint;
for (auto&& spec : specs[i].Tiers) {
hasColdEviction = true;
- if (spec.S3.GetEndpoint() != "fake") {
+ if (spec.S3.GetEndpoint() != "fake.fake") {
misconfig = true;
// misconfig in export => OK, misconfig after export => ERROR
if (i > 1) {
expectedReadResult = EExpectedResult::ERROR;
}
originalEndpoint = spec.S3.GetEndpoint();
- spec.S3.SetEndpoint("fake");
+ spec.S3.SetEndpoint("fake.fake");
tIdxCorrect = tIdx++;
}
break;
@@ -636,7 +636,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
NOlap::TSnapshot(++planStep, ++txId));
}
if (specs[i].HasTiers() || reboots) {
- csControllerGuard->SetTiersSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i]));
+ csControllerGuard->OverrideTierConfigs(runtime, sender, TTestSchema::BuildSnapshot(specs[i]));
}
UNIT_ASSERT(TriggerMetadata(runtime, sender, csControllerGuard));
@@ -679,7 +679,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
if (tIdxCorrect) {
specs[i].Tiers[*tIdxCorrect].S3.SetEndpoint(originalEndpoint);
- csControllerGuard->SetTiersSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i]));
+ csControllerGuard->OverrideTierConfigs(runtime, sender, TTestSchema::BuildSnapshot(specs[i]));
}
@@ -1064,7 +1064,7 @@ void TestDropWriteRace() {
void TestCompaction(std::optional<ui32> numWrites = {}) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
- auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
+ auto csControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
TActorId sender = runtime.AllocateEdgeActor();
CreateTestBootstrapper(runtime,
@@ -1100,7 +1100,7 @@ void TestCompaction(std::optional<ui32> numWrites = {}) {
SetupSchema(runtime, sender, TTestSchema::AlterTableTxBody(tableId, 1, spec),
NOlap::TSnapshot(++planStep, ++txId));
- ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec));
+ csControllerGuard->OverrideTierConfigs(runtime, sender, TTestSchema::BuildSnapshot(spec));
// Writes
diff --git a/ydb/core/tx/schemeshard/olap/manager/manager.cpp b/ydb/core/tx/schemeshard/olap/manager/manager.cpp
index fe903af603a..109bcfe33b1 100644
--- a/ydb/core/tx/schemeshard/olap/manager/manager.cpp
+++ b/ydb/core/tx/schemeshard/olap/manager/manager.cpp
@@ -3,59 +3,17 @@
namespace NKikimr::NSchemeShard {
void TTablesStorage::OnAddObject(const TPathId& pathId, TColumnTableInfo::TPtr object) {
- for (const auto& tier : object->Description.GetTtlSettings().GetEnabled().GetTiers()) {
- std::optional<TString> usedExternalStorage;
- switch (tier.GetActionCase()) {
- case NKikimrSchemeOp::TTTLSettings_TTier::kEvictToExternalStorage:
- usedExternalStorage = tier.GetEvictToExternalStorage().GetStorage();
- break;
- case NKikimrSchemeOp::TTTLSettings_TTier::kDelete:
- case NKikimrSchemeOp::TTTLSettings_TTier::ACTION_NOT_SET:
- break;
- }
- if (usedExternalStorage) {
- AFL_VERIFY(PathsByTier[*usedExternalStorage].emplace(pathId).second);
- }
- }
for (auto&& s : object->GetColumnShards()) {
AFL_VERIFY(TablesByShard[s].AddId(pathId));
}
}
void TTablesStorage::OnRemoveObject(const TPathId& pathId, TColumnTableInfo::TPtr object) {
- for (const auto& tier : object->Description.GetTtlSettings().GetEnabled().GetTiers()) {
- std::optional<TString> usedExternalStorage;
- switch (tier.GetActionCase()) {
- case NKikimrSchemeOp::TTTLSettings_TTier::kEvictToExternalStorage:
- usedExternalStorage = tier.GetEvictToExternalStorage().GetStorage();
- break;
- case NKikimrSchemeOp::TTTLSettings_TTier::kDelete:
- case NKikimrSchemeOp::TTTLSettings_TTier::ACTION_NOT_SET:
- break;
- }
- if (usedExternalStorage) {
- auto findTier = PathsByTier.find(*usedExternalStorage);
- AFL_VERIFY(findTier);
- AFL_VERIFY(findTier->second.erase(pathId));
- if (findTier->second.empty()) {
- PathsByTier.erase(findTier);
- }
- }
- }
for (auto&& s : object->GetColumnShards()) {
TablesByShard[s].RemoveId(pathId);
}
}
-const THashSet<TPathId>& TTablesStorage::GetTablesWithTier(const TString& storageId) const {
- auto it = PathsByTier.find(storageId);
- if (it != PathsByTier.end()) {
- return it->second;
- } else {
- return Default<THashSet<TPathId>>();
- }
-}
-
TColumnTableInfo::TPtr TTablesStorage::ExtractPtr(const TPathId& id) {
auto it = Tables.find(id);
Y_ABORT_UNLESS(it != Tables.end());
diff --git a/ydb/core/tx/schemeshard/olap/manager/manager.h b/ydb/core/tx/schemeshard/olap/manager/manager.h
index 8c025690e97..a2697cf5b59 100644
--- a/ydb/core/tx/schemeshard/olap/manager/manager.h
+++ b/ydb/core/tx/schemeshard/olap/manager/manager.h
@@ -9,7 +9,6 @@ namespace NKikimr::NSchemeShard {
class TTablesStorage {
private:
THashMap<TPathId, TColumnTableInfo::TPtr> Tables;
- THashMap<TString, THashSet<TPathId>> PathsByTier;
THashMap<ui64, TColumnTablesLayout::TTableIdsGroup> TablesByShard;
void OnAddObject(const TPathId& pathId, TColumnTableInfo::TPtr object);
diff --git a/ydb/core/tx/schemeshard/olap/operations/alter/common/update.cpp b/ydb/core/tx/schemeshard/olap/operations/alter/common/update.cpp
index 442c6783355..e083706f1aa 100644
--- a/ydb/core/tx/schemeshard/olap/operations/alter/common/update.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/alter/common/update.cpp
@@ -12,6 +12,27 @@ TConclusionStatus TColumnTableUpdate::DoStart(const TUpdateStartContext& context
auto tableInfo = context.GetSSOperationContext()->SS->ColumnTables.TakeVerified(pathId);
context.GetSSOperationContext()->SS->PersistColumnTableAlter(*context.GetDB(), pathId, *GetTargetTableInfoVerified());
tableInfo->AlterData = GetTargetTableInfoVerified();
+
+ {
+ THashSet<TString> oldDataSources = tableInfo->GetUsedTiers();
+ THashSet<TString> newDataSources = GetTargetTableInfoVerified()->GetUsedTiers();
+ for (const auto& tier : oldDataSources) {
+ if (!newDataSources.contains(tier)) {
+ auto tierPath = TPath::Resolve(tier, context.GetSSOperationContext()->SS);
+ AFL_VERIFY(tierPath.IsResolved())("path", tier);
+ context.GetSSOperationContext()->SS->PersistRemoveExternalDataSourceReference(*context.GetDB(), tierPath->PathId, pathId);
+ }
+ }
+ for (const auto& tier : newDataSources) {
+ if (!oldDataSources.contains(tier)) {
+ auto tierPath = TPath::Resolve(tier, context.GetSSOperationContext()->SS);
+ AFL_VERIFY(tierPath.IsResolved())("path", tier);
+ context.GetSSOperationContext()->SS->PersistExternalDataSourceReference(
+ *context.GetDB(), tierPath->PathId, TPath::Init(pathId, context.GetSSOperationContext()->SS));
+ }
+ }
+ }
+
return TConclusionStatus::Success();
}
diff --git a/ydb/core/tx/schemeshard/olap/operations/alter/in_store/schema/update.cpp b/ydb/core/tx/schemeshard/olap/operations/alter/in_store/schema/update.cpp
index bbf1845ac1b..4bef41d1628 100644
--- a/ydb/core/tx/schemeshard/olap/operations/alter/in_store/schema/update.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/alter/in_store/schema/update.cpp
@@ -41,7 +41,7 @@ NKikimr::TConclusionStatus TInStoreSchemaUpdate::DoInitializeImpl(const TUpdateI
return patch;
}
TSimpleErrorCollector collector;
- if (!originalSchema.ValidateTtlSettings(ttl.GetData(), collector)) {
+ if (!originalSchema.ValidateTtlSettings(ttl.GetData(), *context.GetSSOperationContext(), collector)) {
return TConclusionStatus::Fail("ttl update error: " + collector->GetErrorMessage() + ". in alter constructor STANDALONE_UPDATE");
}
*description.MutableTtlSettings() = ttl.SerializeToProto();
diff --git a/ydb/core/tx/schemeshard/olap/operations/alter/standalone/update.cpp b/ydb/core/tx/schemeshard/olap/operations/alter/standalone/update.cpp
index da5218ecb46..1b11cd53ff9 100644
--- a/ydb/core/tx/schemeshard/olap/operations/alter/standalone/update.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/alter/standalone/update.cpp
@@ -65,7 +65,7 @@ NKikimr::TConclusionStatus TStandaloneSchemaUpdate::DoInitializeImpl(const TUpda
}
*description.MutableTtlSettings() = ttl.SerializeToProto();
}
- if (!targetSchema.ValidateTtlSettings(ttl.GetData(), collector)) {
+ if (!targetSchema.ValidateTtlSettings(ttl.GetData(), *context.GetSSOperationContext(), collector)) {
return TConclusionStatus::Fail("ttl update error: " + collector->GetErrorMessage() + ". in alter constructor STANDALONE_UPDATE");
}
auto saSharding = originalTable.GetTableInfoVerified().GetStandaloneShardingVerified();
diff --git a/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp b/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp
index 174a9610653..bc29d6c5613 100644
--- a/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp
@@ -533,7 +533,7 @@ public:
}
auto it = alterData->SchemaPresets.find(table->Description.GetSchemaPresetId());
AFL_VERIFY(it != alterData->SchemaPresets.end())("preset_info", table->Description.DebugString());
- if (!it->second.ValidateTtlSettings(table->Description.GetTtlSettings(), errors)) {
+ if (!it->second.ValidateTtlSettings(table->Description.GetTtlSettings(), context, errors)) {
return result;
}
}
diff --git a/ydb/core/tx/schemeshard/olap/operations/create_table.cpp b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp
index c69ab804eb3..a407a991689 100644
--- a/ydb/core/tx/schemeshard/olap/operations/create_table.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp
@@ -24,7 +24,7 @@ private:
protected:
ui32 ShardsCount = 0;
public:
- bool Deserialize(const NKikimrSchemeOp::TColumnTableDescription& description, IErrorCollector& errors) {
+ bool Deserialize(const NKikimrSchemeOp::TColumnTableDescription& description, const TOperationContext& context, IErrorCollector& errors) {
Name = description.GetName();
ShardsCount = std::max<ui32>(description.GetColumnShardCount(), 1);
@@ -34,7 +34,7 @@ public:
if (description.HasTtlSettings()) {
TtlSettings = description.GetTtlSettings();
- if (!GetSchema().ValidateTtlSettings(description.GetTtlSettings(), errors)) {
+ if (!GetSchema().ValidateTtlSettings(description.GetTtlSettings(), context, errors)) {
return false;
}
}
@@ -50,7 +50,7 @@ public:
}
FillDefaultSharding(*tableInfo->Description.MutableSharding());
- if (!Deserialize(description, errors)) {
+ if (!Deserialize(description, context, errors)) {
return nullptr;
}
if (tableInfo->Description.GetSharding().HasHashSharding()) {
@@ -834,6 +834,13 @@ public:
}
NIceDb::TNiceDb db(context.GetDB());
+
+ for (const auto& tier : tableInfo->GetUsedTiers()) {
+ auto tierPath = TPath::Resolve(tier, context.SS);
+ AFL_VERIFY(tierPath.IsResolved())("path", tier);
+ context.SS->PersistExternalDataSourceReference(db, tierPath->PathId, dstPath);
+ }
+
context.SS->PersistTxState(db, OperationId);
context.OnComplete.ActivateTx(OperationId);
diff --git a/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp b/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp
index f5483811903..08541ce5dd1 100644
--- a/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp
@@ -266,14 +266,21 @@ private:
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(txState->TxType == TTxState::TxDropColumnTable);
+ NIceDb::TNiceDb db(context.GetDB());
+
bool isStandalone = false;
{
Y_ABORT_UNLESS(context.SS->ColumnTables.contains(txState->TargetPathId));
auto tableInfo = context.SS->ColumnTables.GetVerified(txState->TargetPathId);
isStandalone = tableInfo->IsStandalone();
+
+ for (const auto& tier : tableInfo->GetUsedTiers()) {
+ auto tierPath = TPath::Resolve(tier, context.SS);
+ AFL_VERIFY(tierPath.IsResolved())("path", tier);
+ context.SS->PersistRemoveExternalDataSourceReference(db, tierPath->PathId, txState->TargetPathId);
+ }
}
- NIceDb::TNiceDb db(context.GetDB());
context.SS->PersistColumnTableRemove(db, txState->TargetPathId);
if (isStandalone) {
diff --git a/ydb/core/tx/schemeshard/olap/schema/schema.cpp b/ydb/core/tx/schemeshard/olap/schema/schema.cpp
index 97cc54eeaaf..82e2a499bdd 100644
--- a/ydb/core/tx/schemeshard/olap/schema/schema.cpp
+++ b/ydb/core/tx/schemeshard/olap/schema/schema.cpp
@@ -5,7 +5,8 @@
namespace NKikimr::NSchemeShard {
-bool TOlapSchema::ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl, IErrorCollector& errors) const {
+bool TOlapSchema::ValidateTtlSettings(
+ const NKikimrSchemeOp::TColumnDataLifeCycle& ttl, const TOperationContext& context, IErrorCollector& errors) const {
using TTtlProto = NKikimrSchemeOp::TColumnDataLifeCycle;
switch (ttl.GetStatusCase()) {
case TTtlProto::kEnabled:
@@ -15,7 +16,7 @@ bool TOlapSchema::ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycl
errors.AddError("Incorrect ttl column - not found in scheme");
return false;
}
- return TTTLValidator::ValidateColumnTableTtl(ttl.GetEnabled(), Indexes, {}, Columns.GetColumns(), Columns.GetColumnsByName(), errors);
+ return TTTLValidator::ValidateColumnTableTtl(ttl.GetEnabled(), Indexes, {}, Columns.GetColumns(), Columns.GetColumnsByName(), context, errors);
}
case TTtlProto::kDisabled:
default:
diff --git a/ydb/core/tx/schemeshard/olap/schema/schema.h b/ydb/core/tx/schemeshard/olap/schema/schema.h
index 309ce3ab69f..9e950b36c99 100644
--- a/ydb/core/tx/schemeshard/olap/schema/schema.h
+++ b/ydb/core/tx/schemeshard/olap/schema/schema.h
@@ -9,6 +9,10 @@
#include <ydb/core/tx/schemeshard/olap/options/schema.h>
namespace NKikimr::NSchemeShard {
+struct TOperationContext;
+}
+
+namespace NKikimr::NSchemeShard {
class TOlapSchema {
private:
@@ -27,7 +31,7 @@ namespace NKikimr::NSchemeShard {
void ParseFromLocalDB(const NKikimrSchemeOp::TColumnTableSchema& tableSchema);
void Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const;
bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const;
- bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttlSettings, IErrorCollector& errors) const;
+ bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttlSettings, const TOperationContext& context, IErrorCollector& errors) const;
};
class TOlapStoreSchemaPreset: public TOlapSchema {
diff --git a/ydb/core/tx/schemeshard/olap/table/table.h b/ydb/core/tx/schemeshard/olap/table/table.h
index a092e175e25..8a4d665d6fc 100644
--- a/ydb/core/tx/schemeshard/olap/table/table.h
+++ b/ydb/core/tx/schemeshard/olap/table/table.h
@@ -49,6 +49,16 @@ public:
}
}
+ THashSet<TString> GetUsedTiers() const {
+ THashSet<TString> tiers;
+ for (const auto& tier : Description.GetTtlSettings().GetEnabled().GetTiers()) {
+ if (tier.HasEvictToExternalStorage()) {
+ tiers.emplace(tier.GetEvictToExternalStorage().GetStorage());
+ }
+ }
+ return tiers;
+ }
+
NKikimrSchemeOp::TColumnTableDescription Description;
TMaybe<NKikimrSchemeOp::TColumnStoreSharding> StandaloneSharding;
TMaybe<NKikimrSchemeOp::TAlterColumnTable> AlterBody;
diff --git a/ydb/core/tx/schemeshard/olap/ttl/validator.cpp b/ydb/core/tx/schemeshard/olap/ttl/validator.cpp
index bb9b0230aad..f781a064e0f 100644
--- a/ydb/core/tx/schemeshard/olap/ttl/validator.cpp
+++ b/ydb/core/tx/schemeshard/olap/ttl/validator.cpp
@@ -1,5 +1,8 @@
#include "validator.h"
+
#include <ydb/core/tx/schemeshard/common/validation.h>
+#include <ydb/core/tx/schemeshard/schemeshard_impl.h>
+#include <ydb/core/tx/tiering/tier/object.h>
namespace NKikimr::NSchemeShard {
@@ -15,7 +18,7 @@ static inline NScheme::TTypeInfo GetType(const TOlapColumnsDescription::TColumn&
}
-bool TTTLValidator::ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle::TTtl& ttl, const TOlapIndexesDescription& indexes, const THashMap<ui32, TOlapColumnsDescription::TColumn>& sourceColumns, const THashMap<ui32, TOlapColumnsDescription::TColumn>& alterColumns, const THashMap<TString, ui32>& colName2Id, IErrorCollector& errors) {
+bool TTTLValidator::ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle::TTtl& ttl, const TOlapIndexesDescription& indexes, const THashMap<ui32, TOlapColumnsDescription::TColumn>& sourceColumns, const THashMap<ui32, TOlapColumnsDescription::TColumn>& alterColumns, const THashMap<TString, ui32>& colName2Id, const TOperationContext& context, IErrorCollector& errors) {
const TString colName = ttl.GetColumnName();
auto it = colName2Id.find(colName);
@@ -44,11 +47,6 @@ bool TTTLValidator::ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLif
return false;
}
- if (!ttl.HasExpireAfterSeconds() && ttl.GetTiers().empty()) {
- errors.AddError("TTL without eviction time");
- return false;
- }
-
auto unit = ttl.GetColumnUnit();
const auto& columnType = GetType(*column);
@@ -97,6 +95,32 @@ bool TTTLValidator::ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLif
}
}
+ for (const auto& tier : ttl.GetTiers()) {
+ if (!tier.HasEvictToExternalStorage()) {
+ continue;
+ }
+ const TString& tierPathString = tier.GetEvictToExternalStorage().GetStorage();
+ TPath tierPath = TPath::Resolve(tierPathString, context.SS);
+ if (!tierPath.IsResolved() || tierPath.IsDeleted() || tierPath.IsUnderDeleting()) {
+ errors.AddError("Object not found: " + tierPathString);
+ return false;
+ }
+ if (!tierPath->IsExternalDataSource()) {
+ errors.AddError("Not an external data source: " + tierPathString);
+ return false;
+ }
+ {
+ auto* findExternalDataSource = context.SS->ExternalDataSources.FindPtr(tierPath->PathId);
+ AFL_VERIFY(findExternalDataSource);
+ NKikimrSchemeOp::TExternalDataSourceDescription proto;
+ (*findExternalDataSource)->FillProto(proto);
+ if (auto status = NColumnShard::NTiers::TTierConfig().DeserializeFromProto(proto); status.IsFail()) {
+ errors.AddError("Cannot use external data source \"" + tierPathString + "\" for tiering: " + status.GetErrorMessage());
+ return false;
+ }
+ }
+ }
+
return true;
}
diff --git a/ydb/core/tx/schemeshard/olap/ttl/validator.h b/ydb/core/tx/schemeshard/olap/ttl/validator.h
index 72b9a975e83..39207bbaf71 100644
--- a/ydb/core/tx/schemeshard/olap/ttl/validator.h
+++ b/ydb/core/tx/schemeshard/olap/ttl/validator.h
@@ -3,12 +3,16 @@
#include <ydb/core/tx/schemeshard/olap/indexes/schema.h>
namespace NKikimr::NSchemeShard {
+struct TOperationContext;
+}
+
+namespace NKikimr::NSchemeShard {
class TTTLValidator {
public:
static bool ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle::TTtl& ttl, const TOlapIndexesDescription& indexes,
const THashMap<ui32, TOlapColumnsDescription::TColumn>& sourceColumns,
const THashMap<ui32, TOlapColumnsDescription::TColumn>& alterColumns,
- const THashMap<TString, ui32>& colName2Id,
+ const THashMap<TString, ui32>& colName2Id, const TOperationContext& context,
IErrorCollector& errors);
};
diff --git a/ydb/core/tx/schemeshard/olap/ttl/ya.make b/ydb/core/tx/schemeshard/olap/ttl/ya.make
index f6c57de62a9..3d84cb700fb 100644
--- a/ydb/core/tx/schemeshard/olap/ttl/ya.make
+++ b/ydb/core/tx/schemeshard/olap/ttl/ya.make
@@ -9,6 +9,7 @@ SRCS(
PEERDIR(
ydb/core/base
ydb/core/protos
+ ydb/core/tx/tiering/tier
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_external_data_source.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_external_data_source.cpp
index 1009c5dff47..e80fe117bdd 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_external_data_source.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_external_data_source.cpp
@@ -3,6 +3,8 @@
#include "schemeshard__operation_common.h"
#include "schemeshard_impl.h"
+#include <ydb/core/tx/tiering/tier/object.h>
+
#include <utility>
namespace {
@@ -229,9 +231,7 @@ public:
RETURN_RESULT_UNLESS(IsDestinationPathValid(result, dstPath, acl));
RETURN_RESULT_UNLESS(IsApplyIfChecksPassed(result, context));
- RETURN_RESULT_UNLESS(IsDescriptionValid(result,
- externalDataSourceDescription,
- context.SS->ExternalSourceFactory));
+ RETURN_RESULT_UNLESS(IsDescriptionValid(result, externalDataSourceDescription, context.SS->ExternalSourceFactory));
const auto oldExternalDataSourceInfo =
context.SS->ExternalDataSources.Value(dstPath->PathId, nullptr);
@@ -241,6 +241,24 @@ public:
oldExternalDataSourceInfo->AlterVersion + 1);
Y_ABORT_UNLESS(externalDataSourceInfo);
+ {
+ bool isTieredStorage = false;
+ for (const auto& referrer : externalDataSourceInfo->ExternalTableReferences.GetReferences()) {
+ if (TPath::Init(PathIdFromPathId(referrer.GetPathId()), context.SS)->PathType ==
+ NKikimrSchemeOp::EPathType::EPathTypeColumnTable) {
+ isTieredStorage = true;
+ break;
+ }
+ }
+ if (isTieredStorage) {
+ if (auto status = NColumnShard::NTiers::TTierConfig().DeserializeFromProto(externalDataSourceDescription); status.IsFail()) {
+ result->SetError(NKikimrScheme::StatusInvalidParameter,
+ "Cannot make this change while the external data source is used as a tiered storage: " + status.GetErrorMessage());
+ return result;
+ }
+ }
+ }
+
AddPathInSchemeShard(result, dstPath);
const TPathElement::TPtr externalDataSource =
ReplaceExternalDataSourcePathElement(dstPath);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index d0d6fd81786..9aca9b5724a 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -2994,6 +2994,31 @@ void TSchemeShard::PersistRemoveExternalDataSource(NIceDb::TNiceDb& db, TPathId
db.Table<Schema::ExternalDataSource>().Key(pathId.OwnerId, pathId.LocalPathId).Delete();
}
+void TSchemeShard::PersistExternalDataSourceReference(NIceDb::TNiceDb& db, TPathId pathId, const TPath& referrer) {
+ auto findSource = ExternalDataSources.FindPtr(pathId);
+ Y_ABORT_UNLESS(findSource);
+ auto* ref = (*findSource)->ExternalTableReferences.AddReferences();
+ ref->SetPath(referrer.PathString());
+ PathIdFromPathId(referrer->PathId, ref->MutablePathId());
+ db.Table<Schema::ExternalDataSource>()
+ .Key(pathId.OwnerId, pathId.LocalPathId)
+ .Update(
+ NIceDb::TUpdate<Schema::ExternalDataSource::ExternalTableReferences>{ (*findSource)->ExternalTableReferences.SerializeAsString() });
+}
+
+void TSchemeShard::PersistRemoveExternalDataSourceReference(NIceDb::TNiceDb& db, TPathId pathId, TPathId referrer) {
+ auto findSource = ExternalDataSources.FindPtr(pathId);
+ Y_ABORT_UNLESS(findSource);
+ EraseIf(*(*findSource)->ExternalTableReferences.MutableReferences(),
+ [referrer](const NKikimrSchemeOp::TExternalTableReferences::TReference& reference) {
+ return PathIdFromPathId(reference.GetPathId()) == referrer;
+ });
+ db.Table<Schema::ExternalDataSource>()
+ .Key(pathId.OwnerId, pathId.LocalPathId)
+ .Update(
+ NIceDb::TUpdate<Schema::ExternalDataSource::ExternalTableReferences>{ (*findSource)->ExternalTableReferences.SerializeAsString() });
+}
+
void TSchemeShard::PersistView(NIceDb::TNiceDb &db, TPathId pathId) {
Y_ABORT_UNLESS(IsLocalId(pathId));
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 5120d8aa737..7dc89abd059 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -835,6 +835,8 @@ public:
// ExternalDataSource
void PersistExternalDataSource(NIceDb::TNiceDb &db, TPathId pathId, const TExternalDataSourceInfo::TPtr externalDataSource);
void PersistRemoveExternalDataSource(NIceDb::TNiceDb& db, TPathId pathId);
+ void PersistExternalDataSourceReference(NIceDb::TNiceDb &db, TPathId pathId, const TPath& referrer);
+ void PersistRemoveExternalDataSourceReference(NIceDb::TNiceDb &db, TPathId pathId, TPathId referrer);
void PersistView(NIceDb::TNiceDb &db, TPathId pathId);
void PersistRemoveView(NIceDb::TNiceDb& db, TPathId pathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index a6623c63e54..0adf960869d 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -3602,6 +3602,15 @@ struct TExternalDataSourceInfo: TSimpleRefCount<TExternalDataSourceInfo> {
NKikimrSchemeOp::TAuth Auth;
NKikimrSchemeOp::TExternalTableReferences ExternalTableReferences;
NKikimrSchemeOp::TExternalDataSourceProperties Properties;
+
+ void FillProto(NKikimrSchemeOp::TExternalDataSourceDescription& proto) const {
+ proto.SetVersion(AlterVersion);
+ proto.SetSourceType(SourceType);
+ proto.SetLocation(Location);
+ proto.SetInstallation(Installation);
+ proto.MutableAuth()->CopyFrom(Auth);
+ proto.MutableProperties()->CopyFrom(Properties);
+ }
};
struct TViewInfo : TSimpleRefCount<TViewInfo> {
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index fe521ea8dbb..bad67d7a839 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -1059,12 +1059,7 @@ void TPathDescriber::DescribeExternalDataSource(const TActorContext&, TPathId pa
auto entry = Result->Record.MutablePathDescription()->MutableExternalDataSourceDescription();
entry->SetName(pathEl->Name);
PathIdFromPathId(pathId, entry->MutablePathId());
- entry->SetVersion(externalDataSourceInfo->AlterVersion);
- entry->SetSourceType(externalDataSourceInfo->SourceType);
- entry->SetLocation(externalDataSourceInfo->Location);
- entry->SetInstallation(externalDataSourceInfo->Installation);
- entry->MutableAuth()->CopyFrom(externalDataSourceInfo->Auth);
- entry->MutableProperties()->CopyFrom(externalDataSourceInfo->Properties);
+ externalDataSourceInfo->FillProto(*entry);
}
void TPathDescriber::DescribeView(const TActorContext&, TPathId pathId, TPathElement::TPtr pathEl) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h
index 6f0e1c12a5a..c3d154d6be0 100644
--- a/ydb/core/tx/schemeshard/schemeshard_schema.h
+++ b/ydb/core/tx/schemeshard/schemeshard_schema.h
@@ -1779,7 +1779,7 @@ struct Schema : NIceDb::Schema {
struct Location : Column<5, NScheme::NTypeIds::Utf8> {};
struct Installation : Column<6, NScheme::NTypeIds::Utf8> {};
struct Auth : Column<7, NScheme::NTypeIds::String> {};
- struct ExternalTableReferences : Column<8, NScheme::NTypeIds::String> {};
+ struct ExternalTableReferences : Column<8, NScheme::NTypeIds::String> {}; // references from any scheme objects
struct Properties : Column<9, NScheme::NTypeIds::String> {};
using TKey = TableKey<OwnerPathId, LocalPathId>;
diff --git a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
index 54992dee3d9..c67c29dae21 100644
--- a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
+++ b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
@@ -543,6 +543,19 @@ Y_UNIT_TEST_SUITE(TOlap) {
NLs::HasColumnTableTtlSettingsVersion(1),
NLs::HasColumnTableTtlSettingsDisabled()));
+ TestCreateExternalDataSource(runtime, ++txId, "/MyRoot", R"(
+ Name: "Tier1"
+ SourceType: "ObjectStorage"
+ Location: "http://fake.fake/fake"
+ Auth: {
+ Aws: {
+ AwsAccessKeyIdSecretName: "secret"
+ AwsSecretAccessKeySecretName: "secret"
+ }
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
TString tableSchema3 = R"(
Name: "Table3"
ColumnShardCount: 1
@@ -553,7 +566,7 @@ Y_UNIT_TEST_SUITE(TOlap) {
Tiers: {
ApplyAfterSeconds: 360
EvictToExternalStorage {
- Storage: "Tier1"
+ Storage: "/MyRoot/Tier1"
}
}
}
@@ -567,7 +580,7 @@ Y_UNIT_TEST_SUITE(TOlap) {
NLs::HasColumnTableSchemaPreset("default"),
NLs::HasColumnTableSchemaVersion(1),
NLs::HasColumnTableTtlSettingsVersion(1),
- NLs::HasColumnTableTtlSettingsTier("timestamp", TDuration::Seconds(360), "Tier1")));
+ NLs::HasColumnTableTtlSettingsTier("timestamp", TDuration::Seconds(360), "/MyRoot/Tier1")));
TString tableSchema4 = R"(
Name: "Table4"
@@ -579,7 +592,7 @@ Y_UNIT_TEST_SUITE(TOlap) {
Tiers: {
ApplyAfterSeconds: 3600000000
EvictToExternalStorage {
- Storage: "Tier1"
+ Storage: "/MyRoot/Tier1"
}
}
}
@@ -723,6 +736,19 @@ Y_UNIT_TEST_SUITE(TOlap) {
)");
env.TestWaitNotification(runtime, txId);
+ TestCreateExternalDataSource(runtime, ++txId, "/MyRoot", R"(
+ Name: "Tier1"
+ SourceType: "ObjectStorage"
+ Location: "http://fake.fake/fake"
+ Auth: {
+ Aws: {
+ AwsAccessKeyIdSecretName: "secret"
+ AwsSecretAccessKeySecretName: "secret"
+ }
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
TestAlterColumnTable(runtime, ++txId, "/MyRoot/OlapStore", R"(
Name: "ColumnTable"
AlterTtlSettings {
@@ -732,7 +758,7 @@ Y_UNIT_TEST_SUITE(TOlap) {
Tiers: {
ApplyAfterSeconds: 3600000000
EvictToExternalStorage {
- Storage: "Tier1"
+ Storage: "/MyRoot/Tier1"
}
}
}
diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make
index 8909b0f3c42..1f681881caa 100644
--- a/ydb/core/tx/schemeshard/ya.make
+++ b/ydb/core/tx/schemeshard/ya.make
@@ -310,6 +310,7 @@ PEERDIR(
yql/essentials/providers/common/proto
ydb/services/bg_tasks
ydb/core/tx/columnshard/bg_tasks/manager
+ ydb/core/tx/tiering/tier
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/tx/tiering/common.h b/ydb/core/tx/tiering/common.h
index 1eb341b78b8..cd7fa14b82b 100644
--- a/ydb/core/tx/tiering/common.h
+++ b/ydb/core/tx/tiering/common.h
@@ -13,6 +13,11 @@ enum EEvents {
EvSSFetchingProblem,
EvTimeout,
EvTiersManagerReadyForUsage,
+ EvWatchSchemeObject,
+ EvNotifySchemeObjectUpdated,
+ EvNotifySchemeObjectDeleted,
+ EvSchemeObjectResulutionFailed,
+ EvListTieredStoragesResult,
EvEnd
};
diff --git a/ydb/core/tx/tiering/external_data.cpp b/ydb/core/tx/tiering/external_data.cpp
deleted file mode 100644
index 812215b01ff..00000000000
--- a/ydb/core/tx/tiering/external_data.cpp
+++ /dev/null
@@ -1,24 +0,0 @@
-#include "external_data.h"
-
-#include <ydb/core/base/path.h>
-#include <ydb/core/tx/tiering/tier/manager.h>
-
-#include <library/cpp/json/writer/json_value.h>
-#include <library/cpp/protobuf/json/proto2json.h>
-
-#include <util/string/join.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-void TSnapshotConstructor::EnrichSnapshotData(ISnapshot::TPtr original, NMetadata::NFetcher::ISnapshotAcceptorController::TPtr controller) const {
- controller->OnSnapshotEnriched(original);
-}
-
-TSnapshotConstructor::TSnapshotConstructor() {
-}
-
-std::vector<NMetadata::IClassBehaviour::TPtr> TSnapshotConstructor::DoGetManagers() const {
- return { TTierConfig::GetBehaviour() };
-}
-
-}
diff --git a/ydb/core/tx/tiering/external_data.h b/ydb/core/tx/tiering/external_data.h
deleted file mode 100644
index 456ad1ff59e..00000000000
--- a/ydb/core/tx/tiering/external_data.h
+++ /dev/null
@@ -1,25 +0,0 @@
-#pragma once
-#include "snapshot.h"
-
-#include <ydb/core/protos/flat_scheme_op.pb.h>
-#include <ydb/core/tx/scheme_cache/scheme_cache.h>
-#include <ydb/services/metadata/service.h>
-
-#include <library/cpp/json/writer/json_value.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-class TSnapshotConstructor: public NMetadata::NFetcher::TSnapshotsFetcher<TTiersSnapshot> {
-private:
- using TNavigate = NSchemeCache::TSchemeCacheNavigate;
- using TBaseActor = TActor<TSnapshotConstructor>;
- using ISnapshot = NMetadata::NFetcher::ISnapshot;
-protected:
- virtual std::vector<NMetadata::IClassBehaviour::TPtr> DoGetManagers() const override;
-public:
- virtual void EnrichSnapshotData(ISnapshot::TPtr original, NMetadata::NFetcher::ISnapshotAcceptorController::TPtr controller) const override;
-
- TSnapshotConstructor();
-};
-
-}
diff --git a/ydb/core/tx/tiering/fetcher.cpp b/ydb/core/tx/tiering/fetcher.cpp
new file mode 100644
index 00000000000..ddb51424448
--- /dev/null
+++ b/ydb/core/tx/tiering/fetcher.cpp
@@ -0,0 +1,3 @@
+#include "fetcher.h"
+
+namespace NKikimr::NColumnShard {}
diff --git a/ydb/core/tx/tiering/fetcher.h b/ydb/core/tx/tiering/fetcher.h
new file mode 100644
index 00000000000..38fd0865537
--- /dev/null
+++ b/ydb/core/tx/tiering/fetcher.h
@@ -0,0 +1,198 @@
+#pragma once
+
+#include <ydb/core/base/path.h>
+#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/core/tx/tiering/common.h>
+#include <ydb/core/tx/tiering/tier/object.h>
+
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+
+#include <util/string/vector.h>
+
+namespace NKikimr::NColumnShard {
+
+namespace NTiers {
+
+class TEvWatchSchemeObject: public TEventLocal<TEvWatchSchemeObject, NTiers::EvWatchSchemeObject> {
+private:
+ YDB_READONLY_DEF(std::vector<TString>, ObjectPaths);
+
+public:
+ TEvWatchSchemeObject(std::vector<TString> paths)
+ : ObjectPaths(std::move(paths)) {
+ }
+};
+
+class TEvNotifySchemeObjectUpdated: public TEventLocal<TEvNotifySchemeObjectUpdated, NTiers::EvNotifySchemeObjectUpdated> {
+private:
+ YDB_READONLY_DEF(TString, ObjectPath);
+ YDB_READONLY_DEF(NKikimrSchemeOp::TPathDescription, Description);
+
+public:
+ TEvNotifySchemeObjectUpdated(const TString& path, NKikimrSchemeOp::TPathDescription description)
+ : ObjectPath(path)
+ , Description(std::move(description)) {
+ }
+};
+
+class TEvNotifySchemeObjectDeleted: public TEventLocal<TEvNotifySchemeObjectDeleted, NTiers::EvNotifySchemeObjectDeleted> {
+private:
+ YDB_READONLY_DEF(TString, ObjectPath);
+
+public:
+ TEvNotifySchemeObjectDeleted(TString path)
+ : ObjectPath(std::move(path)) {
+ }
+};
+
+class TEvSchemeObjectResolutionFailed: public TEventLocal<TEvSchemeObjectResolutionFailed, NTiers::EvSchemeObjectResulutionFailed> {
+public:
+ enum EReason {
+ NOT_FOUND = 0,
+ LOOKUP_ERROR = 1
+ };
+
+private:
+ YDB_READONLY_DEF(TString, ObjectPath);
+ YDB_READONLY_DEF(EReason, Reason);
+
+public:
+ TEvSchemeObjectResolutionFailed(TString path, const EReason reason)
+ : ObjectPath(std::move(path))
+ , Reason(reason) {
+ }
+};
+
+} // namespace NTiers
+
+class TSchemeObjectWatcher: public TActorBootstrapped<TSchemeObjectWatcher> {
+private:
+ TActorId Owner;
+ THashSet<TPathId> WatchedPathIds;
+
+private:
+ THolder<NSchemeCache::TSchemeCacheNavigate> BuildSchemeCacheNavigateRequest(
+ const TVector<TVector<TString>>& paths, TIntrusiveConstPtr<NACLib::TUserToken> userToken) {
+ auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
+ request->DatabaseName = AppDataVerified().TenantName;
+ if (userToken && !userToken->GetSerializedToken().empty()) {
+ request->UserToken = userToken;
+ }
+
+ for (const auto& pathComponents : paths) {
+ auto& entry = request->ResultSet.emplace_back();
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
+ entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByPath;
+ entry.ShowPrivatePath = true;
+ entry.Path = pathComponents;
+ }
+
+ return request;
+ }
+
+ void WatchObjects(const std::vector<TString>& paths) {
+ TVector<TVector<TString>> splitPaths;
+ for (const TString& path : paths) {
+ splitPaths.emplace_back(SplitPath(path));
+ }
+
+ auto event = BuildSchemeCacheNavigateRequest(
+ std::move(splitPaths), MakeIntrusive<NACLib::TUserToken>(BUILTIN_ACL_METADATA, TVector<NACLib::TSID>{}));
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(event.Release()), IEventHandle::FlagTrackDelivery);
+ }
+
+ void WatchPathId(const TPathId& pathId) {
+ if (WatchedPathIds.emplace(pathId).second) {
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(pathId), IEventHandle::FlagTrackDelivery);
+ } else {
+ AFL_DEBUG(NKikimrServices::TX_TIERING)("event", "skip_watch_path_id")("reason", "already_subscribed")("path", pathId.ToString());
+ }
+ }
+
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get();
+ for (auto entry : result->ResultSet) {
+ AFL_DEBUG(NKikimrServices::TX_TIERING)("component", "TSchemeObjectWatcher")("event", ev->ToString())("path", JoinPath(entry.Path));
+ switch (entry.Status) {
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok:
+ WatchPathId(entry.TableId.PathId);
+ break;
+
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown:
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown:
+ Send(Owner, new NTiers::TEvSchemeObjectResolutionFailed(
+ JoinPath(entry.Path), NTiers::TEvSchemeObjectResolutionFailed::EReason::NOT_FOUND));
+ break;
+
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::RedirectLookupError:
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError:
+ Send(Owner, new NTiers::TEvSchemeObjectResolutionFailed(
+ JoinPath(entry.Path), NTiers::TEvSchemeObjectResolutionFailed::EReason::LOOKUP_ERROR));
+ break;
+
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::AccessDenied:
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotTable:
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotPath:
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::TableCreationNotComplete:
+ case NSchemeCache::TSchemeCacheNavigate::EStatus::Unknown:
+ AFL_VERIFY(false)("entry", entry.ToString());
+ }
+ }
+ }
+
+ void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev) {
+ AFL_DEBUG(NKikimrServices::TX_TIERING)("event", "object_fetched")("path", ev->Get()->Path);
+ const auto& describeResult = *ev->Get()->Result;
+ Send(Owner, new NTiers::TEvNotifySchemeObjectUpdated(describeResult.GetPath(), describeResult.GetPathDescription()));
+ }
+
+ void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev) {
+ const auto& record = ev->Get();
+ const TString name = TString(ExtractBase(record->Path));
+ const TString storageDir = TString(ExtractParent(record->Path));
+ AFL_DEBUG(NKikimrServices::TX_TIERING)("event", "object_deleted")("path", record->Path);
+ AFL_VERIFY(WatchedPathIds.erase(record->PathId));
+ Send(Owner, new NTiers::TEvNotifySchemeObjectDeleted(record->Path));
+ }
+
+ void Handle(NTiers::TEvWatchSchemeObject::TPtr& ev) {
+ AFL_DEBUG(NKikimrServices::TX_TIERING)("event", "watch_scheme_objects")(
+ "names", JoinStrings(ev->Get()->GetObjectPaths().begin(), ev->Get()->GetObjectPaths().end(), ","));
+ WatchObjects(ev->Get()->GetObjectPaths());
+ }
+
+ void Handle(NActors::TEvents::TEvPoison::TPtr& /*ev*/) {
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove());
+ PassAway();
+ }
+
+ void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
+ AFL_WARN(NKikimrServices::TX_TIERING)("error", "event_undelivered_to_scheme_cache")("reason", ev->Get()->Reason);
+ }
+
+public:
+ TSchemeObjectWatcher(TActorId owner)
+ : Owner(owner) {
+ }
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ hFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
+ hFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle);
+ IgnoreFunc(TEvTxProxySchemeCache::TEvWatchNotifyUnavailable);
+ hFunc(NTiers::TEvWatchSchemeObject, Handle);
+ hFunc(NActors::TEvents::TEvPoison, Handle);
+ hFunc(NActors::TEvents::TEvUndelivered, Handle);
+ default:
+ break;
+ }
+ }
+
+ void Bootstrap() {
+ Become(&TSchemeObjectWatcher::StateMain);
+ }
+};
+
+} // namespace NKikimr::NColumnShard
diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp
index dbe06df7e0d..d937aaa9542 100644
--- a/ydb/core/tx/tiering/manager.cpp
+++ b/ydb/core/tx/tiering/manager.cpp
@@ -1,109 +1,174 @@
#include "common.h"
#include "manager.h"
-#include "external_data.h"
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
+#include <ydb/core/tx/tiering/fetcher.h>
+
+#include <ydb/library/table_creator/table_creator.h>
#include <ydb/services/metadata/secret/fetcher.h>
+#include <library/cpp/retry/retry_policy.h>
+#include <util/string/vector.h>
+
namespace NKikimr::NColumnShard {
class TTiersManager::TActor: public TActorBootstrapped<TTiersManager::TActor> {
private:
+ using IRetryPolicy = IRetryPolicy<>;
+
std::shared_ptr<TTiersManager> Owner;
+ IRetryPolicy::TPtr RetryPolicy;
+ THashMap<TString, IRetryPolicy::IRetryState::TPtr> RetryStateByObject;
NMetadata::NFetcher::ISnapshotsFetcher::TPtr SecretsFetcher;
- std::shared_ptr<NMetadata::NSecret::TSnapshot> SecretsSnapshot;
- std::shared_ptr<NTiers::TTiersSnapshot> ConfigsSnapshot;
+ TActorId TiersFetcher;
+
+private:
TActorId GetExternalDataActorId() const {
return NMetadata::NProvider::MakeServiceId(SelfId().NodeId());
}
-public:
- TActor(std::shared_ptr<TTiersManager> owner)
- : Owner(owner)
- , SecretsFetcher(std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>())
- {
+ void OnInvalidTierConfig(const TString& path) {
+ if (!Owner->TierRefCount.contains(path)) {
+ ResetRetryState(path);
+ return;
+ }
+ AFL_DEBUG(NKikimrServices::TX_TIERING)("component", "tiers_manager")("event", "retry_watch_objects");
+ auto findRetryState = RetryStateByObject.find(path);
+ if (!findRetryState) {
+ findRetryState = RetryStateByObject.emplace(path, RetryPolicy->CreateRetryState()).first;
+ }
+ auto retryDelay = findRetryState->second->GetNextRetryDelay();
+ AFL_VERIFY(retryDelay)("object", path);
+ ActorContext().Schedule(*retryDelay, std::make_unique<IEventHandle>(SelfId(), TiersFetcher, new NTiers::TEvWatchSchemeObject(std::vector<TString>({ path }))));
}
- ~TActor() {
- Owner->Stop(false);
+
+ void ResetRetryState(const TString& path) {
+ RetryStateByObject.erase(path);
+ }
+
+ void OnFetchingFailure(const TString& path) {
+ if (Owner->TierRefCount.contains(path)) {
+ OnInvalidTierConfig(path);
+ }
}
STATEFN(StateMain) {
switch (ev->GetTypeRewrite()) {
hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle);
hFunc(NActors::TEvents::TEvPoison, Handle);
+ hFunc(NTiers::TEvNotifySchemeObjectUpdated, Handle);
+ hFunc(NTiers::TEvNotifySchemeObjectDeleted, Handle);
+ hFunc(NTiers::TEvSchemeObjectResolutionFailed, Handle);
+ hFunc(NTiers::TEvWatchSchemeObject, Handle);
default:
break;
}
}
- void Bootstrap() {
- Become(&TThis::StateMain);
- AFL_INFO(NKikimrServices::TX_TIERING)("event", "start_subscribing_metadata");
- Send(GetExternalDataActorId(), new NMetadata::NProvider::TEvSubscribeExternal(Owner->GetExternalDataManipulation()));
- Send(GetExternalDataActorId(), new NMetadata::NProvider::TEvSubscribeExternal(SecretsFetcher));
- }
-
void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) {
auto snapshot = ev->Get()->GetSnapshot();
- if (auto configs = std::dynamic_pointer_cast<NTiers::TTiersSnapshot>(snapshot)) {
- AFL_DEBUG(NKikimrServices::TX_TIERING)("event", "TEvRefreshSubscriberData")("snapshot", "configs");
- ConfigsSnapshot = configs;
- if (SecretsSnapshot) {
- Owner->TakeConfigs(ConfigsSnapshot, SecretsSnapshot);
- } else {
- ALS_DEBUG(NKikimrServices::TX_TIERING) << "Waiting secrets for update at tablet " << Owner->TabletId;
- }
- } else if (auto secrets = std::dynamic_pointer_cast<NMetadata::NSecret::TSnapshot>(snapshot)) {
+ if (auto secrets = std::dynamic_pointer_cast<NMetadata::NSecret::TSnapshot>(snapshot)) {
AFL_DEBUG(NKikimrServices::TX_TIERING)("event", "TEvRefreshSubscriberData")("snapshot", "secrets");
- SecretsSnapshot = secrets;
- if (ConfigsSnapshot) {
- Owner->TakeConfigs(ConfigsSnapshot, SecretsSnapshot);
- } else {
- ALS_DEBUG(NKikimrServices::TX_TIERING) << "Waiting configs for update at tablet " << Owner->TabletId;
- }
+ Owner->UpdateSecretsSnapshot(secrets);
} else {
- Y_ABORT_UNLESS(false, "unexpected behaviour");
+ AFL_VERIFY(false);
}
}
void Handle(NActors::TEvents::TEvPoison::TPtr& /*ev*/) {
- Send(GetExternalDataActorId(), new NMetadata::NProvider::TEvUnsubscribeExternal(Owner->GetExternalDataManipulation()));
Send(GetExternalDataActorId(), new NMetadata::NProvider::TEvUnsubscribeExternal(SecretsFetcher));
PassAway();
}
+
+ void Handle(NTiers::TEvNotifySchemeObjectUpdated::TPtr& ev) {
+ AFL_DEBUG(NKikimrServices::TX_TIERING)("component", "tiering_manager")("event", "object_updated")("path", ev->Get()->GetObjectPath());
+ const TString& objectPath = ev->Get()->GetObjectPath();
+ const auto& description = ev->Get()->GetDescription();
+ ResetRetryState(objectPath);
+ if (description.GetSelf().GetPathType() == NKikimrSchemeOp::EPathTypeExternalDataSource) {
+ NTiers::TTierConfig tier;
+ if (const auto status = tier.DeserializeFromProto(description.GetExternalDataSourceDescription()); status.IsFail()) {
+ AFL_WARN(NKikimrServices::TX_TIERING)("event", "fetched_invalid_tier_settings")("error", status.GetErrorMessage());
+ OnInvalidTierConfig(objectPath);
+ return;
+ }
+ Owner->UpdateTierConfig(tier, objectPath);
+ } else {
+ AFL_WARN(false)("error", "invalid_object_type")("type", static_cast<ui64>(description.GetSelf().GetPathType()))("path", objectPath);
+ OnInvalidTierConfig(objectPath);
+ }
+ }
+
+ void Handle(NTiers::TEvNotifySchemeObjectDeleted::TPtr& ev) {
+ AFL_DEBUG(NKikimrServices::TX_TIERING)("component", "tiering_manager")("event", "object_deleted")("name", ev->Get()->GetObjectPath());
+ OnInvalidTierConfig(ev->Get()->GetObjectPath());
+ }
+
+ void Handle(NTiers::TEvSchemeObjectResolutionFailed::TPtr& ev) {
+ const TString objectPath = ev->Get()->GetObjectPath();
+ AFL_WARN(NKikimrServices::TX_TIERING)("event", "object_resolution_failed")("path", objectPath)(
+ "reason", static_cast<ui64>(ev->Get()->GetReason()));
+ OnInvalidTierConfig(objectPath);
+ }
+
+ void Handle(NTiers::TEvWatchSchemeObject::TPtr& ev) {
+ Send(TiersFetcher, ev->Release());
+ }
+
+public:
+ TActor(std::shared_ptr<TTiersManager> owner)
+ : Owner(owner)
+ , RetryPolicy(IRetryPolicy::GetExponentialBackoffPolicy(
+ []() {
+ return ERetryErrorClass::ShortRetry;
+ },
+ TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(30), 10))
+ , SecretsFetcher(std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>()) {
+ }
+
+ void Bootstrap() {
+ AFL_INFO(NKikimrServices::TX_TIERING)("event", "start_subscribing_metadata");
+ TiersFetcher = Register(new TSchemeObjectWatcher(SelfId()));
+ Send(GetExternalDataActorId(), new NMetadata::NProvider::TEvSubscribeExternal(SecretsFetcher));
+ Become(&TThis::StateMain);
+ }
+
+ ~TActor() {
+ Owner->Stop(false);
+ }
};
namespace NTiers {
TManager& TManager::Restart(const TTierConfig& config, std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets) {
- ALS_DEBUG(NKikimrServices::TX_TIERING) << "Restarting tier '" << GetTierName() << "' at tablet " << TabletId;
- if (Config.IsSame(config)) {
- return *this;
- }
+ ALS_DEBUG(NKikimrServices::TX_TIERING) << "Restarting tier '" << TierName << "' at tablet " << TabletId;
Stop();
- Config = config;
- Start(secrets);
+ Start(config, secrets);
return *this;
}
bool TManager::Stop() {
S3Settings.reset();
- ALS_DEBUG(NKikimrServices::TX_TIERING) << "Tier '" << GetTierName() << "' stopped at tablet " << TabletId;
+ ALS_DEBUG(NKikimrServices::TX_TIERING) << "Tier '" << TierName << "' stopped at tablet " << TabletId;
return true;
}
-bool TManager::Start(std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets) {
- AFL_VERIFY(!S3Settings)("tier", GetTierName())("event", "already started");
- S3Settings = Config.GetPatchedConfig(secrets);
- ALS_DEBUG(NKikimrServices::TX_TIERING) << "Tier '" << GetTierName() << "' started at tablet " << TabletId;
+bool TManager::Start(const TTierConfig& config, std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets) {
+ AFL_VERIFY(!S3Settings)("tier", TierName)("event", "already started");
+ auto patchedConfig = config.GetPatchedConfig(secrets);
+ if (patchedConfig.IsFail()) {
+ AFL_ERROR(NKikimrServices::TX_TIERING)("error", "cannot_read_secrets")("reason", patchedConfig.GetErrorMessage());
+ return false;
+ }
+ S3Settings = patchedConfig.DetachResult();
+ ALS_DEBUG(NKikimrServices::TX_TIERING) << "Tier '" << TierName << "' started at tablet " << TabletId;
return true;
}
-TManager::TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId, const TTierConfig& config)
+TManager::TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId, const TString& tierName)
: TabletId(tabletId)
, TabletActorId(tabletActorId)
- , Config(config)
-{
+ , TierName(tierName) {
}
NArrow::NSerialization::TSerializerContainer ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compressionProto) {
@@ -119,40 +184,68 @@ NArrow::NSerialization::TSerializerContainer ConvertCompression(const NKikimrSch
}
}
-void TTiersManager::TakeConfigs(NMetadata::NFetcher::ISnapshot::TPtr snapshotExt, std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets) {
- ALS_INFO(NKikimrServices::TX_TIERING) << "Take configs:"
- << (snapshotExt ? " snapshots" : "") << (secrets ? " secrets" : "") << " at tablet " << TabletId;
+TTiersManager::TTierRefGuard::TTierRefGuard(const TString& tierName, TTiersManager& owner)
+ : TierName(tierName)
+ , Owner(&owner) {
+ if (!Owner->TierRefCount.contains(TierName)) {
+ Owner->RegisterTier(tierName);
+ }
+ ++Owner->TierRefCount[TierName];
+}
- auto snapshotPtr = std::dynamic_pointer_cast<NTiers::TTiersSnapshot>(snapshotExt);
- Y_ABORT_UNLESS(snapshotPtr);
- Snapshot = snapshotExt;
- Secrets = secrets;
- auto& snapshot = *snapshotPtr;
- for (auto itSelf = Managers.begin(); itSelf != Managers.end(); ) {
- auto it = snapshot.GetTierConfigs().find(itSelf->first);
- if (it == snapshot.GetTierConfigs().end()) {
- itSelf->second.Stop();
- itSelf = Managers.erase(itSelf);
- } else {
- itSelf->second.Restart(it->second, Secrets);
- ++itSelf;
+TTiersManager::TTierRefGuard::~TTierRefGuard() {
+ if (Owner) {
+ auto findTier = Owner->TierRefCount.FindPtr(TierName);
+ AFL_VERIFY(findTier);
+ AFL_VERIFY(*findTier);
+ --*findTier;
+ if (!*findTier) {
+ AFL_VERIFY(Owner->TierRefCount.erase(TierName));
+ Owner->UnregisterTier(TierName);
}
}
- for (auto&& i : snapshot.GetTierConfigs()) {
- auto tierName = i.second.GetTierName();
- ALS_DEBUG(NKikimrServices::TX_TIERING) << "Take config for tier '" << tierName << "' at tablet " << TabletId;
- if (Managers.contains(tierName)) {
- ALS_DEBUG(NKikimrServices::TX_TIERING) << "Ignore tier '" << tierName << "' at tablet " << TabletId;
- continue;
+}
+
+void TTiersManager::OnConfigsUpdated(bool notifyShard) {
+ for (auto& [tierName, manager] : Managers) {
+ auto* findTierConfig = TierConfigs.FindPtr(tierName);
+ if (Secrets && findTierConfig) {
+ if (manager.IsReady()) {
+ manager.Restart(*findTierConfig, Secrets);
+ } else {
+ manager.Start(*findTierConfig, Secrets);
+ }
+ } else {
+ AFL_DEBUG(NKikimrServices::TX_TIERING)("event", "skip_tier_manager_reloading")("tier", tierName)("has_secrets", !!Secrets)(
+ "found_tier_config", !!findTierConfig);
}
- NTiers::TManager localManager(TabletId, TabletActorId, i.second);
- auto itManager = Managers.emplace(tierName, std::move(localManager)).first;
- itManager->second.Start(Secrets);
}
- if (ShardCallback && TlsActivationContext) {
+ if (notifyShard && ShardCallback && TlsActivationContext) {
ShardCallback(TActivationContext::AsActorContext());
}
+
+ AFL_DEBUG(NKikimrServices::TX_TIERING)("event", "configs_updated")("configs", DebugString());
+}
+
+void TTiersManager::RegisterTier(const TString& name) {
+ auto emplaced = Managers.emplace(name, NTiers::TManager(TabletId, TabletActorId, name));
+ AFL_VERIFY(emplaced.second);
+
+ auto* findConfig = TierConfigs.FindPtr(name);
+ if (Secrets && findConfig) {
+ emplaced.first->second.Start(*findConfig, Secrets);
+ } else {
+ AFL_DEBUG(NKikimrServices::TX_TIERING)("event", "skip_tier_manager_start")("tier", name)("has_secrets", !!Secrets)(
+ "found_tier_config", !!findConfig);
+ }
+}
+
+void TTiersManager::UnregisterTier(const TString& name) {
+ auto findManager = Managers.find(name);
+ AFL_VERIFY(findManager != Managers.end());
+ findManager->second.Stop();
+ Managers.erase(findManager);
}
TTiersManager& TTiersManager::Start(std::shared_ptr<TTiersManager> ownerPtr) {
@@ -185,11 +278,48 @@ const NTiers::TManager* TTiersManager::GetManagerOptional(const TString& tierId)
}
}
-NMetadata::NFetcher::ISnapshotsFetcher::TPtr TTiersManager::GetExternalDataManipulation() const {
- if (!ExternalDataManipulation) {
- ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>();
+void TTiersManager::EnablePathId(const ui64 pathId, const THashSet<TString>& usedTiers) {
+ AFL_VERIFY(Actor)("error", "tiers_manager_is_not_started");
+ auto& tierRefs = UsedTiers[pathId];
+ tierRefs.clear();
+ for (const TString& tierName : usedTiers) {
+ AFL_VERIFY(tierName == CanonizePath(tierName))("current", tierName)("canonized", CanonizePath(tierName));
+ tierRefs.emplace_back(tierName, *this);
+ if (!TierConfigs.contains(tierName)) {
+ const auto& actorContext = NActors::TActivationContext::AsActorContext();
+ AFL_VERIFY(&actorContext)("error", "no_actor_context");
+ actorContext.Send(Actor->SelfId(), new NTiers::TEvWatchSchemeObject({ tierName }));
+ }
}
- return ExternalDataManipulation;
+ OnConfigsUpdated(false);
+}
+
+void TTiersManager::DisablePathId(const ui64 pathId) {
+ UsedTiers.erase(pathId);
+ OnConfigsUpdated(false);
+}
+
+void TTiersManager::UpdateSecretsSnapshot(std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets) {
+ AFL_INFO(NKikimrServices::TX_TIERING)("event", "update_secrets")("tablet", TabletId);
+ AFL_VERIFY(secrets);
+ Secrets = secrets;
+ OnConfigsUpdated();
+}
+
+void TTiersManager::UpdateTierConfig(const NTiers::TTierConfig& config, const TString& tierName, const bool notifyShard) {
+ AFL_INFO(NKikimrServices::TX_TIERING)("event", "update_tier_config")("name", tierName)("tablet", TabletId);
+ AFL_VERIFY(tierName == CanonizePath(tierName))("current", tierName)("canonized", CanonizePath(tierName));
+ TierConfigs[tierName] = config;
+ OnConfigsUpdated(notifyShard);
+}
+
+bool TTiersManager::AreConfigsComplete() const {
+ for (const auto& [tier, cnt] : TierRefCount) {
+ if (!TierConfigs.contains(tier)) {
+ return false;
+ }
+ }
+ return true;
}
TActorId TTiersManager::GetActorId() const {
@@ -200,4 +330,36 @@ TActorId TTiersManager::GetActorId() const {
}
}
+TString TTiersManager::DebugString() {
+ TStringBuilder sb;
+ sb << "TIERS=";
+ if (TierConfigs) {
+ sb << "{";
+ for (const auto& [name, config] : TierConfigs) {
+ sb << name << ";";
+ }
+ sb << "}";
+ }
+ sb << ";USED_TIERS=";
+ {
+ sb << "{";
+ for (const auto& [pathId, tiers] : UsedTiers) {
+ sb << pathId << ":{";
+ for (const auto& tierRef : tiers) {
+ sb << tierRef.GetTierName() << ";";
+ }
+ sb << "}";
+ }
+ sb << "}";
+ }
+ sb << ";SECRETS=";
+ if (Secrets) {
+ sb << "{";
+ for (const auto& [name, config] : Secrets->GetSecrets()) {
+ sb << name.SerializeToString() << ";";
+ }
+ sb << "}";
+ }
+ return sb;
+}
}
diff --git a/ydb/core/tx/tiering/manager.h b/ydb/core/tx/tiering/manager.h
index d0a464e40e6..2aa78f597b7 100644
--- a/ydb/core/tx/tiering/manager.h
+++ b/ydb/core/tx/tiering/manager.h
@@ -1,18 +1,15 @@
#pragma once
-#include "external_data.h"
+#include "common.h"
#include "abstract/manager.h"
-#include <ydb/library/actors/core/actor_bootstrapped.h>
-#include <ydb/library/actors/core/actor.h>
-
#include <ydb/core/formats/arrow/serializer/abstract.h>
+#include <ydb/core/tx/tiering/tier/object.h>
+
#include <ydb/public/sdk/cpp/client/ydb_types/s3_settings.h>
#include <ydb/services/metadata/secret/snapshot.h>
#include <ydb/services/metadata/service.h>
-#include <ydb/library/accessor/accessor.h>
-
#include <functional>
namespace NKikimr::NColumnShard {
@@ -24,9 +21,9 @@ NArrow::NSerialization::TSerializerContainer ConvertCompression(const NKikimrSch
class TManager {
private:
ui64 TabletId = 0;
- YDB_READONLY_DEF(NActors::TActorId, TabletActorId);
- YDB_READONLY_DEF(TTierConfig, Config);
- YDB_READONLY_DEF(NActors::TActorId, StorageActorId);
+ NActors::TActorId TabletActorId;
+ TString TierName;
+ NActors::TActorId StorageActorId;
std::optional<NKikimrSchemeOp::TS3Settings> S3Settings;
public:
const NKikimrSchemeOp::TS3Settings& GetS3Settings() const {
@@ -34,60 +31,89 @@ public:
return *S3Settings;
}
- TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId, const TTierConfig& config);
+ TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId, const TString& tierName);
+ bool IsReady() const {
+ return !!S3Settings;
+ }
TManager& Restart(const TTierConfig& config, std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets);
bool Stop();
- bool Start(std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets);
-
- TString GetTierName() const {
- return GetConfig().GetTierName();
- }
+ bool Start(const TTierConfig& config, std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets);
};
}
class TTiersManager: public ITiersManager {
private:
+ friend class TTierRef;
+ class TTierRefGuard: public TMoveOnly {
+ private:
+ YDB_READONLY_DEF(TString, TierName);
+ TTiersManager* Owner;
+
+ public:
+ TTierRefGuard(const TString& tierName, TTiersManager& owner);
+ ~TTierRefGuard();
+
+ TTierRefGuard(TTierRefGuard&& other)
+ : TierName(other.TierName)
+ , Owner(other.Owner) {
+ other.Owner = nullptr;
+ }
+ TTierRefGuard& operator=(TTierRefGuard&& other) {
+ std::swap(Owner, other.Owner);
+ std::swap(TierName, other.TierName);
+ return *this;
+ }
+ };
+
+private:
class TActor;
+ friend class TActor;
using TManagers = std::map<TString, NTiers::TManager>;
+
ui64 TabletId = 0;
const TActorId TabletActorId;
std::function<void(const TActorContext& ctx)> ShardCallback;
- TActor* Actor = nullptr;
+ IActor* Actor = nullptr;
TManagers Managers;
- std::shared_ptr<NMetadata::NSecret::TSnapshot> Secrets;
- NMetadata::NFetcher::ISnapshot::TPtr Snapshot;
- mutable NMetadata::NFetcher::ISnapshotsFetcher::TPtr ExternalDataManipulation;
+ using TTierRefCount = THashMap<TString, ui64>;
+ using TTierRefsByPathId = THashMap<ui64, std::vector<TTierRefGuard>>;
+ YDB_READONLY_DEF(TTierRefCount, TierRefCount);
+ YDB_READONLY_DEF(TTierRefsByPathId, UsedTiers);
+
+ using TTierByName = THashMap<TString, NTiers::TTierConfig>;
+ YDB_READONLY_DEF(TTierByName, TierConfigs);
+ YDB_READONLY_DEF(std::shared_ptr<NMetadata::NSecret::TSnapshot>, Secrets);
+
+private:
+ void OnConfigsUpdated(bool notifyShard = true);
+ void RegisterTier(const TString& name);
+ void UnregisterTier(const TString& name);
public:
- TTiersManager(const ui64 tabletId, const TActorId& tabletActorId,
- std::function<void(const TActorContext& ctx)> shardCallback = {})
+ TTiersManager(const ui64 tabletId, const TActorId& tabletActorId, std::function<void(const TActorContext& ctx)> shardCallback = {})
: TabletId(tabletId)
, TabletActorId(tabletActorId)
, ShardCallback(shardCallback)
- {
+ , Secrets(std::make_shared<NMetadata::NSecret::TSnapshot>(TInstant::Zero())) {
}
TActorId GetActorId() const;
- void TakeConfigs(NMetadata::NFetcher::ISnapshot::TPtr snapshot, std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets);
- void EnablePathId(const ui64 /*pathId*/, const THashSet<TString>& /*usedTiers*/) {
- }
- void DisablePathId(const ui64 /*pathId*/) {
- }
+ void EnablePathId(const ui64 pathId, const THashSet<TString>& usedTiers);
+ void DisablePathId(const ui64 pathId);
- bool IsReady() const {
- return !!Snapshot;
- }
+ void UpdateSecretsSnapshot(std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets);
+ void UpdateTierConfig(const NTiers::TTierConfig& config, const TString& tierName, const bool notifyShard = true);
+ bool AreConfigsComplete() const;
+
+ TString DebugString();
TTiersManager& Start(std::shared_ptr<TTiersManager> ownerPtr);
TTiersManager& Stop(const bool needStopActor);
virtual const std::map<TString, NTiers::TManager>& GetManagers() const override {
- AFL_VERIFY(IsReady());
return Managers;
}
virtual const NTiers::TManager* GetManagerOptional(const TString& tierId) const override;
- NMetadata::NFetcher::ISnapshotsFetcher::TPtr GetExternalDataManipulation() const;
-
};
}
diff --git a/ydb/core/tx/tiering/snapshot.cpp b/ydb/core/tx/tiering/snapshot.cpp
deleted file mode 100644
index d64987b5b62..00000000000
--- a/ydb/core/tx/tiering/snapshot.cpp
+++ /dev/null
@@ -1,36 +0,0 @@
-#include "snapshot.h"
-
-#include <ydb/core/base/path.h>
-
-#include <library/cpp/json/writer/json_value.h>
-#include <library/cpp/protobuf/json/proto2json.h>
-
-#include <util/string/join.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-bool TTiersSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) {
- Y_ABORT_UNLESS(rawDataResult.result_sets().size() == 1);
- ParseSnapshotObjects<TTierConfig>(rawDataResult.result_sets()[0], [this](TTierConfig&& s) {TierConfigs.emplace(s.GetTierName(), s); });
- return true;
-}
-
-std::optional<TTierConfig> TTiersSnapshot::GetTierById(const TString& tierName) const {
- auto it = TierConfigs.find(tierName);
- if (it == TierConfigs.end()) {
- return {};
- } else {
- return it->second;
- }
-}
-
-TString NTiers::TTiersSnapshot::DoSerializeToString() const {
- NJson::TJsonValue result = NJson::JSON_MAP;
- auto& jsonTiers = result.InsertValue("tiers", NJson::JSON_MAP);
- for (auto&& i : TierConfigs) {
- jsonTiers.InsertValue(i.first, i.second.GetDebugJson());
- }
- return result.GetStringRobust();
-}
-
-}
diff --git a/ydb/core/tx/tiering/snapshot.h b/ydb/core/tx/tiering/snapshot.h
deleted file mode 100644
index 4eea9921c56..00000000000
--- a/ydb/core/tx/tiering/snapshot.h
+++ /dev/null
@@ -1,25 +0,0 @@
-#pragma once
-#include <ydb/core/protos/flat_scheme_op.pb.h>
-#include <ydb/core/tx/scheme_cache/scheme_cache.h>
-#include <ydb/core/tx/tiering/tier/object.h>
-
-#include <ydb/services/metadata/service.h>
-
-#include <library/cpp/json/writer/json_value.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-class TTiersSnapshot: public NMetadata::NFetcher::ISnapshot {
-private:
- using TBase = NMetadata::NFetcher::ISnapshot;
- using TConfigsMap = TMap<TString, TTierConfig>;
- YDB_ACCESSOR_DEF(TConfigsMap, TierConfigs);
-protected:
- virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override;
- virtual TString DoSerializeToString() const override;
-public:
- std::optional<TTierConfig> GetTierById(const TString& tierName) const;
- using TBase::TBase;
-};
-
-}
diff --git a/ydb/core/tx/tiering/tier/behaviour.cpp b/ydb/core/tx/tiering/tier/behaviour.cpp
deleted file mode 100644
index 01b3d13290f..00000000000
--- a/ydb/core/tx/tiering/tier/behaviour.cpp
+++ /dev/null
@@ -1,33 +0,0 @@
-#include "behaviour.h"
-#include "manager.h"
-#include "initializer.h"
-
-#include <ydb/core/tx/tiering/tier/checker.h>
-
-#include <ydb/services/metadata/manager/ydb_value_operator.h>
-#include <ydb/services/metadata/secret/fetcher.h>
-
-#include <library/cpp/json/writer/json_value.h>
-#include <library/cpp/protobuf/json/proto2json.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-TTierConfigBehaviour::TFactory::TRegistrator<TTierConfigBehaviour> TTierConfigBehaviour::Registrator(TTierConfig::GetTypeId());
-
-TString TTierConfigBehaviour::GetInternalStorageTablePath() const {
- return "tiering/tiers";
-}
-
-NMetadata::NModifications::IOperationsManager::TPtr TTierConfigBehaviour::ConstructOperationsManager() const {
- return std::make_shared<TTiersManager>();
-}
-
-NMetadata::NInitializer::IInitializationBehaviour::TPtr TTierConfigBehaviour::ConstructInitializer() const {
- return std::make_shared<TTiersInitializer>();
-}
-
-TString TTierConfigBehaviour::GetTypeId() const {
- return TTierConfig::GetTypeId();
-}
-
-}
diff --git a/ydb/core/tx/tiering/tier/behaviour.h b/ydb/core/tx/tiering/tier/behaviour.h
deleted file mode 100644
index fd231708fff..00000000000
--- a/ydb/core/tx/tiering/tier/behaviour.h
+++ /dev/null
@@ -1,20 +0,0 @@
-#pragma once
-
-#include "object.h"
-#include <ydb/services/metadata/abstract/kqp_common.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-class TTierConfigBehaviour: public NMetadata::TClassBehaviour<TTierConfig> {
-private:
- static TFactory::TRegistrator<TTierConfigBehaviour> Registrator;
-protected:
- virtual std::shared_ptr<NMetadata::NInitializer::IInitializationBehaviour> ConstructInitializer() const override;
- virtual std::shared_ptr<NMetadata::NModifications::IOperationsManager> ConstructOperationsManager() const override;
-
- virtual TString GetInternalStorageTablePath() const override;
- virtual TString GetTypeId() const override;
-
-};
-
-}
diff --git a/ydb/core/tx/tiering/tier/checker.cpp b/ydb/core/tx/tiering/tier/checker.cpp
deleted file mode 100644
index af9d88ae691..00000000000
--- a/ydb/core/tx/tiering/tier/checker.cpp
+++ /dev/null
@@ -1,52 +0,0 @@
-#include "checker.h"
-
-#include <ydb/core/tx/tiering/external_data.h>
-#include <ydb/services/metadata/secret/fetcher.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-void TTierPreparationActor::StartChecker() {
- if (!Secrets) {
- return;
- }
- auto g = PassAwayGuard();
- if (const auto& userToken = Context.GetExternalData().GetUserToken()) {
- for (auto&& tier : Objects) {
- if (!Secrets->CheckSecretAccess(tier.GetAccessKey(), *userToken)) {
- Controller->OnPreparationProblem("no access for secret: " + tier.GetAccessKey().DebugString());
- return;
- } else if (!Secrets->CheckSecretAccess(tier.GetSecretKey(), *userToken)) {
- Controller->OnPreparationProblem("no access for secret: " + tier.GetSecretKey().DebugString());
- return;
- }
- }
- }
- Controller->OnPreparationFinished(std::move(Objects));
-}
-
-void TTierPreparationActor::Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) {
- if (auto snapshot = ev->Get()->GetSnapshotPtrAs<NMetadata::NSecret::TSnapshot>()) {
- Secrets = snapshot;
- } else {
- Y_ABORT_UNLESS(false);
- }
- StartChecker();
-}
-
-void TTierPreparationActor::Bootstrap() {
- Become(&TThis::StateMain);
- Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()),
- new NMetadata::NProvider::TEvAskSnapshot(std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>()));
-}
-
-TTierPreparationActor::TTierPreparationActor(std::vector<TTierConfig>&& objects,
- NMetadata::NModifications::IAlterPreparationController<TTierConfig>::TPtr controller,
- const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context)
- : Objects(std::move(objects))
- , Controller(controller)
- , Context(context)
-{
-
-}
-
-}
diff --git a/ydb/core/tx/tiering/tier/checker.h b/ydb/core/tx/tiering/tier/checker.h
deleted file mode 100644
index 109c9de208a..00000000000
--- a/ydb/core/tx/tiering/tier/checker.h
+++ /dev/null
@@ -1,38 +0,0 @@
-#pragma once
-#include "object.h"
-
-#include <ydb/core/tx/schemeshard/schemeshard.h>
-#include <ydb/core/tx/tiering/snapshot.h>
-
-#include <ydb/services/metadata/abstract/common.h>
-#include <ydb/services/metadata/abstract/kqp_common.h>
-#include <ydb/services/metadata/manager/preparation_controller.h>
-#include <ydb/services/metadata/secret/snapshot.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-class TTierPreparationActor: public NActors::TActorBootstrapped<TTierPreparationActor> {
-private:
- std::vector<TTierConfig> Objects;
- NMetadata::NModifications::IAlterPreparationController<TTierConfig>::TPtr Controller;
- NMetadata::NModifications::IOperationsManager::TInternalModificationContext Context;
- std::shared_ptr<NMetadata::NSecret::TSnapshot> Secrets;
- void StartChecker();
-protected:
- void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev);
-public:
- STATEFN(StateMain) {
- switch (ev->GetTypeRewrite()) {
- hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle);
- default:
- break;
- }
- }
- void Bootstrap();
-
- TTierPreparationActor(std::vector<TTierConfig>&& objects,
- NMetadata::NModifications::IAlterPreparationController<TTierConfig>::TPtr controller,
- const NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context);
-};
-
-}
diff --git a/ydb/core/tx/tiering/tier/initializer.cpp b/ydb/core/tx/tiering/tier/initializer.cpp
deleted file mode 100644
index 9bf517856a9..00000000000
--- a/ydb/core/tx/tiering/tier/initializer.cpp
+++ /dev/null
@@ -1,36 +0,0 @@
-#include "initializer.h"
-#include "object.h"
-
-namespace NKikimr::NColumnShard::NTiers {
-
-TVector<NKikimr::NMetadata::NInitializer::ITableModifier::TPtr> TTiersInitializer::BuildModifiers() const {
- TVector<NMetadata::NInitializer::ITableModifier::TPtr> result;
- {
- Ydb::Table::CreateTableRequest request;
- request.set_session_id("");
- request.set_path(TTierConfig::GetBehaviour()->GetStorageTablePath());
- request.add_primary_key("tierName");
- {
- auto& column = *request.add_columns();
- column.set_name("tierName");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UTF8);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("tierConfig");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UTF8);
- }
- result.emplace_back(new NMetadata::NInitializer::TGenericTableModifier<NMetadata::NRequest::TDialogCreateTable>(request, "create"));
- auto hRequest = TTierConfig::AddHistoryTableScheme(request);
- result.emplace_back(new NMetadata::NInitializer::TGenericTableModifier<NMetadata::NRequest::TDialogCreateTable>(hRequest, "create_history"));
- }
- result.emplace_back(NMetadata::NInitializer::TACLModifierConstructor::GetReadOnlyModifier(TTierConfig::GetBehaviour()->GetStorageTablePath(), "acl"));
- result.emplace_back(NMetadata::NInitializer::TACLModifierConstructor::GetReadOnlyModifier(TTierConfig::GetBehaviour()->GetStorageHistoryTablePath(), "acl_history"));
- return result;
-}
-
-void TTiersInitializer::DoPrepare(NMetadata::NInitializer::IInitializerInput::TPtr controller) const {
- controller->OnPreparationFinished(BuildModifiers());
-}
-
-}
diff --git a/ydb/core/tx/tiering/tier/initializer.h b/ydb/core/tx/tiering/tier/initializer.h
deleted file mode 100644
index e3d99fffea0..00000000000
--- a/ydb/core/tx/tiering/tier/initializer.h
+++ /dev/null
@@ -1,15 +0,0 @@
-#pragma once
-#include <ydb/services/metadata/abstract/common.h>
-#include <ydb/services/metadata/initializer/common.h>
-#include <ydb/services/metadata/abstract/initialization.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-class TTiersInitializer: public NMetadata::NInitializer::IInitializationBehaviour {
-protected:
- TVector<NMetadata::NInitializer::ITableModifier::TPtr> BuildModifiers() const;
- virtual void DoPrepare(NMetadata::NInitializer::IInitializerInput::TPtr controller) const override;
-public:
-};
-
-}
diff --git a/ydb/core/tx/tiering/tier/manager.cpp b/ydb/core/tx/tiering/tier/manager.cpp
deleted file mode 100644
index 8d60219624b..00000000000
--- a/ydb/core/tx/tiering/tier/manager.cpp
+++ /dev/null
@@ -1,77 +0,0 @@
-#include "manager.h"
-#include "initializer.h"
-#include "checker.h"
-
-namespace NKikimr::NColumnShard::NTiers {
-
-NMetadata::NModifications::TOperationParsingResult TTiersManager::DoBuildPatchFromSettings(
- const NYql::TObjectSettingsImpl& settings,
- TInternalModificationContext& context) const
-{
- if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableTieringInColumnShard()) {
- return TConclusionStatus::Fail("Tiering functionality is disabled for OLAP tables.");
- }
-
- NMetadata::NInternal::TTableRecord result;
- result.SetColumn(TTierConfig::TDecoder::TierName, NMetadata::NInternal::TYDBValue::Utf8(settings.GetObjectId()));
- if (settings.GetObjectId().StartsWith("$") || settings.GetObjectId().StartsWith("_")) {
- return TConclusionStatus::Fail("tier name cannot start with '$', '_' characters");
- }
- {
- auto fConfig = settings.GetFeaturesExtractor().Extract(TTierConfig::TDecoder::TierConfig);
- if (fConfig) {
- NKikimrSchemeOp::TStorageTierConfig proto;
- if (!::google::protobuf::TextFormat::ParseFromString(*fConfig, &proto)) {
- return TConclusionStatus::Fail("incorrect proto format");
- } else if (proto.HasObjectStorage()) {
- TString defaultUserId;
- if (context.GetExternalData().GetUserToken()) {
- defaultUserId = context.GetExternalData().GetUserToken()->GetUserSID();
- }
-
- if (proto.GetObjectStorage().HasSecretableAccessKey()) {
- auto accessKey = NMetadata::NSecret::TSecretIdOrValue::DeserializeFromProto(proto.GetObjectStorage().GetSecretableAccessKey(), defaultUserId);
- if (!accessKey) {
- return TConclusionStatus::Fail("AccessKey description is incorrect");
- }
- *proto.MutableObjectStorage()->MutableSecretableAccessKey() = accessKey->SerializeToProto();
- } else if (proto.GetObjectStorage().HasAccessKey()) {
- auto accessKey = NMetadata::NSecret::TSecretIdOrValue::DeserializeFromString(proto.GetObjectStorage().GetAccessKey(), defaultUserId);
- if (!accessKey) {
- return TConclusionStatus::Fail("AccessKey is incorrect: " + proto.GetObjectStorage().GetAccessKey() + " for userId: " + defaultUserId);
- }
- *proto.MutableObjectStorage()->MutableAccessKey() = accessKey->SerializeToString();
- } else {
- return TConclusionStatus::Fail("AccessKey not configured");
- }
-
- if (proto.GetObjectStorage().HasSecretableSecretKey()) {
- auto secretKey = NMetadata::NSecret::TSecretIdOrValue::DeserializeFromProto(proto.GetObjectStorage().GetSecretableSecretKey(), defaultUserId);
- if (!secretKey) {
- return TConclusionStatus::Fail("SecretKey description is incorrect");
- }
- *proto.MutableObjectStorage()->MutableSecretableSecretKey() = secretKey->SerializeToProto();
- } else if (proto.GetObjectStorage().HasSecretKey()) {
- auto secretKey = NMetadata::NSecret::TSecretIdOrValue::DeserializeFromString(proto.GetObjectStorage().GetSecretKey(), defaultUserId);
- if (!secretKey) {
- return TConclusionStatus::Fail("SecretKey is incorrect");
- }
- *proto.MutableObjectStorage()->MutableSecretKey() = secretKey->SerializeToString();
- } else {
- return TConclusionStatus::Fail("SecretKey not configured");
- }
- }
- result.SetColumn(TTierConfig::TDecoder::TierConfig, NMetadata::NInternal::TYDBValue::Utf8(proto.DebugString()));
- }
- }
- return result;
-}
-
-void TTiersManager::DoPrepareObjectsBeforeModification(std::vector<TTierConfig>&& patchedObjects,
- NMetadata::NModifications::IAlterPreparationController<TTierConfig>::TPtr controller,
- const TInternalModificationContext& context, const NMetadata::NModifications::TAlterOperationContext& /*alterContext*/) const
-{
- TActivationContext::Register(new TTierPreparationActor(std::move(patchedObjects), controller, context));
-}
-
-}
diff --git a/ydb/core/tx/tiering/tier/manager.h b/ydb/core/tx/tiering/tier/manager.h
deleted file mode 100644
index 7d8626c8c36..00000000000
--- a/ydb/core/tx/tiering/tier/manager.h
+++ /dev/null
@@ -1,19 +0,0 @@
-#pragma once
-#include "object.h"
-
-#include <ydb/services/metadata/manager/generic_manager.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-class TTiersManager: public NMetadata::NModifications::TGenericOperationsManager<TTierConfig> {
-protected:
- virtual void DoPrepareObjectsBeforeModification(std::vector<TTierConfig>&& patchedObjects,
- NMetadata::NModifications::IAlterPreparationController<TTierConfig>::TPtr controller,
- const TInternalModificationContext& context, const NMetadata::NModifications::TAlterOperationContext& alterContext) const override;
-
- virtual NMetadata::NModifications::TOperationParsingResult DoBuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings,
- TInternalModificationContext& context) const override;
-public:
-};
-
-}
diff --git a/ydb/core/tx/tiering/tier/object.cpp b/ydb/core/tx/tiering/tier/object.cpp
index 7b98282fd6a..37b0cddab5a 100644
--- a/ydb/core/tx/tiering/tier/object.cpp
+++ b/ydb/core/tx/tiering/tier/object.cpp
@@ -1,63 +1,30 @@
#include "object.h"
-#include "behaviour.h"
-
-#include <ydb/core/tx/tiering/tier/checker.h>
-
-#include <ydb/services/metadata/manager/ydb_value_operator.h>
-#include <ydb/services/metadata/secret/fetcher.h>
#include <library/cpp/json/writer/json_value.h>
#include <library/cpp/protobuf/json/proto2json.h>
+#include <library/cpp/uri/uri.h>
namespace NKikimr::NColumnShard::NTiers {
-NMetadata::IClassBehaviour::TPtr TTierConfig::GetBehaviour() {
- static std::shared_ptr<TTierConfigBehaviour> result = std::make_shared<TTierConfigBehaviour>();
- return result;
-}
-
-NJson::TJsonValue TTierConfig::GetDebugJson() const {
- NJson::TJsonValue result = NJson::JSON_MAP;
- result.InsertValue(TDecoder::TierName, TierName);
- NProtobufJson::Proto2Json(ProtoConfig, result.InsertValue(TDecoder::TierConfig, NJson::JSON_MAP));
- return result;
-}
-
-bool TTierConfig::IsSame(const TTierConfig& item) const {
- return TierName == item.TierName && ProtoConfig.SerializeAsString() == item.ProtoConfig.SerializeAsString();
-}
-
-bool TTierConfig::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r) {
- if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) {
- return false;
- }
- if (!decoder.ReadDebugProto(decoder.GetTierConfigIdx(), ProtoConfig, r)) {
- return false;
- }
- return ProtoConfig.HasObjectStorage();
-}
-
-NMetadata::NInternal::TTableRecord TTierConfig::SerializeToRecord() const {
- NMetadata::NInternal::TTableRecord result;
- result.SetColumn(TDecoder::TierName, NMetadata::NInternal::TYDBValue::Utf8(TierName));
- result.SetColumn(TDecoder::TierConfig, NMetadata::NInternal::TYDBValue::Utf8(ProtoConfig.DebugString()));
- return result;
-}
-
-NKikimrSchemeOp::TS3Settings TTierConfig::GetPatchedConfig(std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets) const {
- auto config = ProtoConfig.GetObjectStorage();
+TConclusion<NKikimrSchemeOp::TS3Settings> TTierConfig::GetPatchedConfig(
+ const std::shared_ptr<NMetadata::NSecret::ISecretAccessor>& secrets) const {
+ auto config = ProtoConfig;
if (secrets) {
{
- auto value = secrets->GetSecretValue(GetAccessKey());
+ auto secretIdOrValue = NMetadata::NSecret::TSecretIdOrValue::DeserializeFromString(config.GetAccessKey());
+ AFL_VERIFY(secretIdOrValue);
+ auto value = secrets->GetSecretValue(*secretIdOrValue);
if (value.IsFail()) {
- AFL_ERROR(NKikimrServices::TX_TIERING)("error", "invalid_secret")("object", "access_key")("reason", value.GetErrorMessage());
+ return TConclusionStatus::Fail(TStringBuilder() << "Can't read access key: " << value.GetErrorMessage());
}
config.SetAccessKey(value.DetachResult());
}
{
- auto value = secrets->GetSecretValue(GetSecretKey());
+ auto secretIdOrValue = NMetadata::NSecret::TSecretIdOrValue::DeserializeFromString(config.GetSecretKey());
+ AFL_VERIFY(secretIdOrValue);
+ auto value = secrets->GetSecretValue(*secretIdOrValue);
if (value.IsFail()) {
- AFL_ERROR(NKikimrServices::TX_TIERING)("error", "invalid_secret")("object", "secret_key")("reason", value.GetErrorMessage());
+ return TConclusionStatus::Fail(TStringBuilder() << "Can't read secret key: " << value.GetErrorMessage());
}
config.SetSecretKey(value.DetachResult());
}
@@ -65,10 +32,67 @@ NKikimrSchemeOp::TS3Settings TTierConfig::GetPatchedConfig(std::shared_ptr<NMeta
return config;
}
+TConclusionStatus TTierConfig::DeserializeFromProto(const NKikimrSchemeOp::TExternalDataSourceDescription& proto) {
+ if (!proto.GetAuth().HasAws()) {
+ return TConclusionStatus::Fail("AWS auth is not defined for storage tier");
+ }
+
+ ProtoConfig.SetAccessKey(NMetadata::NSecret::TSecretName(proto.GetAuth().GetAws().GetAwsAccessKeyIdSecretName()).SerializeToString());
+ ProtoConfig.SetSecretKey(
+ NMetadata::NSecret::TSecretName(proto.GetAuth().GetAws().GetAwsSecretAccessKeySecretName()).SerializeToString());
+
+ NUri::TUri url;
+ if (url.Parse(proto.GetLocation(), NUri::TFeature::FeaturesAll) != NUri::TState::EParsed::ParsedOK) {
+ return TConclusionStatus::Fail("Cannot parse url: " + proto.GetLocation());
+ }
+
+ switch (url.GetScheme()) {
+ case NUri::TScheme::SchemeEmpty:
+ break;
+ case NUri::TScheme::SchemeHTTP:
+ ProtoConfig.SetScheme(::NKikimrSchemeOp::TS3Settings_EScheme_HTTP);
+ break;
+ case NUri::TScheme::SchemeHTTPS:
+ ProtoConfig.SetScheme(::NKikimrSchemeOp::TS3Settings_EScheme_HTTPS);
+ break;
+ default:
+ return TConclusionStatus::Fail("Unknown schema in url");
+ }
+
+ {
+ TStringBuf endpoint;
+ TStringBuf bucket;
+
+ TStringBuf host = url.GetHost();
+ TStringBuf path = url.GetField(NUri::TField::FieldPath);
+ if (!path.Empty()) {
+ endpoint = host;
+ bucket = path;
+ bucket.SkipPrefix("/");
+ if (bucket.Contains("/")) {
+ return TConclusionStatus::Fail(TStringBuilder() << "Not a bucket (contains directories): " << bucket);
+ }
+ } else {
+ if (!path.TrySplit('.', endpoint, bucket)) {
+ return TConclusionStatus::Fail(TStringBuilder() << "Bucket is not specified in URL: " << path);
+ }
+ }
+
+ ProtoConfig.SetEndpoint(TString(endpoint));
+ ProtoConfig.SetBucket(TString(bucket));
+ }
+
+ return TConclusionStatus::Success();
+}
+
NJson::TJsonValue TTierConfig::SerializeConfigToJson() const {
NJson::TJsonValue result;
NProtobufJson::Proto2Json(ProtoConfig, result);
return result;
}
+bool TTierConfig::IsSame(const TTierConfig& item) const {
+ return ProtoConfig.SerializeAsString() == item.ProtoConfig.SerializeAsString();
+}
+
}
diff --git a/ydb/core/tx/tiering/tier/object.h b/ydb/core/tx/tiering/tier/object.h
index 89ce3cdfd83..9da470a0104 100644
--- a/ydb/core/tx/tiering/tier/object.h
+++ b/ydb/core/tx/tiering/tier/object.h
@@ -1,11 +1,10 @@
#pragma once
#include <ydb/core/protos/flat_scheme_op.pb.h>
-#include <ydb/services/metadata/abstract/decoder.h>
-#include <ydb/services/metadata/manager/preparation_controller.h>
-#include <ydb/services/metadata/manager/table_record.h>
-#include <ydb/services/metadata/manager/object.h>
-#include <ydb/services/metadata/secret/snapshot.h>
-#include <ydb/services/metadata/service.h>
+
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/library/conclusion/status.h>
+#include <ydb/services/metadata/secret/accessor/secret_id.h>
+#include <ydb/services/metadata/secret/accessor/snapshot.h>
#include <library/cpp/json/writer/json_value.h>
@@ -15,72 +14,26 @@ class TSnapshot;
namespace NKikimr::NColumnShard::NTiers {
-class TTierConfig: public NMetadata::NModifications::TObject<TTierConfig> {
+class TTierConfig {
private:
- using TTierProto = NKikimrSchemeOp::TStorageTierConfig;
- YDB_ACCESSOR_DEF(TString, TierName);
- TTierProto ProtoConfig;
-public:
+ using TTierProto = NKikimrSchemeOp::TS3Settings;
+ YDB_READONLY_DEF(TTierProto, ProtoConfig);
+ YDB_READONLY_DEF(NKikimrSchemeOp::TCompressionOptions, Compression);
+public:
TTierConfig() = default;
- TTierConfig(const TString& tierName)
- : TierName(tierName) {
-
- }
-
- TTierConfig(const TString& tierName, const TTierProto& config)
- : TierName(tierName)
- , ProtoConfig(config)
- {
-
- }
-
- const NKikimrSchemeOp::TCompressionOptions& GetCompression() const {
- return ProtoConfig.GetCompression();
- }
-
- NMetadata::NSecret::TSecretIdOrValue GetAccessKey() const {
- auto accessKey = NMetadata::NSecret::TSecretIdOrValue::DeserializeFromOptional(ProtoConfig.GetObjectStorage().GetSecretableAccessKey(), ProtoConfig.GetObjectStorage().GetAccessKey());
- if (!accessKey) {
- return NMetadata::NSecret::TSecretIdOrValue::BuildEmpty();
- }
- return *accessKey;
+ TTierConfig(const TTierProto& config, const NKikimrSchemeOp::TCompressionOptions& compression)
+ : ProtoConfig(config)
+ , Compression(compression) {
}
- NMetadata::NSecret::TSecretIdOrValue GetSecretKey() const {
- auto secretKey = NMetadata::NSecret::TSecretIdOrValue::DeserializeFromOptional(ProtoConfig.GetObjectStorage().GetSecretableSecretKey(), ProtoConfig.GetObjectStorage().GetSecretKey());
- if (!secretKey) {
- return NMetadata::NSecret::TSecretIdOrValue::BuildEmpty();
- }
- return *secretKey;
- }
+ TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TExternalDataSourceDescription& proto);
NJson::TJsonValue SerializeConfigToJson() const;
-
- static NMetadata::IClassBehaviour::TPtr GetBehaviour();
- NKikimrSchemeOp::TS3Settings GetPatchedConfig(std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets) const;
-
- class TDecoder: public NMetadata::NInternal::TDecoderBase {
- private:
- YDB_READONLY(i32, TierNameIdx, -1);
- YDB_READONLY(i32, TierConfigIdx, -1);
- public:
- static inline const TString TierName = "tierName";
- static inline const TString TierConfig = "tierConfig";
- TDecoder(const Ydb::ResultSet& rawData) {
- TierNameIdx = GetFieldIndex(rawData, TierName);
- TierConfigIdx = GetFieldIndex(rawData, TierConfig);
- }
- };
- bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r);
- NMetadata::NInternal::TTableRecord SerializeToRecord() const;
+ TConclusion<NKikimrSchemeOp::TS3Settings> GetPatchedConfig(const std::shared_ptr<NMetadata::NSecret::ISecretAccessor>& secrets) const;
bool IsSame(const TTierConfig& item) const;
NJson::TJsonValue GetDebugJson() const;
- static TString GetTypeId() {
- return "TIER";
- }
};
-
}
diff --git a/ydb/core/tx/tiering/tier/ss_checker.cpp b/ydb/core/tx/tiering/tier/ss_checker.cpp
deleted file mode 100644
index 7626a13a309..00000000000
--- a/ydb/core/tx/tiering/tier/ss_checker.cpp
+++ /dev/null
@@ -1,26 +0,0 @@
-#include "ss_checker.h"
-
-#include <ydb/core/tx/scheme_cache/scheme_cache.h>
-#include <ydb/library/services/services.pb.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-void TSSFetchingActor::Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev) {
- auto g = PassAwayGuard();
- Controller->FetchingResult(ev->Get()->Record);
-}
-
-TSSFetchingActor::TSSFetchingActor(NSchemeShard::ISSDataProcessor::TPtr processor,
- ISSFetchingController::TPtr controller, const TDuration livetime)
- : TBase(livetime)
- , Processor(processor)
- , Controller(controller)
-{
-
-}
-
-constexpr NKikimrServices::TActivity::EType TSSFetchingActor::ActorActivityType() {
- return NKikimrServices::TActivity::SS_FETCHING_ACTOR;
-}
-
-}
diff --git a/ydb/core/tx/tiering/tier/ss_checker.h b/ydb/core/tx/tiering/tier/ss_checker.h
deleted file mode 100644
index d1e9e777e5f..00000000000
--- a/ydb/core/tx/tiering/tier/ss_checker.h
+++ /dev/null
@@ -1,68 +0,0 @@
-#pragma once
-#include "object.h"
-
-#include <ydb/core/base/appdata.h>
-#include <ydb/core/base/tablet_pipecache.h>
-#include <ydb/core/tx/schemeshard/schemeshard.h>
-#include <ydb/core/tx/scheme_cache/scheme_cache.h>
-#include <ydb/core/tx/tx_proxy/proxy.h>
-#include <ydb/services/metadata/common/ss_dialog.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-class ISSFetchingController {
-public:
- using TPtr = std::shared_ptr<ISSFetchingController>;
- virtual ~ISSFetchingController() = default;
- virtual void FetchingProblem(const TString& errorMessage) const = 0;
- virtual void FetchingResult(const NKikimrScheme::TEvProcessingResponse& result) const = 0;
-};
-
-class TSSFetchingController: public ISSFetchingController {
-private:
- const TActorIdentity ActorId;
-public:
- TSSFetchingController(const TActorIdentity& actorId)
- : ActorId(actorId) {
-
- }
-
- virtual void FetchingProblem(const TString& errorMessage) const override {
- ActorId.Send(ActorId, new NSchemeShard::TEvSchemeShard::TEvProcessingResponse(errorMessage));
- }
- virtual void FetchingResult(const NKikimrScheme::TEvProcessingResponse& result) const override {
- ActorId.Send(ActorId, new NSchemeShard::TEvSchemeShard::TEvProcessingResponse(result));
- }
-};
-
-class TSSFetchingActor: public NMetadata::NInternal::TSSDialogActor {
-private:
- using TBase = NMetadata::NInternal::TSSDialogActor;
- NSchemeShard::ISSDataProcessor::TPtr Processor;
- ISSFetchingController::TPtr Controller;
- void Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev);
-protected:
- virtual void OnBootstrap() override {
- UnsafeBecome(&TSSFetchingActor::StateMain);
- TBase::OnBootstrap();
- }
- virtual void OnFail(const TString& errorMessage) override {
- Controller->FetchingProblem(errorMessage);
- }
- virtual void Execute() override {
- auto req = std::make_unique<NSchemeShard::TEvSchemeShard::TEvProcessingRequest>(*Processor);
- Send(SchemeShardPipe, new TEvPipeCache::TEvForward(req.release(), SchemeShardId, false));
- }
-public:
- static constexpr NKikimrServices::TActivity::EType ActorActivityType();
- STFUNC(StateMain) {
- switch (ev->GetTypeRewrite()) {
- hFunc(NSchemeShard::TEvSchemeShard::TEvProcessingResponse, Handle);
- default:
- TBase::StateMain(ev);
- }
- }
- TSSFetchingActor(NSchemeShard::ISSDataProcessor::TPtr processor, ISSFetchingController::TPtr controller, const TDuration livetime);
-};
-
-}
diff --git a/ydb/core/tx/tiering/tier/ss_fetcher.cpp b/ydb/core/tx/tiering/tier/ss_fetcher.cpp
deleted file mode 100644
index e822ace4c5a..00000000000
--- a/ydb/core/tx/tiering/tier/ss_fetcher.cpp
+++ /dev/null
@@ -1,77 +0,0 @@
-#include "ss_fetcher.h"
-#include <ydb/core/tx/schemeshard/schemeshard.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-TFetcherCheckUserTieringPermissions::TFactory::TRegistrator<TFetcherCheckUserTieringPermissions>
- TFetcherCheckUserTieringPermissions::Registrator(TFetcherCheckUserTieringPermissions::GetTypeIdStatic());
-
-void TFetcherCheckUserTieringPermissions::DoProcess(NSchemeShard::TSchemeShard& schemeShard, NKikimrScheme::TEvProcessingResponse& result) const {
- TResult content;
- content.MutableContent().SetOperationAllow(true);
- ui32 access = 0;
- access |= NACLib::EAccessRights::AlterSchema;
-
- if (ActivityType == NMetadata::NModifications::IOperationsManager::EActivityType::Undefined) {
- content.Deny("undefined activity type");
- } else {
- bool denied = false;
- for (auto&& i : TieringRuleIds) {
- const auto& pathIds = schemeShard.ColumnTables.GetTablesWithTiering(i);
- for (auto&& pathId : pathIds) {
- auto path = NSchemeShard::TPath::Init(pathId, &schemeShard);
- if (!path.IsResolved() || path.IsUnderDeleting() || path.IsDeleted()) {
- continue;
- }
- if (ActivityType == NMetadata::NModifications::IOperationsManager::EActivityType::Drop) {
- denied = true;
- content.Deny("tiering in using by table");
- break;
- } else if (ActivityType == NMetadata::NModifications::IOperationsManager::EActivityType::Alter) {
- if (!UserToken) {
- continue;
- }
- TSecurityObject sObject(path->Owner, path->ACL, path->IsContainer());
- if (!sObject.CheckAccess(access, *UserToken)) {
- denied = true;
- content.Deny("no alter permissions for affected table");
- break;
- }
- }
- }
- if (denied) {
- break;
- }
- }
- }
- result.MutableContent()->SetData(content.SerializeToString());
-}
-
-bool TFetcherCheckUserTieringPermissions::DoDeserializeFromProto(const TProtoClass& protoData) {
- if (!TryFromString(protoData.GetActivityType(), ActivityType)) {
- ALS_ERROR(0) << "Cannot parse activity type: undefined value = " << protoData.GetActivityType();
- return false;
- }
- if (protoData.GetUserToken()) {
- NACLib::TUserToken uToken(protoData.GetUserToken());
- UserToken = uToken;
- }
- for (auto&& i : protoData.GetTieringRuleIds()) {
- TieringRuleIds.emplace(i);
- }
- return true;
-}
-
-NKikimr::NColumnShard::NTiers::TFetcherCheckUserTieringPermissions::TProtoClass TFetcherCheckUserTieringPermissions::DoSerializeToProto() const {
- TProtoClass result;
- result.SetActivityType(::ToString(ActivityType));
- if (UserToken) {
- result.SetUserToken(UserToken->SerializeAsString());
- }
- for (auto&& i : TieringRuleIds) {
- *result.AddTieringRuleIds() = i;
- }
- return result;
-}
-
-}
diff --git a/ydb/core/tx/tiering/tier/ss_fetcher.h b/ydb/core/tx/tiering/tier/ss_fetcher.h
deleted file mode 100644
index 8d0891d7362..00000000000
--- a/ydb/core/tx/tiering/tier/ss_fetcher.h
+++ /dev/null
@@ -1,79 +0,0 @@
-#pragma once
-#include "object.h"
-
-#include <ydb/core/protos/flat_tx_scheme.pb.h>
-#include <ydb/core/tx/schemeshard/schemeshard_impl.h>
-#include <ydb/core/tx/tiering/common.h>
-
-#include <ydb/services/bg_tasks/abstract/interface.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-class TFetcherCheckUserTieringPermissionsResult: public NBackgroundTasks::IProtoStringSerializable<
- NKikimrScheme::TFetcherCheckUserTieringPermissionsResult, NBackgroundTasks::IStringSerializable> {
-private:
- using TProtoClass = NKikimrScheme::TFetcherCheckUserTieringPermissionsResult;
- YDB_ACCESSOR_DEF(TProtoClass, Content);
-protected:
- virtual TProtoClass DoSerializeToProto() const override {
- return Content;
- }
- virtual bool DoDeserializeFromProto(const TProtoClass& protoData) override {
- Content = protoData;
- return true;
- }
-public:
- void Deny(const TString& reason) {
- Content.SetOperationAllow(false);
- Content.SetDenyReason(reason);
- }
-};
-
-class TFetcherCheckUserTieringPermissions: public NBackgroundTasks::IProtoStringSerializable<
- NKikimrScheme::TFetcherCheckUserTieringPermissions, NSchemeShard::ISSDataProcessor> {
-private:
- using TBase = NBackgroundTasks::IProtoStringSerializable<
- NKikimrScheme::TFetcherCheckUserTieringPermissions, NSchemeShard::ISSDataProcessor>;
- using TBase::TFactory;
- using TProtoClass = NKikimrScheme::TFetcherCheckUserTieringPermissions;
- static TFactory::TRegistrator<TFetcherCheckUserTieringPermissions> Registrator;
- YDB_ACCESSOR_DEF(std::set<TString>, TieringRuleIds);
- YDB_ACCESSOR_DEF(std::optional<NACLib::TUserToken>, UserToken);
- YDB_ACCESSOR(NMetadata::NModifications::IOperationsManager::EActivityType, ActivityType,
- NMetadata::NModifications::IOperationsManager::EActivityType::Undefined);
-protected:
- virtual TProtoClass DoSerializeToProto() const override;
- virtual bool DoDeserializeFromProto(const TProtoClass& protoData) override;
- virtual void DoProcess(NSchemeShard::TSchemeShard& schemeShard, NKikimrScheme::TEvProcessingResponse& result) const override;
-public:
- using TResult = TFetcherCheckUserTieringPermissionsResult;
- std::optional<TFetcherCheckUserTieringPermissionsResult> UnpackResult(const TString& content) const {
- TFetcherCheckUserTieringPermissionsResult result;
- if (!result.DeserializeFromString(content)) {
- return {};
- } else {
- return result;
- }
- }
-
- TFetcherCheckUserTieringPermissions() = default;
-
- virtual TString DebugString() const override {
- TStringBuilder sb;
- sb << "USID=" << (UserToken ? UserToken->GetUserSID() : "nobody") << ";";
- sb << "tierings=";
- for (auto&& i : TieringRuleIds) {
- sb << i << ",";
- }
- sb << ";";
- return sb;
- }
- virtual TString GetClassName() const override {
- return GetTypeIdStatic();
- }
- static TString GetTypeIdStatic() {
- return "ss_fetcher_tiering_permissions";
- }
-};
-
-}
diff --git a/ydb/core/tx/tiering/tier/ya.make b/ydb/core/tx/tiering/tier/ya.make
index 822435a2ec4..32f06144cbd 100644
--- a/ydb/core/tx/tiering/tier/ya.make
+++ b/ydb/core/tx/tiering/tier/ya.make
@@ -1,20 +1,12 @@
LIBRARY()
SRCS(
- manager.cpp
object.cpp
- initializer.cpp
- checker.cpp
- GLOBAL behaviour.cpp
- ss_checker.cpp
)
PEERDIR(
- ydb/services/bg_tasks/abstract
- ydb/services/metadata/initializer
- ydb/services/metadata/abstract
- ydb/services/metadata/secret
- ydb/core/tx/schemeshard
+ ydb/library/conclusion
+ ydb/services/metadata/secret/accessor
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp
index 0afdde16416..59f3ddf8a17 100644
--- a/ydb/core/tx/tiering/ut/ut_tiers.cpp
+++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp
@@ -1,15 +1,17 @@
#include <ydb/core/cms/console/configs_dispatcher.h>
+#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/core/testlib/cs_helper.h>
-#include <ydb/core/tx/tiering/external_data.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/hooks/testing/ro_controller.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/core/tx/tiering/manager.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
-#include <ydb/core/formats/arrow/size_calcer.h>
-#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
-#include <ydb/core/wrappers/s3_wrapper.h>
#include <ydb/core/wrappers/fake_storage.h>
-#include <ydb/core/tx/columnshard/hooks/testing/ro_controller.h>
+#include <ydb/core/wrappers/s3_wrapper.h>
+#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
+
#include <ydb/library/accessor/accessor.h>
+#include <ydb/library/actors/core/av_bootstrapped.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/services/metadata/manager/alter.h>
#include <ydb/services/metadata/manager/common.h>
@@ -17,10 +19,8 @@
#include <ydb/services/metadata/manager/ydb_value_operator.h>
#include <ydb/services/metadata/service.h>
-#include <ydb/library/actors/core/av_bootstrapped.h>
#include <library/cpp/protobuf/json/proto2json.h>
#include <library/cpp/testing/unittest/registar.h>
-
#include <util/system/hostname.h>
namespace NKikimr {
@@ -122,178 +122,79 @@ public:
}
)", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str()));
}
-};
-
-Y_UNIT_TEST_SUITE(ColumnShardTiers) {
-
- TString GetConfigProtoWithName(const TString & tierName) {
- return TStringBuilder() << "Name : \"" << tierName << "\"\n" <<
- R"(
- ObjectStorage : {
- Endpoint: "fake"
- Bucket: "fake"
- SecretableAccessKey: {
- Value: {
- Data: "secretAccessKey"
- }
- }
- SecretableSecretKey: {
- Value: {
- Data: "secretSecretKey"
- }
- }
- }
- )";
+ void CreateSecrets() const {
+ StartSchemaRequest(R"(
+ UPSERT OBJECT `accessKey` (TYPE SECRET) WITH (value = `secretAccessKey`);
+ UPSERT OBJECT `secretKey` (TYPE SECRET) WITH (value = `fakeSecret`);
+ )");
}
- const TString ConfigTiering1Str = R"({
- "rules" : [
- {
- "tierName" : "tier1",
- "durationForEvict" : "10d"
- },
- {
- "tierName" : "tier2",
- "durationForEvict" : "20d"
- }
- ]
- })";
-
- const TString ConfigTiering2Str = R"({
- "rules" : [
- {
- "tierName" : "tier1",
- "durationForEvict" : "10d"
- }
- ]
- })";
-
- const TString ConfigTieringNothingStr = R"({
- "rules" : [
- {
- "tierName" : "tier1",
- "durationForEvict" : "10000d"
- },
- {
- "tierName" : "tier2",
- "durationForEvict" : "20000d"
- }
- ]
- })";
-
- class TJsonChecker {
- private:
- YDB_ACCESSOR_DEF(TString, Path);
- YDB_ACCESSOR_DEF(TString, Expectation);
- public:
- TJsonChecker(const TString& path, const TString& expectation)
- : Path(path)
- , Expectation(expectation)
- {
+ void CreateExternalDataSource(const TString& name, const TString& location = "http://fake.fake/fake") const {
+ StartSchemaRequest(R"(
+ CREATE EXTERNAL DATA SOURCE `)" + name + R"(` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION=")" + location + R"(",
+ AUTH_METHOD="AWS",
+ AWS_ACCESS_KEY_ID_SECRET_NAME="accessKey",
+ AWS_SECRET_ACCESS_KEY_SECRET_NAME="secretKey",
+ AWS_REGION="ru-central1"
+ );
+ )");
+ }
+};
- }
- bool Check(const NJson::TJsonValue& jsonInfo) const {
- auto* jsonPathValue = jsonInfo.GetValueByPath(Path);
- if (!jsonPathValue) {
- return Expectation == "__NULL";
- }
- return jsonPathValue->GetStringRobust() == Expectation;
- }
- TString GetDebugString() const {
- TStringBuilder sb;
- sb << "path=" << Path << ";"
- << "expectation=" << Expectation << ";";
- return sb;
- }
- };
+Y_UNIT_TEST_SUITE(ColumnShardTiers) {
class TTestCSEmulator: public NActors::TActorBootstrapped<TTestCSEmulator> {
private:
using TBase = NActors::TActorBootstrapped<TTestCSEmulator>;
- std::shared_ptr<NTiers::TSnapshotConstructor> ExternalDataManipulation;
- TActorId ProviderId;
+ THashSet<TString> ExpectedTiers;
TInstant Start;
- YDB_READONLY_FLAG(Found, false);
- YDB_ACCESSOR(ui32, ExpectedTiersCount, 1);
+ std::shared_ptr<TTiersManager> Manager;
- using TKeyCheckers = TMap<TString, TJsonChecker>;
- YDB_ACCESSOR_DEF(TKeyCheckers, Checkers);
public:
- void ResetConditions() {
- FoundFlag = false;
- Checkers.clear();
- }
-
STATEFN(StateInit) {
switch (ev->GetTypeRewrite()) {
- hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle);
default:
Y_ABORT_UNLESS(false);
}
}
void CheckRuntime(TTestActorRuntime& runtime) {
- const auto pred = [this](TAutoPtr<IEventHandle>& event)->TTestActorRuntimeBase::EEventAction {
- if (event->HasBuffer() && !event->HasEvent()) {
- } else if (!event->HasEvent()) {
- } else {
- auto ptr = event->CastAsLocal<NMetadata::NProvider::TEvRefreshSubscriberData>();
- if (ptr) {
- CheckFound(ptr);
- }
- }
- return TTestActorRuntimeBase::EEventAction::PROCESS;
- };
-
- runtime.SetObserverFunc(pred);
-
for (const TInstant start = Now(); !IsFound() && Now() - start < TDuration::Seconds(30); ) {
runtime.SimulateSleep(TDuration::Seconds(1));
}
- runtime.SetObserverFunc(TTestActorRuntime::DefaultObserverFunc);
Y_ABORT_UNLESS(IsFound());
}
- void CheckFound(NMetadata::NProvider::TEvRefreshSubscriberData* event) {
- auto snapshot = event->GetSnapshotAs<NTiers::TTiersSnapshot>();
- if (!snapshot) {
- Cerr << "incorrect snapshot" << Endl;
- return;
+ bool IsFound() const {
+ if (!Manager) {
+ return false;
}
- Cerr << "SNAPSHOT: " << snapshot->SerializeToString() << Endl;
- if (ExpectedTiersCount != snapshot->GetTierConfigs().size()) {
- Cerr << "TiersCount incorrect: " << snapshot->SerializeToString() << ";expectation=" << ExpectedTiersCount << Endl;
- return;
+ THashSet notFoundTiers = ExpectedTiers;
+ for (const auto& [id, config] : Manager->GetTierConfigs()) {
+ notFoundTiers.erase(id);
}
- for (auto&& i : Checkers) {
- NJson::TJsonValue jsonData;
- if (i.first.StartsWith("TIER.")) {
- auto value = snapshot->GetTierById(i.first.substr(5));
- jsonData = value->SerializeConfigToJson();
- } else {
- Y_ABORT_UNLESS(false);
- }
- if (!i.second.Check(jsonData)) {
- Cerr << "config value incorrect:" << snapshot->SerializeToString() << ";snapshot_check_path=" << i.first << Endl;
- Cerr << "json path incorrect:" << jsonData << ";" << i.second.GetDebugString() << Endl;
- return;
- }
- }
- FoundFlag = true;
+ return notFoundTiers.empty();
}
- void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) {
- CheckFound(ev->Get());
+ const THashMap<TString, NTiers::TTierConfig>& GetTierConfigs() {
+ return Manager->GetTierConfigs();
}
void Bootstrap() {
- ProviderId = NMetadata::NProvider::MakeServiceId(SelfId().NodeId());
- ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>();
Become(&TThis::StateInit);
- Sender<NMetadata::NProvider::TEvSubscribeExternal>(ExternalDataManipulation).SendTo(ProviderId);
Start = Now();
+ Manager = std::make_shared<TTiersManager>(0, SelfId(), [](const TActorContext&) {
+ });
+ Manager->Start(Manager);
+ Manager->EnablePathId(0, ExpectedTiers);
+ }
+
+ TTestCSEmulator(THashSet<TString> expectedTiers)
+ : ExpectedTiers(std::move(expectedTiers)) {
}
};
@@ -323,6 +224,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true)
.SetEnableTieringInColumnShard(true)
+ .SetEnableExternalDataSources(true)
;
Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
@@ -339,55 +241,24 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
TLocalHelper lHelper(*server);
{
lHelper.CreateTestOlapTable();
- lHelper.StartSchemaRequest("CREATE OBJECT tier1 (TYPE TIER) WITH tierConfig = `" + GetConfigProtoWithName("abc") + "`");
- lHelper.StartSchemaRequest("CREATE OBJECT tier2 (TYPE TIER) WITH tierConfig = `" + GetConfigProtoWithName("abc") + "`");
- lHelper.StartSchemaRequest(R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE tier1, Interval("P20D") TO EXTERNAL DATA SOURCE tier2 ON timestamp)");
+ lHelper.CreateSecrets();
+ lHelper.CreateExternalDataSource("/Root/tier1", "http://fake.fake/abc");
+ lHelper.CreateExternalDataSource("/Root/tier2", "http://fake.fake/abc");
+ lHelper.StartSchemaRequest(
+ R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE `/Root/tier1`, Interval("P20D") TO EXTERNAL DATA SOURCE `/Root/tier2` ON timestamp)");
- TTestCSEmulator* emulator = new TTestCSEmulator();
- emulator->MutableCheckers().emplace("TIER.tier1", TJsonChecker("Name", "abc"));
- emulator->SetExpectedTiersCount(2);
- runtime.Register(emulator);
- runtime.SimulateSleep(TDuration::Seconds(10));
- Cerr << "Initialization finished" << Endl;
- {
- const TInstant start = Now();
- while (!emulator->IsFound() && Now() - start < TDuration::Seconds(2000)) {
- runtime.SimulateSleep(TDuration::Seconds(1));
- }
- Y_ABORT_UNLESS(emulator->IsFound());
- }
{
- emulator->ResetConditions();
- emulator->SetExpectedTiersCount(2);
- emulator->MutableCheckers().emplace("TIER.tier1", TJsonChecker("Name", "abc1"));
-
- lHelper.StartSchemaRequest("ALTER OBJECT tier1 (TYPE TIER) SET tierConfig = `" + GetConfigProtoWithName("abc1") + "`");
-
- {
- const TInstant start = Now();
- while (!emulator->IsFound() && Now() - start < TDuration::Seconds(2000)) {
- runtime.SimulateSleep(TDuration::Seconds(1));
- }
- Y_ABORT_UNLESS(emulator->IsFound());
- }
+ TTestCSEmulator* emulator = new TTestCSEmulator({ "/Root/tier1", "/Root/tier2" });
+ runtime.Register(emulator);
+ emulator->CheckRuntime(runtime);
+ UNIT_ASSERT_EQUAL(emulator->GetTierConfigs().at("/Root/tier1").GetProtoConfig().GetBucket(), "abc");
}
+ Cerr << "Initialization finished" << Endl;
{
- emulator->ResetConditions();
- emulator->SetExpectedTiersCount(0);
-
- // TODO: add validation
- // lHelper.StartSchemaRequest("DROP OBJECT tier1(TYPE TIER)", false);
+ lHelper.StartSchemaRequest("DROP EXTERNAL DATA SOURCE `/Root/tier1`", false);
lHelper.StartSchemaRequest("DROP TABLE `/Root/olapStore/olapTable`");
- lHelper.StartSchemaRequest("DROP OBJECT tier1(TYPE TIER)");
- lHelper.StartSchemaRequest("DROP OBJECT tier2(TYPE TIER)");
-
- {
- const TInstant start = Now();
- while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) {
- runtime.SimulateSleep(TDuration::Seconds(1));
- }
- Y_ABORT_UNLESS(emulator->IsFound());
- }
+ lHelper.StartSchemaRequest("DROP EXTERNAL DATA SOURCE `/Root/tier1`");
+ lHelper.StartSchemaRequest("DROP EXTERNAL DATA SOURCE `/Root/tier2`");
}
}
}
@@ -409,6 +280,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true)
.SetEnableTieringInColumnShard(true)
+ .SetEnableExternalDataSources(true)
.SetAppConfig(appConfig);
Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
@@ -429,46 +301,38 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
runtime.SimulateSleep(TDuration::Seconds(10));
Cerr << "Initialization finished" << Endl;
- lHelper.StartSchemaRequest("CREATE OBJECT tier1 (TYPE TIER) WITH tierConfig = `" + GetConfigProtoWithName("abc1") + "`", true, false);
+ lHelper.CreateSecrets();
+ lHelper.CreateExternalDataSource("/Root/tier1", "http://fake.fake/abc1");
{
- TTestCSEmulator* emulator = new TTestCSEmulator;
+ TTestCSEmulator* emulator = new TTestCSEmulator({ "/Root/tier1" });
runtime.Register(emulator);
- emulator->MutableCheckers().emplace("TIER.tier1", TJsonChecker("Name", "abc1"));
- emulator->SetExpectedTiersCount(1);
emulator->CheckRuntime(runtime);
+ UNIT_ASSERT_EQUAL(emulator->GetTierConfigs().at("/Root/tier1").GetProtoConfig().GetBucket(), "abc1");
}
- lHelper.StartSchemaRequest("CREATE OBJECT tier2 (TYPE TIER) WITH tierConfig = `" + GetConfigProtoWithName("abc2") + "`");
+ lHelper.CreateExternalDataSource("/Root/tier2", "http://fake.fake/abc2");
{
- TTestCSEmulator* emulator = new TTestCSEmulator();
+ TTestCSEmulator* emulator = new TTestCSEmulator({ "/Root/tier1", "/Root/tier2" });
runtime.Register(emulator);
- emulator->MutableCheckers().emplace("TIER.tier1", TJsonChecker("Name", "abc1"));
- emulator->MutableCheckers().emplace("TIER.tier2", TJsonChecker("Name", "abc2"));
- emulator->SetExpectedTiersCount(2);
emulator->CheckRuntime(runtime);
+ UNIT_ASSERT_EQUAL(emulator->GetTierConfigs().at("/Root/tier1").GetProtoConfig().GetBucket(), "abc1");
+ UNIT_ASSERT_EQUAL(emulator->GetTierConfigs().at("/Root/tier2").GetProtoConfig().GetBucket(), "abc2");
}
lHelper.CreateTestOlapTable("olapTable");
- lHelper.StartSchemaRequest(R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE tier1, Interval("P20D") TO EXTERNAL DATA SOURCE tier2 ON timestamp)");
+ lHelper.StartSchemaRequest(
+ R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE `/Root/tier1`, Interval("P20D") TO EXTERNAL DATA SOURCE `/Root/tier2` ON timestamp)");
- // TODO: add validation
- // lHelper.StartSchemaRequest("DROP OBJECT tier2 (TYPE TIER)", false);
- // lHelper.StartSchemaRequest("DROP OBJECT tier1 (TYPE TIER)", false);
+ lHelper.StartSchemaRequest("DROP EXTERNAL DATA SOURCE `/Root/tier2`", false);
+ lHelper.StartSchemaRequest("DROP EXTERNAL DATA SOURCE `/Root/tier1`", false);
lHelper.StartSchemaRequest("DROP TABLE `/Root/olapStore/olapTable`");
{
- TTestCSEmulator* emulator = new TTestCSEmulator;
- runtime.Register(emulator);
- emulator->SetExpectedTiersCount(2);
- emulator->CheckRuntime(runtime);
- }
- lHelper.StartSchemaRequest("DROP OBJECT tier2 (TYPE TIER)");
- lHelper.StartSchemaRequest("DROP OBJECT tier1 (TYPE TIER)", true, false);
- {
- TTestCSEmulator* emulator = new TTestCSEmulator;
+ TTestCSEmulator* emulator = new TTestCSEmulator({ "/Root/tier1", "/Root/tier2" });
runtime.Register(emulator);
- emulator->SetExpectedTiersCount(0);
emulator->CheckRuntime(runtime);
}
+ lHelper.StartSchemaRequest("DROP EXTERNAL DATA SOURCE `/Root/tier2`");
+ lHelper.StartSchemaRequest("DROP EXTERNAL DATA SOURCE `/Root/tier1`");
//runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
//runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE);
@@ -516,7 +380,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
SecretKey: "SId:secretSecretKey"
}
)";
- const TString TierEndpoint = "fake";
+ const TString TierEndpoint = "fake.fake";
#endif
Y_UNIT_TEST(TieringUsage) {
@@ -536,6 +400,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true)
.SetEnableTieringInColumnShard(true)
+ .SetEnableExternalDataSources(true)
;
Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
@@ -558,27 +423,21 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
TLocalHelper lHelper(*server);
lHelper.SetOptionalStorageId("__DEFAULT");
- lHelper.StartSchemaRequest("CREATE OBJECT secretAccessKey ( "
- "TYPE SECRET) WITH (value = ak)");
- lHelper.StartSchemaRequest("CREATE OBJECT secretSecretKey ( "
- "TYPE SECRET) WITH (value = fakeSecret)");
+ lHelper.CreateSecrets();
Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->SetSecretKey("fakeSecret");
- lHelper.StartSchemaRequest("CREATE OBJECT tier1 ( "
- "TYPE TIER) WITH (tierConfig = `" + TierConfigProtoStr + "`)");
- lHelper.StartSchemaRequest("CREATE OBJECT tier2 ( "
- "TYPE TIER) WITH (tierConfig = `" + TierConfigProtoStr + "`)");
+ lHelper.CreateExternalDataSource("/Root/tier1", "http://" + TierEndpoint + "/fake");
+ lHelper.CreateExternalDataSource("/Root/tier2", "http://" + TierEndpoint + "/fake");
{
- TTestCSEmulator* emulator = new TTestCSEmulator();
+ TTestCSEmulator* emulator = new TTestCSEmulator({ "/Root/tier1", "/Root/tier2" });
runtime.Register(emulator);
- emulator->MutableCheckers().emplace("TIER.tier1", TJsonChecker("Name", "fakeTier"));
- emulator->MutableCheckers().emplace("TIER.tier2", TJsonChecker("ObjectStorage.Endpoint", TierEndpoint));
- emulator->SetExpectedTiersCount(2);
emulator->CheckRuntime(runtime);
+ UNIT_ASSERT_VALUES_EQUAL(emulator->GetTierConfigs().at("/Root/tier1").GetProtoConfig().GetEndpoint(), TierEndpoint);
}
lHelper.CreateTestOlapTable("olapTable", 2);
- lHelper.StartSchemaRequest(R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE tier1, Interval("P20D") TO EXTERNAL DATA SOURCE tier2 ON timestamp)");
+ lHelper.StartSchemaRequest(
+ R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE `/Root/tier1`, Interval("P20D") TO EXTERNAL DATA SOURCE `/Root/tier2` ON timestamp)");
Cerr << "Wait tables" << Endl;
runtime.SimulateSleep(TDuration::Seconds(20));
Cerr << "Initialization tables" << Endl;
@@ -624,7 +483,8 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
lHelper.DropTable("/Root/olapStore/olapTable");
lHelper.StartDataRequest("DELETE FROM `/Root/olapStore/olapTable`");
*/
- lHelper.StartSchemaRequest(R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10000D") TO EXTERNAL DATA SOURCE tier1, Interval("P20000D") TO EXTERNAL DATA SOURCE tier2 ON timestamp)");
+ lHelper.StartSchemaRequest(
+ R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10000D") TO EXTERNAL DATA SOURCE `/Root/tier1`, Interval("P20000D") TO EXTERNAL DATA SOURCE `/Root/tier2` ON timestamp)");
{
const TInstant start = Now();
bool check = false;
diff --git a/ydb/core/tx/tiering/ya.make b/ydb/core/tx/tiering/ya.make
index 4090ce51fb6..f64ae8bed4e 100644
--- a/ydb/core/tx/tiering/ya.make
+++ b/ydb/core/tx/tiering/ya.make
@@ -3,8 +3,7 @@ LIBRARY()
SRCS(
common.cpp
manager.cpp
- GLOBAL external_data.cpp
- snapshot.cpp
+ fetcher.cpp
)
IF (OS_WINDOWS)
@@ -18,6 +17,7 @@ PEERDIR(
library/cpp/json/writer
ydb/core/blobstorage
ydb/core/protos
+ ydb/core/tx/columnshard/hooks/abstract
ydb/core/tx/schemeshard
ydb/core/tx/tiering/tier
ydb/core/tablet_flat/protos
@@ -27,6 +27,8 @@ PEERDIR(
ydb/services/metadata
)
+YQL_LAST_ABI_VERSION()
+
END()
RECURSE_FOR_TESTS(
diff --git a/ydb/core/wrappers/abstract.cpp b/ydb/core/wrappers/abstract.cpp
index c13cd9bc6e0..89f799b4ac9 100644
--- a/ydb/core/wrappers/abstract.cpp
+++ b/ydb/core/wrappers/abstract.cpp
@@ -9,7 +9,7 @@ IExternalStorageOperator::TPtr IExternalStorageConfig::ConstructStorageOperator(
}
IExternalStorageConfig::TPtr IExternalStorageConfig::Construct(const NKikimrSchemeOp::TS3Settings& settings) {
- if (settings.GetEndpoint() == "fake") {
+ if (settings.GetEndpoint() == "fake.fake") {
return std::make_shared<TFakeExternalStorageConfig>(settings.GetBucket(), settings.GetSecretKey());
} else {
return std::make_shared<TS3ExternalStorageConfig>(settings);
diff --git a/ydb/core/wrappers/fake_storage.h b/ydb/core/wrappers/fake_storage.h
index fff953f8203..472197a4eda 100644
--- a/ydb/core/wrappers/fake_storage.h
+++ b/ydb/core/wrappers/fake_storage.h
@@ -138,7 +138,7 @@ private:
template <class TEvent>
void ExecuteImpl(TEvent& ev) const {
ev->Get()->MutableRequest().WithBucket(Bucket);
- Y_ABORT_UNLESS(SecretKey == Singleton<TFakeExternalStorage>()->GetSecretKey());
+ Y_ABORT_UNLESS(SecretKey == Singleton<TFakeExternalStorage>()->GetSecretKey(), "%s != %s", SecretKey.data(), Singleton<TFakeExternalStorage>()->GetSecretKey().data());
if (OwnedStorage) {
OwnedStorage->Execute(ev, ReplyAdapter);
} else {
diff --git a/ydb/services/metadata/secret/accessor/secret_id.cpp b/ydb/services/metadata/secret/accessor/secret_id.cpp
new file mode 100644
index 00000000000..0dc4e50e8a8
--- /dev/null
+++ b/ydb/services/metadata/secret/accessor/secret_id.cpp
@@ -0,0 +1,32 @@
+#include "secret_id.h"
+
+#include <ydb/services/metadata/manager/ydb_value_operator.h>
+#include <library/cpp/digest/md5/md5.h>
+
+namespace NKikimr::NMetadata::NSecret {
+
+TString TSecretId::SerializeToString() const {
+ TStringBuilder sb;
+ sb << "USId:" << OwnerUserId << ":" << SecretId;
+ return sb;
+}
+
+TString TSecretIdOrValue::DebugString() const {
+ return std::visit(TOverloaded(
+ [](std::monostate) -> TString{
+ return "__NONE__";
+ },
+ [](const TSecretId& id) -> TString{
+ return id.SerializeToString();
+ },
+ [](const TSecretName& name) -> TString{
+ return name.SerializeToString();
+ },
+ [](const TString& value) -> TString{
+ return MD5::Calc(value);
+ }
+ ),
+ State);
+}
+
+}
diff --git a/ydb/services/metadata/secret/accessor/secret_id.h b/ydb/services/metadata/secret/accessor/secret_id.h
new file mode 100644
index 00000000000..f972bd6c59a
--- /dev/null
+++ b/ydb/services/metadata/secret/accessor/secret_id.h
@@ -0,0 +1,223 @@
+#pragma once
+#include <ydb/core/base/appdata.h>
+
+#include <ydb/library/accessor/accessor.h>
+#include <util/generic/overloaded.h>
+
+namespace NKikimr::NMetadata::NSecret {
+
+class TSecretId {
+private:
+ YDB_READONLY_PROTECT_DEF(TString, OwnerUserId);
+ YDB_READONLY_PROTECT_DEF(TString, SecretId);
+
+public:
+ inline static const TString PrefixWithUser = "USId:";
+
+ TSecretId() = default;
+ TSecretId(const TString& ownerUserId, const TString& secretId)
+ : OwnerUserId(ownerUserId)
+ , SecretId(secretId) {
+ }
+
+ TSecretId(const TStringBuf ownerUserId, const TStringBuf secretId)
+ : OwnerUserId(ownerUserId)
+ , SecretId(secretId) {
+ }
+
+ TString SerializeToString() const;
+
+ template <class TProto>
+ TString BuildSecretAccessString(const TProto& proto, const TString& defaultOwnerId) {
+ if (proto.HasValue()) {
+ return proto.GetValue();
+ } else {
+ return TStringBuilder() << PrefixWithUser << (proto.GetSecretOwnerId() ? proto.GetSecretOwnerId() : defaultOwnerId) << ":" << SecretId;
+ }
+ }
+
+ bool operator<(const TSecretId& item) const {
+ return std::tie(OwnerUserId, SecretId) < std::tie(item.OwnerUserId, item.SecretId);
+ }
+ bool operator==(const TSecretId& item) const {
+ return std::tie(OwnerUserId, SecretId) == std::tie(item.OwnerUserId, item.SecretId);
+ }
+};
+
+class TSecretName {
+private:
+ YDB_READONLY_DEF(TString, SecretId);
+
+public:
+ inline static const TString PrefixNoUser = "SId:";
+
+ TSecretName() = default;
+ TSecretName(const TString& secretId) : SecretId(secretId) {}
+
+ TString SerializeToString() const {
+ return TStringBuilder() << "SId:" << SecretId;
+ }
+
+ bool DeserializeFromString(const TString& secretString) {
+ if (secretString.StartsWith(PrefixNoUser)) {
+ SecretId = secretString.substr(PrefixNoUser.size());
+ return true;
+ }
+ return false;
+ }
+};
+
+class TSecretIdOrValue {
+private:
+ using TState = std::variant<std::monostate, TSecretId, TSecretName, TString>;
+ YDB_READONLY_DEF(TState, State);
+
+private:
+ TSecretIdOrValue() = default;
+
+ bool DeserializeFromStringImpl(const TString& info, const TString& defaultUserId = "") {
+ if (info.StartsWith(TSecretId::PrefixWithUser)) {
+ TStringBuf sb(info.data(), info.size());
+ sb.Skip(TSecretId::PrefixWithUser.size());
+ TStringBuf uId;
+ TStringBuf sId;
+ if (!sb.TrySplit(':', uId, sId)) {
+ return false;
+ }
+ if (!uId || !sId) {
+ return false;
+ }
+ State = TSecretId(uId, sId);
+ } else if (info.StartsWith(TSecretName::PrefixNoUser)) {
+ TStringBuf sb(info.data(), info.size());
+ sb.Skip(TSecretName::PrefixNoUser.size());
+ if (!sb) {
+ return false;
+ }
+ if (defaultUserId) {
+ State = TSecretId(defaultUserId, TString(sb));
+ } else {
+ State = TSecretName(TString(sb));
+ }
+ } else {
+ State = info;
+ }
+ return true;
+ }
+
+ explicit TSecretIdOrValue(const TSecretId& id)
+ : State(id) {
+ }
+ explicit TSecretIdOrValue(const TSecretName& id)
+ : State(id) {
+ }
+ explicit TSecretIdOrValue(const TString& value)
+ : State(value) {
+ }
+
+public:
+ bool operator!() const {
+ return std::holds_alternative<std::monostate>(State);
+ }
+
+ static TSecretIdOrValue BuildAsValue(const TString& value) {
+ return TSecretIdOrValue(value);
+ }
+
+ static TSecretIdOrValue BuildEmpty() {
+ return TSecretIdOrValue();
+ }
+
+ static TSecretIdOrValue BuildAsId(const TSecretId& id) {
+ return TSecretIdOrValue(id);
+ }
+
+ static TSecretIdOrValue BuildAsId(const TSecretName& id) {
+ return TSecretIdOrValue(id);
+ }
+
+ static std::optional<TSecretIdOrValue> DeserializeFromOptional(
+ const NKikimrSchemeOp::TSecretableVariable& proto, const TString& secretInfo, const TString& defaultOwnerId = Default<TString>()) {
+ if (proto.HasSecretId()) {
+ return DeserializeFromProto(proto, defaultOwnerId);
+ } else if (proto.HasValue()) {
+ return DeserializeFromString(proto.GetValue().GetData());
+ }
+ if (secretInfo) {
+ return DeserializeFromString(secretInfo, defaultOwnerId);
+ } else {
+ return {};
+ }
+ }
+
+ NKikimrSchemeOp::TSecretableVariable SerializeToProto() const {
+ NKikimrSchemeOp::TSecretableVariable result;
+ std::visit(TOverloaded(
+ [](std::monostate){ },
+ [&result](const TSecretId& id){
+ result.MutableSecretId()->SetId(id.GetSecretId());
+ result.MutableSecretId()->SetOwnerId(id.GetOwnerUserId());
+ },
+ [&result](const TSecretName& name){
+ result.MutableSecretId()->SetId(name.GetSecretId());
+ },
+ [&result](const TString& value){
+ result.MutableValue()->SetData(value);
+ }
+ ),
+ State);
+ return result;
+ }
+
+ static std::optional<TSecretIdOrValue> DeserializeFromProto(
+ const NKikimrSchemeOp::TSecretableVariable& proto, const TString& defaultOwnerId = Default<TString>()) {
+ if (proto.HasSecretId()) {
+ TString ownerId;
+ TString secretId;
+ if (!proto.GetSecretId().HasOwnerId() || !proto.GetSecretId().GetOwnerId()) {
+ ownerId = defaultOwnerId;
+ } else {
+ ownerId = proto.GetSecretId().GetOwnerId();
+ }
+ secretId = proto.GetSecretId().GetId();
+ if (!ownerId || !secretId) {
+ return {};
+ }
+ return TSecretIdOrValue::BuildAsId(TSecretId(ownerId, secretId));
+ } else if (proto.HasValue()) {
+ return TSecretIdOrValue::BuildAsValue(proto.GetValue().GetData());
+ } else {
+ return {};
+ }
+ }
+
+ static std::optional<TSecretIdOrValue> DeserializeFromString(const TString& info, const TString& defaultOwnerId = Default<TString>()) {
+ TSecretIdOrValue result;
+ if (!result.DeserializeFromStringImpl(info, defaultOwnerId)) {
+ return {};
+ } else {
+ return result;
+ }
+ }
+
+ TString SerializeToString() const {
+ return std::visit(TOverloaded(
+ [](std::monostate) -> TString{
+ return "";
+ },
+ [](const TSecretId& id) -> TString{
+ return TStringBuilder() << TSecretId::PrefixWithUser << id.GetOwnerUserId() << ":" << id.GetSecretId();
+ },
+ [](const TSecretName& name) -> TString{
+ return TStringBuilder() << TSecretName::PrefixNoUser << name.GetSecretId();
+ },
+ [](const TString& value) -> TString{
+ return value;
+ }
+ ),
+ State);
+ }
+
+ TString DebugString() const;
+};
+} // namespace NKikimr::NMetadata::NSecret
diff --git a/ydb/services/metadata/secret/accessor/snapshot.h b/ydb/services/metadata/secret/accessor/snapshot.h
new file mode 100644
index 00000000000..1c8d0179519
--- /dev/null
+++ b/ydb/services/metadata/secret/accessor/snapshot.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include "secret_id.h"
+
+#include <ydb/library/aclib/aclib.h>
+#include <ydb/library/conclusion/result.h>
+
+namespace NKikimr::NMetadata::NSecret {
+
+class ISecretAccessor {
+public:
+ virtual bool CheckSecretAccess(const TSecretIdOrValue& sIdOrValue, const NACLib::TUserToken& userToken) const = 0;
+ virtual bool PatchString(TString& stringForPath) const = 0;
+ virtual TConclusion<TString> GetSecretValue(const TSecretIdOrValue& secretId) const = 0;
+ virtual std::vector<TSecretId> GetSecretIds(const std::optional<NACLib::TUserToken>& userToken, const TString& secretId) const = 0;
+};
+
+} // namespace NKikimr::NMetadata::NSecret
diff --git a/ydb/services/metadata/secret/accessor/ya.make b/ydb/services/metadata/secret/accessor/ya.make
new file mode 100644
index 00000000000..5c748f1c2bd
--- /dev/null
+++ b/ydb/services/metadata/secret/accessor/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ secret_id.cpp
+)
+
+PEERDIR(
+ ydb/core/base
+ ydb/library/actors/core
+ ydb/library/aclib
+)
+
+END()
diff --git a/ydb/services/metadata/secret/secret.cpp b/ydb/services/metadata/secret/secret.cpp
index 86cf163da3e..ddab3fdaf3c 100644
--- a/ydb/services/metadata/secret/secret.cpp
+++ b/ydb/services/metadata/secret/secret.cpp
@@ -32,28 +32,4 @@ IClassBehaviour::TPtr TSecret::GetBehaviour() {
return TSecretBehaviour::GetInstance();
}
-TString TSecretId::SerializeToString() const {
- TStringBuilder sb;
- sb << "USId:" << OwnerUserId << ":" << SecretId;
- return sb;
-}
-
-TString TSecretIdOrValue::DebugString() const {
- return std::visit(TOverloaded(
- [](std::monostate) -> TString{
- return "__NONE__";
- },
- [](const TSecretId& id) -> TString{
- return id.SerializeToString();
- },
- [](const TSecretName& name) -> TString{
- return name.SerializeToString();
- },
- [](const TString& value) -> TString{
- return MD5::Calc(value);
- }
- ),
- State);
-}
-
}
diff --git a/ydb/services/metadata/secret/secret.h b/ydb/services/metadata/secret/secret.h
index 7511cbb5603..8cc936766c2 100644
--- a/ydb/services/metadata/secret/secret.h
+++ b/ydb/services/metadata/secret/secret.h
@@ -5,224 +5,10 @@
#include <ydb/services/metadata/abstract/decoder.h>
#include <ydb/services/metadata/manager/object.h>
#include <ydb/services/metadata/manager/preparation_controller.h>
+#include <ydb/services/metadata/secret/accessor/secret_id.h>
namespace NKikimr::NMetadata::NSecret {
-class TSecretId {
-private:
- YDB_READONLY_PROTECT_DEF(TString, OwnerUserId);
- YDB_READONLY_PROTECT_DEF(TString, SecretId);
-
-public:
- inline static const TString PrefixWithUser = "USId:";
-
- TSecretId() = default;
- TSecretId(const TString& ownerUserId, const TString& secretId)
- : OwnerUserId(ownerUserId)
- , SecretId(secretId) {
- }
-
- TSecretId(const TStringBuf ownerUserId, const TStringBuf secretId)
- : OwnerUserId(ownerUserId)
- , SecretId(secretId) {
- }
-
- TString SerializeToString() const;
-
- template <class TProto>
- TString BuildSecretAccessString(const TProto& proto, const TString& defaultOwnerId) {
- if (proto.HasValue()) {
- return proto.GetValue();
- } else {
- return TStringBuilder() << PrefixWithUser << (proto.GetSecretOwnerId() ? proto.GetSecretOwnerId() : defaultOwnerId) << ":" << SecretId;
- }
- }
-
- bool operator<(const TSecretId& item) const {
- return std::tie(OwnerUserId, SecretId) < std::tie(item.OwnerUserId, item.SecretId);
- }
- bool operator==(const TSecretId& item) const {
- return std::tie(OwnerUserId, SecretId) == std::tie(item.OwnerUserId, item.SecretId);
- }
-};
-
-class TSecretName {
-private:
- YDB_READONLY_DEF(TString, SecretId);
-
-public:
- inline static const TString PrefixNoUser = "SId:";
-
- TSecretName() = default;
- TSecretName(const TString& secretId) : SecretId(secretId) {}
-
- TString SerializeToString() const {
- return TStringBuilder() << "SId:" << SecretId;
- }
-
- bool DeserializeFromString(const TString& secretString) {
- if (secretString.StartsWith(PrefixNoUser)) {
- SecretId = secretString.substr(PrefixNoUser.size());
- return true;
- }
- return false;
- }
-};
-
-class TSecretIdOrValue {
-private:
- using TState = std::variant<std::monostate, TSecretId, TSecretName, TString>;
- YDB_READONLY_DEF(TState, State);
-
-private:
- TSecretIdOrValue() = default;
-
- bool DeserializeFromStringImpl(const TString& info, const TString& defaultUserId = "") {
- if (info.StartsWith(TSecretId::PrefixWithUser)) {
- TStringBuf sb(info.data(), info.size());
- sb.Skip(TSecretId::PrefixWithUser.size());
- TStringBuf uId;
- TStringBuf sId;
- if (!sb.TrySplit(':', uId, sId)) {
- return false;
- }
- if (!uId || !sId) {
- return false;
- }
- State = TSecretId(uId, sId);
- } else if (info.StartsWith(TSecretName::PrefixNoUser)) {
- TStringBuf sb(info.data(), info.size());
- sb.Skip(TSecretName::PrefixNoUser.size());
- if (!sb) {
- return false;
- }
- if (defaultUserId) {
- State = TSecretId(defaultUserId, TString(sb));
- } else {
- State = TSecretName(TString(sb));
- }
- } else {
- State = info;
- }
- return true;
- }
-
- explicit TSecretIdOrValue(const TSecretId& id)
- : State(id) {
- }
- explicit TSecretIdOrValue(const TSecretName& id)
- : State(id) {
- }
- explicit TSecretIdOrValue(const TString& value)
- : State(value) {
- }
-
-public:
- bool operator!() const {
- return std::holds_alternative<std::monostate>(State);
- }
-
- static TSecretIdOrValue BuildAsValue(const TString& value) {
- return TSecretIdOrValue(value);
- }
-
- static TSecretIdOrValue BuildEmpty() {
- return TSecretIdOrValue();
- }
-
- static TSecretIdOrValue BuildAsId(const TSecretId& id) {
- return TSecretIdOrValue(id);
- }
-
- static TSecretIdOrValue BuildAsId(const TSecretName& id) {
- return TSecretIdOrValue(id);
- }
-
- static std::optional<TSecretIdOrValue> DeserializeFromOptional(
- const NKikimrSchemeOp::TSecretableVariable& proto, const TString& secretInfo, const TString& defaultOwnerId = Default<TString>()) {
- if (proto.HasSecretId()) {
- return DeserializeFromProto(proto, defaultOwnerId);
- } else if (proto.HasValue()) {
- return DeserializeFromString(proto.GetValue().GetData());
- }
- if (secretInfo) {
- return DeserializeFromString(secretInfo, defaultOwnerId);
- } else {
- return {};
- }
- }
-
- NKikimrSchemeOp::TSecretableVariable SerializeToProto() const {
- NKikimrSchemeOp::TSecretableVariable result;
- std::visit(TOverloaded(
- [](std::monostate){ },
- [&result](const TSecretId& id){
- result.MutableSecretId()->SetId(id.GetSecretId());
- result.MutableSecretId()->SetOwnerId(id.GetOwnerUserId());
- },
- [&result](const TSecretName& name){
- result.MutableSecretId()->SetId(name.GetSecretId());
- },
- [&result](const TString& value){
- result.MutableValue()->SetData(value);
- }
- ),
- State);
- return result;
- }
-
- static std::optional<TSecretIdOrValue> DeserializeFromProto(
- const NKikimrSchemeOp::TSecretableVariable& proto, const TString& defaultOwnerId = Default<TString>()) {
- if (proto.HasSecretId()) {
- TString ownerId;
- TString secretId;
- if (!proto.GetSecretId().HasOwnerId() || !proto.GetSecretId().GetOwnerId()) {
- ownerId = defaultOwnerId;
- } else {
- ownerId = proto.GetSecretId().GetOwnerId();
- }
- secretId = proto.GetSecretId().GetId();
- if (!ownerId || !secretId) {
- return {};
- }
- return TSecretIdOrValue::BuildAsId(TSecretId(ownerId, secretId));
- } else if (proto.HasValue()) {
- return TSecretIdOrValue::BuildAsValue(proto.GetValue().GetData());
- } else {
- return {};
- }
- }
-
- static std::optional<TSecretIdOrValue> DeserializeFromString(const TString& info, const TString& defaultOwnerId = Default<TString>()) {
- TSecretIdOrValue result;
- if (!result.DeserializeFromStringImpl(info, defaultOwnerId)) {
- return {};
- } else {
- return result;
- }
- }
-
- TString SerializeToString() const {
- return std::visit(TOverloaded(
- [](std::monostate) -> TString{
- return "";
- },
- [](const TSecretId& id) -> TString{
- return TStringBuilder() << TSecretId::PrefixWithUser << id.GetOwnerUserId() << ":" << id.GetSecretId();
- },
- [](const TSecretName& name) -> TString{
- return TStringBuilder() << TSecretName::PrefixNoUser << name.GetSecretId();
- },
- [](const TString& value) -> TString{
- return value;
- }
- ),
- State);
- }
-
- TString DebugString() const;
-};
-
class TSecret: public TSecretId, public NModifications::TObject<TSecret> {
private:
using TBase = TSecretId;
diff --git a/ydb/services/metadata/secret/snapshot.h b/ydb/services/metadata/secret/snapshot.h
index 92aabfc1743..b53ce0c2c4f 100644
--- a/ydb/services/metadata/secret/snapshot.h
+++ b/ydb/services/metadata/secret/snapshot.h
@@ -3,11 +3,12 @@
#include "access.h"
#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/services/metadata/secret/accessor/snapshot.h>
#include <ydb/library/accessor/accessor.h>
namespace NKikimr::NMetadata::NSecret {
-class TSnapshot: public NFetcher::ISnapshot {
+class TSnapshot: public NFetcher::ISnapshot, public ISecretAccessor {
private:
using TBase = NFetcher::ISnapshot;
using TSecrets = std::map<TSecretId, TSecret>;
@@ -22,10 +23,10 @@ protected:
virtual TString DoSerializeToString() const override;
public:
using TBase::TBase;
- bool CheckSecretAccess(const TSecretIdOrValue& sIdOrValue, const NACLib::TUserToken& userToken) const;
- bool PatchString(TString& stringForPath) const;
- TConclusion<TString> GetSecretValue(const TSecretIdOrValue& secretId) const;
- std::vector<TSecretId> GetSecretIds(const std::optional<NACLib::TUserToken>& userToken, const TString& secretId) const;
+ bool CheckSecretAccess(const TSecretIdOrValue& sIdOrValue, const NACLib::TUserToken& userToken) const override;
+ bool PatchString(TString& stringForPath) const override;
+ TConclusion<TString> GetSecretValue(const TSecretIdOrValue& secretId) const override;
+ std::vector<TSecretId> GetSecretIds(const std::optional<NACLib::TUserToken>& userToken, const TString& secretId) const override;
};
}
diff --git a/ydb/services/metadata/secret/ya.make b/ydb/services/metadata/secret/ya.make
index e44e3e3152d..6cfd391a736 100644
--- a/ydb/services/metadata/secret/ya.make
+++ b/ydb/services/metadata/secret/ya.make
@@ -21,6 +21,7 @@ PEERDIR(
ydb/core/grpc_services/base
ydb/core/grpc_services
ydb/services/metadata/request
+ ydb/services/metadata/secret/accessor
)
END()