summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <[email protected]>2023-05-03 18:49:36 +0300
committernsofya <[email protected]>2023-05-03 18:49:36 +0300
commit7b101672fe85f87bdadd7b110b1aba1f9aa24414 (patch)
tree1e10f29501c6f0e130b4b5046b30132d95a628ec
parent9e8692365dba3cb82e0ac1d6d3a1900ef3e82edc (diff)
Remove tiering from TIndexInfo
Remove tiering from TIndexInfo Вытащила TieringInfo и передаю его отдельно. Это нужно для поддержки нескольких версий индекса: бессмысленно копировать этот объект в каждый индекс, тем более что это изменяемая контекстная переменная, которую с таким же успехом можно сохранить в ивент. Что и сделала. Больше никакую логику не меняла.
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp25
-rw-r--r--ydb/core/tx/columnshard/columnshard_private_events.h6
-rw-r--r--ydb/core/tx/columnshard/compaction_actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h2
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp45
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h8
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.cpp31
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h5
-rw-r--r--ydb/core/tx/columnshard/engines/tier_info.h114
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp7
-rw-r--r--ydb/core/tx/columnshard/eviction_actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/indexing_actor.cpp2
-rw-r--r--ydb/core/tx/tiering/manager.cpp6
13 files changed, 132 insertions, 123 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 592adb06187..2a8d2c77fac 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -747,15 +747,12 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() {
}
auto actualIndexInfo = TablesManager.GetIndexInfo();
- if (Tiers) {
- auto pathTiering = Tiers->GetTiering(); // TODO: pathIds
- actualIndexInfo.UpdatePathTiering(pathTiering);
- actualIndexInfo.SetPathTiering(std::move(pathTiering));
- }
-
ActiveIndexingOrCompaction = true;
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges,
Settings.CacheDataAfterIndexing, std::move(cachedBlobs));
+ if (Tiers) {
+ ev->SetTiering(Tiers->GetTiering());
+ }
return std::make_unique<TEvPrivate::TEvIndexing>(std::move(ev));
}
@@ -788,15 +785,12 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() {
}
auto actualIndexInfo = TablesManager.GetIndexInfo();
- if (Tiers) {
- auto pathTiering = Tiers->GetTiering(); // TODO: pathIds
- actualIndexInfo.UpdatePathTiering(pathTiering);
- actualIndexInfo.SetPathTiering(std::move(pathTiering));
- }
-
ActiveIndexingOrCompaction = true;
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges,
Settings.CacheDataAfterCompaction);
+ if (Tiers) {
+ ev->SetTiering(Tiers->GetTiering());
+ }
return std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev), *BlobManager);
}
@@ -835,12 +829,8 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
}
auto actualIndexInfo = TablesManager.GetIndexInfo();
- actualIndexInfo.UpdatePathTiering(eviction);
-
std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges;
- indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction);
-
- actualIndexInfo.SetPathTiering(std::move(eviction));
+ indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction, actualIndexInfo.ArrowSchema());
if (!indexChanges) {
LOG_S_DEBUG("Cannot prepare TTL at tablet " << TabletID());
@@ -854,6 +844,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
ActiveTtl = true;
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false);
+ ev->SetTiering(eviction);
return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), *BlobManager, needWrites);
}
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index 377f8782f56..0349c8ffaf3 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -28,6 +28,7 @@ struct TEvPrivate {
struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex> {
NKikimrProto::EReplyStatus PutStatus = NKikimrProto::UNKNOWN;
NOlap::TIndexInfo IndexInfo;
+ THashMap<ui64, NKikimr::NOlap::TTiering> Tiering;
std::shared_ptr<NOlap::TColumnEngineChanges> IndexChanges;
THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CachedBlobs;
TVector<TString> Blobs;
@@ -48,6 +49,11 @@ struct TEvPrivate {
, CachedBlobs(std::move(cachedBlobs))
, CacheData(cacheData)
{}
+
+ TEvWriteIndex& SetTiering(const THashMap<ui64, NKikimr::NOlap::TTiering>& tiering) {
+ Tiering = tiering;
+ return *this;
+ }
};
struct TEvIndexing : public TEventLocal<TEvIndexing, EvIndexing> {
diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp
index 2635bf9de58..e45dae396c7 100644
--- a/ydb/core/tx/columnshard/compaction_actor.cpp
+++ b/ydb/core/tx/columnshard/compaction_actor.cpp
@@ -130,7 +130,7 @@ private:
TxEvent->IndexChanges->SetBlobs(std::move(Blobs));
- TxEvent->Blobs = NOlap::TColumnEngineForLogs::CompactBlobs(TxEvent->IndexInfo, TxEvent->IndexChanges);
+ TxEvent->Blobs = NOlap::TColumnEngineForLogs::CompactBlobs(TxEvent->IndexInfo, TxEvent->Tiering, TxEvent->IndexChanges);
if (TxEvent->Blobs.empty()) {
TxEvent->PutStatus = NKikimrProto::OK; // nothing to write, commit
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 4fc878fe033..5ea71878d00 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -362,7 +362,7 @@ public:
const TSnapshot& outdatedSnapshot) = 0;
virtual std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop,
ui32 maxRecords) = 0;
- virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction,
+ virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, const std::shared_ptr<arrow::Schema>& schema,
ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) = 0;
virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) = 0;
virtual void FreeLocks(std::shared_ptr<TColumnEngineChanges> changes) = 0;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 2c3a607e285..c8bfe987bb1 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -62,13 +62,13 @@ std::shared_ptr<arrow::RecordBatch> AddSpecials(const std::shared_ptr<arrow::Rec
return NArrow::ExtractColumns(batch, indexInfo.ArrowSchemaWithSpecials());
}
-bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo,
+bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs,
TVector<TColumnRecord>& evictedRecords, TVector<TString>& newBlobs)
{
Y_VERIFY(portionInfo.TierName != evictFeatures.TargetTierName);
- auto* tiering = indexInfo.GetTiering(evictFeatures.PathId);
+ auto* tiering = tieringMap.FindPtr(evictFeatures.PathId);
Y_VERIFY(tiering);
auto compression = tiering->GetCompression(evictFeatures.TargetTierName);
if (!compression) {
@@ -111,6 +111,7 @@ bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo
}
TVector<TPortionInfo> MakeAppendedPortions(const ui64 pathId, const TIndexInfo& indexInfo,
+ const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
const std::shared_ptr<arrow::RecordBatch> batch,
const ui64 granule,
const TSnapshot& minSnapshot,
@@ -122,7 +123,7 @@ TVector<TPortionInfo> MakeAppendedPortions(const ui64 pathId, const TIndexInfo&
TString tierName;
TCompression compression = indexInfo.GetDefaultCompression();
if (pathId) {
- if (auto* tiering = indexInfo.GetTiering(pathId)) {
+ if (auto* tiering = tieringMap.FindPtr(pathId)) {
tierName = tiering->GetHottestTierName();
if (const auto& tierCompression = tiering->GetCompression(tierName)) {
compression = *tierCompression;
@@ -867,7 +868,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
return changes;
}
-std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction,
+std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction, const std::shared_ptr<arrow::Schema>& schema,
ui64 maxEvictBytes) {
if (pathEviction.empty()) {
return {};
@@ -885,7 +886,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
continue; // It's not an error: allow TTL over multiple shards with different pathIds presented
}
- auto expireTimestamp = ttl.EvictScalar();
+ auto expireTimestamp = ttl.EvictScalar(schema);
Y_VERIFY(expireTimestamp);
auto ttlColumnNames = ttl.GetTtlColumns();
@@ -911,13 +912,13 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
if (keep && tryEvictPortion) {
TString tierName;
- for (auto& tierRef : ttl.OrderedTiers) { // TODO: lower/upper_bound + move into TEviction
+ for (auto& tierRef : ttl.GetOrderedTiers()) { // TODO: lower/upper_bound + move into TEviction
auto& tierInfo = tierRef.Get();
- if (!IndexInfo.AllowTtlOverColumn(tierInfo.EvictColumnName)) {
+ if (!IndexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) {
continue; // Ignore tiers with bad ttl column
}
- if (NArrow::ScalarLess(tierInfo.EvictScalar(), max)) {
- tierName = tierInfo.Name;
+ if (NArrow::ScalarLess(tierInfo.EvictScalar(schema), max)) {
+ tierName = tierInfo.GetName();
} else {
break;
}
@@ -1609,7 +1610,7 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompact
return {};
}
-TVector<TString> TColumnEngineForLogs::IndexBlobs(const TIndexInfo& indexInfo,
+TVector<TString> TColumnEngineForLogs::IndexBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
std::shared_ptr<TColumnEngineChanges> indexChanges) {
auto changes = std::static_pointer_cast<TChanges>(indexChanges);
Y_VERIFY(!changes->DataToIndex.empty());
@@ -1663,7 +1664,7 @@ TVector<TString> TColumnEngineForLogs::IndexBlobs(const TIndexInfo& indexInfo,
auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], indexInfo);
for (auto& [granule, batch] : granuleBatches) {
- auto portions = MakeAppendedPortions(pathId, indexInfo, batch, granule, minSnapshot, blobs);
+ auto portions = MakeAppendedPortions(pathId, indexInfo, tieringMap, batch, granule, minSnapshot, blobs);
Y_VERIFY(portions.size() > 0);
for (auto& portion : portions) {
changes->AppendedPortions.emplace_back(std::move(portion));
@@ -1696,7 +1697,7 @@ static std::shared_ptr<arrow::RecordBatch> CompactInOneGranule(const TIndexInfo&
return sortedBatch;
}
-static TVector<TString> CompactInGranule(const TIndexInfo& indexInfo,
+static TVector<TString> CompactInGranule(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
std::shared_ptr<TColumnEngineForLogs::TChanges> changes) {
const ui64 pathId = changes->SrcGranule->PathId;
TVector<TString> blobs;
@@ -1716,13 +1717,13 @@ static TVector<TString> CompactInGranule(const TIndexInfo& indexInfo,
if (!slice || slice->num_rows() == 0) {
continue;
}
- auto tmp = MakeAppendedPortions(pathId, indexInfo, slice, granule, TSnapshot{}, blobs);
+ auto tmp = MakeAppendedPortions(pathId, indexInfo, tieringMap, slice, granule, TSnapshot{}, blobs);
for (auto&& portionInfo : tmp) {
portions.emplace_back(std::move(portionInfo));
}
}
} else {
- portions = MakeAppendedPortions(pathId, indexInfo, batch, granule, TSnapshot{}, blobs);
+ portions = MakeAppendedPortions(pathId, indexInfo, tieringMap, batch, granule, TSnapshot{}, blobs);
}
Y_VERIFY(portions.size() > 0);
@@ -1962,7 +1963,7 @@ static ui64 TryMovePortions(const TMark& ts0,
return numRows;
}
-static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo,
+static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
const std::shared_ptr<TColumnEngineForLogs::TChanges>& changes)
{
const ui64 pathId = changes->SrcGranule->PathId;
@@ -2063,7 +2064,7 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo,
for (const auto& batch : idBatches[id]) {
// Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
- auto newPortions = MakeAppendedPortions(pathId, indexInfo, batch, tmpGranule, TSnapshot{}, blobs);
+ auto newPortions = MakeAppendedPortions(pathId, indexInfo, tieringMap, batch, tmpGranule, TSnapshot{}, blobs);
Y_VERIFY(newPortions.size() > 0);
for (auto& portion : newPortions) {
changes->AppendedPortions.emplace_back(std::move(portion));
@@ -2079,7 +2080,7 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo,
ui64 tmpGranule = changes->SetTmpGranule(pathId, ts);
// Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
- auto portions = MakeAppendedPortions(pathId, indexInfo, batch, tmpGranule, TSnapshot{}, blobs);
+ auto portions = MakeAppendedPortions(pathId, indexInfo, tieringMap, batch, tmpGranule, TSnapshot{}, blobs);
Y_VERIFY(portions.size() > 0);
for (auto& portion : portions) {
changes->AppendedPortions.emplace_back(std::move(portion));
@@ -2090,7 +2091,7 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo,
return blobs;
}
-TVector<TString> TColumnEngineForLogs::CompactBlobs(const TIndexInfo& indexInfo,
+TVector<TString> TColumnEngineForLogs::CompactBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
std::shared_ptr<TColumnEngineChanges> changes) {
Y_VERIFY(changes);
Y_VERIFY(changes->CompactionInfo);
@@ -2101,12 +2102,12 @@ TVector<TString> TColumnEngineForLogs::CompactBlobs(const TIndexInfo& indexInfo,
auto castedChanges = std::static_pointer_cast<TChanges>(changes);
if (castedChanges->CompactionInfo->InGranule) {
- return CompactInGranule(indexInfo, castedChanges);
+ return CompactInGranule(indexInfo, tieringMap, castedChanges);
}
- return CompactSplitGranule(indexInfo, castedChanges);
+ return CompactSplitGranule(indexInfo, tieringMap, castedChanges);
}
-TVector<TString> TColumnEngineForLogs::EvictBlobs(const TIndexInfo& indexInfo,
+TVector<TString> TColumnEngineForLogs::EvictBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
std::shared_ptr<TColumnEngineChanges> changes) {
Y_VERIFY(changes);
Y_VERIFY(!changes->Blobs.empty()); // src data
@@ -2121,7 +2122,7 @@ TVector<TString> TColumnEngineForLogs::EvictBlobs(const TIndexInfo& indexInfo,
Y_VERIFY(!portionInfo.Empty());
Y_VERIFY(portionInfo.IsActive());
- if (UpdateEvictedPortion(portionInfo, indexInfo, evictFeatures, changes->Blobs,
+ if (UpdateEvictedPortion(portionInfo, indexInfo, tieringMap, evictFeatures, changes->Blobs,
changes->EvictedRecords, newBlobs)) {
Y_VERIFY(portionInfo.TierName == evictFeatures.TargetTierName);
evicted.emplace_back(std::move(portionInfo), evictFeatures);
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 6608fe67569..36391221401 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -198,7 +198,7 @@ public:
const TSnapshot& outdatedSnapshot) override;
std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop,
ui32 maxRecords) override;
- std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction,
+ std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, const std::shared_ptr<arrow::Schema>& schema,
ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override;
bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
const TSnapshot& snapshot) override;
@@ -219,13 +219,13 @@ public:
// Static part of IColumnEngine iface (called from actors). It's static cause there's no threads sync.
/// @note called from IndexingActor
- static TVector<TString> IndexBlobs(const TIndexInfo& indexInfo, std::shared_ptr<TColumnEngineChanges> changes);
+ static TVector<TString> IndexBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, std::shared_ptr<TColumnEngineChanges> changes);
/// @note called from CompactionActor
- static TVector<TString> CompactBlobs(const TIndexInfo& indexInfo, std::shared_ptr<TColumnEngineChanges> changes);
+ static TVector<TString> CompactBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, std::shared_ptr<TColumnEngineChanges> changes);
/// @note called from EvictionActor
- static TVector<TString> EvictBlobs(const TIndexInfo& indexInfo, std::shared_ptr<TColumnEngineChanges> changes);
+ static TVector<TString> EvictBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, std::shared_ptr<TColumnEngineChanges> changes);
private:
struct TGranuleMeta {
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index 441089d7484..ccdae968df9 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -320,37 +320,6 @@ bool TIndexInfo::AllowTtlOverColumn(const TString& name) const {
return MinMaxIdxColumnsIds.contains(it->second);
}
-void TIndexInfo::UpdatePathTiering(THashMap<ui64, NOlap::TTiering>& pathTiering) const {
- auto schema = ArrowSchema(); // init Schema if not yet
-
- for (auto& [pathId, tiering] : pathTiering) {
- for (auto& [tierName, tierInfo] : tiering.TierByName) {
- if (!tierInfo->EvictColumn) {
- tierInfo->EvictColumn = schema->GetFieldByName(tierInfo->EvictColumnName);
- }
- // TODO: eviction with recompression is not supported yet
- if (tierInfo->NeedExport) {
- tierInfo->Compression = {};
- }
- }
- if (tiering.Ttl && !tiering.Ttl->EvictColumn) {
- tiering.Ttl->EvictColumn = schema->GetFieldByName(tiering.Ttl->EvictColumnName);
- }
- }
-}
-
-void TIndexInfo::SetPathTiering(THashMap<ui64, TTiering>&& pathTierings) {
- PathTiering = std::move(pathTierings);
-}
-
-const TTiering* TIndexInfo::GetTiering(ui64 pathId) const {
- auto it = PathTiering.find(pathId);
- if (it != PathTiering.end()) {
- return &it->second;
- }
- return nullptr;
-}
-
std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const TVector<ui32>& ids) {
std::vector<std::shared_ptr<arrow::Field>> fields;
fields.reserve(ids.size());
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index c6af3b0d9b7..6c170990525 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -146,10 +146,6 @@ public:
void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; }
const TCompression& GetDefaultCompression() const { return DefaultCompression; }
- void UpdatePathTiering(THashMap<ui64, NOlap::TTiering>& pathTiering) const;
- void SetPathTiering(THashMap<ui64, TTiering>&& pathTierings);
- const TTiering* GetTiering(ui64 pathId) const;
-
private:
ui32 Id;
TString Name;
@@ -162,7 +158,6 @@ private:
THashSet<TString> RequiredColumns;
THashSet<ui32> MinMaxIdxColumnsIds;
TCompression DefaultCompression;
- THashMap<ui64, TTiering> PathTiering;
};
std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const TVector<ui32>& ids);
diff --git a/ydb/core/tx/columnshard/engines/tier_info.h b/ydb/core/tx/columnshard/engines/tier_info.h
index e7bd6fcc8c5..c6565a60a7e 100644
--- a/ydb/core/tx/columnshard/engines/tier_info.h
+++ b/ydb/core/tx/columnshard/engines/tier_info.h
@@ -13,37 +13,77 @@ struct TCompression {
std::optional<int> Level;
};
-struct TTierInfo {
+class TTierInfo {
+private:
TString Name;
- TInstant EvictBorder;
TString EvictColumnName;
- std::shared_ptr<arrow::Field> EvictColumn;
- std::optional<TCompression> Compression;
- ui32 TtlUnitsInSecond;
+ TInstant EvictBorder;
bool NeedExport = false;
+ ui32 TtlUnitsInSecond;
+ std::optional<TCompression> Compression;
+ mutable std::shared_ptr<arrow::Scalar> Scalar;
+
+public:
TTierInfo(const TString& tierName, TInstant evictBorder, const TString& column, ui32 unitsInSecond = 0)
: Name(tierName)
- , EvictBorder(evictBorder)
, EvictColumnName(column)
+ , EvictBorder(evictBorder)
, TtlUnitsInSecond(unitsInSecond)
{
Y_VERIFY(!Name.empty());
Y_VERIFY(!EvictColumnName.empty());
}
- std::shared_ptr<arrow::Scalar> EvictScalar() const {
+ const TString& GetName() const {
+ return Name;
+ }
+
+ const TString& GetEvictColumnName() const {
+ return EvictColumnName;
+ }
+
+ const TInstant GetEvictBorder() const {
+ return EvictBorder;
+ }
+
+ bool GetNeedExport() const {
+ return NeedExport;
+ }
+
+ TTierInfo& SetNeedExport(const bool value) {
+ NeedExport = value;
+ return *this;
+ }
+
+ TTierInfo& SetCompression(const TCompression& value) {
+ Compression = value;
+ return *this;
+ }
+
+ const std::optional<TCompression> GetCompression() const {
+ if (NeedExport) {
+ return {};
+ }
+ return Compression;
+ }
+
+ std::shared_ptr<arrow::Field> GetEvictColumn(const std::shared_ptr<arrow::Schema>& schema) const {
+ return schema->GetFieldByName(EvictColumnName);
+ }
+
+ std::shared_ptr<arrow::Scalar> EvictScalar(const std::shared_ptr<arrow::Schema>& schema) const {
if (Scalar) {
return Scalar;
}
+ auto evictColumn = GetEvictColumn(schema);
+ Y_VERIFY(evictColumn);
ui32 multiplier = TtlUnitsInSecond ? TtlUnitsInSecond : 1;
-
- Y_VERIFY(EvictColumn);
- switch (EvictColumn->type()->id()) {
+ switch (evictColumn->type()->id()) {
case arrow::Type::TIMESTAMP:
Scalar = std::make_shared<arrow::TimestampScalar>(
- EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO));
+ EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO));
break;
case arrow::Type::UINT16: // YQL Date
Scalar = std::make_shared<arrow::UInt16Scalar>(EvictBorder.Days());
@@ -73,12 +113,10 @@ struct TTierInfo {
*Compression->Level : arrow::util::kUseDefaultCompressionLevel);
return sb;
}
-
-private:
- mutable std::shared_ptr<arrow::Scalar> Scalar;
};
-struct TTierRef {
+class TTierRef {
+public:
TTierRef(const std::shared_ptr<TTierInfo>& tierInfo)
: Info(tierInfo)
{
@@ -86,17 +124,17 @@ struct TTierRef {
}
bool operator < (const TTierRef& b) const {
- if (Info->EvictBorder < b.Info->EvictBorder) {
+ if (Info->GetEvictBorder() < b.Info->GetEvictBorder()) {
return true;
- } else if (Info->EvictBorder == b.Info->EvictBorder) {
- return Info->Name > b.Info->Name; // add stability: smaller name is hotter
+ } else if (Info->GetEvictBorder() == b.Info->GetEvictBorder()) {
+ return Info->GetName() > b.Info->GetName(); // add stability: smaller name is hotter
}
return false;
}
bool operator == (const TTierRef& b) const {
- return Info->EvictBorder == b.Info->EvictBorder
- && Info->Name == b.Info->Name;
+ return Info->GetEvictBorder() == b.Info->GetEvictBorder()
+ && Info->GetName() == b.Info->GetName();
}
const TTierInfo& Get() const {
@@ -107,11 +145,21 @@ private:
std::shared_ptr<TTierInfo> Info;
};
-struct TTiering {
- THashMap<TString, std::shared_ptr<TTierInfo>> TierByName;
- TSet<TTierRef> OrderedTiers; // Tiers ordered by border
+class TTiering {
+ using TTiersMap = THashMap<TString, std::shared_ptr<TTierInfo>>;
+ TTiersMap TierByName;
+ TSet<TTierRef> OrderedTiers;
+public:
std::shared_ptr<TTierInfo> Ttl;
+ const TTiersMap& GetTierByName() const {
+ return TierByName;
+ }
+
+ const TSet<TTierRef>& GetOrderedTiers() const {
+ return OrderedTiers;
+ }
+
bool HasTiers() const {
return !OrderedTiers.empty();
}
@@ -119,23 +167,23 @@ struct TTiering {
void Add(const std::shared_ptr<TTierInfo>& tier) {
if (HasTiers()) {
// TODO: support different ttl columns
- Y_VERIFY(tier->EvictColumnName == OrderedTiers.begin()->Get().EvictColumnName);
+ Y_VERIFY(tier->GetEvictColumnName() == OrderedTiers.begin()->Get().GetEvictColumnName());
}
- TierByName.emplace(tier->Name, tier);
+ TierByName.emplace(tier->GetName(), tier);
OrderedTiers.emplace(tier);
}
TString GetHottestTierName() const {
if (OrderedTiers.size()) {
- return OrderedTiers.rbegin()->Get().Name; // hottest one
+ return OrderedTiers.rbegin()->Get().GetName(); // hottest one
}
return {};
}
- std::shared_ptr<arrow::Scalar> EvictScalar() const {
- auto ttlTs = Ttl ? Ttl->EvictScalar() : nullptr;
- auto tierTs = OrderedTiers.empty() ? nullptr : OrderedTiers.begin()->Get().EvictScalar();
+ std::shared_ptr<arrow::Scalar> EvictScalar(const std::shared_ptr<arrow::Schema>& schema) const {
+ auto ttlTs = Ttl ? Ttl->EvictScalar(schema) : nullptr;
+ auto tierTs = OrderedTiers.empty() ? nullptr : OrderedTiers.begin()->Get().EvictScalar(schema);
if (!ttlTs) {
return tierTs;
} else if (!tierTs) {
@@ -148,7 +196,7 @@ struct TTiering {
auto it = TierByName.find(name);
if (it != TierByName.end()) {
Y_VERIFY(!name.empty());
- return it->second->Compression;
+ return it->second->GetCompression();
}
return {};
}
@@ -157,7 +205,7 @@ struct TTiering {
auto it = TierByName.find(name);
if (it != TierByName.end()) {
Y_VERIFY(!name.empty());
- return it->second->NeedExport;
+ return it->second->GetNeedExport();
}
return false;
}
@@ -165,10 +213,10 @@ struct TTiering {
THashSet<TString> GetTtlColumns() const {
THashSet<TString> out;
if (Ttl) {
- out.insert(Ttl->EvictColumnName);
+ out.insert(Ttl->GetEvictColumnName());
}
for (auto& [tierName, tier] : TierByName) {
- out.insert(tier->EvictColumnName);
+ out.insert(tier->GetEvictColumnName());
}
return out;
}
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index d121dc3023a..4634a912686 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -281,7 +281,7 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap,
changes->Blobs.insert(blobs.begin(), blobs.end());
- TVector<TString> newBlobs = TColumnEngineForLogs::IndexBlobs(engine.GetIndexInfo(), changes);
+ TVector<TString> newBlobs = TColumnEngineForLogs::IndexBlobs(engine.GetIndexInfo(), {}, changes);
UNIT_ASSERT_VALUES_EQUAL(changes->AppendedPortions.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(newBlobs.size(), testColumns.size() + 2); // add 2 columns: planStep, txId
@@ -316,7 +316,7 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T
UNIT_ASSERT_VALUES_EQUAL(changes->SwitchedPortions.size(), expected.SrcPortions);
changes->SetBlobs(std::move(blobs));
- TVector<TString> newBlobs = TColumnEngineForLogs::CompactBlobs(engine.GetIndexInfo(), changes);
+ TVector<TString> newBlobs = TColumnEngineForLogs::CompactBlobs(engine.GetIndexInfo(), {}, changes);
UNIT_ASSERT_VALUES_EQUAL(changes->AppendedPortions.size(), expected.NewPortions);
AddIdsToBlobs(newBlobs, changes->AppendedPortions, changes->Blobs, step);
@@ -346,7 +346,7 @@ bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, u
bool Ttl(TColumnEngineForLogs& engine, TTestDbWrapper& db,
const THashMap<ui64, NOlap::TTiering>& pathEviction, ui32 expectedToDrop) {
- std::shared_ptr<TColumnEngineChanges> changes = engine.StartTtl(pathEviction);
+ std::shared_ptr<TColumnEngineChanges> changes = engine.StartTtl(pathEviction, engine.GetIndexInfo().ArrowSchema());
UNIT_ASSERT(changes);
UNIT_ASSERT_VALUES_EQUAL(changes->PortionsToDrop.size(), expectedToDrop);
@@ -715,7 +715,6 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
THashMap<ui64, NOlap::TTiering> pathTtls;
NOlap::TTiering tiering;
tiering.Ttl = NOlap::TTierInfo::MakeTtl(TInstant::MicroSeconds(10000), "timestamp");
- tiering.Ttl->EvictColumn = std::make_shared<arrow::Field>("timestamp", ttlColType);
pathTtls.emplace(pathId, std::move(tiering));
Ttl(engine, db, pathTtls, 2);
diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp
index e6a588fddcd..25d4a3f9f35 100644
--- a/ydb/core/tx/columnshard/eviction_actor.cpp
+++ b/ydb/core/tx/columnshard/eviction_actor.cpp
@@ -127,7 +127,7 @@ private:
TxEvent->IndexChanges->SetBlobs(std::move(Blobs));
- TxEvent->Blobs = NOlap::TColumnEngineForLogs::EvictBlobs(TxEvent->IndexInfo, TxEvent->IndexChanges);
+ TxEvent->Blobs = NOlap::TColumnEngineForLogs::EvictBlobs(TxEvent->IndexInfo, TxEvent->Tiering, TxEvent->IndexChanges);
if (TxEvent->Blobs.empty()) {
TxEvent->PutStatus = NKikimrProto::OK;
}
diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp
index 029de364814..038cdc9e9d3 100644
--- a/ydb/core/tx/columnshard/indexing_actor.cpp
+++ b/ydb/core/tx/columnshard/indexing_actor.cpp
@@ -123,7 +123,7 @@ private:
LOG_S_DEBUG("Indexing started at tablet " << TabletId);
TCpuGuard guard(TxEvent->ResourceUsage);
- TxEvent->Blobs = NOlap::TColumnEngineForLogs::IndexBlobs(TxEvent->IndexInfo, TxEvent->IndexChanges);
+ TxEvent->Blobs = NOlap::TColumnEngineForLogs::IndexBlobs(TxEvent->IndexInfo, TxEvent->Tiering, TxEvent->IndexChanges);
LOG_S_DEBUG("Indexing finished at tablet " << TabletId);
} else {
diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp
index ed9c4b8c94e..34606b2dcb6 100644
--- a/ydb/core/tx/tiering/manager.cpp
+++ b/ydb/core/tx/tiering/manager.cpp
@@ -225,11 +225,11 @@ THashMap<ui64, NKikimr::NOlap::TTiering> TTiersManager::GetTiering() const {
if (tiering) {
result.emplace(i.first, tiering->BuildOlapTiers());
for (auto& [pathId, pathTiering] : result) {
- for (auto& [name, tier] : pathTiering.TierByName) {
+ for (auto& [name, tier] : pathTiering.GetTierByName()) {
auto it = tierConfigs.find(name);
if (it != tierConfigs.end()) {
- tier->Compression = NTiers::ConvertCompression(it->second.GetCompression());
- tier->NeedExport = it->second.NeedExport();
+ tier->SetCompression(NTiers::ConvertCompression(it->second.GetCompression()));
+ tier->SetNeedExport(it->second.NeedExport());
}
}
}