diff options
author | alexvru <alexvru@ydb.tech> | 2022-11-25 13:23:28 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-11-25 13:23:28 +0300 |
commit | af0121b6a767c5b91416baff9cce349cc7ada49b (patch) | |
tree | 31d9996cd1f92ecbd4f94bf136860559c6a66a6d | |
parent | ac492518e2dc1c6e55af56bc7bf6af60c202dc63 (diff) | |
download | ydb-af0121b6a767c5b91416baff9cce349cc7ada49b.tar.gz |
Enable sequential blob write mode
-rw-r--r-- | ydb/core/blob_depot/agent.cpp | 103 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/blocks.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/comm.cpp | 21 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_put.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot_tablet.h | 19 | ||||
-rw-r--r-- | ydb/core/blob_depot/blocks.cpp | 78 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 87 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.h | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_trash.cpp | 43 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_commit_blob_seq.cpp | 12 | ||||
-rw-r--r-- | ydb/core/protos/blob_depot.proto | 1 |
12 files changed, 237 insertions, 137 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp index a4c849aa9be..6de33e7ea2a 100644 --- a/ydb/core/blob_depot/agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -5,7 +5,8 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::Handle(TEvTabletPipe::TEvServerConnected::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvServerConnected", (Id, GetLogId()), (PipeServerId, ev->Get()->ServerId)); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvServerConnected", (Id, GetLogId()), (ClientId, ev->Get()->ClientId), + (ServerId, ev->Get()->ServerId)); const auto [it, inserted] = PipeServerToNode.emplace(ev->Get()->ServerId, std::nullopt); Y_VERIFY(inserted); } @@ -16,14 +17,11 @@ namespace NKikimr::NBlobDepot { const auto it = PipeServerToNode.find(ev->Get()->ServerId); Y_VERIFY(it != PipeServerToNode.end()); if (const auto& nodeId = it->second) { - if (const auto agentIt = Agents.find(*nodeId); agentIt != Agents.end()) { - if (TAgent& agent = agentIt->second; agent.PipeServerId == it->first) { - OnAgentDisconnect(agent); - agent.PipeServerId.reset(); - agent.AgentId.reset(); - agent.ConnectedNodeId = 0; - agent.ExpirationTimestamp = TActivationContext::Now() + ExpirationTimeout; - } + if (const auto agentIt = Agents.find(*nodeId); agentIt != Agents.end() && agentIt->second.Connection && + agentIt->second.Connection->PipeServerId == it->first) { + OnAgentDisconnect(agentIt->second); + agentIt->second.Connection.reset(); + agentIt->second.ExpirationTimestamp = TActivationContext::Now() + ExpirationTimeout; } } PipeServerToNode.erase(it); @@ -56,9 +54,11 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(!it->second || *it->second == nodeId); it->second = nodeId; auto& agent = Agents[nodeId]; - agent.PipeServerId = pipeServerId; - agent.AgentId = ev->Sender; - agent.ConnectedNodeId = nodeId; + agent.Connection = { + .PipeServerId = pipeServerId, + .AgentId = ev->Sender, + .NodeId = nodeId, + }; agent.ExpirationTimestamp = TInstant::Max(); agent.LastPushedSpaceColor = SpaceMonitor->GetSpaceColor(); agent.LastPushedApproximateFreeSpaceShare = SpaceMonitor->GetApproximateFreeSpaceShare(); @@ -89,6 +89,41 @@ namespace NKikimr::NBlobDepot { } TActivationContext::Send(response.release()); + + if (!agent.InvalidatedStepInFlight.empty()) { + const ui32 generation = Executor()->Generation(); + const ui64 id = ++agent.LastRequestId; + + auto reply = std::make_unique<TEvBlobDepot::TEvPushNotify>(); + auto& request = agent.InvalidateStepRequests[id]; + for (const auto& [channel, invalidatedStep] : agent.InvalidatedStepInFlight) { + auto *item = reply->Record.AddInvalidatedSteps(); + item->SetChannel(channel); + item->SetGeneration(generation); + item->SetInvalidatedStep(invalidatedStep); + request[channel] = invalidatedStep; + } + + std::vector<TActorId> blockActorsPending; + + for (const auto& [tabletId, data] : agent.BlockToDeliver) { + auto *item = reply->Record.AddBlockedTablets(); + item->SetTabletId(tabletId); + const auto& [blockedGeneration, issuerGuid, actorId] = data; + item->SetBlockedGeneration(blockedGeneration); + item->SetIssuerGuid(issuerGuid); + blockActorsPending.push_back(actorId); + } + + agent.PushCallbacks.emplace(id, [this, id, sender = ev->Sender, m = std::move(blockActorsPending)]( + TEvBlobDepot::TEvPushNotifyResult::TPtr ev) { + Data->OnPushNotifyResult(ev); + for (const TActorId& actorId : m) { + TActivationContext::Send(new IEventHandle(actorId, sender, new TEvBlobDepot::TEvPushNotifyResult, 0, id)); + } + }); + TActivationContext::Send(new IEventHandle(ev->Sender, ev->Recipient, reply.release(), 0, id)); + } } void TBlobDepot::OnAgentConnect(TAgent& /*agent*/) { @@ -155,16 +190,10 @@ namespace NKikimr::NBlobDepot { TAgent& agent = GetAgent(ev->Recipient); for (const auto& range : givenIdRange->GetChannelRanges()) { agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd()); - - auto& givenIdRanges = Channels[range.GetChannel()].GivenIdRanges; - const bool wasEmpty = givenIdRanges.IsEmpty(); - givenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd()); - if (wasEmpty) { - Data->OnLeastExpectedBlobIdChange(range.GetChannel()); - } + Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd()); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT05, "IssueNewRange", (Id, GetLogId()), - (AgentId, agent.ConnectedNodeId), (Channel, range.GetChannel()), + (AgentId, agent.Connection->NodeId), (Channel, range.GetChannel()), (Begin, range.GetBegin()), (End, range.GetEnd())); } } @@ -177,7 +206,7 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(it != PipeServerToNode.end()); Y_VERIFY(it->second); TAgent& agent = GetAgent(*it->second); - Y_VERIFY(agent.PipeServerId == pipeServerId); + Y_VERIFY(agent.Connection && agent.Connection->PipeServerId == pipeServerId); return agent; } @@ -190,15 +219,22 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::ResetAgent(TAgent& agent) { for (auto& [channel, agentGivenIdRange] : agent.GivenIdRanges) { - Channels[channel].GivenIdRanges.Subtract(agentGivenIdRange); - const ui32 channel_ = channel; - const auto& agentGivenIdRange_ = agentGivenIdRange; - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT06, "ResetAgent", (Id, GetLogId()), (AgentId, agent.ConnectedNodeId), - (Channel, channel_), (GivenIdRanges, Channels[channel_].GivenIdRanges), - (Agent.GivenIdRanges, agentGivenIdRange_)); - agentGivenIdRange = {}; - Data->OnLeastExpectedBlobIdChange(channel); + if (agentGivenIdRange.IsEmpty()) { + continue; + } + + // calculate if this agent can be blocking garbage collection by holding least conserved blob sequence id + const bool unblock = Channels[channel].GivenIdRanges.GetMinimumValue() == agentGivenIdRange.GetMinimumValue(); + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT06, "ResetAgent", (Id, GetLogId()), (AgentId, agent.Connection->NodeId), + (Channel, channel), (GivenIdRanges, Channels[channel].GivenIdRanges), + (Agent.GivenIdRanges, agentGivenIdRange), (Unblock, unblock)); + + if (unblock) { + Data->OnLeastExpectedBlobIdChange(channel); + } } + agent.InvalidatedStepInFlight.clear(); } void TBlobDepot::InitChannelKinds() { @@ -263,7 +299,7 @@ namespace NKikimr::NBlobDepot { }; TAgent& agent = GetAgent(ev->Recipient); - Execute(std::make_unique<TTxInvokeCallback>(this, agent.ConnectedNodeId, ev)); + Execute(std::make_unique<TTxInvokeCallback>(this, agent.Connection->NodeId, ev)); } void TBlobDepot::ProcessRegisterAgentQ() { @@ -281,15 +317,14 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::OnSpaceColorChange(NKikimrBlobStorage::TPDiskSpaceColor::E spaceColor, float approximateFreeSpaceShare) { for (auto& [nodeId, agent] : Agents) { - if (agent.AgentId && (agent.LastPushedSpaceColor != spaceColor || - agent.LastPushedApproximateFreeSpaceShare != approximateFreeSpaceShare)) { - Y_VERIFY(agent.ConnectedNodeId == nodeId); + if (agent.Connection && (agent.LastPushedSpaceColor != spaceColor || agent.LastPushedApproximateFreeSpaceShare != approximateFreeSpaceShare)) { + Y_VERIFY(agent.Connection->NodeId == nodeId); const ui64 id = ++agent.LastRequestId; agent.PushCallbacks.emplace(id, [](TEvBlobDepot::TEvPushNotifyResult::TPtr) {}); auto ev = std::make_unique<TEvBlobDepot::TEvPushNotify>(); ev->Record.SetSpaceColor(spaceColor); ev->Record.SetApproximateFreeSpaceShare(approximateFreeSpaceShare); - Send(*agent.AgentId, ev.release(), 0, id); + Send(agent.Connection->AgentId, ev.release(), 0, id); agent.LastPushedSpaceColor = spaceColor; agent.LastPushedApproximateFreeSpaceShare = approximateFreeSpaceShare; } diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index 70e33e18c8b..fbebb7a2d2a 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -85,6 +85,7 @@ namespace NKikimr::NBlobDepot { const ui64 AgentInstanceId; ui64 TabletId = Max<ui64>(); TActorId PipeId; + TActorId PipeServerId; bool IsConnected = false; private: diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp index 61a5cea25d7..16ec06e9dbb 100644 --- a/ydb/core/blob_depot/agent/blocks.cpp +++ b/ydb/core/blob_depot/agent/blocks.cpp @@ -89,6 +89,7 @@ namespace NKikimr::NBlobDepot { if (const auto it = Blocks.find(tablet.GetTabletId()); it != Blocks.end()) { auto& block = it->second; block.BlockedGeneration = tablet.GetBlockedGeneration(); + block.IssuerGuid = tablet.GetIssuerGuid(); block.ExpirationTimestamp = TMonotonic::Zero(); IssueOnUpdateBlock(block, true); } diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index 63ce937f32d..c34ba23af67 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -4,14 +4,21 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) { + auto& msg = *ev->Get(); STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA03, "TEvClientConnected", (VirtualGroupId, VirtualGroupId), - (Msg, ev->Get()->ToString())); + (TabletId, msg.TabletId), (Status, msg.Status), (ClientId, msg.ClientId), (ServerId, msg.ServerId)); + Y_VERIFY_DEBUG_S(msg.Status == NKikimrProto::OK, "Status# " << NKikimrProto::EReplyStatus_Name(msg.Status)); + if (msg.Status != NKikimrProto::OK) { + ConnectToBlobDepot(); + } else { + PipeServerId = msg.ServerId; + } } void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) { STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA04, "TEvClientDestroyed", (VirtualGroupId, VirtualGroupId), (Msg, ev->Get()->ToString())); - PipeId = {}; + PipeId = PipeServerId = {}; OnDisconnect(); ConnectToBlobDepot(); } @@ -169,12 +176,14 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::Handle(TEvBlobDepot::TEvPushNotify::TPtr ev) { - auto response = std::make_unique<TEvBlobDepot::TEvPushNotifyResult>(); - auto& msg = ev->Get()->Record; - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA11, "TEvPushNotify", (VirtualGroupId, VirtualGroupId), (Msg, msg), - (Id, ev->Cookie)); + (Id, ev->Cookie), (Sender, ev->Sender), (PipeServerId, PipeServerId)); + if (ev->Sender != PipeServerId) { + return; // race with previous connection + } + + auto response = std::make_unique<TEvBlobDepot::TEvPushNotifyResult>(); BlocksManager.OnBlockedTablets(msg.GetBlockedTablets()); diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index 226ab8f241c..ad964fc7832 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -7,7 +7,7 @@ namespace NKikimr::NBlobDepot { TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvPut>(std::unique_ptr<IEventHandle> ev) { class TPutQuery : public TBlobStorageQuery<TEvBlobStorage::TEvPut> { const bool SuppressFooter = true; - const bool IssueUncertainWrites = true; + const bool IssueUncertainWrites = false; std::vector<ui32> BlockChecksRemain; ui32 PutsInFlight = 0; @@ -178,7 +178,7 @@ namespace NKikimr::NBlobDepot { } auto& kind = it->second; const size_t numErased = kind.WritesInFlight.erase(BlobSeqId); - Y_VERIFY(numErased); + Y_VERIFY(numErased || BlobSeqId.Generation < Agent.BlobDepotGeneration); } void OnUpdateBlock(bool success) override { diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index 55e6a11b054..5b2014e8ebb 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -53,9 +53,13 @@ namespace NKikimr::NBlobDepot { THashMap<TActorId, std::deque<std::unique_ptr<IEventHandle>>> RegisterAgentQ; struct TAgent { - std::optional<TActorId> PipeServerId; - std::optional<TActorId> AgentId; - ui32 ConnectedNodeId; + struct TConnection { + TActorId PipeServerId; + TActorId AgentId; + ui32 NodeId; + }; + + std::optional<TConnection> Connection; TInstant ExpirationTimestamp; std::optional<ui64> AgentInstanceId; @@ -63,6 +67,9 @@ namespace NKikimr::NBlobDepot { THashMap<ui8, ui32> InvalidatedStepInFlight; THashMap<ui64, THashMap<ui8, ui32>> InvalidateStepRequests; + + THashMap<ui64, std::tuple<ui32, ui64, TActorId>> BlockToDeliver; // TabletId -> (BlockedGeneration, IssuerGuid, ActorId) + THashMap<ui64, std::function<void(TEvBlobDepot::TEvPushNotifyResult::TPtr)>> PushCallbacks; ui64 LastRequestId = 0; @@ -81,6 +88,12 @@ namespace NKikimr::NBlobDepot { TChannelKind *KindPtr; TGivenIdRange GivenIdRanges; // accumulated through all agents ui64 NextBlobSeqId = 0; + + // Obtain the least BlobSeqId that is not yet confirmed, but may be written by any agent + TBlobSeqId GetLeastExpectedBlobId(ui32 generation) const { + return TBlobSeqId::FromSequentalNumber(Index, generation, GivenIdRanges.IsEmpty() ? NextBlobSeqId : + GivenIdRanges.GetMinimumValue()); + } }; std::vector<TChannelInfo> Channels; diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp index e09310651b9..5beab7bef6a 100644 --- a/ydb/core/blob_depot/blocks.cpp +++ b/ydb/core/blob_depot/blocks.cpp @@ -55,6 +55,12 @@ namespace NKikimr::NBlobDepot { }; class TBlobDepot::TBlocksManager::TBlockProcessorActor : public TActorBootstrapped<TBlockProcessorActor> { + struct TEvPrivate { + enum { + EvCheckWaitingNode = EventSpaceBegin(TEvents::ES_PRIVATE), + }; + }; + TBlobDepot* const Self; const ui64 TabletId; const ui32 BlockedGeneration; @@ -113,27 +119,56 @@ namespace NKikimr::NBlobDepot { // skip the origin agent continue; } - if (info.ExpirationTimestamp <= now) { - SendPushToAgent(agentId); + if (info.ExpirationTimestamp <= now) { // includes case when agent is connected right now + TAgent& agent = Self->GetAgent(agentId); + + // enqueue push notification + const auto [it, inserted] = agent.BlockToDeliver.try_emplace(TabletId, BlockedGeneration, IssuerGuid, SelfId()); + if (!inserted) { + const auto& [currentBlockedGeneration, _1, _2] = it->second; + Y_VERIFY(currentBlockedGeneration < BlockedGeneration); + it->second = {BlockedGeneration, IssuerGuid, SelfId()}; + } + + // add node to wait list; also start timer to remove this node from the wait queue + NodesWaitingForPushResult.insert(agentId); + Y_VERIFY(info.ExpirationTimestamp <= now); + TActivationContext::Schedule(info.ExpirationTimestamp, new IEventHandle(TEvPrivate::EvCheckWaitingNode, + 0, SelfId(), {}, nullptr, agentId)); + + // issue message to the connected node, if it is + if (const auto& connection = agent.Connection) { + auto ev = std::make_unique<TEvBlobDepot::TEvPushNotify>(); + auto *item = ev->Record.AddBlockedTablets(); + item->SetTabletId(TabletId); + item->SetBlockedGeneration(BlockedGeneration); + item->SetIssuerGuid(IssuerGuid); + + const ui64 id = ++agent.LastRequestId; + agent.PushCallbacks.emplace(id, [selfId = SelfId()](TEvBlobDepot::TEvPushNotifyResult::TPtr ev) { + TActivationContext::Send(ev->Forward(selfId)); + }); + TActivationContext::Send(new IEventHandle(connection->AgentId, connection->PipeServerId, ev.release(), 0, id)); + } } } } - void SendPushToAgent(ui32 agentId) { - auto ev = std::make_unique<TEvBlobDepot::TEvPushNotify>(); - auto *item = ev->Record.AddBlockedTablets(); - item->SetTabletId(TabletId); - item->SetBlockedGeneration(BlockedGeneration); - - TAgent& agent = Self->GetAgent(agentId); - if (const auto& actorId = agent.AgentId) { - const ui64 id = ++agent.LastRequestId; - agent.PushCallbacks.emplace(id, [selfId = SelfId()](TEvBlobDepot::TEvPushNotifyResult::TPtr ev) { - TActivationContext::Send(ev->Forward(selfId)); - }); - Send(*actorId, ev.release(), 0, id); + void HandleCheckWaitingNode(TAutoPtr<IEventHandle> ev) { + const ui32 agentId = ev->Cookie; + if (NodesWaitingForPushResult.contains(agentId)) { + const TMonotonic now = TActivationContext::Monotonic(); + const auto& info = Self->BlocksManager->Blocks[TabletId].PerAgentInfo[agentId]; + if (info.ExpirationTimestamp <= now) { // node still can write data for this tablet, reschedule timer + TActivationContext::Schedule(info.ExpirationTimestamp, new IEventHandle(TEvPrivate::EvCheckWaitingNode, + 0, SelfId(), {}, nullptr, agentId)); + } else { + NodesWaitingForPushResult.erase(agentId); + if (NodesWaitingForPushResult.empty()) { + Finish(); + } + } } - NodesWaitingForPushResult.insert(agentId); } void Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) { @@ -145,6 +180,11 @@ namespace NKikimr::NBlobDepot { auto& block = Self->BlocksManager->Blocks[TabletId]; block.PerAgentInfo.erase(agentId); + TAgent& agent = Self->GetAgent(agentId); + const auto it = agent.BlockToDeliver.find(TabletId); + Y_VERIFY(it != agent.BlockToDeliver.end() && it->second == std::make_tuple(BlockedGeneration, IssuerGuid, SelfId())); + agent.BlockToDeliver.erase(it); + if (NodesWaitingForPushResult.empty()) { Finish(); } @@ -215,6 +255,7 @@ namespace NKikimr::NBlobDepot { switch (ev->GetTypeRewrite()) { hFunc(TEvBlobStorage::TEvBlockResult, Handle); hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle); + fFunc(TEvPrivate::EvCheckWaitingNode, HandleCheckWaitingNode); cFunc(TEvents::TSystem::Wakeup, IssueBlocksToStorage); cFunc(TEvents::TSystem::Poison, PassAway); } @@ -263,7 +304,7 @@ namespace NKikimr::NBlobDepot { if (it == Blocks.end() || it->second.CanSetNewBlock(blockedGeneration, issuerGuid)) { TAgent& agent = Self->GetAgent(ev->Recipient); Self->Execute(std::make_unique<TTxUpdateBlock>(Self, tabletId, blockedGeneration, - agent.ConnectedNodeId, record.GetIssuerGuid(), TActivationContext::Now(), std::move(response))); + agent.Connection->NodeId, record.GetIssuerGuid(), TActivationContext::Now(), std::move(response))); } else { responseRecord->SetStatus(NKikimrProto::ALREADY); } @@ -274,8 +315,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) { TAgent& agent = Self->GetAgent(ev->Recipient); - const ui32 agentId = agent.ConnectedNodeId; - Y_VERIFY(agentId); + const ui32 agentId = agent.Connection->NodeId; const TMonotonic now = TActivationContext::Monotonic(); diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index e129cade51c..762f7928a37 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -120,19 +120,13 @@ namespace NKikimr::NBlobDepot { EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) { const auto [it, inserted] = RefCount.try_emplace(id); if (inserted) { - // first mention of this id - auto& record = GetRecordsPerChannelGroup(id); - const auto [_, inserted] = record.Used.insert(id); - Y_VERIFY(inserted); - AccountBlob(id, 1); - TotalStoredDataSize += id.BlobSize(); - - // blob is first mentioned and deleted as well - if (outcome == EUpdateOutcome::DROP) { + AddFirstMentionedBlob(id); + } + if (outcome == EUpdateOutcome::DROP) { + if (inserted) { deleteQ.push_back(id); } - } - if (outcome != EUpdateOutcome::DROP) { + } else { ++it->second; } }); @@ -277,12 +271,8 @@ namespace NKikimr::NBlobDepot { const auto [it, inserted] = Data.try_emplace(std::move(key), std::move(proto), uncertainWrite); if (inserted) { EnumerateBlobsForValueChain(it->second.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) { - if (!RefCount[id]++) { // first mention of this id - auto& record = GetRecordsPerChannelGroup(id); - const auto [_, inserted] = record.Used.insert(id); - Y_VERIFY(inserted); - AccountBlob(id, 1); - TotalStoredDataSize += id.BlobSize(); + if (!RefCount[id]++) { + AddFirstMentionedBlob(id); } }); } @@ -358,51 +348,58 @@ namespace NKikimr::NBlobDepot { const ui32 generation = Self->Executor()->Generation(); - std::set<TBlobSeqId> writesInFlight; + std::vector<TBlobSeqId> writesInFlight; for (const auto& item : ev->Get()->Record.GetWritesInFlight()) { - writesInFlight.insert(TBlobSeqId::FromProto(item)); + writesInFlight.push_back(TBlobSeqId::FromProto(item)); } + std::sort(writesInFlight.begin(), writesInFlight.end()); for (const auto& [channel, invalidatedStep] : items) { const ui32 channel_ = channel; - const ui32 invalidatedStep_ = invalidatedStep; auto& agentGivenIdRanges = agent.GivenIdRanges[channel]; auto& givenIdRanges = Self->Channels[channel].GivenIdRanges; - auto begin = writesInFlight.lower_bound(TBlobSeqId{channel, 0, 0, 0}); - auto end = writesInFlight.upper_bound(TBlobSeqId{channel, Max<ui32>(), Max<ui32>(), TBlobSeqId::MaxIndex}); + auto begin = std::lower_bound(writesInFlight.begin(), writesInFlight.end(), TBlobSeqId{channel, 0, 0, 0}); auto makeWritesInFlight = [&] { TStringStream s; s << "["; - for (auto it = begin; it != end; ++it) { + for (auto it = begin; it != writesInFlight.end() && it->Channel == channel_; ++it) { s << (it != begin ? " " : "") << it->ToString(); } s << "]"; return s.Str(); }; - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT13, "Trim", (Id, Self->GetLogId()), (AgentId, agent.ConnectedNodeId), - (Id, ev->Cookie), (Channel, channel_), (InvalidatedStep, invalidatedStep_), - (GivenIdRanges, Self->Channels[channel_].GivenIdRanges), - (Agent.GivenIdRanges, agent.GivenIdRanges[channel_]), + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT13, "Trim", (Id, Self->GetLogId()), (AgentId, agent.Connection->NodeId), + (Id, ev->Cookie), (Channel, channel), (InvalidatedStep, invalidatedStep), + (GivenIdRanges, Self->Channels[channel].GivenIdRanges), + (Agent.GivenIdRanges, agent.GivenIdRanges[channel]), (WritesInFlight, makeWritesInFlight())); - for (auto it = begin; it != end; ++it) { + // sanity check -- ensure that current writes in flight would be conserved when processing garbage + for (auto it = begin; it != writesInFlight.end() && it->Channel == channel; ++it) { Y_VERIFY_S(agentGivenIdRanges.GetPoint(it->ToSequentialNumber()), "blobSeqId# " << it->ToString()); Y_VERIFY_S(givenIdRanges.GetPoint(it->ToSequentialNumber()), "blobSeqId# " << it->ToString()); } + const TBlobSeqId leastExpectedBlobIdBefore = Self->Channels[channel].GetLeastExpectedBlobId(generation); + const TBlobSeqId trimmedBlobSeqId{channel, generation, invalidatedStep, TBlobSeqId::MaxIndex}; const ui64 validSince = trimmedBlobSeqId.ToSequentialNumber() + 1; givenIdRanges.Subtract(agentGivenIdRanges.Trim(validSince)); - for (auto it = begin; it != end; ++it) { + for (auto it = begin; it != writesInFlight.end() && it->Channel == channel; ++it) { agentGivenIdRanges.AddPoint(it->ToSequentialNumber()); givenIdRanges.AddPoint(it->ToSequentialNumber()); } - OnLeastExpectedBlobIdChange(channel); + const TBlobSeqId leastExpectedBlobIdAfter = Self->Channels[channel].GetLeastExpectedBlobId(generation); + Y_VERIFY(leastExpectedBlobIdBefore <= leastExpectedBlobIdAfter); + + if (leastExpectedBlobIdBefore != leastExpectedBlobIdAfter) { + OnLeastExpectedBlobIdChange(channel); + } } } @@ -429,6 +426,14 @@ namespace NKikimr::NBlobDepot { return finished; } + void TData::AddFirstMentionedBlob(TLogoBlobID id) { + auto& record = GetRecordsPerChannelGroup(id); + const auto [_, inserted] = record.Used.insert(id); + Y_VERIFY(inserted); + AccountBlob(id, true); + TotalStoredDataSize += id.BlobSize(); + } + void TData::AccountBlob(TLogoBlobID id, bool add) { // account record const ui32 groupId = Self->Info()->GroupFor(id.Channel(), id.Generation()); @@ -446,21 +451,12 @@ namespace NKikimr::NBlobDepot { } void TData::OnLeastExpectedBlobIdChange(ui8 channel) { - auto& ch = Self->Channels[channel]; - const ui64 minSequenceNumber = ch.GivenIdRanges.IsEmpty() - ? ch.NextBlobSeqId - : ch.GivenIdRanges.GetMinimumValue(); - const TBlobSeqId leastExpectedBlobId = TBlobSeqId::FromSequentalNumber(channel, Self->Executor()->Generation(), - minSequenceNumber); - - const TTabletStorageInfo *info = Self->Info(); - const TTabletChannelInfo *storageChannel = info->ChannelInfo(leastExpectedBlobId.Channel); + const TTabletChannelInfo *storageChannel = Self->Info()->ChannelInfo(channel); Y_VERIFY(storageChannel); for (const auto& entry : storageChannel->History) { const auto& key = std::make_tuple(storageChannel->Channel, entry.GroupID); auto [it, _] = RecordsPerChannelGroup.emplace(std::piecewise_construct, key, key); - auto& record = it->second; - record.OnLeastExpectedBlobIdChange(this, leastExpectedBlobId); + it->second.OnLeastExpectedBlobIdChange(this); } } @@ -481,13 +477,8 @@ namespace NKikimr::NBlobDepot { LastConfirmedGenStep = IssuedGenStep; } - void TData::TRecordsPerChannelGroup::OnLeastExpectedBlobIdChange(TData *self, TBlobSeqId leastExpectedBlobId) { - Y_VERIFY_S(LeastExpectedBlobId <= leastExpectedBlobId, "Prev# " << LeastExpectedBlobId.ToString() - << " Next# " << leastExpectedBlobId.ToString()); - if (LeastExpectedBlobId < leastExpectedBlobId) { - LeastExpectedBlobId = leastExpectedBlobId; - CollectIfPossible(self); - } + void TData::TRecordsPerChannelGroup::OnLeastExpectedBlobIdChange(TData *self) { + CollectIfPossible(self); } void TData::TRecordsPerChannelGroup::ClearInFlight(TData *self) { diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index cdb380d7295..54103f8da68 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -329,7 +329,6 @@ namespace NKikimr::NBlobDepot { TGenStep IssuedGenStep; // currently in flight or already confirmed TGenStep LastConfirmedGenStep; bool CollectGarbageRequestInFlight = false; - TBlobSeqId LeastExpectedBlobId; TRecordsPerChannelGroup(ui8 channel, ui32 groupId) : Channel(channel) @@ -338,7 +337,7 @@ namespace NKikimr::NBlobDepot { void MoveToTrash(TLogoBlobID id); void OnSuccessfulCollect(TData *self); - void OnLeastExpectedBlobIdChange(TData *self, TBlobSeqId leastExpectedBlobId); + void OnLeastExpectedBlobIdChange(TData *self); void ClearInFlight(TData *self); void CollectIfPossible(TData *self); }; @@ -457,6 +456,7 @@ namespace NKikimr::NBlobDepot { bool OnBarrierShift(ui64 tabletId, ui8 channel, bool hard, TGenStep previous, TGenStep current, ui32& maxItems, NTabletFlatExecutor::TTransactionContext& txc, void *cookie); + void AddFirstMentionedBlob(TLogoBlobID id); void AccountBlob(TLogoBlobID id, bool add); bool CanBeCollected(ui32 groupId, TBlobSeqId id) const; diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp index 4957a88cc7e..bb46c517136 100644 --- a/ydb/core/blob_depot/data_trash.cpp +++ b/ydb/core/blob_depot/data_trash.cpp @@ -33,9 +33,10 @@ namespace NKikimr::NBlobDepot { std::set<TLogoBlobID>::iterator trashEndIter = record.Trash.end(); // step we are going to invalidate (including blobs with this one) - if (TGenStep(record.LeastExpectedBlobId) <= nextGenStep) { - const ui32 invalidatedStep = nextGenStep.Step(); // the step we want to invalidate and garbage collect + TBlobSeqId leastExpectedBlobId = channel.GetLeastExpectedBlobId(generation); + const ui32 invalidatedStep = nextGenStep.Step(); // the step we want to invalidate and garbage collect + if (TGenStep(leastExpectedBlobId) <= nextGenStep) { // remove invalidated step from allocations auto blobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, channel.NextBlobSeqId); Y_VERIFY(record.LastConfirmedGenStep < TGenStep(blobSeqId)); @@ -45,29 +46,33 @@ namespace NKikimr::NBlobDepot { channel.NextBlobSeqId = blobSeqId.ToSequentialNumber(); } - // issue notifications to agents + // recalculate least expected blob id -- it may change if the given id set was empty + leastExpectedBlobId = channel.GetLeastExpectedBlobId(generation); + } + + if (TGenStep(leastExpectedBlobId) <= nextGenStep) { + // issue notifications to agents -- we want to trim their ids for (auto& [agentId, agent] : Self->Agents) { - if (!agent.AgentId) { - continue; - } const auto [it, inserted] = agent.InvalidatedStepInFlight.emplace(record.Channel, invalidatedStep); if (inserted || it->second < invalidatedStep) { it->second = invalidatedStep; - auto& ev = outbox[agentId]; - if (!ev) { - ev.reset(new TEvBlobDepot::TEvPushNotify); + if (agent.Connection) { + auto& ev = outbox[agentId]; + if (!ev) { + ev.reset(new TEvBlobDepot::TEvPushNotify); + } + auto *item = ev->Record.AddInvalidatedSteps(); + item->SetChannel(record.Channel); + item->SetGeneration(generation); + item->SetInvalidatedStep(invalidatedStep); } - auto *item = ev->Record.AddInvalidatedSteps(); - item->SetChannel(record.Channel); - item->SetGeneration(generation); - item->SetInvalidatedStep(invalidatedStep); } } - // adjust the barrier to keep it safe now - const TLogoBlobID maxId(Self->TabletID(), record.LeastExpectedBlobId.Generation, - record.LeastExpectedBlobId.Step, record.Channel, 0, 0); + // adjust the barrier to keep it safe now (till we trim ids) + const TLogoBlobID maxId(Self->TabletID(), leastExpectedBlobId.Generation, + leastExpectedBlobId.Step, record.Channel, 0, 0); trashEndIter = record.Trash.lower_bound(maxId); if (trashEndIter != record.Trash.begin()) { nextGenStep = TGenStep(*std::prev(trashEndIter)); @@ -120,7 +125,7 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT11, "issuing TEvCollectGarbage", (Id, Self->GetLogId()), (Channel, int(record.Channel)), (GroupId, record.GroupId), (Msg, ev->ToString()), (LastConfirmedGenStep, record.LastConfirmedGenStep), (IssuedGenStep, record.IssuedGenStep), - (LeastExpectedBlobId, record.LeastExpectedBlobId), (TrashInFlight.size, record.TrashInFlight.size())); + (LeastExpectedBlobId, leastExpectedBlobId), (TrashInFlight.size, record.TrashInFlight.size())); const ui64 id = ++LastCollectCmdId; CollectCmdToGroup.emplace(id, record.GroupId); @@ -140,9 +145,9 @@ namespace NKikimr::NBlobDepot { request[item.GetChannel()] = item.GetInvalidatedStep(); } - Y_VERIFY(agent.AgentId); + Y_VERIFY(agent.Connection); agent.PushCallbacks.emplace(id, std::bind(&TData::OnPushNotifyResult, this, std::placeholders::_1)); - TActivationContext::Send(new IEventHandle(*agent.AgentId, Self->SelfId(), ev.release(), 0, id)); + TActivationContext::Send(new IEventHandle(agent.Connection->AgentId, agent.Connection->PipeServerId, ev.release(), 0, id)); } } diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index ac7196b2043..60195b15090 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -19,6 +19,11 @@ namespace NKikimr::NBlobDepot { {} bool Execute(TTransactionContext& txc, const TActorContext&) override { + TAgent& agent = Self->GetAgent(NodeId); + if (!agent.Connection) { // agent disconnected while transaction was in queue -- drop this request + return true; + } + if (!LoadMissingKeys(txc)) { return false; } @@ -28,7 +33,6 @@ namespace NKikimr::NBlobDepot { NKikimrBlobDepot::TEvCommitBlobSeqResult *responseRecord; std::tie(Response, responseRecord) = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId()); - TAgent& agent = Self->GetAgent(NodeId); const ui32 generation = Self->Executor()->Generation(); for (const auto& item : Request->Get()->Record.GetItems()) { @@ -120,7 +124,7 @@ namespace NKikimr::NBlobDepot { const ui64 value = blobSeqId.ToSequentialNumber(); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT18, "MarkGivenIdCommitted", (Id, Self->GetLogId()), - (AgentId, agent.ConnectedNodeId), (BlobSeqId, blobSeqId), (Value, value), + (AgentId, agent.Connection->NodeId), (BlobSeqId, blobSeqId), (Value, value), (GivenIdRanges, Self->Channels[blobSeqId.Channel].GivenIdRanges), (Agent.GivenIdRanges, agent.GivenIdRanges[blobSeqId.Channel])); @@ -161,7 +165,7 @@ namespace NKikimr::NBlobDepot { }; TAgent& agent = GetAgent(ev->Recipient); - Execute(std::make_unique<TTxCommitBlobSeq>(this, agent.ConnectedNodeId, + Execute(std::make_unique<TTxCommitBlobSeq>(this, agent.Connection->NodeId, std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle>(ev.Release()))); } @@ -169,7 +173,7 @@ namespace NKikimr::NBlobDepot { TAgent& agent = GetAgent(ev->Recipient); const ui32 generation = Executor()->Generation(); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT57, "TEvDiscardSpoiledBlobSeq", (Id, GetLogId()), (AgentId, agent.ConnectedNodeId), + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT57, "TEvDiscardSpoiledBlobSeq", (Id, GetLogId()), (AgentId, agent.Connection->NodeId), (Msg, ev->Get()->Record)); // FIXME(alexvru): delete uncertain keys containing this BlobSeqId as they were never written diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index ade8c571245..47ddf8ae62a 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -115,6 +115,7 @@ message TEvPushNotify { message TBlockedTablet { optional fixed64 TabletId = 1; optional uint32 BlockedGeneration = 2; + optional uint64 IssuerGuid = 3; } message TInvalidatedStep { optional uint32 Channel = 1; |