aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author stanly <stanly@yandex-team.com> 2023-05-05 16:43:01 +0300
committer stanly <stanly@yandex-team.com> 2023-05-05 16:43:01 +0300
commit 54f6fdc6600fa679e1f643cd4bedc3e5f042e10e (patch)
tree d47e9ee04a4ca045cef8f116c20ed0ce741c6ff5
parent bf9252649a385716bb2c02c192302900f1a59d44 (diff)
download ydb-54f6fdc6600fa679e1f643cd4bedc3e5f042e10e.tar.gz
refactoring around granules
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.cpp 110
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.h 11
-rw-r--r-- ydb/core/tx/columnshard/engines/portion_info.h 6
3 files changed, 70 insertions, 57 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 37029a8f3c3..b691a939fb7 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -249,7 +249,7 @@ const TColumnEngineStats& TColumnEngineForLogs::GetTotalStats() {
Counters.Granules = Granules.size();
Counters.EmptyGranules = EmptyGranules.size();
Counters.OverloadedGranules = 0;
- for (auto& [pathId, set] : PathsGranulesOverloaded) {
+ for (const auto& [_, set] : PathsGranulesOverloaded) {
Counters.OverloadedGranules += set.size();
}
@@ -389,7 +389,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl
#endif
THashSet<ui64> emptyGranulePaths;
- for (auto& [granule, spg] : Granules) {
+ for (const auto& [granule, spg] : Granules) {
if (spg->Empty()) {
EmptyGranules.insert(granule);
emptyGranulePaths.insert(spg->PathId());
@@ -397,7 +397,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl
CompactionGranules.insert(granule);
CleanupGranules.insert(granule);
}
- for (auto& [_, portionInfo] : spg->Portions) {
+ for (const auto& [_, portionInfo] : spg->Portions) {
UpdatePortionStats(portionInfo, EStatsUpdateType::LOAD);
}
}
@@ -439,12 +439,23 @@ bool TColumnEngineForLogs::LoadGranules(IDbWrapper& db) {
}
bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs) {
- auto callback = [&](const TColumnRecord& row) {
- lostBlobs.erase(row.BlobRange.BlobId); // We have such a blob in index. It isn't lost.
- AddColumnRecord(row);
- };
-
- return ColumnsTable->Load(db, callback);
+ return ColumnsTable->Load(db, [&](const TColumnRecord& rec) {
+ Y_VERIFY(rec.Valid());
+ // Do not count the blob as lost since it exists in the index.
+ lostBlobs.erase(rec.BlobRange.BlobId);
+ // Locate granule and append the record.
+ if (const auto gi = Granules.find(rec.Granule); gi != Granules.end()) {
+ gi->second->Portions[rec.Portion].AddRecord(IndexInfo, rec);
+ } else {
+#if 0
+ LOG_S_ERROR("No granule " << rec.Granule << " for record " << rec << " at tablet " << TabletId);
+ Granules.erase(rec.Granule);
+ return;
+#else
+ Y_VERIFY(false);
+#endif
+ }
+ });
}
bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) {
@@ -476,13 +487,15 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(TVector<
changes->InitSnapshot = LastSnapshot;
- for (auto& data : changes->DataToIndex) {
- ui64 pathId = data.PathId;
+ for (const auto& data : changes->DataToIndex) {
+ const ui64 pathId = data.PathId;
+
if (changes->PathToGranule.contains(pathId)) {
continue;
}
if (PathGranules.contains(pathId)) {
+ // Abort inserting if the path has overloaded granules.
if (PathsGranulesOverloaded.contains(pathId)) {
return {};
}
@@ -492,7 +505,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartInsert(TVector<
auto& dst = changes->PathToGranule[pathId];
dst.reserve(src.size());
for (const auto& [ts, granule] : src) {
- dst.emplace_back(std::make_pair(ts, granule));
+ dst.emplace_back(ts, granule);
}
} else {
// It could reserve more then needed in case of the same pathId in DataToIndex
@@ -658,7 +671,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
ui64 dropBlobs = 0;
bool allowDrop = true;
- for (auto& [pathId, ttl] : pathEviction) {
+ for (const auto& [pathId, ttl] : pathEviction) {
if (!PathGranules.contains(pathId)) {
continue; // It's not an error: allow TTL over multiple shards with different pathIds presented
}
@@ -753,27 +766,26 @@ TVector<TVector<std::pair<TMark, ui64>>> TColumnEngineForLogs::EmptyGranuleTrack
}
void TColumnEngineForLogs::UpdateOverloaded(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules) {
- for (auto [granule, spg] : granules) {
- if (!spg) {
- spg = Granules[granule];
- }
- Y_VERIFY(spg);
- ui64 pathId = spg->Record.PathId;
+ for (const auto& [granule, spg] : granules) {
+ const ui64 pathId = spg->Record.PathId;
ui64 size = 0;
- for (auto& [portion, portionInfo] : spg->Portions) {
+ // Calculate byte-size of active portions.
+ for (const auto& [_, portionInfo] : spg->Portions) {
if (portionInfo.IsActive()) {
- size += portionInfo.BlobsSizes().first;
+ size += portionInfo.BlobsBytes();
}
}
+ // Size exceeds the configured limit. Mark granule as overloaded.
if (size >= Limits.GranuleOverloadSize) {
PathsGranulesOverloaded.emplace(pathId, granule);
- } else if (PathsGranulesOverloaded.contains(pathId)) {
- auto& granules = PathsGranulesOverloaded[pathId];
- granules.erase(granule);
- if (granules.empty()) {
- PathsGranulesOverloaded.erase(pathId);
+ } else if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) {
+ // Size is under limit. Remove granule from the overloaded set.
+ pi->second.erase(granule);
+ // Remove the entry for the pathId if it has no overloaded granules any more.
+ if (pi->second.empty()) {
+ PathsGranulesOverloaded.erase(pi);
}
}
}
@@ -845,17 +857,32 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
// Update overloaded granules (only if tx would be applyed)
if (changes->IsInsert() || changes->IsCompaction() || changes->IsCleanup()) {
THashMap<ui64, std::shared_ptr<TGranuleMeta>> granules;
+
+ const auto emplace_granule = [&](const ui64 id) {
+ // Lookup granule in the global table.
+ const auto gi = Granules.find(id);
+ // Granule should exist.
+ Y_VERIFY(gi != Granules.end());
+ // Emplace granule.
+ granules.emplace(id, gi->second);
+ };
+
if (changes->IsCleanup()) {
- for (auto& portionInfo : changes->PortionsToDrop) {
- granules[portionInfo.Granule()] = {};
+ granules.reserve(changes->PortionsToDrop.size());
+
+ for (const auto& portionInfo : changes->PortionsToDrop) {
+ emplace_granule(portionInfo.Granule());
}
} else if (changes->IsCompaction() && !changes->CompactionInfo->InGranule) {
- granules[changes->SrcGranule->Granule] = {};
+ emplace_granule(changes->SrcGranule->Granule);
} else {
- for (auto& portionInfo : changes->AppendedPortions) {
- granules[portionInfo.Granule()] = {};
+ granules.reserve(changes->AppendedPortions.size());
+
+ for (const auto& portionInfo : changes->AppendedPortions) {
+ emplace_granule(portionInfo.Granule());
}
}
+
UpdateOverloaded(granules);
}
return true;
@@ -1167,27 +1194,10 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool ap
return true; // It must return true if (apply == true)
}
-void TColumnEngineForLogs::AddColumnRecord(const TColumnRecord& rec) {
- Y_VERIFY(rec.Valid());
-
- const auto gi = Granules.find(rec.Granule);
-#if 0
- if (gi == Granules.end()) {
- LOG_S_ERROR("No granule " << rec.Granule << " for record " << rec << " at tablet " << TabletId);
- Granules.erase(rec.Granule);
- return;
- }
-#else
- Y_VERIFY(gi != Granules.end());
-#endif
- auto& portionInfo = gi->second->Portions[rec.Portion];
- portionInfo.AddRecord(IndexInfo, rec);
-}
-
bool TColumnEngineForLogs::CanInsert(const TChanges& changes, const TSnapshot& commitSnap) const {
Y_UNUSED(commitSnap);
// Does insert have granule in split?
- for (auto& portionInfo : changes.AppendedPortions) {
+ for (const auto& portionInfo : changes.AppendedPortions) {
Y_VERIFY(!portionInfo.Empty());
ui64 granule = portionInfo.Granule();
if (GranulesInSplit.contains(granule)) {
@@ -1216,7 +1226,7 @@ TMap<TSnapshot, TVector<ui64>> TColumnEngineForLogs::GetOrderedPortions(ui64 gra
Y_VERIFY(spg);
TMap<TSnapshot, TVector<ui64>> out;
- for (auto& [portion, portionInfo] : spg->Portions) {
+ for (const auto& [portion, portionInfo] : spg->Portions) {
if (portionInfo.Empty()) {
continue;
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index e615064f628..d5f99fc98b5 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -1,4 +1,5 @@
#pragma once
+
#include "defs.h"
#include "column_engine.h"
#include "scalars.h"
@@ -174,8 +175,8 @@ public:
const TIndexInfo& GetIndexInfo() const override { return IndexInfo; }
const THashSet<ui64>* GetOverloadedGranules(ui64 pathId) const override {
- if (PathsGranulesOverloaded.contains(pathId)) {
- return &PathsGranulesOverloaded.find(pathId)->second;
+ if (auto pi = PathsGranulesOverloaded.find(pathId); pi != PathsGranulesOverloaded.end()) {
+ return &pi->second;
}
return nullptr;
}
@@ -242,6 +243,8 @@ private:
THashMap<ui64, TMap<TMark, ui64>> PathGranules; // path_id -> {mark, granule}
TMap<ui64, std::shared_ptr<TColumnEngineStats>> PathStats; // per path_id stats sorted by path_id
THashSet<ui64> GranulesInSplit;
+ /// Set of empty granules.
+ /// Just for providing count of empty granules.
THashSet<ui64> EmptyGranules;
THashMap<ui64, THashSet<ui64>> PathsGranulesOverloaded;
TSet<ui64> CompactionGranules;
@@ -278,7 +281,6 @@ private:
bool SetGranule(const TGranuleRecord& rec, bool apply);
bool UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true);
bool ErasePortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true);
- void AddColumnRecord(const TColumnRecord& row);
void UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType = EStatsUpdateType::DEFAULT);
void UpdatePortionStats(TColumnEngineStats& engineStats, const TPortionInfo& portionInfo,
EStatsUpdateType updateType) const;
@@ -287,7 +289,8 @@ private:
TMap<TSnapshot, TVector<ui64>> GetOrderedPortions(ui64 granule, const TSnapshot& snapshot = TSnapshot::Max()) const;
void UpdateOverloaded(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules);
- TVector<TVector<std::pair<TMark, ui64>>> EmptyGranuleTracks(ui64 pathId) const;
+ /// Return lists of adjacent empty granules for the path.
+ TVector<TVector<std::pair<TMark, ui64>>> EmptyGranuleTracks(const ui64 pathId) const;
};
}
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index b75bd6d2175..dd52019d249 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -115,16 +115,16 @@ struct TPortionInfo {
std::pair<ui32, ui32> BlobsSizes() const {
ui32 sum = 0;
ui32 max = 0;
- for (auto& rec : Records) {
+ for (const auto& rec : Records) {
sum += rec.BlobRange.Size;
max = Max(max, rec.BlobRange.Size);
}
return {sum, max};
}
- ui64 BlobsBytes() const {
+ ui64 BlobsBytes() const noexcept {
ui64 sum = 0;
- for (auto& rec : Records) {
+ for (const auto& rec : Records) {
sum += rec.BlobRange.Size;
}
return sum;