aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-12-01 16:05:57 +0300
committeralexvru <alexvru@ydb.tech>2023-12-01 19:28:07 +0300
commita67d67d80628b36612ea8c3283c68fa1e3608908 (patch)
tree508f889aad3e1b251ed3b177f52cd8f4c9a14a1b
parent3be304b490b0db30a2c7bc80fa3d543abfca8803 (diff)
downloadydb-a67d67d80628b36612ea8c3283c68fa1e3608908.tar.gz
Issue initial soft barrier after tablet restart KIKIMR-18366
-rw-r--r--ydb/core/blob_depot/data.cpp16
-rw-r--r--ydb/core/blob_depot/data.h1
-rw-r--r--ydb/core/blob_depot/data_load.cpp8
-rw-r--r--ydb/core/blob_depot/data_trash.cpp12
4 files changed, 32 insertions, 5 deletions
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index a960720bb4..26ab56894b 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -135,7 +135,10 @@ namespace NKikimr::NBlobDepot {
if (inserted) {
Y_ABORT_UNLESS(!CanBeCollected(TBlobSeqId::FromLogoBlobId(id)));
Y_VERIFY_DEBUG_S(id.Generation() == generation, "BlobId# " << id << " Generation# " << generation);
- Y_DEBUG_ABORT_UNLESS(Self->Channels[id.Channel()].GetLeastExpectedBlobId(generation) <= TBlobSeqId::FromLogoBlobId(id));
+ Y_VERIFY_DEBUG_S(Self->Channels[id.Channel()].GetLeastExpectedBlobId(generation) <= TBlobSeqId::FromLogoBlobId(id),
+ "LeastExpectedBlobId# " << Self->Channels[id.Channel()].GetLeastExpectedBlobId(generation)
+ << " Id# " << id
+ << " Generation# " << generation);
AddFirstMentionedBlob(id);
}
if (outcome == EUpdateOutcome::DROP) {
@@ -603,7 +606,7 @@ namespace NKikimr::NBlobDepot {
}
bool TData::TRecordsPerChannelGroup::Collectible(TData *self) {
- return !Trash.empty() || HardGenStep < GetHardGenStep(self);
+ return !Trash.empty() || HardGenStep < GetHardGenStep(self) || !InitialCollectionComplete;
}
TGenStep TData::TRecordsPerChannelGroup::GetHardGenStep(TData *self) {
@@ -639,6 +642,10 @@ namespace NKikimr::NBlobDepot {
Y_ABORT_UNLESS(blobSeqId.Channel < Self->Channels.size());
auto& channel = Self->Channels[blobSeqId.Channel];
+#ifndef NDEBUG
+ const TBlobSeqId leastBefore = channel.GetLeastExpectedBlobId(generation);
+#endif
+
const ui64 value = blobSeqId.ToSequentialNumber();
agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value);
@@ -647,6 +654,11 @@ namespace NKikimr::NBlobDepot {
const bool inserted = channel.SequenceNumbersInFlight.insert(value).second;
Y_ABORT_UNLESS(inserted);
+#ifndef NDEBUG
+ // ensure least expected blob id didn't change
+ Y_ABORT_UNLESS(leastBefore == channel.GetLeastExpectedBlobId(generation));
+#endif
+
return true;
}
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 3b982ccb36..11dd7f12dd 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -400,6 +400,7 @@ namespace NKikimr::NBlobDepot {
TGenStep HardGenStep; // last sucessfully confirmed (non-persistent value)
ui32 CollectGarbageRequestsInFlight = 0;
TBlobSeqId LastLeastBlobSeqId;
+ bool InitialCollectionComplete = false;
TRecordsPerChannelGroup(ui8 channel, ui32 groupId)
: Channel(channel)
diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp
index ec9c44b6e0..23fd922f73 100644
--- a/ydb/core/blob_depot/data_load.cpp
+++ b/ydb/core/blob_depot/data_load.cpp
@@ -74,6 +74,14 @@ namespace NKikimr::NBlobDepot {
});
Y_ABORT_UNLESS(Loaded);
Self->OnDataLoadComplete();
+
+ // prepare records for all groups in history
+ for (const auto& channel : Self->Info()->Channels) {
+ for (const auto& entry : channel.History) {
+ RecordsPerChannelGroup.try_emplace(std::make_tuple(channel.Channel, entry.GroupID), channel.Channel, entry.GroupID);
+ }
+ }
+
for (auto& [key, record] : RecordsPerChannelGroup) {
record.CollectIfPossible(this);
}
diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp
index aac9ea9fe8..e33fc5905b 100644
--- a/ydb/core/blob_depot/data_trash.cpp
+++ b/ydb/core/blob_depot/data_trash.cpp
@@ -58,14 +58,18 @@ namespace NKikimr::NBlobDepot {
SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), id);
}
- if (record.Trash.empty()) {
+ Y_ABORT_UNLESS(generation >= 1);
+ const TGenStep barrierGenStep(generation - 1, Max<ui32>());
+
+ if (record.Trash.empty() && record.IssuedGenStep == barrierGenStep) {
return; // no trash to collect with soft barrier
}
Y_ABORT_UNLESS(record.Channel < Self->Channels.size());
auto& channel = Self->Channels[record.Channel];
- TGenStep nextGenStep = Max(record.IssuedGenStep, TGenStep(*--record.Trash.end()));
+ const TGenStep trashGenStep = record.Trash.empty() ? TGenStep() : TGenStep(*--record.Trash.end());
+ TGenStep nextGenStep = Max(record.IssuedGenStep, trashGenStep, barrierGenStep);
std::set<TLogoBlobID>::iterator trashEndIter = record.Trash.end();
// step we are going to invalidate (including blobs with this one)
@@ -111,6 +115,7 @@ namespace NKikimr::NBlobDepot {
leastExpectedBlobId.Step, record.Channel, 0, 0);
trashEndIter = record.Trash.lower_bound(maxId);
nextGenStep = Max(record.IssuedGenStep,
+ barrierGenStep,
trashEndIter != record.Trash.begin()
? TGenStep(*std::prev(trashEndIter))
: TGenStep());
@@ -146,7 +151,7 @@ namespace NKikimr::NBlobDepot {
Y_ABORT_UNLESS(nextGenStep >= record.IssuedGenStep);
if (trashInFlight.empty()) {
- Y_ABORT_UNLESS(keep.empty()); // nothing to do here
+ Y_ABORT_UNLESS(keep.empty() || record.IssuedGenStep != nextGenStep); // nothing to do here
} else {
auto keep_ = keep ? std::make_unique<TVector<TLogoBlobID>>(std::move(keep)) : nullptr;
auto doNotKeep_ = doNotKeep ? std::make_unique<TVector<TLogoBlobID>>(std::move(doNotKeep)) : nullptr;
@@ -231,6 +236,7 @@ namespace NKikimr::NBlobDepot {
if (info.Hard) {
ExecuteHardGC(record.Channel, record.GroupId, info.GenStep);
} else {
+ record.InitialCollectionComplete = true;
record.OnSuccessfulCollect(this);
ExecuteConfirmGC(record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}), 0,
record.LastConfirmedGenStep);