diff options
author | alexvru <alexvru@ydb.tech> | 2023-11-21 17:14:55 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-11-21 19:31:13 +0300 |
commit | 6eaac20f79fdd38e61456ad341898290054670dd (patch) | |
tree | 8a04342a17e37fb87b71556c07a0aef82b302fdf | |
parent | c44e2073480bbdd2a095c19ffaaab1506961d51c (diff) | |
download | ydb-6eaac20f79fdd38e61456ad341898290054670dd.tar.gz |
Issue hard barrier in BlobDepot KIKIMR-18366
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 29 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.h | 15 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_gc.cpp | 50 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_trash.cpp | 94 | ||||
-rw-r--r-- | ydb/core/blob_depot/types.h | 5 | ||||
-rw-r--r-- | ydb/core/protos/counters_blob_depot.proto | 1 |
6 files changed, 169 insertions, 25 deletions
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index 7d06dfd785..6325b1750b 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -569,30 +569,45 @@ namespace NKikimr::NBlobDepot { for (const TLogoBlobID& id : TrashInFlight) { for (; it != Trash.end() && *it < id; ++it) {} Y_ABORT_UNLESS(it != Trash.end() && *it == id); - it = Trash.erase(it); - self->AccountBlob(id, false); - self->TotalStoredTrashSize -= id.BlobSize(); - self->Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_STORED_TRASH_SIZE] = self->TotalStoredTrashSize; + DeleteTrashRecord(self, it); } LastConfirmedGenStep = IssuedGenStep; } + void TData::TRecordsPerChannelGroup::DeleteTrashRecord(TData *self, std::set<TLogoBlobID>::iterator& it) { + self->AccountBlob(*it, false); + self->TotalStoredTrashSize -= it->BlobSize(); + self->Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_STORED_TRASH_SIZE] = self->TotalStoredTrashSize; + it = Trash.erase(it); + } + void TData::TRecordsPerChannelGroup::OnLeastExpectedBlobIdChange(TData *self) { CollectIfPossible(self); } void TData::TRecordsPerChannelGroup::ClearInFlight(TData *self) { - Y_ABORT_UNLESS(CollectGarbageRequestInFlight); - CollectGarbageRequestInFlight = false; + Y_ABORT_UNLESS(CollectGarbageRequestsInFlight); + --CollectGarbageRequestsInFlight; CollectIfPossible(self); } void TData::TRecordsPerChannelGroup::CollectIfPossible(TData *self) { - if (!CollectGarbageRequestInFlight && !Trash.empty() && self->Loaded) { + if (!CollectGarbageRequestsInFlight && Collectible(self) && self->Loaded) { self->HandleTrash(*this); } } + bool TData::TRecordsPerChannelGroup::Collectible(TData *self) const { + return !Trash.empty() || HardGenStep < GetHardGenStep(self); + } + + TGenStep TData::TRecordsPerChannelGroup::GetHardGenStep(TData *self) const { + const TGenStep genStep = Used.empty() + ? 
TGenStep(self->Self->Channels[Channel].GetLeastExpectedBlobId(self->Self->Executor()->Generation())) + : TGenStep(*Used.begin()); + return genStep.Previous(); + } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// bool TData::BeginCommittingBlobSeqId(TAgent& agent, TBlobSeqId blobSeqId) { diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 4f77b28391..2d7c4fc6a8 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -397,7 +397,8 @@ namespace NKikimr::NBlobDepot { ui32 PerGenerationCounter = 1; TGenStep IssuedGenStep; // currently in flight or already confirmed TGenStep LastConfirmedGenStep; - bool CollectGarbageRequestInFlight = false; + TGenStep HardGenStep; // last successfully confirmed (non-persistent value) + ui32 CollectGarbageRequestsInFlight = 0; TRecordsPerChannelGroup(ui8 channel, ui32 groupId) : Channel(channel) @@ -406,9 +407,12 @@ namespace NKikimr::NBlobDepot { void MoveToTrash(TData *self, TLogoBlobID id); void OnSuccessfulCollect(TData *self); + void DeleteTrashRecord(TData *self, std::set<TLogoBlobID>::iterator& it); void OnLeastExpectedBlobIdChange(TData *self); void ClearInFlight(TData *self); void CollectIfPossible(TData *self); + bool Collectible(TData *self) const; + TGenStep GetHardGenStep(TData *self) const; }; bool Loaded = false; @@ -428,6 +432,7 @@ namespace NKikimr::NBlobDepot { class TTxIssueGC; class TTxConfirmGC; + class TTxHardGC; class TTxDataLoad; @@ -444,6 +449,8 @@ namespace NKikimr::NBlobDepot { struct TCollectCmd { ui64 QueryId; ui32 GroupId; + bool Hard; + TGenStep GenStep; }; ui64 LastCollectCmdId = 0; std::unordered_map<ui64, TCollectCmd> CollectCmds; @@ -647,6 +654,10 @@ namespace NKikimr::NBlobDepot { void OnCommitConfirmedGC(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted); bool OnBarrierShift(ui64 tabletId, ui8 channel, bool hard, TGenStep previous, TGenStep current, ui32& maxItems, 
NTabletFlatExecutor::TTransactionContext& txc, void *cookie); + void CollectTrashByHardBarrier(ui8 channel, ui32 groupId, TGenStep hardGenStep, + const std::function<bool(TLogoBlobID)>& callback); + void OnCommitHardGC(ui8 channel, ui32 groupId, TGenStep hardGenStep); + void TrimChannelHistory(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted); void AddFirstMentionedBlob(TLogoBlobID id); void AccountBlob(TLogoBlobID id, bool add); @@ -719,6 +730,8 @@ namespace NKikimr::NBlobDepot { void ExecuteConfirmGC(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted, size_t index, TGenStep confirmedGenStep); + + void ExecuteHardGC(ui8 channel, ui32 groupId, TGenStep hardGenStep); }; Y_DECLARE_OPERATORS_FOR_FLAGS(TBlobDepot::TData::TScanFlags); diff --git a/ydb/core/blob_depot/data_gc.cpp b/ydb/core/blob_depot/data_gc.cpp index d4dc4251eb..bca035fbcb 100644 --- a/ydb/core/blob_depot/data_gc.cpp +++ b/ydb/core/blob_depot/data_gc.cpp @@ -91,4 +91,54 @@ namespace NKikimr::NBlobDepot { confirmedGenStep)); } + class TData::TTxHardGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + const ui8 Channel; + const ui32 GroupId; + const TGenStep HardGenStep; + bool MoreToGo = false; + std::vector<TLogoBlobID> TrashDeleted; + + static constexpr ui32 MaxKeysToProcessAtOnce = 10'000; + + public: + TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_HARD_GC; } + + TTxHardGC(TBlobDepot *self, ui8 channel, ui32 groupId, TGenStep hardGenStep) + : TTransactionBase(self) + , Channel(channel) + , GroupId(groupId) + , HardGenStep(hardGenStep) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + NIceDb::TNiceDb db(txc.DB); + + std::function<bool(TLogoBlobID)> callback = [&](TLogoBlobID id) -> bool { + if (TrashDeleted.size() == MaxKeysToProcessAtOnce) { + MoreToGo = true; + return false; + } + db.Table<Schema::Trash>().Key(TKey(id).MakeBinaryKey()).Delete(); + TrashDeleted.push_back(id); + return true; + }; + 
Self->Data->CollectTrashByHardBarrier(Channel, GroupId, HardGenStep, callback); + + return true; + } + + void Complete(const TActorContext&) override { + Self->Data->TrimChannelHistory(Channel, GroupId, std::move(TrashDeleted)); + if (MoreToGo) { + Self->Data->ExecuteHardGC(Channel, GroupId, HardGenStep); + } else { + Self->Data->OnCommitHardGC(Channel, GroupId, HardGenStep); + } + } + }; + + void TData::ExecuteHardGC(ui8 channel, ui32 groupId, TGenStep hardGenStep) { + Self->Execute(std::make_unique<TTxHardGC>(Self, channel, groupId, hardGenStep)); + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp index 79c56c784c..aac9ea9fe8 100644 --- a/ydb/core/blob_depot/data_trash.cpp +++ b/ydb/core/blob_depot/data_trash.cpp @@ -25,9 +25,42 @@ namespace NKikimr::NBlobDepot { const ui32 generation = Self->Executor()->Generation(); THashMap<ui32, std::unique_ptr<TEvBlobDepot::TEvPushNotify>> outbox; - Y_ABORT_UNLESS(!record.CollectGarbageRequestInFlight); - Y_ABORT_UNLESS(!record.Trash.empty()); - Y_ABORT_UNLESS(Loaded); // we must have correct Trash and Used values + Y_DEBUG_ABORT_UNLESS(!record.CollectGarbageRequestsInFlight); + Y_DEBUG_ABORT_UNLESS(record.Collectible(this)); + Y_DEBUG_ABORT_UNLESS(Loaded); // we must have correct Trash and Used values + + // check if we can issue a hard barrier + const TGenStep hardGenStep = record.GetHardGenStep(this); + Y_ABORT_UNLESS(record.HardGenStep <= hardGenStep); // ensure hard barrier does not decrease + if (record.HardGenStep < hardGenStep) { + auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(Self->TabletID(), generation, + record.PerGenerationCounter++, record.Channel, true, hardGenStep.Generation(), hardGenStep.Step(), + nullptr, nullptr, TInstant::Max(), false /*isMultiCollectAllowed*/, true /*hard*/); + + std::optional<TLogoBlobID> minTrashId = record.Trash.empty() + ? 
std::nullopt + : std::make_optional(*record.Trash.begin()); + std::optional<TLogoBlobID> maxTrashId = record.Trash.empty() + ? std::nullopt + : std::make_optional(*--record.Trash.end()); + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT85, "issuing hard barrier TEvCollectGarbage", (Id, Self->GetLogId()), + (Channel, int(record.Channel)), (GroupId, record.GroupId), (Msg, ev->ToString()), + (HardGenStep, hardGenStep), (MinTrashId, minTrashId), (MaxTrashId, maxTrashId)); + + ++record.CollectGarbageRequestsInFlight; + + const ui64 id = ++LastCollectCmdId; + const ui64 queryId = RandomNumber<ui64>(); + CollectCmds.emplace(id, TCollectCmd{.QueryId = queryId, .GroupId = record.GroupId, .Hard = true, + .GenStep = hardGenStep}); + + SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), id); + } + + if (record.Trash.empty()) { + return; // no trash to collect with soft barrier + } Y_ABORT_UNLESS(record.Channel < Self->Channels.size()); auto& channel = Self->Channels[record.Channel]; @@ -87,13 +120,17 @@ namespace NKikimr::NBlobDepot { TVector<TLogoBlobID> keep; TVector<TLogoBlobID> doNotKeep; + std::vector<TLogoBlobID> trashInFlight; for (auto it = record.Trash.begin(); it != trashEndIter; ++it) { - if (const TGenStep genStep(*it); genStep <= record.IssuedGenStep) { + if (const TGenStep genStep(*it); genStep <= hardGenStep) { + continue; // this blob will be deleted by hard barrier, no need for do-not-keep flag + } else if (genStep <= record.IssuedGenStep) { doNotKeep.push_back(*it); } else if (nextGenStep < genStep) { Y_ABORT(); } + trashInFlight.push_back(*it); } const TLogoBlobID keepFrom(Self->TabletID(), record.LastConfirmedGenStep.Generation(), @@ -101,12 +138,10 @@ namespace NKikimr::NBlobDepot { TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode); for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && TGenStep(*it) <= nextGenStep; ++it) { Y_ABORT_UNLESS(record.LastConfirmedGenStep < TGenStep(*it)); + Y_ABORT_UNLESS(hardGenStep < TGenStep(*it)); 
keep.push_back(*it); } - // trash items that will be deleted during this round - std::vector<TLogoBlobID> trashInFlight(record.Trash.begin(), trashEndIter); - const bool collect = nextGenStep > record.LastConfirmedGenStep; Y_ABORT_UNLESS(nextGenStep >= record.IssuedGenStep); @@ -123,7 +158,7 @@ namespace NKikimr::NBlobDepot { keep_.release(); doNotKeep_.release(); - record.CollectGarbageRequestInFlight = true; + ++record.CollectGarbageRequestsInFlight; record.PerGenerationCounter += ev->Collect; record.TrashInFlight.swap(trashInFlight); record.IssuedGenStep = nextGenStep; @@ -135,7 +170,7 @@ namespace NKikimr::NBlobDepot { const ui64 id = ++LastCollectCmdId; const ui64 queryId = RandomNumber<ui64>(); - CollectCmds.emplace(id, TCollectCmd{.QueryId = queryId, .GroupId = record.GroupId}); + CollectCmds.emplace(id, TCollectCmd{.QueryId = queryId, .GroupId = record.GroupId, .Hard = false}); if (IS_LOG_PRIORITY_ENABLED(NLog::PRI_TRACE, NKikimrServices::BLOB_DEPOT_EVENTS)) { if (ev->Keep) { @@ -184,24 +219,51 @@ namespace NKikimr::NBlobDepot { const ui32 groupId = info.GroupId; STLOG(PRI_DEBUG, BLOB_DEPOT, BDT12, "TEvCollectGarbageResult", (Id, Self->GetLogId()), - (Channel, ev->Get()->Channel), (GroupId, groupId), (Msg, ev->Get()->ToString())); + (Channel, ev->Get()->Channel), (GroupId, groupId), (Hard, info.Hard), (Msg, ev->Get()->ToString())); BDEV(BDEV03, "TrashManager_collectResult", (BDT, Self->TabletID()), (GroupId, groupId), (Channel, ev->Get()->Channel), (Q, info.QueryId), (Cookie, ev->Cookie), (Status, ev->Get()->Status), (ErrorReason, ev->Get()->ErrorReason)); TRecordsPerChannelGroup& record = GetRecordsPerChannelGroup(ev->Get()->Channel, groupId); + Y_ABORT_UNLESS(record.CollectGarbageRequestsInFlight); + if (ev->Get()->Status == NKikimrProto::OK) { - Y_ABORT_UNLESS(record.CollectGarbageRequestInFlight); - record.OnSuccessfulCollect(this); - ExecuteConfirmGC(record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}), 0, - 
record.LastConfirmedGenStep); + if (info.Hard) { + ExecuteHardGC(record.Channel, record.GroupId, info.GenStep); + } else { + record.OnSuccessfulCollect(this); + ExecuteConfirmGC(record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}), 0, + record.LastConfirmedGenStep); + } } else { - record.TrashInFlight.clear(); + if (!info.Hard) { + record.TrashInFlight.clear(); + } record.ClearInFlight(this); } } void TData::OnCommitConfirmedGC(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted) { + TrimChannelHistory(channel, groupId, std::move(trashDeleted)); + TRecordsPerChannelGroup& record = GetRecordsPerChannelGroup(channel, groupId); + record.ClearInFlight(this); + } + + void TData::CollectTrashByHardBarrier(ui8 channel, ui32 groupId, TGenStep hardGenStep, + const std::function<bool(TLogoBlobID)>& callback) { + TRecordsPerChannelGroup& record = GetRecordsPerChannelGroup(channel, groupId); + for (auto it = record.Trash.begin(); it != record.Trash.end() && TGenStep(*it) <= hardGenStep && callback(*it); ) { + record.DeleteTrashRecord(this, it); + } + } + + void TData::OnCommitHardGC(ui8 channel, ui32 groupId, TGenStep hardGenStep) { + TRecordsPerChannelGroup& record = GetRecordsPerChannelGroup(channel, groupId); + record.ClearInFlight(this); + record.HardGenStep = hardGenStep; + } + + void TData::TrimChannelHistory(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted) { TRecordsPerChannelGroup& record = GetRecordsPerChannelGroup(channel, groupId); const TTabletStorageInfo *info = Self->Info(); @@ -256,8 +318,6 @@ namespace NKikimr::NBlobDepot { } } } - - record.ClearInFlight(this); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index 24082e56b1..7d03287094 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -235,6 +235,11 @@ namespace NKikimr::NBlobDepot { return s.Str(); } + TGenStep Previous() const { + Y_ABORT_UNLESS(Value); + return TGenStep(Value - 1); 
+ } + friend bool operator ==(const TGenStep& x, const TGenStep& y) { return x.Value == y.Value; } friend bool operator !=(const TGenStep& x, const TGenStep& y) { return x.Value != y.Value; } friend bool operator < (const TGenStep& x, const TGenStep& y) { return x.Value < y.Value; } diff --git a/ydb/core/protos/counters_blob_depot.proto b/ydb/core/protos/counters_blob_depot.proto index bf79b1ff13..9b32efdbf9 100644 --- a/ydb/core/protos/counters_blob_depot.proto +++ b/ydb/core/protos/counters_blob_depot.proto @@ -60,4 +60,5 @@ enum ETxTypes { TXTYPE_COLLECT_GARBAGE = 13 [(NKikimr.TxTypeOpts) = {Name: "TTxCollectGarbage"}]; TXTYPE_COMMIT_BLOB_SEQ = 14 [(NKikimr.TxTypeOpts) = {Name: "TTxCommitBlobSeq"}]; TXTYPE_UPDATE_BLOCK = 15 [(NKikimr.TxTypeOpts) = {Name: "TTxUpdateBlock"}]; + TXTYPE_HARD_GC = 16 [(NKikimr.TxTypeOpts) = {Name: "TTxHardGC"}]; } |