diff options
author | alexvru <alexvru@ydb.tech> | 2022-07-19 12:04:34 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-07-19 12:04:34 +0300 |
commit | 62d316c142834b7d53315c1680b6f0ce9a56d185 (patch) | |
tree | 5b283975c95fae28d53b09482fecd7c4524fed78 | |
parent | 188a2f4a6b4b415f13eaf72fa82fae030b81b8aa (diff) | |
download | ydb-62d316c142834b7d53315c1680b6f0ce9a56d185.tar.gz |
BlobDepot work in progress
-rw-r--r-- | ydb/core/blob_depot/agent.cpp | 28 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/blocks.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/channel_kind.cpp | 22 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/comm.cpp | 31 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/query.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/request.cpp | 11 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_collect_garbage.cpp | 7 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_get.cpp | 18 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_put.cpp | 20 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot_tablet.h | 13 | ||||
-rw-r--r-- | ydb/core/blob_depot/blocks.cpp | 8 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 287 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.h | 49 | ||||
-rw-r--r-- | ydb/core/blob_depot/defs.h | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/events.h | 18 | ||||
-rw-r--r-- | ydb/core/blob_depot/given_id_range.cpp | 218 | ||||
-rw-r--r-- | ydb/core/blob_depot/mon_main.cpp | 50 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_commit_blob_seq.cpp | 12 | ||||
-rw-r--r-- | ydb/core/blob_depot/types.h | 23 | ||||
-rw-r--r-- | ydb/core/mind/hive/monitoring.cpp | 2 |
20 files changed, 593 insertions, 231 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp index 68de7977909..eeb06f7e4df 100644 --- a/ydb/core/blob_depot/agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -17,9 +17,10 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(it != PipeServerToNode.end()); if (const auto& nodeId = it->second) { if (const auto agentIt = Agents.find(*nodeId); agentIt != Agents.end()) { - if (TAgent& agent = agentIt->second; agent.ConnectedAgent == it->first) { + if (TAgent& agent = agentIt->second; agent.PipeServerId == it->first) { OnAgentDisconnect(agent); - agent.ConnectedAgent.reset(); + agent.PipeServerId.reset(); + agent.AgentId.reset(); agent.ConnectedNodeId = 0; agent.ExpirationTimestamp = TActivationContext::Now() + ExpirationTimeout; } @@ -37,14 +38,15 @@ namespace NKikimr::NBlobDepot { const TActorId& pipeServerId = ev->Recipient; const auto& req = ev->Get()->Record; STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, req), (NodeId, nodeId), - (PipeServerId, pipeServerId)); + (PipeServerId, pipeServerId), (Id, ev->Cookie)); const auto it = PipeServerToNode.find(pipeServerId); Y_VERIFY(it != PipeServerToNode.end()); Y_VERIFY(!it->second || *it->second == nodeId); it->second = nodeId; auto& agent = Agents[nodeId]; - agent.ConnectedAgent = pipeServerId; + agent.PipeServerId = pipeServerId; + agent.AgentId = ev->Sender; agent.ConnectedNodeId = nodeId; agent.ExpirationTimestamp = TInstant::Max(); @@ -55,9 +57,8 @@ namespace NKikimr::NBlobDepot { OnAgentConnect(agent); - auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId()); + auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId(), Executor()->Generation()); - record->SetGeneration(Executor()->Generation()); for (const auto& [k, v] : ChannelKinds) { auto *proto = record->AddChannelKinds(); proto->SetChannelKind(k); @@ -127,7 +128,14 @@ namespace NKikimr::NBlobDepot { TAgent& agent = GetAgent(ev->Recipient); for (const auto& range : givenIdRange->GetChannelRanges()) { agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd()); - Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd()); + + auto& givenIdRanges = Channels[range.GetChannel()].GivenIdRanges; + const bool wasEmpty = givenIdRanges.IsEmpty(); + givenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd()); + if (wasEmpty) { + Data->OnLeastExpectedBlobIdChange(range.GetChannel()); + } + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "IssueNewRange", (TabletId, TabletID()), (AgentId, agent.ConnectedNodeId), (Channel, range.GetChannel()), (Begin, range.GetBegin()), (End, range.GetEnd())); @@ -142,14 +150,16 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(it != PipeServerToNode.end()); Y_VERIFY(it->second); TAgent& agent = GetAgent(*it->second); - Y_VERIFY(agent.ConnectedAgent == pipeServerId); + Y_VERIFY(agent.PipeServerId == pipeServerId); return agent; } TBlobDepot::TAgent& TBlobDepot::GetAgent(ui32 nodeId) { const auto agentIt = Agents.find(nodeId); Y_VERIFY(agentIt != Agents.end()); - return agentIt->second; + TAgent& agent = agentIt->second; + Y_VERIFY(agent.ConnectedNodeId == nodeId); + return agent; } void TBlobDepot::ResetAgent(TAgent& agent) { diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp index 852c6bc24c2..06efcfbff0a 100644 --- a/ydb/core/blob_depot/agent/blocks.cpp +++ b/ydb/core/blob_depot/agent/blocks.cpp @@ -21,6 +21,8 @@ namespace NKikimr::NBlobDepot { Agent.Issue(std::move(queryBlocks), this, std::make_shared<TQueryBlockContext>( TActivationContext::Monotonic(), tabletId)); block.RefreshInFlight = true; + } + if (status == NKikimrProto::UNKNOWN) { block.PendingBlockChecks.PushBack(query); } STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "CheckBlockForTablet", (QueryId, query->GetQueryId()), @@ -44,7 +46,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::TBlocksManager::Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvQueryBlocksResult& msg) { auto& queryBlockContext = context->Obtain<TQueryBlockContext>(); - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "TEvQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA02, "TEvQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId), (Msg, msg), (TabletId, queryBlockContext.TabletId)); auto& block = Blocks[queryBlockContext.TabletId]; Y_VERIFY(msg.BlockedGenerationsSize() == 1); diff --git a/ydb/core/blob_depot/agent/channel_kind.cpp b/ydb/core/blob_depot/agent/channel_kind.cpp index 6cbae9e0173..e9e30896474 100644 --- a/ydb/core/blob_depot/agent/channel_kind.cpp +++ b/ydb/core/blob_depot/agent/channel_kind.cpp @@ -23,6 +23,13 @@ namespace NKikimr::NBlobDepot { } ui32 TBlobDepotAgent::TChannelKind::GetNumAvailableItems() const { +#ifndef NDEBUG + ui32 count = 0; + for (const auto& [_, givenIdRanges] : GivenIdRangePerChannel) { + count += givenIdRanges.GetNumAvailableItems(); + } + Y_VERIFY(count == NumAvailableItems); +#endif return NumAvailableItems; } @@ -55,14 +62,21 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::TChannelKind::Trim(ui8 channel, ui32 generation, ui32 invalidatedStep) { - GivenIdRangePerChannel[channel].Trim(channel, generation, invalidatedStep); + const TBlobSeqId trimmedBlobSeqId{channel, generation, invalidatedStep, TBlobSeqId::MaxIndex}; + const ui64 validSince = trimmedBlobSeqId.ToSequentialNumber() + 1; + auto& givenIdRanges = GivenIdRangePerChannel[channel]; + NumAvailableItems -= givenIdRanges.GetNumAvailableItems(); + givenIdRanges.Trim(validSince); + NumAvailableItems += givenIdRanges.GetNumAvailableItems(); RebuildHeap(); } void TBlobDepotAgent::TChannelKind::RebuildHeap() { GivenIdRangeHeap.clear(); for (auto& kv : GivenIdRangePerChannel) { - GivenIdRangeHeap.push_back(&kv); + if (!kv.second.IsEmpty()) { + GivenIdRangeHeap.push_back(&kv); + } } std::make_heap(GivenIdRangeHeap.begin(), GivenIdRangeHeap.end(), TGivenIdRangeHeapComp()); } @@ -74,9 +88,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::TChannelKind::ProcessQueriesWaitingForId() { TIntrusiveList<TQuery, TPendingId> temp; temp.Swap(QueriesWaitingForId); - for (TQuery& query : temp) { - query.OnIdAllocated(); - } + temp.ForEach([&](TQuery *query) { query->OnIdAllocated(); }); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index bd09eddd8a2..8db0a12479a 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -26,7 +26,7 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::Handle(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvRegisterAgentResult& msg) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId), (Msg, msg)); Registered = true; BlobDepotGeneration = msg.GetGeneration(); @@ -63,12 +63,21 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA07, "kind vanished", (VirtualGroupId, VirtualGroupId), (Kind, kind)); ChannelKinds.erase(kind); } + + for (const auto& [channel, kind] : ChannelToKind) { + kind->Trim(channel, BlobDepotGeneration - 1, Max<ui32>()); + + auto& wif = kind->WritesInFlight; + const TBlobSeqId min{channel, 0, 0, 0}; + const TBlobSeqId max{channel, BlobDepotGeneration - 1, Max<ui32>(), TBlobSeqId::MaxIndex}; + wif.erase(wif.lower_bound(min), wif.upper_bound(max)); + } } void TBlobDepotAgent::IssueAllocateIdsIfNeeded(TChannelKind& kind) { if (!kind.IdAllocInFlight && kind.GetNumAvailableItems() < 100 && PipeId) { const ui64 id = NextRequestId++; - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA08, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA08, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId), (ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(kind.Kind)), (IdAllocInFlight, kind.IdAllocInFlight), (NumAvailableItems, kind.GetNumAvailableItems()), (RequestId, id)); @@ -79,9 +88,6 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvAllocateIdsResult& msg) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA09, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId), - (Msg, msg)); - auto& allocateIdsContext = context->Obtain<TAllocateIdsContext>(); const auto it = ChannelKinds.find(allocateIdsContext.ChannelKind); Y_VERIFY_S(it != ChannelKinds.end(), "Kind# " << NKikimrBlobDepot::TChannelKind::E_Name(allocateIdsContext.ChannelKind) @@ -97,11 +103,12 @@ namespace NKikimr::NBlobDepot { if (msg.HasGivenIdRange()) { kind.IssueGivenIdRange(msg.GetGivenIdRange()); } + + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA09, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId), + (Msg, msg), (NumAvailableItems, kind.GetNumAvailableItems())); } void TBlobDepotAgent::OnDisconnect() { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA10, "OnDisconnect", (VirtualGroupId, VirtualGroupId)); - for (auto& [id, request] : std::exchange(TabletRequestInFlight, {})) { request.Sender->OnRequestComplete(id, TTabletDisconnected{}); } @@ -154,6 +161,9 @@ namespace NKikimr::NBlobDepot { auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId()); auto& msg = ev->Get()->Record; + + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA11, "TEvPushNotify", (VirtualGroupId, VirtualGroupId), (Msg, msg)); + BlocksManager.OnBlockedTablets(msg.GetBlockedTablets()); for (const auto& item : msg.GetInvalidatedSteps()) { @@ -162,6 +172,7 @@ namespace NKikimr::NBlobDepot { const auto it = ChannelToKind.find(channel); Y_VERIFY(it != ChannelToKind.end()); TChannelKind& kind = *it->second; + const ui32 numAvailableItemsBefore = kind.GetNumAvailableItems(); kind.Trim(channel, item.GetGeneration(), item.GetInvalidatedStep()); // report writes in flight that are trimmed @@ -170,6 +181,12 @@ namespace NKikimr::NBlobDepot { for (auto it = kind.WritesInFlight.lower_bound(first); it != kind.WritesInFlight.end() && *it <= last; ++it) { it->ToProto(record->AddWritesInFlight()); } + + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA11, "TrimChannel", (VirtualGroupId, VirtualGroupId), + (Channel, int(channel)), (NumAvailableItemsBefore, numAvailableItemsBefore), + (NumAvailableItemsAfter, kind.GetNumAvailableItems())); + + IssueAllocateIdsIfNeeded(kind); } TActivationContext::Send(response.release()); diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp index ab4cd8c7d3f..4132c86308d 100644 --- a/ydb/core/blob_depot/agent/query.cpp +++ b/ydb/core/blob_depot/agent/query.cpp @@ -49,7 +49,7 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::TQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA14, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA14, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Response, response->ToString())); Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie); delete this; diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp index 72adcaa0906..fbc486c984f 100644 --- a/ydb/core/blob_depot/agent/request.cpp +++ b/ydb/core/blob_depot/agent/request.cpp @@ -95,12 +95,11 @@ namespace NKikimr::NBlobDepot { template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvPutResult::TPtr ev); void TBlobDepotAgent::OnRequestComplete(ui64 id, TResponse response, TRequestsInFlight& map) { - const auto it = map.find(id); - Y_VERIFY_S(it != map.end(), "id# " << id << " response# " << TRequestSender::ToString(response)); - TRequestInFlight request = std::move(it->second); - map.erase(it); - - request.Sender->OnRequestComplete(id, std::move(response)); + if (const auto it = map.find(id); it != map.end()) { + TRequestInFlight request = std::move(it->second); + map.erase(it); + request.Sender->OnRequestComplete(id, std::move(response)); + } } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp index 0b0b6f27ee0..94115450075 100644 --- a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp +++ b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp @@ -13,6 +13,7 @@ namespace NKikimr::NBlobDepot { ui32 NumDoNotKeep; ui32 CounterShift = 0; bool IsLast; + bool QueryInFlight = false; public: using TQuery::TQuery; @@ -63,6 +64,9 @@ namespace NKikimr::NBlobDepot { Agent.Issue(std::move(record), this, nullptr); + Y_VERIFY(!QueryInFlight); + QueryInFlight = true; + ++CounterShift; } @@ -85,6 +89,9 @@ namespace NKikimr::NBlobDepot { } void HandleCollectGarbageResult(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvCollectGarbageResult& msg) { + Y_VERIFY(QueryInFlight); + QueryInFlight = false; + if (!msg.HasStatus()) { EndWithError(NKikimrProto::ERROR, "incorrect TEvCollectGarbageResult protobuf"); } else if (const auto status = msg.GetStatus(); status != NKikimrProto::OK) { diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp index 65882d78809..13bc7ef8e94 100644 --- a/ydb/core/blob_depot/agent/storage_get.cpp +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -29,20 +29,23 @@ namespace NKikimr::NBlobDepot { for (ui32 i = 0; i < msg.QuerySize; ++i) { auto& query = msg.Queries[i]; - TString blobId(reinterpret_cast<const char*>(query.Id.GetRaw()), 3 * sizeof(ui64)); - if (const TValueChain *value = Agent.BlobMappingCache.ResolveKey(blobId, this, - std::make_shared<TResolveKeyContext>(i))) { - ProcessSingleResult(i, value); - } auto& response = Response->Responses[i]; response.Id = query.Id; response.Shift = query.Shift; response.RequestedSize = query.Size; + + TString blobId(reinterpret_cast<const char*>(query.Id.GetRaw()), 3 * sizeof(ui64)); + if (const TValueChain *value = Agent.BlobMappingCache.ResolveKey(blobId, this, + std::make_shared<TResolveKeyContext>(i))) { + if (!ProcessSingleResult(i, value)) { + return; + } + } } } - void ProcessSingleResult(ui32 queryIdx, const TValueChain *value) { + bool ProcessSingleResult(ui32 queryIdx, const TValueChain *value) { auto& msg = GetQuery(); if (!value) { @@ -57,11 +60,14 @@ namespace NKikimr::NBlobDepot { msg.GetHandleClass, msg.MustRestoreFirst, this, queryIdx, true, &error); if (!success) { EndWithError(NKikimrProto::ERROR, std::move(error)); + return false; } } if (!AnswersRemain) { EndWithSuccess(std::move(Response)); + return false; } + return true; } void OnRead(ui64 tag, NKikimrProto::EReplyStatus status, TString buffer) override { diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index 8a66913fbb0..01ab46aecd7 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -117,7 +117,25 @@ namespace NKikimr::NBlobDepot { --PutsInFlight; if (msg.Status != NKikimrProto::OK) { EndWithError(msg.Status, std::move(msg.ErrorReason)); - } else if (!PutsInFlight) { + } else if (PutsInFlight) { + // wait for all puts to complete + } else if (BlobSeqId.Generation != Agent.BlobDepotGeneration) { + // FIXME: although this is error now, we can handle this in the future, when BlobDepot picks records + // on restarts; it may have scanned written record and already updated it in its local database; + // however, if it did not, we can't try to commit this records as it may be already scheduled for + // garbage collection by the tablet + EndWithError(NKikimrProto::ERROR, "BlobDepot tablet was restarting during write"); + } else { + // find and remove the write in flight record to ensure it won't be reported upon TEvPushNotify + // reception AND to check that it wasn't already trimmed by answering TEvPushNotifyResult + const auto it = Agent.ChannelKinds.find(NKikimrBlobDepot::TChannelKind::Data); + if (it == Agent.ChannelKinds.end()) { + return EndWithError(NKikimrProto::ERROR, "no Data channels"); + } + auto& kind = it->second; + const size_t numErased = kind.WritesInFlight.erase(BlobSeqId); + Y_VERIFY(numErased); + NKikimrBlobDepot::TEvCommitBlobSeq request; auto& msg = *Event->Get<TEvBlobStorage::TEvPut>(); diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index 0fa8e971de2..902588da70c 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -33,7 +33,8 @@ namespace NKikimr::NBlobDepot { static constexpr TDuration ExpirationTimeout = TDuration::Minutes(1); struct TAgent { - std::optional<TActorId> ConnectedAgent; + std::optional<TActorId> PipeServerId; + std::optional<TActorId> AgentId; ui32 ConnectedNodeId; TInstant ExpirationTimestamp; std::optional<ui64> AgentInstanceId; @@ -70,12 +71,12 @@ namespace NKikimr::NBlobDepot { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - std::deque<std::unique_ptr<IEventHandle>> InitialEventsQ; - void Enqueue(TAutoPtr<IEventHandle>& ev, const TActorContext&) override { - InitialEventsQ.emplace_back(ev.Release()); + Y_FAIL("unexpected event Type# %08" PRIx32, ev->GetTypeRewrite()); } + void DefaultSignalTabletActive(const TActorContext&) override {} // signalled explicitly after load is complete + void OnActivateExecutor(const TActorContext&) override { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT20, "OnActivateExecutor", (TabletId, TabletID())); ExecuteTxInitSchema(); @@ -84,9 +85,7 @@ namespace NKikimr::NBlobDepot { void OnLoadFinished() { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT20, "OnLoadFinished", (TabletId, TabletID())); Become(&TThis::StateWork); - for (auto&& ev : std::exchange(InitialEventsQ, {})) { - TActivationContext::Send(ev.release()); - } + SignalTabletActive(TActivationContext::AsActorContext()); } void OnDetach(const TActorContext&) override { diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp index 0c7107ee1be..d26ac376abc 100644 --- a/ydb/core/blob_depot/blocks.cpp +++ b/ydb/core/blob_depot/blocks.cpp @@ -102,7 +102,7 @@ namespace NKikimr::NBlobDepot { item->SetBlockedGeneration(BlockedGeneration); TAgent& agent = Self->GetAgent(agentId); - if (const auto& actorId = agent.ConnectedAgent) { + if (const auto& actorId = agent.AgentId) { Send(*actorId, ev.release(), 0, IssuerGuid); } NodesWaitingForPushResult.insert(agentId); @@ -124,7 +124,7 @@ namespace NKikimr::NBlobDepot { void IssueBlocksToStorage() { THashSet<ui32> processedGroups; - for (const auto& [_, kind] : Self->ChannelKinds) { + for (const auto& [_, kind] : Self->ChannelKinds) { // FIXME: SIGSEGV here? for (const auto& [channel, groupId] : kind.ChannelGroups) { // FIXME: consider previous group generations (because agent can write in obsolete tablet generation) // !!!!!!!!!!! @@ -138,14 +138,14 @@ namespace NKikimr::NBlobDepot { } void SendBlock(ui32 groupId) { - STLOG(PRI_INFO, BLOB_DEPOT, BDT06, "issing TEvBlock", (TabletId, Self->TabletID()), (BlockedTabletId, + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT06, "issing TEvBlock", (TabletId, Self->TabletID()), (BlockedTabletId, TabletId), (BlockedGeneration, BlockedGeneration), (GroupId, groupId), (IssuerGuid, IssuerGuid)); SendToBSProxy(SelfId(), groupId, new TEvBlobStorage::TEvBlock(TabletId, BlockedGeneration, TInstant::Max(), IssuerGuid), groupId); } void Handle(TEvBlobStorage::TEvBlockResult::TPtr ev) { - STLOG(PRI_INFO, BLOB_DEPOT, BDT07, "TEvBlockResult", (TabletId, Self->TabletID()), (Msg, ev->Get()->ToString()), + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT07, "TEvBlockResult", (TabletId, Self->TabletID()), (Msg, ev->Get()->ToString()), (BlockedTabletId, TabletId), (BlockedGeneration, BlockedGeneration), (GroupId, ev->Cookie)); switch (ev->Get()->Status) { case NKikimrProto::OK: diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index b0d58eddae0..c5292a7ed54 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -3,7 +3,9 @@ namespace NKikimr::NBlobDepot { - class TBlobDepot::TData::TTxIssueGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + using TData = TBlobDepot::TData; + + class TData::TTxIssueGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { const ui8 Channel; const ui32 GroupId; const TGenStep IssuedGenStep; @@ -30,7 +32,7 @@ namespace NKikimr::NBlobDepot { } }; - class TBlobDepot::TData::TTxConfirmGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + class TData::TTxConfirmGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { const ui8 Channel; const ui32 GroupId; std::vector<TLogoBlobID> TrashDeleted; @@ -74,21 +76,21 @@ namespace NKikimr::NBlobDepot { } }; - std::optional<TBlobDepot::TData::TValue> TBlobDepot::TData::FindKey(const TKey& key) { + std::optional<TData::TValue> TData::FindKey(const TKey& key) { const auto it = Data.find(key); return it != Data.end() ? std::make_optional(it->second) : std::nullopt; } - TBlobDepot::TData::TRecordsPerChannelGroup& TBlobDepot::TData::GetRecordsPerChannelGroup(TLogoBlobID id) { + TData::TRecordsPerChannelGroup& TData::GetRecordsPerChannelGroup(TLogoBlobID id) { TTabletStorageInfo *info = Self->Info(); const ui32 groupId = info->GroupFor(id.Channel(), id.Generation()); Y_VERIFY(groupId != Max<ui32>()); const auto& key = std::make_tuple(id.TabletID(), id.Channel(), groupId); - const auto [it, _] = RecordsPerChannelGroup.try_emplace(key, id.TabletID(), id.Channel(), groupId); + const auto [it, _] = RecordsPerChannelGroup.emplace(std::piecewise_construct, key, key); return it->second; } - void TBlobDepot::TData::AddDataOnLoad(TKey key, TString value) { + void TData::AddDataOnLoad(TKey key, TString value) { NKikimrBlobDepot::TValue proto; const bool success = proto.ParseFromString(value); Y_VERIFY(success); @@ -100,21 +102,21 @@ namespace NKikimr::NBlobDepot { }); } - void TBlobDepot::TData::AddTrashOnLoad(TLogoBlobID id) { + void TData::AddTrashOnLoad(TLogoBlobID id) { auto& record = GetRecordsPerChannelGroup(id); record.Trash.insert(id); - OnTrashInserted(record); + record.EnqueueForCollectionIfPossible(this); } - void TBlobDepot::TData::AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep) { + void TData::AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep) { const auto& key = std::make_tuple(Self->TabletID(), channel, groupId); - const auto [it, _] = RecordsPerChannelGroup.try_emplace(key, Self->TabletID(), channel, groupId); + const auto [it, _] = RecordsPerChannelGroup.emplace(std::piecewise_construct, key, key); auto& record = it->second; record.IssuedGenStep = issuedGenStep; record.LastConfirmedGenStep = confirmedGenStep; } - void TBlobDepot::TData::PutKey(TKey key, TValue&& data) { + void TData::PutKey(TKey key, TValue&& data) { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT08, "PutKey", (TabletId, Self->TabletID()), (Key, key.ToString(Self->Config)), (KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState))); @@ -130,13 +132,7 @@ namespace NKikimr::NBlobDepot { Data[std::move(key)] = std::move(data); } - void TBlobDepot::TData::OnTrashInserted(TRecordsPerChannelGroup& record) { - if (!record.CollectGarbageRequestInFlight && record.TabletId == Self->TabletID()) { - RecordsWithTrash.PushBack(&record); - } - } - - std::optional<TString> TBlobDepot::TData::UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState) { + std::optional<TString> TData::UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState) { const auto it = Data.find(key); if (it != Data.end() && keepState <= it->second.KeepState) { return std::nullopt; @@ -146,7 +142,7 @@ namespace NKikimr::NBlobDepot { return ToValueProto(value); } - void TBlobDepot::TData::DeleteKey(const TKey& key, const std::function<void(TLogoBlobID)>& updateTrash, void *cookie) { + void TData::DeleteKey(const TKey& key, const std::function<void(TLogoBlobID)>& updateTrash, void *cookie) { const auto it = Data.find(key); Y_VERIFY(it != Data.end()); TValue& value = it->second; @@ -162,19 +158,16 @@ namespace NKikimr::NBlobDepot { Data.erase(it); } - void TBlobDepot::TData::CommitTrash(void *cookie) { + void TData::CommitTrash(void *cookie) { auto range = InFlightTrash.equal_range(cookie); for (auto it = range.first; it != range.second; ++it) { auto& record = GetRecordsPerChannelGroup(it->second); - const auto usedIt = record.Used.find(it->second); - Y_VERIFY(usedIt != record.Used.end()); - record.Trash.insert(record.Used.extract(usedIt)); - OnTrashInserted(record); + record.MoveToTrash(this, it->second); } InFlightTrash.erase(range.first, range.second); } - TString TBlobDepot::TData::ToValueProto(const TValue& value) { + TString TData::ToValueProto(const TValue& value) { NKikimrBlobDepot::TValue proto; if (value.Meta) { proto.SetMeta(value.Meta); @@ -193,68 +186,73 @@ namespace NKikimr::NBlobDepot { return s; } - void TBlobDepot::TData::HandleTrash() { + void TData::HandleTrash() { const ui32 generation = Self->Executor()->Generation(); THashMap<ui32, std::unique_ptr<TEvBlobDepot::TEvPushNotify>> outbox; - for (TRecordsPerChannelGroup& record : RecordsWithTrash) { + while (RecordsWithTrash) { + TRecordsPerChannelGroup& record = *RecordsWithTrash.PopFront(); + Y_VERIFY(!record.CollectGarbageRequestInFlight); Y_VERIFY(record.TabletId == Self->TabletID()); Y_VERIFY(!record.Trash.empty()); - TGenStep nextGenStep(*--record.Trash.end()); - Y_VERIFY(record.Channel < Self->Channels.size()); auto& channel = Self->Channels[record.Channel]; - if (!channel.GivenIdRanges.IsEmpty()) { - // minimum blob seq id that can still be written by other party - const auto minBlobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, - channel.GivenIdRanges.GetMinimumValue()); - - // step we are going to invalidate (including blobs with this one) - const ui32 invalidatedStep = nextGenStep.Step(); - - if (minBlobSeqId.Step <= invalidatedStep) { - const TLogoBlobID maxId(record.TabletId, generation, minBlobSeqId.Step, record.Channel, 0, 0); - const auto it = record.Trash.lower_bound(maxId); - if (it != record.Trash.begin()) { - nextGenStep = TGenStep(*std::prev(it)); - } else { - nextGenStep = {}; + TGenStep nextGenStep(*--record.Trash.end()); + + // step we are going to invalidate (including blobs with this one) + if (TGenStep(record.LeastExpectedBlobId) <= nextGenStep) { + const ui32 invalidatedStep = nextGenStep.Step(); // the step we want to invalidate and garbage collect + + // remove invalidated step from allocations + auto blobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, channel.NextBlobSeqId); + Y_VERIFY(record.LastConfirmedGenStep < TGenStep(blobSeqId)); + if (blobSeqId.Step <= invalidatedStep) { + blobSeqId.Step = invalidatedStep + 1; + blobSeqId.Index = 0; + channel.NextBlobSeqId = blobSeqId.ToSequentialNumber(); + } + + // issue notifications to agents + for (auto& [agentId, agent] : Self->Agents) { + if (!agent.AgentId) { + continue; } + const auto [it, inserted] = agent.InvalidatedStepInFlight.emplace(record.Channel, invalidatedStep); + if (inserted || it->second < invalidatedStep) { + it->second = invalidatedStep; - // issue notifications to agents - for (auto& [agentId, agent] : Self->Agents) { - if (!agent.ConnectedAgent) { - continue; - } - const auto [it, inserted] = agent.InvalidatedStepInFlight.emplace(record.Channel, invalidatedStep); - if (inserted || it->second < invalidatedStep) { - it->second = invalidatedStep; - - auto& ev = outbox[agentId]; - if (!ev) { - ev.reset(new TEvBlobDepot::TEvPushNotify); - } - auto *item = ev->Record.AddInvalidatedSteps(); - item->SetChannel(record.Channel); - item->SetGeneration(generation); - item->SetInvalidatedStep(invalidatedStep); + auto& ev = outbox[agentId]; + if (!ev) { + ev.reset(new TEvBlobDepot::TEvPushNotify); } + auto *item = ev->Record.AddInvalidatedSteps(); + item->SetChannel(record.Channel); + item->SetGeneration(generation); + item->SetInvalidatedStep(invalidatedStep); } } + + // adjust the barrier to keep it safe now + const TLogoBlobID maxId(record.TabletId, record.LeastExpectedBlobId.Generation, + record.LeastExpectedBlobId.Step, record.Channel, 0, 0); + const auto it = record.Trash.lower_bound(maxId); + if (it != record.Trash.begin()) { + nextGenStep = TGenStep(*std::prev(it)); + } else { + nextGenStep = {}; + } } auto keep = std::make_unique<TVector<TLogoBlobID>>(); auto doNotKeep = std::make_unique<TVector<TLogoBlobID>>(); - // FIXME: check for blob leaks when LastConfirmedGenStep is not properly persisted for (auto it = record.Trash.begin(); it != record.Trash.end() && TGenStep(*it) <= record.LastConfirmedGenStep; ++it) { doNotKeep->push_back(*it); } - // FIXME: check for blob loss when LastConfirmedGenStep is not properly persisted const TLogoBlobID keepFrom(record.TabletId, record.LastConfirmedGenStep.Generation(), record.LastConfirmedGenStep.Step(), record.Channel, 0, 0); for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && TGenStep(*it) <= nextGenStep; ++it) { @@ -270,7 +268,7 @@ namespace NKikimr::NBlobDepot { const bool collect = nextGenStep > record.LastConfirmedGenStep; if (!keep && !doNotKeep && !collect) { - continue; // skip this one + continue; // nothing to do here } auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(record.TabletId, generation, @@ -284,16 +282,10 @@ namespace NKikimr::NBlobDepot { record.TrashInFlight.insert(record.TrashInFlight.end(), record.Trash.begin(), record.Trash.end()); record.IssuedGenStep = Max(nextGenStep, record.LastConfirmedGenStep); - auto blobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, channel.NextBlobSeqId); - Y_VERIFY(record.LastConfirmedGenStep < TGenStep(blobSeqId)); - if (TGenStep(blobSeqId) <= nextGenStep) { - blobSeqId.Step = nextGenStep.Step() + 1; - blobSeqId.Index = 0; - channel.NextBlobSeqId = blobSeqId.ToSequentialNumber(); - } + record.TIntrusiveListItem<TRecordsPerChannelGroup, TRecordWithTrash>::Unlink(); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT09, "issuing TEvCollectGarbage", (TabletId, Self->TabletID()), - (Channel, record.Channel), (GroupId, record.GroupId), (Msg, ev->ToString()), + (Channel, int(record.Channel)), (GroupId, record.GroupId), (Msg, ev->ToString()), (LastConfirmedGenStep, record.LastConfirmedGenStep), (IssuedGenStep, record.IssuedGenStep), (TrashInFlight.size, record.TrashInFlight.size())); @@ -305,8 +297,6 @@ namespace NKikimr::NBlobDepot { } } - RecordsWithTrash.Clear(); - for (auto& [agentId, ev] : outbox) { TAgent& agent = Self->GetAgent(agentId); const ui64 id = ++agent.LastRequestId; @@ -314,79 +304,156 @@ namespace NKikimr::NBlobDepot { for (const auto& item : ev->Record.GetInvalidatedSteps()) { request[item.GetChannel()] = item.GetInvalidatedStep(); } - if (const auto& actorId = agent.ConnectedAgent) { - TActivationContext::Send(new IEventHandle(*actorId, Self->SelfId(), ev.release(), 0, id)); - } + + Y_VERIFY(agent.AgentId); + TActivationContext::Send(new IEventHandle(*agent.AgentId, Self->SelfId(), ev.release(), 0, id)); } } - void TBlobDepot::TData::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) { + void TData::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "TEvCollectGarbageResult", (TabletId, ev->Get()->TabletId), (Channel, ev->Get()->Channel), (GroupId, ev->Cookie), (Msg, ev->Get()->ToString())); const auto& key = std::make_tuple(ev->Get()->TabletId, ev->Get()->Channel, ev->Cookie); const auto it = RecordsPerChannelGroup.find(key); Y_VERIFY(it != RecordsPerChannelGroup.end()); auto& record = it->second; - Y_VERIFY(record.CollectGarbageRequestInFlight); if (ev->Get()->Status == NKikimrProto::OK) { - for (const TLogoBlobID& id : record.TrashInFlight) { // make it merge - record.Trash.erase(id); - } - record.LastConfirmedGenStep = record.IssuedGenStep; + Y_VERIFY(record.CollectGarbageRequestInFlight); + record.OnSuccessfulCollect(this); Self->Execute(std::make_unique<TTxConfirmGC>(Self, record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}), record.LastConfirmedGenStep)); } else { - record.CollectGarbageRequestInFlight = false; - OnTrashInserted(record); + record.ClearInFlight(this); HandleTrash(); } } - void TBlobDepot::TData::Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) { - TAgent& agent = Self->GetAgent(ev->Recipient); + void TData::Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) { + TAgent& agent = Self->GetAgent(ev->Sender.NodeId()); const ui32 generation = Self->Executor()->Generation(); + + std::set<TBlobSeqId> writesInFlight; + for (const auto& item : ev->Get()->Record.GetWritesInFlight()) { + writesInFlight.insert(TBlobSeqId::FromProto(item)); + } + if (const auto it = agent.InvalidateStepRequests.find(ev->Cookie); it != agent.InvalidateStepRequests.end()) { for (const auto& [channel, invalidatedStep] : it->second) { const ui32 channel_ = channel; const ui32 invalidatedStep_ = invalidatedStep; + auto& agentGivenIdRanges = agent.GivenIdRanges[channel]; + auto& givenIdRanges = Self->Channels[channel].GivenIdRanges; + + auto begin = writesInFlight.lower_bound(TBlobSeqId{channel, 0, 0, 0}); + auto end = writesInFlight.upper_bound(TBlobSeqId{channel, Max<ui32>(), Max<ui32>(), TBlobSeqId::MaxIndex}); + + auto makeWritesInFlight = [&] { + TStringStream s; + s << "["; + for (auto it = begin; it != end; ++it) { + s << (it != begin ? " " : "") << it->ToString(); + } + s << "]"; + return s.Str(); + }; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "Trim", (TabletId, Self->TabletID()), (AgentId, agent.ConnectedNodeId), - (Channel, channel_), (InvalidatedStep, invalidatedStep_), + (Id, ev->Cookie), (Channel, channel_), (InvalidatedStep, invalidatedStep_), (GivenIdRanges, Self->Channels[channel_].GivenIdRanges), - (Agent.GivenIdRanges, agent.GivenIdRanges[channel_])); - Self->Channels[channel].GivenIdRanges.Trim(channel, generation, invalidatedStep); - agent.GivenIdRanges[channel].Trim(channel, generation, invalidatedStep); + (Agent.GivenIdRanges, agent.GivenIdRanges[channel_]), + (WritesInFlight, makeWritesInFlight())); + + for (auto it = begin; it != end; ++it) { + Y_VERIFY_S(agentGivenIdRanges.GetPoint(it->ToSequentialNumber()), "blobSeqId# " << it->ToString()); + Y_VERIFY_S(givenIdRanges.GetPoint(it->ToSequentialNumber()), "blobSeqId# " << it->ToString()); + } + + const TBlobSeqId trimmedBlobSeqId{channel, generation, invalidatedStep, TBlobSeqId::MaxIndex}; + const ui64 validSince = trimmedBlobSeqId.ToSequentialNumber() + 1; + givenIdRanges.Subtract(agentGivenIdRanges.Trim(validSince)); + + for (auto it = begin; it != end; ++it) { + agentGivenIdRanges.AddPoint(it->ToSequentialNumber()); + givenIdRanges.AddPoint(it->ToSequentialNumber()); + } + + OnLeastExpectedBlobIdChange(channel); } agent.InvalidateStepRequests.erase(it); - } - for (const auto& item : ev->Get()->Record.GetWritesInFlight()) { - const auto blobSeqId = TBlobSeqId::FromProto(item); - const ui64 value = blobSeqId.ToSequentialNumber(); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "WriteInFlight", (TabletId, Self->TabletID()), (AgentId, agent.ConnectedNodeId), - (BlobSeqId, blobSeqId), (Value, value), - (GivenIdRanges, Self->Channels[blobSeqId.Channel].GivenIdRanges), - (Agent.GivenIdRanges, agent.GivenIdRanges[blobSeqId.Channel])); - Self->Channels[blobSeqId.Channel].GivenIdRanges.AddPoint(value); - agent.GivenIdRanges[blobSeqId.Channel].AddPoint(value); + } else { + Y_VERIFY_DEBUG(false); } HandleTrash(); } - void TBlobDepot::TData::OnCommitConfirmedGC(ui8 channel, ui32 groupId) { + void TData::OnCommitConfirmedGC(ui8 channel, ui32 groupId) { const auto& key = std::make_tuple(Self->TabletID(), channel, groupId); const auto it = RecordsPerChannelGroup.find(key); Y_VERIFY(it != RecordsPerChannelGroup.end()); - auto& record = it->second; - Y_VERIFY(record.CollectGarbageRequestInFlight); - record.CollectGarbageRequestInFlight = false; - if (!record.Trash.empty()) { - OnTrashInserted(record); - HandleTrash(); - } + it->second.ClearInFlight(this); } - bool TBlobDepot::TData::CanBeCollected(ui32 groupId, TBlobSeqId id) const { + bool TData::CanBeCollected(ui32 groupId, TBlobSeqId id) const { const auto it = RecordsPerChannelGroup.find(std::make_tuple(Self->TabletID(), id.Channel, groupId)); return it != RecordsPerChannelGroup.end() && TGenStep(id) <= it->second.IssuedGenStep; } + void TData::OnLeastExpectedBlobIdChange(ui8 channel) { + auto& ch = Self->Channels[channel]; + const ui64 minSequenceNumber = ch.GivenIdRanges.IsEmpty() + ? ch.NextBlobSeqId + : ch.GivenIdRanges.GetMinimumValue(); + const TBlobSeqId leastExpectedBlobId = TBlobSeqId::FromSequentalNumber(channel, Self->Executor()->Generation(), + minSequenceNumber); + + const TTabletStorageInfo *info = Self->Info(); + const TTabletChannelInfo *storageChannel = info->ChannelInfo(leastExpectedBlobId.Channel); + Y_VERIFY(storageChannel); + for (const auto& entry : storageChannel->History) { + const auto& key = std::make_tuple(info->TabletID, storageChannel->Channel, entry.GroupID); + auto [it, _] = RecordsPerChannelGroup.emplace(std::piecewise_construct, key, key); + auto& record = it->second; + record.OnLeastExpectedBlobIdChange(this, leastExpectedBlobId); + } + } + + void TData::TRecordsPerChannelGroup::MoveToTrash(TData *self, TLogoBlobID id) { + const auto usedIt = Used.find(id); + Y_VERIFY(usedIt != Used.end()); + Trash.insert(Used.extract(usedIt)); + EnqueueForCollectionIfPossible(self); + } + + void TData::TRecordsPerChannelGroup::OnSuccessfulCollect(TData *self) { + auto it = Trash.begin(); + for (const TLogoBlobID& id : TrashInFlight) { + for (; it != Trash.end() && *it < id; ++it) {} + Y_VERIFY(it != Trash.end() && *it == id); + it = Trash.erase(it); + } + LastConfirmedGenStep = IssuedGenStep; + EnqueueForCollectionIfPossible(self); + } + + void TData::TRecordsPerChannelGroup::OnLeastExpectedBlobIdChange(TData *self, TBlobSeqId leastExpectedBlobId) { + Y_VERIFY_S(LeastExpectedBlobId <= leastExpectedBlobId, "Prev# " << LeastExpectedBlobId.ToString() + << " Next# " << leastExpectedBlobId.ToString()); + if (LeastExpectedBlobId < leastExpectedBlobId) { + LeastExpectedBlobId = leastExpectedBlobId; + EnqueueForCollectionIfPossible(self); + } + } + + void TData::TRecordsPerChannelGroup::ClearInFlight(TData *self) { + Y_VERIFY(CollectGarbageRequestInFlight); + CollectGarbageRequestInFlight = false; + EnqueueForCollectionIfPossible(self); + } + + void TData::TRecordsPerChannelGroup::EnqueueForCollectionIfPossible(TData *self) { + if (!CollectGarbageRequestInFlight && TabletId == self->Self->TabletID() && Empty() && !Trash.empty()) { + self->RecordsWithTrash.PushBack(this); + } + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 8e8b4ba84e4..160f1f5e54f 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -10,14 +10,9 @@ namespace NKikimr::NBlobDepot { public: class alignas(TString) TKey { - union { - ui64 Raw64[4]; - ui8 Bytes[32]; - char String[31]; - struct { - ui8 Padding[31]; - ui8 Type; - }; + struct TData { + ui8 Bytes[31]; + ui8 Type; } Data; static constexpr size_t TypeLenByteIdx = 31; @@ -25,9 +20,12 @@ namespace NKikimr::NBlobDepot { static constexpr char BlobIdType = 32; static constexpr char StringType = 33; + static_assert(sizeof(Data) == 32); + public: TKey() { - Reset(); + Data.Type = EncodeInlineStringLenAsTypeByte(0); + Data.Bytes[0] = 0; } explicit TKey(TLogoBlobID id) { @@ -35,22 +33,14 @@ namespace NKikimr::NBlobDepot { reinterpret_cast<TLogoBlobID&>(Data.Bytes) = id; } - explicit TKey(TStringBuf value) { + template<typename T, typename = std::enable_if_t<std::is_constructible_v<TString, T&&>>> + explicit TKey(T&& value) { if (value.size() <= MaxInlineStringLen) { Data.Type = EncodeInlineStringLenAsTypeByte(value.size()); - memcpy(Data.String, value.data(), value.size()); - Data.String[value.size()] = 0; - } else { - Data.Type = StringType; - new(Data.Bytes) TString(value); - } - } - - explicit TKey(TString value) { - if (value.size() <= MaxInlineStringLen) { - Data.Type = EncodeInlineStringLenAsTypeByte(value.size()); - memcpy(Data.String, value.data(), value.size()); - Data.String[value.size()] = 0; + memcpy(Data.Bytes, value.data(), value.size()); + if (value.size() != MaxInlineStringLen) { + Data.Bytes[value.size()] = 0; + } } else { Data.Type = StringType; new(Data.Bytes) TString(std::move(value)); @@ -187,13 +177,14 @@ namespace NKikimr::NBlobDepot { GetString().~TString(); } Data.Type = EncodeInlineStringLenAsTypeByte(0); + Data.Bytes[0] = 0; } TStringBuf GetStringBuf() const { if (Data.Type == StringType) { return GetString(); } else { - return TStringBuf(Data.String, DecodeInlineStringLenFromTypeByte(Data.Type)); + return TStringBuf(reinterpret_cast<const char*>(Data.Bytes), DecodeInlineStringLenFromTypeByte(Data.Type)); } } @@ -249,12 +240,19 @@ namespace NKikimr::NBlobDepot { TGenStep IssuedGenStep; // currently in flight or already confirmed TGenStep LastConfirmedGenStep; bool CollectGarbageRequestInFlight = false; + TBlobSeqId LeastExpectedBlobId; TRecordsPerChannelGroup(ui64 tabletId, ui8 channel, ui32 groupId) : TabletId(tabletId) , Channel(channel) , GroupId(groupId) {} + + void MoveToTrash(TData *self, TLogoBlobID id); + void OnSuccessfulCollect(TData *self); + void OnLeastExpectedBlobIdChange(TData *self, TBlobSeqId leastExpectedBlobId); + void ClearInFlight(TData *self); + void EnqueueForCollectionIfPossible(TData *self); }; std::map<TKey, TValue> Data; @@ -312,7 +310,6 @@ namespace NKikimr::NBlobDepot { void PutKey(TKey key, TValue&& data); - void OnTrashInserted(TRecordsPerChannelGroup& record); std::optional<TString> UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState); void DeleteKey(const TKey& key, const std::function<void(TLogoBlobID)>& updateTrash, void *cookie); void CommitTrash(void *cookie); @@ -323,6 +320,8 @@ namespace NKikimr::NBlobDepot { bool CanBeCollected(ui32 groupId, TBlobSeqId id) const; + void OnLeastExpectedBlobIdChange(ui8 channel); + static TString ToValueProto(const TValue& value); template<typename TCallback> diff --git a/ydb/core/blob_depot/defs.h b/ydb/core/blob_depot/defs.h index ce566644ccf..8fceb52d73c 100644 --- a/ydb/core/blob_depot/defs.h +++ b/ydb/core/blob_depot/defs.h @@ -6,6 +6,7 @@ #include <ydb/core/tablet_flat/tablet_flat_executed.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> #include <ydb/core/protos/blob_depot.pb.h> +#include <ydb/core/util/format.h> #include <ydb/core/util/stlog.h> #include <library/cpp/monlib/service/pages/templates.h> diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h index 3203d8800c9..b5f0a2542a9 100644 --- a/ydb/core/blob_depot/events.h +++ b/ydb/core/blob_depot/events.h @@ -54,7 +54,7 @@ namespace NKikimr { BLOBDEPOT_EVENT_PB(EvApplyConfig, TxId); BLOBDEPOT_EVENT_PB(EvApplyConfigResult, TabletId, TxId); BLOBDEPOT_EVENT_PB(EvRegisterAgent, VirtualGroupId, AgentInstanceId); - BLOBDEPOT_EVENT_PB_NO_ARGS(EvRegisterAgentResult); + BLOBDEPOT_EVENT_PB(EvRegisterAgentResult, Generation); BLOBDEPOT_EVENT_PB(EvAllocateIds, ChannelKind, Count); BLOBDEPOT_EVENT_PB(EvAllocateIdsResult, ChannelKind, Generation); BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotify); @@ -73,15 +73,15 @@ namespace NKikimr { template<typename TEvent> struct TResponseFor {}; - template<> struct TResponseFor<TEvApplyConfig> { using Type = TEvApplyConfigResult; }; - template<> struct TResponseFor<TEvRegisterAgent> { using Type = TEvRegisterAgentResult; }; - template<> struct TResponseFor<TEvAllocateIds> { using Type = TEvAllocateIdsResult; }; - template<> struct TResponseFor<TEvPushNotify> { using Type = TEvPushNotifyResult; }; - template<> struct TResponseFor<TEvBlock> { using Type = TEvBlockResult; }; - template<> struct TResponseFor<TEvQueryBlocks> { using Type = TEvQueryBlocksResult; }; + template<> struct TResponseFor<TEvApplyConfig> { using Type = TEvApplyConfigResult; }; + template<> struct TResponseFor<TEvRegisterAgent> { using Type = TEvRegisterAgentResult; }; + template<> struct TResponseFor<TEvAllocateIds> { using Type = TEvAllocateIdsResult; }; + template<> struct TResponseFor<TEvPushNotify> { using Type = TEvPushNotifyResult; }; + template<> struct TResponseFor<TEvBlock> { using Type = TEvBlockResult; }; + template<> struct TResponseFor<TEvQueryBlocks> { using Type = TEvQueryBlocksResult; }; template<> struct TResponseFor<TEvCollectGarbage> { using Type = TEvCollectGarbageResult; }; - template<> struct TResponseFor<TEvCommitBlobSeq> { using Type = TEvCommitBlobSeqResult; }; - template<> struct TResponseFor<TEvResolve> { using Type = TEvResolveResult; }; + template<> struct TResponseFor<TEvCommitBlobSeq> { using Type = TEvCommitBlobSeqResult; }; + template<> struct TResponseFor<TEvResolve> { using Type = TEvResolveResult; }; template<typename TRequestEvent, typename... TArgs> static auto MakeResponseFor(TEventHandle<TRequestEvent>& ev, TActorId selfId, TArgs&&... args) { diff --git a/ydb/core/blob_depot/given_id_range.cpp b/ydb/core/blob_depot/given_id_range.cpp index 74cd9a4e8bd..4b7f4ebe840 100644 --- a/ydb/core/blob_depot/given_id_range.cpp +++ b/ydb/core/blob_depot/given_id_range.cpp @@ -4,30 +4,93 @@ namespace NKikimr::NBlobDepot { void TGivenIdRange::IssueNewRange(ui64 begin, ui64 end) { Y_VERIFY(begin < end); + NumAvailableItems += end - begin; - const auto [it, inserted] = Ranges.emplace(begin, end); - Y_VERIFY(inserted); + const auto it = Ranges.upper_bound(begin); + bool adjacentWithNext = false, adjacentWithPrev = false; - if (it != Ranges.begin()) { - const auto& prev = *std::prev(it); - Y_VERIFY(prev.End <= begin); - } - if (std::next(it) != Ranges.end()) { - const auto& next = *std::next(it); + if (it != Ranges.end()) { + TRange& next = const_cast<TRange&>(*it); + if (next.Begin < end) { + Y_VERIFY(next.Bits.FirstNonZeroBit() >= end - next.Begin); + next.Bits.Set(0, end - next.Begin); + next.NumSetBits += end - next.Begin; + end = next.Begin; + } Y_VERIFY(end <= next.Begin); + adjacentWithNext = end == next.Begin; } - NumAvailableItems += end - begin; + if (it != Ranges.begin()) { + TRange& prev = const_cast<TRange&>(*std::prev(it)); + if (prev.End <= begin) { + // nothing interesting here -- no any kind overlap + } else if (end <= prev.End) { + // the [begin, end) pair is just a subrange of 'prev' item + Y_VERIFY(prev.Begin <= begin); + Y_VERIFY(!adjacentWithNext); + prev.Bits.Set(begin - prev.Begin, end - prev.Begin); + prev.NumSetBits += end - begin; + return; + } else { + Y_VERIFY(prev.Begin < begin); + prev.Bits.Set(begin - prev.Begin, prev.End - prev.Begin); + prev.NumSetBits += prev.End - begin; + begin = prev.End; + } + Y_VERIFY_DEBUG(prev.End <= begin); + adjacentWithPrev = prev.End == begin; + } + + if (adjacentWithNext && adjacentWithPrev) { + // [ prev ) [ next ) + // ^ ^ + // begin end + TRange& next = const_cast<TRange&>(*it); + TRange& prev = const_cast<TRange&>(*std::prev(it)); + prev.Bits |= next.Bits << end - prev.Begin; + prev.Bits.Set(begin - prev.Begin, end - prev.Begin); + prev.NumSetBits += next.NumSetBits + end - begin; + prev.End = next.End; + Ranges.erase(it); + } else if (adjacentWithNext) { + TRange& next = const_cast<TRange&>(*it); + const ui32 shift = end - begin; + next.Bits <<= shift; + next.Bits.Set(0, shift); + next.NumSetBits += shift; + next.Begin = begin; + } else if (adjacentWithPrev) { + TRange& prev = const_cast<TRange&>(*std::prev(it)); + prev.Bits.Set(prev.End - prev.Begin, end - prev.Begin); + prev.NumSetBits += end - begin; + prev.End = end; + } else { + Ranges.emplace_hint(it, begin, end); + } } void TGivenIdRange::AddPoint(ui64 value) { IssueNewRange(value, value + 1); } - void TGivenIdRange::RemovePoint(ui64 value) { + void TGivenIdRange::RemovePoint(ui64 value, bool *wasLeast) { auto it = Ranges.upper_bound(value); Y_VERIFY(it != Ranges.begin()); - Pop(std::prev(it), value); + --it; + if (wasLeast) { + *wasLeast = it == Ranges.begin() && it->Bits.FirstNonZeroBit() + it->Begin == value; + } + Pop(it, value); + } + + bool TGivenIdRange::GetPoint(ui64 value) const { + const auto it = Ranges.upper_bound(value); + if (it == Ranges.begin()) { + return false; + } + const TRange& range = *std::prev(it); + return range.Begin <= value && value < range.End && range.Bits[value - range.Begin]; } bool TGivenIdRange::IsEmpty() const { @@ -35,6 +98,14 @@ namespace NKikimr::NBlobDepot { } ui32 TGivenIdRange::GetNumAvailableItems() const { +#ifndef NDEBUG + ui32 count = 0; + for (const auto& range : Ranges) { + Y_VERIFY(range.NumSetBits == range.Bits.Count()); + count += range.NumSetBits; + } + Y_VERIFY(count == NumAvailableItems); +#endif return NumAvailableItems; } @@ -53,38 +124,88 @@ namespace NKikimr::NBlobDepot { return value; } - void TGivenIdRange::Trim(ui8 channel, ui32 generation, ui32 invalidatedStep) { - const ui64 validSince = 1 + TBlobSeqId{channel, generation, invalidatedStep, TBlobSeqId::MaxIndex}.ToSequentialNumber(); + TGivenIdRange TGivenIdRange::Trim(ui64 validSince) { + TGivenIdRange result; while (!Ranges.empty()) { - const auto it = Ranges.begin(); + auto it = Ranges.begin(); auto& range = const_cast<TRange&>(*it); if (range.End <= validSince) { - NumAvailableItems -= range.Bits.Count(); - Ranges.erase(it); + NumAvailableItems -= range.NumSetBits; + result.NumAvailableItems += range.NumSetBits; + result.Ranges.insert(result.Ranges.end(), Ranges.extract(it++)); + continue; } else if (range.Begin < validSince) { const ui32 len = validSince - range.Begin; - for (ui32 i = 0; i < len; ++i) { - range.NumSetBits -= range.Bits[i]; - NumAvailableItems -= range.Bits[i]; + TDynBitMap mask; + mask.Set(0, len); + + const TDynBitMap setBits = range.Bits & mask; + if (const size_t numSetBits = setBits.Count()) { + // clear set bits in the origin chunk + range.Bits -= setBits; + range.NumSetBits -= numSetBits; + NumAvailableItems -= numSetBits; + if (!range.NumSetBits) { + Ranges.erase(it); + } + + // put 'em into the result + TRange& r = const_cast<TRange&>(*result.Ranges.emplace_hint(result.Ranges.end(), range.Begin, range.End, TRange::Zero)); + r.Bits |= setBits; + r.NumSetBits += numSetBits; + result.NumAvailableItems += numSetBits; } - range.Bits.Reset(0, len); - } else { - break; } + break; } + + return result; } void TGivenIdRange::Subtract(const TGivenIdRange& other) { - for (const TRange& range : other.Ranges) { - const auto it = Ranges.find(range.Begin); - Y_VERIFY(it != Ranges.end()); - Y_VERIFY(range.End == it->End); - Y_VERIFY(range.NumSetBits == it->NumSetBits); - Y_VERIFY(range.Bits == it->Bits); - NumAvailableItems -= range.NumSetBits; - Ranges.erase(it); + auto it = Ranges.begin(); + auto otherIt = other.Ranges.begin(); + + while (it != Ranges.end()) { + if (it->End <= otherIt->Begin) { + ++it; + } else if (otherIt->End <= it->Begin) { + Y_FAIL(); + } else { + const ui64 begin = Max(it->Begin, otherIt->Begin); + const ui64 end = Min(it->End, otherIt->End); + Y_VERIFY(begin < end); + + TDynBitMap subtractedMask = otherIt->Bits; + if (otherIt->Begin < begin) { + subtractedMask >>= begin - otherIt->Begin; + } + subtractedMask.Reset(end - begin, subtractedMask.Size()); + if (it->Begin < begin) { + subtractedMask <<= begin - it->Begin; + } + Y_VERIFY_DEBUG((it->Bits & subtractedMask) == subtractedMask); + + TRange& r = const_cast<TRange&>(*it); + r.Bits -= subtractedMask; + r.NumSetBits -= subtractedMask.Count(); + if (!r.NumSetBits) { + it = Ranges.erase(it); + } else if (it->End == end) { + ++it; + } + + if (otherIt->End == end) { + ++otherIt; + } + + NumAvailableItems -= subtractedMask.Count(); + } } + + // ensure we have processed all other range + Y_VERIFY(otherIt == other.Ranges.end()); } void TGivenIdRange::Output(IOutputStream& s) const { @@ -109,7 +230,7 @@ namespace NKikimr::NBlobDepot { void TGivenIdRange::Pop(TRanges::iterator it, ui64 value) { TRange& range = const_cast<TRange&>(*it); - Y_VERIFY(range.Begin <= value && value < range.End); + Y_VERIFY_S(range.Begin <= value && value < range.End, "value# " << value << " this# " << ToString()); const size_t offset = value - range.Begin; Y_VERIFY(range.Bits[offset]); range.Bits.Reset(offset); @@ -120,4 +241,39 @@ namespace NKikimr::NBlobDepot { --NumAvailableItems; } + std::vector<bool> TGivenIdRange::ToDebugArray(size_t numItems) const { + std::vector<bool> res(numItems, false); + for (const TRange& item : Ranges) { + for (ui32 i = item.Bits.FirstNonZeroBit(); i != item.Bits.Size(); i = item.Bits.NextNonZeroBit(i)) { + Y_VERIFY_S(i < item.End - item.Begin, "i# " << i << " this# " << ToString()); + res[item.Begin + i] = true; + } + } + return res; + } + + void TGivenIdRange::CheckConsistency() const { + ui32 numAvailableItems = 0; + + for (auto it = Ranges.begin(); it != Ranges.end(); ++it) { + const TRange& range = *it; + Y_VERIFY_S(range.Begin < range.End, ToString()); + + if (it != Ranges.begin()) { + const TRange& prev = *std::prev(it); + Y_VERIFY_S(prev.End < range.Begin, ToString()); + } + + Y_VERIFY_S(range.NumSetBits, ToString()); + Y_VERIFY_S(range.NumSetBits == range.Bits.Count(), ToString()); + for (ui32 i = range.End - range.Begin; i < range.Bits.Size(); ++i) { + Y_VERIFY_S(!range.Bits[i], "begin# " << range.Begin << " i# " << i << " this# " << ToString()); + } + + numAvailableItems += range.NumSetBits; + } + + Y_VERIFY_S(numAvailableItems == NumAvailableItems, ToString()); + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/mon_main.cpp b/ydb/core/blob_depot/mon_main.cpp index 7f5544420e1..e64fb0dbc8e 100644 --- a/ydb/core/blob_depot/mon_main.cpp +++ b/ydb/core/blob_depot/mon_main.cpp @@ -270,6 +270,56 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::RenderMainPage(IOutputStream& s) { HTML(s) { s << "<a href='app?TabletID=" << TabletID() << "&page=data'>Contained data</a><br>"; + + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + s << "Stats"; + } + DIV_CLASS("panel-body") { + TABLE_CLASS("table") { + TABLEHEAD() { + TABLER() { + TABLEH() { s << "Parameter"; } + TABLEH() { s << "Value"; } + } + } + TABLEBODY() { + auto outSize = [&](ui64 size) { + static const char *suffixes[] = { + "B", "KiB", "MiB", "GiB", "TiB", "PiB", nullptr + }; + FormatHumanReadable(s, size, 1024, 2, suffixes); + }; + TABLER() { + TABLED() { s << "Data, bytes"; } + TABLED() { + ui64 total = 0; + Data->EnumerateRefCount([&](TLogoBlobID id, ui32 /*refCount*/) { + total += id.BlobSize(); + }); + outSize(total); + } + } + + ui64 trashInFlight = 0; + ui64 trashPending = 0; + Data->EnumerateTrash([&](ui32 /*groupId*/, TLogoBlobID id, bool inFlight) { + (inFlight ? trashInFlight : trashPending) += id.BlobSize(); + }); + + TABLER() { + TABLED() { s << "Trash in flight, bytes"; } + TABLED() { outSize(trashInFlight); } + } + + TABLER() { + TABLED() { s << "Trash pending, bytes"; } + TABLED() { outSize(trashPending); } + } + } + } + } + } } } diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index bd85b9eda49..a82df128315 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -42,7 +42,7 @@ namespace NKikimr::NBlobDepot { if (blobSeqId.Generation == generation) { // check for internal sanity -- we can't issue barriers on given ids without confirmed trimming - Y_VERIFY(!canBeCollected); + Y_VERIFY_S(!canBeCollected, "BlobSeqId# " << blobSeqId.ToString()); } else if (canBeCollected) { // we can't accept this record, because it is potentially under already issued barrier responseItem->SetStatus(NKikimrProto::ERROR); @@ -82,8 +82,14 @@ namespace NKikimr::NBlobDepot { (AgentId, agent.ConnectedNodeId), (BlobSeqId, blobSeqId), (Value, value), (GivenIdRanges, Self->Channels[blobSeqId.Channel].GivenIdRanges), (Agent.GivenIdRanges, agent.GivenIdRanges[blobSeqId.Channel])); - agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value); - Self->Channels[blobSeqId.Channel].GivenIdRanges.RemovePoint(value); + + agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value, nullptr); + + bool wasLeast; + Self->Channels[blobSeqId.Channel].GivenIdRanges.RemovePoint(value, &wasLeast); + if (wasLeast) { + Self->Data->OnLeastExpectedBlobIdChange(blobSeqId.Channel); + } } bool CheckKeyAgainstBarrier(const TString& key, NKikimrBlobDepot::TEvCommitBlobSeqResult::TItem *responseItem) { diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index 8cf0e3899ab..7e6fd06c1b8 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -98,8 +98,8 @@ namespace NKikimr::NBlobDepot { class TGivenIdRange { struct TRange { - const ui64 Begin; - const ui64 End; + ui64 Begin; + ui64 End; ui32 NumSetBits = 0; TDynBitMap Bits; @@ -111,6 +111,16 @@ namespace NKikimr::NBlobDepot { Bits.Set(0, end - begin); } + static constexpr struct TZero {} Zero{}; + + TRange(ui64 begin, ui64 end, TZero) + : Begin(begin) + , End(end) + , NumSetBits(0) + { + Bits.Reset(0, end - begin); + } + struct TCompare { bool operator ()(const TRange& x, const TRange& y) const { return x.Begin < y.Begin; } bool operator ()(const TRange& x, ui64 y) const { return x.Begin < y; } @@ -126,7 +136,8 @@ namespace NKikimr::NBlobDepot { public: void IssueNewRange(ui64 begin, ui64 end); void AddPoint(ui64 value); - void RemovePoint(ui64 value); + void RemovePoint(ui64 value, bool *wasLeast); + bool GetPoint(ui64 value) const; bool IsEmpty() const; ui32 GetNumAvailableItems() const; @@ -134,12 +145,14 @@ namespace NKikimr::NBlobDepot { ui64 Allocate(); void Subtract(const TGivenIdRange& other); - - void Trim(ui8 channel, ui32 generation, ui32 invalidatedStep); + TGivenIdRange Trim(ui64 trimUpTo); void Output(IOutputStream& s) const; TString ToString() const; + std::vector<bool> ToDebugArray(size_t numItems) const; + void CheckConsistency() const; + private: void Pop(TRanges::iterator it, ui64 value); }; diff --git a/ydb/core/mind/hive/monitoring.cpp b/ydb/core/mind/hive/monitoring.cpp index 362466da547..cc3152f9145 100644 --- a/ydb/core/mind/hive/monitoring.cpp +++ b/ydb/core/mind/hive/monitoring.cpp @@ -1101,7 +1101,7 @@ public: case TTabletTypes::ReplicationController: return "RC"; case TTabletTypes::BlobDepot: - return "BS"; + return "BD"; default: return Sprintf("%d", (int)type); } |