about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author chertus <azuikov@ydb.tech> 2023-04-07 12:38:57 +0300
committer chertus <azuikov@ydb.tech> 2023-04-07 12:38:57 +0300
commit 486f33fb6f812329654a7b85b10e1f602557ead3 (patch)
tree ccabb37ea7d6c48c2a84e6f3ab66fc0e9d027b20
parent fd191c37beaf0651cc6f10f4cc76e03b23b8f85c (diff)
download ydb-486f33fb6f812329654a7b85b10e1f602557ead3.tar.gz
fix ColumnShard GC in case of too many blobs to delete
-rw-r--r-- ydb/core/tx/columnshard/blob_manager.cpp 54
-rw-r--r-- ydb/core/tx/columnshard/blob_manager.h 6
-rw-r--r-- ydb/core/tx/columnshard/blob_manager_txs.cpp 9
-rw-r--r-- ydb/core/tx/columnshard/columnshard__progress_tx.cpp 5
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.cpp 13
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.h 2
-rw-r--r-- ydb/core/tx/columnshard/defs.h 1
7 files changed, 58 insertions, 32 deletions
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp
index 4e863e3b5bb..3c5b04c0bb3 100644
--- a/ydb/core/tx/columnshard/blob_manager.cpp
+++ b/ydb/core/tx/columnshard/blob_manager.cpp
@@ -178,6 +178,15 @@ bool TBlobManager::LoadState(IBlobManagerDb& db) {
Y_VERIFY(unifiedBlobId.IsDsBlob(), "Not a DS blob id in Keep table: %s", unifiedBlobId.ToStringNew().c_str());
TLogoBlobID blobId = unifiedBlobId.GetLogoBlobId();
+ TGenStep genStep{blobId.Generation(), blobId.Step()};
+ if (genStep <= LastCollectedGenStep) {
+ LOG_S_WARN("BlobManager at tablet " << TabletInfo->TabletID
+ << " Load not keeped blob " << unifiedBlobId.ToStringNew() << " collected by GenStep: "
+ << std::get<0>(LastCollectedGenStep) << ":" << std::get<1>(LastCollectedGenStep));
+ KeepsToErase.emplace_back(unifiedBlobId);
+ continue;
+ }
+
BlobsToKeep.insert(blobId);
// Keep + DontKeep (probably in different gen:steps)
@@ -186,12 +195,7 @@ bool TBlobManager::LoadState(IBlobManagerDb& db) {
continue;
}
- TGenStep genStep{blobId.Generation(), blobId.Step()};
genStepsWithBlobsToKeep.insert(genStep);
-
- Y_VERIFY(genStep > LastCollectedGenStep,
- "Blob %s in keep queue is before last barrier (%" PRIu32 ":%" PRIu32 ")",
- unifiedBlobId.ToStringNew().c_str(), std::get<0>(LastCollectedGenStep), std::get<1>(LastCollectedGenStep));
}
AllocatedGenSteps.clear();
@@ -317,7 +321,7 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager:
// Remove the blob from keep list if its also in the delete list
gl.KeepList.erase(*blobIt);
// Skipped blobs still need to be deleted from BlobsToKeep table
- KeepsToErase.push_back(TUnifiedBlobId(blobGroup, *blobIt));
+ KeepsToErase.emplace_back(TUnifiedBlobId(blobGroup, *blobIt));
if (CurrentGen == blobIt->Generation()) {
// If this blob was created and deleted in the current generation then
@@ -326,7 +330,7 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager:
// a scenario when Keep flag was sent in the old generation and then tablet restarted
// before getting the result and removing the blob from the Keep list.
skipDontKeep = true;
- DeletesToErase.push_back(TUnifiedBlobId(blobGroup, *blobIt));
+ DeletesToErase.emplace_back(TUnifiedBlobId(blobGroup, *blobIt));
++CountersUpdate.BlobSkippedEntries;
}
}
@@ -363,12 +367,12 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager:
return requests;
}
-bool TBlobManager::CleanupFlaggedBlobs(IBlobManagerDb& db) {
+size_t TBlobManager::CleanupFlaggedBlobs(IBlobManagerDb& db) {
if (KeepsToErase.empty() && DeletesToErase.empty()) {
- return false;
+ return 0;
}
- static constexpr size_t maxBlobsToCleanup = 100000;
+ static constexpr size_t maxBlobsToCleanup = TLimits::MAX_BLOBS_TO_DELETE;
size_t numBlobs = 0;
for (; !KeepsToErase.empty() && numBlobs < maxBlobsToCleanup; ++numBlobs) {
@@ -381,7 +385,7 @@ bool TBlobManager::CleanupFlaggedBlobs(IBlobManagerDb& db) {
DeletesToErase.pop_front();
}
- return true;
+ return numBlobs;
}
void TBlobManager::OnGCResult(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev, IBlobManagerDb& db) {
@@ -398,8 +402,12 @@ void TBlobManager::OnGCResult(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev,
const auto& keepList = it->second.KeepList;
const auto& dontKeepList = it->second.DontKeepList;
- static constexpr size_t maxBlobsToCleanup = 100000;
- size_t blobsToForget = keepList.size() + dontKeepList.size() + KeepsToErase.size() + DeletesToErase.size();
+ // NOTE: It clears blobs of different groups.
+ // It's expected to be safe cause we have GC result for the blobs or don't need such result.
+ size_t maxBlobsToCleanup = TLimits::MAX_BLOBS_TO_DELETE;
+ maxBlobsToCleanup -= CleanupFlaggedBlobs(db);
+
+ size_t blobsToForget = keepList.size() + dontKeepList.size();
if (blobsToForget < maxBlobsToCleanup) {
for (const auto& blobId : keepList) {
@@ -410,17 +418,13 @@ void TBlobManager::OnGCResult(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev,
}
} else {
for (const auto& blobId : keepList) {
- KeepsToErase.push_back(TUnifiedBlobId(group, blobId));
+ KeepsToErase.emplace_back(TUnifiedBlobId(group, blobId));
}
for (const auto& blobId : dontKeepList) {
- DeletesToErase.push_back(TUnifiedBlobId(group, blobId));
+ DeletesToErase.emplace_back(TUnifiedBlobId(group, blobId));
}
}
- // NOTE: It clears blobs of different groups.
- // It's expected to be safe cause we have GC result for the blobs or don't need such result.
- CleanupFlaggedBlobs(db);
-
++CountersUpdate.GcRequestsSent;
CountersUpdate.BlobKeepEntries += keepList.size();
CountersUpdate.BlobDontKeepEntries += dontKeepList.size();
@@ -457,9 +461,19 @@ void TBlobManager::SaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) {
<< " Blob count: " << blobBatch.BatchInfo->BlobSizes.size());
// Add this batch to KeepQueue
+ TGenStep edgeGenStep = EdgeGenStep();
for (ui32 i = 0; i < blobBatch.BatchInfo->BlobSizes.size(); ++i) {
const TUnifiedBlobId blobId = blobBatch.BatchInfo->MakeBlobId(i);
- BlobsToKeep.insert(blobId.GetLogoBlobId());
+ Y_VERIFY_DEBUG(blobId.IsDsBlob(), "Not a DS blob id: %s", blobId.ToStringNew().c_str());
+
+ auto logoblobId = blobId.GetLogoBlobId();
+ TGenStep genStep{logoblobId.Generation(), logoblobId.Step()};
+
+ Y_VERIFY(genStep > edgeGenStep,
+ "Trying to keep blob %s that could be already collected by edge barrier (%" PRIu32 ":%" PRIu32 ")",
+ blobId.ToStringNew().c_str(), std::get<0>(edgeGenStep), std::get<1>(edgeGenStep));
+
+ BlobsToKeep.insert(std::move(logoblobId));
db.AddBlobToKeep(blobId);
}
diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h
index ff076b31ba2..3302097ff0c 100644
--- a/ydb/core/tx/columnshard/blob_manager.h
+++ b/ydb/core/tx/columnshard/blob_manager.h
@@ -217,7 +217,7 @@ public:
THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> PreparePerGroupGCRequests();
// Cleanup blobs that have correct flags (skipped or already marked with correct flags)
- bool CleanupFlaggedBlobs(IBlobManagerDb& db);
+ size_t CleanupFlaggedBlobs(IBlobManagerDb& db);
// Called with GC result received from Distributed Storage
void OnGCResult(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev, IBlobManagerDb& db);
@@ -259,6 +259,10 @@ private:
void PerformDelayedDeletes(IBlobManagerDb& db);
bool ExtractEvicted(TEvictedBlob& evict, TEvictMetadata& meta, bool fromDropped = false);
+
+ TGenStep EdgeGenStep() const {
+ return (CollectGenStepInFlight == TGenStep{0, 0}) ? LastCollectedGenStep : CollectGenStepInFlight;
+ }
};
}
diff --git a/ydb/core/tx/columnshard/blob_manager_txs.cpp b/ydb/core/tx/columnshard/blob_manager_txs.cpp
index 53dbb9de57d..68efa9ee1f2 100644
--- a/ydb/core/tx/columnshard/blob_manager_txs.cpp
+++ b/ydb/core/tx/columnshard/blob_manager_txs.cpp
@@ -29,6 +29,10 @@ public:
}
void Complete(const TActorContext& ctx) override {
+ if (Requests.empty()) {
+ Self->ScheduleNextGC(ctx);
+ }
+
for (auto& r : Requests) {
ui32 groupId = r.first;
auto ev = std::move(r.second);
@@ -72,10 +76,7 @@ public:
}
void Complete(const TActorContext& ctx) override {
- // Schedule next GC
- if (Self->BlobManager->CanCollectGarbage()) {
- Self->Execute(Self->CreateTxRunGc(), ctx);
- }
+ Self->ScheduleNextGC(ctx);
}
};
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index 2ab3ab2a8e8..3982f4318cf 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -198,10 +198,7 @@ public:
ctx.Send(res.TxInfo.Source, event.Release(), 0, res.TxInfo.Cookie);
}
- Self->UpdateBlobMangerCounters();
- if (Self->BlobManager->CanCollectGarbage()) {
- Self->Execute(Self->CreateTxRunGc(), ctx);
- }
+ Self->ScheduleNextGC(ctx);
switch (Trigger) {
case ETriggerActivities::POST_INSERT:
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 5db1a69b302..78932bcaf05 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -473,7 +473,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
preset.Deserialize(tableProto.GetSchemaPreset());
Y_VERIFY(!preset.IsStandaloneTable());
tableVerProto.SetSchemaPresetId(preset.GetId());
-
+
if (TablesManager.RegisterSchemaPreset(preset, db)) {
TablesManager.AddPresetVersion(tableProto.GetSchemaPreset().GetId(), version, tableProto.GetSchemaPreset().GetSchema(), db);
}
@@ -509,7 +509,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
const ui64 pathId = alterProto.GetPathId();
Y_VERIFY(TablesManager.HasTable(pathId), "AlterTable on a dropped or non-existent table");
-
+
LOG_S_DEBUG("AlterTable for pathId: " << pathId
<< " schema: " << alterProto.GetSchema()
<< " ttl settings: " << alterProto.GetTtlSettings()
@@ -532,7 +532,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
ActivateTiering(pathId, tieringUsage);
Schema::SaveTableInfo(db, pathId, tieringUsage);
- tableVerProto.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj());
+ tableVerProto.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj());
TablesManager.AddTableVersion(pathId, version, tableVerProto, db);
TablesManager.OnTtlUpdate();
}
@@ -582,6 +582,13 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto,
}
}
+void TColumnShard::ScheduleNextGC(const TActorContext& ctx) {
+ UpdateBlobMangerCounters();
+ if (BlobManager->CanCollectGarbage()) {
+ Execute(CreateTxRunGc(), ctx);
+ }
+}
+
void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivity activity) {
if (periodic) {
if (LastPeriodicBackActivation > TInstant::Now() - ActivationPeriod) {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 383d51a3592..1d31ed4879a 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -463,6 +463,8 @@ private:
bool GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName,
NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges);
+ void ScheduleNextGC(const TActorContext& ctx);
+
std::unique_ptr<TEvPrivate::TEvIndexing> SetupIndexation();
std::unique_ptr<TEvPrivate::TEvCompaction> SetupCompaction();
std::unique_ptr<TEvPrivate::TEvEviction> SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls = {},
diff --git a/ydb/core/tx/columnshard/defs.h b/ydb/core/tx/columnshard/defs.h
index 87f6cacb264..d280261262f 100644
--- a/ydb/core/tx/columnshard/defs.h
+++ b/ydb/core/tx/columnshard/defs.h
@@ -16,6 +16,7 @@ struct TLimits {
static constexpr const ui32 MIN_BYTES_TO_INSERT = 4 * 1024 * 1024;
static constexpr const ui64 MAX_BYTES_TO_INSERT = 16 * 1024 * 1024;
static constexpr const ui32 MAX_TX_RECORDS = 100000;
+ static constexpr const ui64 MAX_BLOBS_TO_DELETE = NOlap::TCompactionLimits::MAX_BLOBS_TO_DELETE;
static ui64 GetBlobSizeLimit();
static ui64 GetMaxBlobSize();