diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-10-28 19:16:53 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-10-28 19:16:53 +0300 |
commit | 7d8f9f654230732d2e18e98753e6f07ad3269fee (patch) | |
tree | 7791f5a3e9766c5d39f1da596c127578b44bcd4e | |
parent | 6bbe00187e61d0039d6f8dbc103a438594411983 (diff) | |
download | ydb-7d8f9f654230732d2e18e98753e6f07ad3269fee.tar.gz |
tiering through metadata from datashard table
51 files changed, 1567 insertions, 768 deletions
diff --git a/ydb/core/grpc_services/rpc_log_store.cpp b/ydb/core/grpc_services/rpc_log_store.cpp index ea15e6a2f4b..f28c4dbd5d9 100644 --- a/ydb/core/grpc_services/rpc_log_store.cpp +++ b/ydb/core/grpc_services/rpc_log_store.cpp @@ -241,14 +241,7 @@ private: LOG_DEBUG(ctx, NKikimrServices::GRPC_SERVER, "LogStore schema error: %s", error.c_str()); return Reply(status, error, NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx); } - for (const auto& tier : req->tiers()) { - auto* toTier = toSchemaPreset->MutableSchema()->AddStorageTiers(); - toTier->SetName(tier.name()); - if (!ConvertCompressionFromPublicToInternal(tier.compression(), *toTier->MutableCompression(), error)) { - LOG_DEBUG(ctx, NKikimrServices::GRPC_SERVER, "LogStore schema error: %s", error.c_str()); - return Reply(status, error, NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx); - } - } + toSchemaPreset->MutableSchema()->SetEnableTiering(req->enable_tiering()); } ctx.Send(MakeTxProxyID(), proposeRequest.release()); } @@ -297,7 +290,6 @@ private: const auto& storeDescription = pathDescription.GetColumnStoreDescription(); describeLogStoreResult.set_shards_count(storeDescription.GetColumnShardCount()); - bool firstPreset = true; for (const auto& schemaPreset : storeDescription.GetSchemaPresets()) { auto* toSchemaPreset = describeLogStoreResult.add_schema_presets(); toSchemaPreset->set_name(schemaPreset.GetName()); @@ -307,15 +299,6 @@ private: LOG_DEBUG(ctx, NKikimrServices::GRPC_SERVER, "LogStore schema error: %s", error.c_str()); return Reply(status, error, NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx); } - if (firstPreset) { - // Preset's tiers are the same. We take first ones. 
- firstPreset = false; - for (auto& tier : schemaPreset.GetSchema().GetStorageTiers()) { - auto* to = describeLogStoreResult.add_tiers(); - to->set_name(tier.GetName()); - ConvertCompressionFromInternalToPublic(tier.GetCompression(), *to->mutable_compression()); - } - } } return ReplyWithResult(Ydb::StatusIds::SUCCESS, describeLogStoreResult, ctx); } diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 6aae69d872b..f5e3384bab4 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -426,10 +426,9 @@ message TColumnTableSchema { //optional int32 DefaultCompressionLevel = 7; // deprecated, not used before replace optional TCompressionOptions DefaultCompression = 8; - // Storage tiers specification (not ordered list). - // You could use any combination of the tiers in TColumnDataLifeCycle.Tiering in any order. - // If storage tier's Compression is not set it would be derived from table's DefaultCompression. - repeated TStorageTierConfig StorageTiers = 9; + // Storage tiers usage flag. 
+ // Tiers detail information contains in storage_path/parent/.external_info/tiers + optional bool EnableTiering = 10 [default = false]; } message TAlterColumnTableSchema { diff --git a/ydb/core/tx/CMakeLists.txt b/ydb/core/tx/CMakeLists.txt index 2044b8b5d62..6b91efc22a8 100644 --- a/ydb/core/tx/CMakeLists.txt +++ b/ydb/core/tx/CMakeLists.txt @@ -18,6 +18,7 @@ add_subdirectory(scheme_cache) add_subdirectory(schemeshard) add_subdirectory(sequenceproxy) add_subdirectory(sequenceshard) +add_subdirectory(tiering) add_subdirectory(time_cast) add_subdirectory(tx_allocator) add_subdirectory(tx_allocator_client) diff --git a/ydb/core/tx/columnshard/CMakeLists.txt b/ydb/core/tx/columnshard/CMakeLists.txt index f1c69be237d..ccf0af1ba82 100644 --- a/ydb/core/tx/columnshard/CMakeLists.txt +++ b/ydb/core/tx/columnshard/CMakeLists.txt @@ -18,7 +18,6 @@ target_link_libraries(core-tx-columnshard PUBLIC yutil tools-enum_parser-enum_serialization_runtime cpp-actors-core - ydb-services-metadata ydb-core-actorlib_impl ydb-core-base core-blobstorage-dsproxy @@ -29,9 +28,9 @@ target_link_libraries(core-tx-columnshard PUBLIC ydb-core-tablet ydb-core-tablet_flat tx-columnshard-engines + core-tx-tiering tx-long_tx_service-public ydb-core-util - ydb-core-wrappers api-protos dq-actors-compute ) @@ -59,14 +58,13 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/external_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/s3_actor.cpp ) generate_enum_serilization(core-tx-columnshard ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index fe67c9360ac..67aedff28cb 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -17,7 +17,9 @@ void TColumnShard::BecomeBroken(const TActorContext& ctx) ctx.Send(IndexingActor, new TEvents::TEvPoisonPill); ctx.Send(CompactionActor, new TEvents::TEvPoisonPill); ctx.Send(EvictionActor, new TEvents::TEvPoisonPill); - StopS3Actors(ctx); + if (Tiers) { + Tiers->Stop(); + } } void TColumnShard::SwitchToWork(const TActorContext& ctx) { @@ -27,7 +29,6 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) { IndexingActor = ctx.Register(CreateIndexingActor(TabletID(), ctx.SelfID)); CompactionActor = ctx.Register(CreateCompactionActor(TabletID(), ctx.SelfID)); EvictionActor = ctx.Register(CreateEvictionActor(TabletID(), ctx.SelfID)); - InitS3Actors(ctx, true); SignalTabletActive(ctx); } diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index 6e69e03d0ca..2c4a184244a 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -76,7 +76,6 @@ public: MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(Self->TabletID(), txId)); } Self->AltersInFlight.erase(txId); - Self->InitS3Actors(ctx, false); break; } case NKikimrTxColumnShard::TX_KIND_COMMIT: { diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 110148fafc4..a972c1e7022 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ 
b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -128,10 +128,8 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) } // Mark exported blobs - Y_VERIFY(Self->TierConfigs.count(tierName)); - auto& config = Self->TierConfigs[tierName]; - - if (config.NeedExport()) { + auto& tManager = Self->GetTierManagerVerified(tierName); + if (tManager.NeedExport()) { for (auto& rec : portionInfo.Records) { auto& blobId = rec.BlobRange.BlobId; if (!BlobsToExport.count(blobId)) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 4fb7540df8b..ac332a83127 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -1,8 +1,10 @@ #include "columnshard_impl.h" #include "columnshard_schema.h" #include <ydb/core/scheme/scheme_types_proto.h> -#include <ydb/core/tx/columnshard/engines/column_engine_logs.h> #include <ydb/core/tablet/tablet_counters_protobuf.h> +#include <ydb/core/tx/tiering/external_data.h> +#include <ydb/core/tx/columnshard/engines/column_engine_logs.h> +#include <ydb/services/metadata/service.h> namespace NKikimr::NColumnShard { @@ -737,7 +739,7 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() { } ActiveIndexingOrCompaction = true; - auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), indexChanges, + auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(), indexChanges, Settings.CacheDataAfterIndexing, std::move(cachedBlobs)); return std::make_unique<TEvPrivate::TEvIndexing>(std::move(ev)); } @@ -775,7 +777,7 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() { } ActiveIndexingOrCompaction = true; - auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), indexChanges, + auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(), indexChanges, 
Settings.CacheDataAfterCompaction); return std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev), *BlobManager); } @@ -791,24 +793,28 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u return {}; } - THashMap<ui64, NOlap::TTiersInfo> regularTtls; - if (pathTtls.empty()) { + THashMap<ui64, NOlap::TTiersInfo> regularTtls = pathTtls; + if (regularTtls.empty()) { regularTtls = Ttl.MakeIndexTtlMap(TInstant::Now(), force); } + const bool tiersUsage = regularTtls.empty() && Tiers && Tiers->IsActive(); + if (tiersUsage) { + regularTtls = Tiers->GetTiering(); + } - if (pathTtls.empty() && regularTtls.empty()) { + if (regularTtls.empty()) { LOG_S_TRACE("TTL not started. No tables to activate it on (or delayed) at tablet " << TabletID()); return {}; + } else { + for (auto&& i : regularTtls) { + LOG_S_DEBUG(i.first << "/" << i.second.GetDebugString() << ";tablet=" << TabletID()); + } } LOG_S_DEBUG("Prepare TTL at tablet " << TabletID()); std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges; - if (pathTtls.empty()) { - indexChanges = PrimaryIndex->StartTtl(regularTtls); - } else { - indexChanges = PrimaryIndex->StartTtl(pathTtls); - } + indexChanges = PrimaryIndex->StartTtl(regularTtls); if (!indexChanges) { LOG_S_NOTICE("Cannot prepare TTL at tablet " << TabletID()); @@ -821,7 +827,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u bool needWrites = !indexChanges->PortionsToEvict.empty(); ActiveTtl = true; - auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), indexChanges, false); + auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(tiersUsage), indexChanges, false); return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), *BlobManager, needWrites); } @@ -871,35 +877,13 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { return {}; } - auto ev = 
std::make_unique<TEvPrivate::TEvWriteIndex>(PrimaryIndex->GetIndexInfo(), changes, false); + auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(), changes, false); ev->PutStatus = NKikimrProto::OK; // No new blobs to write ActiveCleanup = true; return ev; } -static NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression) { - NOlap::TCompression out; - if (compression.HasCompressionCodec()) { - switch (compression.GetCompressionCodec()) { - case NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain: - out.Codec = arrow::Compression::UNCOMPRESSED; - break; - case NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4: - out.Codec = arrow::Compression::LZ4_FRAME; - break; - case NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD: - out.Codec = arrow::Compression::ZSTD; - break; - } - } - - if (compression.HasCompressionLevel()) { - out.Level = compression.GetCompressionLevel(); - } - return out; -} - NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) { Y_VERIFY(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); @@ -921,22 +905,20 @@ NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTabl } if (schema.HasDefaultCompression()) { - NOlap::TCompression compression = ConvertCompression(schema.GetDefaultCompression()); + NOlap::TCompression compression = NTiers::TManager::ConvertCompression(schema.GetDefaultCompression()); indexInfo.SetDefaultCompression(compression); } - for (auto& tierConfig : schema.GetStorageTiers()) { - auto& tierName = tierConfig.GetName(); - TierConfigs[tierName] = TTierConfig{tierConfig}; - - NOlap::TStorageTier tier{ .Name = tierName }; - if (tierConfig.HasCompression()) { - tier.Compression = ConvertCompression(tierConfig.GetCompression()); - } - if (TierConfigs[tierName].NeedExport()) { - S3Actors[tierName] = {}; // delayed actor creation + EnableTiering = schema.GetEnableTiering(); + if (OwnerPath && !Tiers && 
EnableTiering) { + Tiers = std::make_shared<TTiersManager>(TabletID(), OwnerPath); + } + if (!!Tiers) { + if (EnableTiering) { + Tiers->Start(Tiers); + } else { + Tiers->Stop(); } - indexInfo.AddStorageTier(std::move(tier)); } return indexInfo; @@ -973,30 +955,16 @@ void TColumnShard::MapExternBlobs(const TActorContext& /*ctx*/, NOlap::TReadMeta } } -TActorId TColumnShard::GetS3ActorForTier(const TString& tierName, const TString& phase) const { - auto it = S3Actors.find(tierName); - if (it == S3Actors.end()) { - LOG_S_ERROR("No S3 actor for tier '" << tierName << "' (on " << phase << ") at tablet " << TabletID()); - return {}; - } - auto s3 = it->second; - if (!s3) { - LOG_S_ERROR("Not started S3 actor for tier '" << tierName << "' (on " << phase << ") at tablet " << TabletID()); - return {}; - } - return s3; -} - void TColumnShard::ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, TEvPrivate::TEvExport::TBlobDataMap&& blobsInfo) const { - if (auto s3 = GetS3ActorForTier(tierName, "export")) { + if (auto s3 = GetS3ActorForTier(tierName)) { auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, tierName, s3, std::move(blobsInfo)); ctx.Register(CreateExportActor(TabletID(), ctx.SelfID, event.release())); } } void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs) const { - if (auto s3 = GetS3ActorForTier(tierName, "forget")) { + if (auto s3 = GetS3ActorForTier(tierName)) { auto forget = std::make_unique<TEvPrivate::TEvForget>(); forget->Evicted = std::move(blobs); ctx.Send(s3, forget.release()); @@ -1005,7 +973,7 @@ void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName bool TColumnShard::GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName, NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges) { - if (auto s3 = GetS3ActorForTier(tierName, "get exported")) { + if (auto s3 = 
GetS3ActorForTier(tierName)) { auto get = std::make_unique<TEvPrivate::TEvGetExported>(); get->DstActor = dst; get->DstCookie = cookie; @@ -1017,41 +985,41 @@ bool TColumnShard::GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 return false; } -ui32 TColumnShard::InitS3Actors(const TActorContext& ctx, bool init) { - ui32 count = 0; -#ifndef KIKIMR_DISABLE_S3_OPS - for (auto& [tierName, actor] : S3Actors) { - if (!init && actor) { - continue; - } +void TColumnShard::Die(const TActorContext& ctx) { + // TODO + if (!!Tiers) { + Tiers->Stop(); + } + NTabletPipe::CloseAndForgetClient(SelfId(), StatsReportPipe); + UnregisterMediatorTimeCast(); + return IActor::Die(ctx); +} - Y_VERIFY(!actor); - Y_VERIFY(TierConfigs.count(tierName)); - auto& tierConfig = TierConfigs[tierName]; - Y_VERIFY(tierConfig.NeedExport()); +TActorId TColumnShard::GetS3ActorForTier(const TString& tierName) const { + if (!Tiers) { + return {}; + } + return Tiers->GetStorageActorId(tierName); +} - actor = ctx.Register(CreateS3Actor(TabletID(), ctx.SelfID, tierName)); - ctx.Send(actor, new TEvPrivate::TEvS3Settings(tierConfig.S3Settings())); - ++count; +void TColumnShard::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) { + if (!Tiers) { + Tiers = std::make_shared<TTiersManager>(TabletID(), OwnerPath); + } + Tiers->TakeConfigs(ev->Get()->GetSnapshot()); + if (EnableTiering) { + Tiers->Start(Tiers); } -#else - Y_UNUSED(ctx); - Y_UNUSED(init); -#endif - return count; } -void TColumnShard::StopS3Actors(const TActorContext& ctx) { -#ifndef KIKIMR_DISABLE_S3_OPS - for (auto& [_, actor] : S3Actors) { - if (actor) { - ctx.Send(actor, new TEvents::TEvPoisonPill); - actor = {}; +NOlap::TIndexInfo TColumnShard::GetActualIndexInfo(const bool tiersUsage) const { + auto indexInfo = PrimaryIndex->GetIndexInfo(); + if (tiersUsage && Tiers && Tiers->IsActive()) { + for (auto&& i : *Tiers) { + indexInfo.AddStorageTier(i.second.BuildTierStorage()); } } -#else - Y_UNUSED(ctx); -#endif + return 
indexInfo; } } diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index d0028870eab..5595ccf502d 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -7,14 +7,15 @@ #include "blob_manager.h" #include "inflight_request_tracker.h" -#include <ydb/core/tx/tx_processing.h> -#include <ydb/core/tx/time_cast/time_cast.h> #include <ydb/core/tablet/tablet_counters.h> #include <ydb/core/tablet/tablet_pipe_client_cache.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> #include <ydb/core/tablet_flat/tablet_flat_executor.h> +#include <ydb/core/tx/tiering/manager.h> +#include <ydb/core/tx/time_cast/time_cast.h> #include <ydb/core/tx/tx_processing.h> +#include <ydb/services/metadata/service.h> namespace NKikimr::NColumnShard { @@ -38,9 +39,6 @@ IActor* CreateReadActor(ui64 tabletId, ui64 requestCookie); IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId); IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev); -#ifndef KIKIMR_DISABLE_S3_OPS -IActor* CreateS3Actor(ui64 tabletId, const TActorId& parent, const TString& tierName); -#endif struct TSettings { static constexpr ui32 MAX_INDEXATIONS_TO_SKIP = 16; @@ -119,7 +117,6 @@ class TColumnShard void Handle(TEvColumnShard::TEvReadBlobRanges::TPtr& ev, const TActorContext& ctx); void Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext& ctx); - void Handle(TEvPrivate::TEvScanStats::TPtr &ev, const TActorContext &ctx); void Handle(TEvPrivate::TEvReadFinished::TPtr &ev, const TActorContext &ctx); void Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx); @@ -127,7 +124,8 @@ class TColumnShard void Handle(TEvPrivate::TEvExport::TPtr& ev, const 
TActorContext& ctx); void Handle(TEvPrivate::TEvForget::TPtr& ev, const TActorContext& ctx); void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx); - + void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev); + ITransaction* CreateTxInitSchema(); ITransaction* CreateTxRunGc(); @@ -141,13 +139,13 @@ class TColumnShard Y_UNUSED(ctx); } - void Die(const TActorContext& ctx) override { - // TODO - NTabletPipe::CloseAndForgetClient(SelfId(), StatsReportPipe); - UnregisterMediatorTimeCast(); - return IActor::Die(ctx); + const NTiers::TManager& GetTierManagerVerified(const TString& tierName) const { + Y_VERIFY(!!Tiers); + return Tiers->GetManagerVerified(tierName); } + void Die(const TActorContext& ctx) override; + void BecomeBroken(const TActorContext& ctx); void SwitchToWork(const TActorContext& ctx); @@ -177,6 +175,8 @@ class TColumnShard TabletCounters->Percentile()[counter].IncrementFor(latency.MicroSeconds()); } + NOlap::TIndexInfo GetActualIndexInfo(const bool tiersUsage = true) const; + protected: STFUNC(StateInit) { TRACE_EVENT(NKikimrServices::TX_COLUMNSHARD); @@ -203,6 +203,7 @@ protected: STFUNC(StateWork) { TRACE_EVENT(NKikimrServices::TX_COLUMNSHARD); switch (ev->GetTypeRewrite()) { + hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle); HFunc(TEvents::TEvPoisonPill, Handle); HFunc(TEvTabletPipe::TEvClientConnected, Handle); HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); @@ -316,21 +317,6 @@ private: } }; - struct TTierConfig { - using TTierProto = NKikimrSchemeOp::TStorageTierConfig; - using TS3SettingsProto = NKikimrSchemeOp::TS3Settings; - - TTierProto Proto; - - bool NeedExport() const { - return Proto.HasObjectStorage(); - } - - const TS3SettingsProto& S3Settings() const { - return Proto.GetObjectStorage(); - } - }; - struct TLongTxWriteInfo { ui64 WriteId; NLongTxService::TLongTxId LongTxId; @@ -367,14 +353,15 @@ private: TActorId CompactionActor; // It's memory bounded to 1: we have no memory for 
parallel compation. TActorId EvictionActor; TActorId StatsReportPipe; - THashMap<TString, TActorId> S3Actors; + + bool EnableTiering = false; + std::shared_ptr<TTiersManager> Tiers; std::unique_ptr<TTabletCountersBase> TabletCountersPtr; TTabletCountersBase* TabletCounters; std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache; std::unique_ptr<NOlap::TInsertTable> InsertTable; std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex; TBatchCache BatchCache; - THashMap<TString, TTierConfig> TierConfigs; THashSet<NOlap::TUnifiedBlobId> DelayedForgetBlobs; TTtl Ttl; @@ -458,14 +445,12 @@ private: NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema); void MapExternBlobs(const TActorContext& ctx, NOlap::TReadMetadata& metadata); - TActorId GetS3ActorForTier(const TString& tierName, const TString& phase) const; + TActorId GetS3ActorForTier(const TString& tierName) const; void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName, TEvPrivate::TEvExport::TBlobDataMap&& blobsInfo) const; void ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs) const; bool GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName, NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges); - ui32 InitS3Actors(const TActorContext& ctx, bool init); - void StopS3Actors(const TActorContext& ctx); std::unique_ptr<TEvPrivate::TEvIndexing> SetupIndexation(); std::unique_ptr<TEvPrivate::TEvCompaction> SetupCompaction(); @@ -479,7 +464,6 @@ private: void UpdateResourceMetrics(const TActorContext& ctx, const TUsage& usage); ui64 MemoryUsage() const; void SendPeriodicStats(); - public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::TX_COLUMNSHARD_ACTOR; diff --git a/ydb/core/tx/columnshard/columnshard_private_events.cpp b/ydb/core/tx/columnshard/columnshard_private_events.cpp new file mode 
100644 index 00000000000..83d1d0b29bc --- /dev/null +++ b/ydb/core/tx/columnshard/columnshard_private_events.cpp @@ -0,0 +1,4 @@ +#include "columnshard_private_events.h" + +namespace NKikimr::NColumnShard { +} diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index bfbe1c429a7..bdf7402b168 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -1,6 +1,7 @@ #pragma once #include "blob_manager.h" +#include "defs.h" #include <ydb/core/protos/counters_columnshard.pb.h> @@ -38,11 +39,11 @@ struct TEvPrivate { bool CacheData{false}; TDuration Duration; - TEvWriteIndex(const NOlap::TIndexInfo& indexInfo, + TEvWriteIndex(NOlap::TIndexInfo&& indexInfo, std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges, bool cacheData, THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>>&& cachedBlobs = {}) - : IndexInfo(indexInfo) + : IndexInfo(std::move(indexInfo)) , IndexChanges(indexChanges) , CachedBlobs(std::move(cachedBlobs)) , CacheData(cacheData) diff --git a/ydb/core/tx/columnshard/columnshard_ttl.h b/ydb/core/tx/columnshard/columnshard_ttl.h index a9c88419ccb..aae50f3e7ee 100644 --- a/ydb/core/tx/columnshard/columnshard_ttl.h +++ b/ydb/core/tx/columnshard/columnshard_ttl.h @@ -20,8 +20,6 @@ public: TDescription() = default; TDescription(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl) { - TDuration prevEvicSec; - if (ttl.HasEnabled()) { auto& enabled = ttl.GetEnabled(); ColumnName = enabled.GetColumnName(); @@ -29,23 +27,6 @@ public: Evictions.reserve(1); Evictions.emplace_back(TEviction{{}, expireSec}); - } else if (ttl.HasTiering()) { - Evictions.reserve(ttl.GetTiering().TiersSize()); - - for (auto& tier : ttl.GetTiering().GetTiers()) { - auto& eviction = tier.GetEviction(); - Y_VERIFY(ColumnName.empty() || ColumnName == eviction.GetColumnName()); - ColumnName = eviction.GetColumnName(); - auto evictSec = 
TDuration::Seconds(eviction.GetExpireAfterSeconds()); - - // Ignore next tier if it has smaller eviction time. Prefer first tier with same eviction time. - if (evictSec > prevEvicSec) { - Evictions.emplace_back(TEviction{tier.GetName(), evictSec}); - prevEvicSec = evictSec; - } - } - - Evictions.shrink_to_fit(); } if (Enabled()) { diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index 35c4b08ff5c..af407457cba 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -42,6 +42,12 @@ void TTester::Setup(TTestActorRuntime& runtime) { runtime.UpdateCurrentTime(TInstant::Now()); } +void ProvideTieringSnapshot(TTestBasicRuntime& runtime, TActorId& sender, NMetadataProvider::ISnapshot::TPtr snapshot) { + auto event = std::make_unique<NMetadataProvider::TEvRefreshSubscriberData>(snapshot); + + ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, event.release()); +} + bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap) { auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>( NKikimrTxColumnShard::TX_KIND_SCHEMA, 0, sender, snap.TxId, txBody); @@ -285,4 +291,37 @@ TSerializedTableRange MakeTestRange(std::pair<ui64, ui64> range, bool inclusiveF TConstArrayRef<TCell>(cellsTo), inclusiveTo); } +NMetadataProvider::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpecials& specials, const TString& tablePath, const ui32 tablePathId) { + std::unique_ptr<NColumnShard::NTiers::TConfigsSnapshot> cs(new NColumnShard::NTiers::TConfigsSnapshot(Now())); + for (auto&& tier : specials.Tiers) { + { + NColumnShard::NTiers::TTierConfig tConfig; + tConfig.SetOwnerPath("/Root"); + tConfig.SetTierName(tier.Name); + tConfig.MutableProtoConfig().SetName(tier.Name); + auto& cProto = tConfig.MutableProtoConfig(); + if (tier.S3) { + *cProto.MutableObjectStorage() = *tier.S3; + } + if 
(tier.Codec) { + cProto.MutableCompression()->SetCompressionCodec(tier.GetCodecId()); + } + if (tier.CompressionLevel) { + cProto.MutableCompression()->SetCompressionLevel(*tier.CompressionLevel); + } + cs->MutableTierConfigs().emplace(tConfig.GetConfigId(), tConfig); + } + { + NColumnShard::NTiers::TTieringRule tRule; + tRule.SetOwnerPath("/Root"); + tRule.SetDurationForEvict(TDuration::Seconds(tier.GetEvictAfterSecondsUnsafe())); + tRule.SetTablePath(tablePath).SetTablePathId(tablePathId); + tRule.SetTierName(tier.Name); + tRule.SetColumn(tier.GetTtlColumn()); + cs->MutableTableTierings()[tablePath].AddRule(std::move(tRule)); + } + } + return cs; +} + } diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index 8a9839f9ad0..e0cdb1766f0 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -25,19 +25,20 @@ using TTypeId = NScheme::TTypeId; using TTypeInfo = NScheme::TTypeInfo; struct TTestSchema { - static const constexpr char * DefaultTtlColumn = "saved_at"; + static inline const TString DefaultTtlColumn = "saved_at"; struct TStorageTier { + YDB_ACCESSOR(TString, TtlColumn, DefaultTtlColumn); + YDB_OPT(ui32, EvictAfterSeconds); + public: TString Name; TString Codec; std::optional<int> CompressionLevel; - std::optional<ui32> EvictAfterSeconds; - TString TtlColumn; std::optional<NKikimrSchemeOp::TS3Settings> S3; TStorageTier(const TString& name = {}) : Name(name) - , TtlColumn(DefaultTtlColumn) + {} NKikimrSchemeOp::EColumnCodec GetCodecId() const { @@ -62,12 +63,6 @@ struct TTestSchema { } return *this; } - - TStorageTier& SetTtl(ui32 seconds, const TString& column = DefaultTtlColumn) { - EvictAfterSeconds = seconds; - TtlColumn = column; - return *this; - } }; struct TTableSpecials : public TStorageTier { @@ -78,7 +73,7 @@ struct TTestSchema { } bool HasTtl() const { - return !HasTiers() && EvictAfterSeconds; + return !HasTiers() && 
HasEvictAfterSeconds(); } TTableSpecials WithCodec(const TString& codec) { @@ -211,37 +206,15 @@ struct TTestSchema { schema->MutableDefaultCompression()->SetCompressionLevel(*specials.CompressionLevel); } - for (auto& tier : specials.Tiers) { - auto* t = schema->AddStorageTiers(); - t->SetName(tier.Name); - if (tier.HasCodec()) { - t->MutableCompression()->SetCompressionCodec(tier.GetCodecId()); - } - if (tier.CompressionLevel) { - t->MutableCompression()->SetCompressionLevel(*tier.CompressionLevel); - } - if (tier.S3) { - t->MutableObjectStorage()->CopyFrom(*tier.S3); - } - } + schema->SetEnableTiering(specials.HasTiers()); } - if (specials.HasTiers()) { - auto* ttlSettings = table->MutableTtlSettings(); - ttlSettings->SetVersion(1); - auto* tiering = ttlSettings->MutableTiering(); - for (auto& tier : specials.Tiers) { - auto* t = tiering->AddTiers(); - t->SetName(tier.Name); - t->MutableEviction()->SetColumnName(tier.TtlColumn); - t->MutableEviction()->SetExpireAfterSeconds(*tier.EvictAfterSeconds); - } - } else if (specials.HasTtl()) { + if (specials.HasTtl()) { auto* ttlSettings = table->MutableTtlSettings(); ttlSettings->SetVersion(1); auto* enable = ttlSettings->MutableEnabled(); - enable->SetColumnName(specials.TtlColumn); - enable->SetExpireAfterSeconds(*specials.EvictAfterSeconds); + enable->SetColumnName(specials.GetTtlColumn()); + enable->SetExpireAfterSeconds(specials.GetEvictAfterSecondsUnsafe()); } TString out; @@ -258,18 +231,10 @@ struct TTestSchema { auto* ttlSettings = table->MutableTtlSettings(); ttlSettings->SetVersion(version); - if (specials.HasTiers()) { - auto* tiering = ttlSettings->MutableTiering(); - for (auto& tier : specials.Tiers) { - auto* t = tiering->AddTiers(); - t->SetName(tier.Name); - t->MutableEviction()->SetColumnName(tier.TtlColumn); - t->MutableEviction()->SetExpireAfterSeconds(*tier.EvictAfterSeconds); - } - } else if (specials.HasTtl()) { + if (specials.HasTtl()) { auto* enable = ttlSettings->MutableEnabled(); - 
enable->SetColumnName(specials.TtlColumn); - enable->SetExpireAfterSeconds(*specials.EvictAfterSeconds); + enable->SetColumnName(specials.GetTtlColumn()); + enable->SetExpireAfterSeconds(specials.GetEvictAfterSecondsUnsafe()); } else { ttlSettings->MutableDisabled(); } @@ -289,6 +254,8 @@ struct TTestSchema { return out; } + static NMetadataProvider::ISnapshot::TPtr BuildSnapshot(const TTableSpecials& specials, const TString& tablePath, const ui32 tablePathId); + static TString CommitTxBody(ui64 metaShard, const TVector<ui64>& writeIds) { NKikimrTxColumnShard::TCommitTxBody proto; if (metaShard) { @@ -336,6 +303,7 @@ struct TTestSchema { }; bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap); +void ProvideTieringSnapshot(TTestBasicRuntime& runtime, TActorId& sender, NMetadataProvider::ISnapshot::TPtr snapshot); void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap); bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId, const TString& data, std::shared_ptr<arrow::Schema> schema = {}); diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp index 99542dd0a1f..e8e3611ea8e 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine.cpp @@ -1,4 +1,14 @@ -#include "column_engine.h"
+#include "column_engine.h" namespace NKikimr::NOlap { + +TString TTiersInfo::GetDebugString() const { + TStringBuilder sb; + sb << "column=" << Column << ";"; + for (auto&& i : TierBorders) { + sb << "tname=" << i.TierName << ";eborder=" << i.EvictBorder << ";"; + } + return sb; +} + } diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 4cfc7fa2ef6..2422d5ebcde 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -86,6 +86,8 @@ struct TTiersInfo { TString Column; std::vector<TTierTimeBorder> TierBorders; // Ordered tiers from hottest to coldest + TString GetDebugString() const; + TTiersInfo(const TString& column, TInstant border = {}, const TString& tierName = {}) : Column(column) { diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index 45c601d7c0d..5eff70619a9 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -246,4 +246,12 @@ std::shared_ptr<NArrow::TSortDescription> TIndexInfo::SortReplaceDescription() c return {}; } +bool TIndexInfo::AllowTtlOverColumn(const TString& name) const { + auto it = ColumnNames.find(name); + if (it == ColumnNames.end()) { + return false; + } + return MinMaxIdxColumnsIds.count(it->second); +} + } diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index 218c8dfbbf0..3828bb946d0 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -178,13 +178,7 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema { return MinMaxIdxColumnsIds; } - bool AllowTtlOverColumn(const TString& name) const { - auto it = ColumnNames.find(name); - if (it == ColumnNames.end()) { - return false; - } - return MinMaxIdxColumnsIds.count(it->second); - } + bool AllowTtlOverColumn(const TString& name) 
const; bool IsSorted() const { return SortingKey.get(); } bool IsReplacing() const { return ReplaceKey.get(); } diff --git a/ydb/core/tx/columnshard/external_data.cpp b/ydb/core/tx/columnshard/external_data.cpp deleted file mode 100644 index 5a95613ca81..00000000000 --- a/ydb/core/tx/columnshard/external_data.cpp +++ /dev/null @@ -1,91 +0,0 @@ -#include "external_data.h" -#include <library/cpp/json/writer/json_value.h> -#include <library/cpp/protobuf/json/proto2json.h> - -namespace NKikimr::NColumnShard::NTiers { - -bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) { - const i32 ownerPathIdx = GetFieldIndex(rawData, "ownerPath"); - const i32 tierNameIdx = GetFieldIndex(rawData, "tierName"); - const i32 tierConfigIdx = GetFieldIndex(rawData, "tierConfig"); - if (tierNameIdx < 0 || tierConfigIdx < 0 || ownerPathIdx < 0) { - ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "incorrect tiers config table structure"; - return false; - } - for (auto&& r : rawData.rows()) { - TConfig config(r.items()[ownerPathIdx].bytes_value(), r.items()[tierNameIdx].bytes_value()); - if (!config.DeserializeFromString(r.items()[tierConfigIdx].bytes_value())) { - ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "cannot parse tier config from snapshot"; - return false; - } - if (!Data.emplace(config.GetConfigId(), config).second) { - ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "tier names duplication: " << config.GetTierName(); - return false; - } - } - return true; -} - -std::optional<TConfig> TConfigsSnapshot::GetValue(const TString& key) const { - auto it = Data.find(key); - if (it == Data.end()) { - return {}; - } else { - return it->second; - } -} - -TVector<NMetadataProvider::ITableModifier::TPtr> TSnapshotConstructor::DoGetTableSchema() const { - Ydb::Table::CreateTableRequest request; - request.set_session_id(""); - request.set_path(TablePath); - request.add_primary_key("ownerPath"); - request.add_primary_key("tierName"); - { - auto& column = 
*request.add_columns(); - column.set_name("tierName"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("ownerPath"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - { - auto& column = *request.add_columns(); - column.set_name("tierConfig"); - column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); - } - NMetadataProvider::ITableModifier::TPtr result( - new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request)); - return { result }; -} - -TString NTiers::TConfigsSnapshot::DoSerializeToString() const { - NJson::TJsonValue result = NJson::JSON_MAP; - for (auto&& i : Data) { - result.InsertValue(i.first, i.second.SerializeToJson()); - } - return result.GetStringRobust(); -} - -bool NTiers::TConfig::DeserializeFromString(const TString& configProtoStr) { - if (!::google::protobuf::TextFormat::ParseFromString(configProtoStr, &ProtoConfig)) { - ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "cannot parse proto string: " << configProtoStr; - return false; - } - return true; -} - -NJson::TJsonValue TConfig::SerializeToJson() const { - NJson::TJsonValue result = NJson::JSON_MAP; - result.InsertValue("tierName", TierName); - NProtobufJson::Proto2Json(ProtoConfig, result.InsertValue("tierConfig", NJson::JSON_MAP)); - return result; -} - -bool TConfig::IsSame(const TConfig& item) const { - return TierName == item.TierName && ProtoConfig.SerializeAsString() == item.ProtoConfig.SerializeAsString(); -} - -} diff --git a/ydb/core/tx/columnshard/external_data.h b/ydb/core/tx/columnshard/external_data.h deleted file mode 100644 index e070fa5ba21..00000000000 --- a/ydb/core/tx/columnshard/external_data.h +++ /dev/null @@ -1,66 +0,0 @@ -#pragma once -#include <ydb/services/metadata/service.h> -#include <ydb/core/protos/flat_scheme_op.pb.h> - 
-#include <library/cpp/json/writer/json_value.h> - -namespace NKikimr::NColumnShard::NTiers { - -class TConfig { -private: - using TTierProto = NKikimrSchemeOp::TStorageTierConfig; - using TS3SettingsProto = NKikimrSchemeOp::TS3Settings; - YDB_READONLY_DEF(TString, OwnerPath); - YDB_READONLY_DEF(TString, TierName); - YDB_READONLY_DEF(TTierProto, ProtoConfig); -public: - TConfig() = default; - TConfig(const TString& ownerPath, const TString& tierName) - : OwnerPath(ownerPath) - , TierName(tierName) - { - - } - - TString GetConfigId() const { - return OwnerPath + "." + TierName; - } - - bool NeedExport() const { - return ProtoConfig.HasObjectStorage(); - } - bool IsSame(const TConfig& item) const; - bool DeserializeFromString(const TString& configProtoStr); - NJson::TJsonValue SerializeToJson() const; -}; - -class TConfigsSnapshot: public NMetadataProvider::ISnapshot { -private: - using TBase = NMetadataProvider::ISnapshot; - using TConfigsMap = TMap<TString, TConfig>; - YDB_READONLY_DEF(TConfigsMap, Data); -protected: - virtual bool DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) override; - virtual TString DoSerializeToString() const override; -public: - std::optional<TConfig> GetValue(const TString& key) const; - using TBase::TBase; -}; - -class TSnapshotConstructor: public NMetadataProvider::TGenericSnapshotParser<TConfigsSnapshot> { -private: - const TString TablePath; -protected: - virtual TVector<NMetadataProvider::ITableModifier::TPtr> DoGetTableSchema() const override; - virtual const TString& DoGetTablePath() const override { - return TablePath; - } -public: - TSnapshotConstructor(const TString& tablePath) - : TablePath(tablePath) - { - - } -}; - -} diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp index 20e5e296ca5..9cf3766c597 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp @@ -27,7 +27,7 @@ 
std::shared_ptr<arrow::RecordBatch> UpdateColumn(std::shared_ptr<arrow::RecordBa auto schema = batch->schema(); int pos = schema->GetFieldIndex(name); - UNIT_ASSERT(pos > 0); + UNIT_ASSERT(pos >= 0); UNIT_ASSERT(batch->GetColumnByName(name)->type_id() == arrow::Type::TIMESTAMP); auto scalar = arrow::TimestampScalar(seconds * 1000 * 1000, arrow::timestamp(arrow::TimeUnit::MICRO)); @@ -42,7 +42,7 @@ std::shared_ptr<arrow::RecordBatch> UpdateColumn(std::shared_ptr<arrow::RecordBa } bool TriggerTTL(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, const TVector<ui64>& pathIds, - ui64 tsSeconds = 0, const TString& ttlColumnName = TTestSchema::DefaultTtlColumn) { + ui64 tsSeconds, const TString& ttlColumnName) { TString txBody = TTestSchema::TtlTxBody(pathIds, ttlColumnName, tsSeconds); auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>( NKikimrTxColumnShard::TX_KIND_TTL, sender, snap.TxId, txBody); @@ -86,7 +86,7 @@ bool CheckSame(const TString& blob, const TString& srtSchema, ui32 expectedSize, return true; } -std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui32 overlapSize) { +std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui32 overlapSize, const TString& ttlColumnName) { UNIT_ASSERT(ts.size() == 2); TString data1 = MakeTestBlob({0, portionSize}, testYdbSchema); @@ -98,8 +98,8 @@ std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui3 UNIT_ASSERT(data2.size() < 7 * 1024 * 1024); auto schema = NArrow::MakeArrowSchema(testYdbSchema); - auto batch1 = UpdateColumn(NArrow::DeserializeBatch(data1, schema), TTestSchema::DefaultTtlColumn, ts[0]); - auto batch2 = UpdateColumn(NArrow::DeserializeBatch(data2, schema), TTestSchema::DefaultTtlColumn, ts[1]); + auto batch1 = UpdateColumn(NArrow::DeserializeBatch(data1, schema), ttlColumnName, ts[0]); + auto batch2 = UpdateColumn(NArrow::DeserializeBatch(data2, schema), ttlColumnName, ts[1]); std::vector<TString> 
data; data.emplace_back(NArrow::SerializeBatchNoCompression(batch1)); @@ -156,20 +156,20 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, ttlSec -= (ts[0] + ts[1]) / 2; // enable internal ttl between ts1 and ts2 } if (spec.HasTiers()) { - spec.Tiers[0].SetTtl(ttlSec); + spec.Tiers[0].SetEvictAfterSeconds(ttlSec); } else { - spec.SetTtl(ttlSec); + spec.SetEvictAfterSeconds(ttlSec); } bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk, spec), {++planStep, ++txId}); UNIT_ASSERT(ok); PlanSchemaTx(runtime, sender, {planStep, txId}); - + ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec, "test", tableId)); // ui32 portionSize = 80 * 1000; - auto blobs = MakeData(ts, portionSize, portionSize / 2); + auto blobs = MakeData(ts, portionSize, portionSize / 2, spec.GetTtlColumn()); UNIT_ASSERT_EQUAL(blobs.size(), 2); for (auto& data : blobs) { UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data)); @@ -184,9 +184,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, } if (internal) { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {}); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.GetTtlColumn()); } else { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + 1); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + 1, spec.GetTtlColumn()); } TAutoPtr<IEventHandle> handle; @@ -198,7 +198,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, { --planStep; auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId); - Proto(read.get()).AddColumnNames(TTestSchema::DefaultTtlColumn); + Proto(read.get()).AddColumnNames(spec.GetTtlColumn()); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); auto event = 
runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); @@ -213,32 +213,33 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, UNIT_ASSERT(resRead.GetData().size() > 0); auto& schema = resRead.GetMeta().GetSchema(); - UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, TTestSchema::DefaultTtlColumn, ts[1])); + UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, spec.GetTtlColumn(), ts[1])); } // Alter TTL ttlSec = TInstant::Now().Seconds() - (ts[1] + 1); if (spec.HasTiers()) { - spec.Tiers[0].SetTtl(ttlSec); + spec.Tiers[0].SetEvictAfterSeconds(ttlSec); } else { - spec.SetTtl(ttlSec); + spec.SetEvictAfterSeconds(ttlSec); } ok = ProposeSchemaTx(runtime, sender, TTestSchema::AlterTableTxBody(tableId, 2, spec), {++planStep, ++txId}); UNIT_ASSERT(ok); PlanSchemaTx(runtime, sender, {planStep, txId}); + ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec, "test", tableId)); if (internal) { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {}); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.GetTtlColumn()); } else { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + 1); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + 1, spec.GetTtlColumn()); } { --planStep; auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId); - Proto(read.get()).AddColumnNames(TTestSchema::DefaultTtlColumn); + Proto(read.get()).AddColumnNames(spec.GetTtlColumn()); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); @@ -258,6 +259,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, TTestSchema::AlterTableTxBody(tableId, 3, TTestSchema::TTableSpecials()), {++planStep, ++txId}); UNIT_ASSERT(ok); + ProvideTieringSnapshot(runtime, sender, 
TTestSchema::BuildSnapshot(TTestSchema::TTableSpecials(), "test", tableId)); PlanSchemaTx(runtime, sender, {planStep, txId}); UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, blobs[0])); @@ -265,15 +267,15 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, PlanCommit(runtime, sender, ++planStep, txId); if (internal) { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {}); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.GetTtlColumn()); } else { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - 1); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - 1, spec.GetTtlColumn()); } { --planStep; auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId); - Proto(read.get()).AddColumnNames(TTestSchema::DefaultTtlColumn); + Proto(read.get()).AddColumnNames(spec.GetTtlColumn()); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); @@ -288,7 +290,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, UNIT_ASSERT(resRead.GetData().size() > 0); auto& schema = resRead.GetMeta().GetSchema(); - UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, TTestSchema::DefaultTtlColumn, ts[0])); + UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, spec.GetTtlColumn(), ts[0])); } } @@ -398,7 +400,7 @@ public: }; }; -std::vector<std::pair<std::shared_ptr<arrow::TimestampArray>, ui64>> +std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTestSchema::TTableSpecials>& specs) { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -423,12 +425,16 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe ui64 txId = 100; UNIT_ASSERT(specs.size() > 0); - bool ok = ProposeSchemaTx(runtime, sender, - 
TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk, specs[0]), - {++planStep, ++txId}); - UNIT_ASSERT(ok); + { + const bool ok = ProposeSchemaTx(runtime, sender, + TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk, specs[0]), + { ++planStep, ++txId }); + UNIT_ASSERT(ok); + } PlanSchemaTx(runtime, sender, {planStep, txId}); + ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[0], "test", tableId)); + for (auto& data : blobs) { UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data)); ProposeCommit(runtime, sender, metaShard, ++txId, {writeId}); @@ -441,7 +447,7 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe TAutoPtr<IEventHandle> handle; - std::vector<std::pair<std::shared_ptr<arrow::TimestampArray>, ui64>> resColumns; + std::vector<std::pair<ui32, ui64>> resColumns; resColumns.reserve(specs.size()); TCountersContainer counter; @@ -449,11 +455,15 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe for (ui32 i = 0; i < specs.size(); ++i) { if (i) { ui32 version = i + 1; - ok = ProposeSchemaTx(runtime, sender, - TTestSchema::AlterTableTxBody(tableId, version, specs[i]), - {++planStep, ++txId}); - UNIT_ASSERT(ok); - PlanSchemaTx(runtime, sender, {planStep, txId}); + { + const bool ok = ProposeSchemaTx(runtime, sender, + TTestSchema::AlterTableTxBody(tableId, version, specs[i]), + { ++planStep, ++txId }); + UNIT_ASSERT(ok); + PlanSchemaTx(runtime, sender, { planStep, txId }); + } + + ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i], "test", tableId)); } bool hasEvictionSettings = false; @@ -466,7 +476,7 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe counter.SetRestartTabletOnPutData(reboots ? 
1 : 0); - TriggerTTL(runtime, sender, {++planStep, ++txId}, {}); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, specs[i].GetTtlColumn()); if (hasEvictionSettings) { if (i == 1 || i == 2) { counter.WaitEvents(runtime, sender, i, 1, TDuration::Seconds(40)); @@ -481,34 +491,39 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe --planStep; auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId); - Proto(read.get()).AddColumnNames(TTestSchema::DefaultTtlColumn); + Proto(read.get()).AddColumnNames(specs[i].GetTtlColumn()); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& resRead = Proto(event); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - - if (resRead.GetData().size()) { + resColumns.emplace_back(0, 0); + ui32 idx = 0; + while (true) { + auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); + UNIT_ASSERT(event); + + auto& resRead = Proto(event); + UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); + UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); + UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); + UNIT_ASSERT_EQUAL(resRead.GetBatch(), idx++); + + if (!resRead.GetData().size()) { + break; + } auto& meta = resRead.GetMeta(); auto& schema = meta.GetSchema(); - auto pkColumn = GetFirstPKColumn(resRead.GetData(), schema, TTestSchema::DefaultTtlColumn); + auto pkColumn = GetFirstPKColumn(resRead.GetData(), schema, specs[i].GetTtlColumn()); UNIT_ASSERT(pkColumn); UNIT_ASSERT(pkColumn->type_id() == 
arrow::Type::TIMESTAMP); - UNIT_ASSERT(meta.HasReadStats()); - auto& readStats = meta.GetReadStats(); - ui64 numBytes = readStats.GetDataBytes(); // compressed bytes in storage - auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(pkColumn); - resColumns.emplace_back(tsColumn, numBytes); - } else { - resColumns.emplace_back(nullptr, 0); + resColumns.back().first += tsColumn->length(); + if (resRead.GetFinished()) { + UNIT_ASSERT(meta.HasReadStats()); + auto& readStats = meta.GetReadStats(); + ui64 numBytes = readStats.GetDataBytes(); // compressed bytes in storage + resColumns.back().second += numBytes; + break; + } } if (reboots) { @@ -529,21 +544,21 @@ void TestTwoTiers(const TTestSchema::TTableSpecials& spec, bool compressed, bool ui64 allowOne = nowSec - ts[1] + 600; ui64 allowNone = nowSec - ts[1] - 600; - alters[0].Tiers[0].SetTtl(allowBoth); // tier0 allows/has: data[0], data[1] - alters[0].Tiers[1].SetTtl(allowBoth); // tier1 allows: data[0], data[1], has: nothing + alters[0].Tiers[0].SetEvictAfterSeconds(allowBoth); // tier0 allows/has: data[0], data[1] + alters[0].Tiers[1].SetEvictAfterSeconds(allowBoth); // tier1 allows: data[0], data[1], has: nothing - alters[1].Tiers[0].SetTtl(allowOne); // tier0 allows/has: data[1] - alters[1].Tiers[1].SetTtl(allowBoth); // tier1 allows: data[0], data[1], has: data[0] + alters[1].Tiers[0].SetEvictAfterSeconds(allowOne); // tier0 allows/has: data[1] + alters[1].Tiers[1].SetEvictAfterSeconds(allowBoth); // tier1 allows: data[0], data[1], has: data[0] - alters[2].Tiers[0].SetTtl(allowNone); // tier0 allows/has: nothing - alters[2].Tiers[1].SetTtl(allowOne); // tier1 allows/has: data[1] + alters[2].Tiers[0].SetEvictAfterSeconds(allowNone); // tier0 allows/has: nothing + alters[2].Tiers[1].SetEvictAfterSeconds(allowOne); // tier1 allows/has: data[1] - alters[3].Tiers[0].SetTtl(allowNone); // tier0 allows/has: nothing - alters[3].Tiers[1].SetTtl(allowNone); // tier1 allows/has: nothing + 
alters[3].Tiers[0].SetEvictAfterSeconds(allowNone); // tier0 allows/has: nothing + alters[3].Tiers[1].SetEvictAfterSeconds(allowNone); // tier1 allows/has: nothing ui32 portionSize = 80 * 1000; ui32 overlapSize = 40 * 1000; - std::vector<TString> blobs = MakeData(ts, portionSize, overlapSize); + std::vector<TString> blobs = MakeData(ts, portionSize, overlapSize, spec.GetTtlColumn()); auto columns = TestTiers(reboots, blobs, alters); @@ -557,9 +572,9 @@ void TestTwoTiers(const TTestSchema::TTableSpecials& spec, bool compressed, bool UNIT_ASSERT(columns[2].second); UNIT_ASSERT(!columns[3].second); - UNIT_ASSERT_EQUAL(columns[0].first->length(), 2 * portionSize - overlapSize); - UNIT_ASSERT_EQUAL(columns[0].first->length(), columns[1].first->length()); - UNIT_ASSERT_EQUAL(columns[2].first->length(), portionSize); + UNIT_ASSERT_EQUAL(columns[0].first, 2 * portionSize/* - overlapSize*/); + UNIT_ASSERT_EQUAL(columns[0].first, columns[1].first); + UNIT_ASSERT_EQUAL(columns[2].first, portionSize); Cerr << "read bytes: " << columns[0].second << ", " << columns[1].second << ", " << columns[2].second << "\n"; if (compressed) { @@ -571,8 +586,9 @@ void TestTwoTiers(const TTestSchema::TTableSpecials& spec, bool compressed, bool void TestTwoHotTiers(bool reboot) { TTestSchema::TTableSpecials spec; - spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0")); - spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1")); + spec.SetTtlColumn("timestamp"); + spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0").SetTtlColumn("timestamp")); + spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1").SetTtlColumn("timestamp")); spec.Tiers.back().SetCodec("zstd"); TestTwoTiers(spec, true, reboot); @@ -587,8 +603,9 @@ void TestHotAndColdTiers(bool reboot) { UNIT_ASSERT(s3Mock.Start()); TTestSchema::TTableSpecials spec; - spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0")); - spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1")); + spec.SetTtlColumn("timestamp"); + 
spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0").SetTtlColumn("timestamp")); + spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1").SetTtlColumn("timestamp")); spec.Tiers.back().S3 = NKikimrSchemeOp::TS3Settings(); auto& s3Config = *spec.Tiers.back().S3; { @@ -840,27 +857,31 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) { Y_UNIT_TEST(OneTier) { TTestSchema::TTableSpecials specs; - specs.Tiers.emplace_back(TTestSchema::TStorageTier("default")); + specs.SetTtlColumn("timestamp"); + specs.Tiers.emplace_back(TTestSchema::TStorageTier("default").SetTtlColumn("timestamp")); TestTtl(false, true, specs); } Y_UNIT_TEST(RebootOneTier) { NColumnShard::gAllowLogBatchingDefaultValue = false; TTestSchema::TTableSpecials specs; - specs.Tiers.emplace_back(TTestSchema::TStorageTier("default")); + specs.SetTtlColumn("timestamp"); + specs.Tiers.emplace_back(TTestSchema::TStorageTier("default").SetTtlColumn("timestamp")); TestTtl(true, true, specs); } Y_UNIT_TEST(OneTierExternalTtl) { TTestSchema::TTableSpecials specs; - specs.Tiers.emplace_back(TTestSchema::TStorageTier("default")); + specs.SetTtlColumn("timestamp"); + specs.Tiers.emplace_back(TTestSchema::TStorageTier("default").SetTtlColumn("timestamp")); TestTtl(false, false, specs); } Y_UNIT_TEST(RebootOneTierExternalTtl) { NColumnShard::gAllowLogBatchingDefaultValue = false; TTestSchema::TTableSpecials specs; - specs.Tiers.emplace_back(TTestSchema::TStorageTier("default")); + specs.SetTtlColumn("timestamp"); + specs.Tiers.emplace_back(TTestSchema::TStorageTier("default").SetTtlColumn("timestamp")); TestTtl(true, false, specs); } diff --git a/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp b/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp index 521ed7aa450..7b407936d1e 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp @@ -1,15 +1,16 @@ #include "columnshard_ut_common.h" -#include "external_data.h" -#include <ydb/core/wrappers/ut_helpers/s3_mock.h> 
-#include <ydb/core/wrappers/s3_wrapper.h> -#include <ydb/services/metadata/service.h> #include <ydb/core/cms/console/configs_dispatcher.h> #include <ydb/core/testlib/cs_helper.h> -#include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/core/tx/tiering/external_data.h> #include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/core/wrappers/ut_helpers/s3_mock.h> +#include <ydb/core/wrappers/s3_wrapper.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/services/metadata/service.h> #include <library/cpp/actors/core/av_bootstrapped.h> +#include <library/cpp/protobuf/json/proto2json.h> #include <util/system/hostname.h> @@ -44,13 +45,7 @@ public: #Columns { Name: "request_id" Type: "Utf8" } KeyColumnNames: "timestamp" Engine: COLUMN_ENGINE_REPLACING_TIMESERIES - StorageTiers { - Name: "tier1" - ObjectStorage { - Endpoint: "fake" - AccessKey: "$a" - } - } + EnableTiering : true } } )", storeName.c_str(), storeShardsCount)); @@ -77,6 +72,33 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { const TString ConfigProtoStr = "Name : \"abc\""; + class TJsonChecker { + private: + YDB_ACCESSOR_DEF(TString, Path); + YDB_ACCESSOR_DEF(TString, Expectation); + public: + TJsonChecker(const TString& path, const TString& expectation) + : Path(path) + , Expectation(expectation) + { + + } + bool Check(const NJson::TJsonValue& jsonInfo) const { + auto* jsonPathValue = jsonInfo.GetValueByPath(Path); + if (!jsonPathValue) { + return Expectation == "__NULL"; + } + return jsonPathValue->GetStringRobust() == Expectation; + } + + TString GetDebugString() const { + TStringBuilder sb; + sb << "path=" << Path << ";" + << "expectation=" << Expectation << ";"; + return sb; + } + }; + class TTestCSEmulator: public NActors::TActorBootstrapped<TTestCSEmulator> { private: using TBase = NActors::TActorBootstrapped<TTestCSEmulator>; @@ -84,27 +106,88 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { TActorId ProviderId; TInstant Start; 
YDB_READONLY_FLAG(Found, false); + YDB_ACCESSOR(ui32, TieringsCount, 1); + using TKeyCheckers = TMap<TString, TJsonChecker>; + YDB_ACCESSOR_DEF(TKeyCheckers, Checkers); public: - STFUNC(StateInit) { + STATEFN(StateInit) { switch (ev->GetTypeRewrite()) { - HFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle); + hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle); default: Y_VERIFY(false); } } - void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev, const TActorContext&) { - auto value = ev->Get()->GetSnapshotAs<NTiers::TConfigsSnapshot>()->GetValue("/Root/olapStore.tier1"); - if (value && value->GetProtoConfig().GetName() == "abc") { - FoundFlag = true; - } else { - Cerr << ev->Get()->GetSnapshot()->SerializeToString() << Endl; + void CheckRuntime(TTestActorRuntime& runtime) { + const auto pred = [this](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event)->TTestActorRuntimeBase::EEventAction { + if (event->HasBuffer() && !event->HasEvent()) { + } else if (!event->GetBase()) { + } else { + auto ptr = dynamic_cast<NMetadataProvider::TEvRefreshSubscriberData*>(event->GetBase()); + if (ptr) { + CheckFound(ptr); + } + } + return TTestActorRuntimeBase::EEventAction::PROCESS; + }; + + runtime.SetObserverFunc(pred); + + for (const TInstant start = Now(); !IsFound() && Now() - start < TDuration::Seconds(10); ) { + runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(1)); + } + Y_VERIFY(IsFound()); + } + + void CheckFound(NMetadataProvider::TEvRefreshSubscriberData* event) { + auto snapshot = event->GetSnapshotAs<NTiers::TConfigsSnapshot>(); + Y_VERIFY(!!snapshot); + auto* tInfo = snapshot->GetTableTiering("/Root/olapStore/olapTable"); + if (TieringsCount) { + if (!tInfo) { + Cerr << "tiering not found: " << snapshot->SerializeToString() << Endl; + return ; + } + if (tInfo->GetRules().size() != TieringsCount) { + Cerr << "TieringsCount incorrect: " << snapshot->SerializeToString() << Endl; + return; + } + } else if (tInfo) { + Cerr << 
"tiering found but its incorrect: " << snapshot->SerializeToString() << Endl; + return; + } + for (auto&& [_, tiering] : snapshot->GetTableTierings()) { + if (tiering.GetTablePathId() == 0) { + Cerr << "PathId not initialized: " << snapshot->SerializeToString() << Endl; + return; + } else if (tiering.GetTablePathId() == InvalidLocalPathId) { + Cerr << "PathId invalid: " << snapshot->SerializeToString() << Endl; + return; + } else { + Cerr << "PathId: " << tiering.GetTablePathId() << Endl; + } } + for (auto&& i : Checkers) { + auto value = snapshot->GetValue(i.first); + NJson::TJsonValue jsonData; + NProtobufJson::Proto2Json(value->GetProtoConfig(), jsonData); + if (!i.second.Check(jsonData)) { + Cerr << "config value incorrect:" << snapshot->SerializeToString() << ";snapshot_check_path=" << i.first << Endl; + Cerr << "json path incorrect:" << jsonData << ";" << i.second.GetDebugString() << Endl; + return; + } + } + FoundFlag = true; + } + + void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) { + CheckFound(ev->Get()); } void Bootstrap() { ProviderId = NMetadataProvider::MakeServiceId(1); - ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>("/Root/.external_data/tiers"); + ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(); + ExternalDataManipulation->Start(ExternalDataManipulation); Become(&TThis::StateInit); Sender<NMetadataProvider::TEvSubscribeExternal>(ExternalDataManipulation).SendTo(ProviderId); Start = Now(); @@ -137,8 +220,11 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { auto sender = runtime.AllocateEdgeActor(); server->SetupRootStoragePools(sender); + TLocalHelper lHelper(*server); + lHelper.CreateTestOlapTable(); { TTestCSEmulator* emulator = new TTestCSEmulator; + emulator->MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc")); runtime.Register(emulator); { const TInstant start = Now(); @@ -152,15 +238,26 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { } } - 
NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); - tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { - auto sResult = f.GetValueSync(); - Cerr << sResult.GetIssues().ToString() << Endl; - auto session = sResult.GetSession(); - session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')" - , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); - }); - + { + NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); + tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + auto session = f.GetValueSync().GetSession(); + session.ExecuteDataQuery( + "INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " + "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')" + , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); + }); + } + { + NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); + tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + auto session = f.GetValueSync().GetSession(); + session.ExecuteDataQuery( + "INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " + "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')" + , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); + }); + } const TInstant start = Now(); while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) { runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10)); @@ -170,7 +267,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { 
//runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE); } -/* + Y_UNIT_TEST(DSConfigs) { TPortManager pm; @@ -189,8 +286,6 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { Tests::TServer::TPtr server = new Tests::TServer(serverSettings); server->EnableGRpc(grpcPort); - // server->SetupDefaultProfiles(); - Tests::TClient client(serverSettings); auto& runtime = *server->GetRuntime(); @@ -198,11 +293,12 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { auto sender = runtime.AllocateEdgeActor(); server->SetupRootStoragePools(sender); TLocalHelper lHelper(*server); - lHelper.CreateTestOlapTable(); + lHelper.CreateTestOlapTable("olapTable"); - runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); - const TInstant start = Now(); - while (Now() - start < TDuration::Seconds(10)) { + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE); + runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NLog::PRI_INFO); +// runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG); + for (const TInstant start = Now(); Now() - start < TDuration::Seconds(10); ) { runtime.WaitForEdgeEvents([](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { Cerr << "Step " << event->Type << Endl; return false; @@ -211,44 +307,89 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { Cerr << "Step finished" << Endl; } - NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); - tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { - auto sResult = f.GetValueSync(); - Cerr << sResult.GetIssues().ToString() << Endl; - auto session = sResult.GetSession(); - session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')" - , 
NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); - }); - - bool found = false; - bool* foundPtr = &found; - const auto pred = [foundPtr](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event)->TTestActorRuntimeBase::EEventAction { - if (event->HasBuffer() && !event->HasEvent()) { - } else if (!event->GetBase()) { - Cerr << "Type nullptr" << Endl; - } else { - Cerr << "Step " << event->GetBase()->Type() << Endl; - auto ptr = dynamic_cast<NMetadataProvider::TEvRefreshSubscriberData*>(event->GetBase()); - if (ptr) { - auto value = ptr->GetSnapshotAs<NTiers::TConfigsSnapshot>()->GetValue("/Root/olapStore.tier1"); - if (value && value->GetProtoConfig().GetName() == "abc") { - *foundPtr = true; - } else { - Cerr << ptr->GetSnapshot()->SerializeToString() << Endl; - } - } - } - return TTestActorRuntimeBase::EEventAction::PROCESS; - }; + { + NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); + tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + auto session = f.GetValueSync().GetSession(); + session.ExecuteDataQuery( + "INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " + "VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')" + , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); + }); + } + { + NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); + tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + auto session = f.GetValueSync().GetSession(); + session.ExecuteDataQuery( + "INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " + "VALUES ('/Root/olapStore', 'tier1', '/Root/olapStore/olapTable', 'timestamp', '10d')" + , 
NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); + }); + } + { + TTestCSEmulator emulator; + emulator.MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc")); + emulator.SetTieringsCount(1); + emulator.CheckRuntime(runtime); + } + + { + NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); + tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + auto session = f.GetValueSync().GetSession(); + session.ExecuteDataQuery( + "INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) " + "VALUES ('/Root/olapStore', 'tier2', '" + ConfigProtoStr + "')" + , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); + }); + } + { + NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); + tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + auto session = f.GetValueSync().GetSession(); + session.ExecuteDataQuery( + "INSERT INTO `/Root/.external_data/tiering` (ownerPath, tierName, tablePath, column, durationForEvict) " + "VALUES ('/Root/olapStore', 'tier2', '/Root/olapStore/olapTable', 'timestamp', '20d')" + , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); + }); + } - runtime.SetObserverFunc(pred); + { + TTestCSEmulator emulator; + emulator.MutableCheckers().emplace("/Root/olapStore.tier1", TJsonChecker("Name", "abc")); + emulator.MutableCheckers().emplace("/Root/olapStore.tier2", TJsonChecker("Name", "abc")); + emulator.SetTieringsCount(2); + emulator.CheckRuntime(runtime); + } - while (!found) { - runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10)); + { + NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); + 
tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + auto session = f.GetValueSync().GetSession(); + session.ExecuteDataQuery( + "DELETE FROM `/Root/.external_data/tiers`" + , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); + }); + } + { + NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); + tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + auto session = f.GetValueSync().GetSession(); + session.ExecuteDataQuery( + "DELETE FROM `/Root/.external_data/tiering`" + , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); + }); + } + { + TTestCSEmulator emulator; + emulator.SetTieringsCount(0); + emulator.CheckRuntime(runtime); } + //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE); } -*/ + } } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp index 275428e540c..6f167751aa1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp @@ -49,11 +49,6 @@ TColumnTableInfo::TPtr ParseParams( tableSchema = &presetProto.GetSchema(); } - THashSet<TString> knownTiers; - for (const auto& tier : tableSchema->GetStorageTiers()) { - knownTiers.insert(tier.GetName()); - } - THashMap<ui32, TOlapSchema::TColumn> columns; THashMap<TString, ui32> columnsByName; for (const auto& col : tableSchema->GetColumns()) { @@ -65,7 +60,7 @@ TColumnTableInfo::TPtr ParseParams( columnsByName[name] = id; } - if (!ValidateTtlSettings(alter.GetAlterTtlSettings(), columns, columnsByName, knownTiers, errStr)) { + if (!ValidateTtlSettings(alter.GetAlterTtlSettings(), columns, columnsByName, 
errStr)) { status = NKikimrScheme::StatusInvalidParameter; return nullptr; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp index aeb53be7b82..a2685bf2b2d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp @@ -12,13 +12,6 @@ namespace { using namespace NKikimr; using namespace NSchemeShard; -bool PrepareTier(const NKikimrSchemeOp::TStorageTierConfig& proto, TString& errStr) { - Y_UNUSED(proto); - Y_UNUSED(errStr); - // TODO - return true; -} - // TODO: make it a part of TOlapSchema bool PrepareSchema(NKikimrSchemeOp::TColumnTableSchema& proto, TOlapSchema& schema, TString& errStr) { schema.NextColumnId = proto.GetNextColumnId(); @@ -107,7 +100,7 @@ bool PrepareSchema(NKikimrSchemeOp::TColumnTableSchema& proto, TOlapSchema& sche errStr = "At least one key column is required"; return false; } - +#if 0 for (auto& tierConfig : proto.GetStorageTiers()) { TString tierName = tierConfig.GetName(); if (schema.Tiers.count(tierName)) { @@ -120,6 +113,7 @@ bool PrepareSchema(NKikimrSchemeOp::TColumnTableSchema& proto, TOlapSchema& sche return false; } } +#endif schema.Engine = proto.GetEngine(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp index 40e92d9f0fc..917688b71c5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp @@ -70,7 +70,6 @@ TColumnTableInfo::TPtr CreateColumnTable( Y_VERIFY(pSchema, "Expected to find a preset schema"); - THashSet<TString> storageTiers; if (op.HasSchema()) { auto& opSchema = op.GetSchema(); const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; @@ -176,20 +175,7 @@ TColumnTableInfo::TPtr CreateColumnTable( return 
nullptr; } - storageTiers.clear(); - for (auto tier : opSchema.GetStorageTiers()) { - auto& name = tier.GetName(); - if (!pSchema->Tiers.count(name)) { - status = NKikimrScheme::StatusSchemeError; - errStr = TStringBuilder() << "Specified schema tier does not match schema preset: '" << name << "'"; - return nullptr; - } - storageTiers.insert(name); - } - op.ClearSchema(); - } else { - storageTiers = pSchema->Tiers; } if (op.HasRESERVED_TtlSettingsPresetName() || op.HasRESERVED_TtlSettingsPresetId()) { @@ -204,7 +190,7 @@ TColumnTableInfo::TPtr CreateColumnTable( // Validate ttl settings and schema compatibility if (op.HasTtlSettings()) { - if (!ValidateTtlSettings(op.GetTtlSettings(), pSchema->Columns, pSchema->ColumnsByName, storageTiers, errStr)) { + if (!ValidateTtlSettings(op.GetTtlSettings(), pSchema->Columns, pSchema->ColumnsByName, errStr)) { status = NKikimrScheme::StatusInvalidParameter; return nullptr; } diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 5f17890af45..5c9a351992d 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -2023,9 +2023,6 @@ TOlapStoreInfo::TOlapStoreInfo( col.KeyOrder = preset.KeyColumnIds.size(); preset.KeyColumnIds.push_back(col.Id); } - for (const auto& tierConfig : presetProto.GetSchema().GetStorageTiers()) { - preset.Tiers.insert(tierConfig.GetName()); - } preset.Engine = presetProto.GetSchema().GetEngine(); preset.NextColumnId = presetProto.GetSchema().GetNextColumnId(); preset.Version = presetProto.GetSchema().GetVersion(); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index e3ddc608aa9..1d03551aa85 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -804,7 +804,6 @@ struct TOlapSchema { TColumns Columns; TColumnsByName ColumnsByName; TVector<ui32> 
KeyColumnIds; - THashSet<TString> Tiers; // TODO: hash map with tier config NKikimrSchemeOp::EColumnTableEngine Engine = NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES; ui32 NextColumnId = 1; ui64 Version = 1; @@ -2796,7 +2795,6 @@ bool ValidateTtlSettings(const NKikimrSchemeOp::TTTLSettings& ttl, bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl, const THashMap<ui32, TOlapSchema::TColumn>& columns, const THashMap<TString, ui32>& columnsByName, - const THashSet<TString>& knownTiers, TString& errStr); bool ValidateTtlSettingsChange( const NKikimrSchemeOp::TColumnDataLifeCycle& oldTtl, diff --git a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp index b209f7e43c1..6b3bc7a2cc5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_validate_ttl.cpp @@ -158,44 +158,20 @@ static bool ValidateColumnTableTtl(const NKikimrSchemeOp::TColumnDataLifeCycle:: bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl, const THashMap<ui32, TOlapSchema::TColumn>& columns, const THashMap<TString, ui32>& columnsByName, - const THashSet<TString>& knownTiers, TString& errStr) { using TTtlProto = NKikimrSchemeOp::TColumnDataLifeCycle; - THashSet<TString> prevTiers; - switch (ttl.GetStatusCase()) { - case TTtlProto::kTiering: { - for (auto& tier : ttl.GetTiering().GetTiers()) { - auto& name = tier.GetName(); - if (name.empty()) { - errStr = "Unnamed tier in TTL policy"; - return false; - } - if (!knownTiers.count(name)) { - errStr = Sprintf("Unknown tier in TTL policy: '%s'", name.data()); - return false; - } - if (prevTiers.count(name)) { - errStr = Sprintf("The same tier name in TTL policy: '%s'", name.data()); - return false; - } - prevTiers.insert(name); - - if (!ValidateColumnTableTtl(tier.GetEviction(), {}, columns, columnsByName, errStr)) { - return false; - } - } - break; - } - case TTtlProto::kEnabled: - return 
ValidateColumnTableTtl(ttl.GetEnabled(), {}, columns, columnsByName, errStr); - case TTtlProto::kDisabled: - break; - default: - errStr = "TTL status must be specified"; - return false; + case TTtlProto::kTiering: + break; + case TTtlProto::kEnabled: + return ValidateColumnTableTtl(ttl.GetEnabled(), {}, columns, columnsByName, errStr); + case TTtlProto::kDisabled: + break; + default: + errStr = "TTL status must be specified"; + return false; } return true; @@ -217,19 +193,6 @@ bool ValidateTtlSettingsChange( if (ttl.GetStatusCase() == NKikimrSchemeOp::TColumnDataLifeCycle::kEnabled) { newTtlColName = ttl.GetEnabled().GetColumnName(); oldTtlColName = oldTtl.GetEnabled().GetColumnName(); - } else if (ttl.GetStatusCase() == NKikimrSchemeOp::TColumnDataLifeCycle::kTiering) { - if (ttl.GetTiering().TiersSize() != oldTtl.GetTiering().TiersSize()) { - errStr = "Changing of tiers count is not supported yet"; - return false; - } - - if (ttl.GetTiering().TiersSize() != 1) { - errStr = "Changing of multiple tiers is not supported yet"; - return false; - } - - newTtlColName = ttl.GetTiering().GetTiers()[0].GetEviction().GetColumnName(); - oldTtlColName = oldTtl.GetTiering().GetTiers()[0].GetEviction().GetColumnName(); } if (newTtlColName != oldTtlColName) { diff --git a/ydb/core/tx/schemeshard/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap.cpp index 154f062b9ec..82e6a5549ec 100644 --- a/ydb/core/tx/schemeshard/ut_olap.cpp +++ b/ydb/core/tx/schemeshard/ut_olap.cpp @@ -153,13 +153,12 @@ Y_UNIT_TEST_SUITE(TOlap) { Columns { Name: "data" Type: "Utf8" } KeyColumnNames: "timestamp" Engine: COLUMN_ENGINE_REPLACING_TIMESERIES - StorageTiers { Name: "tier0" } - StorageTiers { Name: "tier0" } + EnableTiering : true } } )"; - TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema1, {NKikimrScheme::StatusSchemeError}); + TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema1, {NKikimrScheme::StatusAccepted}); } Y_UNIT_TEST(CreateStoreWithDirs) { @@ -310,9 +309,9 @@ 
Y_UNIT_TEST_SUITE(TOlap) { Columns { Name: "data" Type: "Utf8" } KeyColumnNames: "timestamp" Engine: COLUMN_ENGINE_REPLACING_TIMESERIES - StorageTiers { Name: "tierX" } + EnableTiering : true } - )", {NKikimrScheme::StatusSchemeError}); + )", {NKikimrScheme::StatusAccepted}); } Y_UNIT_TEST(CustomDefaultPresets) { @@ -393,9 +392,7 @@ Y_UNIT_TEST_SUITE(TOlap) { Columns { Name: "data" Type: "Utf8" } KeyColumnNames: "timestamp" Engine: COLUMN_ENGINE_REPLACING_TIMESERIES - StorageTiers { Name: "tier0" } - StorageTiers { Name: "tier1" Compression { CompressionCodec: ColumnCodecZSTD CompressionLevel: 5 } } - StorageTiers { Name: "tier2" Compression { CompressionCodec: ColumnCodecZSTD CompressionLevel: 10 } } + EnableTiering : true } } )"; @@ -466,7 +463,7 @@ Y_UNIT_TEST_SUITE(TOlap) { )"; TestCreateColumnTable(runtime, ++txId, "/MyRoot/OlapStore", tableSchema4, - {NKikimrScheme::StatusInvalidParameter}); + {NKikimrScheme::StatusAccepted}); } Y_UNIT_TEST(AlterStore) { @@ -536,8 +533,7 @@ Y_UNIT_TEST_SUITE(TOlap) { Columns { Name: "data" Type: "Utf8" } KeyColumnNames: "timestamp" Engine: COLUMN_ENGINE_REPLACING_TIMESERIES - StorageTiers { Name: "tier0" } - StorageTiers { Name: "tier1" } + EnableTiering : true } } )"; diff --git a/ydb/core/tx/tiering/CMakeLists.txt b/ydb/core/tx/tiering/CMakeLists.txt new file mode 100644 index 00000000000..48b8969a74c --- /dev/null +++ b/ydb/core/tx/tiering/CMakeLists.txt @@ -0,0 +1,30 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(core-tx-tiering) +target_link_libraries(core-tx-tiering PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-json-writer + ydb-core-blobstorage + ydb-core-protos + core-tablet_flat-protos + ydb-core-wrappers + api-protos + ydb-services-metadata +) +target_sources(core-tx-tiering PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/manager.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/decoder.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/external_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier_config.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/s3_actor.cpp +) diff --git a/ydb/core/tx/tiering/decoder.cpp b/ydb/core/tx/tiering/decoder.cpp new file mode 100644 index 00000000000..40196235569 --- /dev/null +++ b/ydb/core/tx/tiering/decoder.cpp @@ -0,0 +1,43 @@ +#include "decoder.h" +#include <library/cpp/protobuf/json/proto2json.h> +#include <contrib/libs/protobuf/src/google/protobuf/text_format.h> +#include <library/cpp/actors/core/log.h> + +namespace NKikimr::NInternal { + +i32 TDecoderBase::GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId, const bool verify /*= true*/) const { + i32 idx = 0; + for (auto&& i : rawData.columns()) { + if (i.name() == columnId) { + return idx; + } + ++idx; + } + Y_VERIFY(!verify, "incorrect columnId %s", columnId.data()); + return -1; +} + +bool TDecoderBase::Read(const ui32 columnIdx, TString& result, const Ydb::Value& r) const { + result = r.items()[columnIdx].bytes_value(); + return true; +} + +bool TDecoderBase::Read(const ui32 columnIdx, TDuration& result, const Ydb::Value& r) const { + const TString& s = r.items()[columnIdx].bytes_value(); + if (!TDuration::TryParse(s, result)) { + ALS_WARN(0) << "cannot parse duration for tiering: " << s; + return false; + } + return true; +} + +bool TDecoderBase::ReadDebugProto(const ui32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const { + const TString& s = 
r.items()[columnIdx].bytes_value(); + if (!::google::protobuf::TextFormat::ParseFromString(s, &result)) { + ALS_ERROR(0) << "cannot parse proto string: " << s; + return false; + } + return true; +} + +} diff --git a/ydb/core/tx/tiering/decoder.h b/ydb/core/tx/tiering/decoder.h new file mode 100644 index 00000000000..6866feedebb --- /dev/null +++ b/ydb/core/tx/tiering/decoder.h @@ -0,0 +1,16 @@ +#pragma once +#include <ydb/public/api/protos/ydb_value.pb.h> +#include <util/datetime/base.h> + +namespace NKikimr::NInternal { + +class TDecoderBase { +protected: + i32 GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId, const bool verify = true) const; +public: + bool Read(const ui32 columnIdx, TString& result, const Ydb::Value& r) const; + bool ReadDebugProto(const ui32 columnIdx, ::google::protobuf::Message& result, const Ydb::Value& r) const; + bool Read(const ui32 columnIdx, TDuration& result, const Ydb::Value& r) const; +}; + +} diff --git a/ydb/core/tx/tiering/external_data.cpp b/ydb/core/tx/tiering/external_data.cpp new file mode 100644 index 00000000000..ebabf12bcb3 --- /dev/null +++ b/ydb/core/tx/tiering/external_data.cpp @@ -0,0 +1,240 @@ +#include "external_data.h" + +#include <ydb/core/base/path.h> + +#include <library/cpp/json/writer/json_value.h> +#include <library/cpp/protobuf/json/proto2json.h> + +#include <util/string/join.h> + +namespace NKikimr::NColumnShard::NTiers { + +bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) { + Y_VERIFY(rawDataResult.result_sets().size() == 2); + { + auto& rawData = rawDataResult.result_sets()[0]; + TTierConfig::TDecoder decoder(rawData); + for (auto&& r : rawData.rows()) { + TTierConfig config; + if (!config.DeserializeFromRecord(decoder, r)) { + ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "cannot parse tier config from snapshot"; + continue; + } + TierConfigs.emplace(config.GetConfigId(), config); + } + } + { + auto& rawData = 
rawDataResult.result_sets()[1]; + TTieringRule::TDecoder decoder(rawData); + TVector<TTieringRule> rulesLocal; + rulesLocal.reserve(rawData.rows().size()); + for (auto&& r : rawData.rows()) { + TTieringRule tr; + if (!tr.DeserializeFromRecord(decoder, r)) { + ALS_WARN(NKikimrServices::TX_COLUMNSHARD) << "cannot parse record for tiering info"; + continue; + } + rulesLocal.emplace_back(std::move(tr)); + } + std::sort(rulesLocal.begin(), rulesLocal.end()); + for (auto&& i : rulesLocal) { + const TString tablePath = i.GetTablePath(); + TableTierings[tablePath].AddRule(std::move(i)); + } + } + return true; +} + +std::optional<TTierConfig> TConfigsSnapshot::GetValue(const TString& key) const { + auto it = TierConfigs.find(key); + if (it == TierConfigs.end()) { + return {}; + } else { + return it->second; + } +} + +void TConfigsSnapshot::RemapTablePathToId(const TString& path, const ui64 pathId) { + auto it = TableTierings.find(path); + Y_VERIFY(it != TableTierings.end()); + it->second.SetTablePathId(pathId); +} + +const TTableTiering* TConfigsSnapshot::GetTableTiering(const TString& tablePath) const { + auto it = TableTierings.find(tablePath); + if (it == TableTierings.end()) { + return nullptr; + } else { + return &it->second; + } +} + +TVector<NMetadataProvider::ITableModifier::TPtr> TSnapshotConstructor::DoGetTableSchema() const { + TVector<NMetadataProvider::ITableModifier::TPtr> result; + { + Ydb::Table::CreateTableRequest request; + request.set_session_id(""); + request.set_path(Tables[0]); + request.add_primary_key("ownerPath"); + request.add_primary_key("tierName"); + { + auto& column = *request.add_columns(); + column.set_name("tierName"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("ownerPath"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + 
column.set_name("tierConfig"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + result.emplace_back(new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request)); + } + { + Ydb::Table::CreateTableRequest request; + request.set_session_id(""); + request.set_path(Tables[1]); + request.add_primary_key("ownerPath"); + request.add_primary_key("tierName"); + request.add_primary_key("tablePath"); + { + auto& column = *request.add_columns(); + column.set_name("tierName"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("ownerPath"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("tablePath"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("durationForEvict"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("column"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + result.emplace_back(new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request)); + } + return result; +} + +NThreading::TFuture<NMetadataProvider::ISnapshot::TPtr> TSnapshotConstructor::EnrichSnapshotData(ISnapshot::TPtr original) const { + if (!Actor) { + return NThreading::MakeErrorFuture<NMetadataProvider::ISnapshot::TPtr>( + std::make_exception_ptr(std::runtime_error("actor finished for enrich process"))); + } + auto current = std::dynamic_pointer_cast<TConfigsSnapshot>(original); + Y_VERIFY(current); + auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); + 
request->DatabaseName = NKikimr::CanonizePath(AppData()->TenantName); + for (auto&& i : current->GetTableTierings()) { + auto it = TablesRemapper.find(i.second.GetTablePath()); + if (it != TablesRemapper.end()) { + current->RemapTablePathToId(it->first, it->second); + } else { + auto& entry = request->ResultSet.emplace_back(); + entry.Operation = TNavigate::OpTable; + entry.Path = NKikimr::SplitPath(i.second.GetTablePath()); + } + } + if (request->ResultSet.empty()) { + return NThreading::MakeFuture(original); + } else { + WaitPromise = NThreading::NewPromise<ISnapshot::TPtr>(); + WaitSnapshot = current; + Actor->ProvideEvent(new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()), MakeSchemeCacheID()); + return WaitPromise.GetFuture(); + } +} + +void TSnapshotConstructorAgent::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + Owner->ResolveInfo(ev->Get()); +} + +void TSnapshotConstructorAgent::ProvideEvent(IEventBase* event, const TActorId& recipient) { + Send(recipient, event); +} + +TSnapshotConstructorAgent::~TSnapshotConstructorAgent() { + Owner->ActorStopped(); +} + +void TSnapshotConstructor::ResolveInfo(const TEvTxProxySchemeCache::TEvNavigateKeySetResult* info) { + const auto& request = info->Request; + + if (request->ResultSet.empty()) { + WaitPromise.SetException("cannot resolve table path to ids"); + WaitSnapshot = nullptr; + return; + } + + for (auto&& i : request->ResultSet) { + const TString path = "/" + JoinSeq("/", i.Path); + switch (i.Status) { + case TNavigate::EStatus::Ok: + TablesRemapper.emplace(path, i.TableId.PathId.LocalPathId); + break; + case TNavigate::EStatus::RootUnknown: + case TNavigate::EStatus::PathErrorUnknown: + WaitPromise.SetException("path not exists " + path); + break; + case TNavigate::EStatus::LookupError: + case TNavigate::EStatus::RedirectLookupError: + WaitPromise.SetException("RESOLVE_LOOKUP_ERROR" + path); + break; + default: + WaitPromise.SetException("GENERIC_RESOLVE_ERROR" + path); + 
break; + } + } + if (WaitPromise.HasException()) { + WaitSnapshot = nullptr; + } else { + for (auto&& i : WaitSnapshot->GetTableTierings()) { + auto it = TablesRemapper.find(i.second.GetTablePath()); + if (it != TablesRemapper.end()) { + WaitSnapshot->RemapTablePathToId(it->first, it->second); + } else { + WaitPromise.SetException("undecoded path " + i.second.GetTablePath()); + WaitSnapshot = nullptr; + return; + } + } + WaitPromise.SetValue(WaitSnapshot); + WaitSnapshot = nullptr; + } +} + +TSnapshotConstructor::TSnapshotConstructor() { + TablePath = "/" + AppData()->TenantName + "/.external_data"; + Tables.emplace_back(TablePath + "/tiers"); + Tables.emplace_back(TablePath + "/tiering"); +} + +TString NTiers::TConfigsSnapshot::DoSerializeToString() const { + NJson::TJsonValue result = NJson::JSON_MAP; + auto& jsonTiers = result.InsertValue("tiers", NJson::JSON_MAP); + for (auto&& i : TierConfigs) { + jsonTiers.InsertValue(i.first, i.second.GetDebugJson()); + } + auto& jsonTiering = result.InsertValue("tiering", NJson::JSON_MAP); + for (auto&& i : TableTierings) { + jsonTiering.InsertValue(i.first, i.second.GetDebugJson()); + } + return result.GetStringRobust(); +} + +} diff --git a/ydb/core/tx/tiering/external_data.h b/ydb/core/tx/tiering/external_data.h new file mode 100644 index 00000000000..bb03ed8c9f4 --- /dev/null +++ b/ydb/core/tx/tiering/external_data.h @@ -0,0 +1,105 @@ +#pragma once +#include "rule.h" +#include "tier_config.h" + +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/services/metadata/service.h> + +#include <library/cpp/json/writer/json_value.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TConfigsSnapshot: public NMetadataProvider::ISnapshot { +private: + using TBase = NMetadataProvider::ISnapshot; + using TConfigsMap = TMap<TString, TTierConfig>; + YDB_ACCESSOR_DEF(TConfigsMap, TierConfigs); + using TTieringMap = TMap<TString, TTableTiering>; + 
YDB_ACCESSOR_DEF(TTieringMap, TableTierings); +protected: + virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override; + virtual TString DoSerializeToString() const override; +public: + const TTableTiering* GetTableTiering(const TString& tablePath) const; + void RemapTablePathToId(const TString& path, const ui64 pathId); + std::optional<TTierConfig> GetValue(const TString& key) const; + using TBase::TBase; +}; + +class TSnapshotConstructor; + +class TSnapshotConstructorAgent: public TActor<TSnapshotConstructorAgent> { +private: + using TBase = TActor<TSnapshotConstructorAgent>; + std::shared_ptr<TSnapshotConstructor> Owner; +private: + void Handle(NActors::TEvents::TEvPoison::TPtr& /*ev*/) { + PassAway(); + } + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); +public: + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + hFunc(NActors::TEvents::TEvPoison, Handle); + default: + break; + } + } + + TSnapshotConstructorAgent(std::shared_ptr<TSnapshotConstructor> owner) + : TBase(&TThis::StateMain) + , Owner(owner) + { + + } + + ~TSnapshotConstructorAgent(); + + void ProvideEvent(IEventBase* event, const TActorId& recipient); +}; + +class TSnapshotConstructor: public NMetadataProvider::TGenericSnapshotParser<TConfigsSnapshot> { +private: + using TNavigate = NSchemeCache::TSchemeCacheNavigate; + using TBaseActor = TActor<TSnapshotConstructor>; + using ISnapshot = NMetadataProvider::ISnapshot; + TVector<TString> Tables; + TMap<TString, ui64> TablesRemapper; + TSnapshotConstructorAgent* Actor = nullptr; + TString TablePath; + mutable std::shared_ptr<TConfigsSnapshot> WaitSnapshot; + mutable NThreading::TPromise<NMetadataProvider::ISnapshot::TPtr> WaitPromise; +protected: + virtual TVector<NMetadataProvider::ITableModifier::TPtr> DoGetTableSchema() const override; + virtual const TVector<TString>& DoGetTables() const override { + return Tables; + 
} +public: + void ResolveInfo(const TEvTxProxySchemeCache::TEvNavigateKeySetResult* info); + virtual NThreading::TFuture<ISnapshot::TPtr> EnrichSnapshotData(ISnapshot::TPtr original) const override; + + TSnapshotConstructor(); + + void Start(std::shared_ptr<TSnapshotConstructor> ownerPtr) { + Y_VERIFY(!Actor); + Actor = new TSnapshotConstructorAgent(ownerPtr); + TActivationContext::AsActorContext().RegisterWithSameMailbox(Actor); + } + + void ActorStopped() { + Actor = nullptr; + } + + void Stop() { + if (Actor && TlsActivationContext) { + TActivationContext::AsActorContext().Send(Actor->SelfId(), new NActors::TEvents::TEvPoison); + } + } + + ~TSnapshotConstructor() { + } +}; + +} diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp new file mode 100644 index 00000000000..0364ebc2aac --- /dev/null +++ b/ydb/core/tx/tiering/manager.cpp @@ -0,0 +1,234 @@ +#include "manager.h" +#include "external_data.h" +#include "s3_actor.h" + +#include <ydb/core/tx/columnshard/columnshard_private_events.h> + +namespace NKikimr::NColumnShard { + +class TTiersManager::TActor: public TActorBootstrapped<TTiersManager::TActor> { +private: + std::shared_ptr<TTiersManager> Owner; + TActorId GetExternalDataActorId() const { + return NMetadataProvider::MakeServiceId(SelfId().NodeId()); + } +public: + TActor(std::shared_ptr<TTiersManager> owner) + : Owner(owner) { + + } + ~TActor() { + Owner->Stop(); + } + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle); + hFunc(NActors::TEvents::TEvPoison, Handle); + default: + break; + } + } + void Bootstrap() { + Become(&TThis::StateMain); + Send(GetExternalDataActorId(), new NMetadataProvider::TEvSubscribeExternal(Owner->GetExternalDataManipulation())); + } + void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) { + auto snapshot = ev->Get()->GetSnapshot(); + Owner->TakeConfigs(snapshot); + } + void Handle(NActors::TEvents::TEvPoison::TPtr& /*ev*/) 
{ + Send(GetExternalDataActorId(), new NMetadataProvider::TEvUnsubscribeExternal(Owner->GetExternalDataManipulation())); + PassAway(); + } +}; + +namespace NTiers { + +TManager& TManager::Restart(const TTierConfig& config, const bool activity) { + if (Config.IsSame(config)) { + return *this; + } + if (Config.NeedExport()) { + Stop(); + } + Config = config; + if (activity) { + Start(); + } + return *this; +} + +bool TManager::Stop() { + if (!StorageActorId) { + return true; + } + if (TlsActivationContext) { + TActivationContext::AsActorContext().Send(StorageActorId, new TEvents::TEvPoisonPill()); + } + StorageActorId = {}; + return true; +} + +bool TManager::Start() { + if (!Config.NeedExport()) { + return true; + } + if (!!StorageActorId) { + return true; + } +#ifndef KIKIMR_DISABLE_S3_OPS + auto& ctx = TActivationContext::AsActorContext(); + const NActors::TActorId newActor = ctx.Register( + CreateS3Actor(TabletId, ctx.SelfID, Config.GetTierName()) + ); + ctx.Send(newActor, new TEvPrivate::TEvS3Settings(Config.GetProtoConfig().GetObjectStorage())); + Stop(); + StorageActorId = newActor; +#endif + return true; +} + +TManager::TManager(const ui64 tabletId, const TTierConfig& config) + : TabletId(tabletId) + , Config(config) { +} + +NOlap::TStorageTier TManager::BuildTierStorage() const { + NOlap::TStorageTier result; + result.Name = Config.GetTierName(); + if (Config.GetProtoConfig().HasCompression()) { + result.Compression = ConvertCompression(Config.GetProtoConfig().GetCompression()); + } + return result; +} + +NKikimr::NOlap::TCompression TManager::ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression) { + NOlap::TCompression out; + if (compression.HasCompressionCodec()) { + switch (compression.GetCompressionCodec()) { + case NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain: + out.Codec = arrow::Compression::UNCOMPRESSED; + break; + case NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4: + out.Codec = arrow::Compression::LZ4_FRAME; + break; + case 
NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD: + out.Codec = arrow::Compression::ZSTD; + break; + } + } + + if (compression.HasCompressionLevel()) { + out.Level = compression.GetCompressionLevel(); + } + return out; +} +} + +void TTiersManager::TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshotExt) { + auto snapshotPtr = std::dynamic_pointer_cast<NTiers::TConfigsSnapshot>(snapshotExt); + Y_VERIFY(snapshotPtr); + Snapshot = snapshotExt; + auto& snapshot = *snapshotPtr; + for (auto itSelf = Managers.begin(); itSelf != Managers.end(); ) { + auto it = snapshot.GetTierConfigs().find(OwnerPath + "." + itSelf->first); + if (it == snapshot.GetTierConfigs().end()) { + itSelf->second.Stop(); + itSelf = Managers.erase(itSelf); + } else { + itSelf->second.Restart(it->second, IsActive()); + ++itSelf; + } + } + for (auto&& i : snapshot.GetTierConfigs()) { + if (i.second.GetOwnerPath() != OwnerPath && !!OwnerPath) { + continue; + } + if (Managers.contains(i.second.GetTierName())) { + continue; + } + NTiers::TManager localManager(TabletId, i.second); + auto& manager = Managers.emplace(i.second.GetTierName(), std::move(localManager)).first->second; + if (IsActive()) { + manager.Start(); + } + } +} + +TActorId TTiersManager::GetStorageActorId(const TString& tierName) { + auto it = Managers.find(tierName); + if (it == Managers.end()) { + ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "No S3 actor for tier '" << tierName << "' at tablet " << TabletId; + return {}; + } + auto actorId = it->second.GetStorageActorId(); + if (!actorId) { + ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "Not started storage actor for tier '" << tierName << "' at tablet " << TabletId; + return {}; + } + return actorId; +} + +TTiersManager& TTiersManager::Start(std::shared_ptr<TTiersManager> ownerPtr) { + if (ActiveFlag) { + return *this; + } + ActiveFlag = true; + Y_VERIFY(!Actor); + Actor = new TTiersManager::TActor(ownerPtr); + TActivationContext::AsActorContext().RegisterWithSameMailbox(Actor); + for 
(auto&& i : Managers) { + i.second.Start(); + } + return *this; +} + +TTiersManager& TTiersManager::Stop() { + if (!ActiveFlag) { + return *this; + } + ActiveFlag = false; + Y_VERIFY(!!Actor); + if (TlsActivationContext) { + TActivationContext::AsActorContext().Send(Actor->SelfId(), new NActors::TEvents::TEvPoison); + } + Actor = nullptr; + for (auto&& i : Managers) { + i.second.Stop(); + } + return *this; +} + +const NKikimr::NColumnShard::NTiers::TManager& TTiersManager::GetManagerVerified(const TString& tierId) const { + auto it = Managers.find(tierId); + Y_VERIFY(it != Managers.end()); + return it->second; +} + +NMetadataProvider::ISnapshotParser::TPtr TTiersManager::GetExternalDataManipulation() const { + if (!ExternalDataManipulation) { + ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>(); + auto edmPtr = std::dynamic_pointer_cast<NTiers::TSnapshotConstructor>(ExternalDataManipulation); + edmPtr->Start(edmPtr); + } + return ExternalDataManipulation; +} + +THashMap<ui64, NKikimr::NOlap::TTiersInfo> TTiersManager::GetTiering() const { + THashMap<ui64, NKikimr::NOlap::TTiersInfo> result; + auto snapshotPtr = std::dynamic_pointer_cast<NTiers::TConfigsSnapshot>(Snapshot); + Y_VERIFY(snapshotPtr); + for (auto&& i : snapshotPtr->GetTableTierings()) { + result.emplace(i.second.GetTablePathId(), i.second.BuildTiersInfo()); + } + return result; +} + +TTiersManager::~TTiersManager() { + auto cs = std::dynamic_pointer_cast<NTiers::TSnapshotConstructor>(ExternalDataManipulation); + if (!!cs) { + cs->Stop(); + } +} + +} diff --git a/ydb/core/tx/tiering/manager.h b/ydb/core/tx/tiering/manager.h new file mode 100644 index 00000000000..4313aefce42 --- /dev/null +++ b/ydb/core/tx/tiering/manager.h @@ -0,0 +1,69 @@ +#pragma once +#include "external_data.h" +#include "tier_config.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actor.h> +#include <ydb/services/metadata/service.h> +#include 
<ydb/library/accessor/accessor.h> + +namespace NKikimr::NColumnShard { +namespace NTiers { + +class TManager { +private: + ui64 TabletId = 0; + YDB_READONLY_DEF(TTierConfig, Config); + YDB_READONLY_DEF(NActors::TActorId, StorageActorId); +public: + TManager(const ui64 tabletId, const TTierConfig& config); + static NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression); + NOlap::TStorageTier BuildTierStorage() const; + + TManager& Restart(const TTierConfig& config, const bool activity); + bool NeedExport() const { + return Config.NeedExport(); + } + bool Stop(); + bool Start(); +}; +} + +class TTiersManager { +private: + class TActor; + using TManagers = TMap<TString, NTiers::TManager>; + ui64 TabletId = 0; + TString OwnerPath; + TActor* Actor = nullptr; + YDB_READONLY_DEF(TManagers, Managers); + YDB_READONLY_FLAG(Active, false); + + NMetadataProvider::ISnapshot::TPtr Snapshot; + mutable NMetadataProvider::ISnapshotParser::TPtr ExternalDataManipulation; + +public: + TTiersManager(const ui64 tabletId, const TString& ownerPath) + : TabletId(tabletId) + , OwnerPath(ownerPath) + { + } + ~TTiersManager(); + THashMap<ui64, NOlap::TTiersInfo> GetTiering() const; + void TakeConfigs(NMetadataProvider::ISnapshot::TPtr snapshot); + TTiersManager& Start(std::shared_ptr<TTiersManager> ownerPtr); + TTiersManager& Stop(); + TActorId GetStorageActorId(const TString& tierName); + const NTiers::TManager& GetManagerVerified(const TString& tierId) const; + NMetadataProvider::ISnapshotParser::TPtr GetExternalDataManipulation() const; + + TManagers::const_iterator begin() const { + return Managers.begin(); + } + + TManagers::const_iterator end() const { + return Managers.end(); + } +}; + +} diff --git a/ydb/core/tx/tiering/rule.cpp b/ydb/core/tx/tiering/rule.cpp new file mode 100644 index 00000000000..617c665a346 --- /dev/null +++ b/ydb/core/tx/tiering/rule.cpp @@ -0,0 +1,26 @@ +#include "rule.h" + +namespace NKikimr::NColumnShard::NTiers { 
+NJson::TJsonValue TTieringRule::GetDebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue(TDecoder::OwnerPath, OwnerPath); + result.InsertValue(TDecoder::TierName, TierName); + result.InsertValue(TDecoder::TablePath, TablePath); + result.InsertValue("tablePathId", TablePathId); + result.InsertValue(TDecoder::Column, Column); + result.InsertValue(TDecoder::DurationForEvict, DurationForEvict.ToString()); + return result; +} + +NJson::TJsonValue TTableTiering::GetDebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue(TTieringRule::TDecoder::TablePath, TablePath); + result.InsertValue(TTieringRule::TDecoder::Column, Column); + auto&& jsonRules = result.InsertValue("rules", NJson::JSON_ARRAY); + for (auto&& i : Rules) { + jsonRules.AppendValue(i.GetDebugJson()); + } + return result; +} + +} diff --git a/ydb/core/tx/tiering/rule.h b/ydb/core/tx/tiering/rule.h new file mode 100644 index 00000000000..4481f500925 --- /dev/null +++ b/ydb/core/tx/tiering/rule.h @@ -0,0 +1,113 @@ +#pragma once +#include "decoder.h" + +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/tx/columnshard/engines/column_engine.h> +#include <ydb/services/metadata/service.h> + +#include <library/cpp/json/writer/json_value.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TTieringRule { +private: + YDB_ACCESSOR_DEF(TString, OwnerPath); + YDB_ACCESSOR_DEF(TString, TierName); + YDB_ACCESSOR_DEF(TString, TablePath); + YDB_ACCESSOR(ui64, TablePathId, 0); + YDB_ACCESSOR_DEF(TString, Column); + YDB_ACCESSOR_DEF(TDuration, DurationForEvict); +public: + bool operator<(const TTieringRule& item) const { + return std::tie(OwnerPath, TierName, TablePath, Column, DurationForEvict) + < std::tie(item.OwnerPath, item.TierName, item.TablePath, item.Column, item.DurationForEvict); + } + + NJson::TJsonValue GetDebugJson() const; + TString GetTierId() const { + return OwnerPath + "." 
+ TierName; + } + class TDecoder: public NInternal::TDecoderBase { + private: + YDB_READONLY(i32, OwnerPathIdx, -1); + YDB_READONLY(i32, TablePathIdx, -1); + YDB_READONLY(i32, DurationForEvictIdx, -1); + YDB_READONLY(i32, TierNameIdx, -1); + YDB_READONLY(i32, ColumnIdx, -1); + public: + static inline const TString OwnerPath = "ownerPath"; + static inline const TString TierName = "tierName"; + static inline const TString TablePath = "tablePath"; + static inline const TString DurationForEvict = "durationForEvict"; + static inline const TString Column = "column"; + + TDecoder(const Ydb::ResultSet& rawData) { + OwnerPathIdx = GetFieldIndex(rawData, OwnerPath); + TierNameIdx = GetFieldIndex(rawData, TierName); + TablePathIdx = GetFieldIndex(rawData, TablePath); + DurationForEvictIdx = GetFieldIndex(rawData, DurationForEvict); + ColumnIdx = GetFieldIndex(rawData, Column); + } + }; + bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r) { + if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) { + return false; + } + if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) { + return false; + } + if (!decoder.Read(decoder.GetTablePathIdx(), TablePath, r)) { + return false; + } + TablePath = TFsPath(TablePath).Fix().GetPath(); + if (!decoder.Read(decoder.GetDurationForEvictIdx(), DurationForEvict, r)) { + return false; + } + if (!decoder.Read(decoder.GetColumnIdx(), Column, r)) { + return false; + } + return true; + } +}; + +class TTableTiering { +private: + YDB_READONLY_DEF(TString, TablePath); + YDB_READONLY(ui64, TablePathId, 0); + YDB_READONLY_DEF(TString, Column); + YDB_READONLY_DEF(TVector<TTieringRule>, Rules); +public: + void SetTablePathId(const ui64 pathId) { + Y_VERIFY(!TablePathId); + TablePathId = pathId; + for (auto&& r : Rules) { + r.SetTablePathId(pathId); + } + } + NJson::TJsonValue GetDebugJson() const; + void AddRule(TTieringRule&& tr) { + if (Rules.size()) { + Y_VERIFY(Rules.back().GetDurationForEvict() <= 
tr.GetDurationForEvict()); + if (Column != tr.GetColumn()) { + ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "inconsistency rule column: " << + TablePath << "/" << Column << " != " << tr.GetColumn(); + return; + } + } else { + Column = tr.GetColumn(); + TablePath = tr.GetTablePath(); + TablePathId = tr.GetTablePathId(); + } + Rules.emplace_back(std::move(tr)); + } + + NOlap::TTiersInfo BuildTiersInfo() const { + NOlap::TTiersInfo result(GetColumn()); + for (auto&& r : Rules) { + result.AddTier(r.GetTierName(), Now() - r.GetDurationForEvict()); + } + return result; + } +}; + +} diff --git a/ydb/core/tx/columnshard/s3_actor.cpp b/ydb/core/tx/tiering/s3_actor.cpp index e75945e8044..f0fc7d80973 100644 --- a/ydb/core/tx/columnshard/s3_actor.cpp +++ b/ydb/core/tx/tiering/s3_actor.cpp @@ -1,7 +1,9 @@ #ifndef KIKIMR_DISABLE_S3_OPS -#include "defs.h" -#include "columnshard_impl.h" +#include <ydb/core/tx/columnshard/blob.h> +#include <ydb/core/tx/columnshard/columnshard.h> +#include <ydb/core/tx/columnshard/columnshard_private_events.h> +#include <ydb/core/tx/columnshard/defs.h> #include <ydb/core/protos/flat_scheme_op.pb.h> #include <ydb/core/wrappers/s3_wrapper.h> diff --git a/ydb/core/tx/tiering/s3_actor.h b/ydb/core/tx/tiering/s3_actor.h new file mode 100644 index 00000000000..74e427d6345 --- /dev/null +++ b/ydb/core/tx/tiering/s3_actor.h @@ -0,0 +1,12 @@ +#pragma once +#ifndef KIKIMR_DISABLE_S3_OPS +#include <library/cpp/actors/core/actorid.h> +#include <library/cpp/actors/core/actor.h> + +namespace NKikimr::NColumnShard { + +IActor* CreateS3Actor(ui64 tabletId, const TActorId& parent, const TString& tierName); + +} + +#endif diff --git a/ydb/core/tx/tiering/tier_config.cpp b/ydb/core/tx/tiering/tier_config.cpp new file mode 100644 index 00000000000..19e5af26bbc --- /dev/null +++ b/ydb/core/tx/tiering/tier_config.cpp @@ -0,0 +1,32 @@ +#include "tier_config.h" +#include <library/cpp/json/writer/json_value.h> +#include <library/cpp/protobuf/json/proto2json.h> + +namespace 
NKikimr::NColumnShard::NTiers { + +NJson::TJsonValue TTierConfig::GetDebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue(TDecoder::OwnerPath, OwnerPath); + result.InsertValue(TDecoder::TierName, TierName); + NProtobufJson::Proto2Json(ProtoConfig, result.InsertValue(TDecoder::TierConfig, NJson::JSON_MAP)); + return result; +} + +bool TTierConfig::IsSame(const TTierConfig& item) const { + return TierName == item.TierName && ProtoConfig.SerializeAsString() == item.ProtoConfig.SerializeAsString(); +} + +bool TTierConfig::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r) { + if (!decoder.Read(decoder.GetOwnerPathIdx(), OwnerPath, r)) { + return false; + } + if (!decoder.Read(decoder.GetTierNameIdx(), TierName, r)) { + return false; + } + if (!decoder.ReadDebugProto(decoder.GetTierConfigIdx(), ProtoConfig, r)) { + return false; + } + return true; +} + +} diff --git a/ydb/core/tx/tiering/tier_config.h b/ydb/core/tx/tiering/tier_config.h new file mode 100644 index 00000000000..78175ff980a --- /dev/null +++ b/ydb/core/tx/tiering/tier_config.h @@ -0,0 +1,55 @@ +#pragma once +#include "decoder.h" +#include <ydb/services/metadata/service.h> +#include <ydb/core/protos/flat_scheme_op.pb.h> + +#include <library/cpp/json/writer/json_value.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TTierConfig { +private: + using TTierProto = NKikimrSchemeOp::TStorageTierConfig; + using TS3SettingsProto = NKikimrSchemeOp::TS3Settings; + YDB_ACCESSOR_DEF(TString, OwnerPath); + YDB_ACCESSOR_DEF(TString, TierName); + YDB_ACCESSOR_DEF(TTierProto, ProtoConfig); + +public: + TTierConfig() = default; + TTierConfig(const TString& ownerPath, const TString& tierName) + : OwnerPath(ownerPath) + , TierName(tierName) + { + + } + + class TDecoder: public NInternal::TDecoderBase { + private: + YDB_READONLY(i32, OwnerPathIdx, -1); + YDB_READONLY(i32, TierNameIdx, -1); + YDB_READONLY(i32, TierConfigIdx, -1); + public: + static inline const TString 
OwnerPath = "ownerPath"; + static inline const TString TierName = "tierName"; + static inline const TString TierConfig = "tierConfig"; + TDecoder(const Ydb::ResultSet& rawData) { + OwnerPathIdx = GetFieldIndex(rawData, OwnerPath); + TierNameIdx = GetFieldIndex(rawData, TierName); + TierConfigIdx = GetFieldIndex(rawData, TierConfig); + } + }; + bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r); + + TString GetConfigId() const { + return OwnerPath + "." + TierName; + } + + bool NeedExport() const { + return ProtoConfig.HasObjectStorage(); + } + bool IsSame(const TTierConfig& item) const; + NJson::TJsonValue GetDebugJson() const; +}; + +} diff --git a/ydb/public/api/protos/draft/ydb_logstore.proto b/ydb/public/api/protos/draft/ydb_logstore.proto index 0f42cc5a9aa..db17daad6e7 100644 --- a/ydb/public/api/protos/draft/ydb_logstore.proto +++ b/ydb/public/api/protos/draft/ydb_logstore.proto @@ -69,7 +69,7 @@ message CreateLogStoreRequest { string path = 2; // Full path uint32 shards_count = 3; repeated SchemaPreset schema_presets = 4; - repeated TierConfig tiers = 5; + bool enable_tiering = 6; } message CreateLogStoreResponse { diff --git a/ydb/public/lib/experimental/ydb_logstore.cpp b/ydb/public/lib/experimental/ydb_logstore.cpp index c8b2189da6d..c8c642fe7c5 100644 --- a/ydb/public/lib/experimental/ydb_logstore.cpp +++ b/ydb/public/lib/experimental/ydb_logstore.cpp @@ -104,11 +104,9 @@ void TSchema::SerializeTo(Ydb::LogStore::Schema& schema) const { DefaultCompression.SerializeTo(*schema.mutable_default_compression()); } -TLogStoreDescription::TLogStoreDescription(ui32 shardsCount, const THashMap<TString, TSchema>& schemaPresets, - const THashMap<TString, TTierConfig>& tierConfigs) +TLogStoreDescription::TLogStoreDescription(ui32 shardsCount, const THashMap<TString, TSchema>& schemaPresets) : ShardsCount(shardsCount) , SchemaPresets(schemaPresets) - , TierConfigs(tierConfigs) {} 
TLogStoreDescription::TLogStoreDescription(Ydb::LogStore::DescribeLogStoreResult&& desc, @@ -123,13 +121,6 @@ TLogStoreDescription::TLogStoreDescription(Ydb::LogStore::DescribeLogStoreResult } PermissionToSchemeEntry(desc.self().permissions(), &Permissions); PermissionToSchemeEntry(desc.self().effective_permissions(), &EffectivePermissions); - for (const auto& tier : desc.tiers()) { - TTierConfig cfg; - if (tier.has_compression()) { - cfg.Compression = CompressionFromProto(tier.compression()); - } - TierConfigs.emplace(tier.name(), std::move(cfg)); - } } void TLogStoreDescription::SerializeTo(Ydb::LogStore::CreateLogStoreRequest& request) const { @@ -139,11 +130,6 @@ void TLogStoreDescription::SerializeTo(Ydb::LogStore::CreateLogStoreRequest& req presetSchema.SerializeTo(*pb.mutable_schema()); } request.set_shards_count(ShardsCount); - for (const auto& [tierName, tierCfg] : TierConfigs) { - auto& pb = *request.add_tiers(); - pb.set_name(tierName); - tierCfg.Compression.SerializeTo(*pb.mutable_compression()); - } } TDescribeLogStoreResult::TDescribeLogStoreResult(TStatus&& status, Ydb::LogStore::DescribeLogStoreResult&& desc, @@ -180,13 +166,6 @@ TLogTableDescription::TLogTableDescription(const TSchema& schema, const TLogTabl , TtlSettings(ttlSettings) {} -TLogTableDescription::TLogTableDescription(const TString& schemaPresetName, const TLogTableSharding& sharding, - const THashMap<TString, TTier>& tiers) - : SchemaPresetName(schemaPresetName) - , Sharding(sharding) - , Tiers(tiers) -{} - TLogTableDescription::TLogTableDescription(Ydb::LogStore::DescribeLogTableResult&& desc, const TDescribeLogTableSettings& describeSettings) : Schema(desc.schema()) diff --git a/ydb/public/lib/experimental/ydb_logstore.h b/ydb/public/lib/experimental/ydb_logstore.h index 2e04dfb5fca..11431673c99 100644 --- a/ydb/public/lib/experimental/ydb_logstore.h +++ b/ydb/public/lib/experimental/ydb_logstore.h @@ -39,14 +39,6 @@ struct TCompression { void SerializeTo(Ydb::LogStore::Compression& 
compression) const; }; -struct TTierConfig { - TCompression Compression; -}; - -struct TTier { - TTtlSettings Eviction; -}; - struct TCreateLogStoreSettings : public TOperationRequestSettings<TCreateLogStoreSettings> { using TSelf = TCreateLogStoreSettings; }; @@ -117,8 +109,7 @@ private: class TLogStoreDescription { public: - TLogStoreDescription(ui32 shardsCount, const THashMap<TString, TSchema>& schemaPresets, - const THashMap<TString, TTierConfig>& tierConfigs = {}); + TLogStoreDescription(ui32 shardsCount, const THashMap<TString, TSchema>& schemaPresets); TLogStoreDescription(Ydb::LogStore::DescribeLogStoreResult&& desc, const TDescribeLogStoreSettings& describeSettings); void SerializeTo(Ydb::LogStore::CreateLogStoreRequest& request) const; const THashMap<TString, TSchema>& GetSchemaPresets() const { @@ -136,17 +127,12 @@ public: const TVector<NScheme::TPermissions>& GetEffectivePermissions() const { return EffectivePermissions; } - const THashMap<TString, TTierConfig>& GetTierConfigs() const { - return TierConfigs; - } - private: ui32 ShardsCount; THashMap<TString, TSchema> SchemaPresets; TString Owner; TVector<NScheme::TPermissions> Permissions; TVector<NScheme::TPermissions> EffectivePermissions; - THashMap<TString, TTierConfig> TierConfigs; }; struct TLogTableSharding { @@ -171,8 +157,6 @@ public: const TMaybe<TTtlSettings>& ttlSettings = {}); TLogTableDescription(const TSchema& schema, const TLogTableSharding& sharding, const TMaybe<TTtlSettings>& ttlSettings = {}); - TLogTableDescription(const TString& schemaPresetName, const TLogTableSharding& sharding, - const THashMap<TString, TTier>& tiers); TLogTableDescription(Ydb::LogStore::DescribeLogTableResult&& desc, const TDescribeLogTableSettings& describeSettings); void SerializeTo(Ydb::LogStore::CreateLogTableRequest& request) const; const TSchema& GetSchema() const { @@ -204,7 +188,6 @@ private: const TSchema Schema; const TLogTableSharding Sharding; const TMaybe<TTtlSettings> TtlSettings; - 
THashMap<TString, TTier> Tiers; TString Owner; TVector<NScheme::TPermissions> Permissions; TVector<NScheme::TPermissions> EffectivePermissions; diff --git a/ydb/services/metadata/abstract/common.cpp b/ydb/services/metadata/abstract/common.cpp index eddddb14b11..7662c8de1c1 100644 --- a/ydb/services/metadata/abstract/common.cpp +++ b/ydb/services/metadata/abstract/common.cpp @@ -1,16 +1,22 @@ #include "common.h" +#include <util/string/join.h> namespace NKikimr::NMetadataProvider { -i32 ISnapshot::GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId) const { - i32 idx = 0; - for (auto&& i : rawData.columns()) { - if (i.name() == columnId) { - return idx; - } - ++idx; +TString ISnapshotParser::GetSnapshotId() const { + if (!SnapshotId) { + SnapshotId = JoinSeq(",", GetTables()); } - return -1; + return *SnapshotId; +} + +ISnapshot::TPtr ISnapshotParser::ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const { + ISnapshot::TPtr result = CreateSnapshot(actuality); + Y_VERIFY(result); + if (!result->DeserializeFromResultSet(rawData)) { + return nullptr; + } + return result; } } diff --git a/ydb/services/metadata/abstract/common.h b/ydb/services/metadata/abstract/common.h index 782d9695fbb..7da8f033171 100644 --- a/ydb/services/metadata/abstract/common.h +++ b/ydb/services/metadata/abstract/common.h @@ -15,6 +15,7 @@ namespace NKikimr::NMetadataProvider { enum EEvSubscribe { EvRefreshSubscriberData = EventSpaceBegin(TKikimrEvents::ES_METADATA_PROVIDER), EvRefresh, + EvEnrichSnapshot, EvSubscribeLocal, EvUnsubscribeLocal, EvSubscribeExternal, @@ -28,9 +29,8 @@ class ISnapshot { private: YDB_READONLY_DEF(TInstant, Actuality); protected: - virtual bool DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) = 0; + virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) = 0; virtual TString DoSerializeToString() const = 0; - i32 GetFieldIndex(const Ydb::ResultSet& rawData, const TString& 
columnId) const; public: using TPtr = std::shared_ptr<ISnapshot>; ISnapshot(const TInstant actuality) @@ -38,7 +38,7 @@ public: } - bool DeserializeFromResultSet(const Ydb::ResultSet& rawData) { + bool DeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) { return DoDeserializeFromResultSet(rawData); } @@ -53,25 +53,23 @@ class ISnapshotParser { protected: virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const = 0; virtual TVector<ITableModifier::TPtr> DoGetTableSchema() const = 0; - virtual const TString& DoGetTablePath() const = 0; + virtual const TVector<TString>& DoGetTables() const = 0; + mutable std::optional<TString> SnapshotId; public: using TPtr = std::shared_ptr<ISnapshotParser>; - ISnapshot::TPtr ParseSnapshot(const Ydb::ResultSet& rawData, const TInstant actuality) const { - ISnapshot::TPtr result = CreateSnapshot(actuality); - Y_VERIFY(result); - if (!result->DeserializeFromResultSet(rawData)) { - return nullptr; - } - return result; - } - + TString GetSnapshotId() const; + ISnapshot::TPtr ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const; TVector<ITableModifier::TPtr> GetTableSchema() const { return DoGetTableSchema(); } - const TString& GetTablePath() const { - return DoGetTablePath(); + virtual NThreading::TFuture<ISnapshot::TPtr> EnrichSnapshotData(ISnapshot::TPtr original) const { + return NThreading::MakeFuture(original); + } + + const TVector<TString>& GetTables() const { + return DoGetTables(); } virtual ~ISnapshotParser() = default; diff --git a/ydb/services/metadata/ds_table/accessor_refresh.cpp b/ydb/services/metadata/ds_table/accessor_refresh.cpp index 339b5916ee7..80ded24e461 100644 --- a/ydb/services/metadata/ds_table/accessor_refresh.cpp +++ b/ydb/services/metadata/ds_table/accessor_refresh.cpp @@ -9,14 +9,14 @@ namespace NKikimr::NMetadataProvider { using namespace NInternal::NRequest; -bool TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& 
ev) { +void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) { const TString startString = CurrentSelection.SerializeAsString(); auto currentFullReply = ev->Get()->GetResult(); Ydb::Table::ExecuteQueryResult qResult; currentFullReply.operation().result().UnpackTo(&qResult); - Y_VERIFY(qResult.result_sets().size() == 1); - CurrentSelection = qResult.result_sets()[0]; + Y_VERIFY((size_t)qResult.result_sets().size() == SnapshotConstructor->GetTables().size()); + *CurrentSelection.mutable_result_sets() = std::move(*qResult.mutable_result_sets()); auto parsedSnapshot = SnapshotConstructor->ParseSnapshot(CurrentSelection, RequestedActuality); if (!!parsedSnapshot) { CurrentSnapshot = parsedSnapshot; @@ -24,14 +24,30 @@ bool TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) { ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot parse current snapshot"; } - RequestedActuality = TInstant::Zero(); - Schedule(Config.GetRefreshPeriod(), new TEvRefresh()); if (!!parsedSnapshot && CurrentSelection.SerializeAsString() != startString) { ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << CurrentSelection.DebugString(); + NActors::TActorIdentity actorId = SelfId(); + SnapshotConstructor->EnrichSnapshotData(parsedSnapshot).Subscribe( + [actorId](NThreading::TFuture<ISnapshot::TPtr> f) { + if (f.HasValue() && !f.HasException()) { + actorId.Send(actorId, new TEvEnrichSnapshotResult(f.GetValueSync())); + } else { + actorId.Send(actorId, new TEvEnrichSnapshotResult("cannot enrich snapshot")); + } + } + ); + } +} + +void TDSAccessorRefresher::Handle(TEvEnrichSnapshotResult::TPtr& ev) { + RequestedActuality = TInstant::Zero(); + Schedule(Config.GetRefreshPeriod(), new TEvRefresh()); + if (ev->Get()->IsSuccess()) { + CurrentSnapshot = ev->Get()->GetEnrichedSnapshot(); OnSnapshotModified(); - return true; + } else { + ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "enrich problem: " << ev->Get()->GetErrorText(); } - 
return false; } void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateSession>::TPtr& ev) { @@ -41,7 +57,14 @@ void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateSession>::TPtr& const TString sessionId = session.session_id(); Y_VERIFY(sessionId); Ydb::Table::ExecuteDataQueryRequest request; - request.mutable_query()->set_yql_text("SELECT * FROM `" + EscapeC(SnapshotConstructor->GetTablePath()) + "`"); + TStringBuilder sb; + auto& tables = SnapshotConstructor->GetTables(); + Y_VERIFY(tables.size()); + for (auto&& i : tables) { + sb << "SELECT * FROM `" + EscapeC(i) + "`;"; + } + + request.mutable_query()->set_yql_text(sb); request.set_session_id(sessionId); request.mutable_tx_control()->mutable_begin_tx()->mutable_snapshot_read_only(); diff --git a/ydb/services/metadata/ds_table/accessor_refresh.h b/ydb/services/metadata/ds_table/accessor_refresh.h index ceb21233648..7d883893a83 100644 --- a/ydb/services/metadata/ds_table/accessor_refresh.h +++ b/ydb/services/metadata/ds_table/accessor_refresh.h @@ -2,6 +2,8 @@ #include <ydb/public/api/protos/ydb_value.pb.h> #include <ydb/services/metadata/abstract/common.h> #include <ydb/services/metadata/initializer/accessor_init.h> +#include <ydb/services/metadata/request/request_actor.h> +#include <library/cpp/actors/core/hfunc.h> namespace NKikimr::NMetadataProvider { @@ -11,12 +13,31 @@ class TEvRefresh: public NActors::TEventLocal<TEvRefresh, EEvSubscribe::EvRefres public: }; +class TEvEnrichSnapshotResult: public NActors::TEventLocal<TEvEnrichSnapshotResult, EEvSubscribe::EvEnrichSnapshot> { +private: + YDB_READONLY_FLAG(Success, false); + YDB_READONLY_DEF(TString, ErrorText); + YDB_READONLY_DEF(ISnapshot::TPtr, EnrichedSnapshot); +public: + TEvEnrichSnapshotResult(const TString& errorText) + : ErrorText(errorText) { + + } + + TEvEnrichSnapshotResult(ISnapshot::TPtr snapshot) + : SuccessFlag(true) + , EnrichedSnapshot(snapshot) + { + + } +}; + class TDSAccessorRefresher: public TDSAccessorInitialized { 
private: using TBase = TDSAccessorInitialized; ISnapshotParser::TPtr SnapshotConstructor; YDB_READONLY_DEF(ISnapshot::TPtr, CurrentSnapshot); - YDB_READONLY_DEF(Ydb::ResultSet, CurrentSelection); + YDB_READONLY_DEF(Ydb::Table::ExecuteQueryResult, CurrentSelection); TInstant RequestedActuality = TInstant::Zero(); protected: bool IsReady() const { @@ -32,6 +53,7 @@ public: hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>, Handle); hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>, Handle); hFunc(TEvRefresh, Handle); + hFunc(TEvEnrichSnapshotResult, Handle); default: TBase::StateMain(ev, ctx); } @@ -39,7 +61,8 @@ public: TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor); - bool Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>::TPtr& ev); + void Handle(TEvEnrichSnapshotResult::TPtr& ev); + void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>::TPtr& ev); void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>::TPtr& ev); void Handle(TEvRefresh::TPtr& ev); }; diff --git a/ydb/services/metadata/ds_table/service.cpp b/ydb/services/metadata/ds_table/service.cpp index c083afe97e7..d75d258450b 100644 --- a/ydb/services/metadata/ds_table/service.cpp +++ b/ydb/services/metadata/ds_table/service.cpp @@ -15,16 +15,16 @@ IActor* CreateService(const TConfig& config) { } void TService::Handle(TEvSubscribeExternal::TPtr& ev) { - auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetTablePath()); + auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetSnapshotId()); if (it == Accessors.end()) { THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetSnapshotParser()); - it = Accessors.emplace(ev->Get()->GetSnapshotParser()->GetTablePath(), Register(actor.Release())).first; + it = Accessors.emplace(ev->Get()->GetSnapshotParser()->GetSnapshotId(), 
Register(actor.Release())).first; } Send<TEvSubscribe>(it->second, ev->Sender); } void TService::Handle(TEvUnsubscribeExternal::TPtr& ev) { - auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetTablePath()); + auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetSnapshotId()); if (it != Accessors.end()) { Send<TEvUnsubscribe>(it->second, ev->Sender); } diff --git a/ydb/services/ydb/ydb_logstore_ut.cpp b/ydb/services/ydb/ydb_logstore_ut.cpp index 3e768dee95f..906fbbad88b 100644 --- a/ydb/services/ydb/ydb_logstore_ut.cpp +++ b/ydb/services/ydb/ydb_logstore_ut.cpp @@ -51,18 +51,6 @@ TVector<TString> TestSchemaKey() { return {"timestamp", "resource_type", "resource_id", "uid"}; } -THashMap<TString, NYdb::NLogStore::TTierConfig> TestTierConfigs() { - using NYdb::NLogStore::TTierConfig; - using NYdb::NLogStore::TCompression; - using NYdb::NLogStore::EColumnCompression; - - THashMap<TString, TTierConfig> out; - out.emplace("default", TTierConfig{ TCompression{ EColumnCompression::LZ4, {}} }); - out.emplace("tier_zstd1", TTierConfig{ TCompression{ EColumnCompression::ZSTD, 1} }); - out.emplace("tier_zstd5", TTierConfig{ TCompression{ EColumnCompression::ZSTD, 5} }); - return out; -} - } Y_UNIT_TEST_SUITE(YdbLogStore) { @@ -139,56 +127,6 @@ Y_UNIT_TEST_SUITE(YdbLogStore) { } } - Y_UNIT_TEST(LogStoreTiers) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - EnableDebugLogs(server); - - auto connection = ConnectToServer(server); - NYdb::NLogStore::TLogStoreClient logStoreClient(connection); - - { - NYdb::NLogStore::TSchema logSchema(TestSchemaColumns(), TestSchemaKey(), - NYdb::NLogStore::TCompression{NYdb::NLogStore::EColumnCompression::ZSTD, 1}); - THashMap<TString, NYdb::NLogStore::TSchema> schemaPresets; - schemaPresets["default"] = logSchema; - NYdb::NLogStore::TLogStoreDescription storeDescr(4, schemaPresets, TestTierConfigs()); - auto res = logStoreClient.CreateLogStore("/Root/LogStore", 
std::move(storeDescr)).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); - } - - { - auto res = logStoreClient.DescribeLogStore("/Root/LogStore").GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); - auto descr = res.GetDescription(); - UNIT_ASSERT_VALUES_EQUAL(descr.GetShardsCount(), 4); - UNIT_ASSERT_VALUES_EQUAL(descr.GetSchemaPresets().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(descr.GetSchemaPresets().count("default"), 1); - UNIT_ASSERT_VALUES_EQUAL(descr.GetOwner(), "root@builtin"); - - const auto& schema = descr.GetSchemaPresets().begin()->second; - UNIT_ASSERT_VALUES_EQUAL(schema.GetColumns().size(), 10); - UNIT_ASSERT_VALUES_EQUAL(schema.GetPrimaryKeyColumns().size(), 4); - UNIT_ASSERT_EQUAL(schema.GetDefaultCompression().Codec, NYdb::NLogStore::EColumnCompression::ZSTD); - UNIT_ASSERT_VALUES_EQUAL(schema.GetDefaultCompression().Level, 1); - - const auto& tiers = descr.GetTierConfigs(); - auto expectedTiers = TestTierConfigs(); - UNIT_ASSERT_VALUES_EQUAL(tiers.size(), expectedTiers.size()); - - for (auto& [name, cfg] : expectedTiers) { - UNIT_ASSERT_VALUES_EQUAL(tiers.count(name), 1); - UNIT_ASSERT_EQUAL(tiers.find(name)->second.Compression.Codec, cfg.Compression.Codec); - UNIT_ASSERT_VALUES_EQUAL(tiers.find(name)->second.Compression.Level, cfg.Compression.Level); - } - } - - { - auto res = logStoreClient.DropLogStore("/Root/LogStore").GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); - } - } - Y_UNIT_TEST(LogStoreNegative) { NKikimrConfig::TAppConfig appConfig; TKikimrWithGrpcAndRootSchema server(appConfig); |