aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArtem Zuikov <chertus@gmail.com>2022-06-17 19:29:08 +0300
committerArtem Zuikov <chertus@gmail.com>2022-06-17 19:29:08 +0300
commit349bc75358426fc4245611743bf725010472028b (patch)
tree9b71e45569b4f8fcd3f4c92bce5a63143fa7c4c5
parent5d69c04a58ea9f27399bb4c57e631cd90bb594a8 (diff)
downloadydb-349bc75358426fc4245611743bf725010472028b.tar.gz
KIKIMR-15079: limit erased blobs in one BlobManager tx
ref:7591c0ee7f48f5127bcfda5a82b21848eaa44be3
-rw-r--r--ydb/core/tx/columnshard/blob_manager.cpp191
-rw-r--r--ydb/core/tx/columnshard/blob_manager.h18
-rw-r--r--ydb/core/tx/columnshard/blob_manager_txs.cpp10
-rw-r--r--ydb/core/tx/columnshard/columnshard__progress_tx.cpp2
4 files changed, 136 insertions, 85 deletions
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp
index 991300843e..64bb13523c 100644
--- a/ydb/core/tx/columnshard/blob_manager.cpp
+++ b/ydb/core/tx/columnshard/blob_manager.cpp
@@ -153,7 +153,6 @@ bool TBlobManager::LoadState(IBlobManagerDb& db) {
if (!db.LoadLastGcBarrier(LastCollectedGenStep)) {
return false;
}
- NewCollectGenStep = LastCollectedGenStep;
// Load the keep and delete queues
TVector<TUnifiedBlobId> blobsToKeep;
@@ -202,7 +201,15 @@ bool TBlobManager::LoadState(IBlobManagerDb& db) {
return true;
}
-bool TBlobManager::TryMoveGCBarrier() {
+bool TBlobManager::CanCollectGarbage() const {
+ if (KeepsToErase.size() || DeletesToErase.size()) {
+ return true;
+ }
+
+ return NeedStorageCG();
+}
+
+bool TBlobManager::NeedStorageCG() const {
// Check that there is no GC request in flight
if (!PerGroupGCListsInFlight.empty()) {
return false;
@@ -220,84 +227,91 @@ bool TBlobManager::TryMoveGCBarrier() {
return false;
}
- // Find the GenStep where GC barrier can be moved
- {
- Y_VERIFY(NewCollectGenStep >= LastCollectedGenStep);
- while (!AllocatedGenSteps.empty()) {
- if (!AllocatedGenSteps.front()->Finished()) {
- break;
- }
- Y_VERIFY(AllocatedGenSteps.front()->GenStep > CollectGenStepInFlight);
- NewCollectGenStep = AllocatedGenSteps.front()->GenStep;
+ return true;
+}
- AllocatedGenSteps.pop_front();
- }
- if (AllocatedGenSteps.empty()) {
- NewCollectGenStep = TGenStep{CurrentGen, CurrentStep};
+TGenStep TBlobManager::FindNewGCBarrier() {
+ TGenStep newCollectGenStep = LastCollectedGenStep;
+ size_t numFinished = 0;
+ for (auto& allocated : AllocatedGenSteps) {
+ if (!allocated->Finished()) {
+ break;
}
+
+ ++numFinished;
+ newCollectGenStep = allocated->GenStep;
+ Y_VERIFY(newCollectGenStep > CollectGenStepInFlight);
+ }
+ if (numFinished) {
+ AllocatedGenSteps.erase(AllocatedGenSteps.begin(), AllocatedGenSteps.begin() + numFinished);
}
- return NewCollectGenStep > LastCollectedGenStep;
+ if (AllocatedGenSteps.empty()) {
+ newCollectGenStep = TGenStep{CurrentGen, CurrentStep};
+ }
+ return newCollectGenStep;
}
THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager::PreparePerGroupGCRequests() {
- if (!TryMoveGCBarrier()) {
+ if (!NeedStorageCG()) {
return {};
}
- PreviousGCTime = AppData()->TimeProvider->Now();
+ TGenStep newCollectGenStep = FindNewGCBarrier();
+ Y_VERIFY(newCollectGenStep >= LastCollectedGenStep);
+ if (newCollectGenStep == LastCollectedGenStep) {
+ return {};
+ }
- CollectGenStepInFlight = NewCollectGenStep;
+ PreviousGCTime = AppData()->TimeProvider->Now();
const ui32 channelIdx = BLOB_CHANNEL;
- // Find the list of groups between LastCollectedGenSten and new GC GenStep
- PerGroupGCListsInFlight.clear();
- {
- const ui32 fromGen = std::get<0>(LastCollectedGenStep);
- const ui32 toGen = std::get<0>(CollectGenStepInFlight);
+ Y_VERIFY(PerGroupGCListsInFlight.empty());
+
+ // Clear all possibly not keeped trash in channel's groups: create an event for each group
+ if (FirstGC) {
+ FirstGC = false;
+
+ // TODO: we need only actual channel history here
const auto& channelHistory = TabletInfo->ChannelInfo(channelIdx)->History;
- auto fnCmpGen = [](ui32 gen, const auto& historyEntry) {
- return gen < historyEntry.FromGeneration;
- };
- // Look for the entry with FromGeneration <= fromGen and the next entry has FromGeneration > fromGen
- auto fromIt = std::upper_bound(channelHistory.begin(), channelHistory.end(), fromGen, fnCmpGen);
- if (fromIt != channelHistory.begin()) {
- --fromIt;
- }
- auto toIt = std::upper_bound(channelHistory.begin(), channelHistory.end(), toGen, fnCmpGen);
- for (auto it = fromIt; it != toIt; ++it) {
- ui32 group = it->GroupID;
- PerGroupGCListsInFlight[group];
+
+ for (auto it = channelHistory.begin(); it != channelHistory.end(); ++it) {
+ PerGroupGCListsInFlight[it->GroupID];
}
}
// Make per-group Keep/DontKeep lists
{
// Add all blobs to keep
- while (!BlobsToKeep.empty()) {
- auto blobIt = BlobsToKeep.begin();
- if (TGenStep{blobIt->Generation(), blobIt->Step()} > CollectGenStepInFlight) {
+ auto keepBlobIt = BlobsToKeep.begin();
+ for (; keepBlobIt != BlobsToKeep.end(); ++keepBlobIt) {
+ TGenStep genStep{keepBlobIt->Generation(), keepBlobIt->Step()};
+ if (genStep > newCollectGenStep) {
break;
}
- ui32 blobGroup = TabletInfo->GroupFor(blobIt->Channel(), blobIt->Generation());
- PerGroupGCListsInFlight[blobGroup].KeepList.insert(*blobIt);
- BlobsToKeep.erase(blobIt);
+ ui32 blobGroup = TabletInfo->GroupFor(keepBlobIt->Channel(), keepBlobIt->Generation());
+ PerGroupGCListsInFlight[blobGroup].KeepList.insert(*keepBlobIt);
+ }
+ if (BlobsToKeep.begin() != keepBlobIt) {
+ BlobsToKeep.erase(BlobsToKeep.begin(), keepBlobIt);
}
// Add all blobs to delete
- while (!BlobsToDelete.empty()) {
- auto blobIt = BlobsToDelete.begin();
- if (TGenStep{blobIt->Generation(), blobIt->Step()} > CollectGenStepInFlight) {
+ auto blobIt = BlobsToDelete.begin();
+ for (; blobIt != BlobsToDelete.end(); ++blobIt) {
+ TGenStep genStep{blobIt->Generation(), blobIt->Step()};
+ if (genStep > newCollectGenStep) {
break;
}
ui32 blobGroup = TabletInfo->GroupFor(blobIt->Channel(), blobIt->Generation());
- bool canSkipDontKeep = false;
- if (PerGroupGCListsInFlight[blobGroup].KeepList.count(*blobIt)) {
+ TGCLists& gl = PerGroupGCListsInFlight[blobGroup];
+ bool skipDontKeep = false;
+ if (gl.KeepList.count(*blobIt)) {
// Remove the blob from keep list if its also in the delete list
- PerGroupGCListsInFlight[blobGroup].KeepList.erase(*blobIt);
+ gl.KeepList.erase(*blobIt);
// Skipped blobs still need to be deleted from BlobsToKeep table
- PerGroupGCListsInFlight[blobGroup].KeepListSkipped.push_back(*blobIt);
+ KeepsToErase.push_back(TUnifiedBlobId(blobGroup, *blobIt));
if (CurrentGen == blobIt->Generation()) {
// If this blob was created and deleted in the current generation then
@@ -305,19 +319,22 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager:
// NOTE: its not safe to do this for older generations because there is
// a scenario when Keep flag was sent in the old generation and then tablet restarted
// before getting the result and removing the blob from the Keep list.
- canSkipDontKeep = true;
+ skipDontKeep = true;
+ DeletesToErase.push_back(TUnifiedBlobId(blobGroup, *blobIt));
+ ++CountersUpdate.BlobSkippedEntries;
}
}
- if (!canSkipDontKeep) {
- PerGroupGCListsInFlight[blobGroup].DontKeepList.insert(*blobIt);
- } else {
- // Skipped blobs still need to be deleted from BlobsToDelete table
- PerGroupGCListsInFlight[blobGroup].DontKeepListSkipped.push_back(*blobIt);
+ if (!skipDontKeep) {
+ gl.DontKeepList.insert(*blobIt);
}
- BlobsToDelete.erase(blobIt);
+ }
+ if (BlobsToDelete.begin() != blobIt) {
+ BlobsToDelete.erase(BlobsToDelete.begin(), blobIt);
}
}
+ CollectGenStepInFlight = newCollectGenStep;
+
// Make per group requests
THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> requests;
{
@@ -340,6 +357,27 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager:
return requests;
}
+bool TBlobManager::CleanupFlaggedBlobs(IBlobManagerDb& db) {
+ if (KeepsToErase.empty() && DeletesToErase.empty()) {
+ return false;
+ }
+
+ static constexpr size_t maxBlobsToCleanup = 100000;
+ size_t numBlobs = 0;
+
+ for (; !KeepsToErase.empty() && numBlobs < maxBlobsToCleanup; ++numBlobs) {
+ db.EraseBlobToKeep(KeepsToErase.front());
+ KeepsToErase.pop_front();
+ }
+
+ for (; !DeletesToErase.empty() && numBlobs < maxBlobsToCleanup; ++numBlobs) {
+ db.EraseBlobToDelete(DeletesToErase.front());
+ DeletesToErase.pop_front();
+ }
+
+ return true;
+}
+
void TBlobManager::OnGCResult(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev, IBlobManagerDb& db) {
Y_VERIFY(ev->Get()->Status == NKikimrProto::OK, "The caller must handle unsuccessful status");
Y_VERIFY(!CounterToGroupInFlight.empty());
@@ -350,29 +388,36 @@ void TBlobManager::OnGCResult(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev,
Y_VERIFY(CounterToGroupInFlight.count(counterFromRequest));
ui32 group = CounterToGroupInFlight[counterFromRequest];
- auto it = PerGroupGCListsInFlight.find(group);
- for (const auto& blobId : it->second.KeepList) {
- db.EraseBlobToKeep(TUnifiedBlobId(group, blobId));
- }
+ auto it = PerGroupGCListsInFlight.find(group);
+ const auto& keepList = it->second.KeepList;
+ const auto& dontKeepList = it->second.DontKeepList;
- for (const auto& blobId : it->second.DontKeepList) {
- db.EraseBlobToDelete(TUnifiedBlobId(group, blobId));
- }
+ static constexpr size_t maxBlobsToCleanup = 100000;
+ size_t blobsToForget = keepList.size() + dontKeepList.size() + KeepsToErase.size() + DeletesToErase.size();
- for (const auto& blobId : it->second.KeepListSkipped) {
- db.EraseBlobToKeep(TUnifiedBlobId(group, blobId));
+ if (blobsToForget < maxBlobsToCleanup) {
+ for (const auto& blobId : keepList) {
+ db.EraseBlobToKeep(TUnifiedBlobId(group, blobId));
+ }
+ for (const auto& blobId : dontKeepList) {
+ db.EraseBlobToDelete(TUnifiedBlobId(group, blobId));
+ }
+ } else {
+ for (const auto& blobId : keepList) {
+ KeepsToErase.push_back(TUnifiedBlobId(group, blobId));
+ }
+ for (const auto& blobId : dontKeepList) {
+ DeletesToErase.push_back(TUnifiedBlobId(group, blobId));
+ }
}
- for (const auto& blobId : it->second.DontKeepListSkipped) {
- db.EraseBlobToDelete(TUnifiedBlobId(group, blobId));
- }
+ // NOTE: It clears blobs of different groups.
+ // It's expected to be safe cause we have GC result for the blobs or don't need such result.
+ CleanupFlaggedBlobs(db);
++CountersUpdate.GcRequestsSent;
- CountersUpdate.BlobKeepEntries += it->second.KeepList.size();
- CountersUpdate.BlobDontKeepEntries += it->second.DontKeepList.size();
- // "SkippedBlobs" counter tracks blobs that where excluded from both Keep and DontKeep lists
- // DontKeepListSkipped contains those blobs; KeepListSkipped contains them too but also some more
- CountersUpdate.BlobSkippedEntries += it->second.DontKeepListSkipped.size();
+ CountersUpdate.BlobKeepEntries += keepList.size();
+ CountersUpdate.BlobDontKeepEntries += dontKeepList.size();
PerGroupGCListsInFlight.erase(it);
CounterToGroupInFlight.erase(group);
diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h
index 8bb09011f9..0d30720126 100644
--- a/ydb/core/tx/columnshard/blob_manager.h
+++ b/ydb/core/tx/columnshard/blob_manager.h
@@ -172,9 +172,6 @@ private:
// The Gen:Step that has been acknowledged by the Distributed Storage
TGenStep LastCollectedGenStep = {0, 0};
- // The Gen:Step where GC barrier can be moved
- TGenStep NewCollectGenStep = {0, 0};
-
// Distributed Storage requires a monotonically increasing counter for GC requests
ui64 PerGenerationCounter = 1;
@@ -183,15 +180,16 @@ private:
struct TGCLists {
THashSet<TLogoBlobID> KeepList;
THashSet<TLogoBlobID> DontKeepList;
- TVector<TLogoBlobID> KeepListSkipped; // List of blobs excluded from Keep list for optimization
- TVector<TLogoBlobID> DontKeepListSkipped; // List of blobs excluded from both Keep/DontKeep lists
- // NOTE: skipped blobs still need to be removed from local db after GC request completes
};
THashMap<ui32, TGCLists> PerGroupGCListsInFlight;
+ // NOTE: blobs still need to be removed from local db
+ TDeque<TUnifiedBlobId> KeepsToErase;
+ TDeque<TUnifiedBlobId> DeletesToErase;
// Maps PerGenerationCounter value to the group in PerGroupGCListsInFlight
THashMap<ui64, ui32> CounterToGroupInFlight;
// The barrier in the current in-flight GC request(s)
TGenStep CollectGenStepInFlight = {0, 0};
+ bool FirstGC = true;
// Stores counter updates since last call to GetCountersUpdate()
// Then the counters are reset and start accumulating new delta
@@ -211,12 +209,15 @@ public:
// Loads the state at startup
bool LoadState(IBlobManagerDb& db);
- // Checks if GC barrier can be moved. Updates NewCollectGenStep if possible.
- bool TryMoveGCBarrier();
+ bool CanCollectGarbage() const;
+ bool NeedStorageCG() const;
// Prepares Keep/DontKeep lists and GC barrier
THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> PreparePerGroupGCRequests();
+ // Cleanup blobs that have correct flags (skipped or already marked with correct flags)
+ bool CleanupFlaggedBlobs(IBlobManagerDb& db);
+
// Called with GC result received from Distributed Storage
void OnGCResult(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev, IBlobManagerDb& db);
@@ -248,6 +249,7 @@ public:
void SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) override;
private:
+ TGenStep FindNewGCBarrier();
void DeleteSmallBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db);
// Delete small blobs that were previously in use and could not be deleted
diff --git a/ydb/core/tx/columnshard/blob_manager_txs.cpp b/ydb/core/tx/columnshard/blob_manager_txs.cpp
index ed9e49a8aa..53dbb9de57 100644
--- a/ydb/core/tx/columnshard/blob_manager_txs.cpp
+++ b/ydb/core/tx/columnshard/blob_manager_txs.cpp
@@ -16,11 +16,15 @@ public:
{}
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
- Y_UNUSED(txc);
Y_UNUSED(ctx);
- Requests = Self->BlobManager->PreparePerGroupGCRequests();
+ // Cleanup delayed blobs before next GC
+ TBlobManagerDb blobManagerDb(txc.DB);
+ if (Self->BlobManager->CleanupFlaggedBlobs(blobManagerDb)) {
+ return true;
+ }
+ Requests = Self->BlobManager->PreparePerGroupGCRequests();
return true;
}
@@ -69,7 +73,7 @@ public:
void Complete(const TActorContext& ctx) override {
// Schedule next GC
- if (Self->BlobManager->TryMoveGCBarrier()) {
+ if (Self->BlobManager->CanCollectGarbage()) {
Self->Execute(Self->CreateTxRunGc(), ctx);
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index c3ddf59b4a..edb9a8e90d 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -132,7 +132,7 @@ public:
}
Self->UpdateBlobMangerCounters();
- if (Self->BlobManager->TryMoveGCBarrier()) {
+ if (Self->BlobManager->CanCollectGarbage()) {
Self->Execute(Self->CreateTxRunGc(), ctx);
}