diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-28 11:40:25 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-28 11:40:25 +0300 |
commit | 7ea9b705953267d7fbc2e21b30451f84ebf519bb (patch) | |
tree | 96dd7a5a52d5459f366f85e4a0e52b3160478121 | |
parent | f573bf9c678a6748ab2b5e6eb4374986af7fae84 (diff) | |
download | ydb-7ea9b705953267d7fbc2e21b30451f84ebf519bb.tar.gz |
KIKIMR-18853: fix ttl force sometimes started
27 files changed, 349 insertions, 268 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index ea961de553..32792aed4c 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -129,16 +129,15 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx) { if (ev->Get()->Manual) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvPrivate::TEvPeriodicWakeup::MANUAL")("tablet_id", TabletID()); EnqueueBackgroundActivities(); - return; - } - - if (LastPeriodicBackActivation < TInstant::Now() - ActivationPeriod) { + } else { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvPrivate::TEvPeriodicWakeup")("tablet_id", TabletID()); SendWaitPlanStep(GetOutdatedStep()); - } - SendPeriodicStats(); - ctx.Schedule(ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup()); + SendPeriodicStats(); + ctx.Schedule(ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup()); + } } void TColumnShard::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext&) { diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index e713d87e0a..1579fc586e 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -250,9 +250,10 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex } if (statusMessage.empty()) { + const TInstant now = TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now(); for (ui64 pathId : ttlBody.GetPathIds()) { NOlap::TTiering tiering; - tiering.Ttl = NOlap::TTierInfo::MakeTtl(unixTime, columnName); + tiering.Ttl = NOlap::TTierInfo::MakeTtl(now - unixTime, columnName); pathTtls.emplace(pathId, std::move(tiering)); } } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index ff0be97c5e..20594b328d 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -557,7 +557,6 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP tableVerProto.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj()); TablesManager.AddTableVersion(pathId, version, tableVerProto, db); - TablesManager.OnTtlUpdate(); } void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const TRowVersion& version, @@ -619,13 +618,7 @@ void TColumnShard::ScheduleNextGC(const TActorContext& ctx, bool cleanupOnly) { void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivity activity) { TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())); - if (periodic) { - if (LastPeriodicBackActivation > TInstant::Now() - ActivationPeriod) { - CSCounters.OnTooEarly(); - return; - } - LastPeriodicBackActivation = TInstant::Now(); - } + ACFL_DEBUG("event", "EnqueueBackgroundActivities")("periodic", periodic)("activity", activity.DebugString()); CSCounters.OnStartBackground(); SendPeriodicStats(); @@ -743,7 +736,7 @@ void TColumnShard::SetupIndexation() { } Y_VERIFY(data.size()); - auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(CompactionLimits.Get(), std::move(data)); + auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(std::move(data)); if (!indexChanges) { LOG_S_NOTICE("Cannot prepare indexing at tablet " << TabletID()); return; @@ -753,9 +746,6 @@ void TColumnShard::SetupIndexation() { indexChanges->Start(*this); auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterIndexing, std::move(cachedBlobs)); - if (Tiers) { - ev->SetTiering(Tiers->GetTiering()); - } ActorContext().Send(IndexingActor, std::make_unique<TEvPrivate::TEvIndexing>(std::move(ev))); } @@ -788,10 +778,6 @@ void TColumnShard::SetupCompaction() { auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex(); auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterCompaction); - if (Tiers) { - ev->SetTiering(Tiers->GetTiering()); - } - ActorContext().Send(CompactionActor, std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev), *BlobManager)); } @@ -810,23 +796,9 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u return {}; } if (force) { - TablesManager.MutablePrimaryIndex().OnTieringModified(Tiers); + TablesManager.MutablePrimaryIndex().OnTieringModified(Tiers, TablesManager.GetTtl()); } THashMap<ui64, NOlap::TTiering> eviction = pathTtls; - if (eviction.empty()) { - if (Tiers) { - eviction = Tiers->GetTiering(); - } - TablesManager.AddTtls(eviction, AppData()->TimeProvider->Now(), force); - } - - if (eviction.empty()) { - if (Tiers || TablesManager.GetTtl().PathsCount()) { - LOG_S_DEBUG("TTL not started. No tables to activate it on (or delayed) at tablet " << TabletID()); - } - return {}; - } - for (auto&& i : eviction) { LOG_S_DEBUG("Prepare TTL evicting path " << i.first << " with " << i.second.GetDebugString() << " at tablet " << TabletID()); @@ -839,16 +811,12 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u LOG_S_INFO("Cannot prepare TTL at tablet " << TabletID()); return {}; } - if (indexChanges->NeedRepeat) { - TablesManager.OnTtlUpdate(); - } bool needWrites = !indexChanges->PortionsToEvict.empty(); LOG_S_INFO("TTL" << (needWrites ? " with writes" : "" ) << " prepared at tablet " << TabletID()); indexChanges->Start(*this); auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false); - ev->SetTiering(eviction); return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), *BlobManager, needWrites); } @@ -862,7 +830,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0}; auto changes = - TablesManager.MutablePrimaryIndex().StartCleanup(cleanupSnapshot, CompactionLimits.Get(), TablesManager.MutablePathsToDrop(), TLimits::MAX_TX_RECORDS); + TablesManager.MutablePrimaryIndex().StartCleanup(cleanupSnapshot, TablesManager.MutablePathsToDrop(), TLimits::MAX_TX_RECORDS); if (!changes) { LOG_S_INFO("Cannot prepare cleanup at tablet " << TabletID()); return {}; @@ -1096,7 +1064,7 @@ void TColumnShard::ActivateTiering(const ui64 pathId, const TString& useTiering) if (!Tiers) { Tiers = std::make_shared<TTiersManager>(TabletID(), SelfId(), [this](const TActorContext& ctx){ - TablesManager.MutablePrimaryIndex().OnTieringModified(Tiers); + TablesManager.MutablePrimaryIndex().OnTieringModified(Tiers, TablesManager.GetTtl()); CleanForgottenBlobs(ctx); Reexport(ctx); }); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 9c2b425dcf..7abcc80602 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -106,6 +106,16 @@ public: bool HasTtl() const { return Activity & TTL; } bool HasAll() const { return Activity == ALL; } + TString DebugString() const { + return TStringBuilder() + << "indexation:" << HasIndexation() << ";" + << "compaction:" << HasCompaction() << ";" + << "cleanup:" << HasCleanup() << ";" + << "ttl:" << HasTtl() << ";" + ; + + } + private: EBackActivity Activity = NONE; @@ -494,7 +504,6 @@ private: TDuration FailActivationDelay = TDuration::Seconds(1); TDuration StatsReportInterval = TDuration::Seconds(10); TInstant LastAccessTime; - TInstant LastPeriodicBackActivation; TInstant LastStatsReport; TActorId IndexingActor; // It's logically bounded to 1: we move each portion of data to multiple indices. diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index 55050c38dc..396376c921 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -30,7 +30,6 @@ struct TEvPrivate { /// Common event for Indexing and GranuleCompaction: write index data in TTxWriteIndex transaction. struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex> { NOlap::TVersionedIndex IndexInfo; - THashMap<ui64, NKikimr::NOlap::TTiering> Tiering; std::shared_ptr<NOlap::TColumnEngineChanges> IndexChanges; THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CachedBlobs; std::vector<TString> Blobs; @@ -53,11 +52,6 @@ struct TEvPrivate { PutResult = std::make_shared<TBlobPutResult>(NKikimrProto::UNKNOWN); } - TEvWriteIndex& SetTiering(const THashMap<ui64, NKikimr::NOlap::TTiering>& tiering) { - Tiering = tiering; - return *this; - } - const TBlobPutResult& GetPutResult() const { Y_VERIFY(PutResult); return *PutResult; diff --git a/ydb/core/tx/columnshard/columnshard_ttl.h b/ydb/core/tx/columnshard/columnshard_ttl.h index 0201d857b3..469ade115f 100644 --- a/ydb/core/tx/columnshard/columnshard_ttl.h +++ b/ydb/core/tx/columnshard/columnshard_ttl.h @@ -5,9 +5,6 @@ namespace NKikimr::NColumnShard { class TTtl { public: - static constexpr const ui64 DEFAULT_TTL_TIMEOUT_SEC = 60 * 60; - static constexpr const ui64 DEFAULT_REPEAT_TTL_TIMEOUT_SEC = 10; - struct TEviction { TDuration EvictAfter; TString ColumnName; @@ -68,21 +65,10 @@ public: PathTtls.erase(pathId); } - void AddTtls(THashMap<ui64, NOlap::TTiering>& eviction, TInstant now, bool force = false) { - if ((now < LastRegularTtl + TtlTimeout) && !force) { - return; - } - + void AddTtls(THashMap<ui64, NOlap::TTiering>& eviction) const { for (auto& [pathId, descr] : PathTtls) { - eviction[pathId].Ttl = Convert(descr, now); + eviction[pathId].Ttl = Convert(descr); } - - LastRegularTtl = now; - } - - void Repeat() { - LastRegularTtl -= TtlTimeout; - LastRegularTtl += RepeatTtlTimeout; } const THashSet<TString>& TtlColumns() const { return Columns; } @@ -90,16 +76,12 @@ public: private: THashMap<ui64, TDescription> PathTtls; // pathId -> ttl THashSet<TString> Columns; - TDuration TtlTimeout{TDuration::Seconds(DEFAULT_TTL_TIMEOUT_SEC)}; - TDuration RepeatTtlTimeout{TDuration::Seconds(DEFAULT_REPEAT_TTL_TIMEOUT_SEC)}; - TInstant LastRegularTtl; - std::shared_ptr<NOlap::TTierInfo> Convert(const TDescription& descr, TInstant timePoint) const + std::shared_ptr<NOlap::TTierInfo> Convert(const TDescription& descr) const { if (descr.Eviction) { auto& evict = descr.Eviction; - TInstant border = timePoint - evict->EvictAfter; - return NOlap::TTierInfo::MakeTtl(border, evict->ColumnName, evict->UnitsInSecond); + return NOlap::TTierInfo::MakeTtl(evict->EvictAfter, evict->ColumnName, evict->UnitsInSecond); } return {}; } diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index 10a2aff00c..ed09885ace 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -106,6 +106,7 @@ struct TTestSchema { struct TTableSpecials : public TStorageTier { std::vector<TStorageTier> Tiers; bool CompositeMarks = false; + bool WaitEmptyAfter = false; TTableSpecials() noexcept = default; diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp index 149704ea01..d91f8053af 100644 --- a/ydb/core/tx/columnshard/compaction_actor.cpp +++ b/ydb/core/tx/columnshard/compaction_actor.cpp @@ -134,7 +134,7 @@ private: virtual bool DoExecute() override { auto guard = TxEvent->PutResult->StartCpuGuard(); - NOlap::TConstructionContext context(TxEvent->IndexInfo, TxEvent->Tiering, Counters); + NOlap::TConstructionContext context(TxEvent->IndexInfo, Counters); TxEvent->Blobs = std::move(TxEvent->IndexChanges->ConstructBlobs(context).DetachResult()); return true; } diff --git a/ydb/core/tx/columnshard/engines/changes/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract.h index 8f7b76b514..3df0bcc5ee 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract.h @@ -137,33 +137,14 @@ public: class TConstructionContext: TNonCopyable { public: - using TTieringsHash = THashMap<ui64, NKikimr::NOlap::TTiering>; -private: - const TTieringsHash* TieringMap = nullptr; -public: const TVersionedIndex& SchemaVersions; const NColumnShard::TIndexationCounters Counters; - TConstructionContext(const TVersionedIndex& schemaVersions, const TTieringsHash& tieringMap, const NColumnShard::TIndexationCounters counters) - : TieringMap(&tieringMap) - , SchemaVersions(schemaVersions) - , Counters(counters) - { - - } - TConstructionContext(const TVersionedIndex& schemaVersions, const NColumnShard::TIndexationCounters counters) : SchemaVersions(schemaVersions) , Counters(counters) { } - - const THashMap<ui64, NKikimr::NOlap::TTiering>& GetTieringMap() const { - if (TieringMap) { - return *TieringMap; - } - return Default<THashMap<ui64, NKikimr::NOlap::TTiering>>(); - } }; class TGranuleMeta; diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index fe4c8b7ca7..f5f3741284 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -56,6 +56,7 @@ bool TInsertColumnEngineChanges::AddPathIfNotExists(ui64 pathId) { } void TInsertColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { + TBase::DoStart(self); self.BackgroundController.StartIndexing(); } diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp index dc5cc467c6..10f0121032 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp +++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp @@ -174,7 +174,7 @@ bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfo& portionInfo, TP TConstructionContext& context) const { Y_VERIFY(portionInfo.TierName != evictFeatures.TargetTierName); - auto* tiering = context.GetTieringMap().FindPtr(evictFeatures.PathId); + auto* tiering = Tiering.FindPtr(evictFeatures.PathId); Y_VERIFY(tiering); auto compression = tiering->GetCompression(evictFeatures.TargetTierName); if (!compression) { diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h index 3562e9494b..8745a1afe6 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.h +++ b/ydb/core/tx/columnshard/engines/changes/ttl.h @@ -1,5 +1,6 @@ #pragma once #include "cleanup.h" +#include <ydb/core/tx/columnshard/engines/scheme/tier_info.h> namespace NKikimr::NOlap { @@ -9,6 +10,7 @@ private: using TBase = TCleanupColumnEngineChanges; THashMap<TString, TPathIdBlobs> ExportTierBlobs; ui64 ExportNo = 0; + bool UpdateEvictedPortion(TPortionInfo& portionInfo, TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs, std::vector<TColumnRecord>& evictedRecords, std::vector<TString>& newBlobs, TConstructionContext& context) const; @@ -27,6 +29,7 @@ protected: } public: std::vector<TColumnRecord> EvictedRecords; + THashMap<ui64, NOlap::TTiering> Tiering; std::vector<std::pair<TPortionInfo, TPortionEvictionFeatures>> PortionsToEvict; // {portion, TPortionEvictionFeatures} virtual THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GetGroupedBlobRanges() const override; diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index ac01441408..95928fc715 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -113,7 +113,7 @@ std::vector<NKikimr::NOlap::TPortionInfo> TChangesWithAppend::MakeAppendedPortio TString tierName; std::optional<NArrow::TCompression> compression; if (pathId) { - if (auto* tiering = context.GetTieringMap().FindPtr(pathId)) { + if (auto* tiering = TieringInfo.FindPtr(pathId)) { tierName = tiering->GetHottestTierName(); if (const auto& tierCompression = tiering->GetCompression(tierName)) { compression = *tierCompression; @@ -145,4 +145,10 @@ std::vector<NKikimr::NOlap::TPortionInfo> TChangesWithAppend::MakeAppendedPortio return out; } +void TChangesWithAppend::DoStart(NColumnShard::TColumnShard& self) { + if (self.Tiers) { + TieringInfo = self.Tiers->GetTiering(); + } +} + } diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h index d526da6f5b..00e1ebbd91 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.h +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h @@ -5,6 +5,8 @@ namespace NKikimr::NOlap { class TChangesWithAppend: public TColumnEngineChanges { +private: + THashMap<ui64, NOlap::TTiering> TieringInfo; protected: virtual void DoDebugString(TStringOutput& out) const override; virtual void DoCompile(TFinalizationContext& context) override; @@ -13,9 +15,7 @@ protected: virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& /*self*/, TWriteIndexCompleteContext& /*context*/) override { } - virtual void DoStart(NColumnShard::TColumnShard& /*self*/) override { - - } + virtual void DoStart(NColumnShard::TColumnShard& self) override; std::vector<TPortionInfo> MakeAppendedPortions(const ui64 pathId, const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index fae205043e..1f48fce0fa 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -9,6 +9,7 @@ namespace NKikimr::NColumnShard { class TTiersManager; +class TTtl; } namespace NKikimr::NOlap { @@ -358,13 +359,13 @@ public: const THashSet<ui32>& columnIds, const TPKRangesFilter& pkRangesFilter) const = 0; virtual std::unique_ptr<TCompactionInfo> Compact(const TCompactionLimits& limits, const THashSet<ui64>& busyGranuleIds) = 0; - virtual std::shared_ptr<TInsertColumnEngineChanges> StartInsert(const TCompactionLimits& limits, std::vector<TInsertedData>&& dataToIndex) = 0; + virtual std::shared_ptr<TInsertColumnEngineChanges> StartInsert(std::vector<TInsertedData>&& dataToIndex) noexcept = 0; virtual std::shared_ptr<TCompactColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo, - const TCompactionLimits& limits) = 0; - virtual std::shared_ptr<TCleanupColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, const TCompactionLimits& limits, THashSet<ui64>& pathsToDrop, - ui32 maxRecords) = 0; + const TCompactionLimits& limits) noexcept = 0; + virtual std::shared_ptr<TCleanupColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop, + ui32 maxRecords) noexcept = 0; virtual std::shared_ptr<TTTLColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, - ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) = 0; + ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) noexcept = 0; virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0; virtual void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) = 0; //virtual void UpdateTableSchema(ui64 pathId, const TSnapshot& snapshot, TIndexInfo&& info) = 0; // TODO @@ -372,7 +373,7 @@ public: virtual const TColumnEngineStats& GetTotalStats() = 0; virtual ui64 MemoryUsage() const { return 0; } virtual TSnapshot LastUpdate() const { return TSnapshot::Zero(); } - virtual void OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager) = 0; + virtual void OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager, const NColumnShard::TTtl& ttl) = 0; }; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 1d37b22dbd..9d3b359b7a 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -5,6 +5,8 @@ #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/core/formats/arrow/one_batch_input_stream.h> #include <ydb/core/formats/arrow/merging_sorted_input_stream.h> +#include <ydb/core/tx/tiering/manager.h> +#include <ydb/core/tx/columnshard/columnshard_ttl.h> #include <ydb/library/conclusion/status.h> #include "changes/indexation.h" #include "changes/in_granule_compaction.h" @@ -269,7 +271,7 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) { return CountersTable->Load(db, callback); } -std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(const TCompactionLimits& /*limits*/, std::vector<TInsertedData>&& dataToIndex) { +std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(std::vector<TInsertedData>&& dataToIndex) noexcept { Y_VERIFY(dataToIndex.size()); auto changes = TChangesConstructor::BuildInsertChanges(DefaultMark(), std::move(dataToIndex), LastSnapshot); @@ -306,7 +308,7 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(co } std::shared_ptr<TCompactColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std::unique_ptr<TCompactionInfo>&& info, - const TCompactionLimits& limits) { + const TCompactionLimits& limits) noexcept { const ui64 pathId = info->GetPlanCompaction().GetPathId(); Y_VERIFY(PathGranules.contains(pathId)); @@ -324,9 +326,7 @@ std::shared_ptr<TCompactColumnEngineChanges> TColumnEngineForLogs::StartCompacti } std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot, - const TCompactionLimits& /*limits*/, - THashSet<ui64>& pathsToDrop, - ui32 maxRecords) { + THashSet<ui64>& pathsToDrop, ui32 maxRecords) noexcept { auto changes = TChangesConstructor::BuildCleanupChanges(snapshot); ui32 affectedRecords = 0; @@ -400,119 +400,160 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup( return changes; } -std::shared_ptr<TTTLColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction, ui64 maxEvictBytes) { - if (pathEviction.empty()) { - return {}; - } - auto changes = TChangesConstructor::BuildTtlChanges(); +TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering& ttl, TTieringProcessContext& context) const { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ProcessTiering")("path_id", pathId)("ttl", ttl.GetDebugString()); ui64 evictionSize = 0; - bool allowEviction = true; ui64 dropBlobs = 0; - bool allowDrop = true; - auto& indexInfo = VersionedIndex.GetLastSchema()->GetIndexInfo(); - const TMonotonic nowMonotonic = TlsActivationContext ? AppData()->MonotonicTimeProvider->Now() : TMonotonic::Now(); - for (const auto& [pathId, ttl] : pathEviction) { - auto it = NextCheckInstantForTTL.find(pathId); - if (it != NextCheckInstantForTTL.end() && nowMonotonic < it->second) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_path_id_ttl")("path_id", pathId)("now", nowMonotonic)("expected", it->second); - continue; - } - auto itGranules = PathGranules.find(pathId); - if (itGranules == PathGranules.end()) { - continue; // It's not an error: allow TTL over multiple shards with different pathIds presented - } + Y_VERIFY(context.Changes->Tiering.emplace(pathId, ttl).second); - auto expireTimestampOpt = ttl.GetEvictBorder(); - Y_VERIFY(expireTimestampOpt); - auto expireTimestamp = *expireTimestampOpt; + TDuration dWaiting = TDuration::Minutes(5); + auto itGranules = PathGranules.find(pathId); + if (itGranules == PathGranules.end()) { + return dWaiting; + } - auto ttlColumnNames = ttl.GetTtlColumns(); - Y_VERIFY(ttlColumnNames.size() == 1); // TODO: support different ttl columns - ui32 ttlColumnId = indexInfo.GetColumnId(*ttlColumnNames.begin()); - std::optional<TDuration> dWaiting; - for (const auto& [ts, granule] : itGranules->second) { - auto spg = Granules[granule]; - Y_VERIFY(spg); + auto expireTimestampOpt = ttl.GetEvictInstant(context.Now); + Y_VERIFY(expireTimestampOpt); + auto expireTimestamp = *expireTimestampOpt; - for (auto& [portion, info] : spg->GetPortions()) { - if (!info.IsActive()) { - continue; - } + auto ttlColumnNames = ttl.GetTtlColumns(); + Y_VERIFY(ttlColumnNames.size() == 1); // TODO: support different ttl columns + ui32 ttlColumnId = indexInfo.GetColumnId(*ttlColumnNames.begin()); + for (const auto& [ts, granule] : itGranules->second) { + auto itGranule = Granules.find(granule); + auto spg = itGranule->second; + Y_VERIFY(spg); - allowEviction = (evictionSize <= maxEvictBytes); - allowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE); - const bool tryEvictPortion = allowEviction && ttl.HasTiers() && info.EvictReady(TCompactionLimits::EVICT_HOT_PORTION_BYTES); + for (auto& [portion, info] : spg->GetPortions()) { + if (!info.IsActive()) { + continue; + } - if (auto max = info.MaxValue(ttlColumnId)) { - bool keep = false; - { - auto mpiOpt = ttl.ScalarToInstant(max); - Y_VERIFY(mpiOpt); - const TInstant maxTtlPortionInstant = *mpiOpt; - const TDuration d = maxTtlPortionInstant - expireTimestamp; - keep = !!d; - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestamp.Seconds()); - if (d && (!dWaiting || *dWaiting > d)) { - dWaiting = d; - } + context.AllowEviction = (evictionSize <= context.MaxEvictBytes); + context.AllowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE); + const bool tryEvictPortion = context.AllowEviction && ttl.HasTiers() && info.EvictReady(TCompactionLimits::EVICT_HOT_PORTION_BYTES); + + if (auto max = info.MaxValue(ttlColumnId)) { + bool keep = false; + { + auto mpiOpt = ttl.ScalarToInstant(max); + Y_VERIFY(mpiOpt); + const TInstant maxTtlPortionInstant = *mpiOpt; + const TDuration d = maxTtlPortionInstant - expireTimestamp; + keep = !!d; + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestamp.Seconds()); + if (d && dWaiting > d) { + dWaiting = d; } + } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", allowDrop); - if (keep && tryEvictPortion) { - TString tierName; - for (auto& tierRef : ttl.GetOrderedTiers()) { - auto& tierInfo = tierRef.Get(); - if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) { - SignalCounters.OnPortionNoTtlColumn(info.BlobsBytes()); - continue; - } - auto mpiOpt = tierInfo.ScalarToInstant(max); - Y_VERIFY(mpiOpt); - const TInstant maxTieringPortionInstant = *mpiOpt; - - const TDuration d = maxTieringPortionInstant - tierInfo.GetEvictBorder(); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering")("max", maxTieringPortionInstant.Seconds())("evict", tierInfo.GetEvictBorder().Seconds()); - if (d) { - if (!dWaiting || *dWaiting > d) { - dWaiting = d; - } - tierName = tierInfo.GetName(); - } else { - break; - } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.AllowDrop); + if (keep && tryEvictPortion) { + TString tierName; + for (auto& tierRef : ttl.GetOrderedTiers()) { + auto& tierInfo = tierRef.Get(); + if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) { + SignalCounters.OnPortionNoTtlColumn(info.BlobsBytes()); + continue; } - if (info.TierName != tierName) { - evictionSize += info.BlobsSizes().first; - const bool needExport = ttl.NeedExport(tierName); - changes->PortionsToEvict.emplace_back(info, TPortionEvictionFeatures(tierName, pathId, needExport)); - SignalCounters.OnPortionToEvict(info.BlobsBytes()); + auto mpiOpt = tierInfo.ScalarToInstant(max); + Y_VERIFY(mpiOpt); + const TInstant maxTieringPortionInstant = *mpiOpt; + + const TDuration d = maxTieringPortionInstant - tierInfo.GetEvictInstant(context.Now); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering")("max", maxTieringPortionInstant.Seconds()) + ("evict", tierInfo.GetEvictInstant(context.Now).Seconds()); + if (d) { + if (dWaiting > d) { + dWaiting = d; + } + tierName = tierInfo.GetName(); + } else { + break; } } - if (!keep && allowDrop) { - dropBlobs += info.NumRecords(); - changes->PortionsToDrop.push_back(info); - SignalCounters.OnPortionToDrop(info.BlobsBytes()); + if (info.TierName != tierName) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering switch detected")("from", info.TierName)("to", tierName); + evictionSize += info.BlobsSizes().first; + const bool needExport = ttl.NeedExport(tierName); + context.Changes->PortionsToEvict.emplace_back(info, TPortionEvictionFeatures(tierName, pathId, needExport)); + SignalCounters.OnPortionToEvict(info.BlobsBytes()); } - } else { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max"); - SignalCounters.OnPortionNoBorder(info.BlobsBytes()); } + if (!keep && context.AllowDrop) { + dropBlobs += info.NumRecords(); + context.Changes->PortionsToDrop.push_back(info); + SignalCounters.OnPortionToDrop(info.BlobsBytes()); + } + } else { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max"); + SignalCounters.OnPortionNoBorder(info.BlobsBytes()); + } + } + } + Y_VERIFY(!!dWaiting); + return dWaiting; +} + +bool TColumnEngineForLogs::DrainEvictionQueue(std::map<TMonotonic, std::vector<TEvictionsController::TTieringWithPathId>>& evictionsQueue, TTieringProcessContext& context) const { + const TMonotonic nowMonotonic = TlsActivationContext ? AppData()->MonotonicTimeProvider->Now() : TMonotonic::Now(); + bool hasChanges = false; + while (evictionsQueue.size() && evictionsQueue.begin()->first < nowMonotonic) { + hasChanges = true; + auto tierings = std::move(evictionsQueue.begin()->second); + evictionsQueue.erase(evictionsQueue.begin()); + for (auto&& i : tierings) { + auto itDuration = context.DurationsForced.find(i.GetPathId()); + if (itDuration == context.DurationsForced.end()) { + const TDuration dWaiting = ProcessTiering(i.GetPathId(), i.GetTieringInfo(), context); + evictionsQueue[nowMonotonic + dWaiting].emplace_back(std::move(i)); + } else { + evictionsQueue[nowMonotonic + itDuration->second].emplace_back(std::move(i)); } } - if (dWaiting) { - NextCheckInstantForTTL[pathId] = nowMonotonic + std::min<TDuration>(*dWaiting, TDuration::Minutes(5)); + } + + if (evictionsQueue.size()) { + if (evictionsQueue.begin()->first < nowMonotonic) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "stop scan")("reason", "too many data")("first", evictionsQueue.begin()->first)("now", nowMonotonic); + } else if (!hasChanges) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "stop scan")("reason", "too early")("first", evictionsQueue.begin()->first)("now", nowMonotonic); } else { - NextCheckInstantForTTL.erase(pathId); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "stop scan")("reason", "task_ready")("first", evictionsQueue.begin()->first)("now", nowMonotonic) + ("internal", hasChanges)("evict_portions", context.Changes->PortionsToEvict.size()) + ("drop_portions", context.Changes->PortionsToDrop.size()); } + } else { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "stop scan")("reason", "no data in queue"); + } + return hasChanges; +} + +std::shared_ptr<TTTLColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction, ui64 maxEvictBytes) noexcept { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("external", pathEviction.size()) + ("internal", EvictionsController.MutableNextCheckInstantForTierings().size()) + ; + auto changes = TChangesConstructor::BuildTtlChanges(); + + TTieringProcessContext context(maxEvictBytes, changes); + bool hasExternalChanges = false; + for (auto&& i : pathEviction) { + context.DurationsForced[i.first] = ProcessTiering(i.first, i.second, context); + hasExternalChanges = true; + } + + { + TLogContextGuard lGuard(TLogContextBuilder::Build()("queue", "ttl")("has_external", hasExternalChanges)); + DrainEvictionQueue(EvictionsController.MutableNextCheckInstantForTierings(), context); } if (changes->PortionsToDrop.empty() && changes->PortionsToEvict.empty()) { - return {}; + return nullptr; } - if (!allowEviction || !allowDrop) { + if (!context.AllowEviction || !context.AllowDrop) { changes->NeedRepeat = true; } return changes; @@ -810,9 +851,35 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(const TCompaction } } -void TColumnEngineForLogs::OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> /*manager*/) { +void TColumnEngineForLogs::OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager, const NColumnShard::TTtl& ttl) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified"); - NextCheckInstantForTTL.clear(); + std::optional<THashMap<ui64, TTiering>> tierings; + if (manager) { + tierings = manager->GetTiering(); + } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified") + ("new_count_tierings", tierings ? ::ToString(tierings->size()) : TString("undefined")) + ("new_count_ttls", ttl.PathsCount()); + EvictionsController.RefreshTierings(std::move(tierings), ttl); + +} + +TColumnEngineForLogs::TTieringProcessContext::TTieringProcessContext(const ui64 maxEvictBytes, std::shared_ptr<TTTLColumnEngineChanges> changes) + : Now(TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now()) + , MaxEvictBytes(maxEvictBytes) + , Changes(changes) +{ + +} + +void TEvictionsController::RefreshTierings(std::optional<THashMap<ui64, TTiering>>&& tierings, const NColumnShard::TTtl& ttl) { + if (tierings) { + OriginalTierings = std::move(*tierings); + } + auto copy = OriginalTierings; + ttl.AddTtls(copy); + NextCheckInstantForTierings = BuildNextInstantCheckers(std::move(copy)); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "RefreshTierings")("count", NextCheckInstantForTierings.size()); } } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 0b2d0e5291..1aee3a86a6 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -4,6 +4,8 @@ #include "column_engine.h" #include <ydb/core/tx/columnshard/common/scalars.h> #include <ydb/core/tx/columnshard/counters/engine_logs.h> +#include <ydb/core/tx/columnshard/columnshard_ttl.h> +#include "scheme/tier_info.h" #include "storage/granule.h" #include "storage/storage.h" #include "changes/indexation.h" @@ -21,6 +23,51 @@ class TGranulesTable; class TColumnsTable; class TCountersTable; +class TEvictionsController { +public: + class TTieringWithPathId { + private: + const ui64 PathId; + TTiering TieringInfo; + public: + TTieringWithPathId(const ui64 pathId, TTiering&& tieringInfo) + : PathId(pathId) + , TieringInfo(std::move(tieringInfo)) + { + + } + + ui64 GetPathId() const { + return PathId; + } + + const TTiering& GetTieringInfo() const { + return TieringInfo; + } + }; +private: + THashMap<ui64, TTiering> OriginalTierings; + std::map<TMonotonic, std::vector<TTieringWithPathId>> NextCheckInstantForTTL; + std::map<TMonotonic, std::vector<TTieringWithPathId>> NextCheckInstantForTierings; + + std::map<TMonotonic, std::vector<TTieringWithPathId>> BuildNextInstantCheckers(THashMap<ui64, TTiering>&& info) { + std::map<TMonotonic, std::vector<TTieringWithPathId>> result; + std::vector<TTieringWithPathId> newTasks; + for (auto&& i : info) { + newTasks.emplace_back(i.first, std::move(i.second)); + } + result.emplace(TMonotonic::Zero(), std::move(newTasks)); + return result; + } +public: + std::map<TMonotonic, std::vector<TTieringWithPathId>>& MutableNextCheckInstantForTierings() { + return NextCheckInstantForTierings; + } + + void RefreshTierings(std::optional<THashMap<ui64, TTiering>>&& tierings, const NColumnShard::TTtl& ttl); +}; + + /// Engine with 2 tables: /// - Granules: PK -> granules (use part of PK) /// - Columns: granule -> blobs @@ -35,7 +82,20 @@ class TColumnEngineForLogs : public IColumnEngine { private: const NColumnShard::TEngineLogsCounters SignalCounters; std::shared_ptr<TGranulesStorage> GranulesStorage; - THashMap<ui64, TMonotonic> NextCheckInstantForTTL; + TEvictionsController EvictionsController; + class TTieringProcessContext { + public: + bool AllowEviction = true; + bool AllowDrop = true; + const TInstant Now; + const ui64 MaxEvictBytes; + std::shared_ptr<TTTLColumnEngineChanges> Changes; + std::map<ui64, TDuration> DurationsForced; + TTieringProcessContext(const ui64 maxEvictBytes, std::shared_ptr<TTTLColumnEngineChanges> changes); + }; + + TDuration ProcessTiering(const ui64 pathId, const TTiering& tiering, TTieringProcessContext& context) const; + bool DrainEvictionQueue(std::map<TMonotonic, std::vector<TEvictionsController::TTieringWithPathId>>& evictionsQueue, TTieringProcessContext& context) const; public: class TChangesConstructor : public TColumnEngineChanges { public: @@ -69,7 +129,7 @@ public: TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits = {}); - virtual void OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager) override; + virtual void OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager, const NColumnShard::TTtl& ttl) override; const TVersionedIndex& GetVersionedIndex() const override { return VersionedIndex; @@ -108,12 +168,11 @@ public: public: bool Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop = {}) override; - std::shared_ptr<TInsertColumnEngineChanges> StartInsert(const TCompactionLimits& limits, std::vector<TInsertedData>&& dataToIndex) override; - std::shared_ptr<TCompactColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo, const TCompactionLimits& limits) override; - std::shared_ptr<TCleanupColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, const TCompactionLimits& limits, THashSet<ui64>& pathsToDrop, - ui32 maxRecords) override; + std::shared_ptr<TInsertColumnEngineChanges> StartInsert(std::vector<TInsertedData>&& dataToIndex) noexcept override; + std::shared_ptr<TCompactColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo, const TCompactionLimits& limits) noexcept override; + std::shared_ptr<TCleanupColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop, ui32 maxRecords) noexcept override; std::shared_ptr<TTTLColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, - ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override; + ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) noexcept override; bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept override; diff --git a/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp b/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp index 1672d7a97a..ce84fe4c50 100644 --- a/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp @@ -18,4 +18,24 @@ std::optional<TInstant> TTierInfo::ScalarToInstant(const std::shared_ptr<arrow:: } } +std::shared_ptr<NKikimr::NOlap::TTierInfo> TTiering::GetMainTierInfo() const { + auto ttl = Ttl; + auto tier = OrderedTiers.size() ? OrderedTiers.begin()->GetPtr() : nullptr; + if (!ttl && !tier) { + return nullptr; + } else if (!tier) { + return ttl; + } else if (!ttl) { + return tier; + } else { + const TDuration ttlDuration = ttl->GetEvictDuration(); + const TDuration tierDuration = tier->GetEvictDuration(); + if (tierDuration < ttlDuration) { + return tier; + } else { + return ttl; + } + } +} + } diff --git a/ydb/core/tx/columnshard/engines/scheme/tier_info.h b/ydb/core/tx/columnshard/engines/scheme/tier_info.h index bf5e8980cb..4ec4c3233b 100644 --- a/ydb/core/tx/columnshard/engines/scheme/tier_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/tier_info.h @@ -15,20 +15,20 @@ class TTierInfo { private: TString Name; TString EvictColumnName; - TInstant EvictBorder; + TDuration EvictDuration; bool NeedExport = false; ui32 TtlUnitsInSecond; std::optional<NArrow::TCompression> Compression; public: - TTierInfo(const TString& tierName, TInstant evictBorder, const TString& column, ui32 unitsInSecond = 0) + TTierInfo(const TString& tierName, TDuration evictDuration, const TString& column, ui32 unitsInSecond = 0) : Name(tierName) , EvictColumnName(column) - , EvictBorder(evictBorder) + , EvictDuration(evictDuration) , TtlUnitsInSecond(unitsInSecond) { - Y_VERIFY(!Name.empty()); - Y_VERIFY(!EvictColumnName.empty()); + Y_VERIFY(!!Name); + Y_VERIFY(!!EvictColumnName); } const TString& GetName() const { @@ -39,8 +39,12 @@ public: return EvictColumnName; } - const TInstant GetEvictBorder() const { - return EvictBorder; + TInstant GetEvictInstant(const TInstant now) const { + return now - EvictDuration; + } + + TDuration GetEvictDuration() const { + return EvictDuration; } bool GetNeedExport() const { @@ -70,18 +74,19 @@ public: std::optional<TInstant> ScalarToInstant(const std::shared_ptr<arrow::Scalar>& scalar) const; - static std::shared_ptr<TTierInfo> MakeTtl(TInstant ttlBorder, const TString& ttlColumn, ui32 unitsInSecond = 0) { - return std::make_shared<TTierInfo>("TTL", ttlBorder, ttlColumn, unitsInSecond); + static std::shared_ptr<TTierInfo> MakeTtl(const TDuration evictDuration, const TString& ttlColumn, ui32 unitsInSecond = 0) { + return std::make_shared<TTierInfo>("TTL", evictDuration, ttlColumn, unitsInSecond); } TString GetDebugString() const { TStringBuilder sb; - sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << EvictColumnName << "' "; + sb << "name=" << Name << ";duration=" << EvictDuration << ";column=" << EvictColumnName << ";compression="; if (Compression) { sb << Compression->DebugString(); } else { sb << "NOT_SPECIFIED(Default)"; } + sb << ";"; return sb; } }; @@ -95,16 +100,16 @@ public: } bool operator < (const TTierRef& b) const { - if (Info->GetEvictBorder() < b.Info->GetEvictBorder()) { + if (Info->GetEvictDuration() > b.Info->GetEvictDuration()) { return true; - } else if (Info->GetEvictBorder() == b.Info->GetEvictBorder()) { + } else if (Info->GetEvictDuration() == b.Info->GetEvictDuration()) { return Info->GetName() > b.Info->GetName(); // add stability: smaller name is hotter } return false; } bool operator == (const TTierRef& b) const { - return Info->GetEvictBorder() == b.Info->GetEvictBorder() + return Info->GetEvictDuration() == b.Info->GetEvictDuration() && Info->GetName() == b.Info->GetName(); } @@ -126,25 +131,7 @@ class TTiering { TSet<TTierRef> OrderedTiers; public: - std::shared_ptr<TTierInfo> GetMainTierInfo() const { - auto ttl = Ttl; - auto tier = OrderedTiers.size() ? OrderedTiers.begin()->GetPtr() : nullptr; - if (!ttl && !tier) { - return nullptr; - } else if (!tier) { - return ttl; - } else if (!ttl) { - return tier; - } else { - const TInstant ttlInstant = ttl->GetEvictBorder(); - const TInstant tierInstant = tier->GetEvictBorder(); - if (ttlInstant < tierInstant) { - return tier; - } else { - return ttl; - } - } - } + std::shared_ptr<TTierInfo> GetMainTierInfo() const; std::shared_ptr<TTierInfo> Ttl; @@ -186,12 +173,12 @@ public: } } - std::optional<TInstant> GetEvictBorder() const { + std::optional<TInstant> GetEvictInstant(const TInstant now) const { auto mainTier = GetMainTierInfo(); if (!mainTier) { return {}; } else { - return mainTier->GetEvictBorder(); + return mainTier->GetEvictInstant(now); } } diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 2169430819..3631138ec8 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -260,7 +260,7 @@ TCompactionLimits TestLimits() { bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, std::vector<TInsertedData>&& dataToIndex, THashMap<TBlobRange, TString>& blobs, ui32& step) { - std::shared_ptr<TInsertColumnEngineChanges> changes = engine.StartInsert(TestLimits(), std::move(dataToIndex)); + std::shared_ptr<TInsertColumnEngineChanges> changes = engine.StartInsert(std::move(dataToIndex)); if (!changes) { return false; } @@ -312,7 +312,7 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, ui32 expectedToDrop) { THashSet<ui64> pathsToDrop; - std::shared_ptr<TCleanupColumnEngineChanges> changes = engine.StartCleanup(snap, TestLimits(), pathsToDrop, 1000); + std::shared_ptr<TCleanupColumnEngineChanges> changes = engine.StartCleanup(snap, pathsToDrop, 1000); UNIT_ASSERT(changes); UNIT_ASSERT_VALUES_EQUAL(changes->PortionsToDrop.size(), expectedToDrop); @@ -700,7 +700,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { std::shared_ptr<arrow::DataType> ttlColType = arrow::timestamp(arrow::TimeUnit::MICRO); THashMap<ui64, NOlap::TTiering> pathTtls; NOlap::TTiering tiering; - tiering.Ttl = NOlap::TTierInfo::MakeTtl(TInstant::MicroSeconds(10000), "timestamp"); + tiering.Ttl = NOlap::TTierInfo::MakeTtl(TDuration::MicroSeconds(TInstant::Now().MicroSeconds() - 10000), "timestamp"); pathTtls.emplace(pathId, std::move(tiering)); Ttl(engine, db, pathTtls, 2); diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp index 82621bf861..40fcffde44 100644 --- a/ydb/core/tx/columnshard/eviction_actor.cpp +++ b/ydb/core/tx/columnshard/eviction_actor.cpp @@ -132,7 +132,7 @@ private: auto guard = TxEvent->PutResult->StartCpuGuard(); TxEvent->IndexChanges->SetBlobs(std::move(Blobs)); - NOlap::TConstructionContext context(TxEvent->IndexInfo, TxEvent->Tiering, Counters); + NOlap::TConstructionContext context(TxEvent->IndexInfo, Counters); TxEvent->Blobs = std::move(TxEvent->IndexChanges->ConstructBlobs(context).DetachResult()); if (TxEvent->Blobs.empty()) { TxEvent->SetPutStatus(NKikimrProto::OK); diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp index e320d93558..c882695dc7 100644 --- a/ydb/core/tx/columnshard/indexing_actor.cpp +++ b/ydb/core/tx/columnshard/indexing_actor.cpp @@ -127,7 +127,7 @@ private: virtual bool DoExecute() override { auto guard = TxEvent->PutResult->StartCpuGuard(); - NOlap::TConstructionContext context(TxEvent->IndexInfo, TxEvent->Tiering, Counters); + NOlap::TConstructionContext context(TxEvent->IndexInfo, Counters); TxEvent->Blobs = std::move(TxEvent->IndexChanges->ConstructBlobs(context).DetachResult()); return true; } diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index d92c226455..2acde90fb1 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -183,15 +183,14 @@ ui64 TTablesManager::GetMemoryUsage() const { return memory; } -void TTablesManager::OnTtlUpdate() { - Ttl.Repeat(); -} - void TTablesManager::DropTable(const ui64 pathId, const TRowVersion& version, NIceDb::TNiceDb& db) { auto& table = Tables.at(pathId); table.SetDropVersion(version); PathsToDrop.insert(pathId); Ttl.DropPathTtl(pathId); + if (PrimaryIndex) { + PrimaryIndex->OnTieringModified(nullptr, Ttl); + } Schema::SaveTableDropVersion(db, pathId, version.Step, version.TxId); } @@ -261,6 +260,10 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& versi } else { Ttl.DropPathTtl(pathId); } + if (PrimaryIndex) { + PrimaryIndex->OnTieringModified(nullptr, Ttl); + } + } Schema::SaveTableVersionInfo(db, pathId, version, versionInfo); table.AddVersion(version, versionInfo); @@ -278,6 +281,7 @@ void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const NKikim Y_VERIFY(lastIndexInfo.GetIndexKey()->Equals(indexInfo.GetIndexKey())); } PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo)); + PrimaryIndex->OnTieringModified(nullptr, Ttl); for (auto& columnName : Ttl.TtlColumns()) { PrimaryIndex->GetVersionedIndex().GetLastSchema()->GetIndexInfo().CheckTtlColumn(columnName); diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 654901d13f..e5d5e0f308 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -142,8 +142,8 @@ public: return Ttl; } - void AddTtls(THashMap<ui64, NOlap::TTiering>& eviction, TInstant now, bool force) { - Ttl.AddTtls(eviction, now, force); + void AddTtls(THashMap<ui64, NOlap::TTiering>& eviction) { + Ttl.AddTtls(eviction); } const THashSet<ui64>& GetPathsToDrop() const { @@ -209,8 +209,6 @@ public: void AddPresetVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db); void AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db); - - void OnTtlUpdate(); private: void IndexSchemaVersion(const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema); static NOlap::TIndexInfo DeserializeIndexInfoFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema); diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp index 1b388028c2..c70ec3a568 100644 --- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp @@ -657,14 +657,12 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt } } if (i) { - ui32 version = i + 1; - { - const bool ok = ProposeSchemaTx(runtime, sender, - TTestSchema::AlterTableTxBody(tableId, version, specs[i]), - NOlap::TSnapshot(++planStep, ++txId)); - UNIT_ASSERT(ok); - PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId)); - } + const ui32 version = i + 1; + const bool ok = ProposeSchemaTx(runtime, sender, + TTestSchema::AlterTableTxBody(tableId, version, specs[i]), + NOlap::TSnapshot(++planStep, ++txId)); + UNIT_ASSERT(ok); + PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId)); } if (specs[i].HasTiers() || reboots) { ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i])); @@ -700,7 +698,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep - 1, Max<ui64>(), tableId); Proto(read.get()).AddColumnNames(specs[i].TtlColumn); - counter.CaptureReadEvents = 1; // TODO: we need affected by tiering blob here + counter.CaptureReadEvents = specs[i].WaitEmptyAfter ? 0 : 1; // TODO: we need affected by tiering blob here ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); counter.WaitReadsCaptured(runtime); } @@ -907,6 +905,7 @@ std::vector<std::pair<ui32, ui64>> TestTiersAndTtl(const TTestSchema::TTableSpec THashSet<ui32> forgets; if (testTtl) { changes.AddTtlAlters(spec, {allowBoth, allowOne, allowNone}, alters); + alters.back().WaitEmptyAfter = true; } else { changes.AddTierAlters(spec, {allowBoth, allowOne, allowNone}, alters); diff --git a/ydb/core/tx/tiering/rule/object.cpp b/ydb/core/tx/tiering/rule/object.cpp index 12f5981d7b..64aa4ba586 100644 --- a/ydb/core/tx/tiering/rule/object.cpp +++ b/ydb/core/tx/tiering/rule/object.cpp @@ -73,10 +73,8 @@ bool TTieringRule::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Val NKikimr::NOlap::TTiering TTieringRule::BuildOlapTiers() const { NOlap::TTiering result; - TInstant now = Now(); // Do not put it in cycle: prevent tiers reorder with the same eviction time for (auto&& r : Intervals) { - TInstant evictionBorder = now - r.GetDurationForEvict(); - result.Add(std::make_shared<NOlap::TTierInfo>(r.GetTierName(), evictionBorder, GetDefaultColumn())); + result.Add(std::make_shared<NOlap::TTierInfo>(r.GetTierName(), r.GetDurationForEvict(), GetDefaultColumn())); } return result; } diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp index a743adc90b..59662eeb73 100644 --- a/ydb/core/tx/tiering/ut/ut_tiers.cpp +++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp @@ -533,7 +533,9 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { Cerr << "Wait tables" << Endl; runtime.SimulateSleep(TDuration::Seconds(20)); Cerr << "Initialization tables" << Endl; - const TInstant pkStart = Now() - TDuration::Days(15); + const TInstant now = Now() - TDuration::Days(100); + runtime.UpdateCurrentTime(now); + const TInstant pkStart = now - TDuration::Days(15); auto batch = lHelper.TestArrowBatch(0, pkStart.GetValue(), 6000); auto batchSize = NArrow::GetBatchDataSize(batch); @@ -867,11 +869,12 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { const ui32 reduceStepsCount = 1; for (ui32 i = 0; i < reduceStepsCount; ++i) { runtime.AdvanceCurrentTime(TDuration::Seconds(numRecords * (i + 1) / reduceStepsCount + 500000)); - const TInstant start = TInstant::Now(); const ui64 purposeSize = 800000000.0 * (1 - 1.0 * (i + 1) / reduceStepsCount); const ui64 purposeRecords = numRecords * (1 - 1.0 * (i + 1) / reduceStepsCount); const ui64 purposeMinTimestamp = numRecords * 1.0 * (i + 1) / reduceStepsCount * 1000000; + const TInstant start = TInstant::Now(); while (bsCollector.GetChannelSize(2) > purposeSize && TInstant::Now() - start < TDuration::Seconds(60)) { + runtime.AdvanceCurrentTime(TDuration::Minutes(6)); runtime.SimulateSleep(TDuration::Seconds(1)); } Cerr << bsCollector.GetChannelSize(2) << "/" << purposeSize << Endl; |