diff options
author | stanly <stanly@yandex-team.com> | 2023-05-05 16:43:01 +0300 |
---|---|---|
committer | stanly <stanly@yandex-team.com> | 2023-05-05 16:43:01 +0300 |
commit | 54f6fdc6600fa679e1f643cd4bedc3e5f042e10e (patch) | |
tree | d47e9ee04a4ca045cef8f116c20ed0ce741c6ff5 | |
parent | bf9252649a385716bb2c02c192302900f1a59d44 (diff) | |
download | ydb-54f6fdc6600fa679e1f643cd4bedc3e5f042e10e.tar.gz |
refactoring around granules
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 110 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.h | 11 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portion_info.h | 6 |
3 files changed, 70 insertions, 57 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 37029a8f3c3..b691a939fb7 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -249,7 +249,7 @@ const TColumnEngineStats& TColumnEngineForLogs::GetTotalStats() { Counters.Granules = Granules.size(); Counters.EmptyGranules = EmptyGranules.size(); Counters.OverloadedGranules = 0; - for (auto& [pathId, set] : PathsGranulesOverloaded) { + for (const auto& [_, set] : PathsGranulesOverloaded) { Counters.OverloadedGranules += set.size(); } @@ -389,7 +389,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl #endif THashSet<ui64> emptyGranulePaths; - for (auto& [granule, spg] : Granules) { + for (const auto& [granule, spg] : Granules) { if (spg->Empty()) { EmptyGranules.insert(granule); emptyGranulePaths.insert(spg->PathId()); @@ -397,7 +397,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl CompactionGranules.insert(granule); CleanupGranules.insert(granule); } - for (auto& [_, portionInfo] : spg->Portions) { + for (const auto& [_, portionInfo] : spg->Portions) { UpdatePortionStats(portionInfo, EStatsUpdateType::LOAD); } } @@ -439,12 +439,23 @@ bool TColumnEngineForLogs::LoadGranules(IDbWrapper& db) { } bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs) { - auto callback = [&](const TColumnRecord& row) { - lostBlobs.erase(row.BlobRange.BlobId); // We have such a blob in index. It isn't lost. - AddColumnRecord(row); - }; - - return ColumnsTable->Load(db, callback); + return ColumnsTable->Load(db, [&](const TColumnRecord& rec) { + Y_VERIFY(rec.Valid()); + // Do not count the blob as lost since it exists in the index. + lostBlobs.erase(rec.BlobRange.BlobId); + // Locate granule and append the record. + if (const auto gi = Granules.find(rec.Granule); gi != Granules.end()) { + gi->second->Portions[rec.Portion].AddRecord(IndexInfo, rec); + } else { +#if 0 + LOG_S_ERROR("No granule " << rec.Granule << " for record " << rec << " at tablet " << TabletId); + Granules.erase(rec.Granule); + return; +#else + Y_VERIFY(false); +#endif + } + }); } bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) { @@ -476,13 +487,15 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(TVector< changes->InitSnapshot = LastSnapshot; - for (auto& data : changes->DataToIndex) { - ui64 pathId = data.PathId; + for (const auto& data : changes->DataToIndex) { + const ui64 pathId = data.PathId; + if (changes->PathToGranule.contains(pathId)) { continue; } if (PathGranules.contains(pathId)) { + // Abort inserting if the path has overloaded granules. if (PathsGranulesOverloaded.contains(pathId)) { return {}; } @@ -492,7 +505,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(TVector< auto& dst = changes->PathToGranule[pathId]; dst.reserve(src.size()); for (const auto& [ts, granule] : src) { - dst.emplace_back(std::make_pair(ts, granule)); + dst.emplace_back(ts, granule); } } else { // It could reserve more then needed in case of the same pathId in DataToIndex @@ -658,7 +671,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash ui64 dropBlobs = 0; bool allowDrop = true; - for (auto& [pathId, ttl] : pathEviction) { + for (const auto& [pathId, ttl] : pathEviction) { if (!PathGranules.contains(pathId)) { continue; // It's not an error: allow TTL over multiple shards with different pathIds presented } @@ -753,27 +766,26 @@ TVector<TVector<std::pair<TMark, ui64>>> TColumnEngineForLogs::EmptyGranuleTrack } void TColumnEngineForLogs::UpdateOverloaded(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules) { - for (auto [granule, spg] : granules) { - if (!spg) { - spg = Granules[granule]; - } - Y_VERIFY(spg); - ui64 pathId = spg->Record.PathId; + for (const auto& [granule, spg] : granules) { + const ui64 pathId = spg->Record.PathId; ui64 size = 0; - for (auto& [portion, portionInfo] : spg->Portions) { + // Calculate byte-size of active portions. + for (const auto& [_, portionInfo] : spg->Portions) { if (portionInfo.IsActive()) { - size += portionInfo.BlobsSizes().first; + size += portionInfo.BlobsBytes(); } } + // Size exceeds the configured limit. Mark granule as overloaded. if (size >= Limits.GranuleOverloadSize) { PathsGranulesOverloaded.emplace(pathId, granule); - } else if (PathsGranulesOverloaded.contains(pathId)) { - auto& granules = PathsGranulesOverloaded[pathId]; - granules.erase(granule); - if (granules.empty()) { - PathsGranulesOverloaded.erase(pathId); + } else if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) { + // Size is under limit. Remove granule from the overloaded set. + pi->second.erase(granule); + // Remove entry for the pathId if there it has no overloaded granules any more. + if (pi->second.empty()) { + PathsGranulesOverloaded.erase(pi); } } } @@ -845,17 +857,32 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE // Update overloaded granules (only if tx would be applyed) if (changes->IsInsert() || changes->IsCompaction() || changes->IsCleanup()) { THashMap<ui64, std::shared_ptr<TGranuleMeta>> granules; + + const auto emplace_granule = [&](const ui64 id) { + // Lookup granule in the global table. + const auto gi = Granules.find(id); + // Granule should exists. + Y_VERIFY(gi != Granules.end()); + // Emplace granule. + granules.emplace(id, gi->second); + }; + if (changes->IsCleanup()) { - for (auto& portionInfo : changes->PortionsToDrop) { - granules[portionInfo.Granule()] = {}; + granules.reserve(changes->PortionsToDrop.size()); + + for (const auto& portionInfo : changes->PortionsToDrop) { + emplace_granule(portionInfo.Granule()); } } else if (changes->IsCompaction() && !changes->CompactionInfo->InGranule) { - granules[changes->SrcGranule->Granule] = {}; + emplace_granule(changes->SrcGranule->Granule); } else { - for (auto& portionInfo : changes->AppendedPortions) { - granules[portionInfo.Granule()] = {}; + granules.reserve(changes->AppendedPortions.size()); + + for (const auto& portionInfo : changes->AppendedPortions) { + emplace_granule(portionInfo.Granule()); } } + UpdateOverloaded(granules); } return true; @@ -1167,27 +1194,10 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool ap return true; // It must return true if (apply == true) } -void TColumnEngineForLogs::AddColumnRecord(const TColumnRecord& rec) { - Y_VERIFY(rec.Valid()); - - const auto gi = Granules.find(rec.Granule); -#if 0 - if (gi == Granules.end()) { - LOG_S_ERROR("No granule " << rec.Granule << " for record " << rec << " at tablet " << TabletId); - Granules.erase(rec.Granule); - return; - } -#else - Y_VERIFY(gi != Granules.end()); -#endif - auto& portionInfo = gi->second->Portions[rec.Portion]; - portionInfo.AddRecord(IndexInfo, rec); -} - bool TColumnEngineForLogs::CanInsert(const TChanges& changes, const TSnapshot& commitSnap) const { Y_UNUSED(commitSnap); // Does insert have granule in split? - for (auto& portionInfo : changes.AppendedPortions) { + for (const auto& portionInfo : changes.AppendedPortions) { Y_VERIFY(!portionInfo.Empty()); ui64 granule = portionInfo.Granule(); if (GranulesInSplit.contains(granule)) { @@ -1216,7 +1226,7 @@ TMap<TSnapshot, TVector<ui64>> TColumnEngineForLogs::GetOrderedPortions(ui64 gra Y_VERIFY(spg); TMap<TSnapshot, TVector<ui64>> out; - for (auto& [portion, portionInfo] : spg->Portions) { + for (const auto& [portion, portionInfo] : spg->Portions) { if (portionInfo.Empty()) { continue; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index e615064f628..d5f99fc98b5 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -1,4 +1,5 @@ #pragma once + #include "defs.h" #include "column_engine.h" #include "scalars.h" @@ -174,8 +175,8 @@ public: const TIndexInfo& GetIndexInfo() const override { return IndexInfo; } const THashSet<ui64>* GetOverloadedGranules(ui64 pathId) const override { - if (PathsGranulesOverloaded.contains(pathId)) { - return &PathsGranulesOverloaded.find(pathId)->second; + if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) { + return &pi->second; } return nullptr; } @@ -242,6 +243,8 @@ private: THashMap<ui64, TMap<TMark, ui64>> PathGranules; // path_id -> {mark, granule} TMap<ui64, std::shared_ptr<TColumnEngineStats>> PathStats; // per path_id stats sorted by path_id THashSet<ui64> GranulesInSplit; + /// Set of empty granules. + /// Just for providing count of empty granules. THashSet<ui64> EmptyGranules; THashMap<ui64, THashSet<ui64>> PathsGranulesOverloaded; TSet<ui64> CompactionGranules; @@ -278,7 +281,6 @@ private: bool SetGranule(const TGranuleRecord& rec, bool apply); bool UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true); bool ErasePortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true); - void AddColumnRecord(const TColumnRecord& row); void UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType = EStatsUpdateType::DEFAULT); void UpdatePortionStats(TColumnEngineStats& engineStats, const TPortionInfo& portionInfo, EStatsUpdateType updateType) const; @@ -287,7 +289,8 @@ private: TMap<TSnapshot, TVector<ui64>> GetOrderedPortions(ui64 granule, const TSnapshot& snapshot = TSnapshot::Max()) const; void UpdateOverloaded(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules); - TVector<TVector<std::pair<TMark, ui64>>> EmptyGranuleTracks(ui64 pathId) const; + /// Return lists of adjacent empty granules for the path. + TVector<TVector<std::pair<TMark, ui64>>> EmptyGranuleTracks(const ui64 pathId) const; }; } diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index b75bd6d2175..dd52019d249 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -115,16 +115,16 @@ struct TPortionInfo { std::pair<ui32, ui32> BlobsSizes() const { ui32 sum = 0; ui32 max = 0; - for (auto& rec : Records) { + for (const auto& rec : Records) { sum += rec.BlobRange.Size; max = Max(max, rec.BlobRange.Size); } return {sum, max}; } - ui64 BlobsBytes() const { + ui64 BlobsBytes() const noexcept { ui64 sum = 0; - for (auto& rec : Records) { + for (const auto& rec : Records) { sum += rec.BlobRange.Size; } return sum; |