aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-11-21 17:14:55 +0300
committeralexvru <alexvru@ydb.tech>2023-11-21 19:31:13 +0300
commit6eaac20f79fdd38e61456ad341898290054670dd (patch)
tree8a04342a17e37fb87b71556c07a0aef82b302fdf
parentc44e2073480bbdd2a095c19ffaaab1506961d51c (diff)
downloadydb-6eaac20f79fdd38e61456ad341898290054670dd.tar.gz
Issue hard barrier in BlobDepot KIKIMR-18366
-rw-r--r--ydb/core/blob_depot/data.cpp29
-rw-r--r--ydb/core/blob_depot/data.h15
-rw-r--r--ydb/core/blob_depot/data_gc.cpp50
-rw-r--r--ydb/core/blob_depot/data_trash.cpp94
-rw-r--r--ydb/core/blob_depot/types.h5
-rw-r--r--ydb/core/protos/counters_blob_depot.proto1
6 files changed, 169 insertions, 25 deletions
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index 7d06dfd785..6325b1750b 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -569,30 +569,45 @@ namespace NKikimr::NBlobDepot {
for (const TLogoBlobID& id : TrashInFlight) {
for (; it != Trash.end() && *it < id; ++it) {}
Y_ABORT_UNLESS(it != Trash.end() && *it == id);
- it = Trash.erase(it);
- self->AccountBlob(id, false);
- self->TotalStoredTrashSize -= id.BlobSize();
- self->Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_STORED_TRASH_SIZE] = self->TotalStoredTrashSize;
+ DeleteTrashRecord(self, it);
}
LastConfirmedGenStep = IssuedGenStep;
}
+ void TData::TRecordsPerChannelGroup::DeleteTrashRecord(TData *self, std::set<TLogoBlobID>::iterator& it) { // removes the Trash entry at `it` together with its size accounting; `it` is replaced by the iterator returned from erase()
+ self->AccountBlob(*it, false); // false is the `add` argument of AccountBlob
+ self->TotalStoredTrashSize -= it->BlobSize();
+ self->Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_STORED_TRASH_SIZE] = self->TotalStoredTrashSize; // mirror the new total into the tablet counter
+ it = Trash.erase(it); // must come last: the statements above still dereference `it`
+ }
+
void TData::TRecordsPerChannelGroup::OnLeastExpectedBlobIdChange(TData *self) { // simply re-runs the collection check
CollectIfPossible(self);
}
void TData::TRecordsPerChannelGroup::ClearInFlight(TData *self) { // marks one collect-garbage request as finished and re-runs the collection check
- Y_ABORT_UNLESS(CollectGarbageRequestInFlight);
- CollectGarbageRequestInFlight = false;
+ Y_ABORT_UNLESS(CollectGarbageRequestsInFlight); // at least one request must be outstanding
+ --CollectGarbageRequestsInFlight; // a counter rather than a flag: more than one request may be outstanding
CollectIfPossible(self);
}
void TData::TRecordsPerChannelGroup::CollectIfPossible(TData *self) { // calls HandleTrash when nothing is in flight, there is something to collect, and data is loaded
- if (!CollectGarbageRequestInFlight && !Trash.empty() && self->Loaded) {
+ if (!CollectGarbageRequestsInFlight && Collectible(self) && self->Loaded) { // Collectible() also covers a pending hard barrier advance, not only non-empty Trash
self->HandleTrash(*this);
}
}
+ bool TData::TRecordsPerChannelGroup::Collectible(TData *self) const { // true when there is work for HandleTrash
+ return !Trash.empty() || HardGenStep < GetHardGenStep(self); // either trash exists or the hard barrier can move past the last recorded HardGenStep
+ }
+
+ TGenStep TData::TRecordsPerChannelGroup::GetHardGenStep(TData *self) const { // returns the gen step preceding either the first Used blob or, when Used is empty, the channel's least expected blob id
+ const TGenStep genStep = Used.empty()
+ ? TGenStep(self->Self->Channels[Channel].GetLeastExpectedBlobId(self->Self->Executor()->Generation())) // nothing in Used: take the least expected blob id for the current generation
+ : TGenStep(*Used.begin()); // otherwise take the first element of Used
+ return genStep.Previous(); // Previous() aborts when the gen step value is zero
+ }
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
bool TData::BeginCommittingBlobSeqId(TAgent& agent, TBlobSeqId blobSeqId) {
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 4f77b28391..2d7c4fc6a8 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -397,7 +397,8 @@ namespace NKikimr::NBlobDepot {
ui32 PerGenerationCounter = 1;
TGenStep IssuedGenStep; // currently in flight or already confirmed
TGenStep LastConfirmedGenStep;
- bool CollectGarbageRequestInFlight = false;
+ TGenStep HardGenStep; // last successfully confirmed (non-persistent value)
+ ui32 CollectGarbageRequestsInFlight = 0;
TRecordsPerChannelGroup(ui8 channel, ui32 groupId)
: Channel(channel)
@@ -406,9 +407,12 @@ namespace NKikimr::NBlobDepot {
void MoveToTrash(TData *self, TLogoBlobID id);
void OnSuccessfulCollect(TData *self);
+ void DeleteTrashRecord(TData *self, std::set<TLogoBlobID>::iterator& it);
void OnLeastExpectedBlobIdChange(TData *self);
void ClearInFlight(TData *self);
void CollectIfPossible(TData *self);
+ bool Collectible(TData *self) const;
+ TGenStep GetHardGenStep(TData *self) const;
};
bool Loaded = false;
@@ -428,6 +432,7 @@ namespace NKikimr::NBlobDepot {
class TTxIssueGC;
class TTxConfirmGC;
+ class TTxHardGC;
class TTxDataLoad;
@@ -444,6 +449,8 @@ namespace NKikimr::NBlobDepot {
struct TCollectCmd { // bookkeeping for one collect-garbage request, stored in CollectCmds
ui64 QueryId;
ui32 GroupId;
+ bool Hard; // true when the request was a hard barrier
+ TGenStep GenStep; // gen step of the hard barrier; only filled in when Hard is true
};
ui64 LastCollectCmdId = 0;
std::unordered_map<ui64, TCollectCmd> CollectCmds;
@@ -647,6 +654,10 @@ namespace NKikimr::NBlobDepot {
void OnCommitConfirmedGC(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted);
bool OnBarrierShift(ui64 tabletId, ui8 channel, bool hard, TGenStep previous, TGenStep current, ui32& maxItems,
NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
+ void CollectTrashByHardBarrier(ui8 channel, ui32 groupId, TGenStep hardGenStep,
+ const std::function<bool(TLogoBlobID)>& callback);
+ void OnCommitHardGC(ui8 channel, ui32 groupId, TGenStep hardGenStep);
+ void TrimChannelHistory(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted);
void AddFirstMentionedBlob(TLogoBlobID id);
void AccountBlob(TLogoBlobID id, bool add);
@@ -719,6 +730,8 @@ namespace NKikimr::NBlobDepot {
void ExecuteConfirmGC(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted, size_t index,
TGenStep confirmedGenStep);
+
+ void ExecuteHardGC(ui8 channel, ui32 groupId, TGenStep hardGenStep);
};
Y_DECLARE_OPERATORS_FOR_FLAGS(TBlobDepot::TData::TScanFlags);
diff --git a/ydb/core/blob_depot/data_gc.cpp b/ydb/core/blob_depot/data_gc.cpp
index d4dc4251eb..bca035fbcb 100644
--- a/ydb/core/blob_depot/data_gc.cpp
+++ b/ydb/core/blob_depot/data_gc.cpp
@@ -91,4 +91,54 @@ namespace NKikimr::NBlobDepot {
confirmedGenStep));
}
+ class TData::TTxHardGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { // transaction that deletes Trash table rows covered by a hard barrier, one bounded batch per run
+ const ui8 Channel;
+ const ui32 GroupId;
+ const TGenStep HardGenStep; // trash entries with gen step <= this value are deleted
+ bool MoreToGo = false; // set when the batch limit was reached while eligible trash still remained
+ std::vector<TLogoBlobID> TrashDeleted; // ids removed by this transaction; handed to TrimChannelHistory in Complete()
+
+ static constexpr ui32 MaxKeysToProcessAtOnce = 10'000; // cap on rows deleted by a single transaction
+
+ public:
+ TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_HARD_GC; }
+
+ TTxHardGC(TBlobDepot *self, ui8 channel, ui32 groupId, TGenStep hardGenStep)
+ : TTransactionBase(self)
+ , Channel(channel)
+ , GroupId(groupId)
+ , HardGenStep(hardGenStep)
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override { // deletes up to MaxKeysToProcessAtOnce trash rows; always returns true
+ NIceDb::TNiceDb db(txc.DB);
+
+ std::function<bool(TLogoBlobID)> callback = [&](TLogoBlobID id) -> bool { // invoked per eligible trash id; returning false stops the scan
+ if (TrashDeleted.size() == MaxKeysToProcessAtOnce) { // batch is full and one more eligible id was offered
+ MoreToGo = true;
+ return false;
+ }
+ db.Table<Schema::Trash>().Key(TKey(id).MakeBinaryKey()).Delete();
+ TrashDeleted.push_back(id);
+ return true;
+ };
+ Self->Data->CollectTrashByHardBarrier(Channel, GroupId, HardGenStep, callback); // also erases each accepted id from the in-memory Trash set
+
+ return true;
+ }
+
+ void Complete(const TActorContext&) override { // trims channel history, then either schedules the next batch or reports completion
+ Self->Data->TrimChannelHistory(Channel, GroupId, std::move(TrashDeleted));
+ if (MoreToGo) {
+ Self->Data->ExecuteHardGC(Channel, GroupId, HardGenStep); // same barrier, next batch
+ } else {
+ Self->Data->OnCommitHardGC(Channel, GroupId, HardGenStep);
+ }
+ }
+ };
+
+ void TData::ExecuteHardGC(ui8 channel, ui32 groupId, TGenStep hardGenStep) { // schedules a TTxHardGC transaction for the given channel/group and hard barrier
+ Self->Execute(std::make_unique<TTxHardGC>(Self, channel, groupId, hardGenStep));
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp
index 79c56c784c..aac9ea9fe8 100644
--- a/ydb/core/blob_depot/data_trash.cpp
+++ b/ydb/core/blob_depot/data_trash.cpp
@@ -25,9 +25,42 @@ namespace NKikimr::NBlobDepot {
const ui32 generation = Self->Executor()->Generation();
THashMap<ui32, std::unique_ptr<TEvBlobDepot::TEvPushNotify>> outbox;
- Y_ABORT_UNLESS(!record.CollectGarbageRequestInFlight);
- Y_ABORT_UNLESS(!record.Trash.empty());
- Y_ABORT_UNLESS(Loaded); // we must have correct Trash and Used values
+ Y_DEBUG_ABORT_UNLESS(!record.CollectGarbageRequestsInFlight);
+ Y_DEBUG_ABORT_UNLESS(record.Collectible(this));
+ Y_DEBUG_ABORT_UNLESS(Loaded); // we must have correct Trash and Used values
+
+ // check if we can issue a hard barrier
+ const TGenStep hardGenStep = record.GetHardGenStep(this);
+ Y_ABORT_UNLESS(record.HardGenStep <= hardGenStep); // ensure hard barrier does not decrease
+ if (record.HardGenStep < hardGenStep) {
+ auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(Self->TabletID(), generation,
+ record.PerGenerationCounter++, record.Channel, true, hardGenStep.Generation(), hardGenStep.Step(),
+ nullptr, nullptr, TInstant::Max(), false /*isMultiCollectAllowed*/, true /*hard*/);
+
+ std::optional<TLogoBlobID> minTrashId = record.Trash.empty()
+ ? std::nullopt
+ : std::make_optional(*record.Trash.begin());
+ std::optional<TLogoBlobID> maxTrashId = record.Trash.empty()
+ ? std::nullopt
+ : std::make_optional(*--record.Trash.end());
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT85, "issuing hard barrier TEvCollectGarbage", (Id, Self->GetLogId()),
+ (Channel, int(record.Channel)), (GroupId, record.GroupId), (Msg, ev->ToString()),
+ (HardGenStep, hardGenStep), (MinTrashId, minTrashId), (MaxTrashId, maxTrashId));
+
+ ++record.CollectGarbageRequestsInFlight;
+
+ const ui64 id = ++LastCollectCmdId;
+ const ui64 queryId = RandomNumber<ui64>();
+ CollectCmds.emplace(id, TCollectCmd{.QueryId = queryId, .GroupId = record.GroupId, .Hard = true,
+ .GenStep = hardGenStep});
+
+ SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), id);
+ }
+
+ if (record.Trash.empty()) {
+ return; // no trash to collect with soft barrier
+ }
Y_ABORT_UNLESS(record.Channel < Self->Channels.size());
auto& channel = Self->Channels[record.Channel];
@@ -87,13 +120,17 @@ namespace NKikimr::NBlobDepot {
TVector<TLogoBlobID> keep;
TVector<TLogoBlobID> doNotKeep;
+ std::vector<TLogoBlobID> trashInFlight;
for (auto it = record.Trash.begin(); it != trashEndIter; ++it) {
- if (const TGenStep genStep(*it); genStep <= record.IssuedGenStep) {
+ if (const TGenStep genStep(*it); genStep <= hardGenStep) {
+ continue; // this blob will be deleted by hard barrier, no need for do-not-keep flag
+ } else if (genStep <= record.IssuedGenStep) {
doNotKeep.push_back(*it);
} else if (nextGenStep < genStep) {
Y_ABORT();
}
+ trashInFlight.push_back(*it);
}
const TLogoBlobID keepFrom(Self->TabletID(), record.LastConfirmedGenStep.Generation(),
@@ -101,12 +138,10 @@ namespace NKikimr::NBlobDepot {
TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode);
for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && TGenStep(*it) <= nextGenStep; ++it) {
Y_ABORT_UNLESS(record.LastConfirmedGenStep < TGenStep(*it));
+ Y_ABORT_UNLESS(hardGenStep < TGenStep(*it));
keep.push_back(*it);
}
- // trash items that will be deleted during this round
- std::vector<TLogoBlobID> trashInFlight(record.Trash.begin(), trashEndIter);
-
const bool collect = nextGenStep > record.LastConfirmedGenStep;
Y_ABORT_UNLESS(nextGenStep >= record.IssuedGenStep);
@@ -123,7 +158,7 @@ namespace NKikimr::NBlobDepot {
keep_.release();
doNotKeep_.release();
- record.CollectGarbageRequestInFlight = true;
+ ++record.CollectGarbageRequestsInFlight;
record.PerGenerationCounter += ev->Collect;
record.TrashInFlight.swap(trashInFlight);
record.IssuedGenStep = nextGenStep;
@@ -135,7 +170,7 @@ namespace NKikimr::NBlobDepot {
const ui64 id = ++LastCollectCmdId;
const ui64 queryId = RandomNumber<ui64>();
- CollectCmds.emplace(id, TCollectCmd{.QueryId = queryId, .GroupId = record.GroupId});
+ CollectCmds.emplace(id, TCollectCmd{.QueryId = queryId, .GroupId = record.GroupId, .Hard = false});
if (IS_LOG_PRIORITY_ENABLED(NLog::PRI_TRACE, NKikimrServices::BLOB_DEPOT_EVENTS)) {
if (ev->Keep) {
@@ -184,24 +219,51 @@ namespace NKikimr::NBlobDepot {
const ui32 groupId = info.GroupId;
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT12, "TEvCollectGarbageResult", (Id, Self->GetLogId()),
- (Channel, ev->Get()->Channel), (GroupId, groupId), (Msg, ev->Get()->ToString()));
+ (Channel, ev->Get()->Channel), (GroupId, groupId), (Hard, info.Hard), (Msg, ev->Get()->ToString()));
BDEV(BDEV03, "TrashManager_collectResult", (BDT, Self->TabletID()), (GroupId, groupId), (Channel, ev->Get()->Channel),
(Q, info.QueryId), (Cookie, ev->Cookie), (Status, ev->Get()->Status), (ErrorReason, ev->Get()->ErrorReason));
TRecordsPerChannelGroup& record = GetRecordsPerChannelGroup(ev->Get()->Channel, groupId);
+ Y_ABORT_UNLESS(record.CollectGarbageRequestsInFlight);
+
if (ev->Get()->Status == NKikimrProto::OK) {
- Y_ABORT_UNLESS(record.CollectGarbageRequestInFlight);
- record.OnSuccessfulCollect(this);
- ExecuteConfirmGC(record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}), 0,
- record.LastConfirmedGenStep);
+ if (info.Hard) {
+ ExecuteHardGC(record.Channel, record.GroupId, info.GenStep);
+ } else {
+ record.OnSuccessfulCollect(this);
+ ExecuteConfirmGC(record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}), 0,
+ record.LastConfirmedGenStep);
+ }
} else {
- record.TrashInFlight.clear();
+ if (!info.Hard) {
+ record.TrashInFlight.clear();
+ }
record.ClearInFlight(this);
}
}
void TData::OnCommitConfirmedGC(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted) { // trims channel history for the deleted trash, then releases the in-flight request
+ TrimChannelHistory(channel, groupId, std::move(trashDeleted));
+ TRecordsPerChannelGroup& record = GetRecordsPerChannelGroup(channel, groupId);
+ record.ClearInFlight(this); // done here by the caller rather than inside TrimChannelHistory
+ }
+
+ void TData::CollectTrashByHardBarrier(ui8 channel, ui32 groupId, TGenStep hardGenStep, // walks Trash from the beginning and removes entries with gen step <= hardGenStep until the callback returns false
+ const std::function<bool(TLogoBlobID)>& callback) {
+ TRecordsPerChannelGroup& record = GetRecordsPerChannelGroup(channel, groupId);
+ for (auto it = record.Trash.begin(); it != record.Trash.end() && TGenStep(*it) <= hardGenStep && callback(*it); ) { // no increment clause: DeleteTrashRecord advances `it`; callback is consulted only for eligible entries
+ record.DeleteTrashRecord(this, it);
+ }
+ }
+
+ void TData::OnCommitHardGC(ui8 channel, ui32 groupId, TGenStep hardGenStep) { // called by TTxHardGC once no more trash remains under the hard barrier
+ TRecordsPerChannelGroup& record = GetRecordsPerChannelGroup(channel, groupId);
+ record.HardGenStep = hardGenStep; // store the barrier first: ClearInFlight() can re-enter HandleTrash through CollectIfPossible(), and a stale value would make it issue the same hard barrier again
+ record.ClearInFlight(this);
+ }
+
+ void TData::TrimChannelHistory(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted) {
TRecordsPerChannelGroup& record = GetRecordsPerChannelGroup(channel, groupId);
const TTabletStorageInfo *info = Self->Info();
@@ -256,8 +318,6 @@ namespace NKikimr::NBlobDepot {
}
}
}
-
- record.ClearInFlight(this);
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index 24082e56b1..7d03287094 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -235,6 +235,11 @@ namespace NKikimr::NBlobDepot {
return s.Str();
}
+ TGenStep Previous() const { // gen step whose Value is one less than this one
+ Y_ABORT_UNLESS(Value); // a zero Value has no predecessor
+ return TGenStep(Value - 1);
+ }
+
friend bool operator ==(const TGenStep& x, const TGenStep& y) { return x.Value == y.Value; }
friend bool operator !=(const TGenStep& x, const TGenStep& y) { return x.Value != y.Value; }
friend bool operator < (const TGenStep& x, const TGenStep& y) { return x.Value < y.Value; }
diff --git a/ydb/core/protos/counters_blob_depot.proto b/ydb/core/protos/counters_blob_depot.proto
index bf79b1ff13..9b32efdbf9 100644
--- a/ydb/core/protos/counters_blob_depot.proto
+++ b/ydb/core/protos/counters_blob_depot.proto
@@ -60,4 +60,5 @@ enum ETxTypes {
TXTYPE_COLLECT_GARBAGE = 13 [(NKikimr.TxTypeOpts) = {Name: "TTxCollectGarbage"}];
TXTYPE_COMMIT_BLOB_SEQ = 14 [(NKikimr.TxTypeOpts) = {Name: "TTxCommitBlobSeq"}];
TXTYPE_UPDATE_BLOCK = 15 [(NKikimr.TxTypeOpts) = {Name: "TTxUpdateBlock"}];
+ TXTYPE_HARD_GC = 16 [(NKikimr.TxTypeOpts) = {Name: "TTxHardGC"}];
}