diff options
author | nsofya <[email protected]> | 2023-05-03 18:49:36 +0300 |
---|---|---|
committer | nsofya <[email protected]> | 2023-05-03 18:49:36 +0300 |
commit | 7b101672fe85f87bdadd7b110b1aba1f9aa24414 (patch) | |
tree | 1e10f29501c6f0e130b4b5046b30132d95a628ec | |
parent | 9e8692365dba3cb82e0ac1d6d3a1900ef3e82edc (diff) |
Remove tiering from TIndexInfo
Remove tiering from TIndexInfo
Вытащила и передаю TieringInfo отдельно.
Нужно для поддержки нескольких версий индекса, потому что бессмысленно копировать этот объект в каждый индекс, тем более что это изменяемая контекстная переменная, которую с таким же успехом можно сохранить в ивент. Что и сделала. Больше никакую логику не меняла.
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 25 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_private_events.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/compaction_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 45 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_info.cpp | 31 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_info.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/tier_info.h | 114 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/ut_logs_engine.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/eviction_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/indexing_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/tiering/manager.cpp | 6 |
13 files changed, 132 insertions, 123 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 592adb06187..2a8d2c77fac 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -747,15 +747,12 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() { } auto actualIndexInfo = TablesManager.GetIndexInfo(); - if (Tiers) { - auto pathTiering = Tiers->GetTiering(); // TODO: pathIds - actualIndexInfo.UpdatePathTiering(pathTiering); - actualIndexInfo.SetPathTiering(std::move(pathTiering)); - } - ActiveIndexingOrCompaction = true; auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterIndexing, std::move(cachedBlobs)); + if (Tiers) { + ev->SetTiering(Tiers->GetTiering()); + } return std::make_unique<TEvPrivate::TEvIndexing>(std::move(ev)); } @@ -788,15 +785,12 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() { } auto actualIndexInfo = TablesManager.GetIndexInfo(); - if (Tiers) { - auto pathTiering = Tiers->GetTiering(); // TODO: pathIds - actualIndexInfo.UpdatePathTiering(pathTiering); - actualIndexInfo.SetPathTiering(std::move(pathTiering)); - } - ActiveIndexingOrCompaction = true; auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterCompaction); + if (Tiers) { + ev->SetTiering(Tiers->GetTiering()); + } return std::make_unique<TEvPrivate::TEvCompaction>(std::move(ev), *BlobManager); } @@ -835,12 +829,8 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u } auto actualIndexInfo = TablesManager.GetIndexInfo(); - actualIndexInfo.UpdatePathTiering(eviction); - std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges; - indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction); - - actualIndexInfo.SetPathTiering(std::move(eviction)); + indexChanges = 
TablesManager.MutablePrimaryIndex().StartTtl(eviction, actualIndexInfo.ArrowSchema()); if (!indexChanges) { LOG_S_DEBUG("Cannot prepare TTL at tablet " << TabletID()); @@ -854,6 +844,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u ActiveTtl = true; auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false); + ev->SetTiering(eviction); return std::make_unique<TEvPrivate::TEvEviction>(std::move(ev), *BlobManager, needWrites); } diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index 377f8782f56..0349c8ffaf3 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -28,6 +28,7 @@ struct TEvPrivate { struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex> { NKikimrProto::EReplyStatus PutStatus = NKikimrProto::UNKNOWN; NOlap::TIndexInfo IndexInfo; + THashMap<ui64, NKikimr::NOlap::TTiering> Tiering; std::shared_ptr<NOlap::TColumnEngineChanges> IndexChanges; THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CachedBlobs; TVector<TString> Blobs; @@ -48,6 +49,11 @@ struct TEvPrivate { , CachedBlobs(std::move(cachedBlobs)) , CacheData(cacheData) {} + + TEvWriteIndex& SetTiering(const THashMap<ui64, NKikimr::NOlap::TTiering>& tiering) { + Tiering = tiering; + return *this; + } }; struct TEvIndexing : public TEventLocal<TEvIndexing, EvIndexing> { diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp index 2635bf9de58..e45dae396c7 100644 --- a/ydb/core/tx/columnshard/compaction_actor.cpp +++ b/ydb/core/tx/columnshard/compaction_actor.cpp @@ -130,7 +130,7 @@ private: TxEvent->IndexChanges->SetBlobs(std::move(Blobs)); - TxEvent->Blobs = NOlap::TColumnEngineForLogs::CompactBlobs(TxEvent->IndexInfo, TxEvent->IndexChanges); + TxEvent->Blobs = 
NOlap::TColumnEngineForLogs::CompactBlobs(TxEvent->IndexInfo, TxEvent->Tiering, TxEvent->IndexChanges); if (TxEvent->Blobs.empty()) { TxEvent->PutStatus = NKikimrProto::OK; // nothing to write, commit } diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 4fc878fe033..5ea71878d00 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -362,7 +362,7 @@ public: const TSnapshot& outdatedSnapshot) = 0; virtual std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop, ui32 maxRecords) = 0; - virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, + virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, const std::shared_ptr<arrow::Schema>& schema, ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) = 0; virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) = 0; virtual void FreeLocks(std::shared_ptr<TColumnEngineChanges> changes) = 0; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 2c3a607e285..c8bfe987bb1 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -62,13 +62,13 @@ std::shared_ptr<arrow::RecordBatch> AddSpecials(const std::shared_ptr<arrow::Rec return NArrow::ExtractColumns(batch, indexInfo.ArrowSchemaWithSpecials()); } -bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo, +bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs, TVector<TColumnRecord>& 
evictedRecords, TVector<TString>& newBlobs) { Y_VERIFY(portionInfo.TierName != evictFeatures.TargetTierName); - auto* tiering = indexInfo.GetTiering(evictFeatures.PathId); + auto* tiering = tieringMap.FindPtr(evictFeatures.PathId); Y_VERIFY(tiering); auto compression = tiering->GetCompression(evictFeatures.TargetTierName); if (!compression) { @@ -111,6 +111,7 @@ bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo } TVector<TPortionInfo> MakeAppendedPortions(const ui64 pathId, const TIndexInfo& indexInfo, + const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const TSnapshot& minSnapshot, @@ -122,7 +123,7 @@ TVector<TPortionInfo> MakeAppendedPortions(const ui64 pathId, const TIndexInfo& TString tierName; TCompression compression = indexInfo.GetDefaultCompression(); if (pathId) { - if (auto* tiering = indexInfo.GetTiering(pathId)) { + if (auto* tiering = tieringMap.FindPtr(pathId)) { tierName = tiering->GetHottestTierName(); if (const auto& tierCompression = tiering->GetCompression(tierName)) { compression = *tierCompression; @@ -867,7 +868,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T return changes; } -std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction, +std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction, const std::shared_ptr<arrow::Schema>& schema, ui64 maxEvictBytes) { if (pathEviction.empty()) { return {}; @@ -885,7 +886,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash continue; // It's not an error: allow TTL over multiple shards with different pathIds presented } - auto expireTimestamp = ttl.EvictScalar(); + auto expireTimestamp = ttl.EvictScalar(schema); Y_VERIFY(expireTimestamp); auto ttlColumnNames = ttl.GetTtlColumns(); @@ -911,13 +912,13 @@ 
std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash if (keep && tryEvictPortion) { TString tierName; - for (auto& tierRef : ttl.OrderedTiers) { // TODO: lower/upper_bound + move into TEviction + for (auto& tierRef : ttl.GetOrderedTiers()) { // TODO: lower/upper_bound + move into TEviction auto& tierInfo = tierRef.Get(); - if (!IndexInfo.AllowTtlOverColumn(tierInfo.EvictColumnName)) { + if (!IndexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) { continue; // Ignore tiers with bad ttl column } - if (NArrow::ScalarLess(tierInfo.EvictScalar(), max)) { - tierName = tierInfo.Name; + if (NArrow::ScalarLess(tierInfo.EvictScalar(schema), max)) { + tierName = tierInfo.GetName(); } else { break; } @@ -1609,7 +1610,7 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompact return {}; } -TVector<TString> TColumnEngineForLogs::IndexBlobs(const TIndexInfo& indexInfo, +TVector<TString> TColumnEngineForLogs::IndexBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, std::shared_ptr<TColumnEngineChanges> indexChanges) { auto changes = std::static_pointer_cast<TChanges>(indexChanges); Y_VERIFY(!changes->DataToIndex.empty()); @@ -1663,7 +1664,7 @@ TVector<TString> TColumnEngineForLogs::IndexBlobs(const TIndexInfo& indexInfo, auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], indexInfo); for (auto& [granule, batch] : granuleBatches) { - auto portions = MakeAppendedPortions(pathId, indexInfo, batch, granule, minSnapshot, blobs); + auto portions = MakeAppendedPortions(pathId, indexInfo, tieringMap, batch, granule, minSnapshot, blobs); Y_VERIFY(portions.size() > 0); for (auto& portion : portions) { changes->AppendedPortions.emplace_back(std::move(portion)); @@ -1696,7 +1697,7 @@ static std::shared_ptr<arrow::RecordBatch> CompactInOneGranule(const TIndexInfo& return sortedBatch; } -static TVector<TString> CompactInGranule(const TIndexInfo& indexInfo, +static 
TVector<TString> CompactInGranule(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, std::shared_ptr<TColumnEngineForLogs::TChanges> changes) { const ui64 pathId = changes->SrcGranule->PathId; TVector<TString> blobs; @@ -1716,13 +1717,13 @@ static TVector<TString> CompactInGranule(const TIndexInfo& indexInfo, if (!slice || slice->num_rows() == 0) { continue; } - auto tmp = MakeAppendedPortions(pathId, indexInfo, slice, granule, TSnapshot{}, blobs); + auto tmp = MakeAppendedPortions(pathId, indexInfo, tieringMap, slice, granule, TSnapshot{}, blobs); for (auto&& portionInfo : tmp) { portions.emplace_back(std::move(portionInfo)); } } } else { - portions = MakeAppendedPortions(pathId, indexInfo, batch, granule, TSnapshot{}, blobs); + portions = MakeAppendedPortions(pathId, indexInfo, tieringMap, batch, granule, TSnapshot{}, blobs); } Y_VERIFY(portions.size() > 0); @@ -1962,7 +1963,7 @@ static ui64 TryMovePortions(const TMark& ts0, return numRows; } -static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo, +static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, const std::shared_ptr<TColumnEngineForLogs::TChanges>& changes) { const ui64 pathId = changes->SrcGranule->PathId; @@ -2063,7 +2064,7 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo, for (const auto& batch : idBatches[id]) { // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). 
- auto newPortions = MakeAppendedPortions(pathId, indexInfo, batch, tmpGranule, TSnapshot{}, blobs); + auto newPortions = MakeAppendedPortions(pathId, indexInfo, tieringMap, batch, tmpGranule, TSnapshot{}, blobs); Y_VERIFY(newPortions.size() > 0); for (auto& portion : newPortions) { changes->AppendedPortions.emplace_back(std::move(portion)); @@ -2079,7 +2080,7 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo, ui64 tmpGranule = changes->SetTmpGranule(pathId, ts); // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). - auto portions = MakeAppendedPortions(pathId, indexInfo, batch, tmpGranule, TSnapshot{}, blobs); + auto portions = MakeAppendedPortions(pathId, indexInfo, tieringMap, batch, tmpGranule, TSnapshot{}, blobs); Y_VERIFY(portions.size() > 0); for (auto& portion : portions) { changes->AppendedPortions.emplace_back(std::move(portion)); @@ -2090,7 +2091,7 @@ static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo, return blobs; } -TVector<TString> TColumnEngineForLogs::CompactBlobs(const TIndexInfo& indexInfo, +TVector<TString> TColumnEngineForLogs::CompactBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, std::shared_ptr<TColumnEngineChanges> changes) { Y_VERIFY(changes); Y_VERIFY(changes->CompactionInfo); @@ -2101,12 +2102,12 @@ TVector<TString> TColumnEngineForLogs::CompactBlobs(const TIndexInfo& indexInfo, auto castedChanges = std::static_pointer_cast<TChanges>(changes); if (castedChanges->CompactionInfo->InGranule) { - return CompactInGranule(indexInfo, castedChanges); + return CompactInGranule(indexInfo, tieringMap, castedChanges); } - return CompactSplitGranule(indexInfo, castedChanges); + return CompactSplitGranule(indexInfo, tieringMap, castedChanges); } -TVector<TString> TColumnEngineForLogs::EvictBlobs(const TIndexInfo& indexInfo, +TVector<TString> TColumnEngineForLogs::EvictBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, 
NKikimr::NOlap::TTiering>& tieringMap, std::shared_ptr<TColumnEngineChanges> changes) { Y_VERIFY(changes); Y_VERIFY(!changes->Blobs.empty()); // src data @@ -2121,7 +2122,7 @@ TVector<TString> TColumnEngineForLogs::EvictBlobs(const TIndexInfo& indexInfo, Y_VERIFY(!portionInfo.Empty()); Y_VERIFY(portionInfo.IsActive()); - if (UpdateEvictedPortion(portionInfo, indexInfo, evictFeatures, changes->Blobs, + if (UpdateEvictedPortion(portionInfo, indexInfo, tieringMap, evictFeatures, changes->Blobs, changes->EvictedRecords, newBlobs)) { Y_VERIFY(portionInfo.TierName == evictFeatures.TargetTierName); evicted.emplace_back(std::move(portionInfo), evictFeatures); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 6608fe67569..36391221401 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -198,7 +198,7 @@ public: const TSnapshot& outdatedSnapshot) override; std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot, THashSet<ui64>& pathsToDrop, ui32 maxRecords) override; - std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, + std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, const std::shared_ptr<arrow::Schema>& schema, ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override; bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) override; @@ -219,13 +219,13 @@ public: // Static part of IColumnEngine iface (called from actors). It's static cause there's no threads sync. 
/// @note called from IndexingActor - static TVector<TString> IndexBlobs(const TIndexInfo& indexInfo, std::shared_ptr<TColumnEngineChanges> changes); + static TVector<TString> IndexBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, std::shared_ptr<TColumnEngineChanges> changes); /// @note called from CompactionActor - static TVector<TString> CompactBlobs(const TIndexInfo& indexInfo, std::shared_ptr<TColumnEngineChanges> changes); + static TVector<TString> CompactBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, std::shared_ptr<TColumnEngineChanges> changes); /// @note called from EvictionActor - static TVector<TString> EvictBlobs(const TIndexInfo& indexInfo, std::shared_ptr<TColumnEngineChanges> changes); + static TVector<TString> EvictBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, std::shared_ptr<TColumnEngineChanges> changes); private: struct TGranuleMeta { diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index 441089d7484..ccdae968df9 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -320,37 +320,6 @@ bool TIndexInfo::AllowTtlOverColumn(const TString& name) const { return MinMaxIdxColumnsIds.contains(it->second); } -void TIndexInfo::UpdatePathTiering(THashMap<ui64, NOlap::TTiering>& pathTiering) const { - auto schema = ArrowSchema(); // init Schema if not yet - - for (auto& [pathId, tiering] : pathTiering) { - for (auto& [tierName, tierInfo] : tiering.TierByName) { - if (!tierInfo->EvictColumn) { - tierInfo->EvictColumn = schema->GetFieldByName(tierInfo->EvictColumnName); - } - // TODO: eviction with recompression is not supported yet - if (tierInfo->NeedExport) { - tierInfo->Compression = {}; - } - } - if (tiering.Ttl && !tiering.Ttl->EvictColumn) { - tiering.Ttl->EvictColumn = 
schema->GetFieldByName(tiering.Ttl->EvictColumnName); - } - } -} - -void TIndexInfo::SetPathTiering(THashMap<ui64, TTiering>&& pathTierings) { - PathTiering = std::move(pathTierings); -} - -const TTiering* TIndexInfo::GetTiering(ui64 pathId) const { - auto it = PathTiering.find(pathId); - if (it != PathTiering.end()) { - return &it->second; - } - return nullptr; -} - std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const TVector<ui32>& ids) { std::vector<std::shared_ptr<arrow::Field>> fields; fields.reserve(ids.size()); diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index c6af3b0d9b7..6c170990525 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -146,10 +146,6 @@ public: void SetDefaultCompression(const TCompression& compression) { DefaultCompression = compression; } const TCompression& GetDefaultCompression() const { return DefaultCompression; } - void UpdatePathTiering(THashMap<ui64, NOlap::TTiering>& pathTiering) const; - void SetPathTiering(THashMap<ui64, TTiering>&& pathTierings); - const TTiering* GetTiering(ui64 pathId) const; - private: ui32 Id; TString Name; @@ -162,7 +158,6 @@ private: THashSet<TString> RequiredColumns; THashSet<ui32> MinMaxIdxColumnsIds; TCompression DefaultCompression; - THashMap<ui64, TTiering> PathTiering; }; std::shared_ptr<arrow::Schema> MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const TVector<ui32>& ids); diff --git a/ydb/core/tx/columnshard/engines/tier_info.h b/ydb/core/tx/columnshard/engines/tier_info.h index e7bd6fcc8c5..c6565a60a7e 100644 --- a/ydb/core/tx/columnshard/engines/tier_info.h +++ b/ydb/core/tx/columnshard/engines/tier_info.h @@ -13,37 +13,77 @@ struct TCompression { std::optional<int> Level; }; -struct TTierInfo { +class TTierInfo { +private: TString Name; - TInstant EvictBorder; TString EvictColumnName; - 
std::shared_ptr<arrow::Field> EvictColumn; - std::optional<TCompression> Compression; - ui32 TtlUnitsInSecond; + TInstant EvictBorder; bool NeedExport = false; + ui32 TtlUnitsInSecond; + std::optional<TCompression> Compression; + mutable std::shared_ptr<arrow::Scalar> Scalar; + +public: TTierInfo(const TString& tierName, TInstant evictBorder, const TString& column, ui32 unitsInSecond = 0) : Name(tierName) - , EvictBorder(evictBorder) , EvictColumnName(column) + , EvictBorder(evictBorder) , TtlUnitsInSecond(unitsInSecond) { Y_VERIFY(!Name.empty()); Y_VERIFY(!EvictColumnName.empty()); } - std::shared_ptr<arrow::Scalar> EvictScalar() const { + const TString& GetName() const { + return Name; + } + + const TString& GetEvictColumnName() const { + return EvictColumnName; + } + + const TInstant GetEvictBorder() const { + return EvictBorder; + } + + bool GetNeedExport() const { + return NeedExport; + } + + TTierInfo& SetNeedExport(const bool value) { + NeedExport = value; + return *this; + } + + TTierInfo& SetCompression(const TCompression& value) { + Compression = value; + return *this; + } + + const std::optional<TCompression> GetCompression() const { + if (NeedExport) { + return {}; + } + return Compression; + } + + std::shared_ptr<arrow::Field> GetEvictColumn(const std::shared_ptr<arrow::Schema>& schema) const { + return schema->GetFieldByName(EvictColumnName); + } + + std::shared_ptr<arrow::Scalar> EvictScalar(const std::shared_ptr<arrow::Schema>& schema) const { if (Scalar) { return Scalar; } + auto evictColumn = GetEvictColumn(schema); + Y_VERIFY(evictColumn); ui32 multiplier = TtlUnitsInSecond ? 
TtlUnitsInSecond : 1; - - Y_VERIFY(EvictColumn); - switch (EvictColumn->type()->id()) { + switch (evictColumn->type()->id()) { case arrow::Type::TIMESTAMP: Scalar = std::make_shared<arrow::TimestampScalar>( - EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO)); + EvictBorder.MicroSeconds(), arrow::timestamp(arrow::TimeUnit::MICRO)); break; case arrow::Type::UINT16: // YQL Date Scalar = std::make_shared<arrow::UInt16Scalar>(EvictBorder.Days()); @@ -73,12 +113,10 @@ struct TTierInfo { *Compression->Level : arrow::util::kUseDefaultCompressionLevel); return sb; } - -private: - mutable std::shared_ptr<arrow::Scalar> Scalar; }; -struct TTierRef { +class TTierRef { +public: TTierRef(const std::shared_ptr<TTierInfo>& tierInfo) : Info(tierInfo) { @@ -86,17 +124,17 @@ struct TTierRef { } bool operator < (const TTierRef& b) const { - if (Info->EvictBorder < b.Info->EvictBorder) { + if (Info->GetEvictBorder() < b.Info->GetEvictBorder()) { return true; - } else if (Info->EvictBorder == b.Info->EvictBorder) { - return Info->Name > b.Info->Name; // add stability: smaller name is hotter + } else if (Info->GetEvictBorder() == b.Info->GetEvictBorder()) { + return Info->GetName() > b.Info->GetName(); // add stability: smaller name is hotter } return false; } bool operator == (const TTierRef& b) const { - return Info->EvictBorder == b.Info->EvictBorder - && Info->Name == b.Info->Name; + return Info->GetEvictBorder() == b.Info->GetEvictBorder() + && Info->GetName() == b.Info->GetName(); } const TTierInfo& Get() const { @@ -107,11 +145,21 @@ private: std::shared_ptr<TTierInfo> Info; }; -struct TTiering { - THashMap<TString, std::shared_ptr<TTierInfo>> TierByName; - TSet<TTierRef> OrderedTiers; // Tiers ordered by border +class TTiering { + using TTiersMap = THashMap<TString, std::shared_ptr<TTierInfo>>; + TTiersMap TierByName; + TSet<TTierRef> OrderedTiers; +public: std::shared_ptr<TTierInfo> Ttl; + const TTiersMap& GetTierByName() const { + return TierByName; + } + + 
const TSet<TTierRef>& GetOrderedTiers() const { + return OrderedTiers; + } + bool HasTiers() const { return !OrderedTiers.empty(); } @@ -119,23 +167,23 @@ struct TTiering { void Add(const std::shared_ptr<TTierInfo>& tier) { if (HasTiers()) { // TODO: support different ttl columns - Y_VERIFY(tier->EvictColumnName == OrderedTiers.begin()->Get().EvictColumnName); + Y_VERIFY(tier->GetEvictColumnName() == OrderedTiers.begin()->Get().GetEvictColumnName()); } - TierByName.emplace(tier->Name, tier); + TierByName.emplace(tier->GetName(), tier); OrderedTiers.emplace(tier); } TString GetHottestTierName() const { if (OrderedTiers.size()) { - return OrderedTiers.rbegin()->Get().Name; // hottest one + return OrderedTiers.rbegin()->Get().GetName(); // hottest one } return {}; } - std::shared_ptr<arrow::Scalar> EvictScalar() const { - auto ttlTs = Ttl ? Ttl->EvictScalar() : nullptr; - auto tierTs = OrderedTiers.empty() ? nullptr : OrderedTiers.begin()->Get().EvictScalar(); + std::shared_ptr<arrow::Scalar> EvictScalar(const std::shared_ptr<arrow::Schema>& schema) const { + auto ttlTs = Ttl ? Ttl->EvictScalar(schema) : nullptr; + auto tierTs = OrderedTiers.empty() ? 
nullptr : OrderedTiers.begin()->Get().EvictScalar(schema); if (!ttlTs) { return tierTs; } else if (!tierTs) { @@ -148,7 +196,7 @@ struct TTiering { auto it = TierByName.find(name); if (it != TierByName.end()) { Y_VERIFY(!name.empty()); - return it->second->Compression; + return it->second->GetCompression(); } return {}; } @@ -157,7 +205,7 @@ struct TTiering { auto it = TierByName.find(name); if (it != TierByName.end()) { Y_VERIFY(!name.empty()); - return it->second->NeedExport; + return it->second->GetNeedExport(); } return false; } @@ -165,10 +213,10 @@ struct TTiering { THashSet<TString> GetTtlColumns() const { THashSet<TString> out; if (Ttl) { - out.insert(Ttl->EvictColumnName); + out.insert(Ttl->GetEvictColumnName()); } for (auto& [tierName, tier] : TierByName) { - out.insert(tier->EvictColumnName); + out.insert(tier->GetEvictColumnName()); } return out; } diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index d121dc3023a..4634a912686 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -281,7 +281,7 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, changes->Blobs.insert(blobs.begin(), blobs.end()); - TVector<TString> newBlobs = TColumnEngineForLogs::IndexBlobs(engine.GetIndexInfo(), changes); + TVector<TString> newBlobs = TColumnEngineForLogs::IndexBlobs(engine.GetIndexInfo(), {}, changes); UNIT_ASSERT_VALUES_EQUAL(changes->AppendedPortions.size(), 1); UNIT_ASSERT_VALUES_EQUAL(newBlobs.size(), testColumns.size() + 2); // add 2 columns: planStep, txId @@ -316,7 +316,7 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T UNIT_ASSERT_VALUES_EQUAL(changes->SwitchedPortions.size(), expected.SrcPortions); changes->SetBlobs(std::move(blobs)); - TVector<TString> newBlobs = TColumnEngineForLogs::CompactBlobs(engine.GetIndexInfo(), changes); + TVector<TString> newBlobs = 
TColumnEngineForLogs::CompactBlobs(engine.GetIndexInfo(), {}, changes); UNIT_ASSERT_VALUES_EQUAL(changes->AppendedPortions.size(), expected.NewPortions); AddIdsToBlobs(newBlobs, changes->AppendedPortions, changes->Blobs, step); @@ -346,7 +346,7 @@ bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, u bool Ttl(TColumnEngineForLogs& engine, TTestDbWrapper& db, const THashMap<ui64, NOlap::TTiering>& pathEviction, ui32 expectedToDrop) { - std::shared_ptr<TColumnEngineChanges> changes = engine.StartTtl(pathEviction); + std::shared_ptr<TColumnEngineChanges> changes = engine.StartTtl(pathEviction, engine.GetIndexInfo().ArrowSchema()); UNIT_ASSERT(changes); UNIT_ASSERT_VALUES_EQUAL(changes->PortionsToDrop.size(), expectedToDrop); @@ -715,7 +715,6 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { THashMap<ui64, NOlap::TTiering> pathTtls; NOlap::TTiering tiering; tiering.Ttl = NOlap::TTierInfo::MakeTtl(TInstant::MicroSeconds(10000), "timestamp"); - tiering.Ttl->EvictColumn = std::make_shared<arrow::Field>("timestamp", ttlColType); pathTtls.emplace(pathId, std::move(tiering)); Ttl(engine, db, pathTtls, 2); diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp index e6a588fddcd..25d4a3f9f35 100644 --- a/ydb/core/tx/columnshard/eviction_actor.cpp +++ b/ydb/core/tx/columnshard/eviction_actor.cpp @@ -127,7 +127,7 @@ private: TxEvent->IndexChanges->SetBlobs(std::move(Blobs)); - TxEvent->Blobs = NOlap::TColumnEngineForLogs::EvictBlobs(TxEvent->IndexInfo, TxEvent->IndexChanges); + TxEvent->Blobs = NOlap::TColumnEngineForLogs::EvictBlobs(TxEvent->IndexInfo, TxEvent->Tiering, TxEvent->IndexChanges); if (TxEvent->Blobs.empty()) { TxEvent->PutStatus = NKikimrProto::OK; } diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp index 029de364814..038cdc9e9d3 100644 --- a/ydb/core/tx/columnshard/indexing_actor.cpp +++ b/ydb/core/tx/columnshard/indexing_actor.cpp @@ -123,7 
+123,7 @@ private: LOG_S_DEBUG("Indexing started at tablet " << TabletId); TCpuGuard guard(TxEvent->ResourceUsage); - TxEvent->Blobs = NOlap::TColumnEngineForLogs::IndexBlobs(TxEvent->IndexInfo, TxEvent->IndexChanges); + TxEvent->Blobs = NOlap::TColumnEngineForLogs::IndexBlobs(TxEvent->IndexInfo, TxEvent->Tiering, TxEvent->IndexChanges); LOG_S_DEBUG("Indexing finished at tablet " << TabletId); } else { diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp index ed9c4b8c94e..34606b2dcb6 100644 --- a/ydb/core/tx/tiering/manager.cpp +++ b/ydb/core/tx/tiering/manager.cpp @@ -225,11 +225,11 @@ THashMap<ui64, NKikimr::NOlap::TTiering> TTiersManager::GetTiering() const { if (tiering) { result.emplace(i.first, tiering->BuildOlapTiers()); for (auto& [pathId, pathTiering] : result) { - for (auto& [name, tier] : pathTiering.TierByName) { + for (auto& [name, tier] : pathTiering.GetTierByName()) { auto it = tierConfigs.find(name); if (it != tierConfigs.end()) { - tier->Compression = NTiers::ConvertCompression(it->second.GetCompression()); - tier->NeedExport = it->second.NeedExport(); + tier->SetCompression(NTiers::ConvertCompression(it->second.GetCompression())); + tier->SetNeedExport(it->second.NeedExport()); } } } |