aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2022-12-13 17:47:30 +0300
committerchertus <azuikov@ydb.tech>2022-12-13 17:47:30 +0300
commitf8bff6b810082c1df1db8097f2d0b3a709eec8e2 (patch)
tree59c8f865d1c1a66027fe308018492081e580a5eb
parent2cb87340968a3d324fa25a7fc117993457021309 (diff)
downloadydb-f8bff6b810082c1df1db8097f2d0b3a709eec8e2.tar.gz
update tiering
-rw-r--r--ydb/core/protos/flat_scheme_op.proto4
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp11
-rw-r--r--ydb/core/tx/columnshard/columnshard__init.cpp7
-rw-r--r--ydb/core/tx/columnshard/columnshard__propose_transaction.cpp11
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp3
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp101
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h7
-rw-r--r--ydb/core/tx/columnshard/columnshard_ttl.h57
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.cpp6
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h77
-rw-r--r--ydb/core/tx/columnshard/compaction_actor.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.cpp12
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h52
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp97
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h2
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h53
-rw-r--r--ydb/core/tx/columnshard/engines/tier_info.h157
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp12
-rw-r--r--ydb/core/tx/columnshard/eviction_actor.cpp3
-rw-r--r--ydb/core/tx/columnshard/export_actor.cpp3
-rw-r--r--ydb/core/tx/columnshard/indexing_actor.cpp3
-rw-r--r--ydb/core/tx/columnshard/read_actor.cpp42
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_schema.cpp364
-rw-r--r--ydb/core/tx/columnshard/write_actor.cpp21
-rw-r--r--ydb/core/tx/tiering/manager.cpp32
-rw-r--r--ydb/core/tx/tiering/manager.h6
-rw-r--r--ydb/core/tx/tiering/rule/object.cpp8
-rw-r--r--ydb/core/tx/tiering/rule/object.h2
-rw-r--r--ydb/core/tx/tiering/ut/ut_tiers.cpp5
29 files changed, 687 insertions, 474 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 0226e5eb363..91018709a17 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -472,6 +472,10 @@ message TColumnDataLifeCycle {
message TStorageTiering {
optional string UseTiering = 1;
+ // If set, remove data by TTL in addition to the last tier's border
+ optional TTtl Ttl = 2;
+ // If set keep data in default storage (out of tiers) till this border
+ //optional TTtl KeepTime = 3; TODO
}
message TDisabled {
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index 2aff76c3873..6a6af07b799 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -229,12 +229,21 @@ void TColumnShard::UpdateIndexCounters() {
SetCounter(COUNTER_INACTIVE_ROWS, stats.Inactive.Rows);
SetCounter(COUNTER_INACTIVE_BYTES, stats.Inactive.Bytes);
SetCounter(COUNTER_INACTIVE_RAW_BYTES, stats.Inactive.RawBytes);
-
SetCounter(COUNTER_EVICTED_PORTIONS, stats.Evicted.Portions);
SetCounter(COUNTER_EVICTED_BLOBS, stats.Evicted.Blobs);
SetCounter(COUNTER_EVICTED_ROWS, stats.Evicted.Rows);
SetCounter(COUNTER_EVICTED_BYTES, stats.Evicted.Bytes);
SetCounter(COUNTER_EVICTED_RAW_BYTES, stats.Evicted.RawBytes);
+
+ LOG_S_DEBUG("Index: tables " << stats.Tables
+ << " granules " << stats.Granules << " (empty " << stats.EmptyGranules << " overloaded " << stats.OverloadedGranules << ")"
+ << " inserted " << stats.Inserted.Portions << "/" << stats.Inserted.Blobs << "/" << stats.Inserted.Rows
+ << " compacted " << stats.Compacted.Portions << "/" << stats.Compacted.Blobs << "/" << stats.Compacted.Rows
+ << " s-compacted " << stats.SplitCompacted.Portions << "/" << stats.SplitCompacted.Blobs << "/" << stats.SplitCompacted.Rows
+ << " inactive " << stats.Inactive.Portions << "/" << stats.Inactive.Blobs << "/" << stats.Inactive.Rows
+ << " evicted " << stats.Evicted.Portions << "/" << stats.Evicted.Blobs << "/" << stats.Evicted.Rows
+ << " column records " << stats.ColumnRecords << " meta bytes " << stats.ColumnMetadataBytes
+ << " at tablet " << TabletID());
}
ui64 TColumnShard::MemoryUsage() const {
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index 15ee07d9e55..c714dcd3536 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -234,7 +234,12 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>()));
if (!Self->PathsToDrop.count(pathId)) {
- ttls[pathId].emplace(version, TTtl::TDescription(info.GetTtlSettings()));
+ auto& ttlSettings = info.GetTtlSettings();
+ if (ttlSettings.HasEnabled()) {
+ ttls[pathId].emplace(version, TTtl::TDescription(ttlSettings.GetEnabled()));
+ } else if (ttlSettings.HasTiering() && ttlSettings.GetTiering().HasTtl()) {
+ ttls[pathId].emplace(version, TTtl::TDescription(ttlSettings.GetTiering().GetTtl()));
+ }
}
if (!rowset.Next())
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
index 7e9490dbad4..2ec98a0b3c8 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
@@ -200,7 +200,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
}
// If no paths trigger schema defined TTL
- THashMap<ui64, NOlap::TTiersInfo> pathTtls;
+ THashMap<ui64, NOlap::TTiering> pathTtls;
if (!ttlBody.GetPathIds().empty()) {
auto unixTime = TInstant::Seconds(ttlBody.GetUnixTimeSeconds());
if (!unixTime) {
@@ -217,7 +217,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
}
for (ui64 pathId : ttlBody.GetPathIds()) {
- pathTtls.emplace(pathId, NOlap::TTiersInfo(columnName, unixTime));
+ pathTtls.emplace(pathId, NOlap::TTiering::MakeTtl(unixTime, columnName));
}
}
@@ -228,6 +228,8 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
ctx.Send(Self->SelfId(), event->TxEvent.release());
}
status = NKikimrTxColumnShard::EResultStatus::SUCCESS;
+ } else {
+ statusMessage = "TTL not started";
}
break;
@@ -270,9 +272,10 @@ void TTxProposeTransaction::Complete(const TActorContext& ctx) {
void TColumnShard::Handle(TEvColumnShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) {
auto& record = Proto(ev->Get());
- ui32 txKind = record.GetTxKind();
+ auto txKind = record.GetTxKind();
ui64 txId = record.GetTxId();
- LOG_S_DEBUG("ProposeTransaction kind " << txKind << " txId " << txId << " at tablet " << TabletID());
+ LOG_S_DEBUG("ProposeTransaction " << NKikimrTxColumnShard::ETransactionKind_Name(txKind)
+ << " txId " << txId << " at tablet " << TabletID());
Execute(new TTxProposeTransaction(this, ev), ctx);
}
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index e6c88ef4cd9..7d5a298107e 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -189,7 +189,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
continue;
}
}
- LOG_S_DEBUG("Delete evicting blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID());
+ LOG_S_TRACE("Delete blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID());
Self->BlobManager->DeleteBlob(blobId, blobManagerDb);
Self->IncCounter(COUNTER_BLOBS_ERASED);
Self->IncCounter(COUNTER_BYTES_ERASED, blobId.BlobSize());
@@ -199,6 +199,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
// DS to S3 eviction. Keep source blob in DS till EEvictState::EXTERN state.
continue;
}
+ LOG_S_TRACE("Delete evicted blob '" << blobId.ToStringNew() << "' at tablet " << Self->TabletID());
Self->BlobManager->DeleteBlob(blobId, blobManagerDb);
Self->IncCounter(COUNTER_BLOBS_ERASED);
Self->IncCounter(COUNTER_BYTES_ERASED, blobId.BlobSize());
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index dc3e40a879a..8bbb86d9b6c 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -473,7 +473,9 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
const ui64 pathId = tableProto.GetPathId();
if (!Tables.contains(pathId)) {
- LOG_S_DEBUG("EnsureTable for pathId: " << pathId << " at tablet " << TabletID());
+ LOG_S_DEBUG("EnsureTable for pathId: " << pathId
+ << " ttl settings: " << tableProto.GetTtlSettings()
+ << " at tablet " << TabletID());
ui32 schemaPresetId = 0;
if (tableProto.HasSchemaPreset()) {
@@ -506,11 +508,16 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
*tableVerProto.MutableTtlSettings() = tableProto.GetTtlSettings();
auto& ttlInfo = tableProto.GetTtlSettings();
if (ttlInfo.HasEnabled()) {
- Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlInfo));
+ Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlInfo.GetEnabled()));
SetCounter(COUNTER_TABLE_TTLS, Ttl.PathsCount());
} else if (ttlInfo.HasTiering()) {
- table.TieringUsage = ttlInfo.GetTiering().GetUseTiering();
+ auto& tiering = ttlInfo.GetTiering();
+ table.TieringUsage = tiering.GetUseTiering();
ActivateTiering(pathId, table.TieringUsage);
+ if (tiering.HasTtl()) {
+ Ttl.SetPathTtl(pathId, TTtl::TDescription(tiering.GetTtl()));
+ SetCounter(COUNTER_TABLE_TTLS, Ttl.PathsCount());
+ }
}
}
@@ -543,8 +550,11 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
auto* tablePtr = Tables.FindPtr(pathId);
Y_VERIFY(tablePtr && !tablePtr->IsDropped(), "AlterTable on a dropped or non-existent table");
auto& table = *tablePtr;
+ auto& ttlSettings = alterProto.GetTtlSettings();
- LOG_S_DEBUG("AlterTable for pathId: " << pathId << " at tablet " << TabletID());
+ LOG_S_DEBUG("AlterTable for pathId: " << pathId
+ << " ttl settings: " << ttlSettings
+ << " at tablet " << TabletID());
Y_VERIFY(!alterProto.HasSchema(), "Tables with explicit schema are not supported");
auto& info = table.Versions[version];
@@ -553,11 +563,17 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
info.SetSchemaPresetId(EnsureSchemaPreset(db, alterProto.GetSchemaPreset(), version));
}
- const TString& tieringUsage = alterProto.GetTtlSettings().GetTiering().GetUseTiering();
+ const TString& tieringUsage = ttlSettings.GetTiering().GetUseTiering();
ActivateTiering(pathId, tieringUsage);
- if (alterProto.HasTtlSettings() && alterProto.GetTtlSettings().HasEnabled()) {
- *info.MutableTtlSettings() = alterProto.GetTtlSettings();
- Ttl.SetPathTtl(pathId, TTtl::TDescription(alterProto.GetTtlSettings()));
+ if (alterProto.HasTtlSettings()) {
+ *info.MutableTtlSettings() = ttlSettings;
+ if (ttlSettings.HasEnabled()) {
+ Ttl.SetPathTtl(pathId, TTtl::TDescription(ttlSettings.GetEnabled()));
+ } else if (ttlSettings.HasTiering() && ttlSettings.GetTiering().HasTtl()) {
+ Ttl.SetPathTtl(pathId, TTtl::TDescription( ttlSettings.GetTiering().GetTtl()));
+ } else {
+ Ttl.DropPathTtl(pathId);
+ }
} else {
Ttl.DropPathTtl(pathId);
}
@@ -786,8 +802,13 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() {
return {};
}
+ auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
+ if (Tiers) {
+ actualIndexInfo.SetTiering(Tiers->GetTiering()); // TODO: pathIds
+ }
+
ActiveIndexingOrCompaction = true;
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(), indexChanges,
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges,
Settings.CacheDataAfterIndexing, std::move(cachedBlobs));
return std::make_unique<TEvPrivate::TEvIndexing>(std::move(ev));
}
@@ -824,14 +845,19 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() {
return {};
}
+ auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
+ if (Tiers) {
+ actualIndexInfo.SetTiering(Tiers->GetTiering()); // TODO: pathIds
+ }
+
ActiveIndexingOrCompaction = true;
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(), indexChanges,
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges,
Settings.CacheDataAfterCompaction);
return std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev), *BlobManager);
}
-std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiersInfo>& pathTtls,
- bool force) {
+std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls,
+ bool force) {
if (ActiveTtl) {
LOG_S_DEBUG("TTL already in progress at tablet " << TabletID());
return {};
@@ -841,28 +867,27 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
return {};
}
- THashMap<ui64, NOlap::TTiersInfo> regularTtls = pathTtls;
- if (regularTtls.empty()) {
- regularTtls = Ttl.MakeIndexTtlMap(TInstant::Now(), force);
- }
- const bool tiersUsage = regularTtls.empty() && Tiers;
- if (tiersUsage) {
- regularTtls = Tiers->GetTiering();
+ THashMap<ui64, NOlap::TTiering> eviction = pathTtls;
+ if (eviction.empty()) {
+ if (Tiers) {
+ eviction = Tiers->GetTiering(); // TODO: pathIds
+ }
+ Ttl.AddTtls(eviction, TInstant::Now(), force);
}
- if (regularTtls.empty()) {
+ if (eviction.empty()) {
LOG_S_TRACE("TTL not started. No tables to activate it on (or delayed) at tablet " << TabletID());
return {};
- } else {
- for (auto&& i : regularTtls) {
- LOG_S_DEBUG(i.first << "/" << i.second.GetDebugString() << ";tablet=" << TabletID());
- }
}
LOG_S_DEBUG("Prepare TTL at tablet " << TabletID());
+ for (auto&& i : eviction) {
+ LOG_S_DEBUG("Evicting path " << i.first << " with " << i.second.GetDebugString() << " at tablet " << TabletID());
+ }
+
std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges;
- indexChanges = PrimaryIndex->StartTtl(regularTtls);
+ indexChanges = PrimaryIndex->StartTtl(eviction);
if (!indexChanges) {
LOG_S_NOTICE("Cannot prepare TTL at tablet " << TabletID());
@@ -874,8 +899,11 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
bool needWrites = !indexChanges->PortionsToEvict.empty();
+ auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
+ actualIndexInfo.SetTiering(std::move(eviction));
+
ActiveTtl = true;
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(tiersUsage), indexChanges, false);
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false);
return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), *BlobManager, needWrites);
}
@@ -925,7 +953,14 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
return {};
}
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(GetActualIndexInfo(), changes, false);
+ auto actualIndexInfo = PrimaryIndex->GetIndexInfo();
+#if 0 // No need for now
+ if (Tiers) {
+ actualIndexInfo.SetTiering(Tiers->GetTiering());
+ }
+#endif
+
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), changes, false);
ev->PutStatus = NKikimrProto::OK; // No new blobs to write
ActiveCleanup = true;
@@ -953,7 +988,7 @@ NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTabl
}
if (schema.HasDefaultCompression()) {
- NOlap::TCompression compression = NTiers::TManager::ConvertCompression(schema.GetDefaultCompression());
+ NOlap::TCompression compression = NTiers::ConvertCompression(schema.GetDefaultCompression());
indexInfo.SetDefaultCompression(compression);
}
@@ -1053,16 +1088,6 @@ void TColumnShard::Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr&
Tiers->TakeConfigs(ev->Get()->GetSnapshot(), nullptr);
}
-NOlap::TIndexInfo TColumnShard::GetActualIndexInfo(const bool tiersUsage) const {
- auto indexInfo = PrimaryIndex->GetIndexInfo();
- if (tiersUsage && Tiers) {
- for (auto&& i : *Tiers) {
- indexInfo.AddStorageTier(i.second.BuildTierStorage());
- }
- }
- return indexInfo;
-}
-
void TColumnShard::ActivateTiering(const ui64 pathId, const TString& useTiering) {
if (!Tiers) {
Tiers = std::make_shared<TTiersManager>(TabletID(), SelfId());
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 01dbe644c5b..dfbd8700d22 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -127,7 +127,7 @@ class TColumnShard
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx);
void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev);
void Handle(NTiers::TEvTiersManagerReadyForUsage::TPtr& ev);
-
+
ITransaction* CreateTxInitSchema();
ITransaction* CreateTxRunGc();
@@ -178,9 +178,8 @@ class TColumnShard
TabletCounters->Percentile()[counter].IncrementFor(latency.MicroSeconds());
}
- NOlap::TIndexInfo GetActualIndexInfo(const bool tiersUsage = true) const;
-
void ActivateTiering(const ui64 pathId, const TString& useTiering);
+
protected:
STFUNC(StateInit) {
TRACE_EVENT(NKikimrServices::TX_COLUMNSHARD);
@@ -463,7 +462,7 @@ private:
std::unique_ptr<TEvPrivate::TEvIndexing> SetupIndexation();
std::unique_ptr<TEvPrivate::TEvCompaction> SetupCompaction();
- std::unique_ptr<TEvPrivate::TEvEviction> SetupTtl(const THashMap<ui64, NOlap::TTiersInfo>& pathTtls = {},
+ std::unique_ptr<TEvPrivate::TEvEviction> SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls = {},
bool force = false);
std::unique_ptr<TEvPrivate::TEvWriteIndex> SetupCleanup();
diff --git a/ydb/core/tx/columnshard/columnshard_ttl.h b/ydb/core/tx/columnshard/columnshard_ttl.h
index aae50f3e7ee..5846fad4a4a 100644
--- a/ydb/core/tx/columnshard/columnshard_ttl.h
+++ b/ydb/core/tx/columnshard/columnshard_ttl.h
@@ -9,33 +9,20 @@ public:
static constexpr const ui64 DEFAULT_REPEAT_TTL_TIMEOUT_SEC = 10;
struct TEviction {
- TString TierName;
TDuration EvictAfter;
+ TString ColumnName;
};
struct TDescription {
- TString ColumnName;
- std::vector<TEviction> Evictions;
+ std::optional<TEviction> Eviction;
TDescription() = default;
- TDescription(const NKikimrSchemeOp::TColumnDataLifeCycle& ttl) {
- if (ttl.HasEnabled()) {
- auto& enabled = ttl.GetEnabled();
- ColumnName = enabled.GetColumnName();
- auto expireSec = TDuration::Seconds(enabled.GetExpireAfterSeconds());
+ TDescription(const NKikimrSchemeOp::TColumnDataLifeCycle::TTtl& ttl) {
+ auto expireSec = TDuration::Seconds(ttl.GetExpireAfterSeconds());
- Evictions.reserve(1);
- Evictions.emplace_back(TEviction{{}, expireSec});
- }
-
- if (Enabled()) {
- Y_VERIFY(!ColumnName.empty());
- }
- }
-
- bool Enabled() const {
- return !Evictions.empty();
+ Eviction = TEviction{expireSec, ttl.GetColumnName()};
+ Y_VERIFY(!Eviction->ColumnName.empty());
}
};
@@ -44,12 +31,13 @@ public:
}
void SetPathTtl(ui64 pathId, TDescription&& descr) {
- if (descr.Enabled()) {
- auto it = Columns.find(descr.ColumnName);
+ if (descr.Eviction) {
+ auto& evict = descr.Eviction;
+ auto it = Columns.find(evict->ColumnName);
if (it != Columns.end()) {
- descr.ColumnName = *it; // replace string dups (memory efficiency)
+ evict->ColumnName = *it; // replace string dups (memory efficiency)
} else {
- Columns.insert(descr.ColumnName);
+ Columns.insert(evict->ColumnName);
}
PathTtls[pathId] = descr;
} else {
@@ -61,18 +49,16 @@ public:
PathTtls.erase(pathId);
}
- THashMap<ui64, NOlap::TTiersInfo> MakeIndexTtlMap(TInstant now, bool force = false) {
+ void AddTtls(THashMap<ui64, NOlap::TTiering>& eviction, TInstant now, bool force = false) {
if ((now < LastRegularTtl + TtlTimeout) && !force) {
- return {};
+ return;
}
- THashMap<ui64, NOlap::TTiersInfo> out;
for (auto& [pathId, descr] : PathTtls) {
- out.emplace(pathId, Convert(descr, now));
+ eviction[pathId].Ttl = Convert(descr, now);
}
LastRegularTtl = now;
- return out;
}
void Repeat() {
@@ -89,16 +75,13 @@ private:
TDuration RepeatTtlTimeout{TDuration::Seconds(DEFAULT_REPEAT_TTL_TIMEOUT_SEC)};
TInstant LastRegularTtl;
- NOlap::TTiersInfo Convert(const TDescription& descr, TInstant timePoint) const {
- Y_VERIFY(descr.Enabled());
- NOlap::TTiersInfo out(descr.ColumnName);
-
- for (auto& tier : descr.Evictions) {
- auto border = timePoint - tier.EvictAfter;
- out.AddTier(tier.TierName, border);
+ std::shared_ptr<NOlap::TTierInfo> Convert(const TDescription& descr, TInstant timePoint) const {
+ if (descr.Eviction) {
+ auto& evict = descr.Eviction;
+ auto border = timePoint - evict->EvictAfter;
+ return NOlap::TTierInfo::MakeTtl(border, evict->ColumnName);
}
-
- return out;
+ return {};
}
};
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index 21d31c7f1ae..9701ff55a1c 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -300,9 +300,9 @@ NMetadata::NFetcher::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpec
tRule.SetTieringRuleId("Tiering1");
for (auto&& tier : specials.Tiers) {
if (!tRule.GetDefaultColumn()) {
- tRule.SetDefaultColumn(tier.GetTtlColumn());
+ tRule.SetDefaultColumn(tier.TtlColumn);
}
- Y_VERIFY(tRule.GetDefaultColumn() == tier.GetTtlColumn());
+ Y_VERIFY(tRule.GetDefaultColumn() == tier.TtlColumn);
{
NColumnShard::NTiers::TTierConfig tConfig;
tConfig.SetTierName(tier.Name);
@@ -319,7 +319,7 @@ NMetadata::NFetcher::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpec
}
cs->MutableTierConfigs().emplace(tConfig.GetTierName(), tConfig);
}
- tRule.AddInterval(tier.Name, TDuration::Seconds(tier.GetEvictAfterSecondsUnsafe()));
+ tRule.AddInterval(tier.Name, TDuration::Seconds((*tier.EvictAfter).Seconds()));
}
cs->MutableTableTierings().emplace(tRule.GetTieringRuleId(), tRule);
return cs;
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 8a8b1873d6c..210881dd1f9 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -28,9 +28,8 @@ struct TTestSchema {
static inline const TString DefaultTtlColumn = "saved_at";
struct TStorageTier {
- YDB_ACCESSOR(TString, TtlColumn, DefaultTtlColumn);
- YDB_OPT(ui32, EvictAfterSeconds);
- public:
+ TString TtlColumn = DefaultTtlColumn;
+ std::optional<TDuration> EvictAfter;
TString Name;
TString Codec;
std::optional<int> CompressionLevel;
@@ -38,7 +37,6 @@ struct TTestSchema {
TStorageTier(const TString& name = {})
: Name(name)
-
{}
NKikimrSchemeOp::EColumnCodec GetCodecId() const {
@@ -63,6 +61,11 @@ struct TTestSchema {
}
return *this;
}
+
+ TStorageTier& SetTtlColumn(const TString& columnName) {
+ TtlColumn = columnName;
+ return *this;
+ }
};
struct TTableSpecials : public TStorageTier {
@@ -73,7 +76,7 @@ struct TTestSchema {
}
bool HasTtl() const {
- return !HasTiers() && HasEvictAfterSeconds();
+ return EvictAfter.has_value();
}
TTableSpecials WithCodec(const TString& codec) {
@@ -81,6 +84,11 @@ struct TTestSchema {
out.SetCodec(codec);
return out;
}
+
+ TTableSpecials& SetTtl(std::optional<TDuration> ttl) {
+ EvictAfter = ttl;
+ return *this;
+ }
};
static auto YdbSchema(const std::pair<TString, TTypeInfo>& firstKeyItem = {"timestamp", TTypeInfo(NTypeIds::Timestamp) }) {
@@ -197,16 +205,26 @@ struct TTestSchema {
}
}
- static void InitTtl(const TTableSpecials& specials, NKikimrSchemeOp::TColumnDataLifeCycle* ttlSettings) {
- ttlSettings->SetVersion(1);
- auto* enable = ttlSettings->MutableEnabled();
- enable->SetColumnName(specials.GetTtlColumn());
- enable->SetExpireAfterSeconds(specials.GetEvictAfterSecondsUnsafe());
+ static void InitTtl(const TTableSpecials& specials, NKikimrSchemeOp::TColumnDataLifeCycle::TTtl* ttl) {
+ Y_VERIFY(specials.HasTtl());
+ Y_VERIFY(!specials.TtlColumn.empty());
+ ttl->SetColumnName(specials.TtlColumn);
+ ttl->SetExpireAfterSeconds((*specials.EvictAfter).Seconds());
}
- static void InitTiers(const TTableSpecials& specials, NKikimrSchemeOp::TColumnDataLifeCycle* ttlSettings) {
- Y_VERIFY(specials.HasTiers());
- ttlSettings->MutableTiering()->SetUseTiering("Tiering1");
+ static bool InitTiersAndTtl(const TTableSpecials& specials, NKikimrSchemeOp::TColumnDataLifeCycle* ttlSettings) {
+ ttlSettings->SetVersion(1);
+ if (specials.HasTiers()) {
+ ttlSettings->MutableTiering()->SetUseTiering("Tiering1");
+ if (specials.HasTtl()) {
+ InitTtl(specials, ttlSettings->MutableTiering()->MutableTtl());
+ }
+ return true;
+ } else if (specials.HasTtl()) {
+ InitTtl(specials, ttlSettings->MutableEnabled());
+ return true;
+ }
+ return false;
}
static TString CreateTableTxBody(ui64 pathId, const TVector<std::pair<TString, TTypeInfo>>& columns,
@@ -225,11 +243,9 @@ struct TTestSchema {
InitSchema(columns, pk, specials, preset->MutableSchema());
}
- if (specials.HasTtl()) {
- InitTtl(specials, table->MutableTtlSettings());
- } else if (specials.HasTiers()) {
- InitTiers(specials, table->MutableTtlSettings());
- }
+ InitTiersAndTtl(specials, table->MutableTtlSettings());
+
+ Cerr << "CreateTable: " << tx << "\n";
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&out);
@@ -245,11 +261,9 @@ struct TTestSchema {
table->SetPathId(pathId);
InitSchema(columns, pk, specials, table->MutableSchema());
- if (specials.HasTtl()) {
- InitTtl(specials, table->MutableTtlSettings());
- } else if (specials.HasTiers()) {
- InitTiers(specials, table->MutableTtlSettings());
- }
+ InitTiersAndTtl(specials, table->MutableTtlSettings());
+
+ Cerr << "CreateInitShard: " << tx << "\n";
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&out);
@@ -264,10 +278,9 @@ struct TTestSchema {
table->SetPathId(pathId);
InitSchema(columns, pk, specials, table->MutableSchema());
+ InitTiersAndTtl(specials, table->MutableTtlSettings());
- if (specials.HasTtl()) {
- InitTtl(specials, table->MutableTtlSettings());
- }
+ Cerr << "CreateStandaloneTable: " << tx << "\n";
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&out);
@@ -281,18 +294,12 @@ struct TTestSchema {
tx.MutableSeqNo()->SetRound(version);
auto* ttlSettings = table->MutableTtlSettings();
- ttlSettings->SetVersion(version);
-
- if (specials.HasTtl()) {
- auto* enable = ttlSettings->MutableEnabled();
- enable->SetColumnName(specials.GetTtlColumn());
- enable->SetExpireAfterSeconds(specials.GetEvictAfterSecondsUnsafe());
- } else if (specials.HasTiers()) {
- ttlSettings->MutableTiering()->SetUseTiering("Tiering1");
- } else {
+ if (!InitTiersAndTtl(specials, ttlSettings)) {
ttlSettings->MutableDisabled();
}
+ Cerr << "AlterTable: " << tx << "\n";
+
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&out);
return out;
diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp
index 3ef17bac055..8b5c25b3349 100644
--- a/ydb/core/tx/columnshard/compaction_actor.cpp
+++ b/ydb/core/tx/columnshard/compaction_actor.cpp
@@ -60,7 +60,8 @@ public:
Y_VERIFY(blobData.size() == blobId.Size, "%u vs %u", (ui32)blobData.size(), blobId.Size);
Blobs[blobId] = blobData;
} else {
- LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId.ToString() << " status " << event.Status
+ LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId.ToString()
+ << " status " << NKikimrProto::EReplyStatus_Name(event.Status)
<< " at tablet " << TabletId << " (compaction)");
TxEvent->PutStatus = event.Status;
if (TxEvent->PutStatus == NKikimrProto::UNKNOWN) {
diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp
index 905a57f27f1..d24839efc8a 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine.cpp
@@ -1,18 +1,6 @@
#include "column_engine.h"
#include <util/stream/output.h>
-namespace NKikimr::NOlap {
-
-TString TTiersInfo::GetDebugString() const {
- TStringBuilder sb;
- sb << "column=" << Column << ";";
- for (auto&& i : TierBorders) {
- sb << "tname=" << i.TierName << ";eborder=" << i.EvictBorder << ";";
- }
- return sb;
-}
-}
-
template <>
void Out<NKikimr::NOlap::TColumnEngineChanges>(IOutputStream& out, TTypeTraits<NKikimr::NOlap::TColumnEngineChanges>::TFuncParam changes) {
if (ui32 switched = changes.SwitchedPortions.size()) {
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 603b49f2cd5..f7e9260dfe3 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -1,5 +1,6 @@
#pragma once
#include "defs.h"
+#include "tier_info.h"
#include "index_info.h"
#include "portion_info.h"
#include "db_wrapper.h"
@@ -59,56 +60,15 @@ struct TCompactionInfo {
}
};
-struct TTiersInfo {
- struct TTierTimeBorder {
- TString TierName;
- TInstant EvictBorder;
-
- TTierTimeBorder(TString tierName, TInstant evictBorder)
- : TierName(tierName)
- , EvictBorder(evictBorder)
- {}
-
- std::shared_ptr<arrow::Scalar> ToTimestamp() const {
- if (Scalar) {
- return Scalar;
- }
-
- Scalar = std::make_shared<arrow::TimestampScalar>(
- EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO));
- return Scalar;
- }
-
- private:
- mutable std::shared_ptr<arrow::Scalar> Scalar;
- };
-
- TString Column;
- std::vector<TTierTimeBorder> TierBorders; // Ordered tiers from hottest to coldest
-
- TString GetDebugString() const;
-
- TTiersInfo(const TString& column, TInstant border = {}, const TString& tierName = {})
- : Column(column)
- {
- if (border) {
- AddTier(tierName, border);
- }
- }
-
- void AddTier(const TString& tierName, TInstant border) {
- TierBorders.emplace_back(TTierTimeBorder(tierName, border));
- }
-};
-
struct TPortionEvictionFeatures {
const TString TargetTierName;
const ui64 PathId; // portion path id for cold-storage-key construct
+ bool DataChanges = true;
+
TPortionEvictionFeatures(const TString& targetTierName, const ui64 pathId)
: TargetTierName(targetTierName)
- , PathId(pathId) {
-
- }
+ , PathId(pathId)
+ {}
};
class TColumnEngineChanges {
@@ -361,7 +321,7 @@ public:
const TSnapshot& outdatedSnapshot) = 0;
virtual std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot,
THashSet<ui64>& pathsToDrop) = 0;
- virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls,
+ virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction,
ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) = 0;
virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) = 0;
virtual void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) = 0;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 60da8ba39b0..ea93b8b39a8 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -62,16 +62,19 @@ std::shared_ptr<arrow::RecordBatch> AddSpecials(const std::shared_ptr<arrow::Rec
return NArrow::ExtractColumns(batch, indexInfo.ArrowSchemaWithSpecials());
}
-bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo, const TString& tierName,
- const THashMap<TBlobRange, TString>& srcBlobs,
+bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo,
+ TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs,
TVector<TColumnRecord>& evictedRecords, TVector<TString>& newBlobs)
{
- Y_VERIFY(portionInfo.TierName != tierName);
+ Y_VERIFY(portionInfo.TierName != evictFeatures.TargetTierName);
- auto compression = indexInfo.GetTierCompression(tierName);
+ auto* tiering = indexInfo.GetTiering(evictFeatures.PathId);
+ Y_VERIFY(tiering);
+ auto compression = tiering->GetCompression(evictFeatures.TargetTierName);
if (!compression) {
- // Noting to recompress. We have no other kinds of evictions yet. Return.
- portionInfo.TierName = tierName;
+ // Nothing to recompress. We have no other kinds of evictions yet.
+ portionInfo.TierName = evictFeatures.TargetTierName;
+ evictFeatures.DataChanges = false;
return true;
}
@@ -82,7 +85,6 @@ bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo
TPortionInfo undo = portionInfo;
size_t undoSize = newBlobs.size();
- std::vector<TString> blobs;
for (auto& rec : portionInfo.Records) {
auto colName = indexInfo.GetColumnName(rec.ColumnId);
std::string name(colName.data(), colName.size());
@@ -102,11 +104,11 @@ bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo
evictedRecords.emplace_back(std::move(rec));
}
- portionInfo.AddMetadata(indexInfo, batch, tierName);
+ portionInfo.AddMetadata(indexInfo, batch, evictFeatures.TargetTierName);
return true;
}
-TVector<TPortionInfo> MakeAppendedPortions(const TIndexInfo& indexInfo,
+TVector<TPortionInfo> MakeAppendedPortions(ui64 pathId, const TIndexInfo& indexInfo,
std::shared_ptr<arrow::RecordBatch> batch,
ui64 granule,
const TSnapshot& minSnapshot,
@@ -115,12 +117,17 @@ TVector<TPortionInfo> MakeAppendedPortions(const TIndexInfo& indexInfo,
auto schema = indexInfo.ArrowSchemaWithSpecials();
TVector<TPortionInfo> out;
- TString tierName = indexInfo.GetTierName(0);
- auto compression = indexInfo.GetTierCompression(0);
- if (!compression) {
- compression = indexInfo.GetDefaultCompression();
+ TString tierName;
+ TCompression compression = indexInfo.GetDefaultCompression();
+ if (pathId) {
+ if (auto* tiering = indexInfo.GetTiering(pathId)) {
+ tierName = tiering->GetHottestTierName();
+ if (auto tierCompression = tiering->GetCompression(tierName)) {
+ compression = *tierCompression;
+ }
+ }
}
- auto writeOptions = WriteOptions(*compression);
+ auto writeOptions = WriteOptions(compression);
std::shared_ptr<arrow::RecordBatch> portionBatch = batch;
for (i32 pos = 0; pos < batch->num_rows();) {
@@ -780,9 +787,9 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
return changes;
}
-std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls,
+std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction,
ui64 maxEvictBytes) {
- if (pathTtls.empty()) {
+ if (pathEviction.empty()) {
return {};
}
@@ -793,18 +800,18 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
ui64 dropBlobs = 0;
bool allowDrop = true;
- for (auto& [pathId, ttl] : pathTtls) {
+ for (auto& [pathId, ttl] : pathEviction) {
if (!PathGranules.count(pathId)) {
continue; // It's not an error: allow TTL over multiple shards with different pathIds presented
}
- if (!IndexInfo.AllowTtlOverColumn(ttl.Column)) {
- continue;
- }
+ auto expireTimestamp = ttl.ExpireTimestamp();
+ Y_VERIFY(expireTimestamp);
- Y_VERIFY(!ttl.TierBorders.empty());
+ auto ttlColumnNames = ttl.GetTtlColumns();
+ Y_VERIFY(ttlColumnNames.size() == 1); // TODO: support different ttl columns
+ ui32 ttlColumnId = IndexInfo.GetColumnId(*ttlColumnNames.begin());
- ui32 ttlColumnId = IndexInfo.GetColumnId(ttl.Column);
for (const auto& [ts, granule] : PathGranules[pathId]) {
auto spg = Granules[granule];
Y_VERIFY(spg);
@@ -818,19 +825,27 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
allowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE);
if (auto max = info.MaxValue(ttlColumnId)) {
- bool keep = false;
- for (auto& border : ttl.TierBorders) {
- if (NArrow::ScalarLess(*border.ToTimestamp(), *max)) {
- keep = true;
- if (allowEviction && info.TierName != border.TierName) {
- evicttionSize += info.BlobsSizes().first;
- changes->PortionsToEvict.emplace_back(info, TPortionEvictionFeatures(border.TierName, pathId));
+ bool keep = NArrow::ScalarLess(expireTimestamp, max);
+
+ if (keep && allowEviction) {
+ TString tierName;
+ for (auto& tierRef : ttl.OrderedTiers) { // TODO: lower/upper_bound + move into TEviction
+ auto& tierInfo = tierRef.Get();
+ if (!IndexInfo.AllowTtlOverColumn(tierInfo.Column)) {
+ continue; // Ignore tiers with bad ttl column
+ }
+ if (NArrow::ScalarLess(tierInfo.EvictTimestamp(), max)) {
+ tierName = tierInfo.Name;
+ } else {
+ break;
}
- break;
+ }
+ if (info.TierName != tierName) {
+ evicttionSize += info.BlobsSizes().first;
+ changes->PortionsToEvict.emplace_back(info, TPortionEvictionFeatures(tierName, pathId));
}
}
if (!keep && allowDrop) {
- Y_VERIFY(!NArrow::ScalarLess(*ttl.TierBorders.back().ToTimestamp(), *max));
dropBlobs += info.NumRecords();
changes->PortionsToDrop.push_back(info);
}
@@ -1537,7 +1552,7 @@ TVector<TString> TColumnEngineForLogs::IndexBlobs(const TIndexInfo& indexInfo,
auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], indexInfo);
for (auto& [granule, batch] : granuleBatches) {
- auto portions = MakeAppendedPortions(indexInfo, batch, granule, minSnapshot, blobs);
+ auto portions = MakeAppendedPortions(pathId, indexInfo, batch, granule, minSnapshot, blobs);
Y_VERIFY(portions.size() > 0);
for (auto& portion : portions) {
changes->AppendedPortions.emplace_back(std::move(portion));
@@ -1572,6 +1587,7 @@ static std::shared_ptr<arrow::RecordBatch> CompactInOneGranule(const TIndexInfo&
static TVector<TString> CompactInGranule(const TIndexInfo& indexInfo,
std::shared_ptr<TColumnEngineForLogs::TChanges> changes) {
+ ui64 pathId = changes->SrcGranule->PathId;
TVector<TString> blobs;
auto& switchedProtions = changes->SwitchedPortions;
Y_VERIFY(switchedProtions.size());
@@ -1589,13 +1605,13 @@ static TVector<TString> CompactInGranule(const TIndexInfo& indexInfo,
if (!slice || slice->num_rows() == 0) {
continue;
}
- auto tmp = MakeAppendedPortions(indexInfo, slice, granule, TSnapshot{}, blobs);
+ auto tmp = MakeAppendedPortions(pathId, indexInfo, slice, granule, TSnapshot{}, blobs);
for (auto&& portionInfo : tmp) {
portions.emplace_back(std::move(portionInfo));
}
}
} else {
- portions = MakeAppendedPortions(indexInfo, batch, granule, TSnapshot{}, blobs);
+ portions = MakeAppendedPortions(pathId, indexInfo, batch, granule, TSnapshot{}, blobs);
}
Y_VERIFY(portions.size() > 0);
@@ -1909,7 +1925,7 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo,
for (auto& batch : idBatches[id]) {
// Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
- auto newPortions = MakeAppendedPortions(indexInfo, batch, tmpGranule, TSnapshot{}, blobs);
+ auto newPortions = MakeAppendedPortions(pathId, indexInfo, batch, tmpGranule, TSnapshot{}, blobs);
Y_VERIFY(newPortions.size() > 0);
for (auto& portion : newPortions) {
changes->AppendedPortions.emplace_back(std::move(portion));
@@ -1925,7 +1941,7 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo,
ui64 tmpGranule = changes->SetTmpGranule(pathId, ts);
// Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
- auto portions = MakeAppendedPortions(indexInfo, batch, tmpGranule, TSnapshot{}, blobs);
+ auto portions = MakeAppendedPortions(pathId, indexInfo, batch, tmpGranule, TSnapshot{}, blobs);
Y_VERIFY(portions.size() > 0);
for (auto& portion : portions) {
changes->AppendedPortions.emplace_back(std::move(portion));
@@ -1963,13 +1979,14 @@ TVector<TString> TColumnEngineForLogs::EvictBlobs(const TIndexInfo& indexInfo,
TVector<std::pair<TPortionInfo, TPortionEvictionFeatures>> evicted;
evicted.reserve(changes->PortionsToEvict.size());
- for (auto& [portionInfo, evFeatures] : changes->PortionsToEvict) {
+ for (auto& [portionInfo, evictFeatures] : changes->PortionsToEvict) {
Y_VERIFY(!portionInfo.Empty());
Y_VERIFY(portionInfo.IsActive());
- if (UpdateEvictedPortion(portionInfo, indexInfo, evFeatures.TargetTierName, changes->Blobs, changes->EvictedRecords, newBlobs)) {
- Y_VERIFY(portionInfo.TierName == evFeatures.TargetTierName);
- evicted.emplace_back(std::move(portionInfo), evFeatures);
+ if (UpdateEvictedPortion(portionInfo, indexInfo, evictFeatures, changes->Blobs,
+ changes->EvictedRecords, newBlobs)) {
+ Y_VERIFY(portionInfo.TierName == evictFeatures.TargetTierName);
+ evicted.emplace_back(std::move(portionInfo), evictFeatures);
}
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index a09710ad213..7e328c4050b 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -247,7 +247,7 @@ public:
const TSnapshot& outdatedSnapshot) override;
std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot,
THashSet<ui64>& pathsToDrop) override;
- std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls,
+ std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction,
ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override;
bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
const TSnapshot& snapshot) override;
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 3828bb946d0..68855e43c6e 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -1,6 +1,7 @@
#pragma once
#include "defs.h"
#include "scalars.h"
+#include "tier_info.h"
#include <ydb/core/tablet_flat/flat_dbase_scheme.h>
#include <ydb/core/sys_view/common/schema.h>
@@ -50,16 +51,6 @@ GetColumns(const NTable::TScheme::TTableSchema& tableSchema, const TVector<ui32>
struct TInsertedData;
-struct TCompression {
- arrow::Compression::type Codec{arrow::Compression::LZ4_FRAME};
- std::optional<int> Level;
-};
-
-struct TStorageTier {
- TString Name;
- std::optional<TCompression> Compression;
-};
-
/// Column engine index description in terms of tablet's local table.
/// We have to use YDB types for keys here.
struct TIndexInfo : public NTable::TScheme::TTableSchema {
@@ -193,41 +184,16 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema {
void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; }
const TCompression& GetDefaultCompression() const { return DefaultCompression; }
- std::optional<TCompression> GetTierCompression(ui32 tierNo) const {
- if (!Tiers.empty()) {
- Y_VERIFY(tierNo < Tiers.size());
- return Tiers[tierNo].Compression;
- }
- return {};
- }
-
- std::optional<TCompression> GetTierCompression(const TString& tierName) const {
- if (tierName.empty()) {
- return {};
- }
- ui32 tierNo = GetTierNumber(tierName);
- Y_VERIFY(tierNo != Max<ui32>());
- return GetTierCompression(tierNo);
- }
-
- TString GetTierName(ui32 tierNo) const {
- if (!Tiers.empty()) {
- Y_VERIFY(tierNo < Tiers.size());
- return Tiers[tierNo].Name;
- }
- return {};
- }
-
- void AddStorageTier(TStorageTier&& tier) {
- TierByName[tier.Name] = Tiers.size();
- Tiers.emplace_back(std::move(tier));
+ void SetTiering(THashMap<ui64, TTiering>&& pathTierings) { // Replaces the whole per-path tiering map (keyed by pathId).
+ PathTiering = std::move(pathTierings); // takes ownership of the caller's map; previous contents are discarded
}
- ui32 GetTierNumber(const TString& tierName) const {
- if (auto it = TierByName.find(tierName); it != TierByName.end()) {
- return it->second;
+ const TTiering* GetTiering(ui64 pathId) const { // Looks up the tiering registered for pathId; nullptr when there is none.
+ auto it = PathTiering.find(pathId);
+ if (it != PathTiering.end()) {
+ return &it->second; // points into PathTiering, so it is only usable until the map is replaced via SetTiering()
}
- return Max<ui32>();
+ return nullptr; // no tiering configured for this path
}
private:
@@ -242,8 +208,7 @@ private:
THashSet<TString> RequiredColumns;
THashSet<ui32> MinMaxIdxColumnsIds;
TCompression DefaultCompression;
- std::vector<TStorageTier> Tiers;
- THashMap<TString, ui32> TierByName;
+ THashMap<ui64, TTiering> PathTiering;
void AddRequiredColumns(const TVector<TString>& columns) {
for (auto& name: columns) {
diff --git a/ydb/core/tx/columnshard/engines/tier_info.h b/ydb/core/tx/columnshard/engines/tier_info.h
new file mode 100644
index 00000000000..6c80fd0215d
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/tier_info.h
@@ -0,0 +1,157 @@
+#pragma once
+#include "defs.h"
+#include "scalars.h"
+
+namespace NKikimr::NOlap {
+
+struct TCompression { // Compression settings: an arrow codec plus an optional compression level.
+ arrow::Compression::type Codec{arrow::Compression::LZ4_FRAME}; // codec; LZ4_FRAME unless overridden
+ std::optional<int> Level; // unset by default; TTierInfo::GetDebugString() prints arrow's kUseDefaultCompressionLevel in that case
+};
+
+struct TTierInfo { // Describes one eviction tier (or the "TTL" pseudo-tier built by MakeTtl): name, border instant, column, optional compression.
+ TString Name; // tier name; verified non-empty in the constructor
+ TInstant EvictBorder; // border instant; exposed as an arrow timestamp scalar by EvictTimestamp()
+ TString Column; // name of the column the border applies to; verified non-empty in the constructor
+ std::optional<TCompression> Compression; // not set by the constructor; stays empty unless assigned afterwards
+
+ TTierInfo(const TString& tierName, TInstant evictBorder, const TString& column) // Aborts via Y_VERIFY on an empty name or column.
+ : Name(tierName)
+ , EvictBorder(evictBorder)
+ , Column(column)
+ {
+ Y_VERIFY(!Name.empty());
+ Y_VERIFY(!Column.empty());
+ }
+
+ std::shared_ptr<arrow::Scalar> EvictTimestamp() const { // Returns EvictBorder as a microsecond-precision arrow TimestampScalar, built once and cached.
+ if (Scalar) {
+ return Scalar; // reuse the scalar built by an earlier call
+ }
+
+ Scalar = std::make_shared<arrow::TimestampScalar>( // NOTE(review): this write to the mutable cache has no visible synchronization - confirm single-threaded use
+ EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO));
+ return Scalar;
+ }
+
+ static std::shared_ptr<TTierInfo> MakeTtl(TInstant ttlBorder, const TString& ttlColumn) { // Builds a tier entry with the fixed name "TTL".
+ return std::make_shared<TTierInfo>("TTL", ttlBorder, ttlColumn);
+ }
+
+ TString GetDebugString() const { // Human-readable dump: name, border, column, then "<codec>:<level>".
+ TStringBuilder sb;
+ sb << "tier name '" << Name << "' border '" << EvictBorder << "' column '" << Column << "' "
+ << arrow::util::Codec::GetCodecAsString(Compression ? Compression->Codec : TCompression().Codec) // default TCompression codec when none is set
+ << ":" << ((Compression && Compression->Level) ?
+ *Compression->Level : arrow::util::kUseDefaultCompressionLevel); // arrow's "use default level" constant when no level is set
+ return sb;
+ }
+
+private:
+ mutable std::shared_ptr<arrow::Scalar> Scalar; // lazily filled cache for EvictTimestamp(); mutable so the const getter can populate it
+};
+
+struct TTierRef { // Shared handle to a TTierInfo with ordering/equality by (EvictBorder, Name); used as the element type of TTiering::OrderedTiers.
+ TTierRef(const std::shared_ptr<TTierInfo>& tierInfo) // Non-explicit: a shared_ptr<TTierInfo> converts implicitly. Aborts via Y_VERIFY on null.
+ : Info(tierInfo)
+ {
+ Y_VERIFY(tierInfo);
+ }
+
+ bool operator < (const TTierRef& b) const { // Orders by ascending EvictBorder, with a name tie-break on equal borders.
+ if (Info->EvictBorder < b.Info->EvictBorder) {
+ return true;
+ } else if (Info->EvictBorder == b.Info->EvictBorder) {
+ return Info->Name > b.Info->Name; // add stability: on equal borders the greater name sorts first, so the smaller name ends up later (hotter)
+ }
+ return false;
+ }
+
+ bool operator == (const TTierRef& b) const { // Equal when both border and name match; other TTierInfo fields are not compared.
+ return Info->EvictBorder == b.Info->EvictBorder
+ && Info->Name == b.Info->Name;
+ }
+
+ const TTierInfo& Get() const { // Access to the referenced tier; Info is non-null because the constructor verifies it.
+ return *Info;
+ }
+
+private:
+ std::shared_ptr<TTierInfo> Info; // never null (checked in the constructor)
+};
+
+struct TTiering { // Eviction settings bundle: a set of tiers (by name and by border order) plus an optional TTL entry.
+ THashMap<TString, std::shared_ptr<TTierInfo>> TierByName; // lookup of tiers by their Name; filled by Add()
+ TSet<TTierRef> OrderedTiers; // Tiers ordered by border (ascending EvictBorder, ties broken by TTierRef::operator<)
+ std::shared_ptr<TTierInfo> Ttl; // optional TTL entry; null unless set (e.g. by MakeTtl)
+
+ static TTiering MakeTtl(TInstant ttlBorder, const TString& ttlColumn) { // Builds a TTL-only object: no tiers, only the Ttl entry.
+ TTiering out;
+ out.Ttl = TTierInfo::MakeTtl(ttlBorder, ttlColumn);
+ return out;
+ }
+
+ bool Empty() const { // True when no tiers were added; Ttl is not considered, so a TTL-only object is still Empty().
+ return OrderedTiers.empty();
+ }
+
+ void Add(const std::shared_ptr<TTierInfo>& tier) { // Registers a tier in both containers; all tiers must share one column.
+ if (!Empty()) {
+ Y_VERIFY(tier->Column == OrderedTiers.begin()->Get().Column); // TODO: support different ttl columns
+ }
+
+ TierByName.emplace(tier->Name, tier); // NOTE(review): emplace presumably keeps the existing entry on a duplicate name - confirm names are unique
+ OrderedTiers.emplace(tier); // NOTE(review): a duplicate name with a different border would still be inserted here - verify against callers
+ }
+
+ TString GetHottestTierName() const { // Name of the last tier in OrderedTiers, or an empty string when there are no tiers.
+ if (OrderedTiers.size()) {
+ return OrderedTiers.rbegin()->Get().Name; // hottest one
+ }
+ return {};
+ }
+
+ std::shared_ptr<arrow::Scalar> ExpireTimestamp() const { // Combines the TTL border with the first (smallest-border) tier's border; null when neither exists.
+ auto ttlTs = Ttl ? Ttl->EvictTimestamp() : nullptr;
+ auto tierTs = OrderedTiers.empty() ? nullptr : OrderedTiers.begin()->Get().EvictTimestamp();
+ if (!ttlTs) {
+ return tierTs; // no TTL: the tier border (possibly null) decides
+ } else if (!tierTs) {
+ return ttlTs; // no tiers: the TTL border decides
+ }
+ return NArrow::ScalarLess(ttlTs, tierTs) ? tierTs : ttlTs; // both present: return the one that is not less per ScalarLess
+ }
+
+ std::optional<TCompression> GetCompression(const TString& name) const { // Compression of the named tier; empty optional for an unknown name or a tier without compression.
+ auto it = TierByName.find(name);
+ if (it != TierByName.end()) {
+ Y_VERIFY(!name.empty()); // a registered tier must not have an empty name
+ return it->second->Compression;
+ }
+ return {};
+ }
+
+ THashSet<TString> GetTtlColumns() const { // Set of distinct column names used by the TTL entry and by every tier in TierByName.
+ THashSet<TString> out;
+ if (Ttl) {
+ out.insert(Ttl->Column);
+ }
+ for (auto& [tierName, tier] : TierByName) {
+ out.insert(tier->Column);
+ }
+ return out;
+ }
+
+ TString GetDebugString() const { // Human-readable dump: the TTL entry first (if any), then each tier in OrderedTiers order.
+ TStringBuilder sb;
+ if (Ttl) {
+ sb << "ttl border '" << Ttl->EvictBorder << "' column '" << Ttl->Column << "'; ";
+ }
+ for (auto&& i : OrderedTiers) {
+ sb << i.Get().GetDebugString() << "; ";
+ }
+ return sb;
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 48286575d0a..b5e6188cd4f 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -192,10 +192,6 @@ TIndexInfo TestTableInfo(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema
return indexInfo;
}
-static NOlap::TTiersInfo MakeTtl(TInstant border) {
- return NOlap::TTiersInfo("timestamp", border);
-}
-
template <typename TKeyDataType>
class TBuilder {
public:
@@ -346,8 +342,8 @@ bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, u
}
bool Ttl(TColumnEngineForLogs& engine, TTestDbWrapper& db,
- const THashMap<ui64, NOlap::TTiersInfo>& pathTtls, ui32 expectedToDrop) {
- std::shared_ptr<TColumnEngineChanges> changes = engine.StartTtl(pathTtls);
+ const THashMap<ui64, NOlap::TTiering>& pathEviction, ui32 expectedToDrop) {
+ std::shared_ptr<TColumnEngineChanges> changes = engine.StartTtl(pathEviction);
UNIT_ASSERT(changes);
UNIT_ASSERT_VALUES_EQUAL(changes->PortionsToDrop.size(), expectedToDrop);
@@ -708,8 +704,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
}
// TTL
- THashMap<ui64, NOlap::TTiersInfo> pathTtls;
- pathTtls.emplace(pathId, MakeTtl(TInstant::MicroSeconds(10000)));
+ THashMap<ui64, NOlap::TTiering> pathTtls;
+ pathTtls.emplace(pathId, TTiering::MakeTtl(TInstant::MicroSeconds(10000), "timestamp"));
Ttl(engine, db, pathTtls, 2);
// read + load + read
diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp
index 4cdef39b0b7..a7affb58703 100644
--- a/ydb/core/tx/columnshard/eviction_actor.cpp
+++ b/ydb/core/tx/columnshard/eviction_actor.cpp
@@ -58,7 +58,8 @@ public:
Y_VERIFY(blobData.size() == blobId.Size, "%u vs %u", (ui32)blobData.size(), blobId.Size);
Blobs[blobId] = blobData;
} else {
- LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId.ToString() << " status " << event.Status
+ LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId.ToString()
+ << " status " << NKikimrProto::EReplyStatus_Name(event.Status)
<< " at tablet " << TabletId << " (eviction)");
TxEvent->PutStatus = event.Status;
if (TxEvent->PutStatus == NKikimrProto::UNKNOWN) {
diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp
index 237819b98ee..acdb680d268 100644
--- a/ydb/core/tx/columnshard/export_actor.cpp
+++ b/ydb/core/tx/columnshard/export_actor.cpp
@@ -27,7 +27,8 @@ public:
auto& event = *ev->Get();
const TUnifiedBlobId& blobId = event.BlobRange.BlobId;
if (event.Status != NKikimrProto::EReplyStatus::OK) {
- LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId << " status " << event.Status
+ LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId
+ << " status " << NKikimrProto::EReplyStatus_Name(event.Status)
<< " at tablet " << TabletId << " (export)");
BlobsToRead.erase(blobId);
diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp
index d8fe6da5b1d..d385d008dce 100644
--- a/ydb/core/tx/columnshard/indexing_actor.cpp
+++ b/ydb/core/tx/columnshard/indexing_actor.cpp
@@ -51,7 +51,8 @@ public:
auto& event = *ev->Get();
const TUnifiedBlobId& blobId = event.BlobRange.BlobId;
if (event.Status != NKikimrProto::EReplyStatus::OK) {
- LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId << " status " << event.Status
+ LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId
+ << " status " << NKikimrProto::EReplyStatus_Name(event.Status)
<< " at tablet " << TabletId << " (index)");
BlobsToRead.erase(blobId);
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 607d770a54e..2193632cad8 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -35,6 +35,15 @@ public:
auto& event = *ev->Get();
const TUnifiedBlobId& blobId = event.BlobRange.BlobId;
+
+ if (event.Status != NKikimrProto::EReplyStatus::OK) {
+ LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId
+ << " status " << NKikimrProto::EReplyStatus_Name(event.Status)
+ << " at tablet " << TabletId << " (read)");
+ SendErrorResult(ctx, NKikimrTxColumnShard::EResultStatus::ERROR);
+ return DieFinished(ctx);
+ }
+
Y_VERIFY(event.Data.size() == event.BlobRange.Size, "%zu, %d", event.Data.size(), event.BlobRange.Size);
if (IndexedBlobs.count(event.BlobRange)) {
@@ -78,6 +87,9 @@ public:
void SendErrorResult(const TActorContext& ctx, NKikimrTxColumnShard::EResultStatus status) {
Y_VERIFY(status != NKikimrTxColumnShard::EResultStatus::SUCCESS);
SendResult(ctx, {}, true, status);
+
+ WaitIndexed.clear();
+ WaitCommitted.clear();
}
void SendResult(const TActorContext& ctx, const std::shared_ptr<arrow::RecordBatch>& batch, bool finished = false,
@@ -88,6 +100,10 @@ public:
TString data;
if (batch) {
data = NArrow::SerializeBatchNoCompression(batch);
+
+ auto metadata = proto.MutableMeta();
+ metadata->SetFormat(NKikimrTxColumnShard::FORMAT_ARROW);
+ metadata->SetSchema(GetSerializedSchema(batch));
}
if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
@@ -100,21 +116,18 @@ public:
proto.SetFinished(finished);
++ReturnedBatchNo;
- auto metadata = proto.MutableMeta();
- metadata->SetFormat(NKikimrTxColumnShard::FORMAT_ARROW);
- metadata->SetSchema(GetSerializedSchema(batch));
if (finished) {
auto stats = ReadMetadata->ReadStats;
- auto* proto = metadata->MutableReadStats();
- proto->SetBeginTimestamp(stats->BeginTimestamp.MicroSeconds());
- proto->SetDurationUsec(stats->Duration().MicroSeconds());
- proto->SetSelectedIndex(stats->SelectedIndex);
- proto->SetIndexGranules(stats->IndexGranules);
- proto->SetIndexPortions(stats->IndexPortions);
- proto->SetIndexBatches(stats->IndexBatches);
- proto->SetNotIndexedBatches(stats->CommittedBatches);
- proto->SetUsedColumns(stats->UsedColumns);
- proto->SetDataBytes(stats->DataBytes);
+ auto* protoStats = proto.MutableMeta()->MutableReadStats();
+ protoStats->SetBeginTimestamp(stats->BeginTimestamp.MicroSeconds());
+ protoStats->SetDurationUsec(stats->Duration().MicroSeconds());
+ protoStats->SetSelectedIndex(stats->SelectedIndex);
+ protoStats->SetIndexGranules(stats->IndexGranules);
+ protoStats->SetIndexPortions(stats->IndexPortions);
+ protoStats->SetIndexBatches(stats->IndexBatches);
+ protoStats->SetNotIndexedBatches(stats->CommittedBatches);
+ protoStats->SetUsedColumns(stats->UsedColumns);
+ protoStats->SetDataBytes(stats->DataBytes);
}
if (Deadline != TInstant::Max()) {
@@ -197,9 +210,6 @@ public:
void SendTimeouts(const TActorContext& ctx) {
SendErrorResult(ctx, NKikimrTxColumnShard::EResultStatus::TIMEOUT);
-
- WaitCommitted.clear();
- IndexedBlobs.clear();
}
void SendReadRequest(const TActorContext& ctx, const NBlobCache::TBlobRange& blobRange) {
diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
index 1d28938a97f..a7e564f7ec8 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
@@ -17,7 +17,7 @@ using namespace NTxUT;
using namespace NColumnShard;
using NWrappers::NTestHelpers::TS3Mock;
-enum class EStartTtlSettings {
+enum class EInitialEviction {
None,
Ttl,
Tiering
@@ -61,10 +61,10 @@ bool TriggerTTL(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot s
return (res.GetStatus() == NKikimrTxColumnShard::SUCCESS);
}
-std::shared_ptr<arrow::Array> GetFirstPKColumn(const TString& blob, const TString& srtSchema,
- const std::string& columnName)
+std::shared_ptr<arrow::Array> DeserializeColumn(const TString& blob, const TString& strSchema,
+ const std::string& columnName)
{
- auto schema = NArrow::DeserializeSchema(srtSchema);
+ auto schema = NArrow::DeserializeSchema(strSchema);
auto batch = NArrow::DeserializeBatch(blob, schema);
UNIT_ASSERT(batch);
@@ -73,12 +73,12 @@ std::shared_ptr<arrow::Array> GetFirstPKColumn(const TString& blob, const TStrin
return array;
}
-bool CheckSame(const TString& blob, const TString& srtSchema, ui32 expectedSize,
+bool CheckSame(const TString& blob, const TString& strSchema, ui32 expectedSize,
const std::string& columnName, i64 seconds) {
auto expected = arrow::TimestampScalar(seconds * 1000 * 1000, arrow::timestamp(arrow::TimeUnit::MICRO));
UNIT_ASSERT_VALUES_EQUAL(expected.value, seconds * 1000 * 1000);
- auto tsCol = GetFirstPKColumn(blob, srtSchema, columnName);
+ auto tsCol = DeserializeColumn(blob, strSchema, columnName);
UNIT_ASSERT(tsCol);
UNIT_ASSERT_VALUES_EQUAL(tsCol->length(), expectedSize);
@@ -130,6 +130,8 @@ bool TestCreateTable(const TString& txBody, ui64 planStep = 1000, ui64 txId = 10
return ProposeSchemaTx(runtime, sender, txBody, {++planStep, ++txId});
}
+static constexpr ui32 PORTION_ROWS = 80 * 1000;
+
// ts[0] = 1600000000; // date -u --date='@1600000000' Sun Sep 13 12:26:40 UTC 2020
// ts[1] = 1620000000; // date -u --date='@1620000000' Mon May 3 00:00:00 UTC 2021
void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
@@ -162,9 +164,10 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
ttlSec -= (ts[0] + ts[1]) / 2; // enable internal ttl between ts1 and ts2
}
if (spec.HasTiers()) {
- spec.Tiers[0].SetEvictAfterSeconds(ttlSec);
+ spec.Tiers[0].EvictAfter = TDuration::Seconds(ttlSec);
} else {
- spec.SetEvictAfterSeconds(ttlSec);
+ UNIT_ASSERT(!spec.TtlColumn.empty());
+ spec.EvictAfter = TDuration::Seconds(ttlSec);
}
bool ok = ProposeSchemaTx(runtime, sender,
TTestSchema::CreateInitShardTxBody(tableId, testYdbSchema, testYdbPk, spec, "/Root/olapStore"),
@@ -176,8 +179,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
}
//
- ui32 portionSize = 80 * 1000;
- auto blobs = MakeData(ts, portionSize, portionSize / 2, spec.GetTtlColumn());
+ auto blobs = MakeData(ts, PORTION_ROWS, PORTION_ROWS / 2, spec.TtlColumn);
UNIT_ASSERT_EQUAL(blobs.size(), 2);
for (auto& data : blobs) {
UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, data));
@@ -192,9 +194,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
}
if (internal) {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.GetTtlColumn());
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn);
} else {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + 1, spec.GetTtlColumn());
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] + 1, spec.TtlColumn);
}
TAutoPtr<IEventHandle> handle;
@@ -206,7 +208,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
{
--planStep;
auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId);
- Proto(read.get()).AddColumnNames(spec.GetTtlColumn());
+ Proto(read.get()).AddColumnNames(spec.TtlColumn);
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
@@ -216,20 +218,20 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), 0);
UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
UNIT_ASSERT(resRead.GetData().size() > 0);
auto& schema = resRead.GetMeta().GetSchema();
- UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, spec.GetTtlColumn(), ts[1]));
+ UNIT_ASSERT(CheckSame(resRead.GetData(), schema, PORTION_ROWS, spec.TtlColumn, ts[1]));
}
// Alter TTL
ttlSec = TInstant::Now().Seconds() - (ts[1] + 1);
if (spec.HasTiers()) {
- spec.Tiers[0].SetEvictAfterSeconds(ttlSec);
+ spec.Tiers[0].EvictAfter = TDuration::Seconds(ttlSec);
} else {
- spec.SetEvictAfterSeconds(ttlSec);
+ spec.EvictAfter = TDuration::Seconds(ttlSec);
}
ok = ProposeSchemaTx(runtime, sender,
TTestSchema::AlterTableTxBody(tableId, 2, spec),
@@ -241,15 +243,15 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
}
if (internal) {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.GetTtlColumn());
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn);
} else {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + 1, spec.GetTtlColumn());
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[1] + 1, spec.TtlColumn);
}
{
--planStep;
auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId);
- Proto(read.get()).AddColumnNames(spec.GetTtlColumn());
+ Proto(read.get()).AddColumnNames(spec.TtlColumn);
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
@@ -259,9 +261,9 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(resRead.GetBatch(), 0);
UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- UNIT_ASSERT_EQUAL(resRead.GetData().size(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(resRead.GetData().size(), 0);
}
// Disable TTL
@@ -279,15 +281,15 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
PlanCommit(runtime, sender, ++planStep, txId);
if (internal) {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.GetTtlColumn());
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {}, 0, spec.TtlColumn);
} else {
- TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - 1, spec.GetTtlColumn());
+ TriggerTTL(runtime, sender, {++planStep, ++txId}, {tableId}, ts[0] - 1, spec.TtlColumn);
}
{
--planStep;
auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId);
- Proto(read.get()).AddColumnNames(spec.GetTtlColumn());
+ Proto(read.get()).AddColumnNames(spec.TtlColumn);
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
@@ -302,13 +304,12 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
UNIT_ASSERT(resRead.GetData().size() > 0);
auto& schema = resRead.GetMeta().GetSchema();
- UNIT_ASSERT(CheckSame(resRead.GetData(), schema, portionSize, spec.GetTtlColumn(), ts[0]));
+ UNIT_ASSERT(CheckSame(resRead.GetData(), schema, PORTION_ROWS, spec.TtlColumn, ts[0]));
}
}
class TCountersContainer {
private:
- ui32 RestartTabletOnPutData = 0;
ui32 SuccessCounterStart = 0;
public:
ui32 UnknownsCounter = 0;
@@ -316,32 +317,17 @@ public:
ui32 ErrorsCounter = 0;
ui32 ResponsesCounter = 0;
- TCountersContainer& SetRestartTabletOnPutData(const ui32 value) {
- RestartTabletOnPutData = value;
- return *this;
- }
-
- bool PopRestartTabletOnPutData() {
- if (!RestartTabletOnPutData) {
- return false;
- }
- --RestartTabletOnPutData;
- return true;
- }
TString SerializeToString() const {
TStringBuilder sb;
sb << "EXPORTS INFO: " << SuccessCounter << "/" << ErrorsCounter << "/" << UnknownsCounter << "/" << ResponsesCounter;
return sb;
}
- void WaitEvents(TTestBasicRuntime& runtime, const TActorId sender, const ui32 attemption, const ui32 expectedDeltaSuccess, const TDuration timeout) {
+ void WaitEvents(TTestBasicRuntime& runtime, const ui32 attemption, const ui32 expectedDeltaSuccess, const TDuration timeout) {
const TInstant startInstant = TAppData::TimeProvider->Now();
const TInstant deadline = startInstant + timeout;
Cerr << "START_WAITING(" << attemption << "): " << SerializeToString() << Endl;
while (TAppData::TimeProvider->Now() < deadline) {
- if (PopRestartTabletOnPutData()) {
- RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
- }
Cerr << "IN_WAITING(" << attemption << "):" << SerializeToString() << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));
UNIT_ASSERT(ErrorsCounter == 0);
@@ -358,7 +344,7 @@ public:
if (expectedDeltaSuccess) {
UNIT_ASSERT(SuccessCounter >= SuccessCounterStart + expectedDeltaSuccess);
} else {
- UNIT_ASSERT(SuccessCounter == SuccessCounterStart);
+ UNIT_ASSERT_VALUES_EQUAL(SuccessCounter, SuccessCounterStart);
}
Cerr << "FINISH_WAITING(" << attemption << "): " << SerializeToString() << Endl;
SuccessCounterStart = SuccessCounter;
@@ -370,7 +356,7 @@ private:
TCountersContainer* Counters = nullptr;
TTestBasicRuntime& Runtime;
const TActorId Sender;
-private:
+
template <class TPrivateEvent>
static TPrivateEvent* TryGetPrivateEvent(TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() != TPrivateEvent::EventType) {
@@ -378,8 +364,8 @@ private:
}
return dynamic_cast<TPrivateEvent*>(ev->GetBase());
}
-public:
+public:
TEventsCounter(TCountersContainer& counters, TTestBasicRuntime& runtime, const TActorId sender)
: Counters(&counters)
, Runtime(runtime)
@@ -388,6 +374,7 @@ public:
Y_UNUSED(Runtime);
Y_UNUSED(Sender);
}
+
bool operator()(TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
TStringBuilder ss;
if (auto* msg = TryGetPrivateEvent<NColumnShard::TEvPrivate::TEvExport>(ev)) {
@@ -402,7 +389,10 @@ public:
ss << "(" << ++Counters->UnknownsCounter << "): UNKNOWN";
}
} else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvPutObjectResponse>(ev)) {
- ss << "S3_RESPONSE(" << ++Counters->ResponsesCounter << "):";
+ ss << "S3_RESPONSE(put " << ++Counters->ResponsesCounter << "):";
+ } else if (auto* msg = TryGetPrivateEvent<NWrappers::NExternalStorage::TEvDeleteObjectResponse>(ev)) {
+ ss << "(" << ++Counters->SuccessCounter << "): DELETE SUCCESS";
+ ss << "S3_RESPONSE(delete " << ++Counters->ResponsesCounter << "):";
} else {
return false;
}
@@ -412,8 +402,10 @@ public:
};
};
-std::vector<std::pair<ui32, ui64>>
-TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTestSchema::TTableSpecials>& specs, const ui32 startTieringIndex) {
+std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TString>& blobs,
+ const std::vector<TTestSchema::TTableSpecials>& specs,
+ const ui32 initialEviction)
+{
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -460,16 +452,16 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
TAutoPtr<IEventHandle> handle;
- std::vector<std::pair<ui32, ui64>> resColumns;
- resColumns.reserve(specs.size());
+ std::vector<std::pair<ui32, ui64>> specRowsBytes;
+ specRowsBytes.reserve(specs.size());
TCountersContainer counter;
runtime.SetEventFilter(TEventsCounter(counter, runtime, sender));
for (ui32 i = 0; i < specs.size(); ++i) {
- bool hasEvictionSettings = false;
+ bool hasColdEviction = false;
for (auto&& i : specs[i].Tiers) {
if (!!i.S3) {
- hasEvictionSettings = true;
+ hasColdEviction = true;
break;
}
}
@@ -483,20 +475,21 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
PlanSchemaTx(runtime, sender, { planStep, txId });
}
}
- if (specs[i].Tiers.size()) {
+ if (specs[i].HasTiers()) {
ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i]));
}
- counter.SetRestartTabletOnPutData(reboots ? 1 : 0);
- TriggerTTL(runtime, sender, { ++planStep, ++txId }, {}, 0, specs[i].GetTtlColumn());
- if (hasEvictionSettings) {
- if (i == startTieringIndex + 1 || i == startTieringIndex + 2) {
- counter.WaitEvents(runtime, sender, i, 1, TDuration::Seconds(40));
+ TriggerTTL(runtime, sender, { ++planStep, ++txId }, {}, 0, specs[i].TtlColumn);
+ if (hasColdEviction) {
+ Cerr << "Cold tiering, spec " << i << ", num tiers: " << specs[i].Tiers.size() << "\n";
+ if (i > initialEviction) {
+ counter.WaitEvents(runtime, i, 1, TDuration::Seconds(40));
} else {
- counter.WaitEvents(runtime, sender, i, 0, TDuration::Seconds(20));
+ counter.WaitEvents(runtime, i, 0, TDuration::Seconds(20));
}
} else {
- counter.WaitEvents(runtime, sender, i, 0, TDuration::Seconds(5));
+ Cerr << "Hot tiering, spec " << i << ", num tiers: " << specs[i].Tiers.size() << "\n";
+ counter.WaitEvents(runtime, i, 0, TDuration::Seconds(4));
}
if (reboots) {
ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(specs[i]));
@@ -506,10 +499,10 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
--planStep;
auto read = std::make_unique<TEvColumnShard::TEvRead>(sender, metaShard, planStep, Max<ui64>(), tableId);
- Proto(read.get()).AddColumnNames(specs[i].GetTtlColumn());
+ Proto(read.get()).AddColumnNames(specs[i].TtlColumn);
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release());
- resColumns.emplace_back(0, 0);
+ specRowsBytes.emplace_back(0, 0);
ui32 idx = 0;
while (true) {
auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
@@ -526,17 +519,15 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
}
auto& meta = resRead.GetMeta();
auto& schema = meta.GetSchema();
- auto pkColumn = GetFirstPKColumn(resRead.GetData(), schema, specs[i].GetTtlColumn());
- UNIT_ASSERT(pkColumn);
- UNIT_ASSERT(pkColumn->type_id() == arrow::Type::TIMESTAMP);
+ auto ttlColumn = DeserializeColumn(resRead.GetData(), schema, specs[i].TtlColumn);
+ UNIT_ASSERT(ttlColumn);
- auto tsColumn = std::static_pointer_cast<arrow::TimestampArray>(pkColumn);
- resColumns.back().first += tsColumn->length();
+ specRowsBytes.back().first += ttlColumn->length();
if (resRead.GetFinished()) {
UNIT_ASSERT(meta.HasReadStats());
auto& readStats = meta.GetReadStats();
ui64 numBytes = readStats.GetDataBytes(); // compressed bytes in storage
- resColumns.back().second += numBytes;
+ specRowsBytes.back().second += numBytes;
break;
}
}
@@ -546,90 +537,157 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
}
}
- return resColumns;
+ return specRowsBytes;
}
-void TestTwoTiers(const TTestSchema::TTableSpecials& spec, bool compressed, bool reboots, const EStartTtlSettings startConf) {
- const std::vector<ui64> ts = { 1600000000, 1620000000 };
- ui64 nowSec = TInstant::Now().Seconds();
-
- ui32 portionSize = 80 * 1000;
- ui32 overlapSize = 40 * 1000;
- std::vector<TString> blobs = MakeData(ts, portionSize, overlapSize, spec.GetTtlColumn());
-
- ui64 allowBoth = nowSec - ts[0] + 600;
- ui64 allowOne = nowSec - ts[1] + 600;
- ui64 allowNone = nowSec - ts[1] - 600;
- ui32 startTieringIndex = 0;
- std::vector<TTestSchema::TTableSpecials> alters;
- if (startConf != EStartTtlSettings::Tiering) {
- if (startConf == EStartTtlSettings::None) {
- alters.emplace_back(TTestSchema::TTableSpecials());
+class TEvictionChanges {
+public:
+ void AddTierAlters(const TTestSchema::TTableSpecials& spec, const std::vector<TDuration>&& borders,
+ std::vector<TTestSchema::TTableSpecials>& alters) const {
+ UNIT_ASSERT_EQUAL(borders.size(), 3);
+ UNIT_ASSERT(spec.Tiers.size());
+
+ alters.reserve(alters.size() + spec.Tiers.size() + 1);
+
+ if (spec.Tiers.size() == 1) {
+ alters.push_back(MakeAlter(spec, {borders[0]})); // <tier0 border>, data[0], data[1]
+ alters.push_back(MakeAlter(spec, {borders[1]})); // data[0], <tier0 border>, data[1]
+ alters.push_back(MakeAlter(spec, {borders[2]})); // data[0], data[1], <tier0 border>
+ } else if (spec.Tiers.size() == 2) {
+ alters.push_back(MakeAlter(spec, {borders[0], borders[0]})); // <tier1 border>, <tier0 border>, data[0], data[1]
+ alters.push_back(MakeAlter(spec, {borders[1], borders[0]})); // <tier1 border>, data[0], <tier0 border>, data[1]
+ alters.push_back(MakeAlter(spec, {borders[2], borders[1]})); // data[0], <tier1 border>, data[1], <tier0 border>
+ alters.push_back(MakeAlter(spec, {borders[2], borders[2]})); // data[0], data[1], <tier1 border>, <tier0 border>
}
- if (startConf == EStartTtlSettings::Ttl) {
- alters.emplace_back(TTestSchema::TTableSpecials());
- alters.back().SetTtlColumn("timestamp");
- alters.back().SetEvictAfterSeconds(allowBoth);
+ }
+
+ void AddTtlAlters(const TTestSchema::TTableSpecials& spec, const std::vector<TDuration>&& borders,
+ std::vector<TTestSchema::TTableSpecials>& alters) const {
+ UNIT_ASSERT_EQUAL(borders.size(), 3);
+ UNIT_ASSERT(spec.Tiers.size());
+
+ TTestSchema::TTableSpecials specTtl(spec);
+ if (spec.Tiers.size() == 1) {
+ specTtl = MakeAlter(spec, {borders[0]}); // <tier0 border>, data[0], data[1]
+ } else if (spec.Tiers.size() == 2) {
+ specTtl = MakeAlter(spec, {borders[0], borders[0]}); // <tier1 border>, <tier0 border>, data[0], data[1]
}
- std::vector<TString> blobsOld = MakeData({ 1500000000, 1620000000 }, portionSize, overlapSize, spec.GetTtlColumn());
- blobs.emplace_back(std::move(blobsOld[0]));
- blobs.emplace_back(std::move(blobsOld[1]));
+
+ alters.reserve(alters.size() + borders.size());
+ alters.push_back(specTtl.SetTtl(borders[0])); // <ttl border>, data[0], data[1]
+ alters.push_back(specTtl.SetTtl(borders[1])); // data[0], <ttl border>, data[1]
+ alters.push_back(specTtl.SetTtl(borders[2])); // data[0], data[1], <ttl border>
}
- startTieringIndex = alters.size();
- alters.resize(alters.size() + 4, spec);
- alters[startTieringIndex].Tiers[0].SetEvictAfterSeconds(allowBoth); // tier0 allows/has: data[0], data[1]
- alters[startTieringIndex].Tiers[1].SetEvictAfterSeconds(allowBoth); // tier1 allows: data[0], data[1], has: nothing
- alters[startTieringIndex + 1].Tiers[0].SetEvictAfterSeconds(allowOne); // tier0 allows/has: data[1]
- alters[startTieringIndex + 1].Tiers[1].SetEvictAfterSeconds(allowBoth); // tier1 allows: data[0], data[1], has: data[0]
+ static void Assert(const TTestSchema::TTableSpecials& spec,
+ const std::vector<std::pair<ui32, ui64>>& rowsBytes,
+ size_t initialEviction) {
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes[initialEviction].first, 2 * PORTION_ROWS);
+ UNIT_ASSERT(rowsBytes[initialEviction].second);
+ if (spec.Tiers.size() > 1) {
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes[initialEviction].first, rowsBytes[initialEviction + 1].first);
+ }
- alters[startTieringIndex + 2].Tiers[0].SetEvictAfterSeconds(allowNone); // tier0 allows/has: nothing
- alters[startTieringIndex + 2].Tiers[1].SetEvictAfterSeconds(allowOne); // tier1 allows/has: data[1]
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes[rowsBytes.size() - 2].first, PORTION_ROWS);
+ UNIT_ASSERT(rowsBytes[rowsBytes.size() - 2].second);
- alters[startTieringIndex + 3].Tiers[0].SetEvictAfterSeconds(allowNone); // tier0 allows/has: nothing
- alters[startTieringIndex + 3].Tiers[1].SetEvictAfterSeconds(allowNone); // tier1 allows/has: nothing
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes.back().first, 0);
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes.back().second, 0);
+ }
- auto columns = TestTiers(reboots, blobs, alters, startTieringIndex);
+private:
+ TTestSchema::TTableSpecials MakeAlter(const TTestSchema::TTableSpecials& spec,
+ const std::vector<TDuration>& tierBorders) const {
+ UNIT_ASSERT_EQUAL(spec.Tiers.size(), tierBorders.size());
- for (auto&& i : columns) {
- Cerr << i.first << "/" << i.second << Endl;
+ TTestSchema::TTableSpecials alter(spec); // same TTL, Codec, etc.
+ for (size_t i = 0; i < tierBorders.size(); ++i) {
+ alter.Tiers[i].EvictAfter = tierBorders[i];
+ }
+ return alter;
}
+};
- UNIT_ASSERT_EQUAL(columns.size(), alters.size());
- UNIT_ASSERT(columns[startTieringIndex].second);
- UNIT_ASSERT(columns[startTieringIndex].first);
+TTestSchema::TTableSpecials InitialSpec(const EInitialEviction init, TDuration initTs) {
+ TTestSchema::TTableSpecials spec;
+ if (init == EInitialEviction::Ttl) {
+ spec.TtlColumn = "timestamp";
+ spec.EvictAfter = initTs;
+ }
+ return spec;
+}
- UNIT_ASSERT(columns[startTieringIndex + 1].second);
- UNIT_ASSERT(columns[startTieringIndex + 1].first);
+std::vector<std::pair<ui32, ui64>> TestTiersAndTtl(const TTestSchema::TTableSpecials& spec, bool reboots,
+ EInitialEviction init, bool testTtl = false) {
+ const std::vector<ui64> ts = { 1600000000, 1620000000 };
- UNIT_ASSERT(columns[startTieringIndex + 2].second);
- UNIT_ASSERT(columns[startTieringIndex + 2].first);
+ ui32 overlapSize = 40 * 1000;
+ std::vector<TString> blobs = MakeData(ts, PORTION_ROWS, overlapSize, spec.TtlColumn);
+ if (init != EInitialEviction::Tiering) {
+ std::vector<TString> preload = MakeData({ 1500000000, 1620000000 }, PORTION_ROWS, overlapSize, spec.TtlColumn);
+ blobs.emplace_back(std::move(preload[0]));
+ blobs.emplace_back(std::move(preload[1]));
+ }
- UNIT_ASSERT(!columns[startTieringIndex + 3].first);
- UNIT_ASSERT(!columns[startTieringIndex + 3].second);
+ TInstant now = TInstant::Now();
+ TDuration allowBoth = TDuration::Seconds(now.Seconds() - ts[0] + 600);
+ TDuration allowOne = TDuration::Seconds(now.Seconds() - ts[1] + 600);
+ TDuration allowNone = TDuration::Seconds(now.Seconds() - ts[1] - 600);
- UNIT_ASSERT_EQUAL(columns[startTieringIndex].first, 2 * portionSize/* - overlapSize*/);
- UNIT_ASSERT_EQUAL(columns[startTieringIndex].first, columns[startTieringIndex + 1].first);
- UNIT_ASSERT_EQUAL(columns[startTieringIndex + 2].first, portionSize);
+ std::vector<TTestSchema::TTableSpecials> alters = { InitialSpec(init, allowBoth) };
+ size_t initialEviction = alters.size();
- if (compressed) {
- UNIT_ASSERT_GT(columns[startTieringIndex].second, columns[startTieringIndex + 1].second);
+ TEvictionChanges changes;
+ if (testTtl) {
+ changes.AddTtlAlters(spec, {allowBoth, allowOne, allowNone}, alters);
} else {
- UNIT_ASSERT_EQUAL(columns[startTieringIndex].second, columns[startTieringIndex + 1].second);
+ changes.AddTierAlters(spec, {allowBoth, allowOne, allowNone}, alters);
+ }
+
+ auto rowsBytes = TestTiers(reboots, blobs, alters, initialEviction);
+ for (auto&& i : rowsBytes) {
+ Cerr << i.first << "/" << i.second << Endl;
+ }
+
+ UNIT_ASSERT_EQUAL(rowsBytes.size(), alters.size());
+
+ if (!testTtl) { // TODO
+ changes.Assert(spec, rowsBytes, initialEviction);
}
+ return rowsBytes;
}
-void TestTwoHotTiers(bool reboot) {
+void TestTwoHotTiers(bool reboot, bool changeTtl, const EInitialEviction initial = EInitialEviction::None) {
TTestSchema::TTableSpecials spec;
spec.SetTtlColumn("timestamp");
spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0").SetTtlColumn("timestamp"));
spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1").SetTtlColumn("timestamp"));
spec.Tiers.back().SetCodec("zstd");
- TestTwoTiers(spec, true, reboot, EStartTtlSettings::None);
+ auto rowsBytes = TestTiersAndTtl(spec, reboot, initial, changeTtl);
+ if (changeTtl) {
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes.size(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes[0].first, 3 * PORTION_ROWS);
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes[1].first, 2 * PORTION_ROWS);
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes[2].first, PORTION_ROWS);
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes[3].first, 0);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes.size(), 5);
+ if (initial == EInitialEviction::Ttl) {
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes[0].first, 2 * PORTION_ROWS);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes[0].first, 3 * PORTION_ROWS);
+ }
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes[1].first, 2 * PORTION_ROWS);
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes[2].first, 2 * PORTION_ROWS);
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes[3].first, PORTION_ROWS);
+ UNIT_ASSERT_VALUES_EQUAL(rowsBytes[4].first, 0);
+
+ UNIT_ASSERT(rowsBytes[1].second > rowsBytes[2].second); // compression works
+ }
}
-void TestHotAndColdTiers(bool reboot, const EStartTtlSettings startConf) {
+void TestHotAndColdTiers(bool reboot, const EInitialEviction initial) {
const TString bucket = "tiering-test-01";
TPortManager portManager;
const ui16 port = portManager.GetPort();
@@ -664,7 +722,7 @@ void TestHotAndColdTiers(bool reboot, const EStartTtlSettings startConf) {
s3Config.SetConnectionTimeoutMs(10000);
}
- TestTwoTiers(spec, false, reboot, startConf);
+ TestTiersAndTtl(spec, reboot, initial);
}
void TestDrop(bool reboots) {
@@ -695,9 +753,7 @@ void TestDrop(bool reboots) {
//
- static const ui32 portionSize = 80 * 1000;
-
- TString data1 = MakeTestBlob({0, portionSize}, testYdbSchema);
+ TString data1 = MakeTestBlob({0, PORTION_ROWS}, testYdbSchema);
UNIT_ASSERT(data1.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
UNIT_ASSERT(data1.size() < 7 * 1024 * 1024);
@@ -920,41 +976,63 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) {
TestTtl(true, false, specs);
}
+ // TODO: EnableOneTierAfterTtl, EnableTtlAfterOneTier
+
Y_UNIT_TEST(HotTiers) {
- TestTwoHotTiers(false);
+ TestTwoHotTiers(false, false);
}
Y_UNIT_TEST(RebootHotTiers) {
- NColumnShard::gAllowLogBatchingDefaultValue = false;
- TestTwoHotTiers(true);
+ TestTwoHotTiers(true, false);
}
- Y_UNIT_TEST(ColdTiers) {
- TestHotAndColdTiers(false, EStartTtlSettings::Tiering);
+ Y_UNIT_TEST(HotTiersTtl) {
+ NColumnShard::gAllowLogBatchingDefaultValue = false;
+ TestTwoHotTiers(false, true);
}
- Y_UNIT_TEST(ColdTiersWithNoneTtlTiering) {
- TestHotAndColdTiers(false, EStartTtlSettings::None);
+ Y_UNIT_TEST(RebootHotTiersTtl) {
+ NColumnShard::gAllowLogBatchingDefaultValue = false;
+ TestTwoHotTiers(true, true);
}
- Y_UNIT_TEST(ColdTiersWithTtlTiering) {
- TestHotAndColdTiers(false, EStartTtlSettings::Ttl);
+ Y_UNIT_TEST(HotTiersAfterTtl) {
+ TestTwoHotTiers(false, false, EInitialEviction::Ttl);
}
- Y_UNIT_TEST(ColdTiersWithNoneTtlTieringAndReboot) {
- TestHotAndColdTiers(true, EStartTtlSettings::None);
+ Y_UNIT_TEST(RebootHotTiersAfterTtl) {
+ TestTwoHotTiers(true, false, EInitialEviction::Ttl);
}
- Y_UNIT_TEST(ColdTiersWithTtlTieringAndReboot) {
- TestHotAndColdTiers(true, EStartTtlSettings::Ttl);
+ // TODO: EnableTtlAfterHotTiers
+
+ Y_UNIT_TEST(ColdTiers) {
+ TestHotAndColdTiers(false, EInitialEviction::Tiering);
}
Y_UNIT_TEST(RebootColdTiers) {
- // Disabled KIKIMR-14942
//NColumnShard::gAllowLogBatchingDefaultValue = false;
- //TestHotAndColdTiers(true);
+ TestHotAndColdTiers(true, EInitialEviction::Tiering);
+ }
+
+ Y_UNIT_TEST(EnableColdTiersAfterNoEviction) {
+ TestHotAndColdTiers(false, EInitialEviction::None);
+ }
+
+ Y_UNIT_TEST(RebootEnableColdTiersAfterNoEviction) {
+ TestHotAndColdTiers(true, EInitialEviction::None);
+ }
+
+ Y_UNIT_TEST(EnableColdTiersAfterTtl) {
+ TestHotAndColdTiers(false, EInitialEviction::Ttl);
+ }
+
+ Y_UNIT_TEST(RebootEnableColdTiersAfterTtl) {
+ TestHotAndColdTiers(true, EInitialEviction::Ttl);
}
+ // TODO: EnableTtlAfterColdTiers
+
Y_UNIT_TEST(Drop) {
TestDrop(false);
}
diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp
index 6558c08c3dc..1ba8b01cc82 100644
--- a/ydb/core/tx/columnshard/write_actor.cpp
+++ b/ydb/core/tx/columnshard/write_actor.cpp
@@ -224,23 +224,26 @@ public:
TString accumulatedBlob;
TVector<std::pair<size_t, TString>> recordsInBlob;
- size_t portionsToWrite = indexChanges->AppendedPortions.size();
- bool appended = true;
- if (indexChanges->PortionsToEvict.size()) {
- Y_VERIFY(portionsToWrite == 0);
- portionsToWrite = indexChanges->PortionsToEvict.size();
- appended = false;
- }
+ Y_VERIFY(indexChanges->AppendedPortions.empty() || indexChanges->PortionsToEvict.empty());
+ size_t portionsToWrite = indexChanges->AppendedPortions.size() + indexChanges->PortionsToEvict.size();
+ bool eviction = indexChanges->PortionsToEvict.size() > 0;
for (size_t pos = 0; pos < portionsToWrite; ++pos) {
- auto& portionInfo = appended ? indexChanges->AppendedPortions[pos]
- : indexChanges->PortionsToEvict[pos].first;
+ auto& portionInfo = eviction ? indexChanges->PortionsToEvict[pos].first
+ : indexChanges->AppendedPortions[pos];
auto& records = portionInfo.Records;
accumulatedBlob.clear();
recordsInBlob.clear();
+ // There could be a mix of normal evictions and evictions without data changes
+ // TODO: better portions-to-blobs matching
+ if (eviction && !indexChanges->PortionsToEvict[pos].second.DataChanges) {
+ continue;
+ }
+
for (size_t i = 0; i < records.size(); ++i, ++blobsPos) {
+ Y_VERIFY(blobsPos < blobs.size());
const TString& currentBlob = blobs[blobsPos];
Y_VERIFY(currentBlob.size());
diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp
index fc810bcfd8b..78ed9e33b05 100644
--- a/ydb/core/tx/tiering/manager.cpp
+++ b/ydb/core/tx/tiering/manager.cpp
@@ -101,7 +101,7 @@ bool TManager::Start(std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets) {
CreateS3Actor(TabletId, TabletActorId, Config.GetTierName())
);
auto s3Config = Config.GetPatchedConfig(secrets);
-
+
ctx.Send(newActor, new TEvPrivate::TEvS3Settings(s3Config));
StorageActorId = newActor;
#endif
@@ -115,16 +115,7 @@ TManager::TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId,
{
}
-NOlap::TStorageTier TManager::BuildTierStorage() const {
- NOlap::TStorageTier result;
- result.Name = Config.GetTierName();
- if (Config.GetProtoConfig().HasCompression()) {
- result.Compression = ConvertCompression(Config.GetProtoConfig().GetCompression());
- }
- return result;
-}
-
-NKikimr::NOlap::TCompression TManager::ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression) {
+NKikimr::NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression) {
NOlap::TCompression out;
if (compression.HasCompressionCodec()) {
switch (compression.GetCompressionCodec()) {
@@ -223,19 +214,26 @@ NMetadata::NFetcher::ISnapshotsFetcher::TPtr TTiersManager::GetExternalDataManip
return ExternalDataManipulation;
}
-THashMap<ui64, NKikimr::NOlap::TTiersInfo> TTiersManager::GetTiering() const {
- THashMap<ui64, NKikimr::NOlap::TTiersInfo> result;
+THashMap<ui64, NKikimr::NOlap::TTiering> TTiersManager::GetTiering() const {
+ THashMap<ui64, NKikimr::NOlap::TTiering> result;
if (!Snapshot) {
return result;
}
auto snapshotPtr = std::dynamic_pointer_cast<NTiers::TConfigsSnapshot>(Snapshot);
Y_VERIFY(snapshotPtr);
+ auto& tierConfigs = snapshotPtr->GetTierConfigs();
for (auto&& i : PathIdTiering) {
auto* tiering = snapshotPtr->GetTieringById(i.second);
- if (!tiering) {
-
- } else {
- result.emplace(i.first, tiering->BuildTiersInfo());
+ if (tiering) {
+ result.emplace(i.first, tiering->BuildOlapTiers());
+ for (auto& [pathId, pathTiering] : result) {
+ for (auto& [name, tier] : pathTiering.TierByName) {
+ auto it = tierConfigs.find(name);
+ if (it != tierConfigs.end()) {
+ tier->Compression = NTiers::ConvertCompression(it->second.GetProtoConfig().GetCompression());
+ }
+ }
+ }
}
}
return result;
diff --git a/ydb/core/tx/tiering/manager.h b/ydb/core/tx/tiering/manager.h
index e449bb26611..1ddf79da156 100644
--- a/ydb/core/tx/tiering/manager.h
+++ b/ydb/core/tx/tiering/manager.h
@@ -12,6 +12,8 @@
namespace NKikimr::NColumnShard {
namespace NTiers {
+NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression);
+
class TManager {
private:
ui64 TabletId = 0;
@@ -20,8 +22,6 @@ private:
YDB_READONLY_DEF(NActors::TActorId, StorageActorId);
public:
TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId, const TTierConfig& config);
- static NOlap::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression);
- NOlap::TStorageTier BuildTierStorage() const;
TManager& Restart(const TTierConfig& config, std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets);
bool NeedExport() const {
@@ -54,7 +54,7 @@ public:
{
}
TActorId GetActorId() const;
- THashMap<ui64, NOlap::TTiersInfo> GetTiering() const;
+ THashMap<ui64, NOlap::TTiering> GetTiering() const;
void TakeConfigs(NMetadata::NFetcher::ISnapshot::TPtr snapshot, std::shared_ptr<NMetadata::NSecret::TSnapshot> secrets);
void EnablePathId(const ui64 pathId, const TString& tieringId) {
PathIdTiering.emplace(pathId, tieringId);
diff --git a/ydb/core/tx/tiering/rule/object.cpp b/ydb/core/tx/tiering/rule/object.cpp
index 5b1095edc68..0bd4fd00b5e 100644
--- a/ydb/core/tx/tiering/rule/object.cpp
+++ b/ydb/core/tx/tiering/rule/object.cpp
@@ -71,10 +71,12 @@ bool TTieringRule::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Val
return true;
}
-NKikimr::NOlap::TTiersInfo TTieringRule::BuildTiersInfo() const {
- NOlap::TTiersInfo result(GetDefaultColumn());
+NKikimr::NOlap::TTiering TTieringRule::BuildOlapTiers() const {
+ NOlap::TTiering result;
+ TInstant now = Now(); // Do not put it in cycle: prevent tiers reorder with the same eviction time
for (auto&& r : Intervals) {
- result.AddTier(r.GetTierName(), Now() - r.GetDurationForEvict());
+ TInstant evictionBorder = now - r.GetDurationForEvict();
+ result.Add(std::make_shared<NOlap::TTierInfo>(r.GetTierName(), evictionBorder, GetDefaultColumn()));
}
return result;
}
diff --git a/ydb/core/tx/tiering/rule/object.h b/ydb/core/tx/tiering/rule/object.h
index 5edc30c4579..a597d6204ce 100644
--- a/ydb/core/tx/tiering/rule/object.h
+++ b/ydb/core/tx/tiering/rule/object.h
@@ -87,7 +87,7 @@ public:
};
NMetadata::NInternal::TTableRecord SerializeToRecord() const;
bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& r);
- NKikimr::NOlap::TTiersInfo BuildTiersInfo() const;
+ NKikimr::NOlap::TTiering BuildOlapTiers() const;
};
}
diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp
index 9d541f8b15c..fa36d606a65 100644
--- a/ydb/core/tx/tiering/ut/ut_tiers.cpp
+++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp
@@ -143,7 +143,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
YDB_READONLY_FLAG(Found, false);
YDB_ACCESSOR(ui32, ExpectedTieringsCount, 1);
YDB_ACCESSOR(ui32, ExpectedTiersCount, 1);
-
+
using TKeyCheckers = TMap<TString, TJsonChecker>;
YDB_ACCESSOR_DEF(TKeyCheckers, Checkers);
public:
@@ -349,7 +349,6 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
.SetUseRealThreads(false)
.SetEnableMetadataProvider(true)
.SetEnableOlapSchemaOperations(true);
- ;
Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
server->EnableGRpc(grpcPort);
@@ -509,7 +508,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
runtime.SimulateSleep(TDuration::Seconds(20));
Cerr << "Initialization tables" << Endl;
Cerr << "Insert..." << Endl;
- const TInstant pkStart = Now() - TDuration::Days(5);
+ const TInstant pkStart = Now() - TDuration::Days(15);
ui32 idx = 0;
lHelper.SendDataViaActorSystem("/Root/olapStore/olapTable", 0, (pkStart + TDuration::Seconds(2 * idx++)).GetValue(), 2000);
{