aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-11-25 13:23:28 +0300
committeralexvru <alexvru@ydb.tech>2022-11-25 13:23:28 +0300
commitaf0121b6a767c5b91416baff9cce349cc7ada49b (patch)
tree31d9996cd1f92ecbd4f94bf136860559c6a66a6d
parentac492518e2dc1c6e55af56bc7bf6af60c202dc63 (diff)
downloadydb-af0121b6a767c5b91416baff9cce349cc7ada49b.tar.gz
Enable sequential blob write mode
-rw-r--r--ydb/core/blob_depot/agent.cpp103
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h1
-rw-r--r--ydb/core/blob_depot/agent/blocks.cpp1
-rw-r--r--ydb/core/blob_depot/agent/comm.cpp21
-rw-r--r--ydb/core/blob_depot/agent/storage_put.cpp4
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h19
-rw-r--r--ydb/core/blob_depot/blocks.cpp78
-rw-r--r--ydb/core/blob_depot/data.cpp87
-rw-r--r--ydb/core/blob_depot/data.h4
-rw-r--r--ydb/core/blob_depot/data_trash.cpp43
-rw-r--r--ydb/core/blob_depot/op_commit_blob_seq.cpp12
-rw-r--r--ydb/core/protos/blob_depot.proto1
12 files changed, 237 insertions, 137 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp
index a4c849aa9be..6de33e7ea2a 100644
--- a/ydb/core/blob_depot/agent.cpp
+++ b/ydb/core/blob_depot/agent.cpp
@@ -5,7 +5,8 @@
namespace NKikimr::NBlobDepot {
void TBlobDepot::Handle(TEvTabletPipe::TEvServerConnected::TPtr ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvServerConnected", (Id, GetLogId()), (PipeServerId, ev->Get()->ServerId));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvServerConnected", (Id, GetLogId()), (ClientId, ev->Get()->ClientId),
+ (ServerId, ev->Get()->ServerId));
const auto [it, inserted] = PipeServerToNode.emplace(ev->Get()->ServerId, std::nullopt);
Y_VERIFY(inserted);
}
@@ -16,14 +17,11 @@ namespace NKikimr::NBlobDepot {
const auto it = PipeServerToNode.find(ev->Get()->ServerId);
Y_VERIFY(it != PipeServerToNode.end());
if (const auto& nodeId = it->second) {
- if (const auto agentIt = Agents.find(*nodeId); agentIt != Agents.end()) {
- if (TAgent& agent = agentIt->second; agent.PipeServerId == it->first) {
- OnAgentDisconnect(agent);
- agent.PipeServerId.reset();
- agent.AgentId.reset();
- agent.ConnectedNodeId = 0;
- agent.ExpirationTimestamp = TActivationContext::Now() + ExpirationTimeout;
- }
+ if (const auto agentIt = Agents.find(*nodeId); agentIt != Agents.end() && agentIt->second.Connection &&
+ agentIt->second.Connection->PipeServerId == it->first) {
+ OnAgentDisconnect(agentIt->second);
+ agentIt->second.Connection.reset();
+ agentIt->second.ExpirationTimestamp = TActivationContext::Now() + ExpirationTimeout;
}
}
PipeServerToNode.erase(it);
@@ -56,9 +54,11 @@ namespace NKikimr::NBlobDepot {
Y_VERIFY(!it->second || *it->second == nodeId);
it->second = nodeId;
auto& agent = Agents[nodeId];
- agent.PipeServerId = pipeServerId;
- agent.AgentId = ev->Sender;
- agent.ConnectedNodeId = nodeId;
+ agent.Connection = {
+ .PipeServerId = pipeServerId,
+ .AgentId = ev->Sender,
+ .NodeId = nodeId,
+ };
agent.ExpirationTimestamp = TInstant::Max();
agent.LastPushedSpaceColor = SpaceMonitor->GetSpaceColor();
agent.LastPushedApproximateFreeSpaceShare = SpaceMonitor->GetApproximateFreeSpaceShare();
@@ -89,6 +89,41 @@ namespace NKikimr::NBlobDepot {
}
TActivationContext::Send(response.release());
+
+ if (!agent.InvalidatedStepInFlight.empty()) {
+ const ui32 generation = Executor()->Generation();
+ const ui64 id = ++agent.LastRequestId;
+
+ auto reply = std::make_unique<TEvBlobDepot::TEvPushNotify>();
+ auto& request = agent.InvalidateStepRequests[id];
+ for (const auto& [channel, invalidatedStep] : agent.InvalidatedStepInFlight) {
+ auto *item = reply->Record.AddInvalidatedSteps();
+ item->SetChannel(channel);
+ item->SetGeneration(generation);
+ item->SetInvalidatedStep(invalidatedStep);
+ request[channel] = invalidatedStep;
+ }
+
+ std::vector<TActorId> blockActorsPending;
+
+ for (const auto& [tabletId, data] : agent.BlockToDeliver) {
+ auto *item = reply->Record.AddBlockedTablets();
+ item->SetTabletId(tabletId);
+ const auto& [blockedGeneration, issuerGuid, actorId] = data;
+ item->SetBlockedGeneration(blockedGeneration);
+ item->SetIssuerGuid(issuerGuid);
+ blockActorsPending.push_back(actorId);
+ }
+
+ agent.PushCallbacks.emplace(id, [this, id, sender = ev->Sender, m = std::move(blockActorsPending)](
+ TEvBlobDepot::TEvPushNotifyResult::TPtr ev) {
+ Data->OnPushNotifyResult(ev);
+ for (const TActorId& actorId : m) {
+ TActivationContext::Send(new IEventHandle(actorId, sender, new TEvBlobDepot::TEvPushNotifyResult, 0, id));
+ }
+ });
+ TActivationContext::Send(new IEventHandle(ev->Sender, ev->Recipient, reply.release(), 0, id));
+ }
}
void TBlobDepot::OnAgentConnect(TAgent& /*agent*/) {
@@ -155,16 +190,10 @@ namespace NKikimr::NBlobDepot {
TAgent& agent = GetAgent(ev->Recipient);
for (const auto& range : givenIdRange->GetChannelRanges()) {
agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd());
-
- auto& givenIdRanges = Channels[range.GetChannel()].GivenIdRanges;
- const bool wasEmpty = givenIdRanges.IsEmpty();
- givenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd());
- if (wasEmpty) {
- Data->OnLeastExpectedBlobIdChange(range.GetChannel());
- }
+ Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd());
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT05, "IssueNewRange", (Id, GetLogId()),
- (AgentId, agent.ConnectedNodeId), (Channel, range.GetChannel()),
+ (AgentId, agent.Connection->NodeId), (Channel, range.GetChannel()),
(Begin, range.GetBegin()), (End, range.GetEnd()));
}
}
@@ -177,7 +206,7 @@ namespace NKikimr::NBlobDepot {
Y_VERIFY(it != PipeServerToNode.end());
Y_VERIFY(it->second);
TAgent& agent = GetAgent(*it->second);
- Y_VERIFY(agent.PipeServerId == pipeServerId);
+ Y_VERIFY(agent.Connection && agent.Connection->PipeServerId == pipeServerId);
return agent;
}
@@ -190,15 +219,22 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::ResetAgent(TAgent& agent) {
for (auto& [channel, agentGivenIdRange] : agent.GivenIdRanges) {
- Channels[channel].GivenIdRanges.Subtract(agentGivenIdRange);
- const ui32 channel_ = channel;
- const auto& agentGivenIdRange_ = agentGivenIdRange;
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT06, "ResetAgent", (Id, GetLogId()), (AgentId, agent.ConnectedNodeId),
- (Channel, channel_), (GivenIdRanges, Channels[channel_].GivenIdRanges),
- (Agent.GivenIdRanges, agentGivenIdRange_));
- agentGivenIdRange = {};
- Data->OnLeastExpectedBlobIdChange(channel);
+ if (agentGivenIdRange.IsEmpty()) {
+ continue;
+ }
+
+ // calculate if this agent can be blocking garbage collection by holding least conserved blob sequence id
+ const bool unblock = Channels[channel].GivenIdRanges.GetMinimumValue() == agentGivenIdRange.GetMinimumValue();
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT06, "ResetAgent", (Id, GetLogId()), (AgentId, agent.Connection->NodeId),
+ (Channel, channel), (GivenIdRanges, Channels[channel].GivenIdRanges),
+ (Agent.GivenIdRanges, agentGivenIdRange), (Unblock, unblock));
+
+ if (unblock) {
+ Data->OnLeastExpectedBlobIdChange(channel);
+ }
}
+ agent.InvalidatedStepInFlight.clear();
}
void TBlobDepot::InitChannelKinds() {
@@ -263,7 +299,7 @@ namespace NKikimr::NBlobDepot {
};
TAgent& agent = GetAgent(ev->Recipient);
- Execute(std::make_unique<TTxInvokeCallback>(this, agent.ConnectedNodeId, ev));
+ Execute(std::make_unique<TTxInvokeCallback>(this, agent.Connection->NodeId, ev));
}
void TBlobDepot::ProcessRegisterAgentQ() {
@@ -281,15 +317,14 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::OnSpaceColorChange(NKikimrBlobStorage::TPDiskSpaceColor::E spaceColor, float approximateFreeSpaceShare) {
for (auto& [nodeId, agent] : Agents) {
- if (agent.AgentId && (agent.LastPushedSpaceColor != spaceColor ||
- agent.LastPushedApproximateFreeSpaceShare != approximateFreeSpaceShare)) {
- Y_VERIFY(agent.ConnectedNodeId == nodeId);
+ if (agent.Connection && (agent.LastPushedSpaceColor != spaceColor || agent.LastPushedApproximateFreeSpaceShare != approximateFreeSpaceShare)) {
+ Y_VERIFY(agent.Connection->NodeId == nodeId);
const ui64 id = ++agent.LastRequestId;
agent.PushCallbacks.emplace(id, [](TEvBlobDepot::TEvPushNotifyResult::TPtr) {});
auto ev = std::make_unique<TEvBlobDepot::TEvPushNotify>();
ev->Record.SetSpaceColor(spaceColor);
ev->Record.SetApproximateFreeSpaceShare(approximateFreeSpaceShare);
- Send(*agent.AgentId, ev.release(), 0, id);
+ Send(agent.Connection->AgentId, ev.release(), 0, id);
agent.LastPushedSpaceColor = spaceColor;
agent.LastPushedApproximateFreeSpaceShare = approximateFreeSpaceShare;
}
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index 70e33e18c8b..fbebb7a2d2a 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -85,6 +85,7 @@ namespace NKikimr::NBlobDepot {
const ui64 AgentInstanceId;
ui64 TabletId = Max<ui64>();
TActorId PipeId;
+ TActorId PipeServerId;
bool IsConnected = false;
private:
diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp
index 61a5cea25d7..16ec06e9dbb 100644
--- a/ydb/core/blob_depot/agent/blocks.cpp
+++ b/ydb/core/blob_depot/agent/blocks.cpp
@@ -89,6 +89,7 @@ namespace NKikimr::NBlobDepot {
if (const auto it = Blocks.find(tablet.GetTabletId()); it != Blocks.end()) {
auto& block = it->second;
block.BlockedGeneration = tablet.GetBlockedGeneration();
+ block.IssuerGuid = tablet.GetIssuerGuid();
block.ExpirationTimestamp = TMonotonic::Zero();
IssueOnUpdateBlock(block, true);
}
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp
index 63ce937f32d..c34ba23af67 100644
--- a/ydb/core/blob_depot/agent/comm.cpp
+++ b/ydb/core/blob_depot/agent/comm.cpp
@@ -4,14 +4,21 @@
namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) {
+ auto& msg = *ev->Get();
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA03, "TEvClientConnected", (VirtualGroupId, VirtualGroupId),
- (Msg, ev->Get()->ToString()));
+ (TabletId, msg.TabletId), (Status, msg.Status), (ClientId, msg.ClientId), (ServerId, msg.ServerId));
+ Y_VERIFY_DEBUG_S(msg.Status == NKikimrProto::OK, "Status# " << NKikimrProto::EReplyStatus_Name(msg.Status));
+ if (msg.Status != NKikimrProto::OK) {
+ ConnectToBlobDepot();
+ } else {
+ PipeServerId = msg.ServerId;
+ }
}
void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) {
STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA04, "TEvClientDestroyed", (VirtualGroupId, VirtualGroupId),
(Msg, ev->Get()->ToString()));
- PipeId = {};
+ PipeId = PipeServerId = {};
OnDisconnect();
ConnectToBlobDepot();
}
@@ -169,12 +176,14 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepotAgent::Handle(TEvBlobDepot::TEvPushNotify::TPtr ev) {
- auto response = std::make_unique<TEvBlobDepot::TEvPushNotifyResult>();
-
auto& msg = ev->Get()->Record;
-
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA11, "TEvPushNotify", (VirtualGroupId, VirtualGroupId), (Msg, msg),
- (Id, ev->Cookie));
+ (Id, ev->Cookie), (Sender, ev->Sender), (PipeServerId, PipeServerId));
+ if (ev->Sender != PipeServerId) {
+ return; // race with previous connection
+ }
+
+ auto response = std::make_unique<TEvBlobDepot::TEvPushNotifyResult>();
BlocksManager.OnBlockedTablets(msg.GetBlockedTablets());
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
index 226ab8f241c..ad964fc7832 100644
--- a/ydb/core/blob_depot/agent/storage_put.cpp
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -7,7 +7,7 @@ namespace NKikimr::NBlobDepot {
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvPut>(std::unique_ptr<IEventHandle> ev) {
class TPutQuery : public TBlobStorageQuery<TEvBlobStorage::TEvPut> {
const bool SuppressFooter = true;
- const bool IssueUncertainWrites = true;
+ const bool IssueUncertainWrites = false;
std::vector<ui32> BlockChecksRemain;
ui32 PutsInFlight = 0;
@@ -178,7 +178,7 @@ namespace NKikimr::NBlobDepot {
}
auto& kind = it->second;
const size_t numErased = kind.WritesInFlight.erase(BlobSeqId);
- Y_VERIFY(numErased);
+ Y_VERIFY(numErased || BlobSeqId.Generation < Agent.BlobDepotGeneration);
}
void OnUpdateBlock(bool success) override {
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index 55e6a11b054..5b2014e8ebb 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -53,9 +53,13 @@ namespace NKikimr::NBlobDepot {
THashMap<TActorId, std::deque<std::unique_ptr<IEventHandle>>> RegisterAgentQ;
struct TAgent {
- std::optional<TActorId> PipeServerId;
- std::optional<TActorId> AgentId;
- ui32 ConnectedNodeId;
+ struct TConnection {
+ TActorId PipeServerId;
+ TActorId AgentId;
+ ui32 NodeId;
+ };
+
+ std::optional<TConnection> Connection;
TInstant ExpirationTimestamp;
std::optional<ui64> AgentInstanceId;
@@ -63,6 +67,9 @@ namespace NKikimr::NBlobDepot {
THashMap<ui8, ui32> InvalidatedStepInFlight;
THashMap<ui64, THashMap<ui8, ui32>> InvalidateStepRequests;
+
+ THashMap<ui64, std::tuple<ui32, ui64, TActorId>> BlockToDeliver; // TabletId -> (BlockedGeneration, IssuerGuid, ActorId)
+
THashMap<ui64, std::function<void(TEvBlobDepot::TEvPushNotifyResult::TPtr)>> PushCallbacks;
ui64 LastRequestId = 0;
@@ -81,6 +88,12 @@ namespace NKikimr::NBlobDepot {
TChannelKind *KindPtr;
TGivenIdRange GivenIdRanges; // accumulated through all agents
ui64 NextBlobSeqId = 0;
+
+ // Obtain the least BlobSeqId that is not yet confirmed, but may be written by any agent
+ TBlobSeqId GetLeastExpectedBlobId(ui32 generation) const {
+ return TBlobSeqId::FromSequentalNumber(Index, generation, GivenIdRanges.IsEmpty() ? NextBlobSeqId :
+ GivenIdRanges.GetMinimumValue());
+ }
};
std::vector<TChannelInfo> Channels;
diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp
index e09310651b9..5beab7bef6a 100644
--- a/ydb/core/blob_depot/blocks.cpp
+++ b/ydb/core/blob_depot/blocks.cpp
@@ -55,6 +55,12 @@ namespace NKikimr::NBlobDepot {
};
class TBlobDepot::TBlocksManager::TBlockProcessorActor : public TActorBootstrapped<TBlockProcessorActor> {
+ struct TEvPrivate {
+ enum {
+ EvCheckWaitingNode = EventSpaceBegin(TEvents::ES_PRIVATE),
+ };
+ };
+
TBlobDepot* const Self;
const ui64 TabletId;
const ui32 BlockedGeneration;
@@ -113,27 +119,56 @@ namespace NKikimr::NBlobDepot {
// skip the origin agent
continue;
}
- if (info.ExpirationTimestamp <= now) {
- SendPushToAgent(agentId);
+ if (info.ExpirationTimestamp <= now) { // includes case when agent is connected right now
+ TAgent& agent = Self->GetAgent(agentId);
+
+ // enqueue push notification
+ const auto [it, inserted] = agent.BlockToDeliver.try_emplace(TabletId, BlockedGeneration, IssuerGuid, SelfId());
+ if (!inserted) {
+ const auto& [currentBlockedGeneration, _1, _2] = it->second;
+ Y_VERIFY(currentBlockedGeneration < BlockedGeneration);
+ it->second = {BlockedGeneration, IssuerGuid, SelfId()};
+ }
+
+ // add node to wait list; also start timer to remove this node from the wait queue
+ NodesWaitingForPushResult.insert(agentId);
+ Y_VERIFY(info.ExpirationTimestamp <= now);
+ TActivationContext::Schedule(info.ExpirationTimestamp, new IEventHandle(TEvPrivate::EvCheckWaitingNode,
+ 0, SelfId(), {}, nullptr, agentId));
+
+ // issue message to the connected node, if it is
+ if (const auto& connection = agent.Connection) {
+ auto ev = std::make_unique<TEvBlobDepot::TEvPushNotify>();
+ auto *item = ev->Record.AddBlockedTablets();
+ item->SetTabletId(TabletId);
+ item->SetBlockedGeneration(BlockedGeneration);
+ item->SetIssuerGuid(IssuerGuid);
+
+ const ui64 id = ++agent.LastRequestId;
+ agent.PushCallbacks.emplace(id, [selfId = SelfId()](TEvBlobDepot::TEvPushNotifyResult::TPtr ev) {
+ TActivationContext::Send(ev->Forward(selfId));
+ });
+ TActivationContext::Send(new IEventHandle(connection->AgentId, connection->PipeServerId, ev.release(), 0, id));
+ }
}
}
}
- void SendPushToAgent(ui32 agentId) {
- auto ev = std::make_unique<TEvBlobDepot::TEvPushNotify>();
- auto *item = ev->Record.AddBlockedTablets();
- item->SetTabletId(TabletId);
- item->SetBlockedGeneration(BlockedGeneration);
-
- TAgent& agent = Self->GetAgent(agentId);
- if (const auto& actorId = agent.AgentId) {
- const ui64 id = ++agent.LastRequestId;
- agent.PushCallbacks.emplace(id, [selfId = SelfId()](TEvBlobDepot::TEvPushNotifyResult::TPtr ev) {
- TActivationContext::Send(ev->Forward(selfId));
- });
- Send(*actorId, ev.release(), 0, id);
+ void HandleCheckWaitingNode(TAutoPtr<IEventHandle> ev) {
+ const ui32 agentId = ev->Cookie;
+ if (NodesWaitingForPushResult.contains(agentId)) {
+ const TMonotonic now = TActivationContext::Monotonic();
+ const auto& info = Self->BlocksManager->Blocks[TabletId].PerAgentInfo[agentId];
+ if (info.ExpirationTimestamp <= now) { // node still can write data for this tablet, reschedule timer
+ TActivationContext::Schedule(info.ExpirationTimestamp, new IEventHandle(TEvPrivate::EvCheckWaitingNode,
+ 0, SelfId(), {}, nullptr, agentId));
+ } else {
+ NodesWaitingForPushResult.erase(agentId);
+ if (NodesWaitingForPushResult.empty()) {
+ Finish();
+ }
+ }
}
- NodesWaitingForPushResult.insert(agentId);
}
void Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) {
@@ -145,6 +180,11 @@ namespace NKikimr::NBlobDepot {
auto& block = Self->BlocksManager->Blocks[TabletId];
block.PerAgentInfo.erase(agentId);
+ TAgent& agent = Self->GetAgent(agentId);
+ const auto it = agent.BlockToDeliver.find(TabletId);
+ Y_VERIFY(it != agent.BlockToDeliver.end() && it->second == std::make_tuple(BlockedGeneration, IssuerGuid, SelfId()));
+ agent.BlockToDeliver.erase(it);
+
if (NodesWaitingForPushResult.empty()) {
Finish();
}
@@ -215,6 +255,7 @@ namespace NKikimr::NBlobDepot {
switch (ev->GetTypeRewrite()) {
hFunc(TEvBlobStorage::TEvBlockResult, Handle);
hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle);
+ fFunc(TEvPrivate::EvCheckWaitingNode, HandleCheckWaitingNode);
cFunc(TEvents::TSystem::Wakeup, IssueBlocksToStorage);
cFunc(TEvents::TSystem::Poison, PassAway);
}
@@ -263,7 +304,7 @@ namespace NKikimr::NBlobDepot {
if (it == Blocks.end() || it->second.CanSetNewBlock(blockedGeneration, issuerGuid)) {
TAgent& agent = Self->GetAgent(ev->Recipient);
Self->Execute(std::make_unique<TTxUpdateBlock>(Self, tabletId, blockedGeneration,
- agent.ConnectedNodeId, record.GetIssuerGuid(), TActivationContext::Now(), std::move(response)));
+ agent.Connection->NodeId, record.GetIssuerGuid(), TActivationContext::Now(), std::move(response)));
} else {
responseRecord->SetStatus(NKikimrProto::ALREADY);
}
@@ -274,8 +315,7 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) {
TAgent& agent = Self->GetAgent(ev->Recipient);
- const ui32 agentId = agent.ConnectedNodeId;
- Y_VERIFY(agentId);
+ const ui32 agentId = agent.Connection->NodeId;
const TMonotonic now = TActivationContext::Monotonic();
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index e129cade51c..762f7928a37 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -120,19 +120,13 @@ namespace NKikimr::NBlobDepot {
EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) {
const auto [it, inserted] = RefCount.try_emplace(id);
if (inserted) {
- // first mention of this id
- auto& record = GetRecordsPerChannelGroup(id);
- const auto [_, inserted] = record.Used.insert(id);
- Y_VERIFY(inserted);
- AccountBlob(id, 1);
- TotalStoredDataSize += id.BlobSize();
-
- // blob is first mentioned and deleted as well
- if (outcome == EUpdateOutcome::DROP) {
+ AddFirstMentionedBlob(id);
+ }
+ if (outcome == EUpdateOutcome::DROP) {
+ if (inserted) {
deleteQ.push_back(id);
}
- }
- if (outcome != EUpdateOutcome::DROP) {
+ } else {
++it->second;
}
});
@@ -277,12 +271,8 @@ namespace NKikimr::NBlobDepot {
const auto [it, inserted] = Data.try_emplace(std::move(key), std::move(proto), uncertainWrite);
if (inserted) {
EnumerateBlobsForValueChain(it->second.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32, ui32) {
- if (!RefCount[id]++) { // first mention of this id
- auto& record = GetRecordsPerChannelGroup(id);
- const auto [_, inserted] = record.Used.insert(id);
- Y_VERIFY(inserted);
- AccountBlob(id, 1);
- TotalStoredDataSize += id.BlobSize();
+ if (!RefCount[id]++) {
+ AddFirstMentionedBlob(id);
}
});
}
@@ -358,51 +348,58 @@ namespace NKikimr::NBlobDepot {
const ui32 generation = Self->Executor()->Generation();
- std::set<TBlobSeqId> writesInFlight;
+ std::vector<TBlobSeqId> writesInFlight;
for (const auto& item : ev->Get()->Record.GetWritesInFlight()) {
- writesInFlight.insert(TBlobSeqId::FromProto(item));
+ writesInFlight.push_back(TBlobSeqId::FromProto(item));
}
+ std::sort(writesInFlight.begin(), writesInFlight.end());
for (const auto& [channel, invalidatedStep] : items) {
const ui32 channel_ = channel;
- const ui32 invalidatedStep_ = invalidatedStep;
auto& agentGivenIdRanges = agent.GivenIdRanges[channel];
auto& givenIdRanges = Self->Channels[channel].GivenIdRanges;
- auto begin = writesInFlight.lower_bound(TBlobSeqId{channel, 0, 0, 0});
- auto end = writesInFlight.upper_bound(TBlobSeqId{channel, Max<ui32>(), Max<ui32>(), TBlobSeqId::MaxIndex});
+ auto begin = std::lower_bound(writesInFlight.begin(), writesInFlight.end(), TBlobSeqId{channel, 0, 0, 0});
auto makeWritesInFlight = [&] {
TStringStream s;
s << "[";
- for (auto it = begin; it != end; ++it) {
+ for (auto it = begin; it != writesInFlight.end() && it->Channel == channel_; ++it) {
s << (it != begin ? " " : "") << it->ToString();
}
s << "]";
return s.Str();
};
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT13, "Trim", (Id, Self->GetLogId()), (AgentId, agent.ConnectedNodeId),
- (Id, ev->Cookie), (Channel, channel_), (InvalidatedStep, invalidatedStep_),
- (GivenIdRanges, Self->Channels[channel_].GivenIdRanges),
- (Agent.GivenIdRanges, agent.GivenIdRanges[channel_]),
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT13, "Trim", (Id, Self->GetLogId()), (AgentId, agent.Connection->NodeId),
+ (Id, ev->Cookie), (Channel, channel), (InvalidatedStep, invalidatedStep),
+ (GivenIdRanges, Self->Channels[channel].GivenIdRanges),
+ (Agent.GivenIdRanges, agent.GivenIdRanges[channel]),
(WritesInFlight, makeWritesInFlight()));
- for (auto it = begin; it != end; ++it) {
+ // sanity check -- ensure that current writes in flight would be conserved when processing garbage
+ for (auto it = begin; it != writesInFlight.end() && it->Channel == channel; ++it) {
Y_VERIFY_S(agentGivenIdRanges.GetPoint(it->ToSequentialNumber()), "blobSeqId# " << it->ToString());
Y_VERIFY_S(givenIdRanges.GetPoint(it->ToSequentialNumber()), "blobSeqId# " << it->ToString());
}
+ const TBlobSeqId leastExpectedBlobIdBefore = Self->Channels[channel].GetLeastExpectedBlobId(generation);
+
const TBlobSeqId trimmedBlobSeqId{channel, generation, invalidatedStep, TBlobSeqId::MaxIndex};
const ui64 validSince = trimmedBlobSeqId.ToSequentialNumber() + 1;
givenIdRanges.Subtract(agentGivenIdRanges.Trim(validSince));
- for (auto it = begin; it != end; ++it) {
+ for (auto it = begin; it != writesInFlight.end() && it->Channel == channel; ++it) {
agentGivenIdRanges.AddPoint(it->ToSequentialNumber());
givenIdRanges.AddPoint(it->ToSequentialNumber());
}
- OnLeastExpectedBlobIdChange(channel);
+ const TBlobSeqId leastExpectedBlobIdAfter = Self->Channels[channel].GetLeastExpectedBlobId(generation);
+ Y_VERIFY(leastExpectedBlobIdBefore <= leastExpectedBlobIdAfter);
+
+ if (leastExpectedBlobIdBefore != leastExpectedBlobIdAfter) {
+ OnLeastExpectedBlobIdChange(channel);
+ }
}
}
@@ -429,6 +426,14 @@ namespace NKikimr::NBlobDepot {
return finished;
}
+ void TData::AddFirstMentionedBlob(TLogoBlobID id) {
+ auto& record = GetRecordsPerChannelGroup(id);
+ const auto [_, inserted] = record.Used.insert(id);
+ Y_VERIFY(inserted);
+ AccountBlob(id, true);
+ TotalStoredDataSize += id.BlobSize();
+ }
+
void TData::AccountBlob(TLogoBlobID id, bool add) {
// account record
const ui32 groupId = Self->Info()->GroupFor(id.Channel(), id.Generation());
@@ -446,21 +451,12 @@ namespace NKikimr::NBlobDepot {
}
void TData::OnLeastExpectedBlobIdChange(ui8 channel) {
- auto& ch = Self->Channels[channel];
- const ui64 minSequenceNumber = ch.GivenIdRanges.IsEmpty()
- ? ch.NextBlobSeqId
- : ch.GivenIdRanges.GetMinimumValue();
- const TBlobSeqId leastExpectedBlobId = TBlobSeqId::FromSequentalNumber(channel, Self->Executor()->Generation(),
- minSequenceNumber);
-
- const TTabletStorageInfo *info = Self->Info();
- const TTabletChannelInfo *storageChannel = info->ChannelInfo(leastExpectedBlobId.Channel);
+ const TTabletChannelInfo *storageChannel = Self->Info()->ChannelInfo(channel);
Y_VERIFY(storageChannel);
for (const auto& entry : storageChannel->History) {
const auto& key = std::make_tuple(storageChannel->Channel, entry.GroupID);
auto [it, _] = RecordsPerChannelGroup.emplace(std::piecewise_construct, key, key);
- auto& record = it->second;
- record.OnLeastExpectedBlobIdChange(this, leastExpectedBlobId);
+ it->second.OnLeastExpectedBlobIdChange(this);
}
}
@@ -481,13 +477,8 @@ namespace NKikimr::NBlobDepot {
LastConfirmedGenStep = IssuedGenStep;
}
- void TData::TRecordsPerChannelGroup::OnLeastExpectedBlobIdChange(TData *self, TBlobSeqId leastExpectedBlobId) {
- Y_VERIFY_S(LeastExpectedBlobId <= leastExpectedBlobId, "Prev# " << LeastExpectedBlobId.ToString()
- << " Next# " << leastExpectedBlobId.ToString());
- if (LeastExpectedBlobId < leastExpectedBlobId) {
- LeastExpectedBlobId = leastExpectedBlobId;
- CollectIfPossible(self);
- }
+ void TData::TRecordsPerChannelGroup::OnLeastExpectedBlobIdChange(TData *self) {
+ CollectIfPossible(self);
}
void TData::TRecordsPerChannelGroup::ClearInFlight(TData *self) {
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index cdb380d7295..54103f8da68 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -329,7 +329,6 @@ namespace NKikimr::NBlobDepot {
TGenStep IssuedGenStep; // currently in flight or already confirmed
TGenStep LastConfirmedGenStep;
bool CollectGarbageRequestInFlight = false;
- TBlobSeqId LeastExpectedBlobId;
TRecordsPerChannelGroup(ui8 channel, ui32 groupId)
: Channel(channel)
@@ -338,7 +337,7 @@ namespace NKikimr::NBlobDepot {
void MoveToTrash(TLogoBlobID id);
void OnSuccessfulCollect(TData *self);
- void OnLeastExpectedBlobIdChange(TData *self, TBlobSeqId leastExpectedBlobId);
+ void OnLeastExpectedBlobIdChange(TData *self);
void ClearInFlight(TData *self);
void CollectIfPossible(TData *self);
};
@@ -457,6 +456,7 @@ namespace NKikimr::NBlobDepot {
bool OnBarrierShift(ui64 tabletId, ui8 channel, bool hard, TGenStep previous, TGenStep current, ui32& maxItems,
NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
+ void AddFirstMentionedBlob(TLogoBlobID id);
void AccountBlob(TLogoBlobID id, bool add);
bool CanBeCollected(ui32 groupId, TBlobSeqId id) const;
diff --git a/ydb/core/blob_depot/data_trash.cpp b/ydb/core/blob_depot/data_trash.cpp
index 4957a88cc7e..bb46c517136 100644
--- a/ydb/core/blob_depot/data_trash.cpp
+++ b/ydb/core/blob_depot/data_trash.cpp
@@ -33,9 +33,10 @@ namespace NKikimr::NBlobDepot {
std::set<TLogoBlobID>::iterator trashEndIter = record.Trash.end();
// step we are going to invalidate (including blobs with this one)
- if (TGenStep(record.LeastExpectedBlobId) <= nextGenStep) {
- const ui32 invalidatedStep = nextGenStep.Step(); // the step we want to invalidate and garbage collect
+ TBlobSeqId leastExpectedBlobId = channel.GetLeastExpectedBlobId(generation);
+ const ui32 invalidatedStep = nextGenStep.Step(); // the step we want to invalidate and garbage collect
+ if (TGenStep(leastExpectedBlobId) <= nextGenStep) {
// remove invalidated step from allocations
auto blobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, channel.NextBlobSeqId);
Y_VERIFY(record.LastConfirmedGenStep < TGenStep(blobSeqId));
@@ -45,29 +46,33 @@ namespace NKikimr::NBlobDepot {
channel.NextBlobSeqId = blobSeqId.ToSequentialNumber();
}
- // issue notifications to agents
+ // recalculate least expected blob id -- it may change if the given id set was empty
+ leastExpectedBlobId = channel.GetLeastExpectedBlobId(generation);
+ }
+
+ if (TGenStep(leastExpectedBlobId) <= nextGenStep) {
+ // issue notifications to agents -- we want to trim their ids
for (auto& [agentId, agent] : Self->Agents) {
- if (!agent.AgentId) {
- continue;
- }
const auto [it, inserted] = agent.InvalidatedStepInFlight.emplace(record.Channel, invalidatedStep);
if (inserted || it->second < invalidatedStep) {
it->second = invalidatedStep;
- auto& ev = outbox[agentId];
- if (!ev) {
- ev.reset(new TEvBlobDepot::TEvPushNotify);
+ if (agent.Connection) {
+ auto& ev = outbox[agentId];
+ if (!ev) {
+ ev.reset(new TEvBlobDepot::TEvPushNotify);
+ }
+ auto *item = ev->Record.AddInvalidatedSteps();
+ item->SetChannel(record.Channel);
+ item->SetGeneration(generation);
+ item->SetInvalidatedStep(invalidatedStep);
}
- auto *item = ev->Record.AddInvalidatedSteps();
- item->SetChannel(record.Channel);
- item->SetGeneration(generation);
- item->SetInvalidatedStep(invalidatedStep);
}
}
- // adjust the barrier to keep it safe now
- const TLogoBlobID maxId(Self->TabletID(), record.LeastExpectedBlobId.Generation,
- record.LeastExpectedBlobId.Step, record.Channel, 0, 0);
+ // adjust the barrier to keep it safe now (till we trim ids)
+ const TLogoBlobID maxId(Self->TabletID(), leastExpectedBlobId.Generation,
+ leastExpectedBlobId.Step, record.Channel, 0, 0);
trashEndIter = record.Trash.lower_bound(maxId);
if (trashEndIter != record.Trash.begin()) {
nextGenStep = TGenStep(*std::prev(trashEndIter));
@@ -120,7 +125,7 @@ namespace NKikimr::NBlobDepot {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT11, "issuing TEvCollectGarbage", (Id, Self->GetLogId()),
(Channel, int(record.Channel)), (GroupId, record.GroupId), (Msg, ev->ToString()),
(LastConfirmedGenStep, record.LastConfirmedGenStep), (IssuedGenStep, record.IssuedGenStep),
- (LeastExpectedBlobId, record.LeastExpectedBlobId), (TrashInFlight.size, record.TrashInFlight.size()));
+ (LeastExpectedBlobId, leastExpectedBlobId), (TrashInFlight.size, record.TrashInFlight.size()));
const ui64 id = ++LastCollectCmdId;
CollectCmdToGroup.emplace(id, record.GroupId);
@@ -140,9 +145,9 @@ namespace NKikimr::NBlobDepot {
request[item.GetChannel()] = item.GetInvalidatedStep();
}
- Y_VERIFY(agent.AgentId);
+ Y_VERIFY(agent.Connection);
agent.PushCallbacks.emplace(id, std::bind(&TData::OnPushNotifyResult, this, std::placeholders::_1));
- TActivationContext::Send(new IEventHandle(*agent.AgentId, Self->SelfId(), ev.release(), 0, id));
+ TActivationContext::Send(new IEventHandle(agent.Connection->AgentId, agent.Connection->PipeServerId, ev.release(), 0, id));
}
}
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index ac7196b2043..60195b15090 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -19,6 +19,11 @@ namespace NKikimr::NBlobDepot {
{}
bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ TAgent& agent = Self->GetAgent(NodeId);
+ if (!agent.Connection) { // agent disconnected while transaction was in queue -- drop this request
+ return true;
+ }
+
if (!LoadMissingKeys(txc)) {
return false;
}
@@ -28,7 +33,6 @@ namespace NKikimr::NBlobDepot {
NKikimrBlobDepot::TEvCommitBlobSeqResult *responseRecord;
std::tie(Response, responseRecord) = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId());
- TAgent& agent = Self->GetAgent(NodeId);
const ui32 generation = Self->Executor()->Generation();
for (const auto& item : Request->Get()->Record.GetItems()) {
@@ -120,7 +124,7 @@ namespace NKikimr::NBlobDepot {
const ui64 value = blobSeqId.ToSequentialNumber();
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT18, "MarkGivenIdCommitted", (Id, Self->GetLogId()),
- (AgentId, agent.ConnectedNodeId), (BlobSeqId, blobSeqId), (Value, value),
+ (AgentId, agent.Connection->NodeId), (BlobSeqId, blobSeqId), (Value, value),
(GivenIdRanges, Self->Channels[blobSeqId.Channel].GivenIdRanges),
(Agent.GivenIdRanges, agent.GivenIdRanges[blobSeqId.Channel]));
@@ -161,7 +165,7 @@ namespace NKikimr::NBlobDepot {
};
TAgent& agent = GetAgent(ev->Recipient);
- Execute(std::make_unique<TTxCommitBlobSeq>(this, agent.ConnectedNodeId,
+ Execute(std::make_unique<TTxCommitBlobSeq>(this, agent.Connection->NodeId,
std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle>(ev.Release())));
}
@@ -169,7 +173,7 @@ namespace NKikimr::NBlobDepot {
TAgent& agent = GetAgent(ev->Recipient);
const ui32 generation = Executor()->Generation();
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT57, "TEvDiscardSpoiledBlobSeq", (Id, GetLogId()), (AgentId, agent.ConnectedNodeId),
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT57, "TEvDiscardSpoiledBlobSeq", (Id, GetLogId()), (AgentId, agent.Connection->NodeId),
(Msg, ev->Get()->Record));
// FIXME(alexvru): delete uncertain keys containing this BlobSeqId as they were never written
diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto
index ade8c571245..47ddf8ae62a 100644
--- a/ydb/core/protos/blob_depot.proto
+++ b/ydb/core/protos/blob_depot.proto
@@ -115,6 +115,7 @@ message TEvPushNotify {
message TBlockedTablet {
optional fixed64 TabletId = 1;
optional uint32 BlockedGeneration = 2;
+ optional uint64 IssuerGuid = 3;
}
message TInvalidatedStep {
optional uint32 Channel = 1;