aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-07-28 11:40:25 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-07-28 11:40:25 +0300
commit7ea9b705953267d7fbc2e21b30451f84ebf519bb (patch)
tree96dd7a5a52d5459f366f85e4a0e52b3160478121
parentf573bf9c678a6748ab2b5e6eb4374986af7fae84 (diff)
downloadydb-7ea9b705953267d7fbc2e21b30451f84ebf519bb.tar.gz
KIKIMR-18853: fix ttl force sometimes started
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp13
-rw-r--r--ydb/core/tx/columnshard/columnshard__propose_transaction.cpp3
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp42
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h11
-rw-r--r--ydb/core/tx/columnshard/columnshard_private_events.h6
-rw-r--r--ydb/core/tx/columnshard/columnshard_ttl.h26
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h1
-rw-r--r--ydb/core/tx/columnshard/compaction_actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/abstract.h19
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ttl.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ttl.h3
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.h6
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h13
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp257
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h73
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/tier_info.cpp20
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/tier_info.h55
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp6
-rw-r--r--ydb/core/tx/columnshard/eviction_actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/indexing_actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/tables_manager.cpp12
-rw-r--r--ydb/core/tx/columnshard/tables_manager.h6
-rw-r--r--ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp17
-rw-r--r--ydb/core/tx/tiering/rule/object.cpp4
-rw-r--r--ydb/core/tx/tiering/ut/ut_tiers.cpp7
27 files changed, 349 insertions, 268 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index ea961de553..32792aed4c 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -129,16 +129,15 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon
void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Manual) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvPrivate::TEvPeriodicWakeup::MANUAL")("tablet_id", TabletID());
EnqueueBackgroundActivities();
- return;
- }
-
- if (LastPeriodicBackActivation < TInstant::Now() - ActivationPeriod) {
+ } else {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvPrivate::TEvPeriodicWakeup")("tablet_id", TabletID());
SendWaitPlanStep(GetOutdatedStep());
- }
- SendPeriodicStats();
- ctx.Schedule(ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup());
+ SendPeriodicStats();
+ ctx.Schedule(ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup());
+ }
}
void TColumnShard::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext&) {
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
index e713d87e0a..1579fc586e 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
@@ -250,9 +250,10 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
}
if (statusMessage.empty()) {
+ const TInstant now = TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now();
for (ui64 pathId : ttlBody.GetPathIds()) {
NOlap::TTiering tiering;
- tiering.Ttl = NOlap::TTierInfo::MakeTtl(unixTime, columnName);
+ tiering.Ttl = NOlap::TTierInfo::MakeTtl(now - unixTime, columnName);
pathTtls.emplace(pathId, std::move(tiering));
}
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index ff0be97c5e..20594b328d 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -557,7 +557,6 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
tableVerProto.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj());
TablesManager.AddTableVersion(pathId, version, tableVerProto, db);
- TablesManager.OnTtlUpdate();
}
void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const TRowVersion& version,
@@ -619,13 +618,7 @@ void TColumnShard::ScheduleNextGC(const TActorContext& ctx, bool cleanupOnly) {
void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivity activity) {
TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID()));
- if (periodic) {
- if (LastPeriodicBackActivation > TInstant::Now() - ActivationPeriod) {
- CSCounters.OnTooEarly();
- return;
- }
- LastPeriodicBackActivation = TInstant::Now();
- }
+ ACFL_DEBUG("event", "EnqueueBackgroundActivities")("periodic", periodic)("activity", activity.DebugString());
CSCounters.OnStartBackground();
SendPeriodicStats();
@@ -743,7 +736,7 @@ void TColumnShard::SetupIndexation() {
}
Y_VERIFY(data.size());
- auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(CompactionLimits.Get(), std::move(data));
+ auto indexChanges = TablesManager.MutablePrimaryIndex().StartInsert(std::move(data));
if (!indexChanges) {
LOG_S_NOTICE("Cannot prepare indexing at tablet " << TabletID());
return;
@@ -753,9 +746,6 @@ void TColumnShard::SetupIndexation() {
indexChanges->Start(*this);
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges,
Settings.CacheDataAfterIndexing, std::move(cachedBlobs));
- if (Tiers) {
- ev->SetTiering(Tiers->GetTiering());
- }
ActorContext().Send(IndexingActor, std::make_unique<TEvPrivate::TEvIndexing>(std::move(ev)));
}
@@ -788,10 +778,6 @@ void TColumnShard::SetupCompaction() {
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterCompaction);
- if (Tiers) {
- ev->SetTiering(Tiers->GetTiering());
- }
-
ActorContext().Send(CompactionActor, std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev), *BlobManager));
}
@@ -810,23 +796,9 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
return {};
}
if (force) {
- TablesManager.MutablePrimaryIndex().OnTieringModified(Tiers);
+ TablesManager.MutablePrimaryIndex().OnTieringModified(Tiers, TablesManager.GetTtl());
}
THashMap<ui64, NOlap::TTiering> eviction = pathTtls;
- if (eviction.empty()) {
- if (Tiers) {
- eviction = Tiers->GetTiering();
- }
- TablesManager.AddTtls(eviction, AppData()->TimeProvider->Now(), force);
- }
-
- if (eviction.empty()) {
- if (Tiers || TablesManager.GetTtl().PathsCount()) {
- LOG_S_DEBUG("TTL not started. No tables to activate it on (or delayed) at tablet " << TabletID());
- }
- return {};
- }
-
for (auto&& i : eviction) {
LOG_S_DEBUG("Prepare TTL evicting path " << i.first << " with " << i.second.GetDebugString()
<< " at tablet " << TabletID());
@@ -839,16 +811,12 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
LOG_S_INFO("Cannot prepare TTL at tablet " << TabletID());
return {};
}
- if (indexChanges->NeedRepeat) {
- TablesManager.OnTtlUpdate();
- }
bool needWrites = !indexChanges->PortionsToEvict.empty();
LOG_S_INFO("TTL" << (needWrites ? " with writes" : "" ) << " prepared at tablet " << TabletID());
indexChanges->Start(*this);
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false);
- ev->SetTiering(eviction);
return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), *BlobManager, needWrites);
}
@@ -862,7 +830,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0};
auto changes =
- TablesManager.MutablePrimaryIndex().StartCleanup(cleanupSnapshot, CompactionLimits.Get(), TablesManager.MutablePathsToDrop(), TLimits::MAX_TX_RECORDS);
+ TablesManager.MutablePrimaryIndex().StartCleanup(cleanupSnapshot, TablesManager.MutablePathsToDrop(), TLimits::MAX_TX_RECORDS);
if (!changes) {
LOG_S_INFO("Cannot prepare cleanup at tablet " << TabletID());
return {};
@@ -1096,7 +1064,7 @@ void TColumnShard::ActivateTiering(const ui64 pathId, const TString& useTiering)
if (!Tiers) {
Tiers = std::make_shared<TTiersManager>(TabletID(), SelfId(),
[this](const TActorContext& ctx){
- TablesManager.MutablePrimaryIndex().OnTieringModified(Tiers);
+ TablesManager.MutablePrimaryIndex().OnTieringModified(Tiers, TablesManager.GetTtl());
CleanForgottenBlobs(ctx);
Reexport(ctx);
});
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 9c2b425dcf..7abcc80602 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -106,6 +106,16 @@ public:
bool HasTtl() const { return Activity & TTL; }
bool HasAll() const { return Activity == ALL; }
+ TString DebugString() const {
+ return TStringBuilder()
+ << "indexation:" << HasIndexation() << ";"
+ << "compaction:" << HasCompaction() << ";"
+ << "cleanup:" << HasCleanup() << ";"
+ << "ttl:" << HasTtl() << ";"
+ ;
+
+ }
+
private:
EBackActivity Activity = NONE;
@@ -494,7 +504,6 @@ private:
TDuration FailActivationDelay = TDuration::Seconds(1);
TDuration StatsReportInterval = TDuration::Seconds(10);
TInstant LastAccessTime;
- TInstant LastPeriodicBackActivation;
TInstant LastStatsReport;
TActorId IndexingActor; // It's logically bounded to 1: we move each portion of data to multiple indices.
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index 55050c38dc..396376c921 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -30,7 +30,6 @@ struct TEvPrivate {
/// Common event for Indexing and GranuleCompaction: write index data in TTxWriteIndex transaction.
struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex> {
NOlap::TVersionedIndex IndexInfo;
- THashMap<ui64, NKikimr::NOlap::TTiering> Tiering;
std::shared_ptr<NOlap::TColumnEngineChanges> IndexChanges;
THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CachedBlobs;
std::vector<TString> Blobs;
@@ -53,11 +52,6 @@ struct TEvPrivate {
PutResult = std::make_shared<TBlobPutResult>(NKikimrProto::UNKNOWN);
}
- TEvWriteIndex& SetTiering(const THashMap<ui64, NKikimr::NOlap::TTiering>& tiering) {
- Tiering = tiering;
- return *this;
- }
-
const TBlobPutResult& GetPutResult() const {
Y_VERIFY(PutResult);
return *PutResult;
diff --git a/ydb/core/tx/columnshard/columnshard_ttl.h b/ydb/core/tx/columnshard/columnshard_ttl.h
index 0201d857b3..469ade115f 100644
--- a/ydb/core/tx/columnshard/columnshard_ttl.h
+++ b/ydb/core/tx/columnshard/columnshard_ttl.h
@@ -5,9 +5,6 @@ namespace NKikimr::NColumnShard {
class TTtl {
public:
- static constexpr const ui64 DEFAULT_TTL_TIMEOUT_SEC = 60 * 60;
- static constexpr const ui64 DEFAULT_REPEAT_TTL_TIMEOUT_SEC = 10;
-
struct TEviction {
TDuration EvictAfter;
TString ColumnName;
@@ -68,21 +65,10 @@ public:
PathTtls.erase(pathId);
}
- void AddTtls(THashMap<ui64, NOlap::TTiering>& eviction, TInstant now, bool force = false) {
- if ((now < LastRegularTtl + TtlTimeout) && !force) {
- return;
- }
-
+ void AddTtls(THashMap<ui64, NOlap::TTiering>& eviction) const {
for (auto& [pathId, descr] : PathTtls) {
- eviction[pathId].Ttl = Convert(descr, now);
+ eviction[pathId].Ttl = Convert(descr);
}
-
- LastRegularTtl = now;
- }
-
- void Repeat() {
- LastRegularTtl -= TtlTimeout;
- LastRegularTtl += RepeatTtlTimeout;
}
const THashSet<TString>& TtlColumns() const { return Columns; }
@@ -90,16 +76,12 @@ public:
private:
THashMap<ui64, TDescription> PathTtls; // pathId -> ttl
THashSet<TString> Columns;
- TDuration TtlTimeout{TDuration::Seconds(DEFAULT_TTL_TIMEOUT_SEC)};
- TDuration RepeatTtlTimeout{TDuration::Seconds(DEFAULT_REPEAT_TTL_TIMEOUT_SEC)};
- TInstant LastRegularTtl;
- std::shared_ptr<NOlap::TTierInfo> Convert(const TDescription& descr, TInstant timePoint) const
+ std::shared_ptr<NOlap::TTierInfo> Convert(const TDescription& descr) const
{
if (descr.Eviction) {
auto& evict = descr.Eviction;
- TInstant border = timePoint - evict->EvictAfter;
- return NOlap::TTierInfo::MakeTtl(border, evict->ColumnName, evict->UnitsInSecond);
+ return NOlap::TTierInfo::MakeTtl(evict->EvictAfter, evict->ColumnName, evict->UnitsInSecond);
}
return {};
}
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 10a2aff00c..ed09885ace 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -106,6 +106,7 @@ struct TTestSchema {
struct TTableSpecials : public TStorageTier {
std::vector<TStorageTier> Tiers;
bool CompositeMarks = false;
+ bool WaitEmptyAfter = false;
TTableSpecials() noexcept = default;
diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp
index 149704ea01..d91f8053af 100644
--- a/ydb/core/tx/columnshard/compaction_actor.cpp
+++ b/ydb/core/tx/columnshard/compaction_actor.cpp
@@ -134,7 +134,7 @@ private:
virtual bool DoExecute() override {
auto guard = TxEvent->PutResult->StartCpuGuard();
- NOlap::TConstructionContext context(TxEvent->IndexInfo, TxEvent->Tiering, Counters);
+ NOlap::TConstructionContext context(TxEvent->IndexInfo, Counters);
TxEvent->Blobs = std::move(TxEvent->IndexChanges->ConstructBlobs(context).DetachResult());
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract.h
index 8f7b76b514..3df0bcc5ee 100644
--- a/ydb/core/tx/columnshard/engines/changes/abstract.h
+++ b/ydb/core/tx/columnshard/engines/changes/abstract.h
@@ -137,33 +137,14 @@ public:
class TConstructionContext: TNonCopyable {
public:
- using TTieringsHash = THashMap<ui64, NKikimr::NOlap::TTiering>;
-private:
- const TTieringsHash* TieringMap = nullptr;
-public:
const TVersionedIndex& SchemaVersions;
const NColumnShard::TIndexationCounters Counters;
- TConstructionContext(const TVersionedIndex& schemaVersions, const TTieringsHash& tieringMap, const NColumnShard::TIndexationCounters counters)
- : TieringMap(&tieringMap)
- , SchemaVersions(schemaVersions)
- , Counters(counters)
- {
-
- }
-
TConstructionContext(const TVersionedIndex& schemaVersions, const NColumnShard::TIndexationCounters counters)
: SchemaVersions(schemaVersions)
, Counters(counters) {
}
-
- const THashMap<ui64, NKikimr::NOlap::TTiering>& GetTieringMap() const {
- if (TieringMap) {
- return *TieringMap;
- }
- return Default<THashMap<ui64, NKikimr::NOlap::TTiering>>();
- }
};
class TGranuleMeta;
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index fe4c8b7ca7..f5f3741284 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -56,6 +56,7 @@ bool TInsertColumnEngineChanges::AddPathIfNotExists(ui64 pathId) {
}
void TInsertColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
+ TBase::DoStart(self);
self.BackgroundController.StartIndexing();
}
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
index dc5cc467c6..10f0121032 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
@@ -174,7 +174,7 @@ bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfo& portionInfo, TP
TConstructionContext& context) const {
Y_VERIFY(portionInfo.TierName != evictFeatures.TargetTierName);
- auto* tiering = context.GetTieringMap().FindPtr(evictFeatures.PathId);
+ auto* tiering = Tiering.FindPtr(evictFeatures.PathId);
Y_VERIFY(tiering);
auto compression = tiering->GetCompression(evictFeatures.TargetTierName);
if (!compression) {
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h
index 3562e9494b..8745a1afe6 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.h
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.h
@@ -1,5 +1,6 @@
#pragma once
#include "cleanup.h"
+#include <ydb/core/tx/columnshard/engines/scheme/tier_info.h>
namespace NKikimr::NOlap {
@@ -9,6 +10,7 @@ private:
using TBase = TCleanupColumnEngineChanges;
THashMap<TString, TPathIdBlobs> ExportTierBlobs;
ui64 ExportNo = 0;
+
bool UpdateEvictedPortion(TPortionInfo& portionInfo,
TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs,
std::vector<TColumnRecord>& evictedRecords, std::vector<TString>& newBlobs, TConstructionContext& context) const;
@@ -27,6 +29,7 @@ protected:
}
public:
std::vector<TColumnRecord> EvictedRecords;
+ THashMap<ui64, NOlap::TTiering> Tiering;
std::vector<std::pair<TPortionInfo, TPortionEvictionFeatures>> PortionsToEvict; // {portion, TPortionEvictionFeatures}
virtual THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GetGroupedBlobRanges() const override;
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
index ac01441408..95928fc715 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -113,7 +113,7 @@ std::vector<NKikimr::NOlap::TPortionInfo> TChangesWithAppend::MakeAppendedPortio
TString tierName;
std::optional<NArrow::TCompression> compression;
if (pathId) {
- if (auto* tiering = context.GetTieringMap().FindPtr(pathId)) {
+ if (auto* tiering = TieringInfo.FindPtr(pathId)) {
tierName = tiering->GetHottestTierName();
if (const auto& tierCompression = tiering->GetCompression(tierName)) {
compression = *tierCompression;
@@ -145,4 +145,10 @@ std::vector<NKikimr::NOlap::TPortionInfo> TChangesWithAppend::MakeAppendedPortio
return out;
}
+void TChangesWithAppend::DoStart(NColumnShard::TColumnShard& self) {
+ if (self.Tiers) {
+ TieringInfo = self.Tiers->GetTiering();
+ }
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h
index d526da6f5b..00e1ebbd91 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.h
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h
@@ -5,6 +5,8 @@
namespace NKikimr::NOlap {
class TChangesWithAppend: public TColumnEngineChanges {
+private:
+ THashMap<ui64, NOlap::TTiering> TieringInfo;
protected:
virtual void DoDebugString(TStringOutput& out) const override;
virtual void DoCompile(TFinalizationContext& context) override;
@@ -13,9 +15,7 @@ protected:
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& /*self*/, TWriteIndexCompleteContext& /*context*/) override {
}
- virtual void DoStart(NColumnShard::TColumnShard& /*self*/) override {
-
- }
+ virtual void DoStart(NColumnShard::TColumnShard& self) override;
std::vector<TPortionInfo> MakeAppendedPortions(const ui64 pathId,
const std::shared_ptr<arrow::RecordBatch> batch,
const ui64 granule,
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index fae205043e..1f48fce0fa 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -9,6 +9,7 @@
namespace NKikimr::NColumnShard {
class TTiersManager;
+class TTtl;
}
namespace NKikimr::NOlap {
@@ -358,13 +359,13 @@ public:
const THashSet<ui32>& columnIds,
const TPKRangesFilter& pkRangesFilter) const = 0;
virtual std::unique_ptr<TCompactionInfo> Compact(const TCompactionLimits& limits, const THashSet<ui64>& busyGranuleIds) = 0;
- virtual std::shared_ptr<TInsertColumnEngineChanges> StartInsert(const TCompactionLimits& limits, std::vector<TInsertedData>&& dataToIndex) = 0;
+ virtual std::shared_ptr<TInsertColumnEngineChanges> StartInsert(std::vector<TInsertedData>&& dataToIndex) noexcept = 0;
virtual std::shared_ptr<TCompactColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo,
- const TCompactionLimits& limits) = 0;
- virtual std::shared_ptr<TCleanupColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, const TCompactionLimits& limits, THashSet<ui64>& pathsToDrop,
- ui32 maxRecords) = 0;
+ const TCompactionLimits& limits) noexcept = 0;
+ virtual std::shared_ptr<TCleanupColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop,
+ ui32 maxRecords) noexcept = 0;
virtual std::shared_ptr<TTTLColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction,
- ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) = 0;
+ ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) noexcept = 0;
virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
virtual void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) = 0;
//virtual void UpdateTableSchema(ui64 pathId, const TSnapshot& snapshot, TIndexInfo&& info) = 0; // TODO
@@ -372,7 +373,7 @@ public:
virtual const TColumnEngineStats& GetTotalStats() = 0;
virtual ui64 MemoryUsage() const { return 0; }
virtual TSnapshot LastUpdate() const { return TSnapshot::Zero(); }
- virtual void OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager) = 0;
+ virtual void OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager, const NColumnShard::TTtl& ttl) = 0;
};
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 1d37b22dbd..9d3b359b7a 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -5,6 +5,8 @@
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/formats/arrow/one_batch_input_stream.h>
#include <ydb/core/formats/arrow/merging_sorted_input_stream.h>
+#include <ydb/core/tx/tiering/manager.h>
+#include <ydb/core/tx/columnshard/columnshard_ttl.h>
#include <ydb/library/conclusion/status.h>
#include "changes/indexation.h"
#include "changes/in_granule_compaction.h"
@@ -269,7 +271,7 @@ bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) {
return CountersTable->Load(db, callback);
}
-std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(const TCompactionLimits& /*limits*/, std::vector<TInsertedData>&& dataToIndex) {
+std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(std::vector<TInsertedData>&& dataToIndex) noexcept {
Y_VERIFY(dataToIndex.size());
auto changes = TChangesConstructor::BuildInsertChanges(DefaultMark(), std::move(dataToIndex), LastSnapshot);
@@ -306,7 +308,7 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(co
}
std::shared_ptr<TCompactColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std::unique_ptr<TCompactionInfo>&& info,
- const TCompactionLimits& limits) {
+ const TCompactionLimits& limits) noexcept {
const ui64 pathId = info->GetPlanCompaction().GetPathId();
Y_VERIFY(PathGranules.contains(pathId));
@@ -324,9 +326,7 @@ std::shared_ptr<TCompactColumnEngineChanges> TColumnEngineForLogs::StartCompacti
}
std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot,
- const TCompactionLimits& /*limits*/,
- THashSet<ui64>& pathsToDrop,
- ui32 maxRecords) {
+ THashSet<ui64>& pathsToDrop, ui32 maxRecords) noexcept {
auto changes = TChangesConstructor::BuildCleanupChanges(snapshot);
ui32 affectedRecords = 0;
@@ -400,119 +400,160 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(
return changes;
}
-std::shared_ptr<TTTLColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction, ui64 maxEvictBytes) {
- if (pathEviction.empty()) {
- return {};
- }
- auto changes = TChangesConstructor::BuildTtlChanges();
+TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering& ttl, TTieringProcessContext& context) const {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ProcessTiering")("path_id", pathId)("ttl", ttl.GetDebugString());
ui64 evictionSize = 0;
- bool allowEviction = true;
ui64 dropBlobs = 0;
- bool allowDrop = true;
-
auto& indexInfo = VersionedIndex.GetLastSchema()->GetIndexInfo();
- const TMonotonic nowMonotonic = TlsActivationContext ? AppData()->MonotonicTimeProvider->Now() : TMonotonic::Now();
- for (const auto& [pathId, ttl] : pathEviction) {
- auto it = NextCheckInstantForTTL.find(pathId);
- if (it != NextCheckInstantForTTL.end() && nowMonotonic < it->second) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_path_id_ttl")("path_id", pathId)("now", nowMonotonic)("expected", it->second);
- continue;
- }
- auto itGranules = PathGranules.find(pathId);
- if (itGranules == PathGranules.end()) {
- continue; // It's not an error: allow TTL over multiple shards with different pathIds presented
- }
+ Y_VERIFY(context.Changes->Tiering.emplace(pathId, ttl).second);
- auto expireTimestampOpt = ttl.GetEvictBorder();
- Y_VERIFY(expireTimestampOpt);
- auto expireTimestamp = *expireTimestampOpt;
+ TDuration dWaiting = TDuration::Minutes(5);
+ auto itGranules = PathGranules.find(pathId);
+ if (itGranules == PathGranules.end()) {
+ return dWaiting;
+ }
- auto ttlColumnNames = ttl.GetTtlColumns();
- Y_VERIFY(ttlColumnNames.size() == 1); // TODO: support different ttl columns
- ui32 ttlColumnId = indexInfo.GetColumnId(*ttlColumnNames.begin());
- std::optional<TDuration> dWaiting;
- for (const auto& [ts, granule] : itGranules->second) {
- auto spg = Granules[granule];
- Y_VERIFY(spg);
+ auto expireTimestampOpt = ttl.GetEvictInstant(context.Now);
+ Y_VERIFY(expireTimestampOpt);
+ auto expireTimestamp = *expireTimestampOpt;
- for (auto& [portion, info] : spg->GetPortions()) {
- if (!info.IsActive()) {
- continue;
- }
+ auto ttlColumnNames = ttl.GetTtlColumns();
+ Y_VERIFY(ttlColumnNames.size() == 1); // TODO: support different ttl columns
+ ui32 ttlColumnId = indexInfo.GetColumnId(*ttlColumnNames.begin());
+ for (const auto& [ts, granule] : itGranules->second) {
+ auto itGranule = Granules.find(granule);
+ auto spg = itGranule->second;
+ Y_VERIFY(spg);
- allowEviction = (evictionSize <= maxEvictBytes);
- allowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE);
- const bool tryEvictPortion = allowEviction && ttl.HasTiers() && info.EvictReady(TCompactionLimits::EVICT_HOT_PORTION_BYTES);
+ for (auto& [portion, info] : spg->GetPortions()) {
+ if (!info.IsActive()) {
+ continue;
+ }
- if (auto max = info.MaxValue(ttlColumnId)) {
- bool keep = false;
- {
- auto mpiOpt = ttl.ScalarToInstant(max);
- Y_VERIFY(mpiOpt);
- const TInstant maxTtlPortionInstant = *mpiOpt;
- const TDuration d = maxTtlPortionInstant - expireTimestamp;
- keep = !!d;
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestamp.Seconds());
- if (d && (!dWaiting || *dWaiting > d)) {
- dWaiting = d;
- }
+ context.AllowEviction = (evictionSize <= context.MaxEvictBytes);
+ context.AllowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE);
+ const bool tryEvictPortion = context.AllowEviction && ttl.HasTiers() && info.EvictReady(TCompactionLimits::EVICT_HOT_PORTION_BYTES);
+
+ if (auto max = info.MaxValue(ttlColumnId)) {
+ bool keep = false;
+ {
+ auto mpiOpt = ttl.ScalarToInstant(max);
+ Y_VERIFY(mpiOpt);
+ const TInstant maxTtlPortionInstant = *mpiOpt;
+ const TDuration d = maxTtlPortionInstant - expireTimestamp;
+ keep = !!d;
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestamp.Seconds());
+ if (d && dWaiting > d) {
+ dWaiting = d;
}
+ }
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", allowDrop);
- if (keep && tryEvictPortion) {
- TString tierName;
- for (auto& tierRef : ttl.GetOrderedTiers()) {
- auto& tierInfo = tierRef.Get();
- if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) {
- SignalCounters.OnPortionNoTtlColumn(info.BlobsBytes());
- continue;
- }
- auto mpiOpt = tierInfo.ScalarToInstant(max);
- Y_VERIFY(mpiOpt);
- const TInstant maxTieringPortionInstant = *mpiOpt;
-
- const TDuration d = maxTieringPortionInstant - tierInfo.GetEvictBorder();
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering")("max", maxTieringPortionInstant.Seconds())("evict", tierInfo.GetEvictBorder().Seconds());
- if (d) {
- if (!dWaiting || *dWaiting > d) {
- dWaiting = d;
- }
- tierName = tierInfo.GetName();
- } else {
- break;
- }
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.AllowDrop);
+ if (keep && tryEvictPortion) {
+ TString tierName;
+ for (auto& tierRef : ttl.GetOrderedTiers()) {
+ auto& tierInfo = tierRef.Get();
+ if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) {
+ SignalCounters.OnPortionNoTtlColumn(info.BlobsBytes());
+ continue;
}
- if (info.TierName != tierName) {
- evictionSize += info.BlobsSizes().first;
- const bool needExport = ttl.NeedExport(tierName);
- changes->PortionsToEvict.emplace_back(info, TPortionEvictionFeatures(tierName, pathId, needExport));
- SignalCounters.OnPortionToEvict(info.BlobsBytes());
+ auto mpiOpt = tierInfo.ScalarToInstant(max);
+ Y_VERIFY(mpiOpt);
+ const TInstant maxTieringPortionInstant = *mpiOpt;
+
+ const TDuration d = maxTieringPortionInstant - tierInfo.GetEvictInstant(context.Now);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering")("max", maxTieringPortionInstant.Seconds())
+ ("evict", tierInfo.GetEvictInstant(context.Now).Seconds());
+ if (d) {
+ if (dWaiting > d) {
+ dWaiting = d;
+ }
+ tierName = tierInfo.GetName();
+ } else {
+ break;
}
}
- if (!keep && allowDrop) {
- dropBlobs += info.NumRecords();
- changes->PortionsToDrop.push_back(info);
- SignalCounters.OnPortionToDrop(info.BlobsBytes());
+ if (info.TierName != tierName) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering switch detected")("from", info.TierName)("to", tierName);
+ evictionSize += info.BlobsSizes().first;
+ const bool needExport = ttl.NeedExport(tierName);
+ context.Changes->PortionsToEvict.emplace_back(info, TPortionEvictionFeatures(tierName, pathId, needExport));
+ SignalCounters.OnPortionToEvict(info.BlobsBytes());
}
- } else {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max");
- SignalCounters.OnPortionNoBorder(info.BlobsBytes());
}
+ if (!keep && context.AllowDrop) {
+ dropBlobs += info.NumRecords();
+ context.Changes->PortionsToDrop.push_back(info);
+ SignalCounters.OnPortionToDrop(info.BlobsBytes());
+ }
+ } else {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max");
+ SignalCounters.OnPortionNoBorder(info.BlobsBytes());
+ }
+ }
+ }
+ Y_VERIFY(!!dWaiting);
+ return dWaiting;
+}
+
+bool TColumnEngineForLogs::DrainEvictionQueue(std::map<TMonotonic, std::vector<TEvictionsController::TTieringWithPathId>>& evictionsQueue, TTieringProcessContext& context) const {
+ const TMonotonic nowMonotonic = TlsActivationContext ? AppData()->MonotonicTimeProvider->Now() : TMonotonic::Now();
+ bool hasChanges = false;
+ while (evictionsQueue.size() && evictionsQueue.begin()->first < nowMonotonic) {
+ hasChanges = true;
+ auto tierings = std::move(evictionsQueue.begin()->second);
+ evictionsQueue.erase(evictionsQueue.begin());
+ for (auto&& i : tierings) {
+ auto itDuration = context.DurationsForced.find(i.GetPathId());
+ if (itDuration == context.DurationsForced.end()) {
+ const TDuration dWaiting = ProcessTiering(i.GetPathId(), i.GetTieringInfo(), context);
+ evictionsQueue[nowMonotonic + dWaiting].emplace_back(std::move(i));
+ } else {
+ evictionsQueue[nowMonotonic + itDuration->second].emplace_back(std::move(i));
}
}
- if (dWaiting) {
- NextCheckInstantForTTL[pathId] = nowMonotonic + std::min<TDuration>(*dWaiting, TDuration::Minutes(5));
+ }
+
+ if (evictionsQueue.size()) {
+ if (evictionsQueue.begin()->first < nowMonotonic) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "stop scan")("reason", "too many data")("first", evictionsQueue.begin()->first)("now", nowMonotonic);
+ } else if (!hasChanges) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "stop scan")("reason", "too early")("first", evictionsQueue.begin()->first)("now", nowMonotonic);
} else {
- NextCheckInstantForTTL.erase(pathId);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "stop scan")("reason", "task_ready")("first", evictionsQueue.begin()->first)("now", nowMonotonic)
+ ("internal", hasChanges)("evict_portions", context.Changes->PortionsToEvict.size())
+ ("drop_portions", context.Changes->PortionsToDrop.size());
}
+ } else {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "stop scan")("reason", "no data in queue");
+ }
+ return hasChanges;
+}
+
+std::shared_ptr<TTTLColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction, ui64 maxEvictBytes) noexcept {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("external", pathEviction.size())
+ ("internal", EvictionsController.MutableNextCheckInstantForTierings().size())
+ ;
+ auto changes = TChangesConstructor::BuildTtlChanges();
+
+ TTieringProcessContext context(maxEvictBytes, changes);
+ bool hasExternalChanges = false;
+ for (auto&& i : pathEviction) {
+ context.DurationsForced[i.first] = ProcessTiering(i.first, i.second, context);
+ hasExternalChanges = true;
+ }
+
+ {
+ TLogContextGuard lGuard(TLogContextBuilder::Build()("queue", "ttl")("has_external", hasExternalChanges));
+ DrainEvictionQueue(EvictionsController.MutableNextCheckInstantForTierings(), context);
}
if (changes->PortionsToDrop.empty() &&
changes->PortionsToEvict.empty()) {
- return {};
+ return nullptr;
}
- if (!allowEviction || !allowDrop) {
+ if (!context.AllowEviction || !context.AllowDrop) {
changes->NeedRepeat = true;
}
return changes;
@@ -810,9 +851,35 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(const TCompaction
}
}
-void TColumnEngineForLogs::OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> /*manager*/) {
+void TColumnEngineForLogs::OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager, const NColumnShard::TTtl& ttl) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified");
- NextCheckInstantForTTL.clear();
+ std::optional<THashMap<ui64, TTiering>> tierings;
+ if (manager) {
+ tierings = manager->GetTiering();
+ }
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified")
+ ("new_count_tierings", tierings ? ::ToString(tierings->size()) : TString("undefined"))
+ ("new_count_ttls", ttl.PathsCount());
+ EvictionsController.RefreshTierings(std::move(tierings), ttl);
+
+}
+
+TColumnEngineForLogs::TTieringProcessContext::TTieringProcessContext(const ui64 maxEvictBytes, std::shared_ptr<TTTLColumnEngineChanges> changes)
+ : Now(TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now())
+ , MaxEvictBytes(maxEvictBytes)
+ , Changes(changes)
+{
+
+}
+
+void TEvictionsController::RefreshTierings(std::optional<THashMap<ui64, TTiering>>&& tierings, const NColumnShard::TTtl& ttl) {
+ if (tierings) {
+ OriginalTierings = std::move(*tierings);
+ }
+ auto copy = OriginalTierings;
+ ttl.AddTtls(copy);
+ NextCheckInstantForTierings = BuildNextInstantCheckers(std::move(copy));
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "RefreshTierings")("count", NextCheckInstantForTierings.size());
}
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 0b2d0e5291..1aee3a86a6 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -4,6 +4,8 @@
#include "column_engine.h"
#include <ydb/core/tx/columnshard/common/scalars.h>
#include <ydb/core/tx/columnshard/counters/engine_logs.h>
+#include <ydb/core/tx/columnshard/columnshard_ttl.h>
+#include "scheme/tier_info.h"
#include "storage/granule.h"
#include "storage/storage.h"
#include "changes/indexation.h"
@@ -21,6 +23,51 @@ class TGranulesTable;
class TColumnsTable;
class TCountersTable;
+class TEvictionsController {
+public:
+ class TTieringWithPathId {
+ private:
+ const ui64 PathId;
+ TTiering TieringInfo;
+ public:
+ TTieringWithPathId(const ui64 pathId, TTiering&& tieringInfo)
+ : PathId(pathId)
+ , TieringInfo(std::move(tieringInfo))
+ {
+
+ }
+
+ ui64 GetPathId() const {
+ return PathId;
+ }
+
+ const TTiering& GetTieringInfo() const {
+ return TieringInfo;
+ }
+ };
+private:
+ THashMap<ui64, TTiering> OriginalTierings;
+ std::map<TMonotonic, std::vector<TTieringWithPathId>> NextCheckInstantForTTL;
+ std::map<TMonotonic, std::vector<TTieringWithPathId>> NextCheckInstantForTierings;
+
+ std::map<TMonotonic, std::vector<TTieringWithPathId>> BuildNextInstantCheckers(THashMap<ui64, TTiering>&& info) {
+ std::map<TMonotonic, std::vector<TTieringWithPathId>> result;
+ std::vector<TTieringWithPathId> newTasks;
+ for (auto&& i : info) {
+ newTasks.emplace_back(i.first, std::move(i.second));
+ }
+ result.emplace(TMonotonic::Zero(), std::move(newTasks));
+ return result;
+ }
+public:
+ std::map<TMonotonic, std::vector<TTieringWithPathId>>& MutableNextCheckInstantForTierings() {
+ return NextCheckInstantForTierings;
+ }
+
+ void RefreshTierings(std::optional<THashMap<ui64, TTiering>>&& tierings, const NColumnShard::TTtl& ttl);
+};
+
+
/// Engine with 2 tables:
/// - Granules: PK -> granules (use part of PK)
/// - Columns: granule -> blobs
@@ -35,7 +82,20 @@ class TColumnEngineForLogs : public IColumnEngine {
private:
const NColumnShard::TEngineLogsCounters SignalCounters;
std::shared_ptr<TGranulesStorage> GranulesStorage;
- THashMap<ui64, TMonotonic> NextCheckInstantForTTL;
+ TEvictionsController EvictionsController;
+ class TTieringProcessContext {
+ public:
+ bool AllowEviction = true;
+ bool AllowDrop = true;
+ const TInstant Now;
+ const ui64 MaxEvictBytes;
+ std::shared_ptr<TTTLColumnEngineChanges> Changes;
+ std::map<ui64, TDuration> DurationsForced;
+ TTieringProcessContext(const ui64 maxEvictBytes, std::shared_ptr<TTTLColumnEngineChanges> changes);
+ };
+
+ TDuration ProcessTiering(const ui64 pathId, const TTiering& tiering, TTieringProcessContext& context) const;
+ bool DrainEvictionQueue(std::map<TMonotonic, std::vector<TEvictionsController::TTieringWithPathId>>& evictionsQueue, TTieringProcessContext& context) const;
public:
class TChangesConstructor : public TColumnEngineChanges {
public:
@@ -69,7 +129,7 @@ public:
TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits = {});
- virtual void OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager) override;
+ virtual void OnTieringModified(std::shared_ptr<NColumnShard::TTiersManager> manager, const NColumnShard::TTtl& ttl) override;
const TVersionedIndex& GetVersionedIndex() const override {
return VersionedIndex;
@@ -108,12 +168,11 @@ public:
public:
bool Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop = {}) override;
- std::shared_ptr<TInsertColumnEngineChanges> StartInsert(const TCompactionLimits& limits, std::vector<TInsertedData>&& dataToIndex) override;
- std::shared_ptr<TCompactColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo, const TCompactionLimits& limits) override;
- std::shared_ptr<TCleanupColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, const TCompactionLimits& limits, THashSet<ui64>& pathsToDrop,
- ui32 maxRecords) override;
+ std::shared_ptr<TInsertColumnEngineChanges> StartInsert(std::vector<TInsertedData>&& dataToIndex) noexcept override;
+ std::shared_ptr<TCompactColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo, const TCompactionLimits& limits) noexcept override;
+ std::shared_ptr<TCleanupColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop, ui32 maxRecords) noexcept override;
std::shared_ptr<TTTLColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction,
- ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override;
+ ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) noexcept override;
bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
const TSnapshot& snapshot) noexcept override;
diff --git a/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp b/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp
index 1672d7a97a..ce84fe4c50 100644
--- a/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/tier_info.cpp
@@ -18,4 +18,24 @@ std::optional<TInstant> TTierInfo::ScalarToInstant(const std::shared_ptr<arrow::
}
}
+std::shared_ptr<NKikimr::NOlap::TTierInfo> TTiering::GetMainTierInfo() const {
+ auto ttl = Ttl;
+ auto tier = OrderedTiers.size() ? OrderedTiers.begin()->GetPtr() : nullptr;
+ if (!ttl && !tier) {
+ return nullptr;
+ } else if (!tier) {
+ return ttl;
+ } else if (!ttl) {
+ return tier;
+ } else {
+ const TDuration ttlDuration = ttl->GetEvictDuration();
+ const TDuration tierDuration = tier->GetEvictDuration();
+ if (tierDuration < ttlDuration) {
+ return tier;
+ } else {
+ return ttl;
+ }
+ }
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/tier_info.h b/ydb/core/tx/columnshard/engines/scheme/tier_info.h
index bf5e8980cb..4ec4c3233b 100644
--- a/ydb/core/tx/columnshard/engines/scheme/tier_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/tier_info.h
@@ -15,20 +15,20 @@ class TTierInfo {
private:
TString Name;
TString EvictColumnName;
- TInstant EvictBorder;
+ TDuration EvictDuration;
bool NeedExport = false;
ui32 TtlUnitsInSecond;
std::optional<NArrow::TCompression> Compression;
public:
- TTierInfo(const TString& tierName, TInstant evictBorder, const TString& column, ui32 unitsInSecond = 0)
+ TTierInfo(const TString& tierName, TDuration evictDuration, const TString& column, ui32 unitsInSecond = 0)
: Name(tierName)
, EvictColumnName(column)
- , EvictBorder(evictBorder)
+ , EvictDuration(evictDuration)
, TtlUnitsInSecond(unitsInSecond)
{
- Y_VERIFY(!Name.empty());
- Y_VERIFY(!EvictColumnName.empty());
+ Y_VERIFY(!!Name);
+ Y_VERIFY(!!EvictColumnName);
}
const TString& GetName() const {
@@ -39,8 +39,12 @@ public:
return EvictColumnName;
}
- const TInstant GetEvictBorder() const {
- return EvictBorder;
+ TInstant GetEvictInstant(const TInstant now) const {
+ return now - EvictDuration;
+ }
+
+ TDuration GetEvictDuration() const {
+ return EvictDuration;
}
bool GetNeedExport() const {
@@ -70,18 +74,19 @@ public:
std::optional<TInstant> ScalarToInstant(const std::shared_ptr<arrow::Scalar>& scalar) const;
- static std::shared_ptr<TTierInfo> MakeTtl(TInstant ttlBorder, const TString& ttlColumn, ui32 unitsInSecond = 0) {
- return std::make_shared<TTierInfo>("TTL", ttlBorder, ttlColumn, unitsInSecond);
+ static std::shared_ptr<TTierInfo> MakeTtl(const TDuration evictDuration, const TString& ttlColumn, ui32 unitsInSecond = 0) {
+ return std::make_shared<TTierInfo>("TTL", evictDuration, ttlColumn, unitsInSecond);
}
TString GetDebugString() const {
TStringBuilder sb;
- sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << EvictColumnName << "' ";
+ sb << "name=" << Name << ";duration=" << EvictDuration << ";column=" << EvictColumnName << ";compression=";
if (Compression) {
sb << Compression->DebugString();
} else {
sb << "NOT_SPECIFIED(Default)";
}
+ sb << ";";
return sb;
}
};
@@ -95,16 +100,16 @@ public:
}
bool operator < (const TTierRef& b) const {
- if (Info->GetEvictBorder() < b.Info->GetEvictBorder()) {
+ if (Info->GetEvictDuration() > b.Info->GetEvictDuration()) {
return true;
- } else if (Info->GetEvictBorder() == b.Info->GetEvictBorder()) {
+ } else if (Info->GetEvictDuration() == b.Info->GetEvictDuration()) {
return Info->GetName() > b.Info->GetName(); // add stability: smaller name is hotter
}
return false;
}
bool operator == (const TTierRef& b) const {
- return Info->GetEvictBorder() == b.Info->GetEvictBorder()
+ return Info->GetEvictDuration() == b.Info->GetEvictDuration()
&& Info->GetName() == b.Info->GetName();
}
@@ -126,25 +131,7 @@ class TTiering {
TSet<TTierRef> OrderedTiers;
public:
- std::shared_ptr<TTierInfo> GetMainTierInfo() const {
- auto ttl = Ttl;
- auto tier = OrderedTiers.size() ? OrderedTiers.begin()->GetPtr() : nullptr;
- if (!ttl && !tier) {
- return nullptr;
- } else if (!tier) {
- return ttl;
- } else if (!ttl) {
- return tier;
- } else {
- const TInstant ttlInstant = ttl->GetEvictBorder();
- const TInstant tierInstant = tier->GetEvictBorder();
- if (ttlInstant < tierInstant) {
- return tier;
- } else {
- return ttl;
- }
- }
- }
+ std::shared_ptr<TTierInfo> GetMainTierInfo() const;
std::shared_ptr<TTierInfo> Ttl;
@@ -186,12 +173,12 @@ public:
}
}
- std::optional<TInstant> GetEvictBorder() const {
+ std::optional<TInstant> GetEvictInstant(const TInstant now) const {
auto mainTier = GetMainTierInfo();
if (!mainTier) {
return {};
} else {
- return mainTier->GetEvictBorder();
+ return mainTier->GetEvictInstant(now);
}
}
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 2169430819..3631138ec8 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -260,7 +260,7 @@ TCompactionLimits TestLimits() {
bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap,
std::vector<TInsertedData>&& dataToIndex, THashMap<TBlobRange, TString>& blobs, ui32& step) {
- std::shared_ptr<TInsertColumnEngineChanges> changes = engine.StartInsert(TestLimits(), std::move(dataToIndex));
+ std::shared_ptr<TInsertColumnEngineChanges> changes = engine.StartInsert(std::move(dataToIndex));
if (!changes) {
return false;
}
@@ -312,7 +312,7 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T
bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, ui32 expectedToDrop) {
THashSet<ui64> pathsToDrop;
- std::shared_ptr<TCleanupColumnEngineChanges> changes = engine.StartCleanup(snap, TestLimits(), pathsToDrop, 1000);
+ std::shared_ptr<TCleanupColumnEngineChanges> changes = engine.StartCleanup(snap, pathsToDrop, 1000);
UNIT_ASSERT(changes);
UNIT_ASSERT_VALUES_EQUAL(changes->PortionsToDrop.size(), expectedToDrop);
@@ -700,7 +700,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
std::shared_ptr<arrow::DataType> ttlColType = arrow::timestamp(arrow::TimeUnit::MICRO);
THashMap<ui64, NOlap::TTiering> pathTtls;
NOlap::TTiering tiering;
- tiering.Ttl = NOlap::TTierInfo::MakeTtl(TInstant::MicroSeconds(10000), "timestamp");
+ tiering.Ttl = NOlap::TTierInfo::MakeTtl(TDuration::MicroSeconds(TInstant::Now().MicroSeconds() - 10000), "timestamp");
pathTtls.emplace(pathId, std::move(tiering));
Ttl(engine, db, pathTtls, 2);
diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp
index 82621bf861..40fcffde44 100644
--- a/ydb/core/tx/columnshard/eviction_actor.cpp
+++ b/ydb/core/tx/columnshard/eviction_actor.cpp
@@ -132,7 +132,7 @@ private:
auto guard = TxEvent->PutResult->StartCpuGuard();
TxEvent->IndexChanges->SetBlobs(std::move(Blobs));
- NOlap::TConstructionContext context(TxEvent->IndexInfo, TxEvent->Tiering, Counters);
+ NOlap::TConstructionContext context(TxEvent->IndexInfo, Counters);
TxEvent->Blobs = std::move(TxEvent->IndexChanges->ConstructBlobs(context).DetachResult());
if (TxEvent->Blobs.empty()) {
TxEvent->SetPutStatus(NKikimrProto::OK);
diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp
index e320d93558..c882695dc7 100644
--- a/ydb/core/tx/columnshard/indexing_actor.cpp
+++ b/ydb/core/tx/columnshard/indexing_actor.cpp
@@ -127,7 +127,7 @@ private:
virtual bool DoExecute() override {
auto guard = TxEvent->PutResult->StartCpuGuard();
- NOlap::TConstructionContext context(TxEvent->IndexInfo, TxEvent->Tiering, Counters);
+ NOlap::TConstructionContext context(TxEvent->IndexInfo, Counters);
TxEvent->Blobs = std::move(TxEvent->IndexChanges->ConstructBlobs(context).DetachResult());
return true;
}
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
index d92c226455..2acde90fb1 100644
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -183,15 +183,14 @@ ui64 TTablesManager::GetMemoryUsage() const {
return memory;
}
-void TTablesManager::OnTtlUpdate() {
- Ttl.Repeat();
-}
-
void TTablesManager::DropTable(const ui64 pathId, const TRowVersion& version, NIceDb::TNiceDb& db) {
auto& table = Tables.at(pathId);
table.SetDropVersion(version);
PathsToDrop.insert(pathId);
Ttl.DropPathTtl(pathId);
+ if (PrimaryIndex) {
+ PrimaryIndex->OnTieringModified(nullptr, Ttl);
+ }
Schema::SaveTableDropVersion(db, pathId, version.Step, version.TxId);
}
@@ -261,6 +260,10 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& versi
} else {
Ttl.DropPathTtl(pathId);
}
+ if (PrimaryIndex) {
+ PrimaryIndex->OnTieringModified(nullptr, Ttl);
+ }
+
}
Schema::SaveTableVersionInfo(db, pathId, version, versionInfo);
table.AddVersion(version, versionInfo);
@@ -278,6 +281,7 @@ void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const NKikim
Y_VERIFY(lastIndexInfo.GetIndexKey()->Equals(indexInfo.GetIndexKey()));
}
PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo));
+ PrimaryIndex->OnTieringModified(nullptr, Ttl);
for (auto& columnName : Ttl.TtlColumns()) {
PrimaryIndex->GetVersionedIndex().GetLastSchema()->GetIndexInfo().CheckTtlColumn(columnName);
diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h
index 654901d13f..e5d5e0f308 100644
--- a/ydb/core/tx/columnshard/tables_manager.h
+++ b/ydb/core/tx/columnshard/tables_manager.h
@@ -142,8 +142,8 @@ public:
return Ttl;
}
- void AddTtls(THashMap<ui64, NOlap::TTiering>& eviction, TInstant now, bool force) {
- Ttl.AddTtls(eviction, now, force);
+ void AddTtls(THashMap<ui64, NOlap::TTiering>& eviction) {
+ Ttl.AddTtls(eviction);
}
const THashSet<ui64>& GetPathsToDrop() const {
@@ -209,8 +209,6 @@ public:
void AddPresetVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db);
void AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db);
-
- void OnTtlUpdate();
private:
void IndexSchemaVersion(const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema);
static NOlap::TIndexInfo DeserializeIndexInfoFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema);
diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
index 1b388028c2..c70ec3a568 100644
--- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -657,14 +657,12 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
}
}
if (i) {
- ui32 version = i + 1;
- {
- const bool ok = ProposeSchemaTx(runtime, sender,
- TTestSchema::AlterTableTxBody(tableId, version, specs[i]),
- NOlap::TSnapshot(++planStep, ++txId));
- UNIT_ASSERT(ok);
- PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId));
- }
+ const ui32 version = i + 1;
+ const bool ok = ProposeSchemaTx(runtime, sender,
+ TTestSchema::AlterTableTxBody(tableId, version, specs[i]),
+ NOlap::TSnapshot(++planStep, ++txId));
+ UNIT_ASSERT(ok);
+ PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId));
}
if (specs[i].HasTiers() || reboots) {
ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i]));
@@ -700,7 +698,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep - 1, Max<ui64>(), tableId);
Proto(read.get()).AddColumnNames(specs[i].TtlColumn);
- counter.CaptureReadEvents = 1; // TODO: we need affected by tiering blob here
+ counter.CaptureReadEvents = specs[i].WaitEmptyAfter ? 0 : 1; // TODO: we need affected by tiering blob here
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
counter.WaitReadsCaptured(runtime);
}
@@ -907,6 +905,7 @@ std::vector<std::pair<ui32, ui64>> TestTiersAndTtl(const TTestSchema::TTableSpec
THashSet<ui32> forgets;
if (testTtl) {
changes.AddTtlAlters(spec, {allowBoth, allowOne, allowNone}, alters);
+ alters.back().WaitEmptyAfter = true;
} else {
changes.AddTierAlters(spec, {allowBoth, allowOne, allowNone}, alters);
diff --git a/ydb/core/tx/tiering/rule/object.cpp b/ydb/core/tx/tiering/rule/object.cpp
index 12f5981d7b..64aa4ba586 100644
--- a/ydb/core/tx/tiering/rule/object.cpp
+++ b/ydb/core/tx/tiering/rule/object.cpp
@@ -73,10 +73,8 @@ bool TTieringRule::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Val
NKikimr::NOlap::TTiering TTieringRule::BuildOlapTiers() const {
NOlap::TTiering result;
- TInstant now = Now(); // Do not put it in cycle: prevent tiers reorder with the same eviction time
for (auto&& r : Intervals) {
- TInstant evictionBorder = now - r.GetDurationForEvict();
- result.Add(std::make_shared<NOlap::TTierInfo>(r.GetTierName(), evictionBorder, GetDefaultColumn()));
+ result.Add(std::make_shared<NOlap::TTierInfo>(r.GetTierName(), r.GetDurationForEvict(), GetDefaultColumn()));
}
return result;
}
diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp
index a743adc90b..59662eeb73 100644
--- a/ydb/core/tx/tiering/ut/ut_tiers.cpp
+++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp
@@ -533,7 +533,9 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
Cerr << "Wait tables" << Endl;
runtime.SimulateSleep(TDuration::Seconds(20));
Cerr << "Initialization tables" << Endl;
- const TInstant pkStart = Now() - TDuration::Days(15);
+ const TInstant now = Now() - TDuration::Days(100);
+ runtime.UpdateCurrentTime(now);
+ const TInstant pkStart = now - TDuration::Days(15);
auto batch = lHelper.TestArrowBatch(0, pkStart.GetValue(), 6000);
auto batchSize = NArrow::GetBatchDataSize(batch);
@@ -867,11 +869,12 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
const ui32 reduceStepsCount = 1;
for (ui32 i = 0; i < reduceStepsCount; ++i) {
runtime.AdvanceCurrentTime(TDuration::Seconds(numRecords * (i + 1) / reduceStepsCount + 500000));
- const TInstant start = TInstant::Now();
const ui64 purposeSize = 800000000.0 * (1 - 1.0 * (i + 1) / reduceStepsCount);
const ui64 purposeRecords = numRecords * (1 - 1.0 * (i + 1) / reduceStepsCount);
const ui64 purposeMinTimestamp = numRecords * 1.0 * (i + 1) / reduceStepsCount * 1000000;
+ const TInstant start = TInstant::Now();
while (bsCollector.GetChannelSize(2) > purposeSize && TInstant::Now() - start < TDuration::Seconds(60)) {
+ runtime.AdvanceCurrentTime(TDuration::Minutes(6));
runtime.SimulateSleep(TDuration::Seconds(1));
}
Cerr << bsCollector.GetChannelSize(2) << "/" << purposeSize << Endl;