summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <[email protected]>2024-03-11 17:24:27 +0300
committerGitHub <[email protected]>2024-03-11 17:24:27 +0300
commit86bb44f2115dd494ecbe6a0bd856cee71f63773b (patch)
treee8071b511435e5adeb0eb4a50a4b1f30090188f2
parent800d1e161ca9a39a3eae3b9537e021b90e0656e6 (diff)
fix cleanup for huge volume removing (#2603)
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp39
1 files changed, 27 insertions, 12 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 7f91aa3e3dd..8fb593042d4 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -260,11 +260,12 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(
auto changes = std::make_shared<TCleanupColumnEngineChanges>(StoragesManager);
// Add all portions from dropped paths
- THashSet<ui64> dropPortions;
THashSet<ui64> emptyPaths;
ui64 txSize = 0;
const ui64 txSizeLimit = TGlobalLimits::TxWriteLimitBytes / 4;
changes->NeedRepeat = false;
+ ui32 skipLocked = 0;
+ ui32 portionsFromDrop = 0;
for (ui64 pathId : pathsToDrop) {
auto itTable = Tables.find(pathId);
if (itTable == Tables.end()) {
@@ -274,6 +275,7 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(
for (auto& [portion, info] : itTable->second->GetPortions()) {
if (dataLocksManager->IsLocked(*info)) {
+ ++skipLocked;
continue;
}
if (txSize + info->GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) {
@@ -283,35 +285,48 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(
break;
}
changes->PortionsToDrop.push_back(*info);
- dropPortions.insert(portion);
+ ++portionsFromDrop;
}
}
for (ui64 pathId : emptyPaths) {
pathsToDrop.erase(pathId);
}
- while (CleanupPortions.size() && !changes->NeedRepeat) {
- auto it = CleanupPortions.begin();
+ for (auto it = CleanupPortions.begin(); it != CleanupPortions.end();) {
if (it->first >= snapshot) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanupStop")("snapshot", snapshot.DebugString())("current_snapshot", it->first.DebugString());
break;
}
- for (auto&& i : it->second) {
- if (dataLocksManager->IsLocked(i)) {
+ for (ui32 i = 0; i < it->second.size();) {
+ if (dataLocksManager->IsLocked(it->second[i])) {
+ ++skipLocked;
+ ++i;
continue;
}
- Y_ABORT_UNLESS(i.CheckForCleanup(snapshot));
- if (txSize + i.GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) {
- txSize += i.GetTxVolume();
+ Y_ABORT_UNLESS(it->second[i].CheckForCleanup(snapshot));
+ if (txSize + it->second[i].GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) {
+ txSize += it->second[i].GetTxVolume();
} else {
changes->NeedRepeat = true;
break;
}
- changes->PortionsToDrop.push_back(i);
+ changes->PortionsToDrop.push_back(std::move(it->second[i]));
+ if (i + 1 < it->second.size()) {
+ it->second[i] = std::move(it->second.back());
+ }
+ it->second.pop_back();
+ }
+ if (changes->NeedRepeat) {
+ break;
+ }
+ if (it->second.empty()) {
+ it = CleanupPortions.erase(it);
+ } else {
+ ++it;
}
- CleanupPortions.erase(it);
}
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())("portions_prepared", changes->PortionsToDrop.size());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")
+ ("portions_count", CleanupPortions.size())("portions_prepared", changes->PortionsToDrop.size())("repeat", changes->NeedRepeat)("drop", portionsFromDrop)("skip", skipLocked);
if (changes->PortionsToDrop.empty()) {
return nullptr;