diff options
author | Artem Zuikov <chertus@gmail.com> | 2022-06-17 19:29:08 +0300 |
---|---|---|
committer | Artem Zuikov <chertus@gmail.com> | 2022-06-17 19:29:08 +0300 |
commit | 349bc75358426fc4245611743bf725010472028b (patch) | |
tree | 9b71e45569b4f8fcd3f4c92bce5a63143fa7c4c5 | |
parent | 5d69c04a58ea9f27399bb4c57e631cd90bb594a8 (diff) | |
download | ydb-349bc75358426fc4245611743bf725010472028b.tar.gz |
KIKIMR-15079: limit erased blobs in one BlobManager tx
ref:7591c0ee7f48f5127bcfda5a82b21848eaa44be3
-rw-r--r-- | ydb/core/tx/columnshard/blob_manager.cpp | 191 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/blob_manager.h | 18 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/blob_manager_txs.cpp | 10 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__progress_tx.cpp | 2 |
4 files changed, 136 insertions, 85 deletions
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp index 991300843e..64bb13523c 100644 --- a/ydb/core/tx/columnshard/blob_manager.cpp +++ b/ydb/core/tx/columnshard/blob_manager.cpp @@ -153,7 +153,6 @@ bool TBlobManager::LoadState(IBlobManagerDb& db) { if (!db.LoadLastGcBarrier(LastCollectedGenStep)) { return false; } - NewCollectGenStep = LastCollectedGenStep; // Load the keep and delete queues TVector<TUnifiedBlobId> blobsToKeep; @@ -202,7 +201,15 @@ bool TBlobManager::LoadState(IBlobManagerDb& db) { return true; } -bool TBlobManager::TryMoveGCBarrier() { +bool TBlobManager::CanCollectGarbage() const { + if (KeepsToErase.size() || DeletesToErase.size()) { + return true; + } + + return NeedStorageCG(); +} + +bool TBlobManager::NeedStorageCG() const { // Check that there is no GC request in flight if (!PerGroupGCListsInFlight.empty()) { return false; @@ -220,84 +227,91 @@ bool TBlobManager::TryMoveGCBarrier() { return false; } - // Find the GenStep where GC barrier can be moved - { - Y_VERIFY(NewCollectGenStep >= LastCollectedGenStep); - while (!AllocatedGenSteps.empty()) { - if (!AllocatedGenSteps.front()->Finished()) { - break; - } - Y_VERIFY(AllocatedGenSteps.front()->GenStep > CollectGenStepInFlight); - NewCollectGenStep = AllocatedGenSteps.front()->GenStep; + return true; +} - AllocatedGenSteps.pop_front(); - } - if (AllocatedGenSteps.empty()) { - NewCollectGenStep = TGenStep{CurrentGen, CurrentStep}; +TGenStep TBlobManager::FindNewGCBarrier() { + TGenStep newCollectGenStep = LastCollectedGenStep; + size_t numFinished = 0; + for (auto& allocated : AllocatedGenSteps) { + if (!allocated->Finished()) { + break; } + + ++numFinished; + newCollectGenStep = allocated->GenStep; + Y_VERIFY(newCollectGenStep > CollectGenStepInFlight); + } + if (numFinished) { + AllocatedGenSteps.erase(AllocatedGenSteps.begin(), AllocatedGenSteps.begin() + numFinished); } - return NewCollectGenStep > LastCollectedGenStep; + if (AllocatedGenSteps.empty()) { + newCollectGenStep = TGenStep{CurrentGen, CurrentStep}; + } + return newCollectGenStep; } THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager::PreparePerGroupGCRequests() { - if (!TryMoveGCBarrier()) { + if (!NeedStorageCG()) { return {}; } - PreviousGCTime = AppData()->TimeProvider->Now(); + TGenStep newCollectGenStep = FindNewGCBarrier(); + Y_VERIFY(newCollectGenStep >= LastCollectedGenStep); + if (newCollectGenStep == LastCollectedGenStep) { + return {}; + } - CollectGenStepInFlight = NewCollectGenStep; + PreviousGCTime = AppData()->TimeProvider->Now(); const ui32 channelIdx = BLOB_CHANNEL; - // Find the list of groups between LastCollectedGenSten and new GC GenStep - PerGroupGCListsInFlight.clear(); - { - const ui32 fromGen = std::get<0>(LastCollectedGenStep); - const ui32 toGen = std::get<0>(CollectGenStepInFlight); + Y_VERIFY(PerGroupGCListsInFlight.empty()); + + // Clear all possibly not keeped trash in channel's groups: create an event for each group + if (FirstGC) { + FirstGC = false; + + // TODO: we need only actual channel history here const auto& channelHistory = TabletInfo->ChannelInfo(channelIdx)->History; - auto fnCmpGen = [](ui32 gen, const auto& historyEntry) { - return gen < historyEntry.FromGeneration; - }; - // Look for the entry with FromGeneration <= fromGen and the next entry has FromGeneration > fromGen - auto fromIt = std::upper_bound(channelHistory.begin(), channelHistory.end(), fromGen, fnCmpGen); - if (fromIt != channelHistory.begin()) { - --fromIt; - } - auto toIt = std::upper_bound(channelHistory.begin(), channelHistory.end(), toGen, fnCmpGen); - for (auto it = fromIt; it != toIt; ++it) { - ui32 group = it->GroupID; - PerGroupGCListsInFlight[group]; + + for (auto it = channelHistory.begin(); it != channelHistory.end(); ++it) { + PerGroupGCListsInFlight[it->GroupID]; } } // Make per-group Keep/DontKeep lists { // Add all blobs to keep - while (!BlobsToKeep.empty()) { - auto blobIt = BlobsToKeep.begin(); - if (TGenStep{blobIt->Generation(), blobIt->Step()} > CollectGenStepInFlight) { + auto keepBlobIt = BlobsToKeep.begin(); + for (; keepBlobIt != BlobsToKeep.end(); ++keepBlobIt) { + TGenStep genStep{keepBlobIt->Generation(), keepBlobIt->Step()}; + if (genStep > newCollectGenStep) { break; } - ui32 blobGroup = TabletInfo->GroupFor(blobIt->Channel(), blobIt->Generation()); - PerGroupGCListsInFlight[blobGroup].KeepList.insert(*blobIt); - BlobsToKeep.erase(blobIt); + ui32 blobGroup = TabletInfo->GroupFor(keepBlobIt->Channel(), keepBlobIt->Generation()); + PerGroupGCListsInFlight[blobGroup].KeepList.insert(*keepBlobIt); + } + if (BlobsToKeep.begin() != keepBlobIt) { + BlobsToKeep.erase(BlobsToKeep.begin(), keepBlobIt); } // Add all blobs to delete - while (!BlobsToDelete.empty()) { - auto blobIt = BlobsToDelete.begin(); - if (TGenStep{blobIt->Generation(), blobIt->Step()} > CollectGenStepInFlight) { + auto blobIt = BlobsToDelete.begin(); + for (; blobIt != BlobsToDelete.end(); ++blobIt) { + TGenStep genStep{blobIt->Generation(), blobIt->Step()}; + if (genStep > newCollectGenStep) { break; } ui32 blobGroup = TabletInfo->GroupFor(blobIt->Channel(), blobIt->Generation()); - bool canSkipDontKeep = false; - if (PerGroupGCListsInFlight[blobGroup].KeepList.count(*blobIt)) { + TGCLists& gl = PerGroupGCListsInFlight[blobGroup]; + bool skipDontKeep = false; + if (gl.KeepList.count(*blobIt)) { // Remove the blob from keep list if its also in the delete list - PerGroupGCListsInFlight[blobGroup].KeepList.erase(*blobIt); + gl.KeepList.erase(*blobIt); // Skipped blobs still need to be deleted from BlobsToKeep table - PerGroupGCListsInFlight[blobGroup].KeepListSkipped.push_back(*blobIt); + KeepsToErase.push_back(TUnifiedBlobId(blobGroup, *blobIt)); if (CurrentGen == blobIt->Generation()) { // If this blob was created and deleted in the current generation then @@ -305,19 +319,22 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager: // NOTE: its not safe to do this for older generations because there is // a scenario when Keep flag was sent in the old generation and then tablet restarted // before getting the result and removing the blob from the Keep list. - canSkipDontKeep = true; + skipDontKeep = true; + DeletesToErase.push_back(TUnifiedBlobId(blobGroup, *blobIt)); + ++CountersUpdate.BlobSkippedEntries; } } - if (!canSkipDontKeep) { - PerGroupGCListsInFlight[blobGroup].DontKeepList.insert(*blobIt); - } else { - // Skipped blobs still need to be deleted from BlobsToDelete table - PerGroupGCListsInFlight[blobGroup].DontKeepListSkipped.push_back(*blobIt); + if (!skipDontKeep) { + gl.DontKeepList.insert(*blobIt); } - BlobsToDelete.erase(blobIt); + } + if (BlobsToDelete.begin() != blobIt) { + BlobsToDelete.erase(BlobsToDelete.begin(), blobIt); } } + CollectGenStepInFlight = newCollectGenStep; + // Make per group requests THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> requests; { @@ -340,6 +357,27 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager: return requests; } +bool TBlobManager::CleanupFlaggedBlobs(IBlobManagerDb& db) { + if (KeepsToErase.empty() && DeletesToErase.empty()) { + return false; + } + + static constexpr size_t maxBlobsToCleanup = 100000; + size_t numBlobs = 0; + + for (; !KeepsToErase.empty() && numBlobs < maxBlobsToCleanup; ++numBlobs) { + db.EraseBlobToKeep(KeepsToErase.front()); + KeepsToErase.pop_front(); + } + + for (; !DeletesToErase.empty() && numBlobs < maxBlobsToCleanup; ++numBlobs) { + db.EraseBlobToDelete(DeletesToErase.front()); + DeletesToErase.pop_front(); + } + + return true; +} + void TBlobManager::OnGCResult(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev, IBlobManagerDb& db) { Y_VERIFY(ev->Get()->Status == NKikimrProto::OK, "The caller must handle unsuccessful status"); Y_VERIFY(!CounterToGroupInFlight.empty()); @@ -350,29 +388,36 @@ void TBlobManager::OnGCResult(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev, Y_VERIFY(CounterToGroupInFlight.count(counterFromRequest)); ui32 group = CounterToGroupInFlight[counterFromRequest]; - auto it = PerGroupGCListsInFlight.find(group); - for (const auto& blobId : it->second.KeepList) { - db.EraseBlobToKeep(TUnifiedBlobId(group, blobId)); - } + auto it = PerGroupGCListsInFlight.find(group); + const auto& keepList = it->second.KeepList; + const auto& dontKeepList = it->second.DontKeepList; - for (const auto& blobId : it->second.DontKeepList) { - db.EraseBlobToDelete(TUnifiedBlobId(group, blobId)); - } + static constexpr size_t maxBlobsToCleanup = 100000; + size_t blobsToForget = keepList.size() + dontKeepList.size() + KeepsToErase.size() + DeletesToErase.size(); - for (const auto& blobId : it->second.KeepListSkipped) { - db.EraseBlobToKeep(TUnifiedBlobId(group, blobId)); + if (blobsToForget < maxBlobsToCleanup) { + for (const auto& blobId : keepList) { + db.EraseBlobToKeep(TUnifiedBlobId(group, blobId)); + } + for (const auto& blobId : dontKeepList) { + db.EraseBlobToDelete(TUnifiedBlobId(group, blobId)); + } + } else { + for (const auto& blobId : keepList) { + KeepsToErase.push_back(TUnifiedBlobId(group, blobId)); + } + for (const auto& blobId : dontKeepList) { + DeletesToErase.push_back(TUnifiedBlobId(group, blobId)); + } } - for (const auto& blobId : it->second.DontKeepListSkipped) { - db.EraseBlobToDelete(TUnifiedBlobId(group, blobId)); - } + // NOTE: It clears blobs of different groups. + // It's expected to be safe cause we have GC result for the blobs or don't need such result. + CleanupFlaggedBlobs(db); ++CountersUpdate.GcRequestsSent; - CountersUpdate.BlobKeepEntries += it->second.KeepList.size(); - CountersUpdate.BlobDontKeepEntries += it->second.DontKeepList.size(); - // "SkippedBlobs" counter tracks blobs that where excluded from both Keep and DontKeep lists - // DontKeepListSkipped contains those blobs; KeepListSkipped contains them too but also some more - CountersUpdate.BlobSkippedEntries += it->second.DontKeepListSkipped.size(); + CountersUpdate.BlobKeepEntries += keepList.size(); + CountersUpdate.BlobDontKeepEntries += dontKeepList.size(); PerGroupGCListsInFlight.erase(it); CounterToGroupInFlight.erase(group); diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h index 8bb09011f9..0d30720126 100644 --- a/ydb/core/tx/columnshard/blob_manager.h +++ b/ydb/core/tx/columnshard/blob_manager.h @@ -172,9 +172,6 @@ private: // The Gen:Step that has been acknowledged by the Distributed Storage TGenStep LastCollectedGenStep = {0, 0}; - // The Gen:Step where GC barrier can be moved - TGenStep NewCollectGenStep = {0, 0}; - // Distributed Storage requires a monotonically increasing counter for GC requests ui64 PerGenerationCounter = 1; @@ -183,15 +180,16 @@ private: struct TGCLists { THashSet<TLogoBlobID> KeepList; THashSet<TLogoBlobID> DontKeepList; - TVector<TLogoBlobID> KeepListSkipped; // List of blobs excluded from Keep list for optimization - TVector<TLogoBlobID> DontKeepListSkipped; // List of blobs excluded from both Keep/DontKeep lists - // NOTE: skipped blobs still need to be removed from local db after GC request completes }; THashMap<ui32, TGCLists> PerGroupGCListsInFlight; + // NOTE: blobs still need to be removed from local db + TDeque<TUnifiedBlobId> KeepsToErase; + TDeque<TUnifiedBlobId> DeletesToErase; // Maps PerGenerationCounter value to the group in PerGroupGCListsInFlight THashMap<ui64, ui32> CounterToGroupInFlight; // The barrier in the current in-flight GC request(s) TGenStep CollectGenStepInFlight = {0, 0}; + bool FirstGC = true; // Stores counter updates since last call to GetCountersUpdate() // Then the counters are reset and start accumulating new delta @@ -211,12 +209,15 @@ public: // Loads the state at startup bool LoadState(IBlobManagerDb& db); - // Checks if GC barrier can be moved. Updates NewCollectGenStep if possible. - bool TryMoveGCBarrier(); + bool CanCollectGarbage() const; + bool NeedStorageCG() const; // Prepares Keep/DontKeep lists and GC barrier THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> PreparePerGroupGCRequests(); + // Cleanup blobs that have correct flags (skipped or already marked with correct flags) + bool CleanupFlaggedBlobs(IBlobManagerDb& db); + // Called with GC result received from Distributed Storage void OnGCResult(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev, IBlobManagerDb& db); @@ -248,6 +249,7 @@ public: void SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) override; private: + TGenStep FindNewGCBarrier(); void DeleteSmallBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db); // Delete small blobs that were previously in use and could not be deleted diff --git a/ydb/core/tx/columnshard/blob_manager_txs.cpp b/ydb/core/tx/columnshard/blob_manager_txs.cpp index ed9e49a8aa..53dbb9de57 100644 --- a/ydb/core/tx/columnshard/blob_manager_txs.cpp +++ b/ydb/core/tx/columnshard/blob_manager_txs.cpp @@ -16,11 +16,15 @@ public: {} bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { - Y_UNUSED(txc); Y_UNUSED(ctx); - Requests = Self->BlobManager->PreparePerGroupGCRequests(); + // Cleanup delayed blobs before next GC + TBlobManagerDb blobManagerDb(txc.DB); + if (Self->BlobManager->CleanupFlaggedBlobs(blobManagerDb)) { + return true; + } + Requests = Self->BlobManager->PreparePerGroupGCRequests(); return true; } @@ -69,7 +73,7 @@ public: void Complete(const TActorContext& ctx) override { // Schedule next GC - if (Self->BlobManager->TryMoveGCBarrier()) { + if (Self->BlobManager->CanCollectGarbage()) { Self->Execute(Self->CreateTxRunGc(), ctx); } } diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index c3ddf59b4a..edb9a8e90d 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -132,7 +132,7 @@ public: } Self->UpdateBlobMangerCounters(); - if (Self->BlobManager->TryMoveGCBarrier()) { + if (Self->BlobManager->CanCollectGarbage()) { Self->Execute(Self->CreateTxRunGc(), ctx); } |