diff options
| author | ivanmorozov333 <[email protected]> | 2024-03-11 17:24:27 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-03-11 17:24:27 +0300 |
| commit | 86bb44f2115dd494ecbe6a0bd856cee71f63773b (patch) | |
| tree | e8071b511435e5adeb0eb4a50a4b1f30090188f2 | |
| parent | 800d1e161ca9a39a3eae3b9537e021b90e0656e6 (diff) | |
fix cleanup for huge volume removing (#2603)
| -rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 39 |
1 files changed, 27 insertions, 12 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 7f91aa3e3dd..8fb593042d4 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -260,11 +260,12 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup( auto changes = std::make_shared<TCleanupColumnEngineChanges>(StoragesManager); // Add all portions from dropped paths - THashSet<ui64> dropPortions; THashSet<ui64> emptyPaths; ui64 txSize = 0; const ui64 txSizeLimit = TGlobalLimits::TxWriteLimitBytes / 4; changes->NeedRepeat = false; + ui32 skipLocked = 0; + ui32 portionsFromDrop = 0; for (ui64 pathId : pathsToDrop) { auto itTable = Tables.find(pathId); if (itTable == Tables.end()) { @@ -274,6 +275,7 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup( for (auto& [portion, info] : itTable->second->GetPortions()) { if (dataLocksManager->IsLocked(*info)) { + ++skipLocked; continue; } if (txSize + info->GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) { @@ -283,35 +285,48 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup( break; } changes->PortionsToDrop.push_back(*info); - dropPortions.insert(portion); + ++portionsFromDrop; } } for (ui64 pathId : emptyPaths) { pathsToDrop.erase(pathId); } - while (CleanupPortions.size() && !changes->NeedRepeat) { - auto it = CleanupPortions.begin(); + for (auto it = CleanupPortions.begin(); it != CleanupPortions.end();) { if (it->first >= snapshot) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanupStop")("snapshot", snapshot.DebugString())("current_snapshot", it->first.DebugString()); break; } - for (auto&& i : it->second) { - if (dataLocksManager->IsLocked(i)) { + for (ui32 i = 0; i < it->second.size();) { + if (dataLocksManager->IsLocked(it->second[i])) { + ++skipLocked; + ++i; continue; } - Y_ABORT_UNLESS(i.CheckForCleanup(snapshot)); - if (txSize + i.GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) { - txSize += i.GetTxVolume(); + Y_ABORT_UNLESS(it->second[i].CheckForCleanup(snapshot)); + if (txSize + it->second[i].GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) { + txSize += it->second[i].GetTxVolume(); } else { changes->NeedRepeat = true; break; } - changes->PortionsToDrop.push_back(i); + changes->PortionsToDrop.push_back(std::move(it->second[i])); + if (i + 1 < it->second.size()) { + it->second[i] = std::move(it->second.back()); + } + it->second.pop_back(); + } + if (changes->NeedRepeat) { + break; + } + if (it->second.empty()) { + it = CleanupPortions.erase(it); + } else { + ++it; } - CleanupPortions.erase(it); } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())("portions_prepared", changes->PortionsToDrop.size()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup") + ("portions_count", CleanupPortions.size())("portions_prepared", changes->PortionsToDrop.size())("repeat", changes->NeedRepeat)("drop", portionsFromDrop)("skip", skipLocked); if (changes->PortionsToDrop.empty()) { return nullptr; |
