diff options
author | alexvru <alexvru@ydb.tech> | 2023-12-01 16:05:57 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-12-01 19:28:07 +0300 |
commit | a67d67d80628b36612ea8c3283c68fa1e3608908 (patch) | |
tree | 508f889aad3e1b251ed3b177f52cd8f4c9a14a1b | |
parent | 3be304b490b0db30a2c7bc80fa3d543abfca8803 (diff) | |
download | ydb-a67d67d80628b36612ea8c3283c68fa1e3608908.tar.gz |
Issue initial soft barrier after tablet restart KIKIMR-18366
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 16 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.h | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_load.cpp | 8 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_trash.cpp | 12 |
4 files changed, 32 insertions, 5 deletions
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index a960720bb4..26ab56894b 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -135,7 +135,10 @@ namespace NKikimr::NBlobDepot { if (inserted) { Y_ABORT_UNLESS(!CanBeCollected(TBlobSeqId::FromLogoBlobId(id))); Y_VERIFY_DEBUG_S(id.Generation() == generation, "BlobId# " << id << " Generation# " << generation); - Y_DEBUG_ABORT_UNLESS(Self->Channels[id.Channel()].GetLeastExpectedBlobId(generation) <= TBlobSeqId::FromLogoBlobId(id)); + Y_VERIFY_DEBUG_S(Self->Channels[id.Channel()].GetLeastExpectedBlobId(generation) <= TBlobSeqId::FromLogoBlobId(id), + "LeastExpectedBlobId# " << Self->Channels[id.Channel()].GetLeastExpectedBlobId(generation) + << " Id# " << id + << " Generation# " << generation); AddFirstMentionedBlob(id); } if (outcome == EUpdateOutcome::DROP) { @@ -603,7 +606,7 @@ namespace NKikimr::NBlobDepot { } bool TData::TRecordsPerChannelGroup::Collectible(TData *self) { - return !Trash.empty() || HardGenStep < GetHardGenStep(self); + return !Trash.empty() || HardGenStep < GetHardGenStep(self) || !InitialCollectionComplete; } TGenStep TData::TRecordsPerChannelGroup::GetHardGenStep(TData *self) { @@ -639,6 +642,10 @@ namespace NKikimr::NBlobDepot { Y_ABORT_UNLESS(blobSeqId.Channel < Self->Channels.size()); auto& channel = Self->Channels[blobSeqId.Channel]; +#ifndef NDEBUG + const TBlobSeqId leastBefore = channel.GetLeastExpectedBlobId(generation); +#endif + const ui64 value = blobSeqId.ToSequentialNumber(); agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value); @@ -647,6 +654,11 @@ namespace NKikimr::NBlobDepot { const bool inserted = channel.SequenceNumbersInFlight.insert(value).second; Y_ABORT_UNLESS(inserted); +#ifndef NDEBUG + // ensure least expected blob id didn't change + Y_ABORT_UNLESS(leastBefore == channel.GetLeastExpectedBlobId(generation)); +#endif + return true; } diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 3b982ccb36..11dd7f12dd 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -400,6 +400,7 @@ namespace NKikimr::NBlobDepot { TGenStep HardGenStep; // last sucessfully confirmed (non-persistent value) ui32 CollectGarbageRequestsInFlight = 0; TBlobSeqId LastLeastBlobSeqId; + bool InitialCollectionComplete = false; TRecordsPerChannelGroup(ui8 channel, ui32 groupId) : Channel(channel) diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp index ec9c44b6e0..23fd922f73 100644 --- a/ydb/core/blob_depot/data_load.cpp +++ b/ydb/core/blob_depot/data_load.cpp @@ -74,6 +74,14 @@ namespace NKikimr::NBlobDepot { }); Y_ABORT_UNLESS(Loaded); Self->OnDataLoadComplete(); + + // prepare records for all groups in history + for (const auto& channel : Self->Info()->Channels) { + for (const auto& entry : channel.History) { + RecordsPerChannelGroup.try_emplace(std::make_tuple(channel.Channel, entry.GroupID), channel.Channel, entry.GroupID); + } + } + for (auto& [key, record] : RecordsPerChannelGroup) { record.CollectIfPossible(this); } diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp index aac9ea9fe8..e33fc5905b 100644 --- a/ydb/core/blob_depot/data_trash.cpp +++ b/ydb/core/blob_depot/data_trash.cpp @@ -58,14 +58,18 @@ namespace NKikimr::NBlobDepot { SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), id); } - if (record.Trash.empty()) { + Y_ABORT_UNLESS(generation >= 1); + const TGenStep barrierGenStep(generation - 1, Max<ui32>()); + + if (record.Trash.empty() && record.IssuedGenStep == barrierGenStep) { return; // no trash to collect with soft barrier } Y_ABORT_UNLESS(record.Channel < Self->Channels.size()); auto& channel = Self->Channels[record.Channel]; - TGenStep nextGenStep = Max(record.IssuedGenStep, TGenStep(*--record.Trash.end())); + const TGenStep trashGenStep = record.Trash.empty() ? TGenStep() : TGenStep(*--record.Trash.end()); + TGenStep nextGenStep = Max(record.IssuedGenStep, trashGenStep, barrierGenStep); std::set<TLogoBlobID>::iterator trashEndIter = record.Trash.end(); // step we are going to invalidate (including blobs with this one) @@ -111,6 +115,7 @@ namespace NKikimr::NBlobDepot { leastExpectedBlobId.Step, record.Channel, 0, 0); trashEndIter = record.Trash.lower_bound(maxId); nextGenStep = Max(record.IssuedGenStep, + barrierGenStep, trashEndIter != record.Trash.begin() ? TGenStep(*std::prev(trashEndIter)) : TGenStep()); @@ -146,7 +151,7 @@ namespace NKikimr::NBlobDepot { Y_ABORT_UNLESS(nextGenStep >= record.IssuedGenStep); if (trashInFlight.empty()) { - Y_ABORT_UNLESS(keep.empty()); // nothing to do here + Y_ABORT_UNLESS(keep.empty() || record.IssuedGenStep != nextGenStep); // nothing to do here } else { auto keep_ = keep ? std::make_unique<TVector<TLogoBlobID>>(std::move(keep)) : nullptr; auto doNotKeep_ = doNotKeep ? std::make_unique<TVector<TLogoBlobID>>(std::move(doNotKeep)) : nullptr; @@ -231,6 +236,7 @@ namespace NKikimr::NBlobDepot { if (info.Hard) { ExecuteHardGC(record.Channel, record.GroupId, info.GenStep); } else { + record.InitialCollectionComplete = true; record.OnSuccessfulCollect(this); ExecuteConfirmGC(record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}), 0, record.LastConfirmedGenStep); |