diff options
author | chertus <azuikov@ydb.tech> | 2023-04-07 12:38:57 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-04-07 12:38:57 +0300 |
commit | 486f33fb6f812329654a7b85b10e1f602557ead3 (patch) | |
tree | ccabb37ea7d6c48c2a84e6f3ab66fc0e9d027b20 | |
parent | fd191c37beaf0651cc6f10f4cc76e03b23b8f85c (diff) | |
download | ydb-486f33fb6f812329654a7b85b10e1f602557ead3.tar.gz |
fix ColumnShard GC in case of too many blobs to delete
-rw-r--r-- | ydb/core/tx/columnshard/blob_manager.cpp | 54 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/blob_manager.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/blob_manager_txs.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__progress_tx.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 13 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/defs.h | 1 |
7 files changed, 58 insertions, 32 deletions
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp index 4e863e3b5bb..3c5b04c0bb3 100644 --- a/ydb/core/tx/columnshard/blob_manager.cpp +++ b/ydb/core/tx/columnshard/blob_manager.cpp @@ -178,6 +178,15 @@ bool TBlobManager::LoadState(IBlobManagerDb& db) { Y_VERIFY(unifiedBlobId.IsDsBlob(), "Not a DS blob id in Keep table: %s", unifiedBlobId.ToStringNew().c_str()); TLogoBlobID blobId = unifiedBlobId.GetLogoBlobId(); + TGenStep genStep{blobId.Generation(), blobId.Step()}; + if (genStep <= LastCollectedGenStep) { + LOG_S_WARN("BlobManager at tablet " << TabletInfo->TabletID + << " Load not keeped blob " << unifiedBlobId.ToStringNew() << " collected by GenStep: " + << std::get<0>(LastCollectedGenStep) << ":" << std::get<1>(LastCollectedGenStep)); + KeepsToErase.emplace_back(unifiedBlobId); + continue; + } + BlobsToKeep.insert(blobId); // Keep + DontKeep (probably in different gen:steps) @@ -186,12 +195,7 @@ bool TBlobManager::LoadState(IBlobManagerDb& db) { continue; } - TGenStep genStep{blobId.Generation(), blobId.Step()}; genStepsWithBlobsToKeep.insert(genStep); - - Y_VERIFY(genStep > LastCollectedGenStep, - "Blob %s in keep queue is before last barrier (%" PRIu32 ":%" PRIu32 ")", - unifiedBlobId.ToStringNew().c_str(), std::get<0>(LastCollectedGenStep), std::get<1>(LastCollectedGenStep)); } AllocatedGenSteps.clear(); @@ -317,7 +321,7 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager: // Remove the blob from keep list if its also in the delete list gl.KeepList.erase(*blobIt); // Skipped blobs still need to be deleted from BlobsToKeep table - KeepsToErase.push_back(TUnifiedBlobId(blobGroup, *blobIt)); + KeepsToErase.emplace_back(TUnifiedBlobId(blobGroup, *blobIt)); if (CurrentGen == blobIt->Generation()) { // If this blob was created and deleted in the current generation then @@ -326,7 +330,7 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager: // a scenario when Keep flag was sent in the old generation and then tablet restarted // before getting the result and removing the blob from the Keep list. skipDontKeep = true; - DeletesToErase.push_back(TUnifiedBlobId(blobGroup, *blobIt)); + DeletesToErase.emplace_back(TUnifiedBlobId(blobGroup, *blobIt)); ++CountersUpdate.BlobSkippedEntries; } } @@ -363,12 +367,12 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager: return requests; } -bool TBlobManager::CleanupFlaggedBlobs(IBlobManagerDb& db) { +size_t TBlobManager::CleanupFlaggedBlobs(IBlobManagerDb& db) { if (KeepsToErase.empty() && DeletesToErase.empty()) { - return false; + return 0; } - static constexpr size_t maxBlobsToCleanup = 100000; + static constexpr size_t maxBlobsToCleanup = TLimits::MAX_BLOBS_TO_DELETE; size_t numBlobs = 0; for (; !KeepsToErase.empty() && numBlobs < maxBlobsToCleanup; ++numBlobs) { @@ -381,7 +385,7 @@ bool TBlobManager::CleanupFlaggedBlobs(IBlobManagerDb& db) { DeletesToErase.pop_front(); } - return true; + return numBlobs; } void TBlobManager::OnGCResult(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev, IBlobManagerDb& db) { @@ -398,8 +402,12 @@ void TBlobManager::OnGCResult(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev, const auto& keepList = it->second.KeepList; const auto& dontKeepList = it->second.DontKeepList; - static constexpr size_t maxBlobsToCleanup = 100000; - size_t blobsToForget = keepList.size() + dontKeepList.size() + KeepsToErase.size() + DeletesToErase.size(); + // NOTE: It clears blobs of different groups. + // It's expected to be safe cause we have GC result for the blobs or don't need such result. + size_t maxBlobsToCleanup = TLimits::MAX_BLOBS_TO_DELETE; + maxBlobsToCleanup -= CleanupFlaggedBlobs(db); + + size_t blobsToForget = keepList.size() + dontKeepList.size(); if (blobsToForget < maxBlobsToCleanup) { for (const auto& blobId : keepList) { @@ -410,17 +418,13 @@ void TBlobManager::OnGCResult(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev, } } else { for (const auto& blobId : keepList) { - KeepsToErase.push_back(TUnifiedBlobId(group, blobId)); + KeepsToErase.emplace_back(TUnifiedBlobId(group, blobId)); } for (const auto& blobId : dontKeepList) { - DeletesToErase.push_back(TUnifiedBlobId(group, blobId)); + DeletesToErase.emplace_back(TUnifiedBlobId(group, blobId)); } } - // NOTE: It clears blobs of different groups. - // It's expected to be safe cause we have GC result for the blobs or don't need such result. - CleanupFlaggedBlobs(db); - ++CountersUpdate.GcRequestsSent; CountersUpdate.BlobKeepEntries += keepList.size(); CountersUpdate.BlobDontKeepEntries += dontKeepList.size(); @@ -457,9 +461,19 @@ void TBlobManager::SaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) { << " Blob count: " << blobBatch.BatchInfo->BlobSizes.size()); // Add this batch to KeepQueue + TGenStep edgeGenStep = EdgeGenStep(); for (ui32 i = 0; i < blobBatch.BatchInfo->BlobSizes.size(); ++i) { const TUnifiedBlobId blobId = blobBatch.BatchInfo->MakeBlobId(i); - BlobsToKeep.insert(blobId.GetLogoBlobId()); + Y_VERIFY_DEBUG(blobId.IsDsBlob(), "Not a DS blob id: %s", blobId.ToStringNew().c_str()); + + auto logoblobId = blobId.GetLogoBlobId(); + TGenStep genStep{logoblobId.Generation(), logoblobId.Step()}; + + Y_VERIFY(genStep > edgeGenStep, + "Trying to keep blob %s that could be already collected by edge barrier (%" PRIu32 ":%" PRIu32 ")", + blobId.ToStringNew().c_str(), std::get<0>(edgeGenStep), std::get<1>(edgeGenStep)); + + BlobsToKeep.insert(std::move(logoblobId)); db.AddBlobToKeep(blobId); } diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h index ff076b31ba2..3302097ff0c 100644 --- a/ydb/core/tx/columnshard/blob_manager.h +++ b/ydb/core/tx/columnshard/blob_manager.h @@ -217,7 +217,7 @@ public: THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> PreparePerGroupGCRequests(); // Cleanup blobs that have correct flags (skipped or already marked with correct flags) - bool CleanupFlaggedBlobs(IBlobManagerDb& db); + size_t CleanupFlaggedBlobs(IBlobManagerDb& db); // Called with GC result received from Distributed Storage void OnGCResult(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev, IBlobManagerDb& db); @@ -259,6 +259,10 @@ private: void PerformDelayedDeletes(IBlobManagerDb& db); bool ExtractEvicted(TEvictedBlob& evict, TEvictMetadata& meta, bool fromDropped = false); + + TGenStep EdgeGenStep() const { + return (CollectGenStepInFlight == TGenStep{0, 0}) ? LastCollectedGenStep : CollectGenStepInFlight; + } }; } diff --git a/ydb/core/tx/columnshard/blob_manager_txs.cpp b/ydb/core/tx/columnshard/blob_manager_txs.cpp index 53dbb9de57d..68efa9ee1f2 100644 --- a/ydb/core/tx/columnshard/blob_manager_txs.cpp +++ b/ydb/core/tx/columnshard/blob_manager_txs.cpp @@ -29,6 +29,10 @@ public: } void Complete(const TActorContext& ctx) override { + if (Requests.empty()) { + Self->ScheduleNextGC(ctx); + } + for (auto& r : Requests) { ui32 groupId = r.first; auto ev = std::move(r.second); @@ -72,10 +76,7 @@ public: } void Complete(const TActorContext& ctx) override { - // Schedule next GC - if (Self->BlobManager->CanCollectGarbage()) { - Self->Execute(Self->CreateTxRunGc(), ctx); - } + Self->ScheduleNextGC(ctx); } }; diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index 2ab3ab2a8e8..3982f4318cf 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -198,10 +198,7 @@ public: ctx.Send(res.TxInfo.Source, event.Release(), 0, res.TxInfo.Cookie); } - Self->UpdateBlobMangerCounters(); - if (Self->BlobManager->CanCollectGarbage()) { - Self->Execute(Self->CreateTxRunGc(), ctx); - } + Self->ScheduleNextGC(ctx); switch (Trigger) { case ETriggerActivities::POST_INSERT: diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 5db1a69b302..78932bcaf05 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -473,7 +473,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl preset.Deserialize(tableProto.GetSchemaPreset()); Y_VERIFY(!preset.IsStandaloneTable()); tableVerProto.SetSchemaPresetId(preset.GetId()); - + if (TablesManager.RegisterSchemaPreset(preset, db)) { TablesManager.AddPresetVersion(tableProto.GetSchemaPreset().GetId(), version, tableProto.GetSchemaPreset().GetSchema(), db); } @@ -509,7 +509,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP const ui64 pathId = alterProto.GetPathId(); Y_VERIFY(TablesManager.HasTable(pathId), "AlterTable on a dropped or non-existent table"); - + LOG_S_DEBUG("AlterTable for pathId: " << pathId << " schema: " << alterProto.GetSchema() << " ttl settings: " << alterProto.GetTtlSettings() @@ -532,7 +532,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP ActivateTiering(pathId, tieringUsage); Schema::SaveTableInfo(db, pathId, tieringUsage); - tableVerProto.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj()); + tableVerProto.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj()); TablesManager.AddTableVersion(pathId, version, tableVerProto, db); TablesManager.OnTtlUpdate(); } @@ -582,6 +582,13 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, } } +void TColumnShard::ScheduleNextGC(const TActorContext& ctx) { + UpdateBlobMangerCounters(); + if (BlobManager->CanCollectGarbage()) { + Execute(CreateTxRunGc(), ctx); + } +} + void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivity activity) { if (periodic) { if (LastPeriodicBackActivation > TInstant::Now() - ActivationPeriod) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 383d51a3592..1d31ed4879a 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -463,6 +463,8 @@ private: bool GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName, NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges); + void ScheduleNextGC(const TActorContext& ctx); + std::unique_ptr<TEvPrivate::TEvIndexing> SetupIndexation(); std::unique_ptr<TEvPrivate::TEvCompaction> SetupCompaction(); std::unique_ptr<TEvPrivate::TEvEviction> SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls = {}, diff --git a/ydb/core/tx/columnshard/defs.h b/ydb/core/tx/columnshard/defs.h index 87f6cacb264..d280261262f 100644 --- a/ydb/core/tx/columnshard/defs.h +++ b/ydb/core/tx/columnshard/defs.h @@ -16,6 +16,7 @@ struct TLimits { static constexpr const ui32 MIN_BYTES_TO_INSERT = 4 * 1024 * 1024; static constexpr const ui64 MAX_BYTES_TO_INSERT = 16 * 1024 * 1024; static constexpr const ui32 MAX_TX_RECORDS = 100000; + static constexpr const ui64 MAX_BLOBS_TO_DELETE = NOlap::TCompactionLimits::MAX_BLOBS_TO_DELETE; static ui64 GetBlobSizeLimit(); static ui64 GetMaxBlobSize(); |