aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-10-28 19:16:53 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-10-28 19:16:53 +0300
commit7d8f9f654230732d2e18e98753e6f07ad3269fee (patch)
tree7791f5a3e9766c5d39f1da596c127578b44bcd4e
parent6bbe00187e61d0039d6f8dbc103a438594411983 (diff)
downloadydb-7d8f9f654230732d2e18e98753e6f07ad3269fee.tar.gz
tiering throught metadata from datashard table
-rw-r--r--ydb/core/grpc_services/rpc_log_store.cpp19
-rw-r--r--ydb/core/protos/flat_scheme_op.proto7
-rw-r--r--ydb/core/tx/CMakeLists.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.txt6
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp5
-rw-r--r--ydb/core/tx/columnshard/columnshard__progress_tx.cpp1
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp6
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp152
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h50
-rw-r--r--ydb/core/tx/columnshard/columnshard_private_events.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard_private_events.h5
-rw-r--r--ydb/core/tx/columnshard/columnshard_ttl.h19
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.cpp39
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h64
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.cpp12
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h2
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h8
-rw-r--r--ydb/core/tx/columnshard/external_data.cpp91
-rw-r--r--ydb/core/tx/columnshard/external_data.h66
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_schema.cpp169
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_tiers.cpp281
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp7
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp10
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.cpp3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp55
-rw-r--r--ydb/core/tx/schemeshard/ut_olap.cpp18
-rw-r--r--ydb/core/tx/tiering/CMakeLists.txt30
-rw-r--r--ydb/core/tx/tiering/decoder.cpp43
-rw-r--r--ydb/core/tx/tiering/decoder.h16
-rw-r--r--ydb/core/tx/tiering/external_data.cpp240
-rw-r--r--ydb/core/tx/tiering/external_data.h105
-rw-r--r--ydb/core/tx/tiering/manager.cpp234
-rw-r--r--ydb/core/tx/tiering/manager.h69
-rw-r--r--ydb/core/tx/tiering/rule.cpp26
-rw-r--r--ydb/core/tx/tiering/rule.h113
-rw-r--r--ydb/core/tx/tiering/s3_actor.cpp (renamed from ydb/core/tx/columnshard/s3_actor.cpp)6
-rw-r--r--ydb/core/tx/tiering/s3_actor.h12
-rw-r--r--ydb/core/tx/tiering/tier_config.cpp32
-rw-r--r--ydb/core/tx/tiering/tier_config.h55
-rw-r--r--ydb/public/api/protos/draft/ydb_logstore.proto2
-rw-r--r--ydb/public/lib/experimental/ydb_logstore.cpp23
-rw-r--r--ydb/public/lib/experimental/ydb_logstore.h19
-rw-r--r--ydb/services/metadata/abstract/common.cpp22
-rw-r--r--ydb/services/metadata/abstract/common.h28
-rw-r--r--ydb/services/metadata/ds_table/accessor_refresh.cpp39
-rw-r--r--ydb/services/metadata/ds_table/accessor_refresh.h27
-rw-r--r--ydb/services/metadata/ds_table/service.cpp6
-rw-r--r--ydb/services/ydb/ydb_logstore_ut.cpp62
51 files changed, 1567 insertions, 768 deletions
diff --git a/ydb/core/grpc_services/rpc_log_store.cpp b/ydb/core/grpc_services/rpc_log_store.cpp
index ea15e6a2f4b..f28c4dbd5d9 100644
--- a/ydb/core/grpc_services/rpc_log_store.cpp
+++ b/ydb/core/grpc_services/rpc_log_store.cpp
@@ -241,14 +241,7 @@ private:
LOG_DEBUG(ctx, NKikimrServices::GRPC_SERVER, "LogStore schema error: %s", error.c_str());
return Reply(status, error, NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
}
- for (const auto& tier : req->tiers()) {
- auto* toTier = toSchemaPreset->MutableSchema()->AddStorageTiers();
- toTier->SetName(tier.name());
- if (!ConvertCompressionFromPublicToInternal(tier.compression(), *toTier->MutableCompression(), error)) {
- LOG_DEBUG(ctx, NKikimrServices::GRPC_SERVER, "LogStore schema error: %s", error.c_str());
- return Reply(status, error, NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
- }
- }
+ toSchemaPreset->MutableSchema()->SetEnableTiering(req->enable_tiering());
}
ctx.Send(MakeTxProxyID(), proposeRequest.release());
}
@@ -297,7 +290,6 @@ private:
const auto& storeDescription = pathDescription.GetColumnStoreDescription();
describeLogStoreResult.set_shards_count(storeDescription.GetColumnShardCount());
- bool firstPreset = true;
for (const auto& schemaPreset : storeDescription.GetSchemaPresets()) {
auto* toSchemaPreset = describeLogStoreResult.add_schema_presets();
toSchemaPreset->set_name(schemaPreset.GetName());
@@ -307,15 +299,6 @@ private:
LOG_DEBUG(ctx, NKikimrServices::GRPC_SERVER, "LogStore schema error: %s", error.c_str());
return Reply(status, error, NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
}
- if (firstPreset) {
- // Preset's tiers are the same. We take first ones.
- firstPreset = false;
- for (auto& tier : schemaPreset.GetSchema().GetStorageTiers()) {
- auto* to = describeLogStoreResult.add_tiers();
- to->set_name(tier.GetName());
- ConvertCompressionFromInternalToPublic(tier.GetCompression(), *to->mutable_compression());
- }
- }
}
return ReplyWithResult(Ydb::StatusIds::SUCCESS, describeLogStoreResult, ctx);
}
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 6aae69d872b..f5e3384bab4 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -426,10 +426,9 @@ message TColumnTableSchema {
//optional int32 DefaultCompressionLevel = 7; // deprecated, not used before replace
optional TCompressionOptions DefaultCompression = 8;
- // Storage tiers specification (not ordered list).
- // You could use any combination of the tiers in TColumnDataLifeCycle.Tiering in any order.
- // If storage tier's Compression is not set it would be derived from table's DefaultCompression.
- repeated TStorageTierConfig StorageTiers = 9;
+ // Storage tiers usage flag.
+ // Tiers detail information contains in storage_path/parent/.external_info/tiers
+ optional bool EnableTiering = 10 [default = false];
}
message TAlterColumnTableSchema {
diff --git a/ydb/core/tx/CMakeLists.txt b/ydb/core/tx/CMakeLists.txt
index 2044b8b5d62..6b91efc22a8 100644
--- a/ydb/core/tx/CMakeLists.txt
+++ b/ydb/core/tx/CMakeLists.txt
@@ -18,6 +18,7 @@ add_subdirectory(scheme_cache)
add_subdirectory(schemeshard)
add_subdirectory(sequenceproxy)
add_subdirectory(sequenceshard)
+add_subdirectory(tiering)
add_subdirectory(time_cast)
add_subdirectory(tx_allocator)
add_subdirectory(tx_allocator_client)
diff --git a/ydb/core/tx/columnshard/CMakeLists.txt b/ydb/core/tx/columnshard/CMakeLists.txt
index f1c69be237d..ccf0af1ba82 100644
--- a/ydb/core/tx/columnshard/CMakeLists.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.txt
@@ -18,7 +18,6 @@ target_link_libraries(core-tx-columnshard PUBLIC
yutil
tools-enum_parser-enum_serialization_runtime
cpp-actors-core
- ydb-services-metadata
ydb-core-actorlib_impl
ydb-core-base
core-blobstorage-dsproxy
@@ -29,9 +28,9 @@ target_link_libraries(core-tx-columnshard PUBLIC
ydb-core-tablet
ydb-core-tablet_flat
tx-columnshard-engines
+ core-tx-tiering
tx-long_tx_service-public
ydb-core-util
- ydb-core-wrappers
api-protos
dq-actors-compute
)
@@ -59,14 +58,13 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/external_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/s3_actor.cpp
)
generate_enum_serilization(core-tx-columnshard
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index fe67c9360ac..67aedff28cb 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -17,7 +17,9 @@ void TColumnShard::BecomeBroken(const TActorContext& ctx)
ctx.Send(IndexingActor, new TEvents::TEvPoisonPill);
ctx.Send(CompactionActor, new TEvents::TEvPoisonPill);
ctx.Send(EvictionActor, new TEvents::TEvPoisonPill);
- StopS3Actors(ctx);
+ if (Tiers) {
+ Tiers->Stop();
+ }
}
void TColumnShard::SwitchToWork(const TActorContext& ctx) {
@@ -27,7 +29,6 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
IndexingActor = ctx.Register(CreateIndexingActor(TabletID(), ctx.SelfID));
CompactionActor = ctx.Register(CreateCompactionActor(TabletID(), ctx.SelfID));
EvictionActor = ctx.Register(CreateEvictionActor(TabletID(), ctx.SelfID));
- InitS3Actors(ctx, true);
SignalTabletActive(ctx);
}
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index 6e69e03d0ca..2c4a184244a 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -76,7 +76,6 @@ public:
MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(Self->TabletID(), txId));
}
Self->AltersInFlight.erase(txId);
- Self->InitS3Actors(ctx, false);
break;
}
case NKikimrTxColumnShard::TX_KIND_COMMIT: {
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 110148fafc4..a972c1e7022 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -128,10 +128,8 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
}
// Mark exported blobs
- Y_VERIFY(Self->TierConfigs.count(tierName));
- auto& config = Self->TierConfigs[tierName];
-
- if (config.NeedExport()) {
+ auto& tManager = Self->GetTierManagerVerified(tierName);
+ if (tManager.NeedExport()) {
for (auto& rec : portionInfo.Records) {
auto& blobId = rec.BlobRange.BlobId;
if (!BlobsToExport.count(blobId)) {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 4fb7540df8b..ac332a83127 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -1,8 +1,10 @@
#include "columnshard_impl.h"
#include "columnshard_schema.h"
#include <ydb/core/scheme/scheme_types_proto.h>
-#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tablet/tablet_counters_protobuf.h>
+#include <ydb/core/tx/tiering/external_data.h>
+#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
+#include <ydb/services/metadata/service.h>
namespace NKikimr::NColumnShard {
@@ -737,7 +739,7 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() {
}
ActiveIndexingOrCompaction = true;
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), indexChanges,
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(), indexChanges,
Settings.CacheDataAfterIndexing, std::move(cachedBlobs));
return std::make_unique<TEvPrivate::TEvIndexing>(std::move(ev));
}
@@ -775,7 +777,7 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() {
}
ActiveIndexingOrCompaction = true;
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), indexChanges,
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(), indexChanges,
Settings.CacheDataAfterCompaction);
return std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev), *BlobManager);
}
@@ -791,24 +793,28 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
return {};
}
- THashMap<ui64, NOlap::TTiersInfo> regularTtls;
- if (pathTtls.empty()) {
+ THashMap<ui64, NOlap::TTiersInfo> regularTtls = pathTtls;
+ if (regularTtls.empty()) {
regularTtls = Ttl.MakeIndexTtlMap(TInstant::Now(), force);
}
+ const bool tiersUsage = regularTtls.empty() && Tiers && Tiers->IsActive();
+ if (tiersUsage) {
+ regularTtls = Tiers->GetTiering();
+ }
- if (pathTtls.empty() && regularTtls.empty()) {
+ if (regularTtls.empty()) {
LOG_S_TRACE("TTL not started. No tables to activate it on (or delayed) at tablet " << TabletID());
return {};
+ } else {
+ for (auto&& i : regularTtls) {
+ LOG_S_DEBUG(i.first << "/" << i.second.GetDebugString() << ";tablet=" << TabletID());
+ }
}
LOG_S_DEBUG("Prepare TTL at tablet " << TabletID());
std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges;
- if (pathTtls.empty()) {
- indexChanges = PrimaryIndex->StartTtl(regularTtls);
- } else {
- indexChanges = PrimaryIndex->StartTtl(pathTtls);
- }
+ indexChanges = PrimaryIndex->StartTtl(regularTtls);
if (!indexChanges) {
LOG_S_NOTICE("Cannot prepare TTL at tablet " << TabletID());
@@ -821,7 +827,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
bool needWrites = !indexChanges->PortionsToEvict.empty();
ActiveTtl = true;
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), indexChanges, false);
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(tiersUsage), indexChanges, false);
return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), *BlobManager, needWrites);
}
@@ -871,35 +877,13 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
return {};
}
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), changes, false);
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(), changes, false);
ev->PutStatus = NKikimrProto::OK; // No new blobs to write
ActiveCleanup = true;
return ev;
}
-static NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression) {
- NOlap::TCompression out;
- if (compression.HasCompressionCodec()) {
- switch (compression.GetCompressionCodec()) {
- case NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain:
- out.Codec = arrow::Compression::UNCOMPRESSED;
- break;
- case NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4:
- out.Codec = arrow::Compression::LZ4_FRAME;
- break;
- case NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD:
- out.Codec = arrow::Compression::ZSTD;
- break;
- }
- }
-
- if (compression.HasCompressionLevel()) {
- out.Level = compression.GetCompressionLevel();
- }
- return out;
-}
-
NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) {
Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
@@ -921,22 +905,20 @@ NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTabl
}
if (schema.HasDefaultCompression()) {
- NOlap::TCompression compression = ConvertCompression(schema.GetDefaultCompression());
+ NOlap::TCompression compression = NTiers::TManager::ConvertCompression(schema.GetDefaultCompression());
indexInfo.SetDefaultCompression(compression);
}
- for (auto& tierConfig : schema.GetStorageTiers()) {
- auto& tierName = tierConfig.GetName();
- TierConfigs[tierName] = TTierConfig{tierConfig};
-
- NOlap::TStorageTier tier{ .Name = tierName };
- if (tierConfig.HasCompression()) {
- tier.Compression = ConvertCompression(tierConfig.GetCompression());
- }
- if (TierConfigs[tierName].NeedExport()) {
- S3Actors[tierName] = {}; // delayed actor creation
+ EnableTiering = schema.GetEnableTiering();
+ if (OwnerPath && !Tiers && EnableTiering) {
+ Tiers = std::make_shared<TTiersManager>(TabletID(), OwnerPath);
+ }
+ if (!!Tiers) {
+ if (EnableTiering) {
+ Tiers->Start(Tiers);
+ } else {
+ Tiers->Stop();
}
- indexInfo.AddStorageTier(std::move(tier));
}
return indexInfo;
@@ -973,30 +955,16 @@ void TColumnShard::MapExternBlobs(const TActorContext& /*ctx*/, NOlap::TReadMeta
}
}
-TActorId TColumnShard::GetS3ActorForTier(const TString& tierName, const TString& phase) const {
- auto it = S3Actors.find(tierName);
- if (it == S3Actors.end()) {
- LOG_S_ERROR("No S3 actor for tier '" << tierName << "' (on " << phase << ") at tablet " << TabletID());
- return {};
- }
- auto s3 = it->second;
- if (!s3) {
- LOG_S_ERROR("Not started S3 actor for tier '" << tierName << "' (on " << phase << ") at tablet " << TabletID());
- return {};
- }
- return s3;
-}
-
void TColumnShard::ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName,
TEvPrivate::TEvExport::TBlobDataMap&& blobsInfo) const {
- if (auto s3 = GetS3ActorForTier(tierName, "export")) {
+ if (auto s3 = GetS3ActorForTier(tierName)) {
auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, tierName, s3, std::move(blobsInfo));
ctx.Register(CreateExportActor(TabletID(), ctx.SelfID, event.release()));
}
}
void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs) const {
- if (auto s3 = GetS3ActorForTier(tierName, "forget")) {
+ if (auto s3 = GetS3ActorForTier(tierName)) {
auto forget = std::make_unique<TEvPrivate::TEvForget>();
forget->Evicted = std::move(blobs);
ctx.Send(s3, forget.release());
@@ -1005,7 +973,7 @@ void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName
bool TColumnShard::GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName,
NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges) {
- if (auto s3 = GetS3ActorForTier(tierName, "get exported")) {
+ if (auto s3 = GetS3ActorForTier(tierName)) {
auto get = std::make_unique<TEvPrivate::TEvGetExported>();
get->DstActor = dst;
get->DstCookie = cookie;
@@ -1017,41 +985,41 @@ bool TColumnShard::GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64
return false;
}
-ui32 TColumnShard::InitS3Actors(const TActorContext& ctx, bool init) {
- ui32 count = 0;
-#ifndef KIKIMR_DISABLE_S3_OPS
- for (auto& [tierName, actor] : S3Actors) {
- if (!init && actor) {
- continue;
- }
+void TColumnShard::Die(const TActorContext& ctx) {
+ // TODO
+ if (!!Tiers) {
+ Tiers->Stop();
+ }
+ NTabletPipe::CloseAndForgetClient(SelfId(), StatsReportPipe);
+ UnregisterMediatorTimeCast();
+ return IActor::Die(ctx);
+}
- Y_VERIFY(!actor);
- Y_VERIFY(TierConfigs.count(tierName));
- auto& tierConfig = TierConfigs[tierName];
- Y_VERIFY(tierConfig.NeedExport());
+TActorId TColumnShard::GetS3ActorForTier(const TString& tierName) const {
+ if (!Tiers) {
+ return {};
+ }
+ return Tiers->GetStorageActorId(tierName);
+}
- actor = ctx.Register(CreateS3Actor(TabletID(), ctx.SelfID, tierName));
- ctx.Send(actor, new TEvPrivate::TEvS3Settings(tierConfig.S3Settings()));
- ++count;
+void TColumnShard::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) {
+ if (!Tiers) {
+ Tiers = std::make_shared<TTiersManager>(TabletID(), OwnerPath);
+ }
+ Tiers->TakeConfigs(ev->Get()->GetSnapshot());
+ if (EnableTiering) {
+ Tiers->Start(Tiers);
}
-#else
- Y_UNUSED(ctx);
- Y_UNUSED(init);
-#endif
- return count;
}
-void TColumnShard::StopS3Actors(const TActorContext& ctx) {
-#ifndef KIKIMR_DISABLE_S3_OPS
- for (auto& [_, actor] : S3Actors) {
- if (actor) {
- ctx.Send(actor, new TEvents::TEvPoisonPill);
- actor = {};
+NOlap::TIndexInfo TColumnShard::GetActualIndexInfo(const bool tiersUsage) const {
+ auto indexInfo = PrimaryIndex->GetIndexInfo();
+ if (tiersUsage && Tiers && Tiers->IsActive()) {
+ for (auto&& i : *Tiers) {
+ indexInfo.AddStorageTier(i.second.BuildTierStorage());
}
}
-#else
- Y_UNUSED(ctx);
-#endif
+ return indexInfo;
}
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index d0028870eab..5595ccf502d 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -7,14 +7,15 @@
#include "blob_manager.h"
#include "inflight_request_tracker.h"
-#include <ydb/core/tx/tx_processing.h>
-#include <ydb/core/tx/time_cast/time_cast.h>
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/tablet/tablet_pipe_client_cache.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
+#include <ydb/core/tx/tiering/manager.h>
+#include <ydb/core/tx/time_cast/time_cast.h>
#include <ydb/core/tx/tx_processing.h>
+#include <ydb/services/metadata/service.h>
namespace NKikimr::NColumnShard {
@@ -38,9 +39,6 @@ IActor* CreateReadActor(ui64 tabletId,
ui64 requestCookie);
IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId);
IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev);
-#ifndef KIKIMR_DISABLE_S3_OPS
-IActor* CreateS3Actor(ui64 tabletId, const TActorId& parent, const TString& tierName);
-#endif
struct TSettings {
static constexpr ui32 MAX_INDEXATIONS_TO_SKIP = 16;
@@ -119,7 +117,6 @@ class TColumnShard
void Handle(TEvColumnShard::TEvReadBlobRanges::TPtr& ev, const TActorContext& ctx);
void Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext& ctx);
-
void Handle(TEvPrivate::TEvScanStats::TPtr &ev, const TActorContext &ctx);
void Handle(TEvPrivate::TEvReadFinished::TPtr &ev, const TActorContext &ctx);
void Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx);
@@ -127,7 +124,8 @@ class TColumnShard
void Handle(TEvPrivate::TEvExport::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvForget::TPtr& ev, const TActorContext& ctx);
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx);
-
+ void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev);
+
ITransaction* CreateTxInitSchema();
ITransaction* CreateTxRunGc();
@@ -141,13 +139,13 @@ class TColumnShard
Y_UNUSED(ctx);
}
- void Die(const TActorContext& ctx) override {
- // TODO
- NTabletPipe::CloseAndForgetClient(SelfId(), StatsReportPipe);
- UnregisterMediatorTimeCast();
- return IActor::Die(ctx);
+ const NTiers::TManager& GetTierManagerVerified(const TString& tierName) const {
+ Y_VERIFY(!!Tiers);
+ return Tiers->GetManagerVerified(tierName);
}
+ void Die(const TActorContext& ctx) override;
+
void BecomeBroken(const TActorContext& ctx);
void SwitchToWork(const TActorContext& ctx);
@@ -177,6 +175,8 @@ class TColumnShard
TabletCounters->Percentile()[counter].IncrementFor(latency.MicroSeconds());
}
+ NOlap::TIndexInfo GetActualIndexInfo(const bool tiersUsage = true) const;
+
protected:
STFUNC(StateInit) {
TRACE_EVENT(NKikimrServices::TX_COLUMNSHARD);
@@ -203,6 +203,7 @@ protected:
STFUNC(StateWork) {
TRACE_EVENT(NKikimrServices::TX_COLUMNSHARD);
switch (ev->GetTypeRewrite()) {
+ hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle);
HFunc(TEvents::TEvPoisonPill, Handle);
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
@@ -316,21 +317,6 @@ private:
}
};
- struct TTierConfig {
- using TTierProto = NKikimrSchemeOp::TStorageTierConfig;
- using TS3SettingsProto = NKikimrSchemeOp::TS3Settings;
-
- TTierProto Proto;
-
- bool NeedExport() const {
- return Proto.HasObjectStorage();
- }
-
- const TS3SettingsProto& S3Settings() const {
- return Proto.GetObjectStorage();
- }
- };
-
struct TLongTxWriteInfo {
ui64 WriteId;
NLongTxService::TLongTxId LongTxId;
@@ -367,14 +353,15 @@ private:
TActorId CompactionActor; // It's memory bounded to 1: we have no memory for parallel compation.
TActorId EvictionActor;
TActorId StatsReportPipe;
- THashMap<TString, TActorId> S3Actors;
+
+ bool EnableTiering = false;
+ std::shared_ptr<TTiersManager> Tiers;
std::unique_ptr<TTabletCountersBase> TabletCountersPtr;
TTabletCountersBase* TabletCounters;
std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache;
std::unique_ptr<NOlap::TInsertTable> InsertTable;
std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex;
TBatchCache BatchCache;
- THashMap<TString, TTierConfig> TierConfigs;
THashSet<NOlap::TUnifiedBlobId> DelayedForgetBlobs;
TTtl Ttl;
@@ -458,14 +445,12 @@ private:
NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema);
void MapExternBlobs(const TActorContext& ctx, NOlap::TReadMetadata& metadata);
- TActorId GetS3ActorForTier(const TString& tierName, const TString& phase) const;
+ TActorId GetS3ActorForTier(const TString& tierName) const;
void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName,
TEvPrivate::TEvExport::TBlobDataMap&& blobsInfo) const;
void ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs) const;
bool GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName,
NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges);
- ui32 InitS3Actors(const TActorContext& ctx, bool init);
- void StopS3Actors(const TActorContext& ctx);
std::unique_ptr<TEvPrivate::TEvIndexing> SetupIndexation();
std::unique_ptr<TEvPrivate::TEvCompaction> SetupCompaction();
@@ -479,7 +464,6 @@ private:
void UpdateResourceMetrics(const TActorContext& ctx, const TUsage& usage);
ui64 MemoryUsage() const;
void SendPeriodicStats();
-
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::TX_COLUMNSHARD_ACTOR;
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.cpp b/ydb/core/tx/columnshard/columnshard_private_events.cpp
new file mode 100644
index 00000000000..83d1d0b29bc
--- /dev/null
+++ b/ydb/core/tx/columnshard/columnshard_private_events.cpp
@@ -0,0 +1,4 @@
+#include "columnshard_private_events.h"
+
+namespace NKikimr::NColumnShard {
+}
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index bfbe1c429a7..bdf7402b168 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -1,6 +1,7 @@
#pragma once
#include "blob_manager.h"
+#include "defs.h"
#include <ydb/core/protos/counters_columnshard.pb.h>
@@ -38,11 +39,11 @@ struct TEvPrivate {
bool CacheData{false};
TDuration Duration;
- TEvWriteIndex(const NOlap::TIndexInfo& indexInfo,
+ TEvWriteIndex(NOlap::TIndexInfo&& indexInfo,
std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges,
bool cacheData,
THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>>&& cachedBlobs = {})
- : IndexInfo(indexInfo)
+ : IndexInfo(std::move(indexInfo))
, IndexChanges(indexChanges)
, CachedBlobs(std::move(cachedBlobs))
, CacheData(cacheData)
diff --git a/ydb/core/tx/columnshard/columnshard_ttl.h b/ydb/core/tx/columnshard/columnshard_ttl.h
index a9c88419ccb..aae50f3e7ee 100644
--- a/ydb/core/tx/columnshard/columnshard_ttl.h
+++ b/ydb/core/tx/columnshard/columnshard_ttl.h
@@ -20,8 +20,6 @@ public:
TDescription() = default;
TDescription(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl) {
- TDuration prevEvicSec;
-
if (ttl.HasEnabled()) {
auto& enabled = ttl.GetEnabled();
ColumnName = enabled.GetColumnName();
@@ -29,23 +27,6 @@ public:
Evictions.reserve(1);
Evictions.emplace_back(TEviction{{}, expireSec});
- } else if (ttl.HasTiering()) {
- Evictions.reserve(ttl.GetTiering().TiersSize());
-
- for (auto& tier : ttl.GetTiering().GetTiers()) {
- auto& eviction = tier.GetEviction();
- Y_VERIFY(ColumnName.empty() || ColumnName == eviction.GetColumnName());
- ColumnName = eviction.GetColumnName();
- auto evictSec = TDuration::Seconds(eviction.GetExpireAfterSeconds());
-
- // Ignore next tier if it has smaller eviction time. Prefer first tier with same eviction time.
- if (evictSec > prevEvicSec) {
- Evictions.emplace_back(TEviction{tier.GetName(), evictSec});
- prevEvicSec = evictSec;
- }
- }
-
- Evictions.shrink_to_fit();
}
if (Enabled()) {
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index 35c4b08ff5c..af407457cba 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -42,6 +42,12 @@ void TTester::Setup(TTestActorRuntime& runtime) {
runtime.UpdateCurrentTime(TInstant::Now());
}
+void ProvideTieringSnapshot(TTestBasicRuntime& runtime, TActorId& sender, NMetadataProvider::ISnapshot::TPtr snapshot) {
+ auto event = std::make_unique<NMetadataProvider::TEvRefreshSubscriberData>(snapshot);
+
+ ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, event.release());
+}
+
bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap) {
auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>(
NKikimrTxColumnShard::TX_KIND_SCHEMA, 0, sender, snap.TxId, txBody);
@@ -285,4 +291,37 @@ TSerializedTableRange MakeTestRange(std::pair<ui64, ui64> range, bool inclusiveF
TConstArrayRef<TCell>(cellsTo), inclusiveTo);
}
+NMetadataProvider::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpecials& specials, const TString& tablePath, const ui32 tablePathId) {
+ std::unique_ptr<NColumnShard::NTiers::TConfigsSnapshot> cs(new NColumnShard::NTiers::TConfigsSnapshot(Now()));
+ for (auto&& tier : specials.Tiers) {
+ {
+ NColumnShard::NTiers::TTierConfig tConfig;
+ tConfig.SetOwnerPath("/Root");
+ tConfig.SetTierName(tier.Name);
+ tConfig.MutableProtoConfig().SetName(tier.Name);
+ auto& cProto = tConfig.MutableProtoConfig();
+ if (tier.S3) {
+ *cProto.MutableObjectStorage() = *tier.S3;
+ }
+ if (tier.Codec) {
+ cProto.MutableCompression()->SetCompressionCodec(tier.GetCodecId());
+ }
+ if (tier.CompressionLevel) {
+ cProto.MutableCompression()->SetCompressionLevel(*tier.CompressionLevel);
+ }
+ cs->MutableTierConfigs().emplace(tConfig.GetConfigId(), tConfig);
+ }
+ {
+ NColumnShard::NTiers::TTieringRule tRule;
+ tRule.SetOwnerPath("/Root");
+ tRule.SetDurationForEvict(TDuration::Seconds(tier.GetEvictAfterSecondsUnsafe()));
+ tRule.SetTablePath(tablePath).SetTablePathId(tablePathId);
+ tRule.SetTierName(tier.Name);
+ tRule.SetColumn(tier.GetTtlColumn());
+ cs->MutableTableTierings()[tablePath].AddRule(std::move(tRule));
+ }
+ }
+ return cs;
+}
+
}
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 8a9839f9ad0..e0cdb1766f0 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -25,19 +25,20 @@ using TTypeId = NScheme::TTypeId;
using TTypeInfo = NScheme::TTypeInfo;
struct TTestSchema {
- static const constexpr char * DefaultTtlColumn = "saved_at";
+ static inline const TString DefaultTtlColumn = "saved_at";
struct TStorageTier {
+ YDB_ACCESSOR(TString, TtlColumn, DefaultTtlColumn);
+ YDB_OPT(ui32, EvictAfterSeconds);
+ public:
TString Name;
TString Codec;
std::optional<int> CompressionLevel;
- std::optional<ui32> EvictAfterSeconds;
- TString TtlColumn;
std::optional<NKikimrSchemeOp::TS3Settings> S3;
TStorageTier(const TString& name = {})
: Name(name)
- , TtlColumn(DefaultTtlColumn)
+
{}
NKikimrSchemeOp::EColumnCodec GetCodecId() const {
@@ -62,12 +63,6 @@ struct TTestSchema {
}
return *this;
}
-
- TStorageTier& SetTtl(ui32 seconds, const TString& column = DefaultTtlColumn) {
- EvictAfterSeconds = seconds;
- TtlColumn = column;
- return *this;
- }
};
struct TTableSpecials : public TStorageTier {
@@ -78,7 +73,7 @@ struct TTestSchema {
}
bool HasTtl() const {
- return !HasTiers() && EvictAfterSeconds;
+ return !HasTiers() && HasEvictAfterSeconds();
}
TTableSpecials WithCodec(const TString& codec) {
@@ -211,37 +206,15 @@ struct TTestSchema {
schema->MutableDefaultCompression()->SetCompressionLevel(*specials.CompressionLevel);
}
- for (auto& tier : specials.Tiers) {
- auto* t = schema->AddStorageTiers();
- t->SetName(tier.Name);
- if (tier.HasCodec()) {
- t->MutableCompression()->SetCompressionCodec(tier.GetCodecId());
- }
- if (tier.CompressionLevel) {
- t->MutableCompression()->SetCompressionLevel(*tier.CompressionLevel);
- }
- if (tier.S3) {
- t->MutableObjectStorage()->CopyFrom(*tier.S3);
- }
- }
+ schema->SetEnableTiering(specials.HasTiers());
}
- if (specials.HasTiers()) {
- auto* ttlSettings = table->MutableTtlSettings();
- ttlSettings->SetVersion(1);
- auto* tiering = ttlSettings->MutableTiering();
- for (auto& tier : specials.Tiers) {
- auto* t = tiering->AddTiers();
- t->SetName(tier.Name);
- t->MutableEviction()->SetColumnName(tier.TtlColumn);
- t->MutableEviction()->SetExpireAfterSeconds(*tier.EvictAfterSeconds);
- }
- } else if (specials.HasTtl()) {
+ if (specials.HasTtl()) {
auto* ttlSettings = table->MutableTtlSettings();
ttlSettings->SetVersion(1);
auto* enable = ttlSettings->MutableEnabled();
- enable->SetColumnName(specials.TtlColumn);
- enable->SetExpireAfterSeconds(*specials.EvictAfterSeconds);
+ enable->SetColumnName(specials.GetTtlColumn());
+ enable->SetExpireAfterSeconds(specials.GetEvictAfterSecondsUnsafe());
}
TString out;
@@ -258,18 +231,10 @@ struct TTestSchema {
auto* ttlSettings = table->MutableTtlSettings();
ttlSettings->SetVersion(version);
- if (specials.HasTiers()) {
- auto* tiering = ttlSettings->MutableTiering();
- for (auto& tier : specials.Tiers) {
- auto* t = tiering->AddTiers();
- t->SetName(tier.Name);
- t->MutableEviction()->SetColumnName(tier.TtlColumn);
- t->MutableEviction()->SetExpireAfterSeconds(*tier.EvictAfterSeconds);
- }
- } else if (specials.HasTtl()) {
+ if (specials.HasTtl()) {
auto* enable = ttlSettings->MutableEnabled();
- enable->SetColumnName(specials.TtlColumn);
- enable->SetExpireAfterSeconds(*specials.EvictAfterSeconds);
+ enable->SetColumnName(specials.GetTtlColumn());
+ enable->SetExpireAfterSeconds(specials.GetEvictAfterSecondsUnsafe());
} else {
ttlSettings->MutableDisabled();
}
@@ -289,6 +254,8 @@ struct TTestSchema {
return out;
}
+ static NMetadataProvider::ISnapshot::TPtr BuildSnapshot(const TTableSpecials& specials, const TString& tablePath, const ui32 tablePathId);
+
static TString CommitTxBody(ui64 metaShard, const TVector<ui64>& writeIds) {
NKikimrTxColumnShard::TCommitTxBody proto;
if (metaShard) {
@@ -336,6 +303,7 @@ struct TTestSchema {
};
bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap);
+void ProvideTieringSnapshot(TTestBasicRuntime& runtime, TActorId& sender, NMetadataProvider::ISnapshot::TPtr snapshot);
void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap);
bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId,
const TString& data, std::shared_ptr<arrow::Schema> schema = {});
diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp
index 99542dd0a1f..e8e3611ea8e 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine.cpp
@@ -1,4 +1,14 @@
-#include "column_engine.h"
+#include "column_engine.h"
namespace NKikimr::NOlap {
+
+TString TTiersInfo::GetDebugString() const {
+ TStringBuilder sb;
+ sb << "column=" << Column << ";";
+ for (auto&& i : TierBorders) {
+ sb << "tname=" << i.TierName << ";eborder=" << i.EvictBorder << ";";
+ }
+ return sb;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 4cfc7fa2ef6..2422d5ebcde 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -86,6 +86,8 @@ struct TTiersInfo {
TString Column;
std::vector<TTierTimeBorder> TierBorders; // Ordered tiers from hottest to coldest
+ TString GetDebugString() const;
+
TTiersInfo(const TString& column, TInstant border = {}, const TString& tierName = {})
: Column(column)
{
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index 45c601d7c0d..5eff70619a9 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -246,4 +246,12 @@ std::shared_ptr<NArrow::TSortDescription> TIndexInfo::SortReplaceDescription() c
return {};
}
+bool TIndexInfo::AllowTtlOverColumn(const TString& name) const {
+ auto it = ColumnNames.find(name);
+ if (it == ColumnNames.end()) {
+ return false;
+ }
+ return MinMaxIdxColumnsIds.count(it->second);
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 218c8dfbbf0..3828bb946d0 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -178,13 +178,7 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema {
return MinMaxIdxColumnsIds;
}
- bool AllowTtlOverColumn(const TString& name) const {
- auto it = ColumnNames.find(name);
- if (it == ColumnNames.end()) {
- return false;
- }
- return MinMaxIdxColumnsIds.count(it->second);
- }
+ bool AllowTtlOverColumn(const TString& name) const;
bool IsSorted() const { return SortingKey.get(); }
bool IsReplacing() const { return ReplaceKey.get(); }
diff --git a/ydb/core/tx/columnshard/external_data.cpp b/ydb/core/tx/columnshard/external_data.cpp
deleted file mode 100644
index 5a95613ca81..00000000000
--- a/ydb/core/tx/columnshard/external_data.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-#include "external_data.h"
-#include <library/cpp/json/writer/json_value.h>
-#include <library/cpp/protobuf/json/proto2json.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) {
- const i32 ownerPathIdx = GetFieldIndex(rawData, "ownerPath");
- const i32 tierNameIdx = GetFieldIndex(rawData, "tierName");
- const i32 tierConfigIdx = GetFieldIndex(rawData, "tierConfig");
- if (tierNameIdx < 0 || tierConfigIdx < 0 || ownerPathIdx < 0) {
- ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "incorrect tiers config table structure";
- return false;
- }
- for (auto&& r : rawData.rows()) {
- TConfig config(r.items()[ownerPathIdx].bytes_value(), r.items()[tierNameIdx].bytes_value());
- if (!config.DeserializeFromString(r.items()[tierConfigIdx].bytes_value())) {
- ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "cannot parse tier config from snapshot";
- return false;
- }
- if (!Data.emplace(config.GetConfigId(), config).second) {
- ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "tier names duplication: " << config.GetTierName();
- return false;
- }
- }
- return true;
-}
-
-std::optional<TConfig> TConfigsSnapshot::GetValue(const TString& key) const {
- auto it = Data.find(key);
- if (it == Data.end()) {
- return {};
- } else {
- return it->second;
- }
-}
-
-TVector<NMetadataProvider::ITableModifier::TPtr> TSnapshotConstructor::DoGetTableSchema() const {
- Ydb::Table::CreateTableRequest request;
- request.set_session_id("");
- request.set_path(TablePath);
- request.add_primary_key("ownerPath");
- request.add_primary_key("tierName");
- {
- auto& column = *request.add_columns();
- column.set_name("tierName");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("ownerPath");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- {
- auto& column = *request.add_columns();
- column.set_name("tierConfig");
- column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
- }
- NMetadataProvider::ITableModifier::TPtr result(
- new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request));
- return { result };
-}
-
-TString NTiers::TConfigsSnapshot::DoSerializeToString() const {
- NJson::TJsonValue result = NJson::JSON_MAP;
- for (auto&& i : Data) {
- result.InsertValue(i.first, i.second.SerializeToJson());
- }
- return result.GetStringRobust();
-}
-
-bool NTiers::TConfig::DeserializeFromString(const TString& configProtoStr) {
- if (!::google::protobuf::TextFormat::ParseFromString(configProtoStr, &ProtoConfig)) {
- ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "cannot parse proto string: " << configProtoStr;
- return false;
- }
- return true;
-}
-
-NJson::TJsonValue TConfig::SerializeToJson() const {
- NJson::TJsonValue result = NJson::JSON_MAP;
- result.InsertValue("tierName", TierName);
- NProtobufJson::Proto2Json(ProtoConfig, result.InsertValue("tierConfig", NJson::JSON_MAP));
- return result;
-}
-
-bool TConfig::IsSame(const TConfig& item) const {
- return TierName == item.TierName && ProtoConfig.SerializeAsString() == item.ProtoConfig.SerializeAsString();
-}
-
-}
diff --git a/ydb/core/tx/columnshard/external_data.h b/ydb/core/tx/columnshard/external_data.h
deleted file mode 100644
index e070fa5ba21..00000000000
--- a/ydb/core/tx/columnshard/external_data.h
+++ /dev/null
@@ -1,66 +0,0 @@
-#pragma once
-#include <ydb/services/metadata/service.h>
-#include <ydb/core/protos/flat_scheme_op.pb.h>
-
-#include <library/cpp/json/writer/json_value.h>
-
-namespace NKikimr::NColumnShard::NTiers {
-
-class TConfig {
-private:
- using TTierProto = NKikimrSchemeOp::TStorageTierConfig;
- using TS3SettingsProto = NKikimrSchemeOp::TS3Settings;
- YDB_READONLY_DEF(TString, OwnerPath);
- YDB_READONLY_DEF(TString, TierName);
- YDB_READONLY_DEF(TTierProto, ProtoConfig);
-public:
- TConfig() = default;
- TConfig(const TString& ownerPath, const TString& tierName)
- : OwnerPath(ownerPath)
- , TierName(tierName)
- {
-
- }
-
- TString GetConfigId() const {
- return OwnerPath + "." + TierName;
- }
-
- bool NeedExport() const {
- return ProtoConfig.HasObjectStorage();
- }
- bool IsSame(const TConfig& item) const;
- bool DeserializeFromString(const TString& configProtoStr);
- NJson::TJsonValue SerializeToJson() const;
-};
-
-class TConfigsSnapshot: public NMetadataProvider::ISnapshot {
-private:
- using TBase = NMetadataProvider::ISnapshot;
- using TConfigsMap = TMap<TString, TConfig>;
- YDB_READONLY_DEF(TConfigsMap, Data);
-protected:
- virtual bool DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) override;
- virtual TString DoSerializeToString() const override;
-public:
- std::optional<TConfig> GetValue(const TString& key) const;
- using TBase::TBase;
-};
-
-class TSnapshotConstructor: public NMetadataProvider::TGenericSnapshotParser<TConfigsSnapshot> {
-private:
- const TString TablePath;
-protected:
- virtual TVector<NMetadataProvider::ITableModifier::TPtr> DoGetTableSchema() const override;
- virtual const TString& DoGetTablePath() const override {
- return TablePath;
- }
-public:
- TSnapshotConstructor(const TString& tablePath)
- : TablePath(tablePath)
- {
-
- }
-};
-
-}
diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
index 20e5e296ca5..9cf3766c597 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
@@ -27,7 +27,7 @@ std::shared_ptr<arrow::RecordBatch> UpdateColumn(std::shared_ptr<arrow::RecordBa
auto schema = batch->schema();
int pos = schema->GetFieldIndex(name);
- UNIT_ASSERT(pos > 0);
+ UNIT_ASSERT(pos >= 0);
UNIT_ASSERT(batch->GetColumnByName(name)->type_id() == arrow::Type::TIMESTAMP);
auto scalar = arrow::TimestampScalar(seconds * 1000 * 1000, arrow::timestamp(arrow::TimeUnit::MICRO));
@@ -42,7 +42,7 @@ std::shared_ptr<arrow::RecordBatch> UpdateColumn(std::shared_ptr<arrow::RecordBa
}
bool TriggerTTL(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, const TVector<ui64>& pathIds,
- ui64 tsSeconds = 0, const TString& ttlColumnName = TTestSchema::DefaultTtlColumn) {
+ ui64 tsSeconds, const TString& ttlColumnName) {
TString txBody = TTestSchema::TtlTxBody(pathIds, ttlColumnName, tsSeconds);
auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>(
NKikimrTxColumnShard::TX_KIND_TTL, sender, snap.TxId, txBody);
@@ -86,7 +86,7 @@ bool CheckSame(const TString& blob, const TString& srtSchema, ui32 expectedSize,
return true;
}
-std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui32 overlapSize) {
+std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui32 overlapSize, const TString& ttlColumnName) {
UNIT_ASSERT(ts.size() == 2);
TString data1 = MakeTestBlob({0, portionSize}, testYdbSchema);
@@ -98,8 +98,8 @@ std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui3
UNIT_ASSERT(data2.size() < 7 * 1024 * 1024);
auto schema = NArrow::MakeArrowSchema(testYdbSchema);
- auto batch1 = UpdateColumn(NArrow::DeserializeBatch(data1, schema), TTestSchema::DefaultTtlColumn, ts[0]);
- auto batch2 = UpdateColumn(NArrow::DeserializeBatch(data2, schema), TTestSchema::DefaultTtlColumn, ts[1]);
+ auto batch1 = UpdateColumn(NArrow::DeserializeBatch(data1, schema), ttlColumnName, ts[0]);
+ auto batch2 = UpdateColumn(NArrow::DeserializeBatch(data2, schema), ttlColumnName, ts[1]);
std::vector<TString> data;
data.emplace_back(NArrow::SerializeBatchNoCompression(batch1));
@@ -156,20 +156,20 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
ttlSec -= (ts[0] + ts[1]) / 2; // enable internal ttl between ts1 and ts2
}
if (spec.HasTiers()) {
- spec.Tiers[0].SetTtl(ttlSec);
+ spec.Tiers[0].SetEvictAfterSeconds(ttlSec);
} else {
- spec.SetTtl(ttlSec);
+ spec.SetEvictAfterSeconds(ttlSec);
}
bool ok = ProposeSchemaTx(runtime, sender,
TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk, spec),
{++planStep, ++txId});
UNIT_ASSERT(ok);
PlanSchemaTx(runtime, sender, {planStep, txId});
-
+ ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec, "test", tableId));
//
ui32 portionSize = 80 * 1000;
- auto blobs = MakeData(ts, portionSize, portionSize / 2);
+ auto blobs = MakeData(ts, portionSize, portionSize / 2, spec.GetTtlColumn());
UNIT_ASSERT_EQUAL(blobs.size(), 2);
for (auto& data : blobs) {
UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data));
@@ -184,9 +184,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
}
if (internal) {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {});
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.GetTtlColumn());
} else {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + 1);
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + 1, spec.GetTtlColumn());
}
TAutoPtr<IEventHandle> handle;
@@ -198,7 +198,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
{
--planStep;
auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId);
- Proto(read.get()).AddColumnNames(TTestSchema::DefaultTtlColumn);
+ Proto(read.get()).AddColumnNames(spec.GetTtlColumn());
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
@@ -213,32 +213,33 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
UNIT_ASSERT(resRead.GetData().size() > 0);
auto& schema = resRead.GetMeta().GetSchema();
- UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, TTestSchema::DefaultTtlColumn, ts[1]));
+ UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, spec.GetTtlColumn(), ts[1]));
}
// Alter TTL
ttlSec = TInstant::Now().Seconds() - (ts[1] + 1);
if (spec.HasTiers()) {
- spec.Tiers[0].SetTtl(ttlSec);
+ spec.Tiers[0].SetEvictAfterSeconds(ttlSec);
} else {
- spec.SetTtl(ttlSec);
+ spec.SetEvictAfterSeconds(ttlSec);
}
ok = ProposeSchemaTx(runtime, sender,
TTestSchema::AlterTableTxBody(tableId, 2, spec),
{++planStep, ++txId});
UNIT_ASSERT(ok);
PlanSchemaTx(runtime, sender, {planStep, txId});
+ ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec, "test", tableId));
if (internal) {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {});
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.GetTtlColumn());
} else {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + 1);
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + 1, spec.GetTtlColumn());
}
{
--planStep;
auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId);
- Proto(read.get()).AddColumnNames(TTestSchema::DefaultTtlColumn);
+ Proto(read.get()).AddColumnNames(spec.GetTtlColumn());
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
@@ -258,6 +259,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
TTestSchema::AlterTableTxBody(tableId, 3, TTestSchema::TTableSpecials()),
{++planStep, ++txId});
UNIT_ASSERT(ok);
+ ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(TTestSchema::TTableSpecials(), "test", tableId));
PlanSchemaTx(runtime, sender, {planStep, txId});
UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, blobs[0]));
@@ -265,15 +267,15 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
PlanCommit(runtime, sender, ++planStep, txId);
if (internal) {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {});
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.GetTtlColumn());
} else {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - 1);
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - 1, spec.GetTtlColumn());
}
{
--planStep;
auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId);
- Proto(read.get()).AddColumnNames(TTestSchema::DefaultTtlColumn);
+ Proto(read.get()).AddColumnNames(spec.GetTtlColumn());
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
@@ -288,7 +290,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
UNIT_ASSERT(resRead.GetData().size() > 0);
auto& schema = resRead.GetMeta().GetSchema();
- UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, TTestSchema::DefaultTtlColumn, ts[0]));
+ UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, spec.GetTtlColumn(), ts[0]));
}
}
@@ -398,7 +400,7 @@ public:
};
};
-std::vector<std::pair<std::shared_ptr<arrow::TimestampArray>, ui64>>
+std::vector<std::pair<ui32, ui64>>
TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTestSchema::TTableSpecials>& specs) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -423,12 +425,16 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
ui64 txId = 100;
UNIT_ASSERT(specs.size() > 0);
- bool ok = ProposeSchemaTx(runtime, sender,
- TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk, specs[0]),
- {++planStep, ++txId});
- UNIT_ASSERT(ok);
+ {
+ const bool ok = ProposeSchemaTx(runtime, sender,
+ TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk, specs[0]),
+ { ++planStep, ++txId });
+ UNIT_ASSERT(ok);
+ }
PlanSchemaTx(runtime, sender, {planStep, txId});
+ ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[0], "test", tableId));
+
for (auto& data : blobs) {
UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data));
ProposeCommit(runtime, sender, metaShard, ++txId, {writeId});
@@ -441,7 +447,7 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
TAutoPtr<IEventHandle> handle;
- std::vector<std::pair<std::shared_ptr<arrow::TimestampArray>, ui64>> resColumns;
+ std::vector<std::pair<ui32, ui64>> resColumns;
resColumns.reserve(specs.size());
TCountersContainer counter;
@@ -449,11 +455,15 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
for (ui32 i = 0; i < specs.size(); ++i) {
if (i) {
ui32 version = i + 1;
- ok = ProposeSchemaTx(runtime, sender,
- TTestSchema::AlterTableTxBody(tableId, version, specs[i]),
- {++planStep, ++txId});
- UNIT_ASSERT(ok);
- PlanSchemaTx(runtime, sender, {planStep, txId});
+ {
+ const bool ok = ProposeSchemaTx(runtime, sender,
+ TTestSchema::AlterTableTxBody(tableId, version, specs[i]),
+ { ++planStep, ++txId });
+ UNIT_ASSERT(ok);
+ PlanSchemaTx(runtime, sender, { planStep, txId });
+ }
+
+ ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i], "test", tableId));
}
bool hasEvictionSettings = false;
@@ -466,7 +476,7 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
counter.SetRestartTabletOnPutData(reboots ? 1 : 0);
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {});
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, specs[i].GetTtlColumn());
if (hasEvictionSettings) {
if (i == 1 || i == 2) {
counter.WaitEvents(runtime, sender, i, 1, TDuration::Seconds(40));
@@ -481,34 +491,39 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
--planStep;
auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId);
- Proto(read.get()).AddColumnNames(TTestSchema::DefaultTtlColumn);
+ Proto(read.get()).AddColumnNames(specs[i].GetTtlColumn());
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
-
- auto& resRead = Proto(event);
- UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
- UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
- UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
- UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
-
- if (resRead.GetData().size()) {
+ resColumns.emplace_back(0, 0);
+ ui32 idx = 0;
+ while (true) {
+ auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
+ UNIT_ASSERT(event);
+
+ auto& resRead = Proto(event);
+ UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
+ UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
+ UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
+ UNIT_ASSERT_EQUAL(resRead.GetBatch(), idx++);
+
+ if (!resRead.GetData().size()) {
+ break;
+ }
auto& meta = resRead.GetMeta();
auto& schema = meta.GetSchema();
- auto pkColumn = GetFirstPKColumn(resRead.GetData(), schema, TTestSchema::DefaultTtlColumn);
+ auto pkColumn = GetFirstPKColumn(resRead.GetData(), schema, specs[i].GetTtlColumn());
UNIT_ASSERT(pkColumn);
UNIT_ASSERT(pkColumn->type_id() == arrow::Type::TIMESTAMP);
- UNIT_ASSERT(meta.HasReadStats());
- auto& readStats = meta.GetReadStats();
- ui64 numBytes = readStats.GetDataBytes(); // compressed bytes in storage
-
auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(pkColumn);
- resColumns.emplace_back(tsColumn, numBytes);
- } else {
- resColumns.emplace_back(nullptr, 0);
+ resColumns.back().first += tsColumn->length();
+ if (resRead.GetFinished()) {
+ UNIT_ASSERT(meta.HasReadStats());
+ auto& readStats = meta.GetReadStats();
+ ui64 numBytes = readStats.GetDataBytes(); // compressed bytes in storage
+ resColumns.back().second += numBytes;
+ break;
+ }
}
if (reboots) {
@@ -529,21 +544,21 @@ void TestTwoTiers(const TTestSchema::TTableSpecials& spec, bool compressed, bool
ui64 allowOne = nowSec - ts[1] + 600;
ui64 allowNone = nowSec - ts[1] - 600;
- alters[0].Tiers[0].SetTtl(allowBoth); // tier0 allows/has: data[0], data[1]
- alters[0].Tiers[1].SetTtl(allowBoth); // tier1 allows: data[0], data[1], has: nothing
+ alters[0].Tiers[0].SetEvictAfterSeconds(allowBoth); // tier0 allows/has: data[0], data[1]
+ alters[0].Tiers[1].SetEvictAfterSeconds(allowBoth); // tier1 allows: data[0], data[1], has: nothing
- alters[1].Tiers[0].SetTtl(allowOne); // tier0 allows/has: data[1]
- alters[1].Tiers[1].SetTtl(allowBoth); // tier1 allows: data[0], data[1], has: data[0]
+ alters[1].Tiers[0].SetEvictAfterSeconds(allowOne); // tier0 allows/has: data[1]
+ alters[1].Tiers[1].SetEvictAfterSeconds(allowBoth); // tier1 allows: data[0], data[1], has: data[0]
- alters[2].Tiers[0].SetTtl(allowNone); // tier0 allows/has: nothing
- alters[2].Tiers[1].SetTtl(allowOne); // tier1 allows/has: data[1]
+ alters[2].Tiers[0].SetEvictAfterSeconds(allowNone); // tier0 allows/has: nothing
+ alters[2].Tiers[1].SetEvictAfterSeconds(allowOne); // tier1 allows/has: data[1]
- alters[3].Tiers[0].SetTtl(allowNone); // tier0 allows/has: nothing
- alters[3].Tiers[1].SetTtl(allowNone); // tier1 allows/has: nothing
+ alters[3].Tiers[0].SetEvictAfterSeconds(allowNone); // tier0 allows/has: nothing
+ alters[3].Tiers[1].SetEvictAfterSeconds(allowNone); // tier1 allows/has: nothing
ui32 portionSize = 80 * 1000;
ui32 overlapSize = 40 * 1000;
- std::vector<TString> blobs = MakeData(ts, portionSize, overlapSize);
+ std::vector<TString> blobs = MakeData(ts, portionSize, overlapSize, spec.GetTtlColumn());
auto columns = TestTiers(reboots, blobs, alters);
@@ -557,9 +572,9 @@ void TestTwoTiers(const TTestSchema::TTableSpecials& spec, bool compressed, bool
UNIT_ASSERT(columns[2].second);
UNIT_ASSERT(!columns[3].second);
- UNIT_ASSERT_EQUAL(columns[0].first->length(), 2 * portionSize - overlapSize);
- UNIT_ASSERT_EQUAL(columns[0].first->length(), columns[1].first->length());
- UNIT_ASSERT_EQUAL(columns[2].first->length(), portionSize);
+ UNIT_ASSERT_EQUAL(columns[0].first, 2 * portionSize/* - overlapSize*/);
+ UNIT_ASSERT_EQUAL(columns[0].first, columns[1].first);
+ UNIT_ASSERT_EQUAL(columns[2].first, portionSize);
Cerr << "read bytes: " << columns[0].second << ", " << columns[1].second << ", " << columns[2].second << "\n";
if (compressed) {
@@ -571,8 +586,9 @@ void TestTwoTiers(const TTestSchema::TTableSpecials& spec, bool compressed, bool
void TestTwoHotTiers(bool reboot) {
TTestSchema::TTableSpecials spec;
- spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0"));
- spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1"));
+ spec.SetTtlColumn("timestamp");
+ spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0").SetTtlColumn("timestamp"));
+ spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1").SetTtlColumn("timestamp"));
spec.Tiers.back().SetCodec("zstd");
TestTwoTiers(spec, true, reboot);
@@ -587,8 +603,9 @@ void TestHotAndColdTiers(bool reboot) {
UNIT_ASSERT(s3Mock.Start());
TTestSchema::TTableSpecials spec;
- spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0"));
- spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1"));
+ spec.SetTtlColumn("timestamp");
+ spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0").SetTtlColumn("timestamp"));
+ spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1").SetTtlColumn("timestamp"));
spec.Tiers.back().S3 = NKikimrSchemeOp::TS3Settings();
auto& s3Config = *spec.Tiers.back().S3;
{
@@ -840,27 +857,31 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) {
Y_UNIT_TEST(OneTier) {
TTestSchema::TTableSpecials specs;
- specs.Tiers.emplace_back(TTestSchema::TStorageTier("default"));
+ specs.SetTtlColumn("timestamp");
+ specs.Tiers.emplace_back(TTestSchema::TStorageTier("default").SetTtlColumn("timestamp"));
TestTtl(false, true, specs);
}
Y_UNIT_TEST(RebootOneTier) {
NColumnShard::gAllowLogBatchingDefaultValue = false;
TTestSchema::TTableSpecials specs;
- specs.Tiers.emplace_back(TTestSchema::TStorageTier("default"));
+ specs.SetTtlColumn("timestamp");
+ specs.Tiers.emplace_back(TTestSchema::TStorageTier("default").SetTtlColumn("timestamp"));
TestTtl(true, true, specs);
}
Y_UNIT_TEST(OneTierExternalTtl) {
TTestSchema::TTableSpecials specs;
- specs.Tiers.emplace_back(TTestSchema::TStorageTier("default"));
+ specs.SetTtlColumn("timestamp");
+ specs.Tiers.emplace_back(TTestSchema::TStorageTier("default").SetTtlColumn("timestamp"));
TestTtl(false, false, specs);
}
Y_UNIT_TEST(RebootOneTierExternalTtl) {
NColumnShard::gAllowLogBatchingDefaultValue = false;
TTestSchema::TTableSpecials specs;
- specs.Tiers.emplace_back(TTestSchema::TStorageTier("default"));
+ specs.SetTtlColumn("timestamp");
+ specs.Tiers.emplace_back(TTestSchema::TStorageTier("default").SetTtlColumn("timestamp"));
TestTtl(true, false, specs);
}
diff --git a/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp b/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp
index 521ed7aa450..7b407936d1e 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp
@@ -1,15 +1,16 @@
#include "columnshard_ut_common.h"
-#include "external_data.h"
-#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
-#include <ydb/core/wrappers/s3_wrapper.h>
-#include <ydb/services/metadata/service.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/testlib/cs_helper.h>
-#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/tx/tiering/external_data.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
+#include <ydb/core/wrappers/s3_wrapper.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/services/metadata/service.h>
#include <library/cpp/actors/core/av_bootstrapped.h>
+#include <library/cpp/protobuf/json/proto2json.h>
#include <util/system/hostname.h>
@@ -44,13 +45,7 @@ public:
#Columns { Name: "request_id" Type: "Utf8" }
KeyColumnNames: "timestamp"
Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
- StorageTiers {
- Name: "tier1"
- ObjectStorage {
- Endpoint: "fake"
- AccessKey: "$a"
- }
- }
+ EnableTiering : true
}
}
)", storeName.c_str(), storeShardsCount));
@@ -77,6 +72,33 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
const TString ConfigProtoStr = "Name : \"abc\"";
+ class TJsonChecker {
+ private:
+ YDB_ACCESSOR_DEF(TString, Path);
+ YDB_ACCESSOR_DEF(TString, Expectation);
+ public:
+ TJsonChecker(const TString& path, const TString& expectation)
+ : Path(path)
+ , Expectation(expectation)
+ {
+
+ }
+ bool Check(const NJson::TJsonValue& jsonInfo) const {
+ auto* jsonPathValue = jsonInfo.GetValueByPath(Path);
+ if (!jsonPathValue) {
+ return Expectation == "__NULL";
+ }
+ return jsonPathValue->GetStringRobust() == Expectation;
+ }
+
+ TString GetDebugString() const {
+ TStringBuilder sb;
+ sb << "path=" << Path << ";"
+ << "expectation=" << Expectation << ";";
+ return sb;
+ }
+ };
+
class TTestCSEmulator: public NActors::TActorBootstrapped<TTestCSEmulator> {
private:
using TBase = NActors::TActorBootstrapped<TTestCSEmulator>;
@@ -84,27 +106,88 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
TActorId ProviderId;
TInstant Start;
YDB_READONLY_FLAG(Found, false);
+ YDB_ACCESSOR(ui32, TieringsCount, 1);
+ using TKeyCheckers = TMap<TString, TJsonChecker>;
+ YDB_ACCESSOR_DEF(TKeyCheckers, Checkers);
public:
- STFUNC(StateInit) {
+ STATEFN(StateInit) {
switch (ev->GetTypeRewrite()) {
- HFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle);
+ hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle);
default:
Y_VERIFY(false);
}
}
- void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev, const TActorContext&) {
- auto value = ev->Get()->GetSnapshotAs<NTiers::TConfigsSnapshot>()->GetValue("/Root/olapStore.tier1");
- if (value && value->GetProtoConfig().GetName() == "abc") {
- FoundFlag = true;
- } else {
- Cerr << ev->Get()->GetSnapshot()->SerializeToString() << Endl;
+ void CheckRuntime(TTestActorRuntime& runtime) {
+ const auto pred = [this](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event)->TTestActorRuntimeBase::EEventAction {
+ if (event->HasBuffer() && !event->HasEvent()) {
+ } else if (!event->GetBase()) {
+ } else {
+ auto ptr = dynamic_cast<NMetadataProvider::TEvRefreshSubscriberData*>(event->GetBase());
+ if (ptr) {
+ CheckFound(ptr);
+ }
+ }
+ return TTestActorRuntimeBase::EEventAction::PROCESS;
+ };
+
+ runtime.SetObserverFunc(pred);
+
+ for (const TInstant start = Now(); !IsFound() && Now() - start < TDuration::Seconds(10); ) {
+ runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
+ }
+ Y_VERIFY(IsFound());
+ }
+
+ void CheckFound(NMetadataProvider::TEvRefreshSubscriberData* event) {
+ auto snapshot = event->GetSnapshotAs<NTiers::TConfigsSnapshot>();
+ Y_VERIFY(!!snapshot);
+ auto* tInfo = snapshot->GetTableTiering("/Root/olapStore/olapTable");
+ if (TieringsCount) {
+ if (!tInfo) {
+ Cerr << "tiering not found: " << snapshot->SerializeToString() << Endl;
+ return ;
+ }
+ if (tInfo->GetRules().size() != TieringsCount) {
+ Cerr << "TieringsCount incorrect: " << snapshot->SerializeToString() << Endl;
+ return;
+ }
+ } else if (tInfo) {
+ Cerr << "tiering found but its incorrect: " << snapshot->SerializeToString() << Endl;
+ return;
+ }
+ for (auto&& [_, tiering] : snapshot->GetTableTierings()) {
+ if (tiering.GetTablePathId() == 0) {
+ Cerr << "PathId not initialized: " << snapshot->SerializeToString() << Endl;
+ return;
+ } else if (tiering.GetTablePathId() == InvalidLocalPathId) {
+ Cerr << "PathId invalid: " << snapshot->SerializeToString() << Endl;
+ return;
+ } else {
+ Cerr << "PathId: " << tiering.GetTablePathId() << Endl;
+ }
}
+ for (auto&& i : Checkers) {
+ auto value = snapshot->GetValue(i.first);
+ NJson::TJsonValue jsonData;
+ NProtobufJson::Proto2Json(value->GetProtoConfig(), jsonData);
+ if (!i.second.Check(jsonData)) {
+ Cerr << "config value incorrect:" << snapshot->SerializeToString() << ";snapshot_check_path=" << i.first << Endl;
+ Cerr << "json path incorrect:" << jsonData << ";" << i.second.GetDebugString() << Endl;
+ return;
+ }
+ }
+ FoundFlag = true;
+ }
+
+ void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) {
+ CheckFound(ev->Get());
}
void Bootstrap() {
ProviderId = NMetadataProvider::MakeServiceId(1);
- ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>("/Root/.external_data/tiers");
+ ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>();
+ ExternalDataManipulation->Start(ExternalDataManipulation);
Become(&TThis::StateInit);
Sender<NMetadataProvider::TEvSubscribeExternal>(ExternalDataManipulation).SendTo(ProviderId);
Start = Now();
@@ -137,8 +220,11 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
auto sender = runtime.AllocateEdgeActor();
server->SetupRootStoragePools(sender);
+ TLocalHelper lHelper(*server);
+ lHelper.CreateTestOlapTable();
{
TTestCSEmulator* emulator = new TTestCSEmulator;
+ emulator->MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc"));
runtime.Register(emulator);
{
const TInstant start = Now();
@@ -152,15 +238,26 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
}
}
- NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
- tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
- auto sResult = f.GetValueSync();
- Cerr << sResult.GetIssues().ToString() << Endl;
- auto session = sResult.GetSession();
- session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')"
- , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
- });
-
+ {
+ NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
+ tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ auto session = f.GetValueSync().GetSession();
+ session.ExecuteDataQuery(
+ "INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
+ "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')"
+ , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
+ });
+ }
+ {
+ NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
+ tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ auto session = f.GetValueSync().GetSession();
+ session.ExecuteDataQuery(
+ "INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
+ "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')"
+ , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
+ });
+ }
const TInstant start = Now();
while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) {
runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10));
@@ -170,7 +267,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
//runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
//runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE);
}
-/*
+
Y_UNIT_TEST(DSConfigs) {
TPortManager pm;
@@ -189,8 +286,6 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
server->EnableGRpc(grpcPort);
- // server->SetupDefaultProfiles();
-
Tests::TClient client(serverSettings);
auto& runtime = *server->GetRuntime();
@@ -198,11 +293,12 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
auto sender = runtime.AllocateEdgeActor();
server->SetupRootStoragePools(sender);
TLocalHelper lHelper(*server);
- lHelper.CreateTestOlapTable();
+ lHelper.CreateTestOlapTable("olapTable");
- runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
- const TInstant start = Now();
- while (Now() - start < TDuration::Seconds(10)) {
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE);
+ runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NLog::PRI_INFO);
+// runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG);
+ for (const TInstant start = Now(); Now() - start < TDuration::Seconds(10); ) {
runtime.WaitForEdgeEvents([](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
Cerr << "Step " << event->Type << Endl;
return false;
@@ -211,44 +307,89 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
Cerr << "Step finished" << Endl;
}
- NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
- tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
- auto sResult = f.GetValueSync();
- Cerr << sResult.GetIssues().ToString() << Endl;
- auto session = sResult.GetSession();
- session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')"
- , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
- });
-
- bool found = false;
- bool* foundPtr = &found;
- const auto pred = [foundPtr](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event)->TTestActorRuntimeBase::EEventAction {
- if (event->HasBuffer() && !event->HasEvent()) {
- } else if (!event->GetBase()) {
- Cerr << "Type nullptr" << Endl;
- } else {
- Cerr << "Step " << event->GetBase()->Type() << Endl;
- auto ptr = dynamic_cast<NMetadataProvider::TEvRefreshSubscriberData*>(event->GetBase());
- if (ptr) {
- auto value = ptr->GetSnapshotAs<NTiers::TConfigsSnapshot>()->GetValue("/Root/olapStore.tier1");
- if (value && value->GetProtoConfig().GetName() == "abc") {
- *foundPtr = true;
- } else {
- Cerr << ptr->GetSnapshot()->SerializeToString() << Endl;
- }
- }
- }
- return TTestActorRuntimeBase::EEventAction::PROCESS;
- };
+ {
+ NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
+ tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ auto session = f.GetValueSync().GetSession();
+ session.ExecuteDataQuery(
+ "INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
+ "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')"
+ , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
+ });
+ }
+ {
+ NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
+ tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ auto session = f.GetValueSync().GetSession();
+ session.ExecuteDataQuery(
+ "INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
+ "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')"
+ , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
+ });
+ }
+ {
+ TTestCSEmulator emulator;
+ emulator.MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc"));
+ emulator.SetTieringsCount(1);
+ emulator.CheckRuntime(runtime);
+ }
+
+ {
+ NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
+ tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ auto session = f.GetValueSync().GetSession();
+ session.ExecuteDataQuery(
+ "INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) "
+ "VALUES ('/Root/olapStore', 'tier2', '" + ConfigProtoStr + "')"
+ , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
+ });
+ }
+ {
+ NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
+ tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ auto session = f.GetValueSync().GetSession();
+ session.ExecuteDataQuery(
+ "INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) "
+ "VALUES ('/Root/olapStore', 'tier2', '/Root/olapStore/olapTable', 'timestamp', '20d')"
+ , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
+ });
+ }
- runtime.SetObserverFunc(pred);
+ {
+ TTestCSEmulator emulator;
+ emulator.MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc"));
+ emulator.MutableCheckers().emplace("/Root/olapStore.tier2", TJsonChecker("Name", "abc"));
+ emulator.SetTieringsCount(2);
+ emulator.CheckRuntime(runtime);
+ }
- while (!found) {
- runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10));
+ {
+ NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
+ tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ auto session = f.GetValueSync().GetSession();
+ session.ExecuteDataQuery(
+ "DELETE FROM `/Root/.external_data/tiers`"
+ , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
+ });
+ }
+ {
+ NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
+ tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ auto session = f.GetValueSync().GetSession();
+ session.ExecuteDataQuery(
+ "DELETE FROM `/Root/.external_data/tiering`"
+ , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
+ });
+ }
+ {
+ TTestCSEmulator emulator;
+ emulator.SetTieringsCount(0);
+ emulator.CheckRuntime(runtime);
}
+
//runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
//runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE);
}
-*/
+
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
index 275428e540c..6f167751aa1 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
@@ -49,11 +49,6 @@ TColumnTableInfo::TPtr ParseParams(
tableSchema = &presetProto.GetSchema();
}
- THashSet<TString> knownTiers;
- for (const auto& tier : tableSchema->GetStorageTiers()) {
- knownTiers.insert(tier.GetName());
- }
-
THashMap<ui32, TOlapSchema::TColumn> columns;
THashMap<TString, ui32> columnsByName;
for (const auto& col : tableSchema->GetColumns()) {
@@ -65,7 +60,7 @@ TColumnTableInfo::TPtr ParseParams(
columnsByName[name] = id;
}
- if (!ValidateTtlSettings(alter.GetAlterTtlSettings(), columns, columnsByName, knownTiers, errStr)) {
+ if (!ValidateTtlSettings(alter.GetAlterTtlSettings(), columns, columnsByName, errStr)) {
status = NKikimrScheme::StatusInvalidParameter;
return nullptr;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
index aeb53be7b82..a2685bf2b2d 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
@@ -12,13 +12,6 @@ namespace {
using namespace NKikimr;
using namespace NSchemeShard;
-bool PrepareTier(const NKikimrSchemeOp::TStorageTierConfig& proto, TString& errStr) {
- Y_UNUSED(proto);
- Y_UNUSED(errStr);
- // TODO
- return true;
-}
-
// TODO: make it a part of TOlapSchema
bool PrepareSchema(NKikimrSchemeOp::TColumnTableSchema& proto, TOlapSchema& schema, TString& errStr) {
schema.NextColumnId = proto.GetNextColumnId();
@@ -107,7 +100,7 @@ bool PrepareSchema(NKikimrSchemeOp::TColumnTableSchema& proto, TOlapSchema& sche
errStr = "At least one key column is required";
return false;
}
-
+#if 0
for (auto& tierConfig : proto.GetStorageTiers()) {
TString tierName = tierConfig.GetName();
if (schema.Tiers.count(tierName)) {
@@ -120,6 +113,7 @@ bool PrepareSchema(NKikimrSchemeOp::TColumnTableSchema& proto, TOlapSchema& sche
return false;
}
}
+#endif
schema.Engine = proto.GetEngine();
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
index 40e92d9f0fc..917688b71c5 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
@@ -70,7 +70,6 @@ TColumnTableInfo::TPtr CreateColumnTable(
Y_VERIFY(pSchema, "Expected to find a preset schema");
- THashSet<TString> storageTiers;
if (op.HasSchema()) {
auto& opSchema = op.GetSchema();
const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;
@@ -176,20 +175,7 @@ TColumnTableInfo::TPtr CreateColumnTable(
return nullptr;
}
- storageTiers.clear();
- for (auto tier : opSchema.GetStorageTiers()) {
- auto& name = tier.GetName();
- if (!pSchema->Tiers.count(name)) {
- status = NKikimrScheme::StatusSchemeError;
- errStr = TStringBuilder() << "Specified schema tier does not match schema preset: '" << name << "'";
- return nullptr;
- }
- storageTiers.insert(name);
- }
-
op.ClearSchema();
- } else {
- storageTiers = pSchema->Tiers;
}
if (op.HasRESERVED_TtlSettingsPresetName() || op.HasRESERVED_TtlSettingsPresetId()) {
@@ -204,7 +190,7 @@ TColumnTableInfo::TPtr CreateColumnTable(
// Validate ttl settings and schema compatibility
if (op.HasTtlSettings()) {
- if (!ValidateTtlSettings(op.GetTtlSettings(), pSchema->Columns, pSchema->ColumnsByName, storageTiers, errStr)) {
+ if (!ValidateTtlSettings(op.GetTtlSettings(), pSchema->Columns, pSchema->ColumnsByName, errStr)) {
status = NKikimrScheme::StatusInvalidParameter;
return nullptr;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
index 5f17890af45..5c9a351992d 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
@@ -2023,9 +2023,6 @@ TOlapStoreInfo::TOlapStoreInfo(
col.KeyOrder = preset.KeyColumnIds.size();
preset.KeyColumnIds.push_back(col.Id);
}
- for (const auto& tierConfig : presetProto.GetSchema().GetStorageTiers()) {
- preset.Tiers.insert(tierConfig.GetName());
- }
preset.Engine = presetProto.GetSchema().GetEngine();
preset.NextColumnId = presetProto.GetSchema().GetNextColumnId();
preset.Version = presetProto.GetSchema().GetVersion();
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index e3ddc608aa9..1d03551aa85 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -804,7 +804,6 @@ struct TOlapSchema {
TColumns Columns;
TColumnsByName ColumnsByName;
TVector<ui32> KeyColumnIds;
- THashSet<TString> Tiers; // TODO: hash map with tier config
NKikimrSchemeOp::EColumnTableEngine Engine = NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES;
ui32 NextColumnId = 1;
ui64 Version = 1;
@@ -2796,7 +2795,6 @@ bool ValidateTtlSettings(const NKikimrSchemeOp::TTTLSettings& ttl,
bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl,
const THashMap<ui32, TOlapSchema::TColumn>& columns,
const THashMap<TString, ui32>& columnsByName,
- const THashSet<TString>& knownTiers,
TString& errStr);
bool ValidateTtlSettingsChange(
const NKikimrSchemeOp::TColumnDataLifeCycle& oldTtl,
diff --git a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp
index b209f7e43c1..6b3bc7a2cc5 100644
--- a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp
@@ -158,44 +158,20 @@ static bool ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle::
bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl,
const THashMap<ui32, TOlapSchema::TColumn>& columns,
const THashMap<TString, ui32>& columnsByName,
- const THashSet<TString>& knownTiers,
TString& errStr)
{
using TTtlProto = NKikimrSchemeOp::TColumnDataLifeCycle;
- THashSet<TString> prevTiers;
-
switch (ttl.GetStatusCase()) {
- case TTtlProto::kTiering: {
- for (auto& tier : ttl.GetTiering().GetTiers()) {
- auto& name = tier.GetName();
- if (name.empty()) {
- errStr = "Unnamed tier in TTL policy";
- return false;
- }
- if (!knownTiers.count(name)) {
- errStr = Sprintf("Unknown tier in TTL policy: '%s'", name.data());
- return false;
- }
- if (prevTiers.count(name)) {
- errStr = Sprintf("The same tier name in TTL policy: '%s'", name.data());
- return false;
- }
- prevTiers.insert(name);
-
- if (!ValidateColumnTableTtl(tier.GetEviction(), {}, columns, columnsByName, errStr)) {
- return false;
- }
- }
- break;
- }
- case TTtlProto::kEnabled:
- return ValidateColumnTableTtl(ttl.GetEnabled(), {}, columns, columnsByName, errStr);
- case TTtlProto::kDisabled:
- break;
- default:
- errStr = "TTL status must be specified";
- return false;
+ case TTtlProto::kTiering:
+ break;
+ case TTtlProto::kEnabled:
+ return ValidateColumnTableTtl(ttl.GetEnabled(), {}, columns, columnsByName, errStr);
+ case TTtlProto::kDisabled:
+ break;
+ default:
+ errStr = "TTL status must be specified";
+ return false;
}
return true;
@@ -217,19 +193,6 @@ bool ValidateTtlSettingsChange(
if (ttl.GetStatusCase() == NKikimrSchemeOp::TColumnDataLifeCycle::kEnabled) {
newTtlColName = ttl.GetEnabled().GetColumnName();
oldTtlColName = oldTtl.GetEnabled().GetColumnName();
- } else if (ttl.GetStatusCase() == NKikimrSchemeOp::TColumnDataLifeCycle::kTiering) {
- if (ttl.GetTiering().TiersSize() != oldTtl.GetTiering().TiersSize()) {
- errStr = "Changing of tiers count is not supported yet";
- return false;
- }
-
- if (ttl.GetTiering().TiersSize() != 1) {
- errStr = "Changing of multiple tiers is not supported yet";
- return false;
- }
-
- newTtlColName = ttl.GetTiering().GetTiers()[0].GetEviction().GetColumnName();
- oldTtlColName = oldTtl.GetTiering().GetTiers()[0].GetEviction().GetColumnName();
}
if (newTtlColName != oldTtlColName) {
diff --git a/ydb/core/tx/schemeshard/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap.cpp
index 154f062b9ec..82e6a5549ec 100644
--- a/ydb/core/tx/schemeshard/ut_olap.cpp
+++ b/ydb/core/tx/schemeshard/ut_olap.cpp
@@ -153,13 +153,12 @@ Y_UNIT_TEST_SUITE(TOlap) {
Columns { Name: "data" Type: "Utf8" }
KeyColumnNames: "timestamp"
Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
- StorageTiers { Name: "tier0" }
- StorageTiers { Name: "tier0" }
+ EnableTiering : true
}
}
)";
- TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema1, {NKikimrScheme::StatusSchemeError});
+ TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema1, {NKikimrScheme::StatusAccepted});
}
Y_UNIT_TEST(CreateStoreWithDirs) {
@@ -310,9 +309,9 @@ Y_UNIT_TEST_SUITE(TOlap) {
Columns { Name: "data" Type: "Utf8" }
KeyColumnNames: "timestamp"
Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
- StorageTiers { Name: "tierX" }
+ EnableTiering : true
}
- )", {NKikimrScheme::StatusSchemeError});
+ )", {NKikimrScheme::StatusAccepted});
}
Y_UNIT_TEST(CustomDefaultPresets) {
@@ -393,9 +392,7 @@ Y_UNIT_TEST_SUITE(TOlap) {
Columns { Name: "data" Type: "Utf8" }
KeyColumnNames: "timestamp"
Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
- StorageTiers { Name: "tier0" }
- StorageTiers { Name: "tier1" Compression { CompressionCodec: ColumnCodecZSTD CompressionLevel: 5 } }
- StorageTiers { Name: "tier2" Compression { CompressionCodec: ColumnCodecZSTD CompressionLevel: 10 } }
+ EnableTiering : true
}
}
)";
@@ -466,7 +463,7 @@ Y_UNIT_TEST_SUITE(TOlap) {
)";
TestCreateColumnTable(runtime, ++txId, "/MyRoot/OlapStore", tableSchema4,
- {NKikimrScheme::StatusInvalidParameter});
+ {NKikimrScheme::StatusAccepted});
}
Y_UNIT_TEST(AlterStore) {
@@ -536,8 +533,7 @@ Y_UNIT_TEST_SUITE(TOlap) {
Columns { Name: "data" Type: "Utf8" }
KeyColumnNames: "timestamp"
Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
- StorageTiers { Name: "tier0" }
- StorageTiers { Name: "tier1" }
+ EnableTiering : true
}
}
)";
diff --git a/ydb/core/tx/tiering/CMakeLists.txt b/ydb/core/tx/tiering/CMakeLists.txt
new file mode 100644
index 00000000000..48b8969a74c
--- /dev/null
+++ b/ydb/core/tx/tiering/CMakeLists.txt
@@ -0,0 +1,30 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-tx-tiering)
+target_link_libraries(core-tx-tiering PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-json-writer
+ ydb-core-blobstorage
+ ydb-core-protos
+ core-tablet_flat-protos
+ ydb-core-wrappers
+ api-protos
+ ydb-services-metadata
+)
+target_sources(core-tx-tiering PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/manager.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/decoder.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/external_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier_config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/s3_actor.cpp
+)
diff --git a/ydb/core/tx/tiering/decoder.cpp b/ydb/core/tx/tiering/decoder.cpp
new file mode 100644
index 00000000000..40196235569
--- /dev/null
+++ b/ydb/core/tx/tiering/decoder.cpp
@@ -0,0 +1,43 @@
+#include "decoder.h"
+#include <library/cpp/protobuf/json/proto2json.h>
+#include <contrib/libs/protobuf/src/google/protobuf/text_format.h>
+#include <library/cpp/actors/core/log.h>
+
+namespace NKikimr::NInternal {
+
+i32 TDecoderBase::GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId, const bool verify /*= true*/) const {
+ i32 idx = 0;
+ for (auto&& i : rawData.columns()) {
+ if (i.name() == columnId) {
+ return idx;
+ }
+ ++idx;
+ }
+ Y_VERIFY(!verify, "incorrect columnId %s", columnId.data());
+ return -1;
+}
+
+bool TDecoderBase::Read(const ui32 columnIdx, TString& result, const Ydb::Value& r) const {
+ result = r.items()[columnIdx].bytes_value();
+ return true;
+}
+
+bool TDecoderBase::Read(const ui32 columnIdx, TDuration& result, const Ydb::Value& r) const {
+ const TString& s = r.items()[columnIdx].bytes_value();
+ if (!TDuration::TryParse(s, result)) {
+ ALS_WARN(0) << "cannot parse duration for tiering: " << s;
+ return false;
+ }
+ return true;
+}
+
+bool TDecoderBase::ReadDebugProto(const ui32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const {
+ const TString& s = r.items()[columnIdx].bytes_value();
+ if (!::google::protobuf::TextFormat::ParseFromString(s, &result)) {
+ ALS_ERROR(0) << "cannot parse proto string: " << s;
+ return false;
+ }
+ return true;
+}
+
+}
diff --git a/ydb/core/tx/tiering/decoder.h b/ydb/core/tx/tiering/decoder.h
new file mode 100644
index 00000000000..6866feedebb
--- /dev/null
+++ b/ydb/core/tx/tiering/decoder.h
@@ -0,0 +1,16 @@
+#pragma once
+#include <ydb/public/api/protos/ydb_value.pb.h>
+#include <util/datetime/base.h>
+
+namespace NKikimr::NInternal {
+
+class TDecoderBase {
+protected:
+ i32 GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId, const bool verify = true) const;
+public:
+ bool Read(const ui32 columnIdx, TString& result, const Ydb::Value& r) const;
+ bool ReadDebugProto(const ui32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const;
+ bool Read(const ui32 columnIdx, TDuration& result, const Ydb::Value& r) const;
+};
+
+}
diff --git a/ydb/core/tx/tiering/external_data.cpp b/ydb/core/tx/tiering/external_data.cpp
new file mode 100644
index 00000000000..ebabf12bcb3
--- /dev/null
+++ b/ydb/core/tx/tiering/external_data.cpp
@@ -0,0 +1,240 @@
+#include "external_data.h"
+
+#include <ydb/core/base/path.h>
+
+#include <library/cpp/json/writer/json_value.h>
+#include <library/cpp/protobuf/json/proto2json.h>
+
+#include <util/string/join.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) {
+ Y_VERIFY(rawDataResult.result_sets().size() == 2);
+ {
+ auto& rawData = rawDataResult.result_sets()[0];
+ TTierConfig::TDecoder decoder(rawData);
+ for (auto&& r : rawData.rows()) {
+ TTierConfig config;
+ if (!config.DeserializeFromRecord(decoder, r)) {
+ ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "cannot parse tier config from snapshot";
+ continue;
+ }
+ TierConfigs.emplace(config.GetConfigId(), config);
+ }
+ }
+ {
+ auto& rawData = rawDataResult.result_sets()[1];
+ TTieringRule::TDecoder decoder(rawData);
+ TVector<TTieringRule> rulesLocal;
+ rulesLocal.reserve(rawData.rows().size());
+ for (auto&& r : rawData.rows()) {
+ TTieringRule tr;
+ if (!tr.DeserializeFromRecord(decoder, r)) {
+ ALS_WARN(NKikimrServices::TX_COLUMNSHARD) << "cannot parse record for tiering info";
+ continue;
+ }
+ rulesLocal.emplace_back(std::move(tr));
+ }
+ std::sort(rulesLocal.begin(), rulesLocal.end());
+ for (auto&& i : rulesLocal) {
+ const TString tablePath = i.GetTablePath();
+ TableTierings[tablePath].AddRule(std::move(i));
+ }
+ }
+ return true;
+}
+
+std::optional<TTierConfig> TConfigsSnapshot::GetValue(const TString& key) const {
+ auto it = TierConfigs.find(key);
+ if (it == TierConfigs.end()) {
+ return {};
+ } else {
+ return it->second;
+ }
+}
+
+void TConfigsSnapshot::RemapTablePathToId(const TString& path, const ui64 pathId) {
+ auto it = TableTierings.find(path);
+ Y_VERIFY(it != TableTierings.end());
+ it->second.SetTablePathId(pathId);
+}
+
+const TTableTiering* TConfigsSnapshot::GetTableTiering(const TString& tablePath) const {
+ auto it = TableTierings.find(tablePath);
+ if (it == TableTierings.end()) {
+ return nullptr;
+ } else {
+ return &it->second;
+ }
+}
+
+TVector<NMetadataProvider::ITableModifier::TPtr> TSnapshotConstructor::DoGetTableSchema() const {
+ TVector<NMetadataProvider::ITableModifier::TPtr> result;
+ {
+ Ydb::Table::CreateTableRequest request;
+ request.set_session_id("");
+ request.set_path(Tables[0]);
+ request.add_primary_key("ownerPath");
+ request.add_primary_key("tierName");
+ {
+ auto& column = *request.add_columns();
+ column.set_name("tierName");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("ownerPath");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("tierConfig");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ result.emplace_back(new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request));
+ }
+ {
+ Ydb::Table::CreateTableRequest request;
+ request.set_session_id("");
+ request.set_path(Tables[1]);
+ request.add_primary_key("ownerPath");
+ request.add_primary_key("tierName");
+ request.add_primary_key("tablePath");
+ {
+ auto& column = *request.add_columns();
+ column.set_name("tierName");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("ownerPath");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("tablePath");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("durationForEvict");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("column");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ result.emplace_back(new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request));
+ }
+ return result;
+}
+
+NThreading::TFuture<NMetadataProvider::ISnapshot::TPtr> TSnapshotConstructor::EnrichSnapshotData(ISnapshot::TPtr original) const {
+ if (!Actor) {
+ return NThreading::MakeErrorFuture<NMetadataProvider::ISnapshot::TPtr>(
+ std::make_exception_ptr(std::runtime_error("actor finished for enrich process")));
+ }
+ auto current = std::dynamic_pointer_cast<TConfigsSnapshot>(original);
+ Y_VERIFY(current);
+ auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
+ request->DatabaseName = NKikimr::CanonizePath(AppData()->TenantName);
+ for (auto&& i : current->GetTableTierings()) {
+ auto it = TablesRemapper.find(i.second.GetTablePath());
+ if (it != TablesRemapper.end()) {
+ current->RemapTablePathToId(it->first, it->second);
+ } else {
+ auto& entry = request->ResultSet.emplace_back();
+ entry.Operation = TNavigate::OpTable;
+ entry.Path = NKikimr::SplitPath(i.second.GetTablePath());
+ }
+ }
+ if (request->ResultSet.empty()) {
+ return NThreading::MakeFuture(original);
+ } else {
+ WaitPromise = NThreading::NewPromise<ISnapshot::TPtr>();
+ WaitSnapshot = current;
+ Actor->ProvideEvent(new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()), MakeSchemeCacheID());
+ return WaitPromise.GetFuture();
+ }
+}
+
+void TSnapshotConstructorAgent::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ Owner->ResolveInfo(ev->Get());
+}
+
+void TSnapshotConstructorAgent::ProvideEvent(IEventBase* event, const TActorId& recipient) {
+ Send(recipient, event);
+}
+
+TSnapshotConstructorAgent::~TSnapshotConstructorAgent() {
+ Owner->ActorStopped();
+}
+
+void TSnapshotConstructor::ResolveInfo(const TEvTxProxySchemeCache::TEvNavigateKeySetResult* info) {
+ const auto& request = info->Request;
+
+ if (request->ResultSet.empty()) {
+ WaitPromise.SetException("cannot resolve table path to ids");
+ WaitSnapshot = nullptr;
+ return;
+ }
+
+ for (auto&& i : request->ResultSet) {
+ const TString path = "/" + JoinSeq("/", i.Path);
+ switch (i.Status) {
+ case TNavigate::EStatus::Ok:
+ TablesRemapper.emplace(path, i.TableId.PathId.LocalPathId);
+ break;
+ case TNavigate::EStatus::RootUnknown:
+ case TNavigate::EStatus::PathErrorUnknown:
+ WaitPromise.SetException("path not exists " + path);
+ break;
+ case TNavigate::EStatus::LookupError:
+ case TNavigate::EStatus::RedirectLookupError:
+ WaitPromise.SetException("RESOLVE_LOOKUP_ERROR" + path);
+ break;
+ default:
+ WaitPromise.SetException("GENERIC_RESOLVE_ERROR" + path);
+ break;
+ }
+ }
+ if (WaitPromise.HasException()) {
+ WaitSnapshot = nullptr;
+ } else {
+ for (auto&& i : WaitSnapshot->GetTableTierings()) {
+ auto it = TablesRemapper.find(i.second.GetTablePath());
+ if (it != TablesRemapper.end()) {
+ WaitSnapshot->RemapTablePathToId(it->first, it->second);
+ } else {
+ WaitPromise.SetException("undecoded path " + i.second.GetTablePath());
+ WaitSnapshot = nullptr;
+ return;
+ }
+ }
+ WaitPromise.SetValue(WaitSnapshot);
+ WaitSnapshot = nullptr;
+ }
+}
+
+TSnapshotConstructor::TSnapshotConstructor() {
+ TablePath = "/" + AppData()->TenantName + "/.external_data";
+ Tables.emplace_back(TablePath + "/tiers");
+ Tables.emplace_back(TablePath + "/tiering");
+}
+
+TString NTiers::TConfigsSnapshot::DoSerializeToString() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ auto& jsonTiers = result.InsertValue("tiers", NJson::JSON_MAP);
+ for (auto&& i : TierConfigs) {
+ jsonTiers.InsertValue(i.first, i.second.GetDebugJson());
+ }
+ auto& jsonTiering = result.InsertValue("tiering", NJson::JSON_MAP);
+ for (auto&& i : TableTierings) {
+ jsonTiering.InsertValue(i.first, i.second.GetDebugJson());
+ }
+ return result.GetStringRobust();
+}
+
+}
diff --git a/ydb/core/tx/tiering/external_data.h b/ydb/core/tx/tiering/external_data.h
new file mode 100644
index 00000000000..bb03ed8c9f4
--- /dev/null
+++ b/ydb/core/tx/tiering/external_data.h
@@ -0,0 +1,105 @@
+#pragma once
+#include "rule.h"
+#include "tier_config.h"
+
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/services/metadata/service.h>
+
+#include <library/cpp/json/writer/json_value.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TConfigsSnapshot: public NMetadataProvider::ISnapshot {
+private:
+ using TBase = NMetadataProvider::ISnapshot;
+ using TConfigsMap = TMap<TString, TTierConfig>;
+ YDB_ACCESSOR_DEF(TConfigsMap, TierConfigs);
+ using TTieringMap = TMap<TString, TTableTiering>;
+ YDB_ACCESSOR_DEF(TTieringMap, TableTierings);
+protected:
+ virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override;
+ virtual TString DoSerializeToString() const override;
+public:
+ const TTableTiering* GetTableTiering(const TString& tablePath) const;
+ void RemapTablePathToId(const TString& path, const ui64 pathId);
+ std::optional<TTierConfig> GetValue(const TString& key) const;
+ using TBase::TBase;
+};
+
+class TSnapshotConstructor;
+
+class TSnapshotConstructorAgent: public TActor<TSnapshotConstructorAgent> {
+private:
+ using TBase = TActor<TSnapshotConstructorAgent>;
+ std::shared_ptr<TSnapshotConstructor> Owner;
+private:
+ void Handle(NActors::TEvents::TEvPoison::TPtr& /*ev*/) {
+ PassAway();
+ }
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
+public:
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ hFunc(NActors::TEvents::TEvPoison, Handle);
+ default:
+ break;
+ }
+ }
+
+ TSnapshotConstructorAgent(std::shared_ptr<TSnapshotConstructor> owner)
+ : TBase(&TThis::StateMain)
+ , Owner(owner)
+ {
+
+ }
+
+ ~TSnapshotConstructorAgent();
+
+ void ProvideEvent(IEventBase* event, const TActorId& recipient);
+};
+
+class TSnapshotConstructor: public NMetadataProvider::TGenericSnapshotParser<TConfigsSnapshot> {
+private:
+ using TNavigate = NSchemeCache::TSchemeCacheNavigate;
+ using TBaseActor = TActor<TSnapshotConstructor>;
+ using ISnapshot = NMetadataProvider::ISnapshot;
+ TVector<TString> Tables;
+ TMap<TString, ui64> TablesRemapper;
+ TSnapshotConstructorAgent* Actor = nullptr;
+ TString TablePath;
+ mutable std::shared_ptr<TConfigsSnapshot> WaitSnapshot;
+ mutable NThreading::TPromise<NMetadataProvider::ISnapshot::TPtr> WaitPromise;
+protected:
+ virtual TVector<NMetadataProvider::ITableModifier::TPtr> DoGetTableSchema() const override;
+ virtual const TVector<TString>& DoGetTables() const override {
+ return Tables;
+ }
+public:
+ void ResolveInfo(const TEvTxProxySchemeCache::TEvNavigateKeySetResult* info);
+ virtual NThreading::TFuture<ISnapshot::TPtr> EnrichSnapshotData(ISnapshot::TPtr original) const override;
+
+ TSnapshotConstructor();
+
+ void Start(std::shared_ptr<TSnapshotConstructor> ownerPtr) {
+ Y_VERIFY(!Actor);
+ Actor = new TSnapshotConstructorAgent(ownerPtr);
+ TActivationContext::AsActorContext().RegisterWithSameMailbox(Actor);
+ }
+
+ void ActorStopped() {
+ Actor = nullptr;
+ }
+
+ void Stop() {
+ if (Actor && TlsActivationContext) {
+ TActivationContext::AsActorContext().Send(Actor->SelfId(), new NActors::TEvents::TEvPoison);
+ }
+ }
+
+ ~TSnapshotConstructor() {
+ }
+};
+
+}
diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp
new file mode 100644
index 00000000000..0364ebc2aac
--- /dev/null
+++ b/ydb/core/tx/tiering/manager.cpp
@@ -0,0 +1,234 @@
+#include "manager.h"
+#include "external_data.h"
+#include "s3_actor.h"
+
+#include <ydb/core/tx/columnshard/columnshard_private_events.h>
+
+namespace NKikimr::NColumnShard {
+
+class TTiersManager::TActor: public TActorBootstrapped<TTiersManager::TActor> {
+private:
+ std::shared_ptr<TTiersManager> Owner;
+ TActorId GetExternalDataActorId() const {
+ return NMetadataProvider::MakeServiceId(SelfId().NodeId());
+ }
+public:
+ TActor(std::shared_ptr<TTiersManager> owner)
+ : Owner(owner) {
+
+ }
+ ~TActor() {
+ Owner->Stop();
+ }
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle);
+ hFunc(NActors::TEvents::TEvPoison, Handle);
+ default:
+ break;
+ }
+ }
+ void Bootstrap() {
+ Become(&TThis::StateMain);
+ Send(GetExternalDataActorId(), new NMetadataProvider::TEvSubscribeExternal(Owner->GetExternalDataManipulation()));
+ }
+ void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) {
+ auto snapshot = ev->Get()->GetSnapshot();
+ Owner->TakeConfigs(snapshot);
+ }
+ void Handle(NActors::TEvents::TEvPoison::TPtr& /*ev*/) {
+ Send(GetExternalDataActorId(), new NMetadataProvider::TEvUnsubscribeExternal(Owner->GetExternalDataManipulation()));
+ PassAway();
+ }
+};
+
+namespace NTiers {
+
+TManager& TManager::Restart(const TTierConfig& config, const bool activity) {
+ if (Config.IsSame(config)) {
+ return *this;
+ }
+ if (Config.NeedExport()) {
+ Stop();
+ }
+ Config = config;
+ if (activity) {
+ Start();
+ }
+ return *this;
+}
+
+bool TManager::Stop() {
+ if (!StorageActorId) {
+ return true;
+ }
+ if (TlsActivationContext) {
+ TActivationContext::AsActorContext().Send(StorageActorId, new TEvents::TEvPoisonPill());
+ }
+ StorageActorId = {};
+ return true;
+}
+
+bool TManager::Start() {
+ if (!Config.NeedExport()) {
+ return true;
+ }
+ if (!!StorageActorId) {
+ return true;
+ }
+#ifndef KIKIMR_DISABLE_S3_OPS
+ auto& ctx = TActivationContext::AsActorContext();
+ const NActors::TActorId newActor = ctx.Register(
+ CreateS3Actor(TabletId, ctx.SelfID, Config.GetTierName())
+ );
+ ctx.Send(newActor, new TEvPrivate::TEvS3Settings(Config.GetProtoConfig().GetObjectStorage()));
+ Stop();
+ StorageActorId = newActor;
+#endif
+ return true;
+}
+
+TManager::TManager(const ui64 tabletId, const TTierConfig& config)
+ : TabletId(tabletId)
+ , Config(config) {
+}
+
+NOlap::TStorageTier TManager::BuildTierStorage() const {
+ NOlap::TStorageTier result;
+ result.Name = Config.GetTierName();
+ if (Config.GetProtoConfig().HasCompression()) {
+ result.Compression = ConvertCompression(Config.GetProtoConfig().GetCompression());
+ }
+ return result;
+}
+
+NKikimr::NOlap::TCompression TManager::ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression) {
+ NOlap::TCompression out;
+ if (compression.HasCompressionCodec()) {
+ switch (compression.GetCompressionCodec()) {
+ case NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain:
+ out.Codec = arrow::Compression::UNCOMPRESSED;
+ break;
+ case NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4:
+ out.Codec = arrow::Compression::LZ4_FRAME;
+ break;
+ case NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD:
+ out.Codec = arrow::Compression::ZSTD;
+ break;
+ }
+ }
+
+ if (compression.HasCompressionLevel()) {
+ out.Level = compression.GetCompressionLevel();
+ }
+ return out;
+}
+}
+
+void TTiersManager::TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshotExt) {
+ auto snapshotPtr = std::dynamic_pointer_cast<NTiers::TConfigsSnapshot>(snapshotExt);
+ Y_VERIFY(snapshotPtr);
+ Snapshot = snapshotExt;
+ auto& snapshot = *snapshotPtr;
+ for (auto itSelf = Managers.begin(); itSelf != Managers.end(); ) {
+ auto it = snapshot.GetTierConfigs().find(OwnerPath + "." + itSelf->first);
+ if (it == snapshot.GetTierConfigs().end()) {
+ itSelf->second.Stop();
+ itSelf = Managers.erase(itSelf);
+ } else {
+ itSelf->second.Restart(it->second, IsActive());
+ ++itSelf;
+ }
+ }
+ for (auto&& i : snapshot.GetTierConfigs()) {
+ if (i.second.GetOwnerPath() != OwnerPath && !!OwnerPath) {
+ continue;
+ }
+ if (Managers.contains(i.second.GetTierName())) {
+ continue;
+ }
+ NTiers::TManager localManager(TabletId, i.second);
+ auto& manager = Managers.emplace(i.second.GetTierName(), std::move(localManager)).first->second;
+ if (IsActive()) {
+ manager.Start();
+ }
+ }
+}
+
+TActorId TTiersManager::GetStorageActorId(const TString& tierName) {
+ auto it = Managers.find(tierName);
+ if (it == Managers.end()) {
+ ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "No S3 actor for tier '" << tierName << "' at tablet " << TabletId;
+ return {};
+ }
+ auto actorId = it->second.GetStorageActorId();
+ if (!actorId) {
+ ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "Not started storage actor for tier '" << tierName << "' at tablet " << TabletId;
+ return {};
+ }
+ return actorId;
+}
+
+TTiersManager& TTiersManager::Start(std::shared_ptr<TTiersManager> ownerPtr) {
+ if (ActiveFlag) {
+ return *this;
+ }
+ ActiveFlag = true;
+ Y_VERIFY(!Actor);
+ Actor = new TTiersManager::TActor(ownerPtr);
+ TActivationContext::AsActorContext().RegisterWithSameMailbox(Actor);
+ for (auto&& i : Managers) {
+ i.second.Start();
+ }
+ return *this;
+}
+
+TTiersManager& TTiersManager::Stop() {
+ if (!ActiveFlag) {
+ return *this;
+ }
+ ActiveFlag = false;
+ Y_VERIFY(!!Actor);
+ if (TlsActivationContext) {
+ TActivationContext::AsActorContext().Send(Actor->SelfId(), new NActors::TEvents::TEvPoison);
+ }
+ Actor = nullptr;
+ for (auto&& i : Managers) {
+ i.second.Stop();
+ }
+ return *this;
+}
+
+const NKikimr::NColumnShard::NTiers::TManager& TTiersManager::GetManagerVerified(const TString& tierId) const {
+ auto it = Managers.find(tierId);
+ Y_VERIFY(it != Managers.end());
+ return it->second;
+}
+
+NMetadataProvider::ISnapshotParser::TPtr TTiersManager::GetExternalDataManipulation() const {
+ if (!ExternalDataManipulation) {
+ ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>();
+ auto edmPtr = std::dynamic_pointer_cast<NTiers::TSnapshotConstructor>(ExternalDataManipulation);
+ edmPtr->Start(edmPtr);
+ }
+ return ExternalDataManipulation;
+}
+
+THashMap<ui64, NKikimr::NOlap::TTiersInfo> TTiersManager::GetTiering() const {
+ THashMap<ui64, NKikimr::NOlap::TTiersInfo> result;
+ auto snapshotPtr = std::dynamic_pointer_cast<NTiers::TConfigsSnapshot>(Snapshot);
+ Y_VERIFY(snapshotPtr);
+ for (auto&& i : snapshotPtr->GetTableTierings()) {
+ result.emplace(i.second.GetTablePathId(), i.second.BuildTiersInfo());
+ }
+ return result;
+}
+
+TTiersManager::~TTiersManager() {
+ auto cs = std::dynamic_pointer_cast<NTiers::TSnapshotConstructor>(ExternalDataManipulation);
+ if (!!cs) {
+ cs->Stop();
+ }
+}
+
+}
diff --git a/ydb/core/tx/tiering/manager.h b/ydb/core/tx/tiering/manager.h
new file mode 100644
index 00000000000..4313aefce42
--- /dev/null
+++ b/ydb/core/tx/tiering/manager.h
@@ -0,0 +1,69 @@
+#pragma once
+#include "external_data.h"
+#include "tier_config.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actor.h>
+#include <ydb/services/metadata/service.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NColumnShard {
+namespace NTiers {
+
+class TManager {
+private:
+ ui64 TabletId = 0;
+ YDB_READONLY_DEF(TTierConfig, Config);
+ YDB_READONLY_DEF(NActors::TActorId, StorageActorId);
+public:
+ TManager(const ui64 tabletId, const TTierConfig& config);
+ static NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression);
+ NOlap::TStorageTier BuildTierStorage() const;
+
+ TManager& Restart(const TTierConfig& config, const bool activity);
+ bool NeedExport() const {
+ return Config.NeedExport();
+ }
+ bool Stop();
+ bool Start();
+};
+}
+
+class TTiersManager {
+private:
+ class TActor;
+ using TManagers = TMap<TString, NTiers::TManager>;
+ ui64 TabletId = 0;
+ TString OwnerPath;
+ TActor* Actor = nullptr;
+ YDB_READONLY_DEF(TManagers, Managers);
+ YDB_READONLY_FLAG(Active, false);
+
+ NMetadataProvider::ISnapshot::TPtr Snapshot;
+ mutable NMetadataProvider::ISnapshotParser::TPtr ExternalDataManipulation;
+
+public:
+ TTiersManager(const ui64 tabletId, const TString& ownerPath)
+ : TabletId(tabletId)
+ , OwnerPath(ownerPath)
+ {
+ }
+ ~TTiersManager();
+ THashMap<ui64, NOlap::TTiersInfo> GetTiering() const;
+ void TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshot);
+ TTiersManager& Start(std::shared_ptr<TTiersManager> ownerPtr);
+ TTiersManager& Stop();
+ TActorId GetStorageActorId(const TString& tierName);
+ const NTiers::TManager& GetManagerVerified(const TString& tierId) const;
+ NMetadataProvider::ISnapshotParser::TPtr GetExternalDataManipulation() const;
+
+ TManagers::const_iterator begin() const {
+ return Managers.begin();
+ }
+
+ TManagers::const_iterator end() const {
+ return Managers.end();
+ }
+};
+
+}
diff --git a/ydb/core/tx/tiering/rule.cpp b/ydb/core/tx/tiering/rule.cpp
new file mode 100644
index 00000000000..617c665a346
--- /dev/null
+++ b/ydb/core/tx/tiering/rule.cpp
@@ -0,0 +1,26 @@
+#include "rule.h"
+
+namespace NKikimr::NColumnShard::NTiers {
+NJson::TJsonValue TTieringRule::GetDebugJson() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue(TDecoder::OwnerPath, OwnerPath);
+ result.InsertValue(TDecoder::TierName, TierName);
+ result.InsertValue(TDecoder::TablePath, TablePath);
+ result.InsertValue("tablePathId", TablePathId);
+ result.InsertValue(TDecoder::Column, Column);
+ result.InsertValue(TDecoder::DurationForEvict, DurationForEvict.ToString());
+ return result;
+}
+
+NJson::TJsonValue TTableTiering::GetDebugJson() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue(TTieringRule::TDecoder::TablePath, TablePath);
+ result.InsertValue(TTieringRule::TDecoder::Column, Column);
+ auto&& jsonRules = result.InsertValue("rules", NJson::JSON_ARRAY);
+ for (auto&& i : Rules) {
+ jsonRules.AppendValue(i.GetDebugJson());
+ }
+ return result;
+}
+
+}
diff --git a/ydb/core/tx/tiering/rule.h b/ydb/core/tx/tiering/rule.h
new file mode 100644
index 00000000000..4481f500925
--- /dev/null
+++ b/ydb/core/tx/tiering/rule.h
@@ -0,0 +1,113 @@
+#pragma once
+#include "decoder.h"
+
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/tx/columnshard/engines/column_engine.h>
+#include <ydb/services/metadata/service.h>
+
+#include <library/cpp/json/writer/json_value.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TTieringRule {
+private:
+ YDB_ACCESSOR_DEF(TString, OwnerPath);
+ YDB_ACCESSOR_DEF(TString, TierName);
+ YDB_ACCESSOR_DEF(TString, TablePath);
+ YDB_ACCESSOR(ui64, TablePathId, 0);
+ YDB_ACCESSOR_DEF(TString, Column);
+ YDB_ACCESSOR_DEF(TDuration, DurationForEvict);
+public:
+ bool operator<(const TTieringRule& item) const {
+ return std::tie(OwnerPath, TierName, TablePath, Column, DurationForEvict)
+ < std::tie(item.OwnerPath, item.TierName, item.TablePath, item.Column, item.DurationForEvict);
+ }
+
+ NJson::TJsonValue GetDebugJson() const;
+ TString GetTierId() const {
+ return OwnerPath + "." + TierName;
+ }
+ class TDecoder: public NInternal::TDecoderBase {
+ private:
+ YDB_READONLY(i32, OwnerPathIdx, -1);
+ YDB_READONLY(i32, TablePathIdx, -1);
+ YDB_READONLY(i32, DurationForEvictIdx, -1);
+ YDB_READONLY(i32, TierNameIdx, -1);
+ YDB_READONLY(i32, ColumnIdx, -1);
+ public:
+ static inline const TString OwnerPath = "ownerPath";
+ static inline const TString TierName = "tierName";
+ static inline const TString TablePath = "tablePath";
+ static inline const TString DurationForEvict = "durationForEvict";
+ static inline const TString Column = "column";
+
+ TDecoder(const Ydb::ResultSet& rawData) {
+ OwnerPathIdx = GetFieldIndex(rawData, OwnerPath);
+ TierNameIdx = GetFieldIndex(rawData, TierName);
+ TablePathIdx = GetFieldIndex(rawData, TablePath);
+ DurationForEvictIdx = GetFieldIndex(rawData, DurationForEvict);
+ ColumnIdx = GetFieldIndex(rawData, Column);
+ }
+ };
+ bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r) {
+ if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetTablePathIdx(), TablePath, r)) {
+ return false;
+ }
+ TablePath = TFsPath(TablePath).Fix().GetPath();
+ if (!decoder.Read(decoder.GetDurationForEvictIdx(), DurationForEvict, r)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetColumnIdx(), Column, r)) {
+ return false;
+ }
+ return true;
+ }
+};
+
+class TTableTiering {
+private:
+ YDB_READONLY_DEF(TString, TablePath);
+ YDB_READONLY(ui64, TablePathId, 0);
+ YDB_READONLY_DEF(TString, Column);
+ YDB_READONLY_DEF(TVector<TTieringRule>, Rules);
+public:
+ void SetTablePathId(const ui64 pathId) {
+ Y_VERIFY(!TablePathId);
+ TablePathId = pathId;
+ for (auto&& r : Rules) {
+ r.SetTablePathId(pathId);
+ }
+ }
+ NJson::TJsonValue GetDebugJson() const;
+ void AddRule(TTieringRule&& tr) {
+ if (Rules.size()) {
+ Y_VERIFY(Rules.back().GetDurationForEvict() <= tr.GetDurationForEvict());
+ if (Column != tr.GetColumn()) {
+ ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "inconsistency rule column: " <<
+ TablePath << "/" << Column << " != " << tr.GetColumn();
+ return;
+ }
+ } else {
+ Column = tr.GetColumn();
+ TablePath = tr.GetTablePath();
+ TablePathId = tr.GetTablePathId();
+ }
+ Rules.emplace_back(std::move(tr));
+ }
+
+ NOlap::TTiersInfo BuildTiersInfo() const {
+ NOlap::TTiersInfo result(GetColumn());
+ for (auto&& r : Rules) {
+ result.AddTier(r.GetTierName(), Now() - r.GetDurationForEvict());
+ }
+ return result;
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/s3_actor.cpp b/ydb/core/tx/tiering/s3_actor.cpp
index e75945e8044..f0fc7d80973 100644
--- a/ydb/core/tx/columnshard/s3_actor.cpp
+++ b/ydb/core/tx/tiering/s3_actor.cpp
@@ -1,7 +1,9 @@
#ifndef KIKIMR_DISABLE_S3_OPS
-#include "defs.h"
-#include "columnshard_impl.h"
+#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/columnshard.h>
+#include <ydb/core/tx/columnshard/columnshard_private_events.h>
+#include <ydb/core/tx/columnshard/defs.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/wrappers/s3_wrapper.h>
diff --git a/ydb/core/tx/tiering/s3_actor.h b/ydb/core/tx/tiering/s3_actor.h
new file mode 100644
index 00000000000..74e427d6345
--- /dev/null
+++ b/ydb/core/tx/tiering/s3_actor.h
@@ -0,0 +1,12 @@
+#pragma once
+#ifndef KIKIMR_DISABLE_S3_OPS
+#include <library/cpp/actors/core/actorid.h>
+#include <library/cpp/actors/core/actor.h>
+
+namespace NKikimr::NColumnShard {
+
+IActor* CreateS3Actor(ui64 tabletId, const TActorId& parent, const TString& tierName);
+
+}
+
+#endif
diff --git a/ydb/core/tx/tiering/tier_config.cpp b/ydb/core/tx/tiering/tier_config.cpp
new file mode 100644
index 00000000000..19e5af26bbc
--- /dev/null
+++ b/ydb/core/tx/tiering/tier_config.cpp
@@ -0,0 +1,32 @@
+#include "tier_config.h"
+#include <library/cpp/json/writer/json_value.h>
+#include <library/cpp/protobuf/json/proto2json.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+NJson::TJsonValue TTierConfig::GetDebugJson() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue(TDecoder::OwnerPath, OwnerPath);
+ result.InsertValue(TDecoder::TierName, TierName);
+ NProtobufJson::Proto2Json(ProtoConfig, result.InsertValue(TDecoder::TierConfig, NJson::JSON_MAP));
+ return result;
+}
+
+bool TTierConfig::IsSame(const TTierConfig& item) const {
+ return TierName == item.TierName && ProtoConfig.SerializeAsString() == item.ProtoConfig.SerializeAsString();
+}
+
+bool TTierConfig::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r) {
+ if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) {
+ return false;
+ }
+ if (!decoder.ReadDebugProto(decoder.GetTierConfigIdx(), ProtoConfig, r)) {
+ return false;
+ }
+ return true;
+}
+
+}
diff --git a/ydb/core/tx/tiering/tier_config.h b/ydb/core/tx/tiering/tier_config.h
new file mode 100644
index 00000000000..78175ff980a
--- /dev/null
+++ b/ydb/core/tx/tiering/tier_config.h
@@ -0,0 +1,55 @@
+#pragma once
+#include "decoder.h"
+#include <ydb/services/metadata/service.h>
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+
+#include <library/cpp/json/writer/json_value.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TTierConfig {
+private:
+ using TTierProto = NKikimrSchemeOp::TStorageTierConfig;
+ using TS3SettingsProto = NKikimrSchemeOp::TS3Settings;
+ YDB_ACCESSOR_DEF(TString, OwnerPath);
+ YDB_ACCESSOR_DEF(TString, TierName);
+ YDB_ACCESSOR_DEF(TTierProto, ProtoConfig);
+
+public:
+ TTierConfig() = default;
+ TTierConfig(const TString& ownerPath, const TString& tierName)
+ : OwnerPath(ownerPath)
+ , TierName(tierName)
+ {
+
+ }
+
+ class TDecoder: public NInternal::TDecoderBase {
+ private:
+ YDB_READONLY(i32, OwnerPathIdx, -1);
+ YDB_READONLY(i32, TierNameIdx, -1);
+ YDB_READONLY(i32, TierConfigIdx, -1);
+ public:
+ static inline const TString OwnerPath = "ownerPath";
+ static inline const TString TierName = "tierName";
+ static inline const TString TierConfig = "tierConfig";
+ TDecoder(const Ydb::ResultSet& rawData) {
+ OwnerPathIdx = GetFieldIndex(rawData, OwnerPath);
+ TierNameIdx = GetFieldIndex(rawData, TierName);
+ TierConfigIdx = GetFieldIndex(rawData, TierConfig);
+ }
+ };
+ bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r);
+
+ TString GetConfigId() const {
+ return OwnerPath + "." + TierName;
+ }
+
+ bool NeedExport() const {
+ return ProtoConfig.HasObjectStorage();
+ }
+ bool IsSame(const TTierConfig& item) const;
+ NJson::TJsonValue GetDebugJson() const;
+};
+
+}
diff --git a/ydb/public/api/protos/draft/ydb_logstore.proto b/ydb/public/api/protos/draft/ydb_logstore.proto
index 0f42cc5a9aa..db17daad6e7 100644
--- a/ydb/public/api/protos/draft/ydb_logstore.proto
+++ b/ydb/public/api/protos/draft/ydb_logstore.proto
@@ -69,7 +69,7 @@ message CreateLogStoreRequest {
string path = 2; // Full path
uint32 shards_count = 3;
repeated SchemaPreset schema_presets = 4;
- repeated TierConfig tiers = 5;
+ bool enable_tiering = 6;
}
message CreateLogStoreResponse {
diff --git a/ydb/public/lib/experimental/ydb_logstore.cpp b/ydb/public/lib/experimental/ydb_logstore.cpp
index c8b2189da6d..c8c642fe7c5 100644
--- a/ydb/public/lib/experimental/ydb_logstore.cpp
+++ b/ydb/public/lib/experimental/ydb_logstore.cpp
@@ -104,11 +104,9 @@ void TSchema::SerializeTo(Ydb::LogStore::Schema& schema) const {
DefaultCompression.SerializeTo(*schema.mutable_default_compression());
}
-TLogStoreDescription::TLogStoreDescription(ui32 shardsCount, const THashMap<TString, TSchema>& schemaPresets,
- const THashMap<TString, TTierConfig>& tierConfigs)
+TLogStoreDescription::TLogStoreDescription(ui32 shardsCount, const THashMap<TString, TSchema>& schemaPresets)
: ShardsCount(shardsCount)
, SchemaPresets(schemaPresets)
- , TierConfigs(tierConfigs)
{}
TLogStoreDescription::TLogStoreDescription(Ydb::LogStore::DescribeLogStoreResult&& desc,
@@ -123,13 +121,6 @@ TLogStoreDescription::TLogStoreDescription(Ydb::LogStore::DescribeLogStoreResult
}
PermissionToSchemeEntry(desc.self().permissions(), &Permissions);
PermissionToSchemeEntry(desc.self().effective_permissions(), &EffectivePermissions);
- for (const auto& tier : desc.tiers()) {
- TTierConfig cfg;
- if (tier.has_compression()) {
- cfg.Compression = CompressionFromProto(tier.compression());
- }
- TierConfigs.emplace(tier.name(), std::move(cfg));
- }
}
void TLogStoreDescription::SerializeTo(Ydb::LogStore::CreateLogStoreRequest& request) const {
@@ -139,11 +130,6 @@ void TLogStoreDescription::SerializeTo(Ydb::LogStore::CreateLogStoreRequest& req
presetSchema.SerializeTo(*pb.mutable_schema());
}
request.set_shards_count(ShardsCount);
- for (const auto& [tierName, tierCfg] : TierConfigs) {
- auto& pb = *request.add_tiers();
- pb.set_name(tierName);
- tierCfg.Compression.SerializeTo(*pb.mutable_compression());
- }
}
TDescribeLogStoreResult::TDescribeLogStoreResult(TStatus&& status, Ydb::LogStore::DescribeLogStoreResult&& desc,
@@ -180,13 +166,6 @@ TLogTableDescription::TLogTableDescription(const TSchema& schema, const TLogTabl
, TtlSettings(ttlSettings)
{}
-TLogTableDescription::TLogTableDescription(const TString& schemaPresetName, const TLogTableSharding& sharding,
- const THashMap<TString, TTier>& tiers)
- : SchemaPresetName(schemaPresetName)
- , Sharding(sharding)
- , Tiers(tiers)
-{}
-
TLogTableDescription::TLogTableDescription(Ydb::LogStore::DescribeLogTableResult&& desc,
const TDescribeLogTableSettings& describeSettings)
: Schema(desc.schema())
diff --git a/ydb/public/lib/experimental/ydb_logstore.h b/ydb/public/lib/experimental/ydb_logstore.h
index 2e04dfb5fca..11431673c99 100644
--- a/ydb/public/lib/experimental/ydb_logstore.h
+++ b/ydb/public/lib/experimental/ydb_logstore.h
@@ -39,14 +39,6 @@ struct TCompression {
void SerializeTo(Ydb::LogStore::Compression& compression) const;
};
-struct TTierConfig {
- TCompression Compression;
-};
-
-struct TTier {
- TTtlSettings Eviction;
-};
-
struct TCreateLogStoreSettings : public TOperationRequestSettings<TCreateLogStoreSettings> {
using TSelf = TCreateLogStoreSettings;
};
@@ -117,8 +109,7 @@ private:
class TLogStoreDescription {
public:
- TLogStoreDescription(ui32 shardsCount, const THashMap<TString, TSchema>& schemaPresets,
- const THashMap<TString, TTierConfig>& tierConfigs = {});
+ TLogStoreDescription(ui32 shardsCount, const THashMap<TString, TSchema>& schemaPresets);
TLogStoreDescription(Ydb::LogStore::DescribeLogStoreResult&& desc, const TDescribeLogStoreSettings& describeSettings);
void SerializeTo(Ydb::LogStore::CreateLogStoreRequest& request) const;
const THashMap<TString, TSchema>& GetSchemaPresets() const {
@@ -136,17 +127,12 @@ public:
const TVector<NScheme::TPermissions>& GetEffectivePermissions() const {
return EffectivePermissions;
}
- const THashMap<TString, TTierConfig>& GetTierConfigs() const {
- return TierConfigs;
- }
-
private:
ui32 ShardsCount;
THashMap<TString, TSchema> SchemaPresets;
TString Owner;
TVector<NScheme::TPermissions> Permissions;
TVector<NScheme::TPermissions> EffectivePermissions;
- THashMap<TString, TTierConfig> TierConfigs;
};
struct TLogTableSharding {
@@ -171,8 +157,6 @@ public:
const TMaybe<TTtlSettings>& ttlSettings = {});
TLogTableDescription(const TSchema& schema, const TLogTableSharding& sharding,
const TMaybe<TTtlSettings>& ttlSettings = {});
- TLogTableDescription(const TString& schemaPresetName, const TLogTableSharding& sharding,
- const THashMap<TString, TTier>& tiers);
TLogTableDescription(Ydb::LogStore::DescribeLogTableResult&& desc, const TDescribeLogTableSettings& describeSettings);
void SerializeTo(Ydb::LogStore::CreateLogTableRequest& request) const;
const TSchema& GetSchema() const {
@@ -204,7 +188,6 @@ private:
const TSchema Schema;
const TLogTableSharding Sharding;
const TMaybe<TTtlSettings> TtlSettings;
- THashMap<TString, TTier> Tiers;
TString Owner;
TVector<NScheme::TPermissions> Permissions;
TVector<NScheme::TPermissions> EffectivePermissions;
diff --git a/ydb/services/metadata/abstract/common.cpp b/ydb/services/metadata/abstract/common.cpp
index eddddb14b11..7662c8de1c1 100644
--- a/ydb/services/metadata/abstract/common.cpp
+++ b/ydb/services/metadata/abstract/common.cpp
@@ -1,16 +1,22 @@
#include "common.h"
+#include <util/string/join.h>
namespace NKikimr::NMetadataProvider {
-i32 ISnapshot::GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId) const {
- i32 idx = 0;
- for (auto&& i : rawData.columns()) {
- if (i.name() == columnId) {
- return idx;
- }
- ++idx;
+TString ISnapshotParser::GetSnapshotId() const {
+ if (!SnapshotId) {
+ SnapshotId = JoinSeq(",", GetTables());
}
- return -1;
+ return *SnapshotId;
+}
+
+ISnapshot::TPtr ISnapshotParser::ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const {
+ ISnapshot::TPtr result = CreateSnapshot(actuality);
+ Y_VERIFY(result);
+ if (!result->DeserializeFromResultSet(rawData)) {
+ return nullptr;
+ }
+ return result;
}
}
diff --git a/ydb/services/metadata/abstract/common.h b/ydb/services/metadata/abstract/common.h
index 782d9695fbb..7da8f033171 100644
--- a/ydb/services/metadata/abstract/common.h
+++ b/ydb/services/metadata/abstract/common.h
@@ -15,6 +15,7 @@ namespace NKikimr::NMetadataProvider {
enum EEvSubscribe {
EvRefreshSubscriberData = EventSpaceBegin(TKikimrEvents::ES_METADATA_PROVIDER),
EvRefresh,
+ EvEnrichSnapshot,
EvSubscribeLocal,
EvUnsubscribeLocal,
EvSubscribeExternal,
@@ -28,9 +29,8 @@ class ISnapshot {
private:
YDB_READONLY_DEF(TInstant, Actuality);
protected:
- virtual bool DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) = 0;
+ virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) = 0;
virtual TString DoSerializeToString() const = 0;
- i32 GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId) const;
public:
using TPtr = std::shared_ptr<ISnapshot>;
ISnapshot(const TInstant actuality)
@@ -38,7 +38,7 @@ public:
}
- bool DeserializeFromResultSet(const Ydb::ResultSet& rawData) {
+ bool DeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) {
return DoDeserializeFromResultSet(rawData);
}
@@ -53,25 +53,23 @@ class ISnapshotParser {
protected:
virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const = 0;
virtual TVector<ITableModifier::TPtr> DoGetTableSchema() const = 0;
- virtual const TString& DoGetTablePath() const = 0;
+ virtual const TVector<TString>& DoGetTables() const = 0;
+ mutable std::optional<TString> SnapshotId;
public:
using TPtr = std::shared_ptr<ISnapshotParser>;
- ISnapshot::TPtr ParseSnapshot(const Ydb::ResultSet& rawData, const TInstant actuality) const {
- ISnapshot::TPtr result = CreateSnapshot(actuality);
- Y_VERIFY(result);
- if (!result->DeserializeFromResultSet(rawData)) {
- return nullptr;
- }
- return result;
- }
-
+ TString GetSnapshotId() const;
+ ISnapshot::TPtr ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const;
TVector<ITableModifier::TPtr> GetTableSchema() const {
return DoGetTableSchema();
}
- const TString& GetTablePath() const {
- return DoGetTablePath();
+ virtual NThreading::TFuture<ISnapshot::TPtr> EnrichSnapshotData(ISnapshot::TPtr original) const {
+ return NThreading::MakeFuture(original);
+ }
+
+ const TVector<TString>& GetTables() const {
+ return DoGetTables();
}
virtual ~ISnapshotParser() = default;
diff --git a/ydb/services/metadata/ds_table/accessor_refresh.cpp b/ydb/services/metadata/ds_table/accessor_refresh.cpp
index 339b5916ee7..80ded24e461 100644
--- a/ydb/services/metadata/ds_table/accessor_refresh.cpp
+++ b/ydb/services/metadata/ds_table/accessor_refresh.cpp
@@ -9,14 +9,14 @@
namespace NKikimr::NMetadataProvider {
using namespace NInternal::NRequest;
-bool TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) {
+void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) {
const TString startString = CurrentSelection.SerializeAsString();
auto currentFullReply = ev->Get()->GetResult();
Ydb::Table::ExecuteQueryResult qResult;
currentFullReply.operation().result().UnpackTo(&qResult);
- Y_VERIFY(qResult.result_sets().size() == 1);
- CurrentSelection = qResult.result_sets()[0];
+ Y_VERIFY((size_t)qResult.result_sets().size() == SnapshotConstructor->GetTables().size());
+ *CurrentSelection.mutable_result_sets() = std::move(*qResult.mutable_result_sets());
auto parsedSnapshot = SnapshotConstructor->ParseSnapshot(CurrentSelection, RequestedActuality);
if (!!parsedSnapshot) {
CurrentSnapshot = parsedSnapshot;
@@ -24,14 +24,30 @@ bool TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) {
ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot parse current snapshot";
}
- RequestedActuality = TInstant::Zero();
- Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
if (!!parsedSnapshot && CurrentSelection.SerializeAsString() != startString) {
ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << CurrentSelection.DebugString();
+ NActors::TActorIdentity actorId = SelfId();
+ SnapshotConstructor->EnrichSnapshotData(parsedSnapshot).Subscribe(
+ [actorId](NThreading::TFuture<ISnapshot::TPtr> f) {
+ if (f.HasValue() && !f.HasException()) {
+ actorId.Send(actorId, new TEvEnrichSnapshotResult(f.GetValueSync()));
+ } else {
+ actorId.Send(actorId, new TEvEnrichSnapshotResult("cannot enrich snapshot"));
+ }
+ }
+ );
+ }
+}
+
+void TDSAccessorRefresher::Handle(TEvEnrichSnapshotResult::TPtr& ev) {
+ RequestedActuality = TInstant::Zero();
+ Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
+ if (ev->Get()->IsSuccess()) {
+ CurrentSnapshot = ev->Get()->GetEnrichedSnapshot();
OnSnapshotModified();
- return true;
+ } else {
+ ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "enrich problem: " << ev->Get()->GetErrorText();
}
- return false;
}
void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateSession>::TPtr& ev) {
@@ -41,7 +57,14 @@ void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateSession>::TPtr&
const TString sessionId = session.session_id();
Y_VERIFY(sessionId);
Ydb::Table::ExecuteDataQueryRequest request;
- request.mutable_query()->set_yql_text("SELECT * FROM `" + EscapeC(SnapshotConstructor->GetTablePath()) + "`");
+ TStringBuilder sb;
+ auto& tables = SnapshotConstructor->GetTables();
+ Y_VERIFY(tables.size());
+ for (auto&& i : tables) {
+ sb << "SELECT * FROM `" + EscapeC(i) + "`;";
+ }
+
+ request.mutable_query()->set_yql_text(sb);
request.set_session_id(sessionId);
request.mutable_tx_control()->mutable_begin_tx()->mutable_snapshot_read_only();
diff --git a/ydb/services/metadata/ds_table/accessor_refresh.h b/ydb/services/metadata/ds_table/accessor_refresh.h
index ceb21233648..7d883893a83 100644
--- a/ydb/services/metadata/ds_table/accessor_refresh.h
+++ b/ydb/services/metadata/ds_table/accessor_refresh.h
@@ -2,6 +2,8 @@
#include <ydb/public/api/protos/ydb_value.pb.h>
#include <ydb/services/metadata/abstract/common.h>
#include <ydb/services/metadata/initializer/accessor_init.h>
+#include <ydb/services/metadata/request/request_actor.h>
+#include <library/cpp/actors/core/hfunc.h>
namespace NKikimr::NMetadataProvider {
@@ -11,12 +13,31 @@ class TEvRefresh: public NActors::TEventLocal<TEvRefresh, EEvSubscribe::EvRefres
public:
};
+class TEvEnrichSnapshotResult: public NActors::TEventLocal<TEvEnrichSnapshotResult, EEvSubscribe::EvEnrichSnapshot> {
+private:
+ YDB_READONLY_FLAG(Success, false);
+ YDB_READONLY_DEF(TString, ErrorText);
+ YDB_READONLY_DEF(ISnapshot::TPtr, EnrichedSnapshot);
+public:
+ TEvEnrichSnapshotResult(const TString& errorText)
+ : ErrorText(errorText) {
+
+ }
+
+ TEvEnrichSnapshotResult(ISnapshot::TPtr snapshot)
+ : SuccessFlag(true)
+ , EnrichedSnapshot(snapshot)
+ {
+
+ }
+};
+
class TDSAccessorRefresher: public TDSAccessorInitialized {
private:
using TBase = TDSAccessorInitialized;
ISnapshotParser::TPtr SnapshotConstructor;
YDB_READONLY_DEF(ISnapshot::TPtr, CurrentSnapshot);
- YDB_READONLY_DEF(Ydb::ResultSet, CurrentSelection);
+ YDB_READONLY_DEF(Ydb::Table::ExecuteQueryResult, CurrentSelection);
TInstant RequestedActuality = TInstant::Zero();
protected:
bool IsReady() const {
@@ -32,6 +53,7 @@ public:
hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>, Handle);
hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>, Handle);
hFunc(TEvRefresh, Handle);
+ hFunc(TEvEnrichSnapshotResult, Handle);
default:
TBase::StateMain(ev, ctx);
}
@@ -39,7 +61,8 @@ public:
TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor);
- bool Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>::TPtr& ev);
+ void Handle(TEvEnrichSnapshotResult::TPtr& ev);
+ void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>::TPtr& ev);
void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>::TPtr& ev);
void Handle(TEvRefresh::TPtr& ev);
};
diff --git a/ydb/services/metadata/ds_table/service.cpp b/ydb/services/metadata/ds_table/service.cpp
index c083afe97e7..d75d258450b 100644
--- a/ydb/services/metadata/ds_table/service.cpp
+++ b/ydb/services/metadata/ds_table/service.cpp
@@ -15,16 +15,16 @@ IActor* CreateService(const TConfig& config) {
}
void TService::Handle(TEvSubscribeExternal::TPtr& ev) {
- auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetTablePath());
+ auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetSnapshotId());
if (it == Accessors.end()) {
THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetSnapshotParser());
- it = Accessors.emplace(ev->Get()->GetSnapshotParser()->GetTablePath(), Register(actor.Release())).first;
+ it = Accessors.emplace(ev->Get()->GetSnapshotParser()->GetSnapshotId(), Register(actor.Release())).first;
}
Send<TEvSubscribe>(it->second, ev->Sender);
}
void TService::Handle(TEvUnsubscribeExternal::TPtr& ev) {
- auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetTablePath());
+ auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetSnapshotId());
if (it != Accessors.end()) {
Send<TEvUnsubscribe>(it->second, ev->Sender);
}
diff --git a/ydb/services/ydb/ydb_logstore_ut.cpp b/ydb/services/ydb/ydb_logstore_ut.cpp
index 3e768dee95f..906fbbad88b 100644
--- a/ydb/services/ydb/ydb_logstore_ut.cpp
+++ b/ydb/services/ydb/ydb_logstore_ut.cpp
@@ -51,18 +51,6 @@ TVector<TString> TestSchemaKey() {
return {"timestamp", "resource_type", "resource_id", "uid"};
}
-THashMap<TString, NYdb::NLogStore::TTierConfig> TestTierConfigs() {
- using NYdb::NLogStore::TTierConfig;
- using NYdb::NLogStore::TCompression;
- using NYdb::NLogStore::EColumnCompression;
-
- THashMap<TString, TTierConfig> out;
- out.emplace("default", TTierConfig{ TCompression{ EColumnCompression::LZ4, {}} });
- out.emplace("tier_zstd1", TTierConfig{ TCompression{ EColumnCompression::ZSTD, 1} });
- out.emplace("tier_zstd5", TTierConfig{ TCompression{ EColumnCompression::ZSTD, 5} });
- return out;
-}
-
}
Y_UNIT_TEST_SUITE(YdbLogStore) {
@@ -139,56 +127,6 @@ Y_UNIT_TEST_SUITE(YdbLogStore) {
}
}
- Y_UNIT_TEST(LogStoreTiers) {
- NKikimrConfig::TAppConfig appConfig;
- TKikimrWithGrpcAndRootSchema server(appConfig);
- EnableDebugLogs(server);
-
- auto connection = ConnectToServer(server);
- NYdb::NLogStore::TLogStoreClient logStoreClient(connection);
-
- {
- NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(), TestSchemaKey(),
- NYdb::NLogStore::TCompression{NYdb::NLogStore::EColumnCompression::ZSTD, 1});
- THashMap<TString, NYdb::NLogStore::TSchema> schemaPresets;
- schemaPresets["default"] = logSchema;
- NYdb::NLogStore::TLogStoreDescription storeDescr(4, schemaPresets, TestTierConfigs());
- auto res = logStoreClient.CreateLogStore("/Root/LogStore", std::move(storeDescr)).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
- }
-
- {
- auto res = logStoreClient.DescribeLogStore("/Root/LogStore").GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
- auto descr = res.GetDescription();
- UNIT_ASSERT_VALUES_EQUAL(descr.GetShardsCount(), 4);
- UNIT_ASSERT_VALUES_EQUAL(descr.GetSchemaPresets().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(descr.GetSchemaPresets().count("default"), 1);
- UNIT_ASSERT_VALUES_EQUAL(descr.GetOwner(), "root@builtin");
-
- const auto& schema = descr.GetSchemaPresets().begin()->second;
- UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns().size(), 10);
- UNIT_ASSERT_VALUES_EQUAL(schema.GetPrimaryKeyColumns().size(), 4);
- UNIT_ASSERT_EQUAL(schema.GetDefaultCompression().Codec, NYdb::NLogStore::EColumnCompression::ZSTD);
- UNIT_ASSERT_VALUES_EQUAL(schema.GetDefaultCompression().Level, 1);
-
- const auto& tiers = descr.GetTierConfigs();
- auto expectedTiers = TestTierConfigs();
- UNIT_ASSERT_VALUES_EQUAL(tiers.size(), expectedTiers.size());
-
- for (auto& [name, cfg] : expectedTiers) {
- UNIT_ASSERT_VALUES_EQUAL(tiers.count(name), 1);
- UNIT_ASSERT_EQUAL(tiers.find(name)->second.Compression.Codec, cfg.Compression.Codec);
- UNIT_ASSERT_VALUES_EQUAL(tiers.find(name)->second.Compression.Level, cfg.Compression.Level);
- }
- }
-
- {
- auto res = logStoreClient.DropLogStore("/Root/LogStore").GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
- }
- }
-
Y_UNIT_TEST(LogStoreNegative) {
NKikimrConfig::TAppConfig appConfig;
TKikimrWithGrpcAndRootSchema server(appConfig);