diff options
author | alexvru <alexvru@ydb.tech> | 2022-07-13 22:12:17 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-07-13 22:12:17 +0300 |
commit | bd3fd08805f8f6a5d618885c38f57f1d498d682f (patch) | |
tree | 1c9b24b140238eee5b187c07b07249f46b9d88c1 | |
parent | a3311ac7b9ea1ade9487351839b35bfab06583d3 (diff) | |
download | ydb-bd3fd08805f8f6a5d618885c38f57f1d498d682f.tar.gz |
BlobDepot work in progress
-rw-r--r-- | ydb/core/blob_depot/agent.cpp | 25 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/read.cpp | 19 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/request.cpp | 27 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 95 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.h | 9 | ||||
-rw-r--r-- | ydb/core/blob_depot/garbage_collection.cpp | 88 | ||||
-rw-r--r-- | ydb/core/blob_depot/garbage_collection.h | 15 | ||||
-rw-r--r-- | ydb/core/blob_depot/given_id_range.cpp | 48 | ||||
-rw-r--r-- | ydb/core/blob_depot/mon_main.cpp | 10 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_commit_blob_seq.cpp | 24 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_load.cpp | 21 | ||||
-rw-r--r-- | ydb/core/blob_depot/schema.h | 19 | ||||
-rw-r--r-- | ydb/core/blob_depot/types.h | 68 |
14 files changed, 322 insertions, 148 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp index 7cecc58d313..68de7977909 100644 --- a/ydb/core/blob_depot/agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -4,13 +4,15 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::Handle(TEvTabletPipe::TEvServerConnected::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvServerConnected", (TabletId, TabletID()), (Msg, ev->Get()->ToString())); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvServerConnected", (TabletId, TabletID()), + (PipeServerId, ev->Get()->ServerId)); const auto [it, inserted] = PipeServerToNode.emplace(ev->Get()->ServerId, std::nullopt); Y_VERIFY(inserted); } void TBlobDepot::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT02, "TEvServerDisconnected", (TabletId, TabletID()), (Msg, ev->Get()->ToString())); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT02, "TEvServerDisconnected", (TabletId, TabletID()), + (PipeServerId, ev->Get()->ServerId)); const auto it = PipeServerToNode.find(ev->Get()->ServerId); Y_VERIFY(it != PipeServerToNode.end()); if (const auto& nodeId = it->second) { @@ -31,17 +33,18 @@ namespace NKikimr::NBlobDepot { } void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) { + const ui32 nodeId = ev->Sender.NodeId(); + const TActorId& pipeServerId = ev->Recipient; const auto& req = ev->Get()->Record; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, req), (NodeId, nodeId), + (PipeServerId, pipeServerId)); - const auto it = PipeServerToNode.find(ev->Recipient); + const auto it = PipeServerToNode.find(pipeServerId); Y_VERIFY(it != PipeServerToNode.end()); - const ui32 nodeId = ev->Sender.NodeId(); Y_VERIFY(!it->second || *it->second == nodeId); it->second = nodeId; auto& agent = Agents[nodeId]; - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, req), (NodeId, nodeId), - (PipeServerId, it->first)); - agent.ConnectedAgent = it->first; + agent.ConnectedAgent = pipeServerId; agent.ConnectedNodeId = nodeId; agent.ExpirationTimestamp = TInstant::Max(); @@ -125,6 +128,9 @@ namespace NKikimr::NBlobDepot { for (const auto& range : givenIdRange->GetChannelRanges()) { agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd()); Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd()); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "IssueNewRange", (TabletId, TabletID()), + (AgentId, agent.ConnectedNodeId), (Channel, range.GetChannel()), + (Begin, range.GetBegin()), (End, range.GetEnd())); } } @@ -149,6 +155,11 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::ResetAgent(TAgent& agent) { for (auto& [channel, agentGivenIdRange] : agent.GivenIdRanges) { Channels[channel].GivenIdRanges.Subtract(agentGivenIdRange); + const ui32 channel_ = channel; + const auto& agentGivenIdRange_ = agentGivenIdRange; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "ResetAgent", (TabletId, TabletID()), (AgentId, agent.ConnectedNodeId), + (Channel, channel_), (GivenIdRanges, Channels[channel_].GivenIdRanges), + (Agent.GivenIdRanges, agentGivenIdRange_)); agentGivenIdRange = {}; } Data->HandleTrash(); diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index dc3068976d9..92984f132c6 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -61,6 +61,8 @@ namespace NKikimr::NBlobDepot { TEvBlobStorage::TEvPutResult* >; + static TString ToString(const TResponse& response); + public: TRequestSender(TBlobDepotAgent& agent); virtual ~TRequestSender(); diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp index 941a9258328..1d61329ae3d 100644 --- a/ydb/core/blob_depot/agent/read.cpp +++ b/ydb/core/blob_depot/agent/read.cpp @@ -58,14 +58,17 @@ namespace NKikimr::NBlobDepot { return false; } - const ui64 partLen = end - begin; + // calculate the whole length of current part + ui64 partLen = end - begin; if (offset >= partLen) { // just skip this part offset -= partLen; continue; } - const ui64 partSize = Min(size ? size : Max<ui64>(), partLen - offset); + // adjust it to fit size and offset + partLen = Min(size ? size : Max<ui64>(), partLen - offset); + Y_VERIFY(partLen); auto blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId()); @@ -75,19 +78,20 @@ namespace NKikimr::NBlobDepot { const ui32 blobSize = totalDataLen + (composite ? sizeof(TVirtualGroupBlobFooter) : 0); const auto id = blobSeqId.MakeBlobId(TabletId, type, 0, blobSize); items.push_back(TReadItem{locator.GetGroupId(), id, static_cast<ui32>(offset + begin), - static_cast<ui32>(partSize), outputOffset}); + static_cast<ui32>(partLen), outputOffset}); } else { Y_FAIL(); } + outputOffset += partLen; + offset = 0; + if (size) { - size -= partSize; + size -= partLen; if (!size) { break; } } - offset = 0; - outputOffset += partSize; } if (size) { @@ -130,7 +134,8 @@ namespace NKikimr::NBlobDepot { readContext.ReadOffsets.erase(it); for (const ui64 offset : v) { - Y_VERIFY(offset + blob.Buffer.size() <= readContext.Size); + Y_VERIFY_S(offset + blob.Buffer.size() <= readContext.Size, "offset# " << offset << " Buffer.size# " + << blob.Buffer.size() << " Size# " << readContext.Size); if (!readContext.Buffer && !offset) { readContext.Buffer = std::move(blob.Buffer); readContext.Buffer.resize(readContext.Size); diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp index 3c7fe68706a..72adcaa0906 100644 --- a/ydb/core/blob_depot/agent/request.cpp +++ b/ydb/core/blob_depot/agent/request.cpp @@ -44,6 +44,20 @@ namespace NKikimr::NBlobDepot { ProcessResponse(id, std::move(context), std::move(response)); } + TString TRequestSender::ToString(const TResponse& response) { + auto printer = [](auto& value) -> TString { + using T = std::decay_t<decltype(value)>; + if constexpr (std::is_same_v<T, TTabletDisconnected>) { + return "TTabletDisconnected"; + } else if constexpr (std::is_same_v<T, TKeyResolved>) { + return "TKeyResolved"; + } else { + return value->ToString(); + } + }; + return std::visit(printer, response); + } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TBlobDepotAgent machinery @@ -59,8 +73,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev) { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId), (Id, ev->Cookie), (Type, TypeName<TEvent>())); - auto *event = ev->Get(); - OnRequestComplete(ev->Cookie, event, TabletRequestInFlight); + OnRequestComplete(ev->Cookie, ev->Get(), TabletRequestInFlight); } template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvRegisterAgentResult::TPtr ev); @@ -75,8 +88,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA16, "HandleOtherResponse", (VirtualGroupId, VirtualGroupId), (Id, ev->Cookie), (Type, TypeName<TEvent>())); - auto *event = ev->Get(); - OnRequestComplete(ev->Cookie, event, OtherRequestInFlight); + OnRequestComplete(ev->Cookie, ev->Get(), OtherRequestInFlight); } template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvGetResult::TPtr ev); @@ -84,10 +96,11 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::OnRequestComplete(ui64 id, TResponse response, TRequestsInFlight& map) { const auto it = map.find(id); - Y_VERIFY(it != map.end()); - auto& [_, request] = *it; - request.Sender->OnRequestComplete(id, std::move(response)); + Y_VERIFY_S(it != map.end(), "id# " << id << " response# " << TRequestSender::ToString(response)); + TRequestInFlight request = std::move(it->second); map.erase(it); + + request.Sender->OnRequestComplete(id, std::move(response)); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index 43b4d5ceee1..b0d58eddae0 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -3,17 +3,43 @@ namespace NKikimr::NBlobDepot { + class TBlobDepot::TData::TTxIssueGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + const ui8 Channel; + const ui32 GroupId; + const TGenStep IssuedGenStep; + std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> CollectGarbage; + + public: + TTxIssueGC(TBlobDepot *self, ui8 channel, ui32 groupId, TGenStep issuedGenStep, + std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage) + : TTransactionBase(self) + , Channel(channel) + , GroupId(groupId) + , IssuedGenStep(issuedGenStep) + , CollectGarbage(std::move(collectGarbage)) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + NIceDb::TNiceDb db(txc.DB); + db.Table<Schema::GC>().Key(Channel, GroupId).Update<Schema::GC::IssuedGenStep>(ui64(IssuedGenStep)); + return true; + } + + void Complete(const TActorContext&) override { + SendToBSProxy(Self->SelfId(), GroupId, CollectGarbage.release(), GroupId); + } + }; + class TBlobDepot::TData::TTxConfirmGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { const ui8 Channel; const ui32 GroupId; std::vector<TLogoBlobID> TrashDeleted; - const ui64 ConfirmedGenStep; + const TGenStep ConfirmedGenStep; static constexpr ui32 MaxKeysToProcessAtOnce = 10'000; public: - TTxConfirmGC(TBlobDepot *self, ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted, - ui64 confirmedGenStep) + TTxConfirmGC(TBlobDepot *self, ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted, TGenStep confirmedGenStep) : TTransactionBase(self) , Channel(channel) , GroupId(groupId) @@ -29,8 +55,7 @@ namespace NKikimr::NBlobDepot { } if (TrashDeleted.size() <= MaxKeysToProcessAtOnce) { TrashDeleted.clear(); - db.Table<Schema::ConfirmedGC>().Key(Channel, GroupId).Update<Schema::ConfirmedGC::ConfirmedGenStep>( - ConfirmedGenStep); + db.Table<Schema::GC>().Key(Channel, GroupId).Update<Schema::GC::ConfirmedGenStep>(ui64(ConfirmedGenStep)); } else { std::vector<TLogoBlobID> temp; temp.insert(temp.end(), TrashDeleted.begin() + MaxKeysToProcessAtOnce, TrashDeleted.end()); @@ -81,10 +106,11 @@ namespace NKikimr::NBlobDepot { OnTrashInserted(record); } - void TBlobDepot::TData::AddConfirmedGenStepOnLoad(ui8 channel, ui32 groupId, ui64 confirmedGenStep) { + void TBlobDepot::TData::AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep) { const auto& key = std::make_tuple(Self->TabletID(), channel, groupId); const auto [it, _] = RecordsPerChannelGroup.try_emplace(key, Self->TabletID(), channel, groupId); auto& record = it->second; + record.IssuedGenStep = issuedGenStep; record.LastConfirmedGenStep = confirmedGenStep; } @@ -176,7 +202,7 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(record.TabletId == Self->TabletID()); Y_VERIFY(!record.Trash.empty()); - ui64 nextGenStep = GenStep(*--record.Trash.end()); + TGenStep nextGenStep(*--record.Trash.end()); Y_VERIFY(record.Channel < Self->Channels.size()); auto& channel = Self->Channels[record.Channel]; @@ -187,15 +213,15 @@ namespace NKikimr::NBlobDepot { channel.GivenIdRanges.GetMinimumValue()); // step we are going to invalidate (including blobs with this one) - const ui32 invalidatedStep = ui32(nextGenStep); + const ui32 invalidatedStep = nextGenStep.Step(); if (minBlobSeqId.Step <= invalidatedStep) { const TLogoBlobID maxId(record.TabletId, generation, minBlobSeqId.Step, record.Channel, 0, 0); const auto it = record.Trash.lower_bound(maxId); if (it != record.Trash.begin()) { - nextGenStep = GenStep(*std::prev(it)); + nextGenStep = TGenStep(*std::prev(it)); } else { - nextGenStep = 0; + nextGenStep = {}; } // issue notifications to agents @@ -224,14 +250,14 @@ namespace NKikimr::NBlobDepot { auto doNotKeep = std::make_unique<TVector<TLogoBlobID>>(); // FIXME: check for blob leaks when LastConfirmedGenStep is not properly persisted - for (auto it = record.Trash.begin(); it != record.Trash.end() && GenStep(*it) <= record.LastConfirmedGenStep; ++it) { + for (auto it = record.Trash.begin(); it != record.Trash.end() && TGenStep(*it) <= record.LastConfirmedGenStep; ++it) { doNotKeep->push_back(*it); } // FIXME: check for blob loss when LastConfirmedGenStep is not properly persisted - const TLogoBlobID keepFrom(record.TabletId, ui32(record.LastConfirmedGenStep >> 32), - ui32(record.LastConfirmedGenStep), record.Channel, 0, 0); - for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && GenStep(*it) <= nextGenStep; ++it) { + const TLogoBlobID keepFrom(record.TabletId, record.LastConfirmedGenStep.Generation(), + record.LastConfirmedGenStep.Step(), record.Channel, 0, 0); + for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && TGenStep(*it) <= nextGenStep; ++it) { keep->push_back(*it); } @@ -248,7 +274,7 @@ namespace NKikimr::NBlobDepot { } auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(record.TabletId, generation, - record.PerGenerationCounter, record.Channel, collect, ui32(nextGenStep >> 32), ui32(nextGenStep), + record.PerGenerationCounter, record.Channel, collect, nextGenStep.Generation(), nextGenStep.Step(), keep.get(), doNotKeep.get(), TInstant::Max(), true); keep.release(); doNotKeep.release(); @@ -256,22 +282,27 @@ namespace NKikimr::NBlobDepot { record.CollectGarbageRequestInFlight = true; record.PerGenerationCounter += ev->Collect ? ev->PerGenerationCounterStepSize() : 0; record.TrashInFlight.insert(record.TrashInFlight.end(), record.Trash.begin(), record.Trash.end()); - record.NextGenStep = Max(nextGenStep, record.LastConfirmedGenStep); + record.IssuedGenStep = Max(nextGenStep, record.LastConfirmedGenStep); auto blobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, channel.NextBlobSeqId); - Y_VERIFY(record.LastConfirmedGenStep < GenStep(blobSeqId)); - if (GenStep(blobSeqId) <= nextGenStep) { - blobSeqId.Step = ui32(nextGenStep) + 1; + Y_VERIFY(record.LastConfirmedGenStep < TGenStep(blobSeqId)); + if (TGenStep(blobSeqId) <= nextGenStep) { + blobSeqId.Step = nextGenStep.Step() + 1; blobSeqId.Index = 0; channel.NextBlobSeqId = blobSeqId.ToSequentialNumber(); } STLOG(PRI_DEBUG, BLOB_DEPOT, BDT09, "issuing TEvCollectGarbage", (TabletId, Self->TabletID()), (Channel, record.Channel), (GroupId, record.GroupId), (Msg, ev->ToString()), - (LastConfirmedGenStep, record.LastConfirmedGenStep), (NextGenStep, record.NextGenStep), + (LastConfirmedGenStep, record.LastConfirmedGenStep), (IssuedGenStep, record.IssuedGenStep), (TrashInFlight.size, record.TrashInFlight.size())); - SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), record.GroupId); + if (collect) { + Self->Execute(std::make_unique<TTxIssueGC>(Self, record.Channel, record.GroupId, record.IssuedGenStep, + std::move(ev))); + } else { + SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), record.GroupId); + } } RecordsWithTrash.Clear(); @@ -301,7 +332,7 @@ namespace NKikimr::NBlobDepot { for (const TLogoBlobID& id : record.TrashInFlight) { // make it merge record.Trash.erase(id); } - record.LastConfirmedGenStep = record.NextGenStep; + record.LastConfirmedGenStep = record.IssuedGenStep; Self->Execute(std::make_unique<TTxConfirmGC>(Self, record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}), record.LastConfirmedGenStep)); } else { @@ -316,6 +347,12 @@ namespace NKikimr::NBlobDepot { const ui32 generation = Self->Executor()->Generation(); if (const auto it = agent.InvalidateStepRequests.find(ev->Cookie); it != agent.InvalidateStepRequests.end()) { for (const auto& [channel, invalidatedStep] : it->second) { + const ui32 channel_ = channel; + const ui32 invalidatedStep_ = invalidatedStep; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "Trim", (TabletId, Self->TabletID()), (AgentId, agent.ConnectedNodeId), + (Channel, channel_), (InvalidatedStep, invalidatedStep_), + (GivenIdRanges, Self->Channels[channel_].GivenIdRanges), + (Agent.GivenIdRanges, agent.GivenIdRanges[channel_])); Self->Channels[channel].GivenIdRanges.Trim(channel, generation, invalidatedStep); agent.GivenIdRanges[channel].Trim(channel, generation, invalidatedStep); } @@ -323,8 +360,13 @@ namespace NKikimr::NBlobDepot { } for (const auto& item : ev->Get()->Record.GetWritesInFlight()) { const auto blobSeqId = TBlobSeqId::FromProto(item); - Self->Channels[blobSeqId.Channel].GivenIdRanges.AddPoint(blobSeqId.ToSequentialNumber()); - agent.GivenIdRanges[blobSeqId.Channel].AddPoint(blobSeqId.ToSequentialNumber()); + const ui64 value = blobSeqId.ToSequentialNumber(); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "WriteInFlight", (TabletId, Self->TabletID()), (AgentId, agent.ConnectedNodeId), + (BlobSeqId, blobSeqId), (Value, value), + (GivenIdRanges, Self->Channels[blobSeqId.Channel].GivenIdRanges), + (Agent.GivenIdRanges, agent.GivenIdRanges[blobSeqId.Channel])); + Self->Channels[blobSeqId.Channel].GivenIdRanges.AddPoint(value); + agent.GivenIdRanges[blobSeqId.Channel].AddPoint(value); } HandleTrash(); } @@ -342,4 +384,9 @@ namespace NKikimr::NBlobDepot { } } + bool TBlobDepot::TData::CanBeCollected(ui32 groupId, TBlobSeqId id) const { + const auto it = RecordsPerChannelGroup.find(std::make_tuple(Self->TabletID(), id.Channel, groupId)); + return it != RecordsPerChannelGroup.end() && TGenStep(id) <= it->second.IssuedGenStep; + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 3d092bd995c..8e8b4ba84e4 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -246,8 +246,8 @@ namespace NKikimr::NBlobDepot { std::set<TLogoBlobID> Trash; // committed trash std::vector<TLogoBlobID> TrashInFlight; ui32 PerGenerationCounter = 1; - ui64 LastConfirmedGenStep = 0; - ui64 NextGenStep = 0; + TGenStep IssuedGenStep; // currently in flight or already confirmed + TGenStep LastConfirmedGenStep; bool CollectGarbageRequestInFlight = false; TRecordsPerChannelGroup(ui64 tabletId, ui8 channel, ui32 groupId) @@ -264,6 +264,7 @@ namespace NKikimr::NBlobDepot { THashMultiMap<void*, TLogoBlobID> InFlightTrash; // being committed, but not yet confirmed + class TTxIssueGC; class TTxConfirmGC; public: @@ -307,7 +308,7 @@ namespace NKikimr::NBlobDepot { void AddDataOnLoad(TKey key, TString value); void AddTrashOnLoad(TLogoBlobID id); - void AddConfirmedGenStepOnLoad(ui8 channel, ui32 groupId, ui64 confirmedGenStep); + void AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep); void PutKey(TKey key, TValue&& data); @@ -320,6 +321,8 @@ namespace NKikimr::NBlobDepot { void Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev); void OnCommitConfirmedGC(ui8 channel, ui32 groupId); + bool CanBeCollected(ui32 groupId, TBlobSeqId id) const; + static TString ToValueProto(const TValue& value); template<typename TCallback> diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp index 7e69dfa76f1..85a083f17ae 100644 --- a/ydb/core/blob_depot/garbage_collection.cpp +++ b/ydb/core/blob_depot/garbage_collection.cpp @@ -7,7 +7,10 @@ namespace NKikimr::NBlobDepot { class TBlobDepot::TBarrierServer::TTxCollectGarbage : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { std::optional<TString> Error; - std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> Request; + TBarrier& Barrier; + std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle>& Request; + const ui64 TabletId; + const ui8 Channel; int KeepIndex = 0; int DoNotKeepIndex = 0; ui32 NumKeysProcessed = 0; @@ -16,10 +19,12 @@ namespace NKikimr::NBlobDepot { static constexpr ui32 MaxKeysToProcessAtOnce = 10'000; public: - TTxCollectGarbage(TBlobDepot *self, std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> request, - ui32 keepIndex = 0, ui32 doNotKeepIndex = 0) + TTxCollectGarbage(TBlobDepot *self, ui64 tabletId, ui8 channel, ui32 keepIndex = 0, ui32 doNotKeepIndex = 0) : TTransactionBase(self) - , Request(std::move(request)) + , Barrier(Self->BarrierServer->Barriers[std::make_pair(tabletId, channel)]) + , Request(Barrier.ProcessingQ.front()) + , TabletId(tabletId) + , Channel(channel) , KeepIndex(keepIndex) , DoNotKeepIndex(doNotKeepIndex) {} @@ -42,9 +47,13 @@ namespace NKikimr::NBlobDepot { auto [response, _] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId(), Error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(Error)); TActivationContext::Send(response.release()); + Barrier.ProcessingQ.pop_front(); Self->Data->HandleTrash(); + if (!Barrier.ProcessingQ.empty()) { + Self->Execute(std::make_unique<TTxCollectGarbage>(Self, TabletId, Channel)); + } } else { - Self->Execute(std::make_unique<TTxCollectGarbage>(Self, std::move(Request), KeepIndex, DoNotKeepIndex)); + Self->Execute(std::make_unique<TTxCollectGarbage>(Self, TabletId, Channel, KeepIndex, DoNotKeepIndex)); } } @@ -60,17 +69,23 @@ namespace NKikimr::NBlobDepot { const auto key = std::make_pair(record.GetTabletId(), record.GetChannel()); auto& barriers = Self->BarrierServer->Barriers; if (const auto it = barriers.find(key); it != barriers.end()) { + // extract existing barrier record auto& barrier = it->second; - const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter()); - const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep()); - ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft; - if (recordGenStep < barrier.LastRecordGenStep) { + const auto barrierGenCounter = std::make_tuple(barrier.RecordGeneration, barrier.PerGenerationCounter); + const TGenStep barrierGenStep = record.GetHard() ? barrier.Hard : barrier.Soft; + + // extract new parameters from protobuf + const auto genCounter = std::make_tuple(record.GetGeneration(), record.GetPerGenerationCounter()); + const TGenStep collectGenStep(record.GetCollectGeneration(), record.GetCollectStep()); + + // validate them + if (genCounter < barrierGenCounter) { Error = "record generation:counter is obsolete"; - } else if (recordGenStep == barrier.LastRecordGenStep) { - if (currentGenStep != collectGenStep) { + } else if (genCounter == barrierGenCounter) { + if (barrierGenStep != collectGenStep) { Error = "repeated command with different collect parameters received"; } - } else if (collectGenStep < currentGenStep) { + } else if (collectGenStep < barrierGenStep) { Error = "decreasing barrier"; } } @@ -133,18 +148,21 @@ namespace NKikimr::NBlobDepot { const auto key = std::make_pair(record.GetTabletId(), record.GetChannel()); auto& barriers = Self->BarrierServer->Barriers; auto& barrier = barriers[key]; - const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter()); - const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep()); - ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft; - Y_VERIFY(barrier.LastRecordGenStep <= recordGenStep); - barrier.LastRecordGenStep = recordGenStep; - Y_VERIFY(currentGenStep <= collectGenStep); - currentGenStep = collectGenStep; + auto barrierGenCounter = std::tie(barrier.RecordGeneration, barrier.PerGenerationCounter); + TGenStep& barrierGenStep = record.GetHard() ? barrier.Hard : barrier.Soft; + + const auto genCounter = std::make_tuple(record.GetGeneration(), record.GetPerGenerationCounter()); + const TGenStep collectGenStep(record.GetCollectGeneration(), record.GetCollectStep()); + Y_VERIFY(barrierGenCounter <= genCounter); + barrierGenCounter = genCounter; + Y_VERIFY(barrierGenStep <= collectGenStep); + barrierGenStep = collectGenStep; db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel()).Update( - NIceDb::TUpdate<Schema::Barriers::LastRecordGenStep>(recordGenStep), - NIceDb::TUpdate<Schema::Barriers::Soft>(barrier.Soft), - NIceDb::TUpdate<Schema::Barriers::Hard>(barrier.Hard) + NIceDb::TUpdate<Schema::Barriers::RecordGeneration>(std::get<0>(genCounter)), + NIceDb::TUpdate<Schema::Barriers::PerGenerationCounter>(std::get<1>(genCounter)), + NIceDb::TUpdate<Schema::Barriers::Soft>(ui64(barrier.Soft)), + NIceDb::TUpdate<Schema::Barriers::Hard>(ui64(barrier.Hard)) ); } @@ -152,30 +170,34 @@ namespace NKikimr::NBlobDepot { } }; - void TBlobDepot::TBarrierServer::AddBarrierOnLoad(ui64 tabletId, ui8 channel, ui64 lastRecordGenStep, - ui64 soft, ui64 hard) { + void TBlobDepot::TBarrierServer::AddBarrierOnLoad(ui64 tabletId, ui8 channel, ui32 recordGeneration, ui32 perGenerationCounter, + TGenStep soft, TGenStep hard) { Barriers[std::make_pair(tabletId, channel)] = { - .LastRecordGenStep = lastRecordGenStep, + .RecordGeneration = recordGeneration, + .PerGenerationCounter = perGenerationCounter, .Soft = soft, .Hard = hard, }; } void TBlobDepot::TBarrierServer::Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev) { - Self->Execute(std::make_unique<TTxCollectGarbage>(Self, - std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle>(ev.Release()))); + const auto& record = ev->Get()->Record; + const auto key = std::make_pair(record.GetTabletId(), record.GetChannel()); + auto& barrier = Barriers[key]; + barrier.ProcessingQ.emplace_back(ev.Release()); + if (barrier.ProcessingQ.size() == 1) { + Self->Execute(std::make_unique<TTxCollectGarbage>(Self, record.GetTabletId(), record.GetChannel())); + } } bool TBlobDepot::TBarrierServer::CheckBlobForBarrier(TLogoBlobID id) const { - const auto key = std::make_pair(id.TabletID(), id.Channel()); - const auto it = Barriers.find(key); - return it == Barriers.end() || GenStep(id) > Max(it->second.Soft, it->second.Hard); + const auto it = Barriers.find(std::make_pair(id.TabletID(), id.Channel())); + return it == Barriers.end() || TGenStep(id) > Max(it->second.Soft, it->second.Hard); } void TBlobDepot::TBarrierServer::GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const { - const auto key = std::make_pair(id.TabletID(), id.Channel()); - const auto it = Barriers.find(key); - const ui64 genStep = GenStep(id); + const auto it = Barriers.find(std::make_pair(id.TabletID(), id.Channel())); + const TGenStep genStep(id); *underSoft = it == Barriers.end() ? false : genStep <= it->second.Soft; *underHard = it == Barriers.end() ? false : genStep <= it->second.Hard; } diff --git a/ydb/core/blob_depot/garbage_collection.h b/ydb/core/blob_depot/garbage_collection.h index c6c6a4ac2b8..3164828fda3 100644 --- a/ydb/core/blob_depot/garbage_collection.h +++ b/ydb/core/blob_depot/garbage_collection.h @@ -9,9 +9,11 @@ namespace NKikimr::NBlobDepot { TBlobDepot* const Self; struct TBarrier { - ui64 LastRecordGenStep = 0; - ui64 Soft = 0; - ui64 Hard = 0; + ui32 RecordGeneration = 0; + ui32 PerGenerationCounter = 0; + TGenStep Soft; + TGenStep Hard; + std::deque<std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle>> ProcessingQ; }; THashMap<std::pair<ui64, ui8>, TBarrier> Barriers; @@ -24,7 +26,8 @@ namespace NKikimr::NBlobDepot { : Self(self) {} - void AddBarrierOnLoad(ui64 tabletId, ui8 channel, ui64 lastRecordGenStep, ui64 soft, ui64 hard); + void AddBarrierOnLoad(ui64 tabletId, ui8 channel, ui32 recordGeneration, ui32 perGenerationCounter, TGenStep soft, + TGenStep hard); void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev); bool CheckBlobForBarrier(TLogoBlobID id) const; void GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const; @@ -32,9 +35,7 @@ namespace NKikimr::NBlobDepot { template<typename TCallback> void Enumerate(TCallback&& callback) { for (const auto& [key, value] : Barriers) { - callback(key.first, key.second, static_cast<ui32>(value.LastRecordGenStep >> 32), - static_cast<ui32>(value.LastRecordGenStep), static_cast<ui32>(value.Soft >> 32), - static_cast<ui32>(value.Soft), static_cast<ui32>(value.Hard >> 32), static_cast<ui32>(value.Hard)); + callback(key.first, key.second, value.RecordGeneration, value.PerGenerationCounter, value.Soft, value.Hard); } } }; diff --git a/ydb/core/blob_depot/given_id_range.cpp b/ydb/core/blob_depot/given_id_range.cpp index 91864653753..74cd9a4e8bd 100644 --- a/ydb/core/blob_depot/given_id_range.cpp +++ b/ydb/core/blob_depot/given_id_range.cpp @@ -4,8 +4,10 @@ namespace NKikimr::NBlobDepot { void TGivenIdRange::IssueNewRange(ui64 begin, ui64 end) { Y_VERIFY(begin < end); + const auto [it, inserted] = Ranges.emplace(begin, end); Y_VERIFY(inserted); + if (it != Ranges.begin()) { const auto& prev = *std::prev(it); Y_VERIFY(prev.End <= begin); @@ -14,6 +16,7 @@ namespace NKikimr::NBlobDepot { const auto& next = *std::next(it); Y_VERIFY(end <= next.Begin); } + NumAvailableItems += end - begin; } @@ -22,16 +25,9 @@ namespace NKikimr::NBlobDepot { } void TGivenIdRange::RemovePoint(ui64 value) { - const auto it = Ranges.upper_bound(value); + auto it = Ranges.upper_bound(value); Y_VERIFY(it != Ranges.begin()); - auto& range = const_cast<TRange&>(*std::prev(it)); - Y_VERIFY(range.Begin <= value && value < range.End); - Y_VERIFY(range.Bits[value - range.Begin]); - range.Bits.Reset(value - range.Begin); - if (range.Bits.Empty()) { - Ranges.erase(it); - } - --NumAvailableItems; + Pop(std::prev(it), value); } bool TGivenIdRange::IsEmpty() const { @@ -45,23 +41,15 @@ namespace NKikimr::NBlobDepot { ui64 TGivenIdRange::GetMinimumValue() const { Y_VERIFY(!Ranges.empty()); const auto& range = *Ranges.begin(); + Y_VERIFY(range.NumSetBits); size_t offset = range.Bits.FirstNonZeroBit(); Y_VERIFY(offset != range.Bits.Size()); return range.Begin + offset; } ui64 TGivenIdRange::Allocate() { - Y_VERIFY(!Ranges.empty()); - const auto it = Ranges.begin(); - auto& range = const_cast<TRange&>(*it); - size_t offset = range.Bits.FirstNonZeroBit(); - Y_VERIFY(offset != range.Bits.Size()); - range.Bits.Reset(offset); - const ui64 value = range.Begin + offset; - if (range.Bits.Empty()) { - Ranges.erase(it); - } - --NumAvailableItems; + const ui64 value = GetMinimumValue(); + Pop(Ranges.begin(), value); return value; } @@ -77,6 +65,7 @@ namespace NKikimr::NBlobDepot { } else if (range.Begin < validSince) { const ui32 len = validSince - range.Begin; for (ui32 i = 0; i < len; ++i) { + range.NumSetBits -= range.Bits[i]; NumAvailableItems -= range.Bits[i]; } range.Bits.Reset(0, len); @@ -91,8 +80,9 @@ namespace NKikimr::NBlobDepot { const auto it = Ranges.find(range.Begin); Y_VERIFY(it != Ranges.end()); Y_VERIFY(range.End == it->End); + Y_VERIFY(range.NumSetBits == it->NumSetBits); Y_VERIFY(range.Bits == it->Bits); - NumAvailableItems -= range.Bits.Count(); + NumAvailableItems -= range.NumSetBits; Ranges.erase(it); } } @@ -103,11 +93,10 @@ namespace NKikimr::NBlobDepot { if (it != Ranges.begin()) { s << " "; } - s << it->Begin << "-" << it->End << "["; + s << "[" << it->Begin << "," << it->End << "):"; for (ui32 i = 0, count = it->End - it->Begin; i < count; ++i) { s << int(it->Bits[i]); } - s << "]"; } s << "}"; } @@ -118,4 +107,17 @@ namespace NKikimr::NBlobDepot { return s.Str(); } + void TGivenIdRange::Pop(TRanges::iterator it, ui64 value) { + TRange& range = const_cast<TRange&>(*it); + Y_VERIFY(range.Begin <= value && value < range.End); + const size_t offset = value - range.Begin; + Y_VERIFY(range.Bits[offset]); + range.Bits.Reset(offset); + --range.NumSetBits; + if (!range.NumSetBits) { + Ranges.erase(it); + } + --NumAvailableItems; + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/mon_main.cpp b/ydb/core/blob_depot/mon_main.cpp index f32db510828..7f5544420e1 100644 --- a/ydb/core/blob_depot/mon_main.cpp +++ b/ydb/core/blob_depot/mon_main.cpp @@ -198,14 +198,14 @@ namespace NKikimr::NBlobDepot { TABLEH() { Stream << "soft"; } TABLEH() { Stream << "hard"; } } else { - Self->BarrierServer->Enumerate([&](ui64 tabletId, ui8 channel, ui32 recordGen, ui32 recordCounter, - ui32 softGen, ui32 softStep, ui32 hardGen, ui32 hardStep) { + Self->BarrierServer->Enumerate([&](ui64 tabletId, ui8 channel, ui32 recordGen, ui32 perGenerationCounter, + TGenStep soft, TGenStep hard) { TABLER() { TABLED() { Stream << tabletId; } TABLED() { Stream << int(channel); } - TABLED() { Stream << recordGen << ":" << recordCounter; } - TABLED() { Stream << softGen << ":" << softStep; } - TABLED() { Stream << hardGen << ":" << hardStep; } + TABLED() { Stream << recordGen << ":" << perGenerationCounter; } + TABLED() { soft.Output(Stream); } + TABLED() { hard.Output(Stream); } } }); } diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index 1c65c6a19cc..bd85b9eda49 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -23,6 +23,7 @@ namespace NKikimr::NBlobDepot { std::tie(Response, responseRecord) = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId()); TAgent& agent = Self->GetAgent(Request->Recipient); + const ui32 generation = Self->Executor()->Generation(); for (const auto& item : Request->Get()->Record.GetItems()) { auto *responseItem = responseRecord->AddItems(); @@ -36,7 +37,20 @@ namespace NKikimr::NBlobDepot { auto *locator = chain->MutableLocator(); locator->CopyFrom(item.GetBlobLocator()); - MarkGivenIdCommitted(agent, TBlobSeqId::FromProto(locator->GetBlobSeqId())); + const auto blobSeqId = TBlobSeqId::FromProto(locator->GetBlobSeqId()); + const bool canBeCollected = Self->Data->CanBeCollected(locator->GetGroupId(), blobSeqId); + + if (blobSeqId.Generation == generation) { + // check for internal sanity -- we can't issue barriers on given ids without confirmed trimming + Y_VERIFY(!canBeCollected); + } else if (canBeCollected) { + // we can't accept this record, because it is potentially under already issued barrier + responseItem->SetStatus(NKikimrProto::ERROR); + responseItem->SetErrorReason("generation race"); + continue; + } + + MarkGivenIdCommitted(agent, blobSeqId); if (!CheckKeyAgainstBarrier(item.GetKey(), responseItem)) { continue; @@ -63,11 +77,13 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(blobSeqId.Generation == Self->Executor()->Generation()); Y_VERIFY(blobSeqId.Channel < Self->Channels.size()); - auto& channel = Self->Channels[blobSeqId.Channel]; - const ui64 value = blobSeqId.ToSequentialNumber(); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "MarkGivenIdCommitted", (TabletId, Self->TabletID()), + (AgentId, agent.ConnectedNodeId), (BlobSeqId, blobSeqId), (Value, value), + (GivenIdRanges, Self->Channels[blobSeqId.Channel].GivenIdRanges), + (Agent.GivenIdRanges, agent.GivenIdRanges[blobSeqId.Channel])); agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value); - channel.GivenIdRanges.RemovePoint(value); + Self->Channels[blobSeqId.Channel].GivenIdRanges.RemovePoint(value); } bool CheckKeyAgainstBarrier(const TString& key, NKikimrBlobDepot::TEvCommitBlobSeqResult::TItem *responseItem) { diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp index c15641b18d3..b2efbaeac2a 100644 --- a/ydb/core/blob_depot/op_load.cpp +++ b/ydb/core/blob_depot/op_load.cpp @@ -65,9 +65,10 @@ namespace NKikimr::NBlobDepot { Self->BarrierServer->AddBarrierOnLoad( table.GetValue<Schema::Barriers::TabletId>(), table.GetValue<Schema::Barriers::Channel>(), - table.GetValue<Schema::Barriers::LastRecordGenStep>(), - table.GetValue<Schema::Barriers::Soft>(), - table.GetValue<Schema::Barriers::Hard>() + table.GetValue<Schema::Barriers::RecordGeneration>(), + table.GetValue<Schema::Barriers::PerGenerationCounter>(), + TGenStep(table.GetValue<Schema::Barriers::Soft>()), + TGenStep(table.GetValue<Schema::Barriers::Hard>()) ); if (!table.Next()) { return false; @@ -107,16 +108,18 @@ namespace NKikimr::NBlobDepot { } } - // ConfirmedGC + // GC { - auto table = db.Table<Schema::ConfirmedGC>().Select(); + using T = Schema::GC; + auto table = db.Table<T>().Select(); if (!table.IsReady()) { return false; } while (table.IsValid()) { - Self->Data->AddConfirmedGenStepOnLoad(table.GetValue<Schema::ConfirmedGC::Channel>(), - table.GetValue<Schema::ConfirmedGC::GroupId>(), - table.GetValue<Schema::ConfirmedGC::ConfirmedGenStep>()); + Self->Data->AddGenStepOnLoad(table.GetValue<T::Channel>(), + table.GetValue<T::GroupId>(), + TGenStep(table.GetValueOrDefault<T::IssuedGenStep>()), + TGenStep(table.GetValueOrDefault<T::ConfirmedGenStep>())); if (!table.Next()) { return false; } @@ -132,7 +135,7 @@ namespace NKikimr::NBlobDepot { auto barriers = db.Table<Schema::Barriers>().Select(); auto data = db.Table<Schema::Data>().Select(); auto trash = db.Table<Schema::Trash>().Select(); - auto confirmedGC = db.Table<Schema::ConfirmedGC>().Select(); + auto confirmedGC = db.Table<Schema::GC>().Select(); return config.IsReady() && blocks.IsReady() && barriers.IsReady() && data.IsReady() && trash.IsReady() && confirmedGC.IsReady(); } diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h index 057f91cad17..52a3f157503 100644 --- a/ydb/core/blob_depot/schema.h +++ b/ydb/core/blob_depot/schema.h @@ -40,15 +40,17 @@ namespace NKikimr::NBlobDepot { struct Barriers : Table<3> { struct TabletId : Column<1, NScheme::NTypeIds::Uint64> {}; struct Channel : Column<2, NScheme::NTypeIds::Uint8> {}; - struct LastRecordGenStep : Column<3, NScheme::NTypeIds::Uint64> {}; - struct Soft : Column<4, NScheme::NTypeIds::Uint64> {}; - struct Hard : Column<5, NScheme::NTypeIds::Uint64> {}; + struct RecordGeneration : Column<3, NScheme::NTypeIds::Uint32> {}; + struct PerGenerationCounter : Column<4, NScheme::NTypeIds::Uint32> {}; + struct Soft : Column<5, NScheme::NTypeIds::Uint64> {}; + struct Hard : Column<6, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey<TabletId, Channel>; using TColumns = TableColumns< TabletId, Channel, - LastRecordGenStep, + RecordGeneration, + PerGenerationCounter, Soft, Hard >; @@ -75,13 +77,14 @@ namespace NKikimr::NBlobDepot { using TColumns = TableColumns<BlobId>; }; - struct ConfirmedGC : Table<6> { + struct GC : Table<6> { struct Channel : Column<1, NScheme::NTypeIds::Uint8> {}; struct GroupId : Column<2, NScheme::NTypeIds::Uint32> {}; - struct ConfirmedGenStep : Column<3, NScheme::NTypeIds::Uint64> {}; + struct IssuedGenStep : Column<3, NScheme::NTypeIds::Uint64> { static constexpr Type Default = 0; }; + struct ConfirmedGenStep : Column<4, NScheme::NTypeIds::Uint64> { static constexpr Type Default = 0; }; using TKey = TableKey<Channel, GroupId>; - using TColumns = TableColumns<Channel, GroupId, ConfirmedGenStep>; + using TColumns = TableColumns<Channel, GroupId, IssuedGenStep, ConfirmedGenStep>; }; using TTables = SchemaTables< @@ -90,7 +93,7 @@ namespace NKikimr::NBlobDepot { Barriers, Data, Trash, - ConfirmedGC + GC >; using TSettings = SchemaSettings< diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index 147c61a6f62..8cf0e3899ab 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -15,7 +15,6 @@ namespace NKikimr::NBlobDepot { struct TVirtualGroupBlobFooter { TLogoBlobID StoredBlobId; }; - #pragma pack(pop) static constexpr ui32 MaxBlobSize = 10 << 20; // 10 MB BlobStorage hard limit @@ -101,11 +100,13 @@ namespace NKikimr::NBlobDepot { struct TRange { const ui64 Begin; const ui64 End; + ui32 NumSetBits = 0; TDynBitMap Bits; TRange(ui64 begin, ui64 end) : Begin(begin) , End(end) + , NumSetBits(end - begin) { Bits.Set(0, end - begin); } @@ -118,7 +119,8 @@ namespace NKikimr::NBlobDepot { }; }; - std::set<TRange, TRange::TCompare> Ranges; + using TRanges = std::set<TRange, TRange::TCompare>; // FIXME: deque? + TRanges Ranges; ui32 NumAvailableItems = 0; public: @@ -137,6 +139,9 @@ namespace NKikimr::NBlobDepot { void Output(IOutputStream& s) const; TString ToString() const; + + private: + void Pop(TRanges::iterator it, ui64 value); }; using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>; @@ -156,16 +161,57 @@ namespace NKikimr::NBlobDepot { } } - inline ui64 GenStep(ui32 gen, ui32 step) { - return static_cast<ui64>(gen) << 32 | step; - } + class TGenStep { + ui64 Value = 0; - inline ui64 GenStep(TLogoBlobID id) { - return GenStep(id.Generation(), id.Step()); - } + public: + TGenStep() = default; + TGenStep(const TGenStep&) = default; - inline ui64 GenStep(TBlobSeqId id) { - return GenStep(id.Generation, id.Step); - } + explicit TGenStep(ui64 value) + : Value(value) + {} + + TGenStep(ui32 gen, ui32 step) + : Value(ui64(gen) << 32 | step) + {} + + explicit TGenStep(const TLogoBlobID& id) + : TGenStep(id.Generation(), id.Step()) + {} + + explicit TGenStep(const TBlobSeqId& id) + : TGenStep(id.Generation, id.Step) + {} + + explicit operator ui64() const { + return Value; + } + + ui32 Generation() const { + return Value >> 32; + } + + ui32 Step() const { + return Value; + } + + void Output(IOutputStream& s) const { + s << Generation() << ":" << Step(); + } + + TString ToString() const { + TStringStream s; + Output(s); + return s.Str(); + } + + friend bool operator ==(const TGenStep& x, const TGenStep& y) { return x.Value == y.Value; } + friend bool operator !=(const TGenStep& x, const TGenStep& y) { return x.Value != y.Value; } + friend bool operator < (const TGenStep& x, const TGenStep& y) { return x.Value < y.Value; } + friend bool operator <=(const TGenStep& x, const TGenStep& y) { return x.Value <= y.Value; } + friend bool operator > (const TGenStep& x, const TGenStep& y) { return x.Value > y.Value; } + friend bool operator >=(const TGenStep& x, const TGenStep& y) { return x.Value >= y.Value; } + }; } // NKikimr::NBlobDepot |