aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-07-19 12:04:34 +0300
committeralexvru <alexvru@ydb.tech>2022-07-19 12:04:34 +0300
commit62d316c142834b7d53315c1680b6f0ce9a56d185 (patch)
tree5b283975c95fae28d53b09482fecd7c4524fed78
parent188a2f4a6b4b415f13eaf72fa82fae030b81b8aa (diff)
downloadydb-62d316c142834b7d53315c1680b6f0ce9a56d185.tar.gz
BlobDepot work in progress
-rw-r--r--ydb/core/blob_depot/agent.cpp28
-rw-r--r--ydb/core/blob_depot/agent/blocks.cpp4
-rw-r--r--ydb/core/blob_depot/agent/channel_kind.cpp22
-rw-r--r--ydb/core/blob_depot/agent/comm.cpp31
-rw-r--r--ydb/core/blob_depot/agent/query.cpp2
-rw-r--r--ydb/core/blob_depot/agent/request.cpp11
-rw-r--r--ydb/core/blob_depot/agent/storage_collect_garbage.cpp7
-rw-r--r--ydb/core/blob_depot/agent/storage_get.cpp18
-rw-r--r--ydb/core/blob_depot/agent/storage_put.cpp20
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h13
-rw-r--r--ydb/core/blob_depot/blocks.cpp8
-rw-r--r--ydb/core/blob_depot/data.cpp287
-rw-r--r--ydb/core/blob_depot/data.h49
-rw-r--r--ydb/core/blob_depot/defs.h1
-rw-r--r--ydb/core/blob_depot/events.h18
-rw-r--r--ydb/core/blob_depot/given_id_range.cpp218
-rw-r--r--ydb/core/blob_depot/mon_main.cpp50
-rw-r--r--ydb/core/blob_depot/op_commit_blob_seq.cpp12
-rw-r--r--ydb/core/blob_depot/types.h23
-rw-r--r--ydb/core/mind/hive/monitoring.cpp2
20 files changed, 593 insertions, 231 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp
index 68de7977909..eeb06f7e4df 100644
--- a/ydb/core/blob_depot/agent.cpp
+++ b/ydb/core/blob_depot/agent.cpp
@@ -17,9 +17,10 @@ namespace NKikimr::NBlobDepot {
Y_VERIFY(it != PipeServerToNode.end());
if (const auto& nodeId = it->second) {
if (const auto agentIt = Agents.find(*nodeId); agentIt != Agents.end()) {
- if (TAgent& agent = agentIt->second; agent.ConnectedAgent == it->first) {
+ if (TAgent& agent = agentIt->second; agent.PipeServerId == it->first) {
OnAgentDisconnect(agent);
- agent.ConnectedAgent.reset();
+ agent.PipeServerId.reset();
+ agent.AgentId.reset();
agent.ConnectedNodeId = 0;
agent.ExpirationTimestamp = TActivationContext::Now() + ExpirationTimeout;
}
@@ -37,14 +38,15 @@ namespace NKikimr::NBlobDepot {
const TActorId& pipeServerId = ev->Recipient;
const auto& req = ev->Get()->Record;
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, req), (NodeId, nodeId),
- (PipeServerId, pipeServerId));
+ (PipeServerId, pipeServerId), (Id, ev->Cookie));
const auto it = PipeServerToNode.find(pipeServerId);
Y_VERIFY(it != PipeServerToNode.end());
Y_VERIFY(!it->second || *it->second == nodeId);
it->second = nodeId;
auto& agent = Agents[nodeId];
- agent.ConnectedAgent = pipeServerId;
+ agent.PipeServerId = pipeServerId;
+ agent.AgentId = ev->Sender;
agent.ConnectedNodeId = nodeId;
agent.ExpirationTimestamp = TInstant::Max();
@@ -55,9 +57,8 @@ namespace NKikimr::NBlobDepot {
OnAgentConnect(agent);
- auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId());
+ auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId(), Executor()->Generation());
- record->SetGeneration(Executor()->Generation());
for (const auto& [k, v] : ChannelKinds) {
auto *proto = record->AddChannelKinds();
proto->SetChannelKind(k);
@@ -127,7 +128,14 @@ namespace NKikimr::NBlobDepot {
TAgent& agent = GetAgent(ev->Recipient);
for (const auto& range : givenIdRange->GetChannelRanges()) {
agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd());
- Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd());
+
+ auto& givenIdRanges = Channels[range.GetChannel()].GivenIdRanges;
+ const bool wasEmpty = givenIdRanges.IsEmpty();
+ givenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd());
+ if (wasEmpty) {
+ Data->OnLeastExpectedBlobIdChange(range.GetChannel());
+ }
+
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "IssueNewRange", (TabletId, TabletID()),
(AgentId, agent.ConnectedNodeId), (Channel, range.GetChannel()),
(Begin, range.GetBegin()), (End, range.GetEnd()));
@@ -142,14 +150,16 @@ namespace NKikimr::NBlobDepot {
Y_VERIFY(it != PipeServerToNode.end());
Y_VERIFY(it->second);
TAgent& agent = GetAgent(*it->second);
- Y_VERIFY(agent.ConnectedAgent == pipeServerId);
+ Y_VERIFY(agent.PipeServerId == pipeServerId);
return agent;
}
TBlobDepot::TAgent& TBlobDepot::GetAgent(ui32 nodeId) {
const auto agentIt = Agents.find(nodeId);
Y_VERIFY(agentIt != Agents.end());
- return agentIt->second;
+ TAgent& agent = agentIt->second;
+ Y_VERIFY(agent.ConnectedNodeId == nodeId);
+ return agent;
}
void TBlobDepot::ResetAgent(TAgent& agent) {
diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp
index 852c6bc24c2..06efcfbff0a 100644
--- a/ydb/core/blob_depot/agent/blocks.cpp
+++ b/ydb/core/blob_depot/agent/blocks.cpp
@@ -21,6 +21,8 @@ namespace NKikimr::NBlobDepot {
Agent.Issue(std::move(queryBlocks), this, std::make_shared<TQueryBlockContext>(
TActivationContext::Monotonic(), tabletId));
block.RefreshInFlight = true;
+ }
+ if (status == NKikimrProto::UNKNOWN) {
block.PendingBlockChecks.PushBack(query);
}
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "CheckBlockForTablet", (QueryId, query->GetQueryId()),
@@ -44,7 +46,7 @@ namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::TBlocksManager::Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvQueryBlocksResult& msg) {
auto& queryBlockContext = context->Obtain<TQueryBlockContext>();
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "TEvQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId),
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA02, "TEvQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId),
(Msg, msg), (TabletId, queryBlockContext.TabletId));
auto& block = Blocks[queryBlockContext.TabletId];
Y_VERIFY(msg.BlockedGenerationsSize() == 1);
diff --git a/ydb/core/blob_depot/agent/channel_kind.cpp b/ydb/core/blob_depot/agent/channel_kind.cpp
index 6cbae9e0173..e9e30896474 100644
--- a/ydb/core/blob_depot/agent/channel_kind.cpp
+++ b/ydb/core/blob_depot/agent/channel_kind.cpp
@@ -23,6 +23,13 @@ namespace NKikimr::NBlobDepot {
}
ui32 TBlobDepotAgent::TChannelKind::GetNumAvailableItems() const {
+#ifndef NDEBUG
+ ui32 count = 0;
+ for (const auto& [_, givenIdRanges] : GivenIdRangePerChannel) {
+ count += givenIdRanges.GetNumAvailableItems();
+ }
+ Y_VERIFY(count == NumAvailableItems);
+#endif
return NumAvailableItems;
}
@@ -55,14 +62,21 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepotAgent::TChannelKind::Trim(ui8 channel, ui32 generation, ui32 invalidatedStep) {
- GivenIdRangePerChannel[channel].Trim(channel, generation, invalidatedStep);
+ const TBlobSeqId trimmedBlobSeqId{channel, generation, invalidatedStep, TBlobSeqId::MaxIndex};
+ const ui64 validSince = trimmedBlobSeqId.ToSequentialNumber() + 1;
+ auto& givenIdRanges = GivenIdRangePerChannel[channel];
+ NumAvailableItems -= givenIdRanges.GetNumAvailableItems();
+ givenIdRanges.Trim(validSince);
+ NumAvailableItems += givenIdRanges.GetNumAvailableItems();
RebuildHeap();
}
void TBlobDepotAgent::TChannelKind::RebuildHeap() {
GivenIdRangeHeap.clear();
for (auto& kv : GivenIdRangePerChannel) {
- GivenIdRangeHeap.push_back(&kv);
+ if (!kv.second.IsEmpty()) {
+ GivenIdRangeHeap.push_back(&kv);
+ }
}
std::make_heap(GivenIdRangeHeap.begin(), GivenIdRangeHeap.end(), TGivenIdRangeHeapComp());
}
@@ -74,9 +88,7 @@ namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::TChannelKind::ProcessQueriesWaitingForId() {
TIntrusiveList<TQuery, TPendingId> temp;
temp.Swap(QueriesWaitingForId);
- for (TQuery& query : temp) {
- query.OnIdAllocated();
- }
+ temp.ForEach([&](TQuery *query) { query->OnIdAllocated(); });
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp
index bd09eddd8a2..8db0a12479a 100644
--- a/ydb/core/blob_depot/agent/comm.cpp
+++ b/ydb/core/blob_depot/agent/comm.cpp
@@ -26,7 +26,7 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepotAgent::Handle(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvRegisterAgentResult& msg) {
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId),
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId),
(Msg, msg));
Registered = true;
BlobDepotGeneration = msg.GetGeneration();
@@ -63,12 +63,21 @@ namespace NKikimr::NBlobDepot {
STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA07, "kind vanished", (VirtualGroupId, VirtualGroupId), (Kind, kind));
ChannelKinds.erase(kind);
}
+
+ for (const auto& [channel, kind] : ChannelToKind) {
+ kind->Trim(channel, BlobDepotGeneration - 1, Max<ui32>());
+
+ auto& wif = kind->WritesInFlight;
+ const TBlobSeqId min{channel, 0, 0, 0};
+ const TBlobSeqId max{channel, BlobDepotGeneration - 1, Max<ui32>(), TBlobSeqId::MaxIndex};
+ wif.erase(wif.lower_bound(min), wif.upper_bound(max));
+ }
}
void TBlobDepotAgent::IssueAllocateIdsIfNeeded(TChannelKind& kind) {
if (!kind.IdAllocInFlight && kind.GetNumAvailableItems() < 100 && PipeId) {
const ui64 id = NextRequestId++;
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA08, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId),
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA08, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId),
(ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(kind.Kind)),
(IdAllocInFlight, kind.IdAllocInFlight), (NumAvailableItems, kind.GetNumAvailableItems()),
(RequestId, id));
@@ -79,9 +88,6 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepotAgent::Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvAllocateIdsResult& msg) {
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA09, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId),
- (Msg, msg));
-
auto& allocateIdsContext = context->Obtain<TAllocateIdsContext>();
const auto it = ChannelKinds.find(allocateIdsContext.ChannelKind);
Y_VERIFY_S(it != ChannelKinds.end(), "Kind# " << NKikimrBlobDepot::TChannelKind::E_Name(allocateIdsContext.ChannelKind)
@@ -97,11 +103,12 @@ namespace NKikimr::NBlobDepot {
if (msg.HasGivenIdRange()) {
kind.IssueGivenIdRange(msg.GetGivenIdRange());
}
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA09, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId),
+ (Msg, msg), (NumAvailableItems, kind.GetNumAvailableItems()));
}
void TBlobDepotAgent::OnDisconnect() {
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA10, "OnDisconnect", (VirtualGroupId, VirtualGroupId));
-
for (auto& [id, request] : std::exchange(TabletRequestInFlight, {})) {
request.Sender->OnRequestComplete(id, TTabletDisconnected{});
}
@@ -154,6 +161,9 @@ namespace NKikimr::NBlobDepot {
auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId());
auto& msg = ev->Get()->Record;
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA11, "TEvPushNotify", (VirtualGroupId, VirtualGroupId), (Msg, msg));
+
BlocksManager.OnBlockedTablets(msg.GetBlockedTablets());
for (const auto& item : msg.GetInvalidatedSteps()) {
@@ -162,6 +172,7 @@ namespace NKikimr::NBlobDepot {
const auto it = ChannelToKind.find(channel);
Y_VERIFY(it != ChannelToKind.end());
TChannelKind& kind = *it->second;
+ const ui32 numAvailableItemsBefore = kind.GetNumAvailableItems();
kind.Trim(channel, item.GetGeneration(), item.GetInvalidatedStep());
// report writes in flight that are trimmed
@@ -170,6 +181,12 @@ namespace NKikimr::NBlobDepot {
for (auto it = kind.WritesInFlight.lower_bound(first); it != kind.WritesInFlight.end() && *it <= last; ++it) {
it->ToProto(record->AddWritesInFlight());
}
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA11, "TrimChannel", (VirtualGroupId, VirtualGroupId),
+ (Channel, int(channel)), (NumAvailableItemsBefore, numAvailableItemsBefore),
+ (NumAvailableItemsAfter, kind.GetNumAvailableItems()));
+
+ IssueAllocateIdsIfNeeded(kind);
}
TActivationContext::Send(response.release());
diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp
index ab4cd8c7d3f..4132c86308d 100644
--- a/ydb/core/blob_depot/agent/query.cpp
+++ b/ydb/core/blob_depot/agent/query.cpp
@@ -49,7 +49,7 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepotAgent::TQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) {
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA14, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId),
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA14, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId),
(QueryId, QueryId), (Response, response->ToString()));
Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie);
delete this;
diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp
index 72adcaa0906..fbc486c984f 100644
--- a/ydb/core/blob_depot/agent/request.cpp
+++ b/ydb/core/blob_depot/agent/request.cpp
@@ -95,12 +95,11 @@ namespace NKikimr::NBlobDepot {
template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvPutResult::TPtr ev);
void TBlobDepotAgent::OnRequestComplete(ui64 id, TResponse response, TRequestsInFlight& map) {
- const auto it = map.find(id);
- Y_VERIFY_S(it != map.end(), "id# " << id << " response# " << TRequestSender::ToString(response));
- TRequestInFlight request = std::move(it->second);
- map.erase(it);
-
- request.Sender->OnRequestComplete(id, std::move(response));
+ if (const auto it = map.find(id); it != map.end()) {
+ TRequestInFlight request = std::move(it->second);
+ map.erase(it);
+ request.Sender->OnRequestComplete(id, std::move(response));
+ }
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp
index 0b0b6f27ee0..94115450075 100644
--- a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp
+++ b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp
@@ -13,6 +13,7 @@ namespace NKikimr::NBlobDepot {
ui32 NumDoNotKeep;
ui32 CounterShift = 0;
bool IsLast;
+ bool QueryInFlight = false;
public:
using TQuery::TQuery;
@@ -63,6 +64,9 @@ namespace NKikimr::NBlobDepot {
Agent.Issue(std::move(record), this, nullptr);
+ Y_VERIFY(!QueryInFlight);
+ QueryInFlight = true;
+
++CounterShift;
}
@@ -85,6 +89,9 @@ namespace NKikimr::NBlobDepot {
}
void HandleCollectGarbageResult(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvCollectGarbageResult& msg) {
+ Y_VERIFY(QueryInFlight);
+ QueryInFlight = false;
+
if (!msg.HasStatus()) {
EndWithError(NKikimrProto::ERROR, "incorrect TEvCollectGarbageResult protobuf");
} else if (const auto status = msg.GetStatus(); status != NKikimrProto::OK) {
diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp
index 65882d78809..13bc7ef8e94 100644
--- a/ydb/core/blob_depot/agent/storage_get.cpp
+++ b/ydb/core/blob_depot/agent/storage_get.cpp
@@ -29,20 +29,23 @@ namespace NKikimr::NBlobDepot {
for (ui32 i = 0; i < msg.QuerySize; ++i) {
auto& query = msg.Queries[i];
- TString blobId(reinterpret_cast<const char*>(query.Id.GetRaw()), 3 * sizeof(ui64));
- if (const TValueChain *value = Agent.BlobMappingCache.ResolveKey(blobId, this,
- std::make_shared<TResolveKeyContext>(i))) {
- ProcessSingleResult(i, value);
- }
auto& response = Response->Responses[i];
response.Id = query.Id;
response.Shift = query.Shift;
response.RequestedSize = query.Size;
+
+ TString blobId(reinterpret_cast<const char*>(query.Id.GetRaw()), 3 * sizeof(ui64));
+ if (const TValueChain *value = Agent.BlobMappingCache.ResolveKey(blobId, this,
+ std::make_shared<TResolveKeyContext>(i))) {
+ if (!ProcessSingleResult(i, value)) {
+ return;
+ }
+ }
}
}
- void ProcessSingleResult(ui32 queryIdx, const TValueChain *value) {
+ bool ProcessSingleResult(ui32 queryIdx, const TValueChain *value) {
auto& msg = GetQuery();
if (!value) {
@@ -57,11 +60,14 @@ namespace NKikimr::NBlobDepot {
msg.GetHandleClass, msg.MustRestoreFirst, this, queryIdx, true, &error);
if (!success) {
EndWithError(NKikimrProto::ERROR, std::move(error));
+ return false;
}
}
if (!AnswersRemain) {
EndWithSuccess(std::move(Response));
+ return false;
}
+ return true;
}
void OnRead(ui64 tag, NKikimrProto::EReplyStatus status, TString buffer) override {
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
index 8a66913fbb0..01ab46aecd7 100644
--- a/ydb/core/blob_depot/agent/storage_put.cpp
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -117,7 +117,25 @@ namespace NKikimr::NBlobDepot {
--PutsInFlight;
if (msg.Status != NKikimrProto::OK) {
EndWithError(msg.Status, std::move(msg.ErrorReason));
- } else if (!PutsInFlight) {
+ } else if (PutsInFlight) {
+ // wait for all puts to complete
+ } else if (BlobSeqId.Generation != Agent.BlobDepotGeneration) {
+ // FIXME: although this is error now, we can handle this in the future, when BlobDepot picks records
+ // on restarts; it may have scanned written record and already updated it in its local database;
+ // however, if it did not, we can't try to commit this records as it may be already scheduled for
+ // garbage collection by the tablet
+ EndWithError(NKikimrProto::ERROR, "BlobDepot tablet was restarting during write");
+ } else {
+ // find and remove the write in flight record to ensure it won't be reported upon TEvPushNotify
+ // reception AND to check that it wasn't already trimmed by answering TEvPushNotifyResult
+ const auto it = Agent.ChannelKinds.find(NKikimrBlobDepot::TChannelKind::Data);
+ if (it == Agent.ChannelKinds.end()) {
+ return EndWithError(NKikimrProto::ERROR, "no Data channels");
+ }
+ auto& kind = it->second;
+ const size_t numErased = kind.WritesInFlight.erase(BlobSeqId);
+ Y_VERIFY(numErased);
+
NKikimrBlobDepot::TEvCommitBlobSeq request;
auto& msg = *Event->Get<TEvBlobStorage::TEvPut>();
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index 0fa8e971de2..902588da70c 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -33,7 +33,8 @@ namespace NKikimr::NBlobDepot {
static constexpr TDuration ExpirationTimeout = TDuration::Minutes(1);
struct TAgent {
- std::optional<TActorId> ConnectedAgent;
+ std::optional<TActorId> PipeServerId;
+ std::optional<TActorId> AgentId;
ui32 ConnectedNodeId;
TInstant ExpirationTimestamp;
std::optional<ui64> AgentInstanceId;
@@ -70,12 +71,12 @@ namespace NKikimr::NBlobDepot {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- std::deque<std::unique_ptr<IEventHandle>> InitialEventsQ;
-
void Enqueue(TAutoPtr<IEventHandle>& ev, const TActorContext&) override {
- InitialEventsQ.emplace_back(ev.Release());
+ Y_FAIL("unexpected event Type# %08" PRIx32, ev->GetTypeRewrite());
}
+ void DefaultSignalTabletActive(const TActorContext&) override {} // signalled explicitly after load is complete
+
void OnActivateExecutor(const TActorContext&) override {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT20, "OnActivateExecutor", (TabletId, TabletID()));
ExecuteTxInitSchema();
@@ -84,9 +85,7 @@ namespace NKikimr::NBlobDepot {
void OnLoadFinished() {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT20, "OnLoadFinished", (TabletId, TabletID()));
Become(&TThis::StateWork);
- for (auto&& ev : std::exchange(InitialEventsQ, {})) {
- TActivationContext::Send(ev.release());
- }
+ SignalTabletActive(TActivationContext::AsActorContext());
}
void OnDetach(const TActorContext&) override {
diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp
index 0c7107ee1be..d26ac376abc 100644
--- a/ydb/core/blob_depot/blocks.cpp
+++ b/ydb/core/blob_depot/blocks.cpp
@@ -102,7 +102,7 @@ namespace NKikimr::NBlobDepot {
item->SetBlockedGeneration(BlockedGeneration);
TAgent& agent = Self->GetAgent(agentId);
- if (const auto& actorId = agent.ConnectedAgent) {
+ if (const auto& actorId = agent.AgentId) {
Send(*actorId, ev.release(), 0, IssuerGuid);
}
NodesWaitingForPushResult.insert(agentId);
@@ -124,7 +124,7 @@ namespace NKikimr::NBlobDepot {
void IssueBlocksToStorage() {
THashSet<ui32> processedGroups;
- for (const auto& [_, kind] : Self->ChannelKinds) {
+ for (const auto& [_, kind] : Self->ChannelKinds) { // FIXME: SIGSEGV here?
for (const auto& [channel, groupId] : kind.ChannelGroups) {
// FIXME: consider previous group generations (because agent can write in obsolete tablet generation)
// !!!!!!!!!!!
@@ -138,14 +138,14 @@ namespace NKikimr::NBlobDepot {
}
void SendBlock(ui32 groupId) {
- STLOG(PRI_INFO, BLOB_DEPOT, BDT06, "issing TEvBlock", (TabletId, Self->TabletID()), (BlockedTabletId,
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT06, "issing TEvBlock", (TabletId, Self->TabletID()), (BlockedTabletId,
TabletId), (BlockedGeneration, BlockedGeneration), (GroupId, groupId), (IssuerGuid, IssuerGuid));
SendToBSProxy(SelfId(), groupId, new TEvBlobStorage::TEvBlock(TabletId, BlockedGeneration, TInstant::Max(),
IssuerGuid), groupId);
}
void Handle(TEvBlobStorage::TEvBlockResult::TPtr ev) {
- STLOG(PRI_INFO, BLOB_DEPOT, BDT07, "TEvBlockResult", (TabletId, Self->TabletID()), (Msg, ev->Get()->ToString()),
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT07, "TEvBlockResult", (TabletId, Self->TabletID()), (Msg, ev->Get()->ToString()),
(BlockedTabletId, TabletId), (BlockedGeneration, BlockedGeneration), (GroupId, ev->Cookie));
switch (ev->Get()->Status) {
case NKikimrProto::OK:
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index b0d58eddae0..c5292a7ed54 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -3,7 +3,9 @@
namespace NKikimr::NBlobDepot {
- class TBlobDepot::TData::TTxIssueGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ using TData = TBlobDepot::TData;
+
+ class TData::TTxIssueGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
const ui8 Channel;
const ui32 GroupId;
const TGenStep IssuedGenStep;
@@ -30,7 +32,7 @@ namespace NKikimr::NBlobDepot {
}
};
- class TBlobDepot::TData::TTxConfirmGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ class TData::TTxConfirmGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
const ui8 Channel;
const ui32 GroupId;
std::vector<TLogoBlobID> TrashDeleted;
@@ -74,21 +76,21 @@ namespace NKikimr::NBlobDepot {
}
};
- std::optional<TBlobDepot::TData::TValue> TBlobDepot::TData::FindKey(const TKey& key) {
+ std::optional<TData::TValue> TData::FindKey(const TKey& key) {
const auto it = Data.find(key);
return it != Data.end() ? std::make_optional(it->second) : std::nullopt;
}
- TBlobDepot::TData::TRecordsPerChannelGroup& TBlobDepot::TData::GetRecordsPerChannelGroup(TLogoBlobID id) {
+ TData::TRecordsPerChannelGroup& TData::GetRecordsPerChannelGroup(TLogoBlobID id) {
TTabletStorageInfo *info = Self->Info();
const ui32 groupId = info->GroupFor(id.Channel(), id.Generation());
Y_VERIFY(groupId != Max<ui32>());
const auto& key = std::make_tuple(id.TabletID(), id.Channel(), groupId);
- const auto [it, _] = RecordsPerChannelGroup.try_emplace(key, id.TabletID(), id.Channel(), groupId);
+ const auto [it, _] = RecordsPerChannelGroup.emplace(std::piecewise_construct, key, key);
return it->second;
}
- void TBlobDepot::TData::AddDataOnLoad(TKey key, TString value) {
+ void TData::AddDataOnLoad(TKey key, TString value) {
NKikimrBlobDepot::TValue proto;
const bool success = proto.ParseFromString(value);
Y_VERIFY(success);
@@ -100,21 +102,21 @@ namespace NKikimr::NBlobDepot {
});
}
- void TBlobDepot::TData::AddTrashOnLoad(TLogoBlobID id) {
+ void TData::AddTrashOnLoad(TLogoBlobID id) {
auto& record = GetRecordsPerChannelGroup(id);
record.Trash.insert(id);
- OnTrashInserted(record);
+ record.EnqueueForCollectionIfPossible(this);
}
- void TBlobDepot::TData::AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep) {
+ void TData::AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep) {
const auto& key = std::make_tuple(Self->TabletID(), channel, groupId);
- const auto [it, _] = RecordsPerChannelGroup.try_emplace(key, Self->TabletID(), channel, groupId);
+ const auto [it, _] = RecordsPerChannelGroup.emplace(std::piecewise_construct, key, key);
auto& record = it->second;
record.IssuedGenStep = issuedGenStep;
record.LastConfirmedGenStep = confirmedGenStep;
}
- void TBlobDepot::TData::PutKey(TKey key, TValue&& data) {
+ void TData::PutKey(TKey key, TValue&& data) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT08, "PutKey", (TabletId, Self->TabletID()), (Key, key.ToString(Self->Config)),
(KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState)));
@@ -130,13 +132,7 @@ namespace NKikimr::NBlobDepot {
Data[std::move(key)] = std::move(data);
}
- void TBlobDepot::TData::OnTrashInserted(TRecordsPerChannelGroup& record) {
- if (!record.CollectGarbageRequestInFlight && record.TabletId == Self->TabletID()) {
- RecordsWithTrash.PushBack(&record);
- }
- }
-
- std::optional<TString> TBlobDepot::TData::UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState) {
+ std::optional<TString> TData::UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState) {
const auto it = Data.find(key);
if (it != Data.end() && keepState <= it->second.KeepState) {
return std::nullopt;
@@ -146,7 +142,7 @@ namespace NKikimr::NBlobDepot {
return ToValueProto(value);
}
- void TBlobDepot::TData::DeleteKey(const TKey& key, const std::function<void(TLogoBlobID)>& updateTrash, void *cookie) {
+ void TData::DeleteKey(const TKey& key, const std::function<void(TLogoBlobID)>& updateTrash, void *cookie) {
const auto it = Data.find(key);
Y_VERIFY(it != Data.end());
TValue& value = it->second;
@@ -162,19 +158,16 @@ namespace NKikimr::NBlobDepot {
Data.erase(it);
}
- void TBlobDepot::TData::CommitTrash(void *cookie) {
+ void TData::CommitTrash(void *cookie) {
auto range = InFlightTrash.equal_range(cookie);
for (auto it = range.first; it != range.second; ++it) {
auto& record = GetRecordsPerChannelGroup(it->second);
- const auto usedIt = record.Used.find(it->second);
- Y_VERIFY(usedIt != record.Used.end());
- record.Trash.insert(record.Used.extract(usedIt));
- OnTrashInserted(record);
+ record.MoveToTrash(this, it->second);
}
InFlightTrash.erase(range.first, range.second);
}
- TString TBlobDepot::TData::ToValueProto(const TValue& value) {
+ TString TData::ToValueProto(const TValue& value) {
NKikimrBlobDepot::TValue proto;
if (value.Meta) {
proto.SetMeta(value.Meta);
@@ -193,68 +186,73 @@ namespace NKikimr::NBlobDepot {
return s;
}
- void TBlobDepot::TData::HandleTrash() {
+ void TData::HandleTrash() {
const ui32 generation = Self->Executor()->Generation();
THashMap<ui32, std::unique_ptr<TEvBlobDepot::TEvPushNotify>> outbox;
- for (TRecordsPerChannelGroup& record : RecordsWithTrash) {
+ while (RecordsWithTrash) {
+ TRecordsPerChannelGroup& record = *RecordsWithTrash.PopFront();
+
Y_VERIFY(!record.CollectGarbageRequestInFlight);
Y_VERIFY(record.TabletId == Self->TabletID());
Y_VERIFY(!record.Trash.empty());
- TGenStep nextGenStep(*--record.Trash.end());
-
Y_VERIFY(record.Channel < Self->Channels.size());
auto& channel = Self->Channels[record.Channel];
- if (!channel.GivenIdRanges.IsEmpty()) {
- // minimum blob seq id that can still be written by other party
- const auto minBlobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation,
- channel.GivenIdRanges.GetMinimumValue());
-
- // step we are going to invalidate (including blobs with this one)
- const ui32 invalidatedStep = nextGenStep.Step();
-
- if (minBlobSeqId.Step <= invalidatedStep) {
- const TLogoBlobID maxId(record.TabletId, generation, minBlobSeqId.Step, record.Channel, 0, 0);
- const auto it = record.Trash.lower_bound(maxId);
- if (it != record.Trash.begin()) {
- nextGenStep = TGenStep(*std::prev(it));
- } else {
- nextGenStep = {};
+ TGenStep nextGenStep(*--record.Trash.end());
+
+ // step we are going to invalidate (including blobs with this one)
+ if (TGenStep(record.LeastExpectedBlobId) <= nextGenStep) {
+ const ui32 invalidatedStep = nextGenStep.Step(); // the step we want to invalidate and garbage collect
+
+ // remove invalidated step from allocations
+ auto blobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, channel.NextBlobSeqId);
+ Y_VERIFY(record.LastConfirmedGenStep < TGenStep(blobSeqId));
+ if (blobSeqId.Step <= invalidatedStep) {
+ blobSeqId.Step = invalidatedStep + 1;
+ blobSeqId.Index = 0;
+ channel.NextBlobSeqId = blobSeqId.ToSequentialNumber();
+ }
+
+ // issue notifications to agents
+ for (auto& [agentId, agent] : Self->Agents) {
+ if (!agent.AgentId) {
+ continue;
}
+ const auto [it, inserted] = agent.InvalidatedStepInFlight.emplace(record.Channel, invalidatedStep);
+ if (inserted || it->second < invalidatedStep) {
+ it->second = invalidatedStep;
- // issue notifications to agents
- for (auto& [agentId, agent] : Self->Agents) {
- if (!agent.ConnectedAgent) {
- continue;
- }
- const auto [it, inserted] = agent.InvalidatedStepInFlight.emplace(record.Channel, invalidatedStep);
- if (inserted || it->second < invalidatedStep) {
- it->second = invalidatedStep;
-
- auto& ev = outbox[agentId];
- if (!ev) {
- ev.reset(new TEvBlobDepot::TEvPushNotify);
- }
- auto *item = ev->Record.AddInvalidatedSteps();
- item->SetChannel(record.Channel);
- item->SetGeneration(generation);
- item->SetInvalidatedStep(invalidatedStep);
+ auto& ev = outbox[agentId];
+ if (!ev) {
+ ev.reset(new TEvBlobDepot::TEvPushNotify);
}
+ auto *item = ev->Record.AddInvalidatedSteps();
+ item->SetChannel(record.Channel);
+ item->SetGeneration(generation);
+ item->SetInvalidatedStep(invalidatedStep);
}
}
+
+ // adjust the barrier to keep it safe now
+ const TLogoBlobID maxId(record.TabletId, record.LeastExpectedBlobId.Generation,
+ record.LeastExpectedBlobId.Step, record.Channel, 0, 0);
+ const auto it = record.Trash.lower_bound(maxId);
+ if (it != record.Trash.begin()) {
+ nextGenStep = TGenStep(*std::prev(it));
+ } else {
+ nextGenStep = {};
+ }
}
auto keep = std::make_unique<TVector<TLogoBlobID>>();
auto doNotKeep = std::make_unique<TVector<TLogoBlobID>>();
- // FIXME: check for blob leaks when LastConfirmedGenStep is not properly persisted
for (auto it = record.Trash.begin(); it != record.Trash.end() && TGenStep(*it) <= record.LastConfirmedGenStep; ++it) {
doNotKeep->push_back(*it);
}
- // FIXME: check for blob loss when LastConfirmedGenStep is not properly persisted
const TLogoBlobID keepFrom(record.TabletId, record.LastConfirmedGenStep.Generation(),
record.LastConfirmedGenStep.Step(), record.Channel, 0, 0);
for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && TGenStep(*it) <= nextGenStep; ++it) {
@@ -270,7 +268,7 @@ namespace NKikimr::NBlobDepot {
const bool collect = nextGenStep > record.LastConfirmedGenStep;
if (!keep && !doNotKeep && !collect) {
- continue; // skip this one
+ continue; // nothing to do here
}
auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(record.TabletId, generation,
@@ -284,16 +282,10 @@ namespace NKikimr::NBlobDepot {
record.TrashInFlight.insert(record.TrashInFlight.end(), record.Trash.begin(), record.Trash.end());
record.IssuedGenStep = Max(nextGenStep, record.LastConfirmedGenStep);
- auto blobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, channel.NextBlobSeqId);
- Y_VERIFY(record.LastConfirmedGenStep < TGenStep(blobSeqId));
- if (TGenStep(blobSeqId) <= nextGenStep) {
- blobSeqId.Step = nextGenStep.Step() + 1;
- blobSeqId.Index = 0;
- channel.NextBlobSeqId = blobSeqId.ToSequentialNumber();
- }
+ record.TIntrusiveListItem<TRecordsPerChannelGroup, TRecordWithTrash>::Unlink();
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT09, "issuing TEvCollectGarbage", (TabletId, Self->TabletID()),
- (Channel, record.Channel), (GroupId, record.GroupId), (Msg, ev->ToString()),
+ (Channel, int(record.Channel)), (GroupId, record.GroupId), (Msg, ev->ToString()),
(LastConfirmedGenStep, record.LastConfirmedGenStep), (IssuedGenStep, record.IssuedGenStep),
(TrashInFlight.size, record.TrashInFlight.size()));
@@ -305,8 +297,6 @@ namespace NKikimr::NBlobDepot {
}
}
- RecordsWithTrash.Clear();
-
for (auto& [agentId, ev] : outbox) {
TAgent& agent = Self->GetAgent(agentId);
const ui64 id = ++agent.LastRequestId;
@@ -314,79 +304,156 @@ namespace NKikimr::NBlobDepot {
for (const auto& item : ev->Record.GetInvalidatedSteps()) {
request[item.GetChannel()] = item.GetInvalidatedStep();
}
- if (const auto& actorId = agent.ConnectedAgent) {
- TActivationContext::Send(new IEventHandle(*actorId, Self->SelfId(), ev.release(), 0, id));
- }
+
+ Y_VERIFY(agent.AgentId);
+ TActivationContext::Send(new IEventHandle(*agent.AgentId, Self->SelfId(), ev.release(), 0, id));
}
}
- void TBlobDepot::TData::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) {
+ void TData::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "TEvCollectGarbageResult", (TabletId, ev->Get()->TabletId),
(Channel, ev->Get()->Channel), (GroupId, ev->Cookie), (Msg, ev->Get()->ToString()));
const auto& key = std::make_tuple(ev->Get()->TabletId, ev->Get()->Channel, ev->Cookie);
const auto it = RecordsPerChannelGroup.find(key);
Y_VERIFY(it != RecordsPerChannelGroup.end());
auto& record = it->second;
- Y_VERIFY(record.CollectGarbageRequestInFlight);
if (ev->Get()->Status == NKikimrProto::OK) {
- for (const TLogoBlobID& id : record.TrashInFlight) { // make it merge
- record.Trash.erase(id);
- }
- record.LastConfirmedGenStep = record.IssuedGenStep;
+ Y_VERIFY(record.CollectGarbageRequestInFlight);
+ record.OnSuccessfulCollect(this);
Self->Execute(std::make_unique<TTxConfirmGC>(Self, record.Channel, record.GroupId,
std::exchange(record.TrashInFlight, {}), record.LastConfirmedGenStep));
} else {
- record.CollectGarbageRequestInFlight = false;
- OnTrashInserted(record);
+ record.ClearInFlight(this);
HandleTrash();
}
}
- void TBlobDepot::TData::Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) {
- TAgent& agent = Self->GetAgent(ev->Recipient);
+ void TData::Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) {
+ TAgent& agent = Self->GetAgent(ev->Sender.NodeId());
const ui32 generation = Self->Executor()->Generation();
+
+ std::set<TBlobSeqId> writesInFlight;
+ for (const auto& item : ev->Get()->Record.GetWritesInFlight()) {
+ writesInFlight.insert(TBlobSeqId::FromProto(item));
+ }
+
if (const auto it = agent.InvalidateStepRequests.find(ev->Cookie); it != agent.InvalidateStepRequests.end()) {
for (const auto& [channel, invalidatedStep] : it->second) {
const ui32 channel_ = channel;
const ui32 invalidatedStep_ = invalidatedStep;
+ auto& agentGivenIdRanges = agent.GivenIdRanges[channel];
+ auto& givenIdRanges = Self->Channels[channel].GivenIdRanges;
+
+ auto begin = writesInFlight.lower_bound(TBlobSeqId{channel, 0, 0, 0});
+ auto end = writesInFlight.upper_bound(TBlobSeqId{channel, Max<ui32>(), Max<ui32>(), TBlobSeqId::MaxIndex});
+
+ auto makeWritesInFlight = [&] {
+ TStringStream s;
+ s << "[";
+ for (auto it = begin; it != end; ++it) {
+ s << (it != begin ? " " : "") << it->ToString();
+ }
+ s << "]";
+ return s.Str();
+ };
+
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "Trim", (TabletId, Self->TabletID()), (AgentId, agent.ConnectedNodeId),
- (Channel, channel_), (InvalidatedStep, invalidatedStep_),
+ (Id, ev->Cookie), (Channel, channel_), (InvalidatedStep, invalidatedStep_),
(GivenIdRanges, Self->Channels[channel_].GivenIdRanges),
- (Agent.GivenIdRanges, agent.GivenIdRanges[channel_]));
- Self->Channels[channel].GivenIdRanges.Trim(channel, generation, invalidatedStep);
- agent.GivenIdRanges[channel].Trim(channel, generation, invalidatedStep);
+ (Agent.GivenIdRanges, agent.GivenIdRanges[channel_]),
+ (WritesInFlight, makeWritesInFlight()));
+
+ for (auto it = begin; it != end; ++it) {
+ Y_VERIFY_S(agentGivenIdRanges.GetPoint(it->ToSequentialNumber()), "blobSeqId# " << it->ToString());
+ Y_VERIFY_S(givenIdRanges.GetPoint(it->ToSequentialNumber()), "blobSeqId# " << it->ToString());
+ }
+
+ const TBlobSeqId trimmedBlobSeqId{channel, generation, invalidatedStep, TBlobSeqId::MaxIndex};
+ const ui64 validSince = trimmedBlobSeqId.ToSequentialNumber() + 1;
+ givenIdRanges.Subtract(agentGivenIdRanges.Trim(validSince));
+
+ for (auto it = begin; it != end; ++it) {
+ agentGivenIdRanges.AddPoint(it->ToSequentialNumber());
+ givenIdRanges.AddPoint(it->ToSequentialNumber());
+ }
+
+ OnLeastExpectedBlobIdChange(channel);
}
agent.InvalidateStepRequests.erase(it);
- }
- for (const auto& item : ev->Get()->Record.GetWritesInFlight()) {
- const auto blobSeqId = TBlobSeqId::FromProto(item);
- const ui64 value = blobSeqId.ToSequentialNumber();
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT99, "WriteInFlight", (TabletId, Self->TabletID()), (AgentId, agent.ConnectedNodeId),
- (BlobSeqId, blobSeqId), (Value, value),
- (GivenIdRanges, Self->Channels[blobSeqId.Channel].GivenIdRanges),
- (Agent.GivenIdRanges, agent.GivenIdRanges[blobSeqId.Channel]));
- Self->Channels[blobSeqId.Channel].GivenIdRanges.AddPoint(value);
- agent.GivenIdRanges[blobSeqId.Channel].AddPoint(value);
+ } else {
+ Y_VERIFY_DEBUG(false);
}
HandleTrash();
}
- void TBlobDepot::TData::OnCommitConfirmedGC(ui8 channel, ui32 groupId) {
+ void TData::OnCommitConfirmedGC(ui8 channel, ui32 groupId) {
const auto& key = std::make_tuple(Self->TabletID(), channel, groupId);
const auto it = RecordsPerChannelGroup.find(key);
Y_VERIFY(it != RecordsPerChannelGroup.end());
- auto& record = it->second;
- Y_VERIFY(record.CollectGarbageRequestInFlight);
- record.CollectGarbageRequestInFlight = false;
- if (!record.Trash.empty()) {
- OnTrashInserted(record);
- HandleTrash();
- }
+ it->second.ClearInFlight(this);
}
- bool TBlobDepot::TData::CanBeCollected(ui32 groupId, TBlobSeqId id) const {
+ bool TData::CanBeCollected(ui32 groupId, TBlobSeqId id) const {
const auto it = RecordsPerChannelGroup.find(std::make_tuple(Self->TabletID(), id.Channel, groupId));
return it != RecordsPerChannelGroup.end() && TGenStep(id) <= it->second.IssuedGenStep;
}
+ void TData::OnLeastExpectedBlobIdChange(ui8 channel) {
+ auto& ch = Self->Channels[channel];
+ const ui64 minSequenceNumber = ch.GivenIdRanges.IsEmpty()
+ ? ch.NextBlobSeqId
+ : ch.GivenIdRanges.GetMinimumValue();
+ const TBlobSeqId leastExpectedBlobId = TBlobSeqId::FromSequentalNumber(channel, Self->Executor()->Generation(),
+ minSequenceNumber);
+
+ const TTabletStorageInfo *info = Self->Info();
+ const TTabletChannelInfo *storageChannel = info->ChannelInfo(leastExpectedBlobId.Channel);
+ Y_VERIFY(storageChannel);
+ for (const auto& entry : storageChannel->History) {
+ const auto& key = std::make_tuple(info->TabletID, storageChannel->Channel, entry.GroupID);
+ auto [it, _] = RecordsPerChannelGroup.emplace(std::piecewise_construct, key, key);
+ auto& record = it->second;
+ record.OnLeastExpectedBlobIdChange(this, leastExpectedBlobId);
+ }
+ }
+
+ void TData::TRecordsPerChannelGroup::MoveToTrash(TData *self, TLogoBlobID id) {
+ const auto usedIt = Used.find(id);
+ Y_VERIFY(usedIt != Used.end());
+ Trash.insert(Used.extract(usedIt));
+ EnqueueForCollectionIfPossible(self);
+ }
+
+ void TData::TRecordsPerChannelGroup::OnSuccessfulCollect(TData *self) {
+ auto it = Trash.begin();
+ for (const TLogoBlobID& id : TrashInFlight) {
+ for (; it != Trash.end() && *it < id; ++it) {}
+ Y_VERIFY(it != Trash.end() && *it == id);
+ it = Trash.erase(it);
+ }
+ LastConfirmedGenStep = IssuedGenStep;
+ EnqueueForCollectionIfPossible(self);
+ }
+
+ void TData::TRecordsPerChannelGroup::OnLeastExpectedBlobIdChange(TData *self, TBlobSeqId leastExpectedBlobId) {
+ Y_VERIFY_S(LeastExpectedBlobId <= leastExpectedBlobId, "Prev# " << LeastExpectedBlobId.ToString()
+ << " Next# " << leastExpectedBlobId.ToString());
+ if (LeastExpectedBlobId < leastExpectedBlobId) {
+ LeastExpectedBlobId = leastExpectedBlobId;
+ EnqueueForCollectionIfPossible(self);
+ }
+ }
+
+ void TData::TRecordsPerChannelGroup::ClearInFlight(TData *self) {
+ Y_VERIFY(CollectGarbageRequestInFlight);
+ CollectGarbageRequestInFlight = false;
+ EnqueueForCollectionIfPossible(self);
+ }
+
+ void TData::TRecordsPerChannelGroup::EnqueueForCollectionIfPossible(TData *self) {
+ if (!CollectGarbageRequestInFlight && TabletId == self->Self->TabletID() && Empty() && !Trash.empty()) {
+ self->RecordsWithTrash.PushBack(this);
+ }
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 8e8b4ba84e4..160f1f5e54f 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -10,14 +10,9 @@ namespace NKikimr::NBlobDepot {
public:
class alignas(TString) TKey {
- union {
- ui64 Raw64[4];
- ui8 Bytes[32];
- char String[31];
- struct {
- ui8 Padding[31];
- ui8 Type;
- };
+ struct TData {
+ ui8 Bytes[31];
+ ui8 Type;
} Data;
static constexpr size_t TypeLenByteIdx = 31;
@@ -25,9 +20,12 @@ namespace NKikimr::NBlobDepot {
static constexpr char BlobIdType = 32;
static constexpr char StringType = 33;
+ static_assert(sizeof(Data) == 32);
+
public:
TKey() {
- Reset();
+ Data.Type = EncodeInlineStringLenAsTypeByte(0);
+ Data.Bytes[0] = 0;
}
explicit TKey(TLogoBlobID id) {
@@ -35,22 +33,14 @@ namespace NKikimr::NBlobDepot {
reinterpret_cast<TLogoBlobID&>(Data.Bytes) = id;
}
- explicit TKey(TStringBuf value) {
+ template<typename T, typename = std::enable_if_t<std::is_constructible_v<TString, T&&>>>
+ explicit TKey(T&& value) {
if (value.size() <= MaxInlineStringLen) {
Data.Type = EncodeInlineStringLenAsTypeByte(value.size());
- memcpy(Data.String, value.data(), value.size());
- Data.String[value.size()] = 0;
- } else {
- Data.Type = StringType;
- new(Data.Bytes) TString(value);
- }
- }
-
- explicit TKey(TString value) {
- if (value.size() <= MaxInlineStringLen) {
- Data.Type = EncodeInlineStringLenAsTypeByte(value.size());
- memcpy(Data.String, value.data(), value.size());
- Data.String[value.size()] = 0;
+ memcpy(Data.Bytes, value.data(), value.size());
+ if (value.size() != MaxInlineStringLen) {
+ Data.Bytes[value.size()] = 0;
+ }
} else {
Data.Type = StringType;
new(Data.Bytes) TString(std::move(value));
@@ -187,13 +177,14 @@ namespace NKikimr::NBlobDepot {
GetString().~TString();
}
Data.Type = EncodeInlineStringLenAsTypeByte(0);
+ Data.Bytes[0] = 0;
}
TStringBuf GetStringBuf() const {
if (Data.Type == StringType) {
return GetString();
} else {
- return TStringBuf(Data.String, DecodeInlineStringLenFromTypeByte(Data.Type));
+ return TStringBuf(reinterpret_cast<const char*>(Data.Bytes), DecodeInlineStringLenFromTypeByte(Data.Type));
}
}
@@ -249,12 +240,19 @@ namespace NKikimr::NBlobDepot {
TGenStep IssuedGenStep; // currently in flight or already confirmed
TGenStep LastConfirmedGenStep;
bool CollectGarbageRequestInFlight = false;
+ TBlobSeqId LeastExpectedBlobId;
TRecordsPerChannelGroup(ui64 tabletId, ui8 channel, ui32 groupId)
: TabletId(tabletId)
, Channel(channel)
, GroupId(groupId)
{}
+
+ void MoveToTrash(TData *self, TLogoBlobID id);
+ void OnSuccessfulCollect(TData *self);
+ void OnLeastExpectedBlobIdChange(TData *self, TBlobSeqId leastExpectedBlobId);
+ void ClearInFlight(TData *self);
+ void EnqueueForCollectionIfPossible(TData *self);
};
std::map<TKey, TValue> Data;
@@ -312,7 +310,6 @@ namespace NKikimr::NBlobDepot {
void PutKey(TKey key, TValue&& data);
- void OnTrashInserted(TRecordsPerChannelGroup& record);
std::optional<TString> UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState);
void DeleteKey(const TKey& key, const std::function<void(TLogoBlobID)>& updateTrash, void *cookie);
void CommitTrash(void *cookie);
@@ -323,6 +320,8 @@ namespace NKikimr::NBlobDepot {
bool CanBeCollected(ui32 groupId, TBlobSeqId id) const;
+ void OnLeastExpectedBlobIdChange(ui8 channel);
+
static TString ToValueProto(const TValue& value);
template<typename TCallback>
diff --git a/ydb/core/blob_depot/defs.h b/ydb/core/blob_depot/defs.h
index ce566644ccf..8fceb52d73c 100644
--- a/ydb/core/blob_depot/defs.h
+++ b/ydb/core/blob_depot/defs.h
@@ -6,6 +6,7 @@
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
#include <ydb/core/protos/blob_depot.pb.h>
+#include <ydb/core/util/format.h>
#include <ydb/core/util/stlog.h>
#include <library/cpp/monlib/service/pages/templates.h>
diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h
index 3203d8800c9..b5f0a2542a9 100644
--- a/ydb/core/blob_depot/events.h
+++ b/ydb/core/blob_depot/events.h
@@ -54,7 +54,7 @@ namespace NKikimr {
BLOBDEPOT_EVENT_PB(EvApplyConfig, TxId);
BLOBDEPOT_EVENT_PB(EvApplyConfigResult, TabletId, TxId);
BLOBDEPOT_EVENT_PB(EvRegisterAgent, VirtualGroupId, AgentInstanceId);
- BLOBDEPOT_EVENT_PB_NO_ARGS(EvRegisterAgentResult);
+ BLOBDEPOT_EVENT_PB(EvRegisterAgentResult, Generation);
BLOBDEPOT_EVENT_PB(EvAllocateIds, ChannelKind, Count);
BLOBDEPOT_EVENT_PB(EvAllocateIdsResult, ChannelKind, Generation);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotify);
@@ -73,15 +73,15 @@ namespace NKikimr {
template<typename TEvent>
struct TResponseFor {};
- template<> struct TResponseFor<TEvApplyConfig> { using Type = TEvApplyConfigResult; };
- template<> struct TResponseFor<TEvRegisterAgent> { using Type = TEvRegisterAgentResult; };
- template<> struct TResponseFor<TEvAllocateIds> { using Type = TEvAllocateIdsResult; };
- template<> struct TResponseFor<TEvPushNotify> { using Type = TEvPushNotifyResult; };
- template<> struct TResponseFor<TEvBlock> { using Type = TEvBlockResult; };
- template<> struct TResponseFor<TEvQueryBlocks> { using Type = TEvQueryBlocksResult; };
+ template<> struct TResponseFor<TEvApplyConfig> { using Type = TEvApplyConfigResult; };
+ template<> struct TResponseFor<TEvRegisterAgent> { using Type = TEvRegisterAgentResult; };
+ template<> struct TResponseFor<TEvAllocateIds> { using Type = TEvAllocateIdsResult; };
+ template<> struct TResponseFor<TEvPushNotify> { using Type = TEvPushNotifyResult; };
+ template<> struct TResponseFor<TEvBlock> { using Type = TEvBlockResult; };
+ template<> struct TResponseFor<TEvQueryBlocks> { using Type = TEvQueryBlocksResult; };
template<> struct TResponseFor<TEvCollectGarbage> { using Type = TEvCollectGarbageResult; };
- template<> struct TResponseFor<TEvCommitBlobSeq> { using Type = TEvCommitBlobSeqResult; };
- template<> struct TResponseFor<TEvResolve> { using Type = TEvResolveResult; };
+ template<> struct TResponseFor<TEvCommitBlobSeq> { using Type = TEvCommitBlobSeqResult; };
+ template<> struct TResponseFor<TEvResolve> { using Type = TEvResolveResult; };
template<typename TRequestEvent, typename... TArgs>
static auto MakeResponseFor(TEventHandle<TRequestEvent>& ev, TActorId selfId, TArgs&&... args) {
diff --git a/ydb/core/blob_depot/given_id_range.cpp b/ydb/core/blob_depot/given_id_range.cpp
index 74cd9a4e8bd..4b7f4ebe840 100644
--- a/ydb/core/blob_depot/given_id_range.cpp
+++ b/ydb/core/blob_depot/given_id_range.cpp
@@ -4,30 +4,93 @@ namespace NKikimr::NBlobDepot {
void TGivenIdRange::IssueNewRange(ui64 begin, ui64 end) {
Y_VERIFY(begin < end);
+ NumAvailableItems += end - begin;
- const auto [it, inserted] = Ranges.emplace(begin, end);
- Y_VERIFY(inserted);
+ const auto it = Ranges.upper_bound(begin);
+ bool adjacentWithNext = false, adjacentWithPrev = false;
- if (it != Ranges.begin()) {
- const auto& prev = *std::prev(it);
- Y_VERIFY(prev.End <= begin);
- }
- if (std::next(it) != Ranges.end()) {
- const auto& next = *std::next(it);
+ if (it != Ranges.end()) {
+ TRange& next = const_cast<TRange&>(*it);
+ if (next.Begin < end) {
+ Y_VERIFY(next.Bits.FirstNonZeroBit() >= end - next.Begin);
+ next.Bits.Set(0, end - next.Begin);
+ next.NumSetBits += end - next.Begin;
+ end = next.Begin;
+ }
Y_VERIFY(end <= next.Begin);
+ adjacentWithNext = end == next.Begin;
}
- NumAvailableItems += end - begin;
+ if (it != Ranges.begin()) {
+ TRange& prev = const_cast<TRange&>(*std::prev(it));
+ if (prev.End <= begin) {
+ // nothing interesting here -- no any kind overlap
+ } else if (end <= prev.End) {
+ // the [begin, end) pair is just a subrange of 'prev' item
+ Y_VERIFY(prev.Begin <= begin);
+ Y_VERIFY(!adjacentWithNext);
+ prev.Bits.Set(begin - prev.Begin, end - prev.Begin);
+ prev.NumSetBits += end - begin;
+ return;
+ } else {
+ Y_VERIFY(prev.Begin < begin);
+ prev.Bits.Set(begin - prev.Begin, prev.End - prev.Begin);
+ prev.NumSetBits += prev.End - begin;
+ begin = prev.End;
+ }
+ Y_VERIFY_DEBUG(prev.End <= begin);
+ adjacentWithPrev = prev.End == begin;
+ }
+
+ if (adjacentWithNext && adjacentWithPrev) {
+ // [ prev ) [ next )
+ // ^ ^
+ // begin end
+ TRange& next = const_cast<TRange&>(*it);
+ TRange& prev = const_cast<TRange&>(*std::prev(it));
+ prev.Bits |= next.Bits << end - prev.Begin;
+ prev.Bits.Set(begin - prev.Begin, end - prev.Begin);
+ prev.NumSetBits += next.NumSetBits + end - begin;
+ prev.End = next.End;
+ Ranges.erase(it);
+ } else if (adjacentWithNext) {
+ TRange& next = const_cast<TRange&>(*it);
+ const ui32 shift = end - begin;
+ next.Bits <<= shift;
+ next.Bits.Set(0, shift);
+ next.NumSetBits += shift;
+ next.Begin = begin;
+ } else if (adjacentWithPrev) {
+ TRange& prev = const_cast<TRange&>(*std::prev(it));
+ prev.Bits.Set(prev.End - prev.Begin, end - prev.Begin);
+ prev.NumSetBits += end - begin;
+ prev.End = end;
+ } else {
+ Ranges.emplace_hint(it, begin, end);
+ }
}
void TGivenIdRange::AddPoint(ui64 value) {
IssueNewRange(value, value + 1);
}
- void TGivenIdRange::RemovePoint(ui64 value) {
+ void TGivenIdRange::RemovePoint(ui64 value, bool *wasLeast) {
auto it = Ranges.upper_bound(value);
Y_VERIFY(it != Ranges.begin());
- Pop(std::prev(it), value);
+ --it;
+ if (wasLeast) {
+ *wasLeast = it == Ranges.begin() && it->Bits.FirstNonZeroBit() + it->Begin == value;
+ }
+ Pop(it, value);
+ }
+
+ bool TGivenIdRange::GetPoint(ui64 value) const {
+ const auto it = Ranges.upper_bound(value);
+ if (it == Ranges.begin()) {
+ return false;
+ }
+ const TRange& range = *std::prev(it);
+ return range.Begin <= value && value < range.End && range.Bits[value - range.Begin];
}
bool TGivenIdRange::IsEmpty() const {
@@ -35,6 +98,14 @@ namespace NKikimr::NBlobDepot {
}
ui32 TGivenIdRange::GetNumAvailableItems() const {
+#ifndef NDEBUG
+ ui32 count = 0;
+ for (const auto& range : Ranges) {
+ Y_VERIFY(range.NumSetBits == range.Bits.Count());
+ count += range.NumSetBits;
+ }
+ Y_VERIFY(count == NumAvailableItems);
+#endif
return NumAvailableItems;
}
@@ -53,38 +124,88 @@ namespace NKikimr::NBlobDepot {
return value;
}
- void TGivenIdRange::Trim(ui8 channel, ui32 generation, ui32 invalidatedStep) {
- const ui64 validSince = 1 + TBlobSeqId{channel, generation, invalidatedStep, TBlobSeqId::MaxIndex}.ToSequentialNumber();
+ TGivenIdRange TGivenIdRange::Trim(ui64 validSince) {
+ TGivenIdRange result;
while (!Ranges.empty()) {
- const auto it = Ranges.begin();
+ auto it = Ranges.begin();
auto& range = const_cast<TRange&>(*it);
if (range.End <= validSince) {
- NumAvailableItems -= range.Bits.Count();
- Ranges.erase(it);
+ NumAvailableItems -= range.NumSetBits;
+ result.NumAvailableItems += range.NumSetBits;
+ result.Ranges.insert(result.Ranges.end(), Ranges.extract(it++));
+ continue;
} else if (range.Begin < validSince) {
const ui32 len = validSince - range.Begin;
- for (ui32 i = 0; i < len; ++i) {
- range.NumSetBits -= range.Bits[i];
- NumAvailableItems -= range.Bits[i];
+ TDynBitMap mask;
+ mask.Set(0, len);
+
+ const TDynBitMap setBits = range.Bits & mask;
+ if (const size_t numSetBits = setBits.Count()) {
+ // clear set bits in the origin chunk
+ range.Bits -= setBits;
+ range.NumSetBits -= numSetBits;
+ NumAvailableItems -= numSetBits;
+ if (!range.NumSetBits) {
+ Ranges.erase(it);
+ }
+
+ // put 'em into the result
+ TRange& r = const_cast<TRange&>(*result.Ranges.emplace_hint(result.Ranges.end(), range.Begin, range.End, TRange::Zero));
+ r.Bits |= setBits;
+ r.NumSetBits += numSetBits;
+ result.NumAvailableItems += numSetBits;
}
- range.Bits.Reset(0, len);
- } else {
- break;
}
+ break;
}
+
+ return result;
}
void TGivenIdRange::Subtract(const TGivenIdRange& other) {
- for (const TRange& range : other.Ranges) {
- const auto it = Ranges.find(range.Begin);
- Y_VERIFY(it != Ranges.end());
- Y_VERIFY(range.End == it->End);
- Y_VERIFY(range.NumSetBits == it->NumSetBits);
- Y_VERIFY(range.Bits == it->Bits);
- NumAvailableItems -= range.NumSetBits;
- Ranges.erase(it);
+ auto it = Ranges.begin();
+ auto otherIt = other.Ranges.begin();
+
+ while (it != Ranges.end()) {
+ if (it->End <= otherIt->Begin) {
+ ++it;
+ } else if (otherIt->End <= it->Begin) {
+ Y_FAIL();
+ } else {
+ const ui64 begin = Max(it->Begin, otherIt->Begin);
+ const ui64 end = Min(it->End, otherIt->End);
+ Y_VERIFY(begin < end);
+
+ TDynBitMap subtractedMask = otherIt->Bits;
+ if (otherIt->Begin < begin) {
+ subtractedMask >>= begin - otherIt->Begin;
+ }
+ subtractedMask.Reset(end - begin, subtractedMask.Size());
+ if (it->Begin < begin) {
+ subtractedMask <<= begin - it->Begin;
+ }
+ Y_VERIFY_DEBUG((it->Bits & subtractedMask) == subtractedMask);
+
+ TRange& r = const_cast<TRange&>(*it);
+ r.Bits -= subtractedMask;
+ r.NumSetBits -= subtractedMask.Count();
+ if (!r.NumSetBits) {
+ it = Ranges.erase(it);
+ } else if (it->End == end) {
+ ++it;
+ }
+
+ if (otherIt->End == end) {
+ ++otherIt;
+ }
+
+ NumAvailableItems -= subtractedMask.Count();
+ }
}
+
+ // ensure we have processed all other range
+ Y_VERIFY(otherIt == other.Ranges.end());
}
void TGivenIdRange::Output(IOutputStream& s) const {
@@ -109,7 +230,7 @@ namespace NKikimr::NBlobDepot {
void TGivenIdRange::Pop(TRanges::iterator it, ui64 value) {
TRange& range = const_cast<TRange&>(*it);
- Y_VERIFY(range.Begin <= value && value < range.End);
+ Y_VERIFY_S(range.Begin <= value && value < range.End, "value# " << value << " this# " << ToString());
const size_t offset = value - range.Begin;
Y_VERIFY(range.Bits[offset]);
range.Bits.Reset(offset);
@@ -120,4 +241,39 @@ namespace NKikimr::NBlobDepot {
--NumAvailableItems;
}
+ std::vector<bool> TGivenIdRange::ToDebugArray(size_t numItems) const {
+ std::vector<bool> res(numItems, false);
+ for (const TRange& item : Ranges) {
+ for (ui32 i = item.Bits.FirstNonZeroBit(); i != item.Bits.Size(); i = item.Bits.NextNonZeroBit(i)) {
+ Y_VERIFY_S(i < item.End - item.Begin, "i# " << i << " this# " << ToString());
+ res[item.Begin + i] = true;
+ }
+ }
+ return res;
+ }
+
+ void TGivenIdRange::CheckConsistency() const {
+ ui32 numAvailableItems = 0;
+
+ for (auto it = Ranges.begin(); it != Ranges.end(); ++it) {
+ const TRange& range = *it;
+ Y_VERIFY_S(range.Begin < range.End, ToString());
+
+ if (it != Ranges.begin()) {
+ const TRange& prev = *std::prev(it);
+ Y_VERIFY_S(prev.End < range.Begin, ToString());
+ }
+
+ Y_VERIFY_S(range.NumSetBits, ToString());
+ Y_VERIFY_S(range.NumSetBits == range.Bits.Count(), ToString());
+ for (ui32 i = range.End - range.Begin; i < range.Bits.Size(); ++i) {
+ Y_VERIFY_S(!range.Bits[i], "begin# " << range.Begin << " i# " << i << " this# " << ToString());
+ }
+
+ numAvailableItems += range.NumSetBits;
+ }
+
+ Y_VERIFY_S(numAvailableItems == NumAvailableItems, ToString());
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/mon_main.cpp b/ydb/core/blob_depot/mon_main.cpp
index 7f5544420e1..e64fb0dbc8e 100644
--- a/ydb/core/blob_depot/mon_main.cpp
+++ b/ydb/core/blob_depot/mon_main.cpp
@@ -270,6 +270,56 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::RenderMainPage(IOutputStream& s) {
HTML(s) {
s << "<a href='app?TabletID=" << TabletID() << "&page=data'>Contained data</a><br>";
+
+ DIV_CLASS("panel panel-info") {
+ DIV_CLASS("panel-heading") {
+ s << "Stats";
+ }
+ DIV_CLASS("panel-body") {
+ TABLE_CLASS("table") {
+ TABLEHEAD() {
+ TABLER() {
+ TABLEH() { s << "Parameter"; }
+ TABLEH() { s << "Value"; }
+ }
+ }
+ TABLEBODY() {
+ auto outSize = [&](ui64 size) {
+ static const char *suffixes[] = {
+ "B", "KiB", "MiB", "GiB", "TiB", "PiB", nullptr
+ };
+ FormatHumanReadable(s, size, 1024, 2, suffixes);
+ };
+ TABLER() {
+ TABLED() { s << "Data, bytes"; }
+ TABLED() {
+ ui64 total = 0;
+ Data->EnumerateRefCount([&](TLogoBlobID id, ui32 /*refCount*/) {
+ total += id.BlobSize();
+ });
+ outSize(total);
+ }
+ }
+
+ ui64 trashInFlight = 0;
+ ui64 trashPending = 0;
+ Data->EnumerateTrash([&](ui32 /*groupId*/, TLogoBlobID id, bool inFlight) {
+ (inFlight ? trashInFlight : trashPending) += id.BlobSize();
+ });
+
+ TABLER() {
+ TABLED() { s << "Trash in flight, bytes"; }
+ TABLED() { outSize(trashInFlight); }
+ }
+
+ TABLER() {
+ TABLED() { s << "Trash pending, bytes"; }
+ TABLED() { outSize(trashPending); }
+ }
+ }
+ }
+ }
+ }
}
}
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index bd85b9eda49..a82df128315 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -42,7 +42,7 @@ namespace NKikimr::NBlobDepot {
if (blobSeqId.Generation == generation) {
// check for internal sanity -- we can't issue barriers on given ids without confirmed trimming
- Y_VERIFY(!canBeCollected);
+ Y_VERIFY_S(!canBeCollected, "BlobSeqId# " << blobSeqId.ToString());
} else if (canBeCollected) {
// we can't accept this record, because it is potentially under already issued barrier
responseItem->SetStatus(NKikimrProto::ERROR);
@@ -82,8 +82,14 @@ namespace NKikimr::NBlobDepot {
(AgentId, agent.ConnectedNodeId), (BlobSeqId, blobSeqId), (Value, value),
(GivenIdRanges, Self->Channels[blobSeqId.Channel].GivenIdRanges),
(Agent.GivenIdRanges, agent.GivenIdRanges[blobSeqId.Channel]));
- agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value);
- Self->Channels[blobSeqId.Channel].GivenIdRanges.RemovePoint(value);
+
+ agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value, nullptr);
+
+ bool wasLeast;
+ Self->Channels[blobSeqId.Channel].GivenIdRanges.RemovePoint(value, &wasLeast);
+ if (wasLeast) {
+ Self->Data->OnLeastExpectedBlobIdChange(blobSeqId.Channel);
+ }
}
bool CheckKeyAgainstBarrier(const TString& key, NKikimrBlobDepot::TEvCommitBlobSeqResult::TItem *responseItem) {
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index 8cf0e3899ab..7e6fd06c1b8 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -98,8 +98,8 @@ namespace NKikimr::NBlobDepot {
class TGivenIdRange {
struct TRange {
- const ui64 Begin;
- const ui64 End;
+ ui64 Begin;
+ ui64 End;
ui32 NumSetBits = 0;
TDynBitMap Bits;
@@ -111,6 +111,16 @@ namespace NKikimr::NBlobDepot {
Bits.Set(0, end - begin);
}
+ static constexpr struct TZero {} Zero{};
+
+ TRange(ui64 begin, ui64 end, TZero)
+ : Begin(begin)
+ , End(end)
+ , NumSetBits(0)
+ {
+ Bits.Reset(0, end - begin);
+ }
+
struct TCompare {
bool operator ()(const TRange& x, const TRange& y) const { return x.Begin < y.Begin; }
bool operator ()(const TRange& x, ui64 y) const { return x.Begin < y; }
@@ -126,7 +136,8 @@ namespace NKikimr::NBlobDepot {
public:
void IssueNewRange(ui64 begin, ui64 end);
void AddPoint(ui64 value);
- void RemovePoint(ui64 value);
+ void RemovePoint(ui64 value, bool *wasLeast);
+ bool GetPoint(ui64 value) const;
bool IsEmpty() const;
ui32 GetNumAvailableItems() const;
@@ -134,12 +145,14 @@ namespace NKikimr::NBlobDepot {
ui64 Allocate();
void Subtract(const TGivenIdRange& other);
-
- void Trim(ui8 channel, ui32 generation, ui32 invalidatedStep);
+ TGivenIdRange Trim(ui64 trimUpTo);
void Output(IOutputStream& s) const;
TString ToString() const;
+ std::vector<bool> ToDebugArray(size_t numItems) const;
+ void CheckConsistency() const;
+
private:
void Pop(TRanges::iterator it, ui64 value);
};
diff --git a/ydb/core/mind/hive/monitoring.cpp b/ydb/core/mind/hive/monitoring.cpp
index 362466da547..cc3152f9145 100644
--- a/ydb/core/mind/hive/monitoring.cpp
+++ b/ydb/core/mind/hive/monitoring.cpp
@@ -1101,7 +1101,7 @@ public:
case TTabletTypes::ReplicationController:
return "RC";
case TTabletTypes::BlobDepot:
- return "BS";
+ return "BD";
default:
return Sprintf("%d", (int)type);
}