diff options
author | chertus <azuikov@ydb.tech> | 2022-12-13 17:47:30 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2022-12-13 17:47:30 +0300 |
commit | f8bff6b810082c1df1db8097f2d0b3a709eec8e2 (patch) | |
tree | 59c8f865d1c1a66027fe308018492081e580a5eb | |
parent | 2cb87340968a3d324fa25a7fc117993457021309 (diff) | |
download | ydb-f8bff6b810082c1df1db8097f2d0b3a709eec8e2.tar.gz |
update tiering
29 files changed, 687 insertions, 474 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 0226e5eb363..91018709a17 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -472,6 +472,10 @@ message TColumnDataLifeCycle { message TStorageTiering { optional string UseTiering = 1; + // If set remote data by TTL in addition to the last tier's border + optional TTtl Ttl = 2; + // If set keep data in default storage (out of tiers) till this border + //optional TTtl KeepTime = 3; TODO } message TDisabled { diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 2aff76c3873..6a6af07b799 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -229,12 +229,21 @@ void TColumnShard::UpdateIndexCounters() { SetCounter(COUNTER_INACTIVE_ROWS, stats.Inactive.Rows); SetCounter(COUNTER_INACTIVE_BYTES, stats.Inactive.Bytes); SetCounter(COUNTER_INACTIVE_RAW_BYTES, stats.Inactive.RawBytes); - SetCounter(COUNTER_EVICTED_PORTIONS, stats.Evicted.Portions); SetCounter(COUNTER_EVICTED_BLOBS, stats.Evicted.Blobs); SetCounter(COUNTER_EVICTED_ROWS, stats.Evicted.Rows); SetCounter(COUNTER_EVICTED_BYTES, stats.Evicted.Bytes); SetCounter(COUNTER_EVICTED_RAW_BYTES, stats.Evicted.RawBytes); + + LOG_S_DEBUG("Index: tables " << stats.Tables + << " granules " << stats.Granules << " (empty " << stats.EmptyGranules << " overloaded " << stats.OverloadedGranules << ")" + << " inserted " << stats.Inserted.Portions << "/" << stats.Inserted.Blobs << "/" << stats.Inserted.Rows + << " compacted " << stats.Compacted.Portions << "/" << stats.Compacted.Blobs << "/" << stats.Compacted.Rows + << " s-compacted " << stats.SplitCompacted.Portions << "/" << stats.SplitCompacted.Blobs << "/" << stats.SplitCompacted.Rows + << " inactive " << stats.Inactive.Portions << "/" << stats.Inactive.Blobs << "/" << stats.Inactive.Rows + << " evicted " << stats.Evicted.Portions << "/" << stats.Evicted.Blobs << "/" << stats.Evicted.Rows + << " column records " << stats.ColumnRecords << " meta bytes " << stats.ColumnMetadataBytes + << " at tablet " << TabletID()); } ui64 TColumnShard::MemoryUsage() const { diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index 15ee07d9e55..c714dcd3536 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -234,7 +234,12 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>())); if (!Self->PathsToDrop.count(pathId)) { - ttls[pathId].emplace(version, TTtl::TDescription(info.GetTtlSettings())); + auto& ttlSettings = info.GetTtlSettings(); + if (ttlSettings.HasEnabled()) { + ttls[pathId].emplace(version, TTtl::TDescription(ttlSettings.GetEnabled())); + } else if (ttlSettings.HasTiering() && ttlSettings.GetTiering().HasTtl()) { + ttls[pathId].emplace(version, TTtl::TDescription(ttlSettings.GetTiering().GetTtl())); + } } if (!rowset.Next()) diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index 7e9490dbad4..2ec98a0b3c8 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -200,7 +200,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex } // If no paths trigger schema defined TTL - THashMap<ui64, NOlap::TTiersInfo> pathTtls; + THashMap<ui64, NOlap::TTiering> pathTtls; if (!ttlBody.GetPathIds().empty()) { auto unixTime = TInstant::Seconds(ttlBody.GetUnixTimeSeconds()); if (!unixTime) { @@ -217,7 +217,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex } for (ui64 pathId : ttlBody.GetPathIds()) { - pathTtls.emplace(pathId, NOlap::TTiersInfo(columnName, unixTime)); + pathTtls.emplace(pathId, NOlap::TTiering::MakeTtl(unixTime, columnName)); } } @@ -228,6 +228,8 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex ctx.Send(Self->SelfId(), event->TxEvent.release()); } status = NKikimrTxColumnShard::EResultStatus::SUCCESS; + } else { + statusMessage = "TTL not started"; } break; @@ -270,9 +272,10 @@ void TTxProposeTransaction::Complete(const TActorContext& ctx) { void TColumnShard::Handle(TEvColumnShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { auto& record = Proto(ev->Get()); - ui32 txKind = record.GetTxKind(); + auto txKind = record.GetTxKind(); ui64 txId = record.GetTxId(); - LOG_S_DEBUG("ProposeTransaction kind " << txKind << " txId " << txId << " at tablet " << TabletID()); + LOG_S_DEBUG("ProposeTransaction " << NKikimrTxColumnShard::ETransactionKind_Name(txKind) + << " txId " << txId << " at tablet " << TabletID()); Execute(new TTxProposeTransaction(this, ev), ctx); } diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index e6c88ef4cd9..7d5a298107e 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -189,7 +189,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) continue; } } - LOG_S_DEBUG("Delete evicting blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID()); + LOG_S_TRACE("Delete blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID()); Self->BlobManager->DeleteBlob(blobId, blobManagerDb); Self->IncCounter(COUNTER_BLOBS_ERASED); Self->IncCounter(COUNTER_BYTES_ERASED, blobId.BlobSize()); @@ -199,6 +199,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) // DS to S3 eviction. Keep source blob in DS till EEvictState::EXTERN state. continue; } + LOG_S_TRACE("Delete evicted blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID()); Self->BlobManager->DeleteBlob(blobId, blobManagerDb); Self->IncCounter(COUNTER_BLOBS_ERASED); Self->IncCounter(COUNTER_BYTES_ERASED, blobId.BlobSize()); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index dc3e40a879a..8bbb86d9b6c 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -473,7 +473,9 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl const ui64 pathId = tableProto.GetPathId(); if (!Tables.contains(pathId)) { - LOG_S_DEBUG("EnsureTable for pathId: " << pathId << " at tablet " << TabletID()); + LOG_S_DEBUG("EnsureTable for pathId: " << pathId + << " ttl settings: " << tableProto.GetTtlSettings() + << " at tablet " << TabletID()); ui32 schemaPresetId = 0; if (tableProto.HasSchemaPreset()) { @@ -506,11 +508,16 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl *tableVerProto.MutableTtlSettings() = tableProto.GetTtlSettings(); auto& ttlInfo = tableProto.GetTtlSettings(); if (ttlInfo.HasEnabled()) { - Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlInfo)); + Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlInfo.GetEnabled())); SetCounter(COUNTER_TABLE_TTLS, Ttl.PathsCount()); } else if (ttlInfo.HasTiering()) { - table.TieringUsage = ttlInfo.GetTiering().GetUseTiering(); + auto& tiering = ttlInfo.GetTiering(); + table.TieringUsage = tiering.GetUseTiering(); ActivateTiering(pathId, table.TieringUsage); + if (tiering.HasTtl()) { + Ttl.SetPathTtl(pathId, TTtl::TDescription(tiering.GetTtl())); + SetCounter(COUNTER_TABLE_TTLS, Ttl.PathsCount()); + } } } @@ -543,8 +550,11 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP auto* tablePtr = Tables.FindPtr(pathId); Y_VERIFY(tablePtr && !tablePtr->IsDropped(), "AlterTable on a dropped or non-existent table"); auto& table = *tablePtr; + auto& ttlSettings = alterProto.GetTtlSettings(); - LOG_S_DEBUG("AlterTable for pathId: " << pathId << " at tablet " << TabletID()); + LOG_S_DEBUG("AlterTable for pathId: " << pathId + << " ttl settings: " << ttlSettings + << " at tablet " << TabletID()); Y_VERIFY(!alterProto.HasSchema(), "Tables with explicit schema are not supported"); auto& info = table.Versions[version]; @@ -553,11 +563,17 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP info.SetSchemaPresetId(EnsureSchemaPreset(db, alterProto.GetSchemaPreset(), version)); } - const TString& tieringUsage = alterProto.GetTtlSettings().GetTiering().GetUseTiering(); + const TString& tieringUsage = ttlSettings.GetTiering().GetUseTiering(); ActivateTiering(pathId, tieringUsage); - if (alterProto.HasTtlSettings() && alterProto.GetTtlSettings().HasEnabled()) { - *info.MutableTtlSettings() = alterProto.GetTtlSettings(); - Ttl.SetPathTtl(pathId, TTtl::TDescription(alterProto.GetTtlSettings())); + if (alterProto.HasTtlSettings()) { + *info.MutableTtlSettings() = ttlSettings; + if (ttlSettings.HasEnabled()) { + Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlSettings.GetEnabled())); + } else if (ttlSettings.HasTiering() && ttlSettings.GetTiering().HasTtl()) { + Ttl.SetPathTtl(pathId, TTtl::TDescription( ttlSettings.GetTiering().GetTtl())); + } else { + Ttl.DropPathTtl(pathId); + } } else { Ttl.DropPathTtl(pathId); } @@ -786,8 +802,13 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() { return {}; } + auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); + if (Tiers) { + actualIndexInfo.SetTiering(Tiers->GetTiering()); // TODO: pathIds + } + ActiveIndexingOrCompaction = true; - auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(), indexChanges, + auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterIndexing, std::move(cachedBlobs)); return std::make_unique<TEvPrivate::TEvIndexing>(std::move(ev)); } @@ -824,14 +845,19 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() { return {}; } + auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); + if (Tiers) { + actualIndexInfo.SetTiering(Tiers->GetTiering()); // TODO: pathIds + } + ActiveIndexingOrCompaction = true; - auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(), indexChanges, + auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterCompaction); return std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev), *BlobManager); } -std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiersInfo>& pathTtls, - bool force) { +std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls, + bool force) { if (ActiveTtl) { LOG_S_DEBUG("TTL already in progress at tablet " << TabletID()); return {}; @@ -841,28 +867,27 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u return {}; } - THashMap<ui64, NOlap::TTiersInfo> regularTtls = pathTtls; - if (regularTtls.empty()) { - regularTtls = Ttl.MakeIndexTtlMap(TInstant::Now(), force); - } - const bool tiersUsage = regularTtls.empty() && Tiers; - if (tiersUsage) { - regularTtls = Tiers->GetTiering(); + THashMap<ui64, NOlap::TTiering> eviction = pathTtls; + if (eviction.empty()) { + if (Tiers) { + eviction = Tiers->GetTiering(); // TODO: pathIds + } + Ttl.AddTtls(eviction, TInstant::Now(), force); } - if (regularTtls.empty()) { + if (eviction.empty()) { LOG_S_TRACE("TTL not started. No tables to activate it on (or delayed) at tablet " << TabletID()); return {}; - } else { - for (auto&& i : regularTtls) { - LOG_S_DEBUG(i.first << "/" << i.second.GetDebugString() << ";tablet=" << TabletID()); - } } LOG_S_DEBUG("Prepare TTL at tablet " << TabletID()); + for (auto&& i : eviction) { + LOG_S_DEBUG("Evicting path " << i.first << " with " << i.second.GetDebugString() << " at tablet " << TabletID()); + } + std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges; - indexChanges = PrimaryIndex->StartTtl(regularTtls); + indexChanges = PrimaryIndex->StartTtl(eviction); if (!indexChanges) { LOG_S_NOTICE("Cannot prepare TTL at tablet " << TabletID()); @@ -874,8 +899,11 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u bool needWrites = !indexChanges->PortionsToEvict.empty(); + auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); + actualIndexInfo.SetTiering(std::move(eviction)); + ActiveTtl = true; - auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(tiersUsage), indexChanges, false); + auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false); return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), *BlobManager, needWrites); } @@ -925,7 +953,14 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { return {}; } - auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(), changes, false); + auto actualIndexInfo = PrimaryIndex->GetIndexInfo(); +#if 0 // No need for now + if (Tiers) { + actualIndexInfo.SetTiering(Tiers->GetTiering()); + } +#endif + + auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), changes, false); ev->PutStatus = NKikimrProto::OK; // No new blobs to write ActiveCleanup = true; @@ -953,7 +988,7 @@ NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTabl } if (schema.HasDefaultCompression()) { - NOlap::TCompression compression = NTiers::TManager::ConvertCompression(schema.GetDefaultCompression()); + NOlap::TCompression compression = NTiers::ConvertCompression(schema.GetDefaultCompression()); indexInfo.SetDefaultCompression(compression); } @@ -1053,16 +1088,6 @@ void TColumnShard::Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& Tiers->TakeConfigs(ev->Get()->GetSnapshot(), nullptr); } -NOlap::TIndexInfo TColumnShard::GetActualIndexInfo(const bool tiersUsage) const { - auto indexInfo = PrimaryIndex->GetIndexInfo(); - if (tiersUsage && Tiers) { - for (auto&& i : *Tiers) { - indexInfo.AddStorageTier(i.second.BuildTierStorage()); - } - } - return indexInfo; -} - void TColumnShard::ActivateTiering(const ui64 pathId, const TString& useTiering) { if (!Tiers) { Tiers = std::make_shared<TTiersManager>(TabletID(), SelfId()); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 01dbe644c5b..dfbd8700d22 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -127,7 +127,7 @@ class TColumnShard void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx); void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev); void Handle(NTiers::TEvTiersManagerReadyForUsage::TPtr& ev); - + ITransaction* CreateTxInitSchema(); ITransaction* CreateTxRunGc(); @@ -178,9 +178,8 @@ class TColumnShard TabletCounters->Percentile()[counter].IncrementFor(latency.MicroSeconds()); } - NOlap::TIndexInfo GetActualIndexInfo(const bool tiersUsage = true) const; - void ActivateTiering(const ui64 pathId, const TString& useTiering); + protected: STFUNC(StateInit) { TRACE_EVENT(NKikimrServices::TX_COLUMNSHARD); @@ -463,7 +462,7 @@ private: std::unique_ptr<TEvPrivate::TEvIndexing> SetupIndexation(); std::unique_ptr<TEvPrivate::TEvCompaction> SetupCompaction(); - std::unique_ptr<TEvPrivate::TEvEviction> SetupTtl(const THashMap<ui64, NOlap::TTiersInfo>& pathTtls = {}, + std::unique_ptr<TEvPrivate::TEvEviction> SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls = {}, bool force = false); std::unique_ptr<TEvPrivate::TEvWriteIndex> SetupCleanup(); diff --git a/ydb/core/tx/columnshard/columnshard_ttl.h b/ydb/core/tx/columnshard/columnshard_ttl.h index aae50f3e7ee..5846fad4a4a 100644 --- a/ydb/core/tx/columnshard/columnshard_ttl.h +++ b/ydb/core/tx/columnshard/columnshard_ttl.h @@ -9,33 +9,20 @@ public: static constexpr const ui64 DEFAULT_REPEAT_TTL_TIMEOUT_SEC = 10; struct TEviction { - TString TierName; TDuration EvictAfter; + TString ColumnName; }; struct TDescription { - TString ColumnName; - std::vector<TEviction> Evictions; + std::optional<TEviction> Eviction; TDescription() = default; - TDescription(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl) { - if (ttl.HasEnabled()) { - auto& enabled = ttl.GetEnabled(); - ColumnName = enabled.GetColumnName(); - auto expireSec = TDuration::Seconds(enabled.GetExpireAfterSeconds()); + TDescription(const NKikimrSchemeOp::TColumnDataLifeCycle::TTtl& ttl) { + auto expireSec = TDuration::Seconds(ttl.GetExpireAfterSeconds()); - Evictions.reserve(1); - Evictions.emplace_back(TEviction{{}, expireSec}); - } - - if (Enabled()) { - Y_VERIFY(!ColumnName.empty()); - } - } - - bool Enabled() const { - return !Evictions.empty(); + Eviction = TEviction{expireSec, ttl.GetColumnName()}; + Y_VERIFY(!Eviction->ColumnName.empty()); } }; @@ -44,12 +31,13 @@ public: } void SetPathTtl(ui64 pathId, TDescription&& descr) { - if (descr.Enabled()) { - auto it = Columns.find(descr.ColumnName); + if (descr.Eviction) { + auto& evict = descr.Eviction; + auto it = Columns.find(evict->ColumnName); if (it != Columns.end()) { - descr.ColumnName = *it; // replace string dups (memory efficiency) + evict->ColumnName = *it; // replace string dups (memory efficiency) } else { - Columns.insert(descr.ColumnName); + Columns.insert(evict->ColumnName); } PathTtls[pathId] = descr; } else { @@ -61,18 +49,16 @@ public: PathTtls.erase(pathId); } - THashMap<ui64, NOlap::TTiersInfo> MakeIndexTtlMap(TInstant now, bool force = false) { + void AddTtls(THashMap<ui64, NOlap::TTiering>& eviction, TInstant now, bool force = false) { if ((now < LastRegularTtl + TtlTimeout) && !force) { - return {}; + return; } - THashMap<ui64, NOlap::TTiersInfo> out; for (auto& [pathId, descr] : PathTtls) { - out.emplace(pathId, Convert(descr, now)); + eviction[pathId].Ttl = Convert(descr, now); } LastRegularTtl = now; - return out; } void Repeat() { @@ -89,16 +75,13 @@ private: TDuration RepeatTtlTimeout{TDuration::Seconds(DEFAULT_REPEAT_TTL_TIMEOUT_SEC)}; TInstant LastRegularTtl; - NOlap::TTiersInfo Convert(const TDescription& descr, TInstant timePoint) const { - Y_VERIFY(descr.Enabled()); - NOlap::TTiersInfo out(descr.ColumnName); - - for (auto& tier : descr.Evictions) { - auto border = timePoint - tier.EvictAfter; - out.AddTier(tier.TierName, border); + std::shared_ptr<NOlap::TTierInfo> Convert(const TDescription& descr, TInstant timePoint) const { + if (descr.Eviction) { + auto& evict = descr.Eviction; + auto border = timePoint - evict->EvictAfter; + return NOlap::TTierInfo::MakeTtl(border, evict->ColumnName); } - - return out; + return {}; } }; diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index 21d31c7f1ae..9701ff55a1c 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -300,9 +300,9 @@ NMetadata::NFetcher::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpec tRule.SetTieringRuleId("Tiering1"); for (auto&& tier : specials.Tiers) { if (!tRule.GetDefaultColumn()) { - tRule.SetDefaultColumn(tier.GetTtlColumn()); + tRule.SetDefaultColumn(tier.TtlColumn); } - Y_VERIFY(tRule.GetDefaultColumn() == tier.GetTtlColumn()); + Y_VERIFY(tRule.GetDefaultColumn() == tier.TtlColumn); { NColumnShard::NTiers::TTierConfig tConfig; tConfig.SetTierName(tier.Name); @@ -319,7 +319,7 @@ NMetadata::NFetcher::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpec } cs->MutableTierConfigs().emplace(tConfig.GetTierName(), tConfig); } - tRule.AddInterval(tier.Name, TDuration::Seconds(tier.GetEvictAfterSecondsUnsafe())); + tRule.AddInterval(tier.Name, TDuration::Seconds((*tier.EvictAfter).Seconds())); } cs->MutableTableTierings().emplace(tRule.GetTieringRuleId(), tRule); return cs; diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index 8a8b1873d6c..210881dd1f9 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -28,9 +28,8 @@ struct TTestSchema { static inline const TString DefaultTtlColumn = "saved_at"; struct TStorageTier { - YDB_ACCESSOR(TString, TtlColumn, DefaultTtlColumn); - YDB_OPT(ui32, EvictAfterSeconds); - public: + TString TtlColumn = DefaultTtlColumn; + std::optional<TDuration> EvictAfter; TString Name; TString Codec; std::optional<int> CompressionLevel; @@ -38,7 +37,6 @@ struct TTestSchema { TStorageTier(const TString& name = {}) : Name(name) - {} NKikimrSchemeOp::EColumnCodec GetCodecId() const { @@ -63,6 +61,11 @@ struct TTestSchema { } return *this; } + + TStorageTier& SetTtlColumn(const TString& columnName) { + TtlColumn = columnName; + return *this; + } }; struct TTableSpecials : public TStorageTier { @@ -73,7 +76,7 @@ struct TTestSchema { } bool HasTtl() const { - return !HasTiers() && HasEvictAfterSeconds(); + return EvictAfter.has_value(); } TTableSpecials WithCodec(const TString& codec) { @@ -81,6 +84,11 @@ struct TTestSchema { out.SetCodec(codec); return out; } + + TTableSpecials& SetTtl(std::optional<TDuration> ttl) { + EvictAfter = ttl; + return *this; + } }; static auto YdbSchema(const std::pair<TString, TTypeInfo>& firstKeyItem = {"timestamp", TTypeInfo(NTypeIds::Timestamp) }) { @@ -197,16 +205,26 @@ struct TTestSchema { } } - static void InitTtl(const TTableSpecials& specials, NKikimrSchemeOp::TColumnDataLifeCycle* ttlSettings) { - ttlSettings->SetVersion(1); - auto* enable = ttlSettings->MutableEnabled(); - enable->SetColumnName(specials.GetTtlColumn()); - enable->SetExpireAfterSeconds(specials.GetEvictAfterSecondsUnsafe()); + static void InitTtl(const TTableSpecials& specials, NKikimrSchemeOp::TColumnDataLifeCycle::TTtl* ttl) { + Y_VERIFY(specials.HasTtl()); + Y_VERIFY(!specials.TtlColumn.empty()); + ttl->SetColumnName(specials.TtlColumn); + ttl->SetExpireAfterSeconds((*specials.EvictAfter).Seconds()); } - static void InitTiers(const TTableSpecials& specials, NKikimrSchemeOp::TColumnDataLifeCycle* ttlSettings) { - Y_VERIFY(specials.HasTiers()); - ttlSettings->MutableTiering()->SetUseTiering("Tiering1"); + static bool InitTiersAndTtl(const TTableSpecials& specials, NKikimrSchemeOp::TColumnDataLifeCycle* ttlSettings) { + ttlSettings->SetVersion(1); + if (specials.HasTiers()) { + ttlSettings->MutableTiering()->SetUseTiering("Tiering1"); + if (specials.HasTtl()) { + InitTtl(specials, ttlSettings->MutableTiering()->MutableTtl()); + } + return true; + } else if (specials.HasTtl()) { + InitTtl(specials, ttlSettings->MutableEnabled()); + return true; + } + return false; } static TString CreateTableTxBody(ui64 pathId, const TVector<std::pair<TString, TTypeInfo>>& columns, @@ -225,11 +243,9 @@ struct TTestSchema { InitSchema(columns, pk, specials, preset->MutableSchema()); } - if (specials.HasTtl()) { - InitTtl(specials, table->MutableTtlSettings()); - } else if (specials.HasTiers()) { - InitTiers(specials, table->MutableTtlSettings()); - } + InitTiersAndTtl(specials, table->MutableTtlSettings()); + + Cerr << "CreateTable: " << tx << "\n"; TString out; Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&out); @@ -245,11 +261,9 @@ struct TTestSchema { table->SetPathId(pathId); InitSchema(columns, pk, specials, table->MutableSchema()); - if (specials.HasTtl()) { - InitTtl(specials, table->MutableTtlSettings()); - } else if (specials.HasTiers()) { - InitTiers(specials, table->MutableTtlSettings()); - } + InitTiersAndTtl(specials, table->MutableTtlSettings()); + + Cerr << "CreateInitShard: " << tx << "\n"; TString out; Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&out); @@ -264,10 +278,9 @@ struct TTestSchema { table->SetPathId(pathId); InitSchema(columns, pk, specials, table->MutableSchema()); + InitTiersAndTtl(specials, table->MutableTtlSettings()); - if (specials.HasTtl()) { - InitTtl(specials, table->MutableTtlSettings()); - } + Cerr << "CreateStandaloneTable: " << tx << "\n"; TString out; Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&out); @@ -281,18 +294,12 @@ struct TTestSchema { tx.MutableSeqNo()->SetRound(version); auto* ttlSettings = table->MutableTtlSettings(); - ttlSettings->SetVersion(version); - - if (specials.HasTtl()) { - auto* enable = ttlSettings->MutableEnabled(); - enable->SetColumnName(specials.GetTtlColumn()); - enable->SetExpireAfterSeconds(specials.GetEvictAfterSecondsUnsafe()); - } else if (specials.HasTiers()) { - ttlSettings->MutableTiering()->SetUseTiering("Tiering1"); - } else { + if (!InitTiersAndTtl(specials, ttlSettings)) { ttlSettings->MutableDisabled(); } + Cerr << "AlterTable: " << tx << "\n"; + TString out; Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&out); return out; diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp index 3ef17bac055..8b5c25b3349 100644 --- a/ydb/core/tx/columnshard/compaction_actor.cpp +++ b/ydb/core/tx/columnshard/compaction_actor.cpp @@ -60,7 +60,8 @@ public: Y_VERIFY(blobData.size() == blobId.Size, "%u vs %u", (ui32)blobData.size(), blobId.Size); Blobs[blobId] = blobData; } else { - LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId.ToString() << " status " << event.Status + LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId.ToString() + << " status " << NKikimrProto::EReplyStatus_Name(event.Status) << " at tablet " << TabletId << " (compaction)"); TxEvent->PutStatus = event.Status; if (TxEvent->PutStatus == NKikimrProto::UNKNOWN) { diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp index 905a57f27f1..d24839efc8a 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine.cpp @@ -1,18 +1,6 @@ #include "column_engine.h" #include <util/stream/output.h> -namespace NKikimr::NOlap { - -TString TTiersInfo::GetDebugString() const { - TStringBuilder sb; - sb << "column=" << Column << ";"; - for (auto&& i : TierBorders) { - sb << "tname=" << i.TierName << ";eborder=" << i.EvictBorder << ";"; - } - return sb; -} -} - template <> void Out<NKikimr::NOlap::TColumnEngineChanges>(IOutputStream& out, TTypeTraits<NKikimr::NOlap::TColumnEngineChanges>::TFuncParam changes) { if (ui32 switched = changes.SwitchedPortions.size()) { diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 603b49f2cd5..f7e9260dfe3 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -1,5 +1,6 @@ #pragma once #include "defs.h" +#include "tier_info.h" #include "index_info.h" #include "portion_info.h" #include "db_wrapper.h" @@ -59,56 +60,15 @@ struct TCompactionInfo { } }; -struct TTiersInfo { - struct TTierTimeBorder { - TString TierName; - TInstant EvictBorder; - - TTierTimeBorder(TString tierName, TInstant evictBorder) - : TierName(tierName) - , EvictBorder(evictBorder) - {} - - std::shared_ptr<arrow::Scalar> ToTimestamp() const { - if (Scalar) { - return Scalar; - } - - Scalar = std::make_shared<arrow::TimestampScalar>( - EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO)); - return Scalar; - } - - private: - mutable std::shared_ptr<arrow::Scalar> Scalar; - }; - - TString Column; - std::vector<TTierTimeBorder> TierBorders; // Ordered tiers from hottest to coldest - - TString GetDebugString() const; - - TTiersInfo(const TString& column, TInstant border = {}, const TString& tierName = {}) - : Column(column) - { - if (border) { - AddTier(tierName, border); - } - } - - void AddTier(const TString& tierName, TInstant border) { - TierBorders.emplace_back(TTierTimeBorder(tierName, border)); - } -}; - struct TPortionEvictionFeatures { const TString TargetTierName; const ui64 PathId; // portion path id for cold-storage-key construct + bool DataChanges = true; + TPortionEvictionFeatures(const TString& targetTierName, const ui64 pathId) : TargetTierName(targetTierName) - , PathId(pathId) { - - } + , PathId(pathId) + {} }; class TColumnEngineChanges { @@ -361,7 +321,7 @@ public: const TSnapshot& outdatedSnapshot) = 0; virtual std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop) = 0; - virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls, + virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) = 0; virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) = 0; virtual void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) = 0; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 60da8ba39b0..ea93b8b39a8 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -62,16 +62,19 @@ std::shared_ptr<arrow::RecordBatch> AddSpecials(const std::shared_ptr<arrow::Rec return NArrow::ExtractColumns(batch, indexInfo.ArrowSchemaWithSpecials()); } -bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo, const TString& tierName, - const THashMap<TBlobRange, TString>& srcBlobs, +bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo, + TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs, TVector<TColumnRecord>& evictedRecords, TVector<TString>& newBlobs) { - Y_VERIFY(portionInfo.TierName != tierName); + Y_VERIFY(portionInfo.TierName != evictFeatures.TargetTierName); - auto compression = indexInfo.GetTierCompression(tierName); + auto* tiering = indexInfo.GetTiering(evictFeatures.PathId); + Y_VERIFY(tiering); + auto compression = tiering->GetCompression(evictFeatures.TargetTierName); if (!compression) { - // Noting to recompress. We have no other kinds of evictions yet. Return. - portionInfo.TierName = tierName; + // Noting to recompress. We have no other kinds of evictions yet. + portionInfo.TierName = evictFeatures.TargetTierName; + evictFeatures.DataChanges = false; return true; } @@ -82,7 +85,6 @@ bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo TPortionInfo undo = portionInfo; size_t undoSize = newBlobs.size(); - std::vector<TString> blobs; for (auto& rec : portionInfo.Records) { auto colName = indexInfo.GetColumnName(rec.ColumnId); std::string name(colName.data(), colName.size()); @@ -102,11 +104,11 @@ bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo evictedRecords.emplace_back(std::move(rec)); } - portionInfo.AddMetadata(indexInfo, batch, tierName); + portionInfo.AddMetadata(indexInfo, batch, evictFeatures.TargetTierName); return true; } -TVector<TPortionInfo> MakeAppendedPortions(const TIndexInfo& indexInfo, +TVector<TPortionInfo> MakeAppendedPortions(ui64 pathId, const TIndexInfo& indexInfo, std::shared_ptr<arrow::RecordBatch> batch, ui64 granule, const TSnapshot& minSnapshot, @@ -115,12 +117,17 @@ TVector<TPortionInfo> MakeAppendedPortions(const TIndexInfo& indexInfo, auto schema = indexInfo.ArrowSchemaWithSpecials(); TVector<TPortionInfo> out; - TString tierName = indexInfo.GetTierName(0); - auto compression = indexInfo.GetTierCompression(0); - if (!compression) { - compression = indexInfo.GetDefaultCompression(); + TString tierName; + TCompression compression = indexInfo.GetDefaultCompression(); + if (pathId) { + if (auto* tiering = indexInfo.GetTiering(pathId)) { + tierName = tiering->GetHottestTierName(); + if (auto tierCompression = tiering->GetCompression(tierName)) { + compression = *tierCompression; + } + } } - auto writeOptions = WriteOptions(*compression); + auto writeOptions = WriteOptions(compression); std::shared_ptr<arrow::RecordBatch> portionBatch = batch; for (i32 pos = 0; pos < batch->num_rows();) { @@ -780,9 +787,9 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T return changes; } -std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls, +std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction, ui64 maxEvictBytes) { - if (pathTtls.empty()) { + if (pathEviction.empty()) { return {}; } @@ -793,18 +800,18 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash ui64 dropBlobs = 0; bool allowDrop = true; - for (auto& [pathId, ttl] : pathTtls) { + for (auto& [pathId, ttl] : pathEviction) { if (!PathGranules.count(pathId)) { continue; // It's not an error: allow TTL over multiple shards with different pathIds presented } - if (!IndexInfo.AllowTtlOverColumn(ttl.Column)) { - continue; - } + auto expireTimestamp = ttl.ExpireTimestamp(); + Y_VERIFY(expireTimestamp); - Y_VERIFY(!ttl.TierBorders.empty()); + auto ttlColumnNames = ttl.GetTtlColumns(); + Y_VERIFY(ttlColumnNames.size() == 1); // TODO: support different ttl columns + ui32 ttlColumnId = IndexInfo.GetColumnId(*ttlColumnNames.begin()); - ui32 ttlColumnId = IndexInfo.GetColumnId(ttl.Column); for (const auto& [ts, granule] : PathGranules[pathId]) { auto spg = Granules[granule]; Y_VERIFY(spg); @@ -818,19 +825,27 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash allowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE); if (auto max = info.MaxValue(ttlColumnId)) { - bool keep = false; - for (auto& border : ttl.TierBorders) { - if (NArrow::ScalarLess(*border.ToTimestamp(), *max)) { - keep = true; - if (allowEviction && info.TierName != border.TierName) { - evicttionSize += info.BlobsSizes().first; - changes->PortionsToEvict.emplace_back(info, TPortionEvictionFeatures(border.TierName, pathId)); + bool keep = NArrow::ScalarLess(expireTimestamp, max); + + if (keep && allowEviction) { + TString tierName; + for (auto& tierRef : ttl.OrderedTiers) { // TODO: lower/upper_bound + move into TEviction + auto& tierInfo = tierRef.Get(); + if (!IndexInfo.AllowTtlOverColumn(tierInfo.Column)) { + continue; // Ignore tiers with bad ttl column + } + if (NArrow::ScalarLess(tierInfo.EvictTimestamp(), max)) { + tierName = tierInfo.Name; + } else { + break; } - break; + } + if (info.TierName != tierName) { + evicttionSize += info.BlobsSizes().first; + changes->PortionsToEvict.emplace_back(info, TPortionEvictionFeatures(tierName, pathId)); } } if (!keep && allowDrop) { - Y_VERIFY(!NArrow::ScalarLess(*ttl.TierBorders.back().ToTimestamp(), *max)); dropBlobs += info.NumRecords(); changes->PortionsToDrop.push_back(info); } @@ -1537,7 +1552,7 @@ TVector<TString> TColumnEngineForLogs::IndexBlobs(const TIndexInfo& indexInfo, auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], indexInfo); for (auto& [granule, batch] : granuleBatches) { - auto portions = MakeAppendedPortions(indexInfo, batch, granule, minSnapshot, blobs); + auto portions = MakeAppendedPortions(pathId, indexInfo, batch, granule, minSnapshot, blobs); Y_VERIFY(portions.size() > 0); for (auto& portion : portions) { changes->AppendedPortions.emplace_back(std::move(portion)); @@ -1572,6 +1587,7 @@ static std::shared_ptr<arrow::RecordBatch> CompactInOneGranule(const TIndexInfo& static TVector<TString> CompactInGranule(const TIndexInfo& indexInfo, std::shared_ptr<TColumnEngineForLogs::TChanges> changes) { + ui64 pathId = changes->SrcGranule->PathId; TVector<TString> blobs; auto& switchedProtions = changes->SwitchedPortions; Y_VERIFY(switchedProtions.size()); @@ -1589,13 +1605,13 @@ static TVector<TString> CompactInGranule(const TIndexInfo& indexInfo, if (!slice || slice->num_rows() == 0) { continue; } - auto tmp = MakeAppendedPortions(indexInfo, slice, granule, TSnapshot{}, blobs); + auto tmp = MakeAppendedPortions(pathId, indexInfo, slice, granule, TSnapshot{}, blobs); for (auto&& portionInfo : tmp) { portions.emplace_back(std::move(portionInfo)); } } } else { - portions = MakeAppendedPortions(indexInfo, batch, granule, TSnapshot{}, blobs); + portions = MakeAppendedPortions(pathId, indexInfo, batch, granule, TSnapshot{}, blobs); } Y_VERIFY(portions.size() > 0); @@ -1909,7 +1925,7 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo, for (auto& batch : idBatches[id]) { // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). - auto newPortions = MakeAppendedPortions(indexInfo, batch, tmpGranule, TSnapshot{}, blobs); + auto newPortions = MakeAppendedPortions(pathId, indexInfo, batch, tmpGranule, TSnapshot{}, blobs); Y_VERIFY(newPortions.size() > 0); for (auto& portion : newPortions) { changes->AppendedPortions.emplace_back(std::move(portion)); @@ -1925,7 +1941,7 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo, ui64 tmpGranule = changes->SetTmpGranule(pathId, ts); // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). - auto portions = MakeAppendedPortions(indexInfo, batch, tmpGranule, TSnapshot{}, blobs); + auto portions = MakeAppendedPortions(pathId, indexInfo, batch, tmpGranule, TSnapshot{}, blobs); Y_VERIFY(portions.size() > 0); for (auto& portion : portions) { changes->AppendedPortions.emplace_back(std::move(portion)); @@ -1963,13 +1979,14 @@ TVector<TString> TColumnEngineForLogs::EvictBlobs(const TIndexInfo& indexInfo, TVector<std::pair<TPortionInfo, TPortionEvictionFeatures>> evicted; evicted.reserve(changes->PortionsToEvict.size()); - for (auto& [portionInfo, evFeatures] : changes->PortionsToEvict) { + for (auto& [portionInfo, evictFeatures] : changes->PortionsToEvict) { Y_VERIFY(!portionInfo.Empty()); Y_VERIFY(portionInfo.IsActive()); - if (UpdateEvictedPortion(portionInfo, indexInfo, evFeatures.TargetTierName, changes->Blobs, changes->EvictedRecords, newBlobs)) { - Y_VERIFY(portionInfo.TierName == evFeatures.TargetTierName); - evicted.emplace_back(std::move(portionInfo), evFeatures); + if (UpdateEvictedPortion(portionInfo, indexInfo, evictFeatures, changes->Blobs, + changes->EvictedRecords, newBlobs)) { + Y_VERIFY(portionInfo.TierName == evictFeatures.TargetTierName); + evicted.emplace_back(std::move(portionInfo), evictFeatures); } } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index a09710ad213..7e328c4050b 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -247,7 +247,7 @@ public: const TSnapshot& outdatedSnapshot) override; std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop) override; - std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls, + std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override; bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) override; diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index 3828bb946d0..68855e43c6e 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -1,6 +1,7 @@ #pragma once #include "defs.h" #include "scalars.h" +#include "tier_info.h" #include <ydb/core/tablet_flat/flat_dbase_scheme.h> #include <ydb/core/sys_view/common/schema.h> @@ -50,16 +51,6 @@ GetColumns(const NTable::TScheme::TTableSchema& tableSchema, const TVector<ui32> struct TInsertedData; -struct TCompression { - arrow::Compression::type Codec{arrow::Compression::LZ4_FRAME}; - std::optional<int> Level; -}; - -struct TStorageTier { - TString Name; - std::optional<TCompression> Compression; -}; - /// Column engine index description in terms of tablet's local table. /// We have to use YDB types for keys here. struct TIndexInfo : public NTable::TScheme::TTableSchema { @@ -193,41 +184,16 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema { void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; } const TCompression& GetDefaultCompression() const { return DefaultCompression; } - std::optional<TCompression> GetTierCompression(ui32 tierNo) const { - if (!Tiers.empty()) { - Y_VERIFY(tierNo < Tiers.size()); - return Tiers[tierNo].Compression; - } - return {}; - } - - std::optional<TCompression> GetTierCompression(const TString& tierName) const { - if (tierName.empty()) { - return {}; - } - ui32 tierNo = GetTierNumber(tierName); - Y_VERIFY(tierNo != Max<ui32>()); - return GetTierCompression(tierNo); - } - - TString GetTierName(ui32 tierNo) const { - if (!Tiers.empty()) { - Y_VERIFY(tierNo < Tiers.size()); - return Tiers[tierNo].Name; - } - return {}; - } - - void AddStorageTier(TStorageTier&& tier) { - TierByName[tier.Name] = Tiers.size(); - Tiers.emplace_back(std::move(tier)); + void SetTiering(THashMap<ui64, TTiering>&& pathTierings) { + PathTiering = std::move(pathTierings); } - ui32 GetTierNumber(const TString& tierName) const { - if (auto it = TierByName.find(tierName); it != TierByName.end()) { - return it->second; + const TTiering* GetTiering(ui64 pathId) const { + auto it = PathTiering.find(pathId); + if (it != PathTiering.end()) { + return &it->second; } - return Max<ui32>(); + return nullptr; } private: @@ -242,8 +208,7 @@ private: THashSet<TString> RequiredColumns; THashSet<ui32> MinMaxIdxColumnsIds; TCompression DefaultCompression; - std::vector<TStorageTier> Tiers; - THashMap<TString, ui32> TierByName; + THashMap<ui64, TTiering> PathTiering; void AddRequiredColumns(const TVector<TString>& columns) { for (auto& name: columns) { diff --git a/ydb/core/tx/columnshard/engines/tier_info.h b/ydb/core/tx/columnshard/engines/tier_info.h new file mode 100644 index 00000000000..6c80fd0215d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/tier_info.h @@ -0,0 +1,157 @@ +#pragma once +#include "defs.h" +#include "scalars.h" + +namespace NKikimr::NOlap { + +struct TCompression { + arrow::Compression::type Codec{arrow::Compression::LZ4_FRAME}; + std::optional<int> Level; +}; + +struct TTierInfo { + TString Name; + TInstant EvictBorder; + TString Column; + std::optional<TCompression> Compression; + + TTierInfo(const TString& tierName, TInstant evictBorder, const TString& column) + : Name(tierName) + , EvictBorder(evictBorder) + , Column(column) + { + Y_VERIFY(!Name.empty()); + Y_VERIFY(!Column.empty()); + } + + std::shared_ptr<arrow::Scalar> EvictTimestamp() const { + if (Scalar) { + return Scalar; + } + + Scalar = std::make_shared<arrow::TimestampScalar>( + EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO)); + return Scalar; + } + + static std::shared_ptr<TTierInfo> MakeTtl(TInstant ttlBorder, const TString& ttlColumn) { + return std::make_shared<TTierInfo>("TTL", ttlBorder, ttlColumn); + } + + TString GetDebugString() const { + TStringBuilder sb; + sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << Column << "' " + << arrow::util::Codec::GetCodecAsString(Compression ? Compression->Codec : TCompression().Codec) + << ":" << ((Compression && Compression->Level) ? + *Compression->Level : arrow::util::kUseDefaultCompressionLevel); + return sb; + } + +private: + mutable std::shared_ptr<arrow::Scalar> Scalar; +}; + +struct TTierRef { + TTierRef(const std::shared_ptr<TTierInfo>& tierInfo) + : Info(tierInfo) + { + Y_VERIFY(tierInfo); + } + + bool operator < (const TTierRef& b) const { + if (Info->EvictBorder < b.Info->EvictBorder) { + return true; + } else if (Info->EvictBorder == b.Info->EvictBorder) { + return Info->Name > b.Info->Name; // add stability: smaller name is hotter + } + return false; + } + + bool operator == (const TTierRef& b) const { + return Info->EvictBorder == b.Info->EvictBorder + && Info->Name == b.Info->Name; + } + + const TTierInfo& Get() const { + return *Info; + } + +private: + std::shared_ptr<TTierInfo> Info; +}; + +struct TTiering { + THashMap<TString, std::shared_ptr<TTierInfo>> TierByName; + TSet<TTierRef> OrderedTiers; // Tiers ordered by border + std::shared_ptr<TTierInfo> Ttl; + + static TTiering MakeTtl(TInstant ttlBorder, const TString& ttlColumn) { + TTiering out; + out.Ttl = TTierInfo::MakeTtl(ttlBorder, ttlColumn); + return out; + } + + bool Empty() const { + return OrderedTiers.empty(); + } + + void Add(const std::shared_ptr<TTierInfo>& tier) { + if (!Empty()) { + Y_VERIFY(tier->Column == OrderedTiers.begin()->Get().Column); // TODO: support different ttl columns + } + + TierByName.emplace(tier->Name, tier); + OrderedTiers.emplace(tier); + } + + TString GetHottestTierName() const { + if (OrderedTiers.size()) { + return OrderedTiers.rbegin()->Get().Name; // hottest one + } + return {}; + } + + std::shared_ptr<arrow::Scalar> ExpireTimestamp() const { + auto ttlTs = Ttl ? Ttl->EvictTimestamp() : nullptr; + auto tierTs = OrderedTiers.empty() ? nullptr : OrderedTiers.begin()->Get().EvictTimestamp(); + if (!ttlTs) { + return tierTs; + } else if (!tierTs) { + return ttlTs; + } + return NArrow::ScalarLess(ttlTs, tierTs) ? tierTs : ttlTs; // either TTL or tier border appear + } + + std::optional<TCompression> GetCompression(const TString& name) const { + auto it = TierByName.find(name); + if (it != TierByName.end()) { + Y_VERIFY(!name.empty()); + return it->second->Compression; + } + return {}; + } + + THashSet<TString> GetTtlColumns() const { + THashSet<TString> out; + if (Ttl) { + out.insert(Ttl->Column); + } + for (auto& [tierName, tier] : TierByName) { + out.insert(tier->Column); + } + return out; + } + + TString GetDebugString() const { + TStringBuilder sb; + if (Ttl) { + sb << "ttl border '" << Ttl->EvictBorder << "' column '" << Ttl->Column << "'; "; + } + for (auto&& i : OrderedTiers) { + sb << i.Get().GetDebugString() << "; "; + } + return sb; + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 48286575d0a..b5e6188cd4f 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -192,10 +192,6 @@ TIndexInfo TestTableInfo(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema return indexInfo; } -static NOlap::TTiersInfo MakeTtl(TInstant border) { - return NOlap::TTiersInfo("timestamp", border); -} - template <typename TKeyDataType> class TBuilder { public: @@ -346,8 +342,8 @@ bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, u } bool Ttl(TColumnEngineForLogs& engine, TTestDbWrapper& db, - const THashMap<ui64, NOlap::TTiersInfo>& pathTtls, ui32 expectedToDrop) { - std::shared_ptr<TColumnEngineChanges> changes = engine.StartTtl(pathTtls); + const THashMap<ui64, NOlap::TTiering>& pathEviction, ui32 expectedToDrop) { + std::shared_ptr<TColumnEngineChanges> changes = engine.StartTtl(pathEviction); UNIT_ASSERT(changes); UNIT_ASSERT_VALUES_EQUAL(changes->PortionsToDrop.size(), expectedToDrop); @@ -708,8 +704,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { } // TTL - THashMap<ui64, NOlap::TTiersInfo> pathTtls; - pathTtls.emplace(pathId, MakeTtl(TInstant::MicroSeconds(10000))); + THashMap<ui64, NOlap::TTiering> pathTtls; + pathTtls.emplace(pathId, TTiering::MakeTtl(TInstant::MicroSeconds(10000), "timestamp")); Ttl(engine, db, pathTtls, 2); // read + load + read diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp index 4cdef39b0b7..a7affb58703 100644 --- a/ydb/core/tx/columnshard/eviction_actor.cpp +++ b/ydb/core/tx/columnshard/eviction_actor.cpp @@ -58,7 +58,8 @@ public: Y_VERIFY(blobData.size() == blobId.Size, "%u vs %u", (ui32)blobData.size(), blobId.Size); Blobs[blobId] = blobData; } else { - LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId.ToString() << " status " << event.Status + LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId.ToString() + << " status " << NKikimrProto::EReplyStatus_Name(event.Status) << " at tablet " << TabletId << " (eviction)"); TxEvent->PutStatus = event.Status; if (TxEvent->PutStatus == NKikimrProto::UNKNOWN) { diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp index 237819b98ee..acdb680d268 100644 --- a/ydb/core/tx/columnshard/export_actor.cpp +++ b/ydb/core/tx/columnshard/export_actor.cpp @@ -27,7 +27,8 @@ public: auto& event = *ev->Get(); const TUnifiedBlobId& blobId = event.BlobRange.BlobId; if (event.Status != NKikimrProto::EReplyStatus::OK) { - LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId << " status " << event.Status + LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId + << " status " << NKikimrProto::EReplyStatus_Name(event.Status) << " at tablet " << TabletId << " (export)"); BlobsToRead.erase(blobId); diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp index d8fe6da5b1d..d385d008dce 100644 --- a/ydb/core/tx/columnshard/indexing_actor.cpp +++ b/ydb/core/tx/columnshard/indexing_actor.cpp @@ -51,7 +51,8 @@ public: auto& event = *ev->Get(); const TUnifiedBlobId& blobId = event.BlobRange.BlobId; if (event.Status != NKikimrProto::EReplyStatus::OK) { - LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId << " status " << event.Status + LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId + << " status " << NKikimrProto::EReplyStatus_Name(event.Status) << " at tablet " << TabletId << " (index)"); BlobsToRead.erase(blobId); diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index 607d770a54e..2193632cad8 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -35,6 +35,15 @@ public: auto& event = *ev->Get(); const TUnifiedBlobId& blobId = event.BlobRange.BlobId; + + if (event.Status != NKikimrProto::EReplyStatus::OK) { + LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId + << " status " << NKikimrProto::EReplyStatus_Name(event.Status) + << " at tablet " << TabletId << " (read)"); + SendErrorResult(ctx, NKikimrTxColumnShard::EResultStatus::ERROR); + return DieFinished(ctx); + } + Y_VERIFY(event.Data.size() == event.BlobRange.Size, "%zu, %d", event.Data.size(), event.BlobRange.Size); if (IndexedBlobs.count(event.BlobRange)) { @@ -78,6 +87,9 @@ public: void SendErrorResult(const TActorContext& ctx, NKikimrTxColumnShard::EResultStatus status) { Y_VERIFY(status != NKikimrTxColumnShard::EResultStatus::SUCCESS); SendResult(ctx, {}, true, status); + + WaitIndexed.clear(); + WaitCommitted.clear(); } void SendResult(const TActorContext& ctx, const std::shared_ptr<arrow::RecordBatch>& batch, bool finished = false, @@ -88,6 +100,10 @@ public: TString data; if (batch) { data = NArrow::SerializeBatchNoCompression(batch); + + auto metadata = proto.MutableMeta(); + metadata->SetFormat(NKikimrTxColumnShard::FORMAT_ARROW); + metadata->SetSchema(GetSerializedSchema(batch)); } if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) { @@ -100,21 +116,18 @@ public: proto.SetFinished(finished); ++ReturnedBatchNo; - auto metadata = proto.MutableMeta(); - metadata->SetFormat(NKikimrTxColumnShard::FORMAT_ARROW); - metadata->SetSchema(GetSerializedSchema(batch)); if (finished) { auto stats = ReadMetadata->ReadStats; - auto* proto = metadata->MutableReadStats(); - proto->SetBeginTimestamp(stats->BeginTimestamp.MicroSeconds()); - proto->SetDurationUsec(stats->Duration().MicroSeconds()); - proto->SetSelectedIndex(stats->SelectedIndex); - proto->SetIndexGranules(stats->IndexGranules); - proto->SetIndexPortions(stats->IndexPortions); - proto->SetIndexBatches(stats->IndexBatches); - proto->SetNotIndexedBatches(stats->CommittedBatches); - proto->SetUsedColumns(stats->UsedColumns); - proto->SetDataBytes(stats->DataBytes); + auto* protoStats = proto.MutableMeta()->MutableReadStats(); + protoStats->SetBeginTimestamp(stats->BeginTimestamp.MicroSeconds()); + protoStats->SetDurationUsec(stats->Duration().MicroSeconds()); + protoStats->SetSelectedIndex(stats->SelectedIndex); + protoStats->SetIndexGranules(stats->IndexGranules); + protoStats->SetIndexPortions(stats->IndexPortions); + protoStats->SetIndexBatches(stats->IndexBatches); + protoStats->SetNotIndexedBatches(stats->CommittedBatches); + protoStats->SetUsedColumns(stats->UsedColumns); + protoStats->SetDataBytes(stats->DataBytes); } if (Deadline != TInstant::Max()) { @@ -197,9 +210,6 @@ public: void SendTimeouts(const TActorContext& ctx) { SendErrorResult(ctx, NKikimrTxColumnShard::EResultStatus::TIMEOUT); - - WaitCommitted.clear(); - IndexedBlobs.clear(); } void SendReadRequest(const TActorContext& ctx, const NBlobCache::TBlobRange& blobRange) { diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp index 1d28938a97f..a7e564f7ec8 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp @@ -17,7 +17,7 @@ using namespace NTxUT; using namespace NColumnShard; using NWrappers::NTestHelpers::TS3Mock; -enum class EStartTtlSettings { +enum class EInitialEviction { None, Ttl, Tiering @@ -61,10 +61,10 @@ bool TriggerTTL(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot s return (res.GetStatus() == NKikimrTxColumnShard::SUCCESS); } -std::shared_ptr<arrow::Array> GetFirstPKColumn(const TString& blob, const TString& srtSchema, - const std::string& columnName) +std::shared_ptr<arrow::Array> DeserializeColumn(const TString& blob, const TString& strSchema, + const std::string& columnName) { - auto schema = NArrow::DeserializeSchema(srtSchema); + auto schema = NArrow::DeserializeSchema(strSchema); auto batch = NArrow::DeserializeBatch(blob, schema); UNIT_ASSERT(batch); @@ -73,12 +73,12 @@ std::shared_ptr<arrow::Array> GetFirstPKColumn(const TString& blob, const TStrin return array; } -bool CheckSame(const TString& blob, const TString& srtSchema, ui32 expectedSize, +bool CheckSame(const TString& blob, const TString& strSchema, ui32 expectedSize, const std::string& columnName, i64 seconds) { auto expected = arrow::TimestampScalar(seconds * 1000 * 1000, arrow::timestamp(arrow::TimeUnit::MICRO)); UNIT_ASSERT_VALUES_EQUAL(expected.value, seconds * 1000 * 1000); - auto tsCol = GetFirstPKColumn(blob, srtSchema, columnName); + auto tsCol = DeserializeColumn(blob, strSchema, columnName); UNIT_ASSERT(tsCol); UNIT_ASSERT_VALUES_EQUAL(tsCol->length(), expectedSize); @@ -130,6 +130,8 @@ bool TestCreateTable(const TString& txBody, ui64 planStep = 1000, ui64 txId = 10 return ProposeSchemaTx(runtime, sender, txBody, {++planStep, ++txId}); } +static constexpr ui32 PORTION_ROWS = 80 * 1000; + // ts[0] = 1600000000; // date -u --date='@1600000000' Sun Sep 13 12:26:40 UTC 2020 // ts[1] = 1620000000; // date -u --date='@1620000000' Mon May 3 00:00:00 UTC 2021 void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, @@ -162,9 +164,10 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, ttlSec -= (ts[0] + ts[1]) / 2; // enable internal ttl between ts1 and ts2 } if (spec.HasTiers()) { - spec.Tiers[0].SetEvictAfterSeconds(ttlSec); + spec.Tiers[0].EvictAfter = TDuration::Seconds(ttlSec); } else { - spec.SetEvictAfterSeconds(ttlSec); + UNIT_ASSERT(!spec.TtlColumn.empty()); + spec.EvictAfter = TDuration::Seconds(ttlSec); } bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateInitShardTxBody(tableId, testYdbSchema, testYdbPk, spec, "/Root/olapStore"), @@ -176,8 +179,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, } // - ui32 portionSize = 80 * 1000; - auto blobs = MakeData(ts, portionSize, portionSize / 2, spec.GetTtlColumn()); + auto blobs = MakeData(ts, PORTION_ROWS, PORTION_ROWS / 2, spec.TtlColumn); UNIT_ASSERT_EQUAL(blobs.size(), 2); for (auto& data : blobs) { UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data)); @@ -192,9 +194,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, } if (internal) { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.GetTtlColumn()); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn); } else { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + 1, spec.GetTtlColumn()); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + 1, spec.TtlColumn); } TAutoPtr<IEventHandle> handle; @@ -206,7 +208,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, { --planStep; auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId); - Proto(read.get()).AddColumnNames(spec.GetTtlColumn()); + Proto(read.get()).AddColumnNames(spec.TtlColumn); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); @@ -216,20 +218,20 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); + UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), 0); UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); UNIT_ASSERT(resRead.GetData().size() > 0); auto& schema = resRead.GetMeta().GetSchema(); - UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, spec.GetTtlColumn(), ts[1])); + UNIT_ASSERT(CheckSame(resRead.GetData(), schema, PORTION_ROWS, spec.TtlColumn, ts[1])); } // Alter TTL ttlSec = TInstant::Now().Seconds() - (ts[1] + 1); if (spec.HasTiers()) { - spec.Tiers[0].SetEvictAfterSeconds(ttlSec); + spec.Tiers[0].EvictAfter = TDuration::Seconds(ttlSec); } else { - spec.SetEvictAfterSeconds(ttlSec); + spec.EvictAfter = TDuration::Seconds(ttlSec); } ok = ProposeSchemaTx(runtime, sender, TTestSchema::AlterTableTxBody(tableId, 2, spec), @@ -241,15 +243,15 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, } if (internal) { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.GetTtlColumn()); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn); } else { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + 1, spec.GetTtlColumn()); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + 1, spec.TtlColumn); } { --planStep; auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId); - Proto(read.get()).AddColumnNames(spec.GetTtlColumn()); + Proto(read.get()).AddColumnNames(spec.TtlColumn); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); @@ -259,9 +261,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); + UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), 0); UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - UNIT_ASSERT_EQUAL(resRead.GetData().size(), 0); + UNIT_ASSERT_VALUES_EQUAL(resRead.GetData().size(), 0); } // Disable TTL @@ -279,15 +281,15 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, PlanCommit(runtime, sender, ++planStep, txId); if (internal) { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.GetTtlColumn()); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn); } else { - TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - 1, spec.GetTtlColumn()); + TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - 1, spec.TtlColumn); } { --planStep; auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId); - Proto(read.get()).AddColumnNames(spec.GetTtlColumn()); + Proto(read.get()).AddColumnNames(spec.TtlColumn); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); @@ -302,13 +304,12 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {}, UNIT_ASSERT(resRead.GetData().size() > 0); auto& schema = resRead.GetMeta().GetSchema(); - UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, spec.GetTtlColumn(), ts[0])); + UNIT_ASSERT(CheckSame(resRead.GetData(), schema, PORTION_ROWS, spec.TtlColumn, ts[0])); } } class TCountersContainer { private: - ui32 RestartTabletOnPutData = 0; ui32 SuccessCounterStart = 0; public: ui32 UnknownsCounter = 0; @@ -316,32 +317,17 @@ public: ui32 ErrorsCounter = 0; ui32 ResponsesCounter = 0; - TCountersContainer& SetRestartTabletOnPutData(const ui32 value) { - RestartTabletOnPutData = value; - return *this; - } - - bool PopRestartTabletOnPutData() { - if (!RestartTabletOnPutData) { - return false; - } - --RestartTabletOnPutData; - return true; - } TString SerializeToString() const { TStringBuilder sb; sb << "EXPORTS INFO: " << SuccessCounter << "/" << ErrorsCounter << "/" << UnknownsCounter << "/" << ResponsesCounter; return sb; } - void WaitEvents(TTestBasicRuntime& runtime, const TActorId sender, const ui32 attemption, const ui32 expectedDeltaSuccess, const TDuration timeout) { + void WaitEvents(TTestBasicRuntime& runtime, const ui32 attemption, const ui32 expectedDeltaSuccess, const TDuration timeout) { const TInstant startInstant = TAppData::TimeProvider->Now(); const TInstant deadline = startInstant + timeout; Cerr << "START_WAITING(" << attemption << "): " << SerializeToString() << Endl; while (TAppData::TimeProvider->Now() < deadline) { - if (PopRestartTabletOnPutData()) { - RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); - } Cerr << "IN_WAITING(" << attemption << "):" << SerializeToString() << Endl; runtime.SimulateSleep(TDuration::Seconds(1)); UNIT_ASSERT(ErrorsCounter == 0); @@ -358,7 +344,7 @@ public: if (expectedDeltaSuccess) { UNIT_ASSERT(SuccessCounter >= SuccessCounterStart + expectedDeltaSuccess); } else { - UNIT_ASSERT(SuccessCounter == SuccessCounterStart); + UNIT_ASSERT_VALUES_EQUAL(SuccessCounter, SuccessCounterStart); } Cerr << "FINISH_WAITING(" << attemption << "): " << SerializeToString() << Endl; SuccessCounterStart = SuccessCounter; @@ -370,7 +356,7 @@ private: TCountersContainer* Counters = nullptr; TTestBasicRuntime& Runtime; const TActorId Sender; -private: + template <class TPrivateEvent> static TPrivateEvent* TryGetPrivateEvent(TAutoPtr<IEventHandle>& ev) { if (ev->GetTypeRewrite() != TPrivateEvent::EventType) { @@ -378,8 +364,8 @@ private: } return dynamic_cast<TPrivateEvent*>(ev->GetBase()); } -public: +public: TEventsCounter(TCountersContainer& counters, TTestBasicRuntime& runtime, const TActorId sender) : Counters(&counters) , Runtime(runtime) @@ -388,6 +374,7 @@ public: Y_UNUSED(Runtime); Y_UNUSED(Sender); } + bool operator()(TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { TStringBuilder ss; if (auto* msg = TryGetPrivateEvent<NColumnShard::TEvPrivate::TEvExport>(ev)) { @@ -402,7 +389,10 @@ public: ss << "(" << ++Counters->UnknownsCounter << "): UNKNOWN"; } } else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvPutObjectResponse>(ev)) { - ss << "S3_RESPONSE(" << ++Counters->ResponsesCounter << "):"; + ss << "S3_RESPONSE(put " << ++Counters->ResponsesCounter << "):"; + } else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvDeleteObjectResponse>(ev)) { + ss << "(" << ++Counters->SuccessCounter << "): DELETE SUCCESS"; + ss << "S3_RESPONSE(delete " << ++Counters->ResponsesCounter << "):"; } else { return false; } @@ -412,8 +402,10 @@ public: }; }; -std::vector<std::pair<ui32, ui64>> -TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTestSchema::TTableSpecials>& specs, const ui32 startTieringIndex) { +std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TString>& blobs, + const std::vector<TTestSchema::TTableSpecials>& specs, + const ui32 initialEviction) +{ TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -460,16 +452,16 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe TAutoPtr<IEventHandle> handle; - std::vector<std::pair<ui32, ui64>> resColumns; - resColumns.reserve(specs.size()); + std::vector<std::pair<ui32, ui64>> specRowsBytes; + specRowsBytes.reserve(specs.size()); TCountersContainer counter; runtime.SetEventFilter(TEventsCounter(counter, runtime, sender)); for (ui32 i = 0; i < specs.size(); ++i) { - bool hasEvictionSettings = false; + bool hasColdEviction = false; for (auto&& i : specs[i].Tiers) { if (!!i.S3) { - hasEvictionSettings = true; + hasColdEviction = true; break; } } @@ -483,20 +475,21 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe PlanSchemaTx(runtime, sender, { planStep, txId }); } } - if (specs[i].Tiers.size()) { + if (specs[i].HasTiers()) { ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i])); } - counter.SetRestartTabletOnPutData(reboots ? 1 : 0); - TriggerTTL(runtime, sender, { ++planStep, ++txId }, {}, 0, specs[i].GetTtlColumn()); - if (hasEvictionSettings) { - if (i == startTieringIndex + 1 || i == startTieringIndex + 2) { - counter.WaitEvents(runtime, sender, i, 1, TDuration::Seconds(40)); + TriggerTTL(runtime, sender, { ++planStep, ++txId }, {}, 0, specs[i].TtlColumn); + if (hasColdEviction) { + Cerr << "Cold tiering, spec " << i << ", num tiers: " << specs[i].Tiers.size() << "\n"; + if (i > initialEviction) { + counter.WaitEvents(runtime, i, 1, TDuration::Seconds(40)); } else { - counter.WaitEvents(runtime, sender, i, 0, TDuration::Seconds(20)); + counter.WaitEvents(runtime, i, 0, TDuration::Seconds(20)); } } else { - counter.WaitEvents(runtime, sender, i, 0, TDuration::Seconds(5)); + Cerr << "Hot tiering, spec " << i << ", num tiers: " << specs[i].Tiers.size() << "\n"; + counter.WaitEvents(runtime, i, 0, TDuration::Seconds(4)); } if (reboots) { ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i])); @@ -506,10 +499,10 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe --planStep; auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId); - Proto(read.get()).AddColumnNames(specs[i].GetTtlColumn()); + Proto(read.get()).AddColumnNames(specs[i].TtlColumn); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); - resColumns.emplace_back(0, 0); + specRowsBytes.emplace_back(0, 0); ui32 idx = 0; while (true) { auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); @@ -526,17 +519,15 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe } auto& meta = resRead.GetMeta(); auto& schema = meta.GetSchema(); - auto pkColumn = GetFirstPKColumn(resRead.GetData(), schema, specs[i].GetTtlColumn()); - UNIT_ASSERT(pkColumn); - UNIT_ASSERT(pkColumn->type_id() == arrow::Type::TIMESTAMP); + auto ttlColumn = DeserializeColumn(resRead.GetData(), schema, specs[i].TtlColumn); + UNIT_ASSERT(ttlColumn); - auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(pkColumn); - resColumns.back().first += tsColumn->length(); + specRowsBytes.back().first += ttlColumn->length(); if (resRead.GetFinished()) { UNIT_ASSERT(meta.HasReadStats()); auto& readStats = meta.GetReadStats(); ui64 numBytes = readStats.GetDataBytes(); // compressed bytes in storage - resColumns.back().second += numBytes; + specRowsBytes.back().second += numBytes; break; } } @@ -546,90 +537,157 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe } } - return resColumns; + return specRowsBytes; } -void TestTwoTiers(const TTestSchema::TTableSpecials& spec, bool compressed, bool reboots, const EStartTtlSettings startConf) { - const std::vector<ui64> ts = { 1600000000, 1620000000 }; - ui64 nowSec = TInstant::Now().Seconds(); - - ui32 portionSize = 80 * 1000; - ui32 overlapSize = 40 * 1000; - std::vector<TString> blobs = MakeData(ts, portionSize, overlapSize, spec.GetTtlColumn()); - - ui64 allowBoth = nowSec - ts[0] + 600; - ui64 allowOne = nowSec - ts[1] + 600; - ui64 allowNone = nowSec - ts[1] - 600; - ui32 startTieringIndex = 0; - std::vector<TTestSchema::TTableSpecials> alters; - if (startConf != EStartTtlSettings::Tiering) { - if (startConf == EStartTtlSettings::None) { - alters.emplace_back(TTestSchema::TTableSpecials()); +class TEvictionChanges { +public: + void AddTierAlters(const TTestSchema::TTableSpecials& spec, const std::vector<TDuration>&& borders, + std::vector<TTestSchema::TTableSpecials>& alters) const { + UNIT_ASSERT_EQUAL(borders.size(), 3); + UNIT_ASSERT(spec.Tiers.size()); + + alters.reserve(alters.size() + spec.Tiers.size() + 1); + + if (spec.Tiers.size() == 1) { + alters.push_back(MakeAlter(spec, {borders[0]})); // <tier0 border>, data[0], data[1] + alters.push_back(MakeAlter(spec, {borders[1]})); // data[0], <tier0 border>, data[1] + alters.push_back(MakeAlter(spec, {borders[2]})); // data[0], data[1], <tier0 border> + } else if (spec.Tiers.size() == 2) { + alters.push_back(MakeAlter(spec, {borders[0], borders[0]})); // <tier1 border>, <tier0 border>, data[0], data[1] + alters.push_back(MakeAlter(spec, {borders[1], borders[0]})); // <tier1 border>, data[0], <tier0 border>, data[1] + alters.push_back(MakeAlter(spec, {borders[2], borders[1]})); // data[0], <tier1 border>, data[1], <tier0 border> + alters.push_back(MakeAlter(spec, {borders[2], borders[2]})); // data[0], data[1], <tier1 border>, <tier0 border> } - if (startConf == EStartTtlSettings::Ttl) { - alters.emplace_back(TTestSchema::TTableSpecials()); - alters.back().SetTtlColumn("timestamp"); - alters.back().SetEvictAfterSeconds(allowBoth); + } + + void AddTtlAlters(const TTestSchema::TTableSpecials& spec, const std::vector<TDuration>&& borders, + std::vector<TTestSchema::TTableSpecials>& alters) const { + UNIT_ASSERT_EQUAL(borders.size(), 3); + UNIT_ASSERT(spec.Tiers.size()); + + TTestSchema::TTableSpecials specTtl(spec); + if (spec.Tiers.size() == 1) { + specTtl = MakeAlter(spec, {borders[0]}); // <tier0 border>, data[0], data[1] + } else if (spec.Tiers.size() == 2) { + specTtl = MakeAlter(spec, {borders[0], borders[0]}); // <tier1 border>, <tier0 border>, data[0], data[1] } - std::vector<TString> blobsOld = MakeData({ 1500000000, 1620000000 }, portionSize, overlapSize, spec.GetTtlColumn()); - blobs.emplace_back(std::move(blobsOld[0])); - blobs.emplace_back(std::move(blobsOld[1])); + + alters.reserve(alters.size() + borders.size()); + alters.push_back(specTtl.SetTtl(borders[0])); // <ttl border>, data[0], data[1] + alters.push_back(specTtl.SetTtl(borders[1])); // data[0], <ttl border>, data[1] + alters.push_back(specTtl.SetTtl(borders[2])); // data[0], data[1], <ttl border> } - startTieringIndex = alters.size(); - alters.resize(alters.size() + 4, spec); - alters[startTieringIndex].Tiers[0].SetEvictAfterSeconds(allowBoth); // tier0 allows/has: data[0], data[1] - alters[startTieringIndex].Tiers[1].SetEvictAfterSeconds(allowBoth); // tier1 allows: data[0], data[1], has: nothing - alters[startTieringIndex + 1].Tiers[0].SetEvictAfterSeconds(allowOne); // tier0 allows/has: data[1] - alters[startTieringIndex + 1].Tiers[1].SetEvictAfterSeconds(allowBoth); // tier1 allows: data[0], data[1], has: data[0] + static void Assert(const TTestSchema::TTableSpecials& spec, + const std::vector<std::pair<ui32, ui64>>& rowsBytes, + size_t initialEviction) { + UNIT_ASSERT_VALUES_EQUAL(rowsBytes[initialEviction].first, 2 * PORTION_ROWS); + UNIT_ASSERT(rowsBytes[initialEviction].second); + if (spec.Tiers.size() > 1) { + UNIT_ASSERT_VALUES_EQUAL(rowsBytes[initialEviction].first, rowsBytes[initialEviction + 1].first); + } - alters[startTieringIndex + 2].Tiers[0].SetEvictAfterSeconds(allowNone); // tier0 allows/has: nothing - alters[startTieringIndex + 2].Tiers[1].SetEvictAfterSeconds(allowOne); // tier1 allows/has: data[1] + UNIT_ASSERT_VALUES_EQUAL(rowsBytes[rowsBytes.size() - 2].first, PORTION_ROWS); + UNIT_ASSERT(rowsBytes[rowsBytes.size() - 2].second); - alters[startTieringIndex + 3].Tiers[0].SetEvictAfterSeconds(allowNone); // tier0 allows/has: nothing - alters[startTieringIndex + 3].Tiers[1].SetEvictAfterSeconds(allowNone); // tier1 allows/has: nothing + UNIT_ASSERT_VALUES_EQUAL(rowsBytes.back().first, 0); + UNIT_ASSERT_VALUES_EQUAL(rowsBytes.back().second, 0); + } - auto columns = TestTiers(reboots, blobs, alters, startTieringIndex); +private: + TTestSchema::TTableSpecials MakeAlter(const TTestSchema::TTableSpecials& spec, + const std::vector<TDuration>& tierBorders) const { + UNIT_ASSERT_EQUAL(spec.Tiers.size(), tierBorders.size()); - for (auto&& i : columns) { - Cerr << i.first << "/" << i.second << Endl; + TTestSchema::TTableSpecials alter(spec); // same TTL, Codec, etc. + for (size_t i = 0; i < tierBorders.size(); ++i) { + alter.Tiers[i].EvictAfter = tierBorders[i]; + } + return alter; } +}; - UNIT_ASSERT_EQUAL(columns.size(), alters.size()); - UNIT_ASSERT(columns[startTieringIndex].second); - UNIT_ASSERT(columns[startTieringIndex].first); +TTestSchema::TTableSpecials InitialSpec(const EInitialEviction init, TDuration initTs) { + TTestSchema::TTableSpecials spec; + if (init == EInitialEviction::Ttl) { + spec.TtlColumn = "timestamp"; + spec.EvictAfter = initTs; + } + return spec; +} - UNIT_ASSERT(columns[startTieringIndex + 1].second); - UNIT_ASSERT(columns[startTieringIndex + 1].first); +std::vector<std::pair<ui32, ui64>> TestTiersAndTtl(const TTestSchema::TTableSpecials& spec, bool reboots, + EInitialEviction init, bool testTtl = false) { + const std::vector<ui64> ts = { 1600000000, 1620000000 }; - UNIT_ASSERT(columns[startTieringIndex + 2].second); - UNIT_ASSERT(columns[startTieringIndex + 2].first); + ui32 overlapSize = 40 * 1000; + std::vector<TString> blobs = MakeData(ts, PORTION_ROWS, overlapSize, spec.TtlColumn); + if (init != EInitialEviction::Tiering) { + std::vector<TString> preload = MakeData({ 1500000000, 1620000000 }, PORTION_ROWS, overlapSize, spec.TtlColumn); + blobs.emplace_back(std::move(preload[0])); + blobs.emplace_back(std::move(preload[1])); + } - UNIT_ASSERT(!columns[startTieringIndex + 3].first); - UNIT_ASSERT(!columns[startTieringIndex + 3].second); + TInstant now = TInstant::Now(); + TDuration allowBoth = TDuration::Seconds(now.Seconds() - ts[0] + 600); + TDuration allowOne = TDuration::Seconds(now.Seconds() - ts[1] + 600); + TDuration allowNone = TDuration::Seconds(now.Seconds() - ts[1] - 600); - UNIT_ASSERT_EQUAL(columns[startTieringIndex].first, 2 * portionSize/* - overlapSize*/); - UNIT_ASSERT_EQUAL(columns[startTieringIndex].first, columns[startTieringIndex + 1].first); - UNIT_ASSERT_EQUAL(columns[startTieringIndex + 2].first, portionSize); + std::vector<TTestSchema::TTableSpecials> alters = { InitialSpec(init, allowBoth) }; + size_t initialEviction = alters.size(); - if (compressed) { - UNIT_ASSERT_GT(columns[startTieringIndex].second, columns[startTieringIndex + 1].second); + TEvictionChanges changes; + if (testTtl) { + changes.AddTtlAlters(spec, {allowBoth, allowOne, allowNone}, alters); } else { - UNIT_ASSERT_EQUAL(columns[startTieringIndex].second, columns[startTieringIndex + 1].second); + changes.AddTierAlters(spec, {allowBoth, allowOne, allowNone}, alters); + } + + auto rowsBytes = TestTiers(reboots, blobs, alters, initialEviction); + for (auto&& i : rowsBytes) { + Cerr << i.first << "/" << i.second << Endl; + } + + UNIT_ASSERT_EQUAL(rowsBytes.size(), alters.size()); + + if (!testTtl) { // TODO + changes.Assert(spec, rowsBytes, initialEviction); } + return rowsBytes; } -void TestTwoHotTiers(bool reboot) { +void TestTwoHotTiers(bool reboot, bool changeTtl, const EInitialEviction initial = EInitialEviction::None) { TTestSchema::TTableSpecials spec; spec.SetTtlColumn("timestamp"); spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0").SetTtlColumn("timestamp")); spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1").SetTtlColumn("timestamp")); spec.Tiers.back().SetCodec("zstd"); - TestTwoTiers(spec, true, reboot, EStartTtlSettings::None); + auto rowsBytes = TestTiersAndTtl(spec, reboot, initial, changeTtl); + if (changeTtl) { + UNIT_ASSERT_VALUES_EQUAL(rowsBytes.size(), 4); + UNIT_ASSERT_VALUES_EQUAL(rowsBytes[0].first, 3 * PORTION_ROWS); + UNIT_ASSERT_VALUES_EQUAL(rowsBytes[1].first, 2 * PORTION_ROWS); + UNIT_ASSERT_VALUES_EQUAL(rowsBytes[2].first, PORTION_ROWS); + UNIT_ASSERT_VALUES_EQUAL(rowsBytes[3].first, 0); + } else { + UNIT_ASSERT_VALUES_EQUAL(rowsBytes.size(), 5); + if (initial == EInitialEviction::Ttl) { + UNIT_ASSERT_VALUES_EQUAL(rowsBytes[0].first, 2 * PORTION_ROWS); + } else { + UNIT_ASSERT_VALUES_EQUAL(rowsBytes[0].first, 3 * PORTION_ROWS); + } + UNIT_ASSERT_VALUES_EQUAL(rowsBytes[1].first, 2 * PORTION_ROWS); + UNIT_ASSERT_VALUES_EQUAL(rowsBytes[2].first, 2 * PORTION_ROWS); + UNIT_ASSERT_VALUES_EQUAL(rowsBytes[3].first, PORTION_ROWS); + UNIT_ASSERT_VALUES_EQUAL(rowsBytes[4].first, 0); + + UNIT_ASSERT(rowsBytes[1].second > rowsBytes[2].second); // compression works + } } -void TestHotAndColdTiers(bool reboot, const EStartTtlSettings startConf) { +void TestHotAndColdTiers(bool reboot, const EInitialEviction initial) { const TString bucket = "tiering-test-01"; TPortManager portManager; const ui16 port = portManager.GetPort(); @@ -664,7 +722,7 @@ void TestHotAndColdTiers(bool reboot, const EStartTtlSettings startConf) { s3Config.SetConnectionTimeoutMs(10000); } - TestTwoTiers(spec, false, reboot, startConf); + TestTiersAndTtl(spec, reboot, initial); } void TestDrop(bool reboots) { @@ -695,9 +753,7 @@ void TestDrop(bool reboots) { // - static const ui32 portionSize = 80 * 1000; - - TString data1 = MakeTestBlob({0, portionSize}, testYdbSchema); + TString data1 = MakeTestBlob({0, PORTION_ROWS}, testYdbSchema); UNIT_ASSERT(data1.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT); UNIT_ASSERT(data1.size() < 7 * 1024 * 1024); @@ -920,41 +976,63 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) { TestTtl(true, false, specs); } + // TODO: EnableOneTierAfterTtl, EnableTtlAfterOneTier + Y_UNIT_TEST(HotTiers) { - TestTwoHotTiers(false); + TestTwoHotTiers(false, false); } Y_UNIT_TEST(RebootHotTiers) { - NColumnShard::gAllowLogBatchingDefaultValue = false; - TestTwoHotTiers(true); + TestTwoHotTiers(true, false); } - Y_UNIT_TEST(ColdTiers) { - TestHotAndColdTiers(false, EStartTtlSettings::Tiering); + Y_UNIT_TEST(HotTiersTtl) { + NColumnShard::gAllowLogBatchingDefaultValue = false; + TestTwoHotTiers(false, true); } - Y_UNIT_TEST(ColdTiersWithNoneTtlTiering) { - TestHotAndColdTiers(false, EStartTtlSettings::None); + Y_UNIT_TEST(RebootHotTiersTtl) { + NColumnShard::gAllowLogBatchingDefaultValue = false; + TestTwoHotTiers(true, true); } - Y_UNIT_TEST(ColdTiersWithTtlTiering) { - TestHotAndColdTiers(false, EStartTtlSettings::Ttl); + Y_UNIT_TEST(HotTiersAfterTtl) { + TestTwoHotTiers(false, false, EInitialEviction::Ttl); } - Y_UNIT_TEST(ColdTiersWithNoneTtlTieringAndReboot) { - TestHotAndColdTiers(true, EStartTtlSettings::None); + Y_UNIT_TEST(RebootHotTiersAfterTtl) { + TestTwoHotTiers(true, false, EInitialEviction::Ttl); } - Y_UNIT_TEST(ColdTiersWithTtlTieringAndReboot) { - TestHotAndColdTiers(true, EStartTtlSettings::Ttl); + // TODO: EnableTtlAfterHotTiers + + Y_UNIT_TEST(ColdTiers) { + TestHotAndColdTiers(false, EInitialEviction::Tiering); } Y_UNIT_TEST(RebootColdTiers) { - // Disabled KIKIMR-14942 //NColumnShard::gAllowLogBatchingDefaultValue = false; - //TestHotAndColdTiers(true); + TestHotAndColdTiers(true, EInitialEviction::Tiering); + } + + Y_UNIT_TEST(EnableColdTiersAfterNoEviction) { + TestHotAndColdTiers(false, EInitialEviction::None); + } + + Y_UNIT_TEST(RebootEnableColdTiersAfterNoEviction) { + TestHotAndColdTiers(true, EInitialEviction::None); + } + + Y_UNIT_TEST(EnableColdTiersAfterTtl) { + TestHotAndColdTiers(false, EInitialEviction::Ttl); + } + + Y_UNIT_TEST(RebootEnableColdTiersAfterTtl) { + TestHotAndColdTiers(true, EInitialEviction::Ttl); } + // TODO: EnableTtlAfterColdTiers + Y_UNIT_TEST(Drop) { TestDrop(false); } diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp index 6558c08c3dc..1ba8b01cc82 100644 --- a/ydb/core/tx/columnshard/write_actor.cpp +++ b/ydb/core/tx/columnshard/write_actor.cpp @@ -224,23 +224,26 @@ public: TString accumulatedBlob; TVector<std::pair<size_t, TString>> recordsInBlob; - size_t portionsToWrite = indexChanges->AppendedPortions.size(); - bool appended = true; - if (indexChanges->PortionsToEvict.size()) { - Y_VERIFY(portionsToWrite == 0); - portionsToWrite = indexChanges->PortionsToEvict.size(); - appended = false; - } + Y_VERIFY(indexChanges->AppendedPortions.empty() || indexChanges->PortionsToEvict.empty()); + size_t portionsToWrite = indexChanges->AppendedPortions.size() + indexChanges->PortionsToEvict.size(); + bool eviction = indexChanges->PortionsToEvict.size() > 0; for (size_t pos = 0; pos < portionsToWrite; ++pos) { - auto& portionInfo = appended ? indexChanges->AppendedPortions[pos] - : indexChanges->PortionsToEvict[pos].first; + auto& portionInfo = eviction ? indexChanges->PortionsToEvict[pos].first + : indexChanges->AppendedPortions[pos]; auto& records = portionInfo.Records; accumulatedBlob.clear(); recordsInBlob.clear(); + // There could be eviction mix between normal eviction and eviction without data changes + // TODO: better portions to blobs mathching + if (eviction && !indexChanges->PortionsToEvict[pos].second.DataChanges) { + continue; + } + for (size_t i = 0; i < records.size(); ++i, ++blobsPos) { + Y_VERIFY(blobsPos < blobs.size()); const TString& currentBlob = blobs[blobsPos]; Y_VERIFY(currentBlob.size()); diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp index fc810bcfd8b..78ed9e33b05 100644 --- a/ydb/core/tx/tiering/manager.cpp +++ b/ydb/core/tx/tiering/manager.cpp @@ -101,7 +101,7 @@ bool TManager::Start(std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets) { CreateS3Actor(TabletId, TabletActorId, Config.GetTierName()) ); auto s3Config = Config.GetPatchedConfig(secrets); - + ctx.Send(newActor, new TEvPrivate::TEvS3Settings(s3Config)); StorageActorId = newActor; #endif @@ -115,16 +115,7 @@ TManager::TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId, { } -NOlap::TStorageTier TManager::BuildTierStorage() const { - NOlap::TStorageTier result; - result.Name = Config.GetTierName(); - if (Config.GetProtoConfig().HasCompression()) { - result.Compression = ConvertCompression(Config.GetProtoConfig().GetCompression()); - } - return result; -} - -NKikimr::NOlap::TCompression TManager::ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression) { +NKikimr::NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression) { NOlap::TCompression out; if (compression.HasCompressionCodec()) { switch (compression.GetCompressionCodec()) { @@ -223,19 +214,26 @@ NMetadata::NFetcher::ISnapshotsFetcher::TPtr TTiersManager::GetExternalDataManip return ExternalDataManipulation; } -THashMap<ui64, NKikimr::NOlap::TTiersInfo> TTiersManager::GetTiering() const { - THashMap<ui64, NKikimr::NOlap::TTiersInfo> result; +THashMap<ui64, NKikimr::NOlap::TTiering> TTiersManager::GetTiering() const { + THashMap<ui64, NKikimr::NOlap::TTiering> result; if (!Snapshot) { return result; } auto snapshotPtr = std::dynamic_pointer_cast<NTiers::TConfigsSnapshot>(Snapshot); Y_VERIFY(snapshotPtr); + auto& tierConfigs = snapshotPtr->GetTierConfigs(); for (auto&& i : PathIdTiering) { auto* tiering = snapshotPtr->GetTieringById(i.second); - if (!tiering) { - - } else { - result.emplace(i.first, tiering->BuildTiersInfo()); + if (tiering) { + result.emplace(i.first, tiering->BuildOlapTiers()); + for (auto& [pathId, pathTiering] : result) { + for (auto& [name, tier] : pathTiering.TierByName) { + auto it = tierConfigs.find(name); + if (it != tierConfigs.end()) { + tier->Compression = NTiers::ConvertCompression(it->second.GetProtoConfig().GetCompression()); + } + } + } } } return result; diff --git a/ydb/core/tx/tiering/manager.h b/ydb/core/tx/tiering/manager.h index e449bb26611..1ddf79da156 100644 --- a/ydb/core/tx/tiering/manager.h +++ b/ydb/core/tx/tiering/manager.h @@ -12,6 +12,8 @@ namespace NKikimr::NColumnShard { namespace NTiers { +NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression); + class TManager { private: ui64 TabletId = 0; @@ -20,8 +22,6 @@ private: YDB_READONLY_DEF(NActors::TActorId, StorageActorId); public: TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId, const TTierConfig& config); - static NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression); - NOlap::TStorageTier BuildTierStorage() const; TManager& Restart(const TTierConfig& config, std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets); bool NeedExport() const { @@ -54,7 +54,7 @@ public: { } TActorId GetActorId() const; - THashMap<ui64, NOlap::TTiersInfo> GetTiering() const; + THashMap<ui64, NOlap::TTiering> GetTiering() const; void TakeConfigs(NMetadata::NFetcher::ISnapshot::TPtr snapshot, std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets); void EnablePathId(const ui64 pathId, const TString& tieringId) { PathIdTiering.emplace(pathId, tieringId); diff --git a/ydb/core/tx/tiering/rule/object.cpp b/ydb/core/tx/tiering/rule/object.cpp index 5b1095edc68..0bd4fd00b5e 100644 --- a/ydb/core/tx/tiering/rule/object.cpp +++ b/ydb/core/tx/tiering/rule/object.cpp @@ -71,10 +71,12 @@ bool TTieringRule::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Val return true; } -NKikimr::NOlap::TTiersInfo TTieringRule::BuildTiersInfo() const { - NOlap::TTiersInfo result(GetDefaultColumn()); +NKikimr::NOlap::TTiering TTieringRule::BuildOlapTiers() const { + NOlap::TTiering result; + TInstant now = Now(); // Do not put it in cycle: prevent tiers reorder with the same eviction time for (auto&& r : Intervals) { - result.AddTier(r.GetTierName(), Now() - r.GetDurationForEvict()); + TInstant evictionBorder = now - r.GetDurationForEvict(); + result.Add(std::make_shared<NOlap::TTierInfo>(r.GetTierName(), evictionBorder, GetDefaultColumn())); } return result; } diff --git a/ydb/core/tx/tiering/rule/object.h b/ydb/core/tx/tiering/rule/object.h index 5edc30c4579..a597d6204ce 100644 --- a/ydb/core/tx/tiering/rule/object.h +++ b/ydb/core/tx/tiering/rule/object.h @@ -87,7 +87,7 @@ public: }; NMetadata::NInternal::TTableRecord SerializeToRecord() const; bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r); - NKikimr::NOlap::TTiersInfo BuildTiersInfo() const; + NKikimr::NOlap::TTiering BuildOlapTiers() const; }; } diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp index 9d541f8b15c..fa36d606a65 100644 --- a/ydb/core/tx/tiering/ut/ut_tiers.cpp +++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp @@ -143,7 +143,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { YDB_READONLY_FLAG(Found, false); YDB_ACCESSOR(ui32, ExpectedTieringsCount, 1); YDB_ACCESSOR(ui32, ExpectedTiersCount, 1); - + using TKeyCheckers = TMap<TString, TJsonChecker>; YDB_ACCESSOR_DEF(TKeyCheckers, Checkers); public: @@ -349,7 +349,6 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { .SetUseRealThreads(false) .SetEnableMetadataProvider(true) .SetEnableOlapSchemaOperations(true); - ; Tests::TServer::TPtr server = new Tests::TServer(serverSettings); server->EnableGRpc(grpcPort); @@ -509,7 +508,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { runtime.SimulateSleep(TDuration::Seconds(20)); Cerr << "Initialization tables" << Endl; Cerr << "Insert..." << Endl; - const TInstant pkStart = Now() - TDuration::Days(5); + const TInstant pkStart = Now() - TDuration::Days(15); ui32 idx = 0; lHelper.SendDataViaActorSystem("/Root/olapStore/olapTable", 0, (pkStart + TDuration::Seconds(2 * idx++)).GetValue(), 2000); { |