diff options
author | alexvru <alexvru@ydb.tech> | 2022-07-09 22:21:01 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-07-09 22:21:01 +0300 |
commit | d6bc1736fb9c0e5ccd815586513eb0ec8c869ee7 (patch) | |
tree | 871afe011c4ec5434793136be9b39903b12c01f3 | |
parent | 22acf19be42357b6bb0e7d601b0dc28695191463 (diff) | |
download | ydb-d6bc1736fb9c0e5ccd815586513eb0ec8c869ee7.tar.gz |
BlobDepot work in progress
30 files changed, 565 insertions, 306 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp index 00d18d2168..7cecc58d31 100644 --- a/ydb/core/blob_depot/agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -1,4 +1,5 @@ #include "blob_depot_tablet.h" +#include "data.h" namespace NKikimr::NBlobDepot { @@ -25,21 +26,30 @@ namespace NKikimr::NBlobDepot { PipeServerToNode.erase(it); } - void TBlobDepot::OnAgentDisconnect(TAgent& /*agent*/) { + void TBlobDepot::OnAgentDisconnect(TAgent& agent) { + agent.InvalidateStepRequests.clear(); } void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) { + const auto& req = ev->Get()->Record; + const auto it = PipeServerToNode.find(ev->Recipient); Y_VERIFY(it != PipeServerToNode.end()); const ui32 nodeId = ev->Sender.NodeId(); Y_VERIFY(!it->second || *it->second == nodeId); it->second = nodeId; auto& agent = Agents[nodeId]; - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, ev->Get()->Record), - (NodeId, nodeId), (PipeServerId, it->first)); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, req), (NodeId, nodeId), + (PipeServerId, it->first)); agent.ConnectedAgent = it->first; agent.ConnectedNodeId = nodeId; agent.ExpirationTimestamp = TInstant::Max(); + + if (agent.AgentInstanceId && *agent.AgentInstanceId != req.GetAgentInstanceId()) { + ResetAgent(agent); + } + agent.AgentInstanceId = req.GetAgentInstanceId(); + OnAgentConnect(agent); auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId()); @@ -66,28 +76,55 @@ namespace NKikimr::NBlobDepot { (PipeServerId, ev->Recipient)); const ui32 generation = Executor()->Generation(); - auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId(), ev->Get()->Record.GetChannelKind(), generation); if (const auto it = ChannelKinds.find(record->GetChannelKind()); it != ChannelKinds.end()) { auto& kind = it->second; + auto *givenIdRange = record->MutableGivenIdRange(); - const ui64 rangeBegin = kind.NextBlobSeqId; - kind.NextBlobSeqId += ev->Get()->Record.GetCount(); - const ui64 rangeEnd = kind.NextBlobSeqId; + // FIXME: optimize for faster range selection - TGivenIdRange range; - range.IssueNewRange(rangeBegin, rangeEnd); + // create array of channels appropriate for current selection + std::vector<TChannelInfo*> channels; + channels.reserve(kind.ChannelGroups.size()); + for (const auto& [channel, _] : kind.ChannelGroups) { + Y_VERIFY_DEBUG(channel < Channels.size() && Channels[channel].ChannelKind == it->first); + channels.push_back(&Channels[channel]); + } - range.ToProto(record->MutableGivenIdRange()); + // make a min heap + auto comp = [](TChannelInfo *x, const TChannelInfo *y) { return x->NextBlobSeqId > y->NextBlobSeqId; }; + std::make_heap(channels.begin(), channels.end(), comp); + + THashMap<ui8, NKikimrBlobDepot::TGivenIdRange::TChannelRange*> issuedRanges; + for (ui32 i = 0, count = ev->Get()->Record.GetCount(); i < count; ++i) { + // extract element with the least NextBlobSeqId value + std::pop_heap(channels.begin(), channels.end(), comp); + + // map it to channel index + TChannelInfo *channel = channels.back(); + const ui64 value = channel->NextBlobSeqId; + const ui8 channelIndex = channel - Channels.data(); + + // fill in range item + auto& range = issuedRanges[channelIndex]; + if (!range || range->GetEnd() != value) { + range = givenIdRange->AddChannelRanges(); + range->SetChannel(channelIndex); + range->SetBegin(value); + } + range->SetEnd(value + 1); - TAgent& agent = GetAgent(ev->Recipient); - agent.ChannelKinds[it->first].GivenIdRanges.Join(TGivenIdRange(range)); - kind.GivenIdRanges.Join(std::move(range)); + // update NextBlobSeqId value and put back into heap + ++channel->NextBlobSeqId; + std::push_heap(channels.begin(), channels.end(), comp); + } - for (ui64 value = rangeBegin; value < rangeEnd; ++value) { - const auto blobSeqId = TBlobSeqId::FromBinary(generation, kind, value); - PerChannelRecords[blobSeqId.Channel].GivenStepIndex.emplace(blobSeqId.Step, blobSeqId.Index); + // register issued ranges in agent and global records + TAgent& agent = GetAgent(ev->Recipient); + for (const auto& range : givenIdRange->GetChannelRanges()) { + agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd()); + Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd()); } } @@ -109,12 +146,21 @@ namespace NKikimr::NBlobDepot { return agentIt->second; } + void TBlobDepot::ResetAgent(TAgent& agent) { + for (auto& [channel, agentGivenIdRange] : agent.GivenIdRanges) { + Channels[channel].GivenIdRanges.Subtract(agentGivenIdRange); + agentGivenIdRange = {}; + } + Data->HandleTrash(); + } + void TBlobDepot::InitChannelKinds() { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT05, "InitChannelKinds", (TabletId, TabletID())); + TTabletStorageInfo *info = Info(); const ui32 generation = Executor()->Generation(); - Y_VERIFY(ChannelToKind.empty()); - ChannelToKind.resize(info->Channels.size(), NKikimrBlobDepot::TChannelKind::System); + Y_VERIFY(Channels.empty()); ui32 channel = 0; for (const auto& profile : Config.GetChannelProfiles()) { @@ -124,19 +170,22 @@ namespace NKikimr::NBlobDepot { auto& p = ChannelKinds[kind]; p.ChannelToIndex[channel] = p.ChannelGroups.size(); p.ChannelGroups.emplace_back(channel, info->GroupFor(channel, generation)); - - Y_VERIFY(channel < ChannelToKind.size()); - ChannelToKind[channel] = kind; + Channels.push_back({ + kind, + &p, + {}, + TBlobSeqId{channel, generation, 1, 0}.ToSequentialNumber(), + }); + } else { + Channels.push_back({ + NKikimrBlobDepot::TChannelKind::System, + nullptr, + {}, + 0 + }); } } } - - Y_VERIFY_S(channel == ChannelToKind.size(), "channel# " << channel - << " ChannelToKind.size# " << ChannelToKind.size()); - - for (auto& [k, v] : ChannelKinds) { - v.NextBlobSeqId = TBlobSeqId{v.ChannelGroups.front().first, generation, 1, 0}.ToBinary(v); - } } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/CMakeLists.txt b/ydb/core/blob_depot/agent/CMakeLists.txt index a83c176d8d..ef0395b9b5 100644 --- a/ydb/core/blob_depot/agent/CMakeLists.txt +++ b/ydb/core/blob_depot/agent/CMakeLists.txt @@ -18,6 +18,7 @@ target_sources(core-blob_depot-agent PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/blob_mapping_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/blocks.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/channel_kind.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/comm.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/garbage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/proxy.cpp diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index 75c4b88bdb..d81e72cb04 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -76,6 +76,7 @@ namespace NKikimr::NBlobDepot { , public TRequestSender { const ui32 VirtualGroupId; + const ui64 AgentInstanceId; ui64 TabletId = Max<ui64>(); TActorId PipeId; @@ -84,6 +85,7 @@ namespace NKikimr::NBlobDepot { : TActor(&TThis::StateFunc) , TRequestSender(*this) , VirtualGroupId(virtualGroupId) + , AgentInstanceId(RandomNumber<ui64>()) , BlocksManager(CreateBlocksManager()) , BlobMappingCache(CreateBlobMappingCache()) { @@ -249,8 +251,14 @@ namespace NKikimr::NBlobDepot { const NKikimrBlobDepot::TChannelKind::E Kind; bool IdAllocInFlight = false; - TGivenIdRange GivenIdRange; - static constexpr size_t PreallocatedIdCount = 2; + + struct TGivenIdRangeHeapComp; + using TGivenIdRangePerChannel = THashMap<ui8, TGivenIdRange>; + TGivenIdRangePerChannel GivenIdRangePerChannel; + std::vector<TGivenIdRangePerChannel::value_type*> GivenIdRangeHeap; + ui32 NumAvailableItems = 0; + + std::set<TBlobSeqId> WritesInFlight; TIntrusiveList<TQuery, TPendingId> QueriesWaitingForId; @@ -258,37 +266,20 @@ namespace NKikimr::NBlobDepot { : Kind(kind) {} - std::optional<TBlobSeqId> Allocate(TBlobDepotAgent& agent) { - if (GivenIdRange.IsEmpty()) { - return std::nullopt; - } - auto blobSeqId = TBlobSeqId::FromBinary(agent.BlobDepotGeneration, *this, GivenIdRange.Allocate()); - agent.IssueAllocateIdsIfNeeded(*this); - return blobSeqId; - } - - std::pair<TLogoBlobID, ui32> MakeBlobId(TBlobDepotAgent& agent, const TBlobSeqId& blobSeqId, EBlobType type, ui32 part, - ui32 size) const { - auto id = blobSeqId.MakeBlobId(agent.TabletId, type, part, size); - const auto [channel, groupId] = ChannelGroups[ChannelToIndex[blobSeqId.Channel]]; - Y_VERIFY_DEBUG(channel == blobSeqId.Channel); - return {id, groupId}; - } - - void EnqueueQueryWaitingForId(TQuery *query) { - QueriesWaitingForId.PushBack(query); - } + void IssueGivenIdRange(const NKikimrBlobDepot::TGivenIdRange& proto); + ui32 GetNumAvailableItems() const; + std::optional<TBlobSeqId> Allocate(TBlobDepotAgent& agent); + std::pair<TLogoBlobID, ui32> MakeBlobId(TBlobDepotAgent& agent, const TBlobSeqId& blobSeqId, EBlobType type, + ui32 part, ui32 size) const; + void Trim(ui8 channel, ui32 generation, ui32 invalidatedStep); + void RebuildHeap(); - void ProcessQueriesWaitingForId() { - TIntrusiveList<TQuery, TPendingId> temp; - temp.Swap(QueriesWaitingForId); - for (TQuery& query : temp) { - query.OnIdAllocated(); - } - } + void EnqueueQueryWaitingForId(TQuery *query); + void ProcessQueriesWaitingForId(); }; THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds; + THashMap<ui8, TChannelKind*> ChannelToKind; void IssueAllocateIdsIfNeeded(TChannelKind& kind); diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp index a3b070799f..e700e2c3d7 100644 --- a/ydb/core/blob_depot/agent/blocks.cpp +++ b/ydb/core/blob_depot/agent/blocks.cpp @@ -53,7 +53,7 @@ namespace NKikimr::NBlobDepot { block.RefreshInFlight = true; block.PendingBlockChecks.PushBack(query); } - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA99, "CheckBlockForTablet", (QueryId, query->GetQueryId()), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "CheckBlockForTablet", (QueryId, query->GetQueryId()), (TabletId, tabletId), (Generation, generation), (Status, status), (Now, now), (ExpirationTimestamp, block.ExpirationTimestamp)); return status; @@ -74,7 +74,7 @@ namespace NKikimr::NBlobDepot { void Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvQueryBlocksResult& msg) { auto& queryBlockContext = context->Obtain<TQueryBlockContext>(); - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA01, "TEvQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "TEvQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId), (Msg, msg), (TabletId, queryBlockContext.TabletId)); auto& block = Blocks[queryBlockContext.TabletId]; Y_VERIFY(msg.BlockedGenerationsSize() == 1); diff --git a/ydb/core/blob_depot/agent/channel_kind.cpp b/ydb/core/blob_depot/agent/channel_kind.cpp new file mode 100644 index 0000000000..6cbae9e017 --- /dev/null +++ b/ydb/core/blob_depot/agent/channel_kind.cpp @@ -0,0 +1,82 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + struct TBlobDepotAgent::TChannelKind::TGivenIdRangeHeapComp { + using TValue = TBlobDepotAgent::TChannelKind::TGivenIdRangePerChannel::value_type*; + + bool operator ()(TValue x, TValue y) const { + return x->second.GetMinimumValue() > y->second.GetMinimumValue(); + } + }; + + void TBlobDepotAgent::TChannelKind::IssueGivenIdRange(const NKikimrBlobDepot::TGivenIdRange& proto) { + for (const auto& range : proto.GetChannelRanges()) { + GivenIdRangePerChannel[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd()); + NumAvailableItems += range.GetEnd() - range.GetBegin(); + } + + // build min-heap for ids + RebuildHeap(); + + ProcessQueriesWaitingForId(); + } + + ui32 TBlobDepotAgent::TChannelKind::GetNumAvailableItems() const { + return NumAvailableItems; + } + + std::optional<TBlobSeqId> TBlobDepotAgent::TChannelKind::Allocate(TBlobDepotAgent& agent) { + if (GivenIdRangeHeap.empty()) { + return std::nullopt; + } + + std::pop_heap(GivenIdRangeHeap.begin(), GivenIdRangeHeap.end(), TGivenIdRangeHeapComp()); + auto& [channel, range] = *GivenIdRangeHeap.back(); + const ui64 value = range.Allocate(); + if (range.IsEmpty()) { + GivenIdRangeHeap.pop_back(); + } else { + std::push_heap(GivenIdRangeHeap.begin(), GivenIdRangeHeap.end(), TGivenIdRangeHeapComp()); + } + --NumAvailableItems; + + agent.IssueAllocateIdsIfNeeded(*this); + + return TBlobSeqId::FromSequentalNumber(channel, agent.BlobDepotGeneration, value); + } + + std::pair<TLogoBlobID, ui32> TBlobDepotAgent::TChannelKind::MakeBlobId(TBlobDepotAgent& agent, + const TBlobSeqId& blobSeqId, EBlobType type, ui32 part, ui32 size) const { + auto id = blobSeqId.MakeBlobId(agent.TabletId, type, part, size); + const auto [channel, groupId] = ChannelGroups[ChannelToIndex[blobSeqId.Channel]]; + Y_VERIFY_DEBUG(channel == blobSeqId.Channel); + return {id, groupId}; + } + + void TBlobDepotAgent::TChannelKind::Trim(ui8 channel, ui32 generation, ui32 invalidatedStep) { + GivenIdRangePerChannel[channel].Trim(channel, generation, invalidatedStep); + RebuildHeap(); + } + + void TBlobDepotAgent::TChannelKind::RebuildHeap() { + GivenIdRangeHeap.clear(); + for (auto& kv : GivenIdRangePerChannel) { + GivenIdRangeHeap.push_back(&kv); + } + std::make_heap(GivenIdRangeHeap.begin(), GivenIdRangeHeap.end(), TGivenIdRangeHeapComp()); + } + + void TBlobDepotAgent::TChannelKind::EnqueueQueryWaitingForId(TQuery *query) { + QueriesWaitingForId.PushBack(query); + } + + void TBlobDepotAgent::TChannelKind::ProcessQueriesWaitingForId() { + TIntrusiveList<TQuery, TPendingId> temp; + temp.Swap(QueriesWaitingForId); + for (TQuery& query : temp) { + query.OnIdAllocated(); + } + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index 8aa8edf9dc..6359c4e604 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -3,12 +3,12 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA02, "TEvClientConnected", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA03, "TEvClientConnected", (VirtualGroupId, VirtualGroupId), (Msg, ev->Get()->ToString())); } void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA03, "TEvClientDestroyed", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA04, "TEvClientDestroyed", (VirtualGroupId, VirtualGroupId), (Msg, ev->Get()->ToString())); PipeId = {}; OnDisconnect(); @@ -18,14 +18,14 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::ConnectToBlobDepot() { PipeId = Register(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy::WithRetries())); const ui64 id = NextRequestId++; - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA04, "ConnectToBlobDepot", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA05, "ConnectToBlobDepot", (VirtualGroupId, VirtualGroupId), (PipeId, PipeId), (RequestId, id)); - NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId), id); + NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId, AgentInstanceId), id); RegisterRequest(id, this, nullptr, {}, true); } void TBlobDepotAgent::Handle(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvRegisterAgentResult& msg) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA04, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId), (Msg, msg)); Registered = true; BlobDepotGeneration = msg.GetGeneration(); @@ -35,6 +35,8 @@ namespace NKikimr::NBlobDepot { vanishedKinds.insert(kind); } + ChannelToKind.clear(); + for (const auto& ch : msg.GetChannelKinds()) { const NKikimrBlobDepot::TChannelKind::E kind = ch.GetChannelKind(); vanishedKinds.erase(kind); @@ -50,24 +52,25 @@ namespace NKikimr::NBlobDepot { const ui32 groupId = channelGroup.GetGroupId(); v.ChannelToIndex[channel] = v.ChannelGroups.size(); v.ChannelGroups.emplace_back(channel, groupId); + ChannelToKind[channel] = &v; } IssueAllocateIdsIfNeeded(v); } for (const NKikimrBlobDepot::TChannelKind::E kind : vanishedKinds) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA05, "kind vanished", (VirtualGroupId, VirtualGroupId), (Kind, kind)); + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA07, "kind vanished", (VirtualGroupId, VirtualGroupId), (Kind, kind)); ChannelKinds.erase(kind); } } void TBlobDepotAgent::IssueAllocateIdsIfNeeded(TChannelKind& kind) { - if (!kind.IdAllocInFlight && kind.GivenIdRange.GetNumAvailableItems() < 100 && PipeId) { + if (!kind.IdAllocInFlight && kind.GetNumAvailableItems() < 100 && PipeId) { const ui64 id = NextRequestId++; - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA05, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA08, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId), (ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(kind.Kind)), - (IdAllocInFlight, kind.IdAllocInFlight), (NumAvailableItems, kind.GivenIdRange.GetNumAvailableItems()), - (PreallocatedIdCount, kind.PreallocatedIdCount), (RequestId, id)); + (IdAllocInFlight, kind.IdAllocInFlight), (NumAvailableItems, kind.GetNumAvailableItems()), + (RequestId, id)); NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds(kind.Kind, 100), id); RegisterRequest(id, this, std::make_shared<TAllocateIdsContext>(kind.Kind), {}, true); kind.IdAllocInFlight = true; @@ -75,7 +78,7 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvAllocateIdsResult& msg) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA06, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA09, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId), (Msg, msg)); auto& allocateIdsContext = context->Obtain<TAllocateIdsContext>(); @@ -91,15 +94,12 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(msg.GetGeneration() == BlobDepotGeneration); if (msg.HasGivenIdRange()) { - TGivenIdRange range(msg.GetGivenIdRange()); - kind.GivenIdRange.Join(std::move(range)); - kind.ProcessQueriesWaitingForId(); - IssueAllocateIdsIfNeeded(kind); + kind.IssueGivenIdRange(msg.GetGivenIdRange()); } } void TBlobDepotAgent::OnDisconnect() { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA07, "OnDisconnect", (VirtualGroupId, VirtualGroupId)); + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA10, "OnDisconnect", (VirtualGroupId, VirtualGroupId)); for (auto& [id, request] : std::exchange(TabletRequestInFlight, {})) { request.Sender->OnRequestComplete(id, TTabletDisconnected{}); @@ -144,16 +144,33 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestContext::TPtr context) { const ui64 id = NextRequestId++; - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA08, "Issue", (VirtualGroupId, VirtualGroupId), (Id, id), (Msg, ev->ToString())); + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA11, "Issue", (VirtualGroupId, VirtualGroupId), (Id, id), (Msg, ev->ToString())); NTabletPipe::SendData(SelfId(), PipeId, ev.release(), id); RegisterRequest(id, sender, std::move(context), {}, true); } void TBlobDepotAgent::Handle(TEvBlobDepot::TEvPushNotify::TPtr ev) { + auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId()); + auto& msg = ev->Get()->Record; OnBlockedTablets(msg.GetBlockedTablets()); - auto [response, _] = TEvBlobDepot::MakeResponseFor(*ev, SelfId()); + for (const auto& item : msg.GetInvalidatedSteps()) { + const ui8 channel = item.GetChannel(); + Y_VERIFY(item.GetGeneration() == BlobDepotGeneration); + const auto it = ChannelToKind.find(channel); + Y_VERIFY(it != ChannelToKind.end()); + TChannelKind& kind = *it->second; + kind.Trim(channel, item.GetGeneration(), item.GetInvalidatedStep()); + + // report writes in flight that are trimmed + const TBlobSeqId first{channel, item.GetGeneration(), 0, 0}; + const TBlobSeqId last{channel, item.GetGeneration(), item.GetInvalidatedStep(), Max<ui32>()}; + for (auto it = kind.WritesInFlight.lower_bound(first); it != kind.WritesInFlight.end() && *it <= last; ++it) { + it->ToProto(record->AddWritesInFlight()); + } + } + TActivationContext::Send(response.release()); } diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp index 0eb0e862ce..ab4cd8c7d3 100644 --- a/ydb/core/blob_depot/agent/query.cpp +++ b/ydb/core/blob_depot/agent/query.cpp @@ -8,7 +8,7 @@ namespace NKikimr::NBlobDepot { PendingEventQ.emplace_back(ev.Release()); } else { auto *query = CreateQuery(ev); - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA09, "new query", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA12, "new query", (VirtualGroupId, VirtualGroupId), (QueryId, query->GetQueryId()), (Name, query->GetName())); if (!TabletId) { query->EndWithError(NKikimrProto::ERROR, "group is in error state"); @@ -30,7 +30,7 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::TQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA10, "query ends with error", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA13, "query ends with error", (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Status, status), (ErrorReason, errorReason)); std::unique_ptr<IEventBase> response; @@ -49,7 +49,7 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::TQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA11, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA14, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Response, response->ToString())); Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie); delete this; diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp index 8e73045be9..3c7fe68706 100644 --- a/ydb/core/blob_depot/agent/request.cpp +++ b/ydb/core/blob_depot/agent/request.cpp @@ -57,7 +57,7 @@ namespace NKikimr::NBlobDepot { template<typename TEvent> void TBlobDepotAgent::HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA12, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId), (Id, ev->Cookie), (Type, TypeName<TEvent>())); auto *event = ev->Get(); OnRequestComplete(ev->Cookie, event, TabletRequestInFlight); @@ -73,7 +73,7 @@ namespace NKikimr::NBlobDepot { template<typename TEvent> void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA13, "HandleOtherResponse", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA16, "HandleOtherResponse", (VirtualGroupId, VirtualGroupId), (Id, ev->Cookie), (Type, TypeName<TEvent>())); auto *event = ev->Get(); OnRequestComplete(ev->Cookie, event, OtherRequestInFlight); diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index 9b5c37b4a7..5214042b83 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -70,7 +70,7 @@ namespace NKikimr::NBlobDepot { } void OnUpdateBlock(bool success) override { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA14, "OnUpdateBlock", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA17, "OnUpdateBlock", (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Success, success)); if (!success) { @@ -89,7 +89,7 @@ namespace NKikimr::NBlobDepot { } void HandleResolveResult(ui64 id, TRequestContext::TPtr context, TEvBlobDepot::TEvResolveResult& msg) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA18, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Msg, msg.Record)); Agent.HandleResolveResult(msg.Record); diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index 87bbc0603b..fa98c2e584 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -14,6 +14,12 @@ namespace NKikimr::NBlobDepot { public: using TQuery::TQuery; + ~TPutQuery() { + if (BlobSeqId != TBlobSeqId()) { + Agent.ChannelToKind[BlobSeqId.Channel]->WritesInFlight.erase(BlobSeqId); + } + } + void Initiate() override { auto& msg = *Event->Get<TEvBlobStorage::TEvPut>(); @@ -36,13 +42,14 @@ namespace NKikimr::NBlobDepot { auto& kind = it->second; std::optional<TBlobSeqId> blobSeqId = kind.Allocate(Agent); - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "allocated BlobSeqId", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA19, "allocated BlobSeqId", (VirtualGroupId, Agent.VirtualGroupId), (QueryId, GetQueryId()), (BlobSeqId, blobSeqId)); if (!blobSeqId) { return kind.EnqueueQueryWaitingForId(this); } BlobSeqId = *blobSeqId; + kind.WritesInFlight.insert(BlobSeqId); auto& msg = *Event->Get<TEvBlobStorage::TEvPut>(); const ui32 size = msg.Id.BlobSize(); diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp index 7995c10aad..bfc5435eb3 100644 --- a/ydb/core/blob_depot/blob_depot.cpp +++ b/ydb/core/blob_depot/blob_depot.cpp @@ -34,6 +34,7 @@ namespace NKikimr::NBlobDepot { hFunc(TEvBlobDepot::TEvCollectGarbage, BarrierServer->Handle); hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle); + hFunc(TEvBlobDepot::TEvPushNotifyResult, Data->Handle); hFunc(TEvTabletPipe::TEvServerConnected, Handle); hFunc(TEvTabletPipe::TEvServerDisconnected, Handle); diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index 25374384e7..0fa8e971de 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -23,7 +23,7 @@ namespace NKikimr::NBlobDepot { ~TBlobDepot(); void HandlePoison() { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT09, "HandlePoison", (TabletId, TabletID())); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT19, "HandlePoison", (TabletId, TabletID())); Become(&TThis::StateZombie); Send(Tablet(), new TEvents::TEvPoison); } @@ -36,31 +36,27 @@ namespace NKikimr::NBlobDepot { std::optional<TActorId> ConnectedAgent; ui32 ConnectedNodeId; TInstant ExpirationTimestamp; + std::optional<ui64> AgentInstanceId; - struct TChannelKind { - TGivenIdRange GivenIdRanges; // updated on AllocateIds and when BlobSeqIds are found in any way - }; + THashMap<ui8, TGivenIdRange> GivenIdRanges; - THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds; + THashMap<ui8, ui32> InvalidatedStepInFlight; + THashMap<ui64, THashMap<ui8, ui32>> InvalidateStepRequests; + ui64 LastRequestId = 0; }; THashMap<TActorId, std::optional<ui32>> PipeServerToNode; THashMap<ui32, TAgent> Agents; // NodeId -> Agent - struct TChannelKind - : NBlobDepot::TChannelKind - { - ui64 NextBlobSeqId = 0; - TGivenIdRange GivenIdRanges; // for all agents, including disconnected ones - }; - THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds; - std::vector<NKikimrBlobDepot::TChannelKind::E> ChannelToKind; - struct TPerChannelRecord { - std::set<std::tuple<ui32, ui32>> GivenStepIndex; + struct TChannelInfo { + NKikimrBlobDepot::TChannelKind::E ChannelKind; + TChannelKind *KindPtr; + TGivenIdRange GivenIdRanges; // accumulated through all agents + ui64 NextBlobSeqId = 0; }; - THashMap<ui8, TPerChannelRecord> PerChannelRecords; + std::vector<TChannelInfo> Channels; void Handle(TEvTabletPipe::TEvServerConnected::TPtr ev); void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev); @@ -70,6 +66,7 @@ namespace NKikimr::NBlobDepot { void Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev); TAgent& GetAgent(const TActorId& pipeServerId); TAgent& GetAgent(ui32 nodeId); + void ResetAgent(TAgent& agent); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -80,10 +77,12 @@ namespace NKikimr::NBlobDepot { } void OnActivateExecutor(const TActorContext&) override { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "OnActivateExecutor", (TabletId, TabletID())); - + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT20, "OnActivateExecutor", (TabletId, TabletID())); ExecuteTxInitSchema(); + } + void OnLoadFinished() { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT20, "OnLoadFinished", (TabletId, TabletID())); Become(&TThis::StateWork); for (auto&& ev : std::exchange(InitialEventsQ, {})) { TActivationContext::Send(ev.release()); @@ -91,14 +90,14 @@ namespace NKikimr::NBlobDepot { } void OnDetach(const TActorContext&) override { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT11, "OnDetach", (TabletId, TabletID())); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT21, "OnDetach", (TabletId, TabletID())); // TODO: what does this callback mean PassAway(); } void OnTabletDead(TEvTablet::TEvTabletDead::TPtr& /*ev*/, const TActorContext&) override { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT12, "OnTabletDead", (TabletId, TabletID())); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT22, "OnTabletDead", (TabletId, TabletID())); PassAway(); } diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp index 489b99045e..f1c8245c4d 100644 --- a/ydb/core/blob_depot/blocks.cpp +++ b/ydb/core/blob_depot/blocks.cpp @@ -98,7 +98,7 @@ namespace NKikimr::NBlobDepot { TAgent& agent = Self->GetAgent(agentId); if (const auto& actorId = agent.ConnectedAgent) { - Send(*actorId, ev.release(), IEventHandle::FlagTrackDelivery, IssuerGuid); + Send(*actorId, ev.release(), 0, IssuerGuid); } NodesWaitingForPushResult.insert(agentId); } @@ -117,28 +117,31 @@ namespace NKikimr::NBlobDepot { } } - void Handle(TEvents::TEvUndelivered::TPtr /*ev*/) { - // can't reach an agent to notify it about blocked generation change -- we can't do anything here - } - void IssueBlocksToStorage() { + THashSet<ui32> processedGroups; for (const auto& [_, kind] : Self->ChannelKinds) { for (const auto& [channel, groupId] : kind.ChannelGroups) { // FIXME: consider previous group generations (because agent can write in obsolete tablet generation) // !!!!!!!!!!! - SendBlock(groupId); - ++BlocksPending; - RetryCount += 2; + if (const auto [it, inserted] = processedGroups.insert(groupId); inserted) { + SendBlock(groupId); + ++BlocksPending; + RetryCount += 2; + } } } } void SendBlock(ui32 groupId) { - SendToBSProxy(SelfId(), groupId, new TEvBlobStorage::TEvBlock(TabletId, BlockedGeneration, - TInstant::Max(), IssuerGuid), groupId); + STLOG(PRI_INFO, BLOB_DEPOT, BDT06, "issing TEvBlock", (TabletId, Self->TabletID()), (BlockedTabletId, + TabletId), (BlockedGeneration, BlockedGeneration), (GroupId, groupId), (IssuerGuid, IssuerGuid)); + SendToBSProxy(SelfId(), groupId, new TEvBlobStorage::TEvBlock(TabletId, BlockedGeneration, TInstant::Max(), + IssuerGuid), groupId); } void Handle(TEvBlobStorage::TEvBlockResult::TPtr ev) { + STLOG(PRI_INFO, BLOB_DEPOT, BDT07, "TEvBlockResult", (TabletId, Self->TabletID()), (Msg, ev->Get()->ToString()), + (BlockedTabletId, TabletId), (BlockedGeneration, BlockedGeneration), (GroupId, ev->Cookie)); switch (ev->Get()->Status) { case NKikimrProto::OK: if (!--BlocksPending) { @@ -148,6 +151,7 @@ namespace NKikimr::NBlobDepot { case NKikimrProto::ALREADY: // race, but this is not possible in current implementation + // ORLY? :) Y_FAIL(); case NKikimrProto::ERROR: @@ -172,7 +176,6 @@ namespace NKikimr::NBlobDepot { STRICT_STFUNC(StateFunc, hFunc(TEvBlobStorage::TEvBlockResult, Handle); hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle); - hFunc(TEvents::TEvUndelivered, Handle); cFunc(TEvents::TSystem::Wakeup, IssueBlocksToStorage); cFunc(TEvents::TSystem::Poison, PassAway); ) diff --git a/ydb/core/blob_depot/blocks.h b/ydb/core/blob_depot/blocks.h index bd3aba3899..244b7c9ae4 100644 --- a/ydb/core/blob_depot/blocks.h +++ b/ydb/core/blob_depot/blocks.h @@ -37,6 +37,13 @@ namespace NKikimr::NBlobDepot { void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, std::unique_ptr<IEventHandle> response); void Handle(TEvBlobDepot::TEvBlock::TPtr ev); void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev); + + template<typename TCallback> + void Enumerate(TCallback&& callback) const { + for (const auto& [tabletId, block] : Blocks) { + callback(tabletId, block.BlockedGeneration); + } + } }; } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index f2752fc8ee..43b4d5ceee 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -89,7 +89,7 @@ namespace NKikimr::NBlobDepot { } void TBlobDepot::TData::PutKey(TKey key, TValue&& data) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "PutKey", (TabletId, Self->TabletID()), (Key, key.ToString(Self->Config)), + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT08, "PutKey", (TabletId, Self->TabletID()), (Key, key.ToString(Self->Config)), (KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState))); EnumerateBlobsForValueChain(data.ValueChain, Self->TabletID(), [&](TLogoBlobID id) { @@ -169,23 +169,54 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::TData::HandleTrash() { const ui32 generation = Self->Executor()->Generation(); + THashMap<ui32, std::unique_ptr<TEvBlobDepot::TEvPushNotify>> outbox; for (TRecordsPerChannelGroup& record : RecordsWithTrash) { Y_VERIFY(!record.CollectGarbageRequestInFlight); Y_VERIFY(record.TabletId == Self->TabletID()); Y_VERIFY(!record.Trash.empty()); - ui64 nextGenStep = 0; - - auto& channel = Self->PerChannelRecords[record.Channel]; - if (channel.GivenStepIndex.empty()) { - nextGenStep = GenStep(*--record.Trash.end()); - } else { - const auto& [leastStep, leastIndex] = *channel.GivenStepIndex.begin(); - const TLogoBlobID maxId(record.TabletId, generation, leastStep, record.Channel, 0, 0); - const auto it = record.Trash.lower_bound(maxId); - if (it != record.Trash.begin()) { - nextGenStep = GenStep(*std::prev(it)); + ui64 nextGenStep = GenStep(*--record.Trash.end()); + + Y_VERIFY(record.Channel < Self->Channels.size()); + auto& channel = Self->Channels[record.Channel]; + + if (!channel.GivenIdRanges.IsEmpty()) { + // minimum blob seq id that can still be written by other party + const auto minBlobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, + channel.GivenIdRanges.GetMinimumValue()); + + // step we are going to invalidate (including blobs with this one) + const ui32 invalidatedStep = ui32(nextGenStep); + + if (minBlobSeqId.Step <= invalidatedStep) { + const TLogoBlobID maxId(record.TabletId, generation, minBlobSeqId.Step, record.Channel, 0, 0); + const auto it = record.Trash.lower_bound(maxId); + if (it != record.Trash.begin()) { + nextGenStep = GenStep(*std::prev(it)); + } else { + nextGenStep = 0; + } + + // issue notifications to agents + for (auto& [agentId, agent] : Self->Agents) { + if (!agent.ConnectedAgent) { + continue; + } + const auto [it, inserted] = agent.InvalidatedStepInFlight.emplace(record.Channel, invalidatedStep); + if (inserted || it->second < invalidatedStep) { + it->second = invalidatedStep; + + auto& ev = outbox[agentId]; + if (!ev) { + ev.reset(new TEvBlobDepot::TEvPushNotify); + } + auto *item = ev->Record.AddInvalidatedSteps(); + item->SetChannel(record.Channel); + item->SetGeneration(generation); + item->SetInvalidatedStep(invalidatedStep); + } + } } } @@ -227,14 +258,15 @@ namespace NKikimr::NBlobDepot { record.TrashInFlight.insert(record.TrashInFlight.end(), record.Trash.begin(), record.Trash.end()); record.NextGenStep = Max(nextGenStep, record.LastConfirmedGenStep); - auto& kind = Self->ChannelKinds[Self->ChannelToKind[record.Channel]]; - const auto blobSeqId = TBlobSeqId::FromBinary(generation, kind, kind.NextBlobSeqId); - Y_VERIFY(record.LastConfirmedGenStep < GenStep(generation, blobSeqId.Step)); - if (GenStep(generation, blobSeqId.Step) <= nextGenStep) { - kind.NextBlobSeqId = TBlobSeqId{record.Channel, generation, ui32(nextGenStep) + 1, 0}.ToBinary(kind); + auto blobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, channel.NextBlobSeqId); + Y_VERIFY(record.LastConfirmedGenStep < GenStep(blobSeqId)); + if (GenStep(blobSeqId) <= nextGenStep) { + blobSeqId.Step = ui32(nextGenStep) + 1; + blobSeqId.Index = 0; + channel.NextBlobSeqId = blobSeqId.ToSequentialNumber(); } - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "issuing TEvCollectGarbage", (TabletId, Self->TabletID()), + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT09, "issuing TEvCollectGarbage", (TabletId, Self->TabletID()), (Channel, record.Channel), (GroupId, record.GroupId), (Msg, ev->ToString()), (LastConfirmedGenStep, record.LastConfirmedGenStep), (NextGenStep, record.NextGenStep), (TrashInFlight.size, record.TrashInFlight.size())); @@ -243,10 +275,22 @@ namespace NKikimr::NBlobDepot { } RecordsWithTrash.Clear(); + + for (auto& [agentId, ev] : outbox) { + TAgent& agent = Self->GetAgent(agentId); + const ui64 id = ++agent.LastRequestId; + auto& request = agent.InvalidateStepRequests[id]; + for (const auto& item : ev->Record.GetInvalidatedSteps()) { + request[item.GetChannel()] = item.GetInvalidatedStep(); + } + if (const auto& actorId = agent.ConnectedAgent) { + TActivationContext::Send(new IEventHandle(*actorId, Self->SelfId(), ev.release(), 0, id)); + } + } } void TBlobDepot::TData::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvCollectGarbageResult", (TabletId, ev->Get()->TabletId), + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "TEvCollectGarbageResult", (TabletId, ev->Get()->TabletId), (Channel, ev->Get()->Channel), (GroupId, ev->Cookie), (Msg, ev->Get()->ToString())); const auto& key = std::make_tuple(ev->Get()->TabletId, ev->Get()->Channel, ev->Cookie); const auto it = RecordsPerChannelGroup.find(key); @@ -267,6 +311,24 @@ namespace NKikimr::NBlobDepot { } } + void TBlobDepot::TData::Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) { + TAgent& agent = Self->GetAgent(ev->Recipient); + const ui32 generation = Self->Executor()->Generation(); + if (const auto it = agent.InvalidateStepRequests.find(ev->Cookie); it != agent.InvalidateStepRequests.end()) { + for (const auto& [channel, invalidatedStep] : it->second) { + Self->Channels[channel].GivenIdRanges.Trim(channel, generation, invalidatedStep); + agent.GivenIdRanges[channel].Trim(channel, generation, invalidatedStep); + } + agent.InvalidateStepRequests.erase(it); + } + for (const auto& item : ev->Get()->Record.GetWritesInFlight()) { + const auto blobSeqId = TBlobSeqId::FromProto(item); + Self->Channels[blobSeqId.Channel].GivenIdRanges.AddPoint(blobSeqId.ToSequentialNumber()); + agent.GivenIdRanges[blobSeqId.Channel].AddPoint(blobSeqId.ToSequentialNumber()); + } + HandleTrash(); + } + void TBlobDepot::TData::OnCommitConfirmedGC(ui8 channel, ui32 groupId) { const auto& key = std::make_tuple(Self->TabletID(), channel, groupId); const auto it = RecordsPerChannelGroup.find(key); diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 22515c8273..3d092bd995 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -317,6 +317,7 @@ namespace NKikimr::NBlobDepot { void CommitTrash(void *cookie); void HandleTrash(); void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev); + void Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev); void OnCommitConfirmedGC(ui8 channel, ui32 groupId); static TString ToValueProto(const TValue& value); @@ -333,7 +334,7 @@ namespace NKikimr::NBlobDepot { for (const auto& [key, record] : RecordsPerChannelGroup) { THashSet<TLogoBlobID> inFlight(record.TrashInFlight.begin(), record.TrashInFlight.end()); for (const TLogoBlobID& id : record.Trash) { - callback(record.TabletId, record.Channel, record.GroupId, id, inFlight.contains(id)); + callback(record.GroupId, id, inFlight.contains(id)); } } } diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h index 4532ec72b8..ce20efa2ec 100644 --- a/ydb/core/blob_depot/events.h +++ b/ydb/core/blob_depot/events.h @@ -53,7 +53,7 @@ namespace NKikimr { BLOBDEPOT_EVENT_PB(EvApplyConfig, TxId); BLOBDEPOT_EVENT_PB(EvApplyConfigResult, TabletId, TxId); - BLOBDEPOT_EVENT_PB(EvRegisterAgent, VirtualGroupId); + BLOBDEPOT_EVENT_PB(EvRegisterAgent, VirtualGroupId, AgentInstanceId); BLOBDEPOT_EVENT_PB_NO_ARGS(EvRegisterAgentResult); BLOBDEPOT_EVENT_PB(EvAllocateIds, ChannelKind, Count); BLOBDEPOT_EVENT_PB(EvAllocateIdsResult, ChannelKind, Generation); diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp index 6d770624dc..7e69dfa76f 100644 --- a/ydb/core/blob_depot/garbage_collection.cpp +++ b/ydb/core/blob_depot/garbage_collection.cpp @@ -106,7 +106,7 @@ namespace NKikimr::NBlobDepot { auto processKey = [&](const TData::TKey& key, const TData::TValue& value) { if (value.KeepState != NKikimrBlobDepot::EKeepState::Keep || hard) { const TLogoBlobID id(key.GetBlobId()); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "DeleteKey", (TabletId, Self->TabletID()), (BlobId, id)); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT11, "DeleteKey", (TabletId, Self->TabletID()), (BlobId, id)); db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Delete(); auto updateTrash = [&](TLogoBlobID id) { db.Table<Schema::Trash>().Key(TString(id.AsBinaryString())).Update(); diff --git a/ydb/core/blob_depot/given_id_range.cpp b/ydb/core/blob_depot/given_id_range.cpp index 2d78ac0970..9186465375 100644 --- a/ydb/core/blob_depot/given_id_range.cpp +++ b/ydb/core/blob_depot/given_id_range.cpp @@ -2,83 +2,99 @@ namespace NKikimr::NBlobDepot { - TGivenIdRange::TGivenIdRange(const NKikimrBlobDepot::TGivenIdRange& proto) { - for (const auto& range : proto.GetRanges()) { - const ui64 begin = range.GetBegin(); - const ui32 len = range.GetLen(); - TRange& r = InsertNewRange(begin, begin + len); - ui64 *chunks = const_cast<ui64*>(r.Bits.GetChunks()); - const ui32 count = r.Bits.GetChunkCount(); - Y_VERIFY(range.OffsetsSize() == range.BitMasksSize()); - for (ui32 i = 0; i < range.OffsetsSize(); ++i) { - const ui32 offset = range.GetOffsets(i); - const ui64 bitMask = range.GetBitMasks(i); - NumAvailableItems += PopCount(bitMask) - 64; - Y_VERIFY(offset < count); - chunks[offset] = bitMask; - } + void TGivenIdRange::IssueNewRange(ui64 begin, ui64 end) { + Y_VERIFY(begin < end); + const auto [it, inserted] = Ranges.emplace(begin, end); + Y_VERIFY(inserted); + if (it != Ranges.begin()) { + const auto& prev = *std::prev(it); + Y_VERIFY(prev.End <= begin); + } + if (std::next(it) != Ranges.end()) { + const auto& next = *std::next(it); + Y_VERIFY(end <= next.Begin); } + NumAvailableItems += end - begin; } - TGivenIdRange::TRange& TGivenIdRange::InsertNewRange(ui64 begin, ui64 end) { - const ui64 len = end - begin; - Y_VERIFY(len); - const auto it = Ranges.lower_bound(begin); - if (it != Ranges.begin()) { - const auto& [prevBegin, prev] = *std::prev(it); - Y_VERIFY(prevBegin + prev.Len <= begin); - } - if (it != Ranges.end()) { - Y_VERIFY(end <= it->first); + void TGivenIdRange::AddPoint(ui64 value) { + IssueNewRange(value, value + 1); + } + + void TGivenIdRange::RemovePoint(ui64 value) { + const auto it = Ranges.upper_bound(value); + Y_VERIFY(it != Ranges.begin()); + auto& range = const_cast<TRange&>(*std::prev(it)); + Y_VERIFY(range.Begin <= value && value < range.End); + Y_VERIFY(range.Bits[value - range.Begin]); + range.Bits.Reset(value - range.Begin); + if (range.Bits.Empty()) { + Ranges.erase(it); } - NumAvailableItems += len; - const auto newIt = Ranges.emplace_hint(it, begin, len); - return newIt->second; + --NumAvailableItems; } - void TGivenIdRange::IssueNewRange(ui64 begin, ui64 end) { - InsertNewRange(begin, end); + bool TGivenIdRange::IsEmpty() const { + return Ranges.empty(); + } + + ui32 TGivenIdRange::GetNumAvailableItems() const { + return NumAvailableItems; } - void TGivenIdRange::Join(TGivenIdRange&& other) { - Ranges.merge(std::move(other.Ranges)); - if (Ranges.size() > 1) { - for (auto it = std::next(Ranges.begin()); it != Ranges.end(); ++it) { - const auto& [prevBegin, prev] = *std::prev(it); - const auto& [begin, range] = *it; - Y_VERIFY(prevBegin + prev.Len <= begin); - } + ui64 TGivenIdRange::GetMinimumValue() const { + Y_VERIFY(!Ranges.empty()); + const auto& range = *Ranges.begin(); + size_t offset = range.Bits.FirstNonZeroBit(); + Y_VERIFY(offset != range.Bits.Size()); + return range.Begin + offset; + } + + ui64 TGivenIdRange::Allocate() { + Y_VERIFY(!Ranges.empty()); + const auto it = Ranges.begin(); + auto& range = const_cast<TRange&>(*it); + size_t offset = range.Bits.FirstNonZeroBit(); + Y_VERIFY(offset != range.Bits.Size()); + range.Bits.Reset(offset); + const ui64 value = range.Begin + offset; + if (range.Bits.Empty()) { + Ranges.erase(it); } - NumAvailableItems += std::exchange(other.NumAvailableItems, 0); + --NumAvailableItems; + return value; } - void TGivenIdRange::ToProto(NKikimrBlobDepot::TGivenIdRange *proto) { - for (const auto& [begin, range] : Ranges) { - auto *r = proto->AddRanges(); - r->SetBegin(begin); - r->SetLen(range.Len); - const ui64 *chunks = range.Bits.GetChunks(); - const ui32 count = range.Bits.GetChunkCount(); - for (ui32 i = 0; i < count; ++i) { - if (chunks[i] != ~ui64(0)) { - r->AddOffsets(i); - r->AddBitMasks(chunks[i]); + void TGivenIdRange::Trim(ui8 channel, ui32 generation, ui32 invalidatedStep) { + const ui64 validSince = 1 + TBlobSeqId{channel, generation, invalidatedStep, TBlobSeqId::MaxIndex}.ToSequentialNumber(); + + while (!Ranges.empty()) { + const auto it = Ranges.begin(); + auto& range = const_cast<TRange&>(*it); + if (range.End <= validSince) { + NumAvailableItems -= range.Bits.Count(); + Ranges.erase(it); + } else if (range.Begin < validSince) { + const ui32 len = validSince - range.Begin; + for (ui32 i = 0; i < len; ++i) { + NumAvailableItems -= range.Bits[i]; } + range.Bits.Reset(0, len); + } else { + break; } } } - void TGivenIdRange::RemovePoint(ui64 value) { - auto it = Ranges.upper_bound(value); - Y_VERIFY(it != Ranges.begin()); - auto& [begin, range] = *--it; - Y_VERIFY(begin <= value); - const ui64 offset = value - begin; - Y_VERIFY(offset < range.Len); - Y_VERIFY(range.Bits[offset]); - range.Bits.Reset(offset); - --NumAvailableItems; - // FIXME: reduce range (maybe)? + void TGivenIdRange::Subtract(const TGivenIdRange& other) { + for (const TRange& range : other.Ranges) { + const auto it = Ranges.find(range.Begin); + Y_VERIFY(it != Ranges.end()); + Y_VERIFY(range.End == it->End); + Y_VERIFY(range.Bits == it->Bits); + NumAvailableItems -= range.Bits.Count(); + Ranges.erase(it); + } } void TGivenIdRange::Output(IOutputStream& s) const { @@ -87,13 +103,13 @@ namespace NKikimr::NBlobDepot { if (it != Ranges.begin()) { s << " "; } - const auto& [begin, range] = *it; - s << begin << ":" << range.Len << "["; - for (ui32 i = 0; i < range.Len; ++i) { - s << static_cast<int>(range.Bits[i]); + s << it->Begin << "-" << it->End << "["; + for (ui32 i = 0, count = it->End - it->Begin; i < count; ++i) { + s << int(it->Bits[i]); } s << "]"; } + s << "}"; } TString TGivenIdRange::ToString() const { @@ -102,26 +118,4 @@ namespace NKikimr::NBlobDepot { return s.Str(); } - bool TGivenIdRange::IsEmpty() const { - return Ranges.empty(); - } - - ui32 TGivenIdRange::GetNumAvailableItems() const { - return NumAvailableItems; - } - - ui64 TGivenIdRange::Allocate() { - Y_VERIFY(!Ranges.empty()); - const auto it = Ranges.begin(); - auto& [begin, range] = *it; - const size_t offset = range.Bits.FirstNonZeroBit(); - const ui64 res = begin + offset; - range.Bits.Reset(offset); - if (range.Bits.NextNonZeroBit(offset) == range.Bits.Size()) { - Ranges.erase(it); - } - --NumAvailableItems; - return res; - } - } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/mon_main.cpp b/ydb/core/blob_depot/mon_main.cpp index af2c79405f..f32db51082 100644 --- a/ydb/core/blob_depot/mon_main.cpp +++ b/ydb/core/blob_depot/mon_main.cpp @@ -1,6 +1,7 @@ #include "blob_depot_tablet.h" #include "data.h" #include "garbage_collection.h" +#include "blocks.h" namespace NKikimr::NBlobDepot { @@ -173,16 +174,12 @@ namespace NKikimr::NBlobDepot { void RenderTrashTable(bool header) { HTML(Stream) { if (header) { - TABLEH() { Stream << "tablet id"; } - TABLEH() { Stream << "channel"; } TABLEH() { Stream << "group id"; } TABLEH() { Stream << "blob id"; } TABLEH() { Stream << "in flight"; } } else { - Self->Data->EnumerateTrash([&](ui64 tabletId, ui8 channel, ui32 groupId, TLogoBlobID blobId, bool inFlight) { + Self->Data->EnumerateTrash([&](ui32 groupId, TLogoBlobID blobId, bool inFlight) { TABLER() { - TABLED() { Stream << tabletId; } - TABLED() { Stream << int(channel); } TABLED() { Stream << groupId; } TABLED() { Stream << blobId; } TABLED() { Stream << (inFlight ? "*" : ""); } @@ -218,7 +215,15 @@ namespace NKikimr::NBlobDepot { void RenderBlocksTable(bool header) { HTML(Stream) { if (header) { + TABLEH() { Stream << "tablet id"; } + TABLEH() { Stream << "blocked generation"; } } else { + Self->BlocksManager->Enumerate([&](ui64 tabletId, ui32 blockedGeneration) { + TABLER() { + TABLED() { Stream << tabletId; } + TABLED() { Stream << blockedGeneration; } + } + }); } } } diff --git a/ydb/core/blob_depot/op_apply_config.cpp b/ydb/core/blob_depot/op_apply_config.cpp index fdfb9c70d7..61402e4e04 100644 --- a/ydb/core/blob_depot/op_apply_config.cpp +++ b/ydb/core/blob_depot/op_apply_config.cpp @@ -4,7 +4,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::Handle(TEvBlobDepot::TEvApplyConfig::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT06, "TEvApplyConfig", (TabletId, TabletID()), (Msg, ev->Get()->Record)); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT12, "TEvApplyConfig", (TabletId, TabletID()), (Msg, ev->Get()->Record)); class TTxApplyConfig : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { std::unique_ptr<IEventHandle> Response; @@ -25,26 +25,30 @@ namespace NKikimr::NBlobDepot { } bool Execute(TTransactionContext& txc, const TActorContext&) override { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT13, "TTxApplyConfig::Execute", (TabletId, Self->TabletID())); + NIceDb::TNiceDb db(txc.DB); auto table = db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Select(); if (!table.IsReady()) { return false; - } else if (table.IsValid()) { - WasConfigured = table.HaveValue<Schema::Config::ConfigProtobuf>(); - } else { - WasConfigured = false; } + WasConfigured = table.IsValid() && table.HaveValue<Schema::Config::ConfigProtobuf>(); db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update( NIceDb::TUpdate<Schema::Config::ConfigProtobuf>(ConfigProtobuf) ); + + const bool success = Self->Config.ParseFromString(ConfigProtobuf); + Y_VERIFY(success); + return true; } void Complete(const TActorContext&) override { - const bool success = Self->Config.ParseFromString(ConfigProtobuf); - Y_VERIFY(success); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT14, "TTxApplyConfig::Complete", (TabletId, Self->TabletID()), + (WasConfigured, WasConfigured)); + if (!WasConfigured) { Self->InitChannelKinds(); } diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index 67182f2236..1c65c6a19c 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -36,9 +36,8 @@ namespace NKikimr::NBlobDepot { auto *locator = chain->MutableLocator(); locator->CopyFrom(item.GetBlobLocator()); - if (!MarkGivenIdCommitted(agent, TBlobSeqId::FromProto(locator->GetBlobSeqId()), responseItem)) { - continue; - } + MarkGivenIdCommitted(agent, TBlobSeqId::FromProto(locator->GetBlobSeqId())); + if (!CheckKeyAgainstBarrier(item.GetKey(), responseItem)) { continue; } @@ -60,28 +59,15 @@ namespace NKikimr::NBlobDepot { return true; } - bool MarkGivenIdCommitted(TAgent& agent, const TBlobSeqId& blobSeqId, - NKikimrBlobDepot::TEvCommitBlobSeqResult::TItem *responseItem) { - const NKikimrBlobDepot::TChannelKind::E kind = blobSeqId.Channel < Self->ChannelToKind.size() - ? Self->ChannelToKind[blobSeqId.Channel] : NKikimrBlobDepot::TChannelKind::System; - if (kind == NKikimrBlobDepot::TChannelKind::System) { - responseItem->SetStatus(NKikimrProto::ERROR); - responseItem->SetErrorReason("incorrect Channel for blob"); - return false; - } + void MarkGivenIdCommitted(TAgent& agent, const TBlobSeqId& blobSeqId) { + Y_VERIFY(blobSeqId.Generation == Self->Executor()->Generation()); + Y_VERIFY(blobSeqId.Channel < Self->Channels.size()); - const auto channelKindIt = Self->ChannelKinds.find(kind); - Y_VERIFY(channelKindIt != Self->ChannelKinds.end()); - auto& ck = channelKindIt->second; + auto& channel = Self->Channels[blobSeqId.Channel]; - const ui64 value = blobSeqId.ToBinary(ck); - agent.ChannelKinds[kind].GivenIdRanges.RemovePoint(value); - ck.GivenIdRanges.RemovePoint(value); - - auto& channel = Self->PerChannelRecords[blobSeqId.Channel]; - channel.GivenStepIndex.erase(std::make_pair(blobSeqId.Step, blobSeqId.Index)); - - return true; + const ui64 value = blobSeqId.ToSequentialNumber(); + agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value); + channel.GivenIdRanges.RemovePoint(value); } bool CheckKeyAgainstBarrier(const TString& key, NKikimrBlobDepot::TEvCommitBlobSeqResult::TItem *responseItem) { diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp index e48ce2802e..d2db397b99 100644 --- a/ydb/core/blob_depot/op_load.cpp +++ b/ydb/core/blob_depot/op_load.cpp @@ -16,6 +16,8 @@ namespace NKikimr::NBlobDepot { {} bool Execute(TTransactionContext& txc, const TActorContext&) override { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT15, "TTxLoad::Execute", (TabletId, Self->TabletID())); + NIceDb::TNiceDb db(txc.DB); if (!Precharge(db)) { @@ -29,9 +31,8 @@ namespace NKikimr::NBlobDepot { return false; } else if (table.IsValid()) { if (table.HaveValue<Schema::Config::ConfigProtobuf>()) { - const bool success = Self->Config.ParseFromString(table.GetValue<Schema::Config::ConfigProtobuf>()); - Y_VERIFY(success); - Configured = true; + Configured = Self->Config.ParseFromString(table.GetValue<Schema::Config::ConfigProtobuf>()); + Y_VERIFY(Configured); } } } @@ -136,10 +137,15 @@ namespace NKikimr::NBlobDepot { } void Complete(const TActorContext&) override { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT16, "TTxLoad::Complete", (TabletId, Self->TabletID()), + (Configured, Configured)); + if (Configured) { Self->InitChannelKinds(); Self->Data->HandleTrash(); } + + Self->OnLoadFinished(); } }; diff --git a/ydb/core/blob_depot/op_resolve.cpp b/ydb/core/blob_depot/op_resolve.cpp index 0b792f7c71..87fddfaae0 100644 --- a/ydb/core/blob_depot/op_resolve.cpp +++ b/ydb/core/blob_depot/op_resolve.cpp @@ -4,7 +4,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::Handle(TEvBlobDepot::TEvResolve::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT07, "TEvResolve", (TabletId, TabletID()), (Msg, ev->Get()->ToString()), + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT17, "TEvResolve", (TabletId, TabletID()), (Msg, ev->Get()->ToString()), (Sender, ev->Sender), (Recipient, ev->Recipient), (Cookie, ev->Cookie)); // collect records if needed @@ -16,7 +16,7 @@ namespace NKikimr::NBlobDepot { response->Record.SetStatus(NKikimrProto::OVERRUN); } - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT08, "sending TEvResolveResult", (TabletId, TabletID()), (Msg, response->Record)); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT18, "sending TEvResolveResult", (TabletId, TabletID()), (Msg, response->Record)); auto handle = std::make_unique<IEventHandle>(ev->Sender, SelfId(), response.release(), 0, ev->Cookie); if (ev->InterconnectSession) { diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index 763289ec8f..147c61a6f6 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -37,8 +37,13 @@ namespace NKikimr::NBlobDepot { ui32 Index = 0; auto AsTuple() const { return std::make_tuple(Channel, Generation, Step, Index); } + friend bool operator ==(const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() == y.AsTuple(); } friend bool operator !=(const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() != y.AsTuple(); } + friend bool operator < (const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() < y.AsTuple(); } + friend bool operator <=(const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() <= y.AsTuple(); } + friend bool operator > (const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() > y.AsTuple(); } + friend bool operator >=(const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() >= y.AsTuple(); } TString ToString() const { return TStringBuilder() << "{" << Channel << ":" << Generation << ":" << Step << ":" << Index << "}"; @@ -48,23 +53,12 @@ namespace NKikimr::NBlobDepot { return *this != TBlobSeqId(); } - ui64 ToBinary(const TChannelKind& kind) const { - Y_VERIFY_DEBUG(Index <= MaxIndex); - Y_VERIFY(Channel < kind.ChannelToIndex.size()); - return (static_cast<ui64>(Step) << IndexBits | Index) * kind.ChannelGroups.size() + kind.ChannelToIndex[Channel]; + ui64 ToSequentialNumber() const { + return ui64(Step) << IndexBits | Index; } - static TBlobSeqId FromBinary(ui32 generation, const TChannelKind& kind, ui64 value) { - static_assert(sizeof(long long) >= sizeof(ui64)); - Y_VERIFY(!kind.ChannelGroups.empty()); - auto res = std::lldiv(value, kind.ChannelGroups.size()); - - return TBlobSeqId{ - .Channel = kind.ChannelGroups[res.rem].first, - .Generation = generation, - .Step = static_cast<ui32>(res.quot >> IndexBits), - .Index = static_cast<ui32>(res.quot) & MaxIndex - }; + static TBlobSeqId FromSequentalNumber(ui32 channel, ui32 generation, ui64 value) { + return {channel, generation, ui32(value >> IndexBits), ui32(value & MaxIndex)}; } static TBlobSeqId FromProto(const NKikimrBlobDepot::TBlobSeqId& proto) { @@ -105,38 +99,44 @@ namespace NKikimr::NBlobDepot { class TGivenIdRange { struct TRange { - ui32 Len; + const ui64 Begin; + const ui64 End; TDynBitMap Bits; - TRange(ui32 len) - : Len(len) + TRange(ui64 begin, ui64 end) + : Begin(begin) + , End(end) { - Bits.Set(0, len); + Bits.Set(0, end - begin); } + + struct TCompare { + bool operator ()(const TRange& x, const TRange& y) const { return x.Begin < y.Begin; } + bool operator ()(const TRange& x, ui64 y) const { return x.Begin < y; } + bool operator ()(ui64 x, const TRange& y) const { return x < y.Begin; } + using is_transparent = void; + }; }; - std::map<ui64, TRange> Ranges; // range.begin -> range + + std::set<TRange, TRange::TCompare> Ranges; ui32 NumAvailableItems = 0; public: - TGivenIdRange() = default; - TGivenIdRange(const NKikimrBlobDepot::TGivenIdRange& proto); - - void ToProto(NKikimrBlobDepot::TGivenIdRange *proto); - - void Join(TGivenIdRange&& other); - void IssueNewRange(ui64 begin, ui64 end); + void AddPoint(ui64 value); void RemovePoint(ui64 value); bool IsEmpty() const; ui32 GetNumAvailableItems() const; + ui64 GetMinimumValue() const; ui64 Allocate(); + void Subtract(const TGivenIdRange& other); + + void Trim(ui8 channel, ui32 generation, ui32 invalidatedStep); + void Output(IOutputStream& s) const; TString ToString() const; - - private: - TRange& InsertNewRange(ui64 begin, ui64 len); }; using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>; @@ -164,4 +164,8 @@ namespace NKikimr::NBlobDepot { return GenStep(id.Generation(), id.Step()); } + inline ui64 GenStep(TBlobSeqId id) { + return GenStep(id.Generation, id.Step); + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/mind/bscontroller/bsc.cpp b/ydb/core/mind/bscontroller/bsc.cpp index b6bd67b278..c1455f7dbb 100644 --- a/ydb/core/mind/bscontroller/bsc.cpp +++ b/ydb/core/mind/bscontroller/bsc.cpp @@ -57,6 +57,15 @@ TBlobStorageController::TVSlotInfo::TVSlotInfo(TVSlotId vSlotId, TPDiskInfo *pdi } void TBlobStorageController::TGroupInfo::CalculateGroupStatus() { + if (VirtualGroupState) { + if (VirtualGroupState == NKikimrBlobStorage::EVirtualGroupState::WORKING) { + Status = {NKikimrBlobStorage::TGroupStatus::FULL, NKikimrBlobStorage::TGroupStatus::FULL}; + } else { + Status = {NKikimrBlobStorage::TGroupStatus::DISINTEGRATED, NKikimrBlobStorage::TGroupStatus::DISINTEGRATED}; + } + return; + } + TBlobStorageGroupInfo::TGroupVDisks failed(Topology.get()); TBlobStorageGroupInfo::TGroupVDisks failedByPDisk(Topology.get()); for (const TVSlotInfo *slot : VDisksInGroup) { diff --git a/ydb/core/mind/bscontroller/config_fit_groups.cpp b/ydb/core/mind/bscontroller/config_fit_groups.cpp index e3a97b5414..e86d44bae7 100644 --- a/ydb/core/mind/bscontroller/config_fit_groups.cpp +++ b/ydb/core/mind/bscontroller/config_fit_groups.cpp @@ -131,7 +131,6 @@ namespace NKikimr { Geometry.GetNumFailDomainsPerFailRealm(), Geometry.GetNumVDisksPerFailDomain()); // bind group to storage pool - groupInfo->StoragePoolId = StoragePoolId; State.StoragePoolGroups.Unshare().emplace(StoragePoolId, groupId); const TGroupSpecies species = groupInfo->GetGroupSpecies(); diff --git a/ydb/core/mind/bscontroller/impl.h b/ydb/core/mind/bscontroller/impl.h index 0e9bcac498..64514b032d 100644 --- a/ydb/core/mind/bscontroller/impl.h +++ b/ydb/core/mind/bscontroller/impl.h @@ -694,9 +694,21 @@ public: } void FillInGroupParameters(NKikimrBlobStorage::TEvControllerSelectGroupsResult::TGroupParameters *params) const { - FillInResources(params->MutableAssuredResources(), true); - FillInResources(params->MutableCurrentResources(), false); - FillInVDiskResources(params); + if (VirtualGroupState) { + for (auto *p : {params->MutableAssuredResources(), params->MutableCurrentResources()}) { + p->SetSpace(1'000'000'000'000); + p->SetIOPS(1'000); + p->SetReadThroughput(100'000'000); + p->SetWriteThroughput(100'000'000); + } + params->SetAllocatedSize(1'000'000'000'000); + params->SetAvailableSize(1'000'000'000'000); + params->SetSpaceColor(NKikimrBlobStorage::TPDiskSpaceColor::GREEN); + } else { + FillInResources(params->MutableAssuredResources(), true); + FillInResources(params->MutableCurrentResources(), false); + FillInVDiskResources(params); + } } void FillInResources(NKikimrBlobStorage::TEvControllerSelectGroupsResult::TGroupParameters::TResources *pb, bool countMaxSlots) const { diff --git a/ydb/core/mind/bscontroller/virtual_group.cpp b/ydb/core/mind/bscontroller/virtual_group.cpp index f85bf18e7e..a905aafcb0 100644 --- a/ydb/core/mind/bscontroller/virtual_group.cpp +++ b/ydb/core/mind/bscontroller/virtual_group.cpp @@ -22,7 +22,7 @@ namespace NKikimr::NBsController { // determine storage pool that will contain newly created virtual group TBoxStoragePoolId storagePoolId; - auto& pools = StoragePools.Get(); + auto& pools = StoragePools.Unshare(); switch (cmd.GetStoragePoolCase()) { case NKikimrBlobStorage::TAllocateVirtualGroup::kStoragePoolName: { ui32 found = 0; @@ -62,6 +62,10 @@ namespace NKikimr::NBsController { pool.EncryptionMode.GetOrElse(TBlobStorageGroupInfo::EEM_NONE), TBlobStorageGroupInfo::ELCP_INITIAL, TString(), TString(), 0u, 0u, false, false, storagePoolId, 0u, 0u, 0u); + // bind group to storage pool + ++pool.NumGroups; + StoragePoolGroups.Unshare().emplace(storagePoolId, group->ID); + group->VirtualGroupPool = id; group->VirtualGroupState = NKikimrBlobStorage::EVirtualGroupState::CREATED; group->ParentDir = cmd.GetParentDir(); @@ -70,8 +74,11 @@ namespace NKikimr::NBsController { if (cmd.GetBlobDepotId()) { group->VirtualGroupState = NKikimrBlobStorage::EVirtualGroupState::WORKING; group->BlobDepotId = cmd.GetBlobDepotId(); + group->SeenOperational = true; } + group->CalculateGroupStatus(); + NKikimrBlobDepot::TBlobDepotConfig config; config.SetOperationMode(NKikimrBlobDepot::EOperationMode::VirtualGroup); config.MutableChannelProfiles()->CopyFrom(cmd.GetChannelProfiles()); @@ -129,6 +136,9 @@ namespace NKikimr::NBsController { PARAM(BlobDepotId) PARAM(ErrorReason) #undef PARAM + if (group->SeenOperational) { + row.Update<T::SeenOperational>(true); + } return true; } @@ -232,6 +242,7 @@ namespace NKikimr::NBsController { group->VirtualGroupState = NKikimrBlobStorage::EVirtualGroupState::CREATE_FAILED; group->ErrorReason = record.GetSchemeShardReason(); } + group->CalculateGroupStatus(); Self->Execute(new TTxUpdateGroup(this)); } @@ -290,6 +301,8 @@ namespace NKikimr::NBsController { const auto& desc = record.GetPathDescription().GetBlobDepotDescription(); group->VirtualGroupState = NKikimrBlobStorage::EVirtualGroupState::WORKING; group->BlobDepotId = desc.GetTabletId(); + group->SeenOperational = true; + group->CalculateGroupStatus(); Y_VERIFY(*group->BlobDepotId); Self->Execute(new TTxUpdateGroup(this)); diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index 555430f272..15f9c568f5 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -38,13 +38,12 @@ message TValue { } message TGivenIdRange { - message TRange { - optional uint64 Begin = 1; - optional uint32 Len = 2; - repeated uint32 Offsets = 3; - repeated fixed64 BitMasks = 4; + message TChannelRange { + optional uint32 Channel = 1; + optional uint64 Begin = 2; + optional uint64 End = 3; } - repeated TRange Ranges = 1; + repeated TChannelRange ChannelRanges = 1; } @@ -62,6 +61,7 @@ message TEvApplyConfigResult { message TEvRegisterAgent { optional uint32 VirtualGroupId = 1; // for validation purposes + optional fixed64 AgentInstanceId = 2; // randomly generated number every time agent starts from scratch } message TEvRegisterAgentResult { @@ -104,10 +104,17 @@ message TEvPushNotify { optional fixed64 TabletId = 1; optional uint32 BlockedGeneration = 2; } + message TInvalidatedStep { + optional uint32 Channel = 1; + optional uint32 Generation = 2; // for validation purposes + optional uint32 InvalidatedStep = 3; + } repeated TBlockedTablet BlockedTablets = 1; + repeated TInvalidatedStep InvalidatedSteps = 2; } message TEvPushNotifyResult { + repeated TBlobSeqId WritesInFlight = 1; } message TEvQueryBlocks { |