aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-07-09 22:21:01 +0300
committeralexvru <alexvru@ydb.tech>2022-07-09 22:21:01 +0300
commitd6bc1736fb9c0e5ccd815586513eb0ec8c869ee7 (patch)
tree871afe011c4ec5434793136be9b39903b12c01f3
parent22acf19be42357b6bb0e7d601b0dc28695191463 (diff)
downloadydb-d6bc1736fb9c0e5ccd815586513eb0ec8c869ee7.tar.gz
BlobDepot work in progress
-rw-r--r--ydb/core/blob_depot/agent.cpp105
-rw-r--r--ydb/core/blob_depot/agent/CMakeLists.txt1
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h49
-rw-r--r--ydb/core/blob_depot/agent/blocks.cpp4
-rw-r--r--ydb/core/blob_depot/agent/channel_kind.cpp82
-rw-r--r--ydb/core/blob_depot/agent/comm.cpp53
-rw-r--r--ydb/core/blob_depot/agent/query.cpp6
-rw-r--r--ydb/core/blob_depot/agent/request.cpp4
-rw-r--r--ydb/core/blob_depot/agent/storage_discover.cpp4
-rw-r--r--ydb/core/blob_depot/agent/storage_put.cpp9
-rw-r--r--ydb/core/blob_depot/blob_depot.cpp1
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h39
-rw-r--r--ydb/core/blob_depot/blocks.cpp25
-rw-r--r--ydb/core/blob_depot/blocks.h7
-rw-r--r--ydb/core/blob_depot/data.cpp100
-rw-r--r--ydb/core/blob_depot/data.h3
-rw-r--r--ydb/core/blob_depot/events.h2
-rw-r--r--ydb/core/blob_depot/garbage_collection.cpp2
-rw-r--r--ydb/core/blob_depot/given_id_range.cpp168
-rw-r--r--ydb/core/blob_depot/mon_main.cpp15
-rw-r--r--ydb/core/blob_depot/op_apply_config.cpp18
-rw-r--r--ydb/core/blob_depot/op_commit_blob_seq.cpp32
-rw-r--r--ydb/core/blob_depot/op_load.cpp12
-rw-r--r--ydb/core/blob_depot/op_resolve.cpp4
-rw-r--r--ydb/core/blob_depot/types.h64
-rw-r--r--ydb/core/mind/bscontroller/bsc.cpp9
-rw-r--r--ydb/core/mind/bscontroller/config_fit_groups.cpp1
-rw-r--r--ydb/core/mind/bscontroller/impl.h18
-rw-r--r--ydb/core/mind/bscontroller/virtual_group.cpp15
-rw-r--r--ydb/core/protos/blob_depot.proto19
30 files changed, 565 insertions, 306 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp
index 00d18d2168..7cecc58d31 100644
--- a/ydb/core/blob_depot/agent.cpp
+++ b/ydb/core/blob_depot/agent.cpp
@@ -1,4 +1,5 @@
#include "blob_depot_tablet.h"
+#include "data.h"
namespace NKikimr::NBlobDepot {
@@ -25,21 +26,30 @@ namespace NKikimr::NBlobDepot {
PipeServerToNode.erase(it);
}
- void TBlobDepot::OnAgentDisconnect(TAgent& /*agent*/) {
+ void TBlobDepot::OnAgentDisconnect(TAgent& agent) {
+ agent.InvalidateStepRequests.clear();
}
void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) {
+ const auto& req = ev->Get()->Record;
+
const auto it = PipeServerToNode.find(ev->Recipient);
Y_VERIFY(it != PipeServerToNode.end());
const ui32 nodeId = ev->Sender.NodeId();
Y_VERIFY(!it->second || *it->second == nodeId);
it->second = nodeId;
auto& agent = Agents[nodeId];
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, ev->Get()->Record),
- (NodeId, nodeId), (PipeServerId, it->first));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, req), (NodeId, nodeId),
+ (PipeServerId, it->first));
agent.ConnectedAgent = it->first;
agent.ConnectedNodeId = nodeId;
agent.ExpirationTimestamp = TInstant::Max();
+
+ if (agent.AgentInstanceId && *agent.AgentInstanceId != req.GetAgentInstanceId()) {
+ ResetAgent(agent);
+ }
+ agent.AgentInstanceId = req.GetAgentInstanceId();
+
OnAgentConnect(agent);
auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId());
@@ -66,28 +76,55 @@ namespace NKikimr::NBlobDepot {
(PipeServerId, ev->Recipient));
const ui32 generation = Executor()->Generation();
-
auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId(), ev->Get()->Record.GetChannelKind(), generation);
if (const auto it = ChannelKinds.find(record->GetChannelKind()); it != ChannelKinds.end()) {
auto& kind = it->second;
+ auto *givenIdRange = record->MutableGivenIdRange();
- const ui64 rangeBegin = kind.NextBlobSeqId;
- kind.NextBlobSeqId += ev->Get()->Record.GetCount();
- const ui64 rangeEnd = kind.NextBlobSeqId;
+ // FIXME: optimize for faster range selection
- TGivenIdRange range;
- range.IssueNewRange(rangeBegin, rangeEnd);
+ // create array of channels appropriate for current selection
+ std::vector<TChannelInfo*> channels;
+ channels.reserve(kind.ChannelGroups.size());
+ for (const auto& [channel, _] : kind.ChannelGroups) {
+ Y_VERIFY_DEBUG(channel < Channels.size() && Channels[channel].ChannelKind == it->first);
+ channels.push_back(&Channels[channel]);
+ }
- range.ToProto(record->MutableGivenIdRange());
+ // make a min heap
+ auto comp = [](TChannelInfo *x, const TChannelInfo *y) { return x->NextBlobSeqId > y->NextBlobSeqId; };
+ std::make_heap(channels.begin(), channels.end(), comp);
+
+ THashMap<ui8, NKikimrBlobDepot::TGivenIdRange::TChannelRange*> issuedRanges;
+ for (ui32 i = 0, count = ev->Get()->Record.GetCount(); i < count; ++i) {
+ // extract element with the least NextBlobSeqId value
+ std::pop_heap(channels.begin(), channels.end(), comp);
+
+ // map it to channel index
+ TChannelInfo *channel = channels.back();
+ const ui64 value = channel->NextBlobSeqId;
+ const ui8 channelIndex = channel - Channels.data();
+
+ // fill in range item
+ auto& range = issuedRanges[channelIndex];
+ if (!range || range->GetEnd() != value) {
+ range = givenIdRange->AddChannelRanges();
+ range->SetChannel(channelIndex);
+ range->SetBegin(value);
+ }
+ range->SetEnd(value + 1);
- TAgent& agent = GetAgent(ev->Recipient);
- agent.ChannelKinds[it->first].GivenIdRanges.Join(TGivenIdRange(range));
- kind.GivenIdRanges.Join(std::move(range));
+ // update NextBlobSeqId value and put back into heap
+ ++channel->NextBlobSeqId;
+ std::push_heap(channels.begin(), channels.end(), comp);
+ }
- for (ui64 value = rangeBegin; value < rangeEnd; ++value) {
- const auto blobSeqId = TBlobSeqId::FromBinary(generation, kind, value);
- PerChannelRecords[blobSeqId.Channel].GivenStepIndex.emplace(blobSeqId.Step, blobSeqId.Index);
+ // register issued ranges in agent and global records
+ TAgent& agent = GetAgent(ev->Recipient);
+ for (const auto& range : givenIdRange->GetChannelRanges()) {
+ agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd());
+ Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd());
}
}
@@ -109,12 +146,21 @@ namespace NKikimr::NBlobDepot {
return agentIt->second;
}
+    // Forgets all id ranges recorded for the given agent: every per-channel range is subtracted from the
+    // channel-wide accumulated GivenIdRanges and then reset to an empty value on the agent record.
+    // Data->HandleTrash() is invoked once after all channels have been processed.
+    void TBlobDepot::ResetAgent(TAgent& agent) {
+        for (auto& item : agent.GivenIdRanges) {
+            const ui8 channelIndex = item.first;
+            auto& issuedToAgent = item.second;
+            Channels[channelIndex].GivenIdRanges.Subtract(issuedToAgent);
+            issuedToAgent = {};
+        }
+        Data->HandleTrash();
+    }
+
void TBlobDepot::InitChannelKinds() {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT05, "InitChannelKinds", (TabletId, TabletID()));
+
TTabletStorageInfo *info = Info();
const ui32 generation = Executor()->Generation();
- Y_VERIFY(ChannelToKind.empty());
- ChannelToKind.resize(info->Channels.size(), NKikimrBlobDepot::TChannelKind::System);
+ Y_VERIFY(Channels.empty());
ui32 channel = 0;
for (const auto& profile : Config.GetChannelProfiles()) {
@@ -124,19 +170,22 @@ namespace NKikimr::NBlobDepot {
auto& p = ChannelKinds[kind];
p.ChannelToIndex[channel] = p.ChannelGroups.size();
p.ChannelGroups.emplace_back(channel, info->GroupFor(channel, generation));
-
- Y_VERIFY(channel < ChannelToKind.size());
- ChannelToKind[channel] = kind;
+ Channels.push_back({
+ kind,
+ &p,
+ {},
+ TBlobSeqId{channel, generation, 1, 0}.ToSequentialNumber(),
+ });
+ } else {
+ Channels.push_back({
+ NKikimrBlobDepot::TChannelKind::System,
+ nullptr,
+ {},
+ 0
+ });
}
}
}
-
- Y_VERIFY_S(channel == ChannelToKind.size(), "channel# " << channel
- << " ChannelToKind.size# " << ChannelToKind.size());
-
- for (auto& [k, v] : ChannelKinds) {
- v.NextBlobSeqId = TBlobSeqId{v.ChannelGroups.front().first, generation, 1, 0}.ToBinary(v);
- }
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/CMakeLists.txt b/ydb/core/blob_depot/agent/CMakeLists.txt
index a83c176d8d..ef0395b9b5 100644
--- a/ydb/core/blob_depot/agent/CMakeLists.txt
+++ b/ydb/core/blob_depot/agent/CMakeLists.txt
@@ -18,6 +18,7 @@ target_sources(core-blob_depot-agent PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/blob_mapping_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/blocks.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/channel_kind.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/comm.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/garbage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/proxy.cpp
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index 75c4b88bdb..d81e72cb04 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -76,6 +76,7 @@ namespace NKikimr::NBlobDepot {
, public TRequestSender
{
const ui32 VirtualGroupId;
+ const ui64 AgentInstanceId;
ui64 TabletId = Max<ui64>();
TActorId PipeId;
@@ -84,6 +85,7 @@ namespace NKikimr::NBlobDepot {
: TActor(&TThis::StateFunc)
, TRequestSender(*this)
, VirtualGroupId(virtualGroupId)
+ , AgentInstanceId(RandomNumber<ui64>())
, BlocksManager(CreateBlocksManager())
, BlobMappingCache(CreateBlobMappingCache())
{
@@ -249,8 +251,14 @@ namespace NKikimr::NBlobDepot {
const NKikimrBlobDepot::TChannelKind::E Kind;
bool IdAllocInFlight = false;
- TGivenIdRange GivenIdRange;
- static constexpr size_t PreallocatedIdCount = 2;
+
+ struct TGivenIdRangeHeapComp;
+ using TGivenIdRangePerChannel = THashMap<ui8, TGivenIdRange>;
+ TGivenIdRangePerChannel GivenIdRangePerChannel;
+ std::vector<TGivenIdRangePerChannel::value_type*> GivenIdRangeHeap;
+ ui32 NumAvailableItems = 0;
+
+ std::set<TBlobSeqId> WritesInFlight;
TIntrusiveList<TQuery, TPendingId> QueriesWaitingForId;
@@ -258,37 +266,20 @@ namespace NKikimr::NBlobDepot {
: Kind(kind)
{}
- std::optional<TBlobSeqId> Allocate(TBlobDepotAgent& agent) {
- if (GivenIdRange.IsEmpty()) {
- return std::nullopt;
- }
- auto blobSeqId = TBlobSeqId::FromBinary(agent.BlobDepotGeneration, *this, GivenIdRange.Allocate());
- agent.IssueAllocateIdsIfNeeded(*this);
- return blobSeqId;
- }
-
- std::pair<TLogoBlobID, ui32> MakeBlobId(TBlobDepotAgent& agent, const TBlobSeqId& blobSeqId, EBlobType type, ui32 part,
- ui32 size) const {
- auto id = blobSeqId.MakeBlobId(agent.TabletId, type, part, size);
- const auto [channel, groupId] = ChannelGroups[ChannelToIndex[blobSeqId.Channel]];
- Y_VERIFY_DEBUG(channel == blobSeqId.Channel);
- return {id, groupId};
- }
-
- void EnqueueQueryWaitingForId(TQuery *query) {
- QueriesWaitingForId.PushBack(query);
- }
+ void IssueGivenIdRange(const NKikimrBlobDepot::TGivenIdRange& proto);
+ ui32 GetNumAvailableItems() const;
+ std::optional<TBlobSeqId> Allocate(TBlobDepotAgent& agent);
+ std::pair<TLogoBlobID, ui32> MakeBlobId(TBlobDepotAgent& agent, const TBlobSeqId& blobSeqId, EBlobType type,
+ ui32 part, ui32 size) const;
+ void Trim(ui8 channel, ui32 generation, ui32 invalidatedStep);
+ void RebuildHeap();
- void ProcessQueriesWaitingForId() {
- TIntrusiveList<TQuery, TPendingId> temp;
- temp.Swap(QueriesWaitingForId);
- for (TQuery& query : temp) {
- query.OnIdAllocated();
- }
- }
+ void EnqueueQueryWaitingForId(TQuery *query);
+ void ProcessQueriesWaitingForId();
};
THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds;
+ THashMap<ui8, TChannelKind*> ChannelToKind;
void IssueAllocateIdsIfNeeded(TChannelKind& kind);
diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp
index a3b070799f..e700e2c3d7 100644
--- a/ydb/core/blob_depot/agent/blocks.cpp
+++ b/ydb/core/blob_depot/agent/blocks.cpp
@@ -53,7 +53,7 @@ namespace NKikimr::NBlobDepot {
block.RefreshInFlight = true;
block.PendingBlockChecks.PushBack(query);
}
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA99, "CheckBlockForTablet", (QueryId, query->GetQueryId()),
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "CheckBlockForTablet", (QueryId, query->GetQueryId()),
(TabletId, tabletId), (Generation, generation), (Status, status), (Now, now),
(ExpirationTimestamp, block.ExpirationTimestamp));
return status;
@@ -74,7 +74,7 @@ namespace NKikimr::NBlobDepot {
void Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvQueryBlocksResult& msg) {
auto& queryBlockContext = context->Obtain<TQueryBlockContext>();
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA01, "TEvQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId),
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "TEvQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId),
(Msg, msg), (TabletId, queryBlockContext.TabletId));
auto& block = Blocks[queryBlockContext.TabletId];
Y_VERIFY(msg.BlockedGenerationsSize() == 1);
diff --git a/ydb/core/blob_depot/agent/channel_kind.cpp b/ydb/core/blob_depot/agent/channel_kind.cpp
new file mode 100644
index 0000000000..6cbae9e017
--- /dev/null
+++ b/ydb/core/blob_depot/agent/channel_kind.cpp
@@ -0,0 +1,82 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+    // Ordering predicate for GivenIdRangeHeap. An entry compares "less" when its range has the greater
+    // minimum value, so the std heap algorithms keep the range with the smallest minimum value on top.
+    struct TBlobDepotAgent::TChannelKind::TGivenIdRangeHeapComp {
+        using TValue = TBlobDepotAgent::TChannelKind::TGivenIdRangePerChannel::value_type*;
+
+        bool operator ()(TValue lhs, TValue rhs) const {
+            const auto lhsMinimum = lhs->second.GetMinimumValue();
+            const auto rhsMinimum = rhs->second.GetMinimumValue();
+            return lhsMinimum > rhsMinimum;
+        }
+    };
+
+    // Registers the id ranges described by proto: each channel range is added to that channel's
+    // TGivenIdRange and its length is added to NumAvailableItems. Afterwards the allocation heap is
+    // rebuilt and the queries that were parked waiting for an id are processed.
+    void TBlobDepotAgent::TChannelKind::IssueGivenIdRange(const NKikimrBlobDepot::TGivenIdRange& proto) {
+        for (const auto& item : proto.GetChannelRanges()) {
+            const auto begin = item.GetBegin();
+            const auto end = item.GetEnd();
+            GivenIdRangePerChannel[item.GetChannel()].IssueNewRange(begin, end);
+            NumAvailableItems += end - begin;
+        }
+
+        // the set of per-channel ranges has changed, so the min-heap has to be rebuilt
+        RebuildHeap();
+
+        ProcessQueriesWaitingForId();
+    }
+
+ ui32 TBlobDepotAgent::TChannelKind::GetNumAvailableItems() const { // running counter: increased in IssueGivenIdRange(), decremented in Allocate()
+ return NumAvailableItems; // NOTE(review): Trim() does not touch this counter, so it may overstate what is left after a trim -- confirm
+ }
+
+    // Takes the next id from the range that is on top of GivenIdRangeHeap (the one with the smallest
+    // minimum value). Returns std::nullopt when the heap holds no ranges. Otherwise NumAvailableItems is
+    // decremented, the agent gets a chance to request more ids, and the allocated value is returned as a
+    // TBlobSeqId built for the range's channel and the agent's current BlobDepotGeneration.
+    std::optional<TBlobSeqId> TBlobDepotAgent::TChannelKind::Allocate(TBlobDepotAgent& agent) {
+        if (GivenIdRangeHeap.empty()) {
+            return std::nullopt;
+        }
+
+        // pop_heap moves the top entry to the back of the vector
+        std::pop_heap(GivenIdRangeHeap.begin(), GivenIdRangeHeap.end(), TGivenIdRangeHeapComp());
+        auto *top = GivenIdRangeHeap.back();
+        const ui8 channel = top->first;
+        auto& range = top->second;
+
+        const ui64 value = range.Allocate();
+        if (!range.IsEmpty()) {
+            // the range still has ids left -- put it back into the heap
+            std::push_heap(GivenIdRangeHeap.begin(), GivenIdRangeHeap.end(), TGivenIdRangeHeapComp());
+        } else {
+            // drained ranges do not take part in further allocations
+            GivenIdRangeHeap.pop_back();
+        }
+        --NumAvailableItems;
+
+        agent.IssueAllocateIdsIfNeeded(*this);
+
+        return TBlobSeqId::FromSequentalNumber(channel, agent.BlobDepotGeneration, value);
+    }
+
+    // Builds the blob id for the given BlobSeqId (using the agent's TabletId and the supplied type, part
+    // and size) and pairs it with the group id recorded in ChannelGroups for the BlobSeqId's channel.
+    std::pair<TLogoBlobID, ui32> TBlobDepotAgent::TChannelKind::MakeBlobId(TBlobDepotAgent& agent,
+            const TBlobSeqId& blobSeqId, EBlobType type, ui32 part, ui32 size) const {
+        auto blobId = blobSeqId.MakeBlobId(agent.TabletId, type, part, size);
+
+        // translate the channel number into a position inside ChannelGroups
+        const auto position = ChannelToIndex[blobSeqId.Channel];
+        const auto [channel, groupId] = ChannelGroups[position];
+        Y_VERIFY_DEBUG(channel == blobSeqId.Channel);
+
+        return {blobId, groupId};
+    }
+
+    // Forwards a trim request (channel, generation, invalidatedStep) to the TGivenIdRange kept for the
+    // channel and rebuilds the allocation heap afterwards.
+    //
+    // The channel is looked up with find() rather than operator[]: operator[] would create an empty
+    // TGivenIdRange entry for a channel that never had ids issued, and that empty entry would then be
+    // visited by every RebuildHeap(). A channel without an entry has nothing to trim.
+    //
+    // NOTE(review): NumAvailableItems is not adjusted for ids dropped by the trim, so the counter may
+    // stay higher than the real number of ids left -- confirm and account for the difference here.
+    void TBlobDepotAgent::TChannelKind::Trim(ui8 channel, ui32 generation, ui32 invalidatedStep) {
+        const auto it = GivenIdRangePerChannel.find(channel);
+        if (it == GivenIdRangePerChannel.end()) {
+            return;
+        }
+        it->second.Trim(channel, generation, invalidatedStep);
+        RebuildHeap();
+    }
+
+    // Recreates GivenIdRangeHeap from GivenIdRangePerChannel as a min-heap ordered by
+    // TGivenIdRangeHeapComp.
+    //
+    // Only non-empty ranges are put into the heap. Entries stay in GivenIdRangePerChannel after they are
+    // drained by Allocate() (which removes them from the heap) or emptied by Trim(); re-adding them here
+    // would make the comparator query the minimum value of an empty range and let Allocate() try to
+    // allocate from it.
+    void TBlobDepotAgent::TChannelKind::RebuildHeap() {
+        GivenIdRangeHeap.clear();
+        for (auto& kv : GivenIdRangePerChannel) {
+            if (!kv.second.IsEmpty()) {
+                GivenIdRangeHeap.push_back(&kv);
+            }
+        }
+        std::make_heap(GivenIdRangeHeap.begin(), GivenIdRangeHeap.end(), TGivenIdRangeHeapComp());
+    }
+
+ void TBlobDepotAgent::TChannelKind::EnqueueQueryWaitingForId(TQuery *query) { // parks the query until ProcessQueriesWaitingForId() is called
+ QueriesWaitingForId.PushBack(query); // appended at the tail of the intrusive list
+ }
+
+ void TBlobDepotAgent::TChannelKind::ProcessQueriesWaitingForId() { // notifies every parked query through OnIdAllocated()
+ TIntrusiveList<TQuery, TPendingId> temp;
+ temp.Swap(QueriesWaitingForId); // detach the whole list first: QueriesWaitingForId is empty while callbacks run
+ for (TQuery& query : temp) {
+ query.OnIdAllocated(); // presumably the query retries its id allocation here and may enqueue itself again -- confirm
+ }
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp
index 8aa8edf9dc..6359c4e604 100644
--- a/ydb/core/blob_depot/agent/comm.cpp
+++ b/ydb/core/blob_depot/agent/comm.cpp
@@ -3,12 +3,12 @@
namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA02, "TEvClientConnected", (VirtualGroupId, VirtualGroupId),
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA03, "TEvClientConnected", (VirtualGroupId, VirtualGroupId),
(Msg, ev->Get()->ToString()));
}
void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) {
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA03, "TEvClientDestroyed", (VirtualGroupId, VirtualGroupId),
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA04, "TEvClientDestroyed", (VirtualGroupId, VirtualGroupId),
(Msg, ev->Get()->ToString()));
PipeId = {};
OnDisconnect();
@@ -18,14 +18,14 @@ namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::ConnectToBlobDepot() {
PipeId = Register(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy::WithRetries()));
const ui64 id = NextRequestId++;
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA04, "ConnectToBlobDepot", (VirtualGroupId, VirtualGroupId),
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA05, "ConnectToBlobDepot", (VirtualGroupId, VirtualGroupId),
(PipeId, PipeId), (RequestId, id));
- NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId), id);
+ NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId, AgentInstanceId), id);
RegisterRequest(id, this, nullptr, {}, true);
}
void TBlobDepotAgent::Handle(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvRegisterAgentResult& msg) {
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA04, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId),
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId),
(Msg, msg));
Registered = true;
BlobDepotGeneration = msg.GetGeneration();
@@ -35,6 +35,8 @@ namespace NKikimr::NBlobDepot {
vanishedKinds.insert(kind);
}
+ ChannelToKind.clear();
+
for (const auto& ch : msg.GetChannelKinds()) {
const NKikimrBlobDepot::TChannelKind::E kind = ch.GetChannelKind();
vanishedKinds.erase(kind);
@@ -50,24 +52,25 @@ namespace NKikimr::NBlobDepot {
const ui32 groupId = channelGroup.GetGroupId();
v.ChannelToIndex[channel] = v.ChannelGroups.size();
v.ChannelGroups.emplace_back(channel, groupId);
+ ChannelToKind[channel] = &v;
}
IssueAllocateIdsIfNeeded(v);
}
for (const NKikimrBlobDepot::TChannelKind::E kind : vanishedKinds) {
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA05, "kind vanished", (VirtualGroupId, VirtualGroupId), (Kind, kind));
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA07, "kind vanished", (VirtualGroupId, VirtualGroupId), (Kind, kind));
ChannelKinds.erase(kind);
}
}
void TBlobDepotAgent::IssueAllocateIdsIfNeeded(TChannelKind& kind) {
- if (!kind.IdAllocInFlight && kind.GivenIdRange.GetNumAvailableItems() < 100 && PipeId) {
+ if (!kind.IdAllocInFlight && kind.GetNumAvailableItems() < 100 && PipeId) {
const ui64 id = NextRequestId++;
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA05, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId),
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA08, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId),
(ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(kind.Kind)),
- (IdAllocInFlight, kind.IdAllocInFlight), (NumAvailableItems, kind.GivenIdRange.GetNumAvailableItems()),
- (PreallocatedIdCount, kind.PreallocatedIdCount), (RequestId, id));
+ (IdAllocInFlight, kind.IdAllocInFlight), (NumAvailableItems, kind.GetNumAvailableItems()),
+ (RequestId, id));
NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds(kind.Kind, 100), id);
RegisterRequest(id, this, std::make_shared<TAllocateIdsContext>(kind.Kind), {}, true);
kind.IdAllocInFlight = true;
@@ -75,7 +78,7 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepotAgent::Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvAllocateIdsResult& msg) {
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA06, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId),
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA09, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId),
(Msg, msg));
auto& allocateIdsContext = context->Obtain<TAllocateIdsContext>();
@@ -91,15 +94,12 @@ namespace NKikimr::NBlobDepot {
Y_VERIFY(msg.GetGeneration() == BlobDepotGeneration);
if (msg.HasGivenIdRange()) {
- TGivenIdRange range(msg.GetGivenIdRange());
- kind.GivenIdRange.Join(std::move(range));
- kind.ProcessQueriesWaitingForId();
- IssueAllocateIdsIfNeeded(kind);
+ kind.IssueGivenIdRange(msg.GetGivenIdRange());
}
}
void TBlobDepotAgent::OnDisconnect() {
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA07, "OnDisconnect", (VirtualGroupId, VirtualGroupId));
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA10, "OnDisconnect", (VirtualGroupId, VirtualGroupId));
for (auto& [id, request] : std::exchange(TabletRequestInFlight, {})) {
request.Sender->OnRequestComplete(id, TTabletDisconnected{});
@@ -144,16 +144,33 @@ namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestContext::TPtr context) {
const ui64 id = NextRequestId++;
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA08, "Issue", (VirtualGroupId, VirtualGroupId), (Id, id), (Msg, ev->ToString()));
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA11, "Issue", (VirtualGroupId, VirtualGroupId), (Id, id), (Msg, ev->ToString()));
NTabletPipe::SendData(SelfId(), PipeId, ev.release(), id);
RegisterRequest(id, sender, std::move(context), {}, true);
}
void TBlobDepotAgent::Handle(TEvBlobDepot::TEvPushNotify::TPtr ev) {
+ auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId());
+
auto& msg = ev->Get()->Record;
OnBlockedTablets(msg.GetBlockedTablets());
- auto [response, _] = TEvBlobDepot::MakeResponseFor(*ev, SelfId());
+ for (const auto& item : msg.GetInvalidatedSteps()) {
+ const ui8 channel = item.GetChannel();
+ Y_VERIFY(item.GetGeneration() == BlobDepotGeneration);
+ const auto it = ChannelToKind.find(channel);
+ Y_VERIFY(it != ChannelToKind.end());
+ TChannelKind& kind = *it->second;
+ kind.Trim(channel, item.GetGeneration(), item.GetInvalidatedStep());
+
+ // report writes in flight that are trimmed
+ const TBlobSeqId first{channel, item.GetGeneration(), 0, 0};
+ const TBlobSeqId last{channel, item.GetGeneration(), item.GetInvalidatedStep(), Max<ui32>()};
+ for (auto it = kind.WritesInFlight.lower_bound(first); it != kind.WritesInFlight.end() && *it <= last; ++it) {
+ it->ToProto(record->AddWritesInFlight());
+ }
+ }
+
TActivationContext::Send(response.release());
}
diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp
index 0eb0e862ce..ab4cd8c7d3 100644
--- a/ydb/core/blob_depot/agent/query.cpp
+++ b/ydb/core/blob_depot/agent/query.cpp
@@ -8,7 +8,7 @@ namespace NKikimr::NBlobDepot {
PendingEventQ.emplace_back(ev.Release());
} else {
auto *query = CreateQuery(ev);
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA09, "new query", (VirtualGroupId, VirtualGroupId),
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA12, "new query", (VirtualGroupId, VirtualGroupId),
(QueryId, query->GetQueryId()), (Name, query->GetName()));
if (!TabletId) {
query->EndWithError(NKikimrProto::ERROR, "group is in error state");
@@ -30,7 +30,7 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepotAgent::TQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) {
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA10, "query ends with error", (VirtualGroupId, Agent.VirtualGroupId),
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA13, "query ends with error", (VirtualGroupId, Agent.VirtualGroupId),
(QueryId, QueryId), (Status, status), (ErrorReason, errorReason));
std::unique_ptr<IEventBase> response;
@@ -49,7 +49,7 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepotAgent::TQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) {
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA11, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId),
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA14, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId),
(QueryId, QueryId), (Response, response->ToString()));
Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie);
delete this;
diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp
index 8e73045be9..3c7fe68706 100644
--- a/ydb/core/blob_depot/agent/request.cpp
+++ b/ydb/core/blob_depot/agent/request.cpp
@@ -57,7 +57,7 @@ namespace NKikimr::NBlobDepot {
template<typename TEvent>
void TBlobDepotAgent::HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA12, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId),
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId),
(Id, ev->Cookie), (Type, TypeName<TEvent>()));
auto *event = ev->Get();
OnRequestComplete(ev->Cookie, event, TabletRequestInFlight);
@@ -73,7 +73,7 @@ namespace NKikimr::NBlobDepot {
template<typename TEvent>
void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA13, "HandleOtherResponse", (VirtualGroupId, VirtualGroupId),
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA16, "HandleOtherResponse", (VirtualGroupId, VirtualGroupId),
(Id, ev->Cookie), (Type, TypeName<TEvent>()));
auto *event = ev->Get();
OnRequestComplete(ev->Cookie, event, OtherRequestInFlight);
diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp
index 9b5c37b4a7..5214042b83 100644
--- a/ydb/core/blob_depot/agent/storage_discover.cpp
+++ b/ydb/core/blob_depot/agent/storage_discover.cpp
@@ -70,7 +70,7 @@ namespace NKikimr::NBlobDepot {
}
void OnUpdateBlock(bool success) override {
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA14, "OnUpdateBlock", (VirtualGroupId, Agent.VirtualGroupId),
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA17, "OnUpdateBlock", (VirtualGroupId, Agent.VirtualGroupId),
(QueryId, QueryId), (Success, success));
if (!success) {
@@ -89,7 +89,7 @@ namespace NKikimr::NBlobDepot {
}
void HandleResolveResult(ui64 id, TRequestContext::TPtr context, TEvBlobDepot::TEvResolveResult& msg) {
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId),
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA18, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId),
(QueryId, QueryId), (Msg, msg.Record));
Agent.HandleResolveResult(msg.Record);
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
index 87bbc0603b..fa98c2e584 100644
--- a/ydb/core/blob_depot/agent/storage_put.cpp
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -14,6 +14,12 @@ namespace NKikimr::NBlobDepot {
public:
using TQuery::TQuery;
+        // Removes this query's BlobSeqId from the WritesInFlight set of the owning channel kind; a
+        // default-valued BlobSeqId means no id was assigned to the query, so there is nothing to remove.
+        ~TPutQuery() {
+            if (BlobSeqId != TBlobSeqId()) {
+                // ChannelToKind is cleared and refilled when TEvRegisterAgentResult is handled, so the
+                // channel may no longer be present; operator[] would insert a null pointer for it and
+                // dereference it, hence the explicit lookup
+                const auto it = Agent.ChannelToKind.find(BlobSeqId.Channel);
+                if (it != Agent.ChannelToKind.end()) {
+                    it->second->WritesInFlight.erase(BlobSeqId);
+                }
+            }
+        }
+
void Initiate() override {
auto& msg = *Event->Get<TEvBlobStorage::TEvPut>();
@@ -36,13 +42,14 @@ namespace NKikimr::NBlobDepot {
auto& kind = it->second;
std::optional<TBlobSeqId> blobSeqId = kind.Allocate(Agent);
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "allocated BlobSeqId", (VirtualGroupId, Agent.VirtualGroupId),
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA19, "allocated BlobSeqId", (VirtualGroupId, Agent.VirtualGroupId),
(QueryId, GetQueryId()), (BlobSeqId, blobSeqId));
if (!blobSeqId) {
return kind.EnqueueQueryWaitingForId(this);
}
BlobSeqId = *blobSeqId;
+ kind.WritesInFlight.insert(BlobSeqId);
auto& msg = *Event->Get<TEvBlobStorage::TEvPut>();
const ui32 size = msg.Id.BlobSize();
diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp
index 7995c10aad..bfc5435eb3 100644
--- a/ydb/core/blob_depot/blob_depot.cpp
+++ b/ydb/core/blob_depot/blob_depot.cpp
@@ -34,6 +34,7 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvBlobDepot::TEvCollectGarbage, BarrierServer->Handle);
hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle);
+ hFunc(TEvBlobDepot::TEvPushNotifyResult, Data->Handle);
hFunc(TEvTabletPipe::TEvServerConnected, Handle);
hFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index 25374384e7..0fa8e971de 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -23,7 +23,7 @@ namespace NKikimr::NBlobDepot {
~TBlobDepot();
void HandlePoison() {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT09, "HandlePoison", (TabletId, TabletID()));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT19, "HandlePoison", (TabletId, TabletID()));
Become(&TThis::StateZombie);
Send(Tablet(), new TEvents::TEvPoison);
}
@@ -36,31 +36,27 @@ namespace NKikimr::NBlobDepot {
std::optional<TActorId> ConnectedAgent;
ui32 ConnectedNodeId;
TInstant ExpirationTimestamp;
+ std::optional<ui64> AgentInstanceId;
- struct TChannelKind {
- TGivenIdRange GivenIdRanges; // updated on AllocateIds and when BlobSeqIds are found in any way
- };
+ THashMap<ui8, TGivenIdRange> GivenIdRanges;
- THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds;
+ THashMap<ui8, ui32> InvalidatedStepInFlight;
+ THashMap<ui64, THashMap<ui8, ui32>> InvalidateStepRequests;
+ ui64 LastRequestId = 0;
};
THashMap<TActorId, std::optional<ui32>> PipeServerToNode;
THashMap<ui32, TAgent> Agents; // NodeId -> Agent
- struct TChannelKind
- : NBlobDepot::TChannelKind
- {
- ui64 NextBlobSeqId = 0;
- TGivenIdRange GivenIdRanges; // for all agents, including disconnected ones
- };
-
THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds;
- std::vector<NKikimrBlobDepot::TChannelKind::E> ChannelToKind;
- struct TPerChannelRecord {
- std::set<std::tuple<ui32, ui32>> GivenStepIndex;
+ struct TChannelInfo {
+ NKikimrBlobDepot::TChannelKind::E ChannelKind;
+ TChannelKind *KindPtr;
+ TGivenIdRange GivenIdRanges; // accumulated through all agents
+ ui64 NextBlobSeqId = 0;
};
- THashMap<ui8, TPerChannelRecord> PerChannelRecords;
+ std::vector<TChannelInfo> Channels;
void Handle(TEvTabletPipe::TEvServerConnected::TPtr ev);
void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev);
@@ -70,6 +66,7 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev);
TAgent& GetAgent(const TActorId& pipeServerId);
TAgent& GetAgent(ui32 nodeId);
+ void ResetAgent(TAgent& agent);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -80,10 +77,12 @@ namespace NKikimr::NBlobDepot {
}
void OnActivateExecutor(const TActorContext&) override {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "OnActivateExecutor", (TabletId, TabletID()));
-
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT20, "OnActivateExecutor", (TabletId, TabletID()));
ExecuteTxInitSchema();
+ }
+ void OnLoadFinished() {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT20, "OnLoadFinished", (TabletId, TabletID()));
Become(&TThis::StateWork);
for (auto&& ev : std::exchange(InitialEventsQ, {})) {
TActivationContext::Send(ev.release());
@@ -91,14 +90,14 @@ namespace NKikimr::NBlobDepot {
}
void OnDetach(const TActorContext&) override {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT11, "OnDetach", (TabletId, TabletID()));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT21, "OnDetach", (TabletId, TabletID()));
// TODO: what does this callback mean
PassAway();
}
void OnTabletDead(TEvTablet::TEvTabletDead::TPtr& /*ev*/, const TActorContext&) override {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT12, "OnTabletDead", (TabletId, TabletID()));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT22, "OnTabletDead", (TabletId, TabletID()));
PassAway();
}
diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp
index 489b99045e..f1c8245c4d 100644
--- a/ydb/core/blob_depot/blocks.cpp
+++ b/ydb/core/blob_depot/blocks.cpp
@@ -98,7 +98,7 @@ namespace NKikimr::NBlobDepot {
TAgent& agent = Self->GetAgent(agentId);
if (const auto& actorId = agent.ConnectedAgent) {
- Send(*actorId, ev.release(), IEventHandle::FlagTrackDelivery, IssuerGuid);
+ Send(*actorId, ev.release(), 0, IssuerGuid);
}
NodesWaitingForPushResult.insert(agentId);
}
@@ -117,28 +117,31 @@ namespace NKikimr::NBlobDepot {
}
}
- void Handle(TEvents::TEvUndelivered::TPtr /*ev*/) {
- // can't reach an agent to notify it about blocked generation change -- we can't do anything here
- }
-
+ // Sends a TEvBlock (via SendBlock) to every distinct group referenced by any channel kind, counting each
+ // request in BlocksPending and adding 2 to RetryCount per request.
void IssueBlocksToStorage() {
+ // groups already handled in this pass; a group id seen more than once gets only one request
+ THashSet<ui32> processedGroups;
for (const auto& [_, kind] : Self->ChannelKinds) {
for (const auto& [channel, groupId] : kind.ChannelGroups) {
// FIXME: consider previous group generations (because agent can write in obsolete tablet generation)
// !!!!!!!!!!!
- SendBlock(groupId);
- ++BlocksPending;
- RetryCount += 2;
+ // insert() reports whether groupId was new; only then issue the block
+ if (const auto [it, inserted] = processedGroups.insert(groupId); inserted) {
+ SendBlock(groupId);
+ ++BlocksPending;
+ RetryCount += 2;
+ }
}
}
}
void SendBlock(ui32 groupId) {
- SendToBSProxy(SelfId(), groupId, new TEvBlobStorage::TEvBlock(TabletId, BlockedGeneration,
- TInstant::Max(), IssuerGuid), groupId);
+ // Log the request, then send TEvBlock for (TabletId, BlockedGeneration) to the given group.
+ // groupId is passed once more as the last argument; the TEvBlockResult handler logs ev->Cookie as GroupId.
+ STLOG(PRI_INFO, BLOB_DEPOT, BDT06, "issuing TEvBlock", (TabletId, Self->TabletID()), (BlockedTabletId,
+ TabletId), (BlockedGeneration, BlockedGeneration), (GroupId, groupId), (IssuerGuid, IssuerGuid));
+ SendToBSProxy(SelfId(), groupId, new TEvBlobStorage::TEvBlock(TabletId, BlockedGeneration, TInstant::Max(),
+ IssuerGuid), groupId);
}
void Handle(TEvBlobStorage::TEvBlockResult::TPtr ev) {
+ STLOG(PRI_INFO, BLOB_DEPOT, BDT07, "TEvBlockResult", (TabletId, Self->TabletID()), (Msg, ev->Get()->ToString()),
+ (BlockedTabletId, TabletId), (BlockedGeneration, BlockedGeneration), (GroupId, ev->Cookie));
switch (ev->Get()->Status) {
case NKikimrProto::OK:
if (!--BlocksPending) {
@@ -148,6 +151,7 @@ namespace NKikimr::NBlobDepot {
case NKikimrProto::ALREADY:
// race, but this is not possible in current implementation
+ // ORLY? :)
Y_FAIL();
case NKikimrProto::ERROR:
@@ -172,7 +176,6 @@ namespace NKikimr::NBlobDepot {
STRICT_STFUNC(StateFunc,
hFunc(TEvBlobStorage::TEvBlockResult, Handle);
hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle);
- hFunc(TEvents::TEvUndelivered, Handle);
cFunc(TEvents::TSystem::Wakeup, IssueBlocksToStorage);
cFunc(TEvents::TSystem::Poison, PassAway);
)
diff --git a/ydb/core/blob_depot/blocks.h b/ydb/core/blob_depot/blocks.h
index bd3aba3899..244b7c9ae4 100644
--- a/ydb/core/blob_depot/blocks.h
+++ b/ydb/core/blob_depot/blocks.h
@@ -37,6 +37,13 @@ namespace NKikimr::NBlobDepot {
void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, std::unique_ptr<IEventHandle> response);
void Handle(TEvBlobDepot::TEvBlock::TPtr ev);
void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev);
+
+ // Invokes callback(tabletId, blockedGeneration) once for every entry of Blocks.
+ template<typename TCallback>
+ void Enumerate(TCallback&& callback) const {
+ for (const auto& [tabletId, block] : Blocks) {
+ callback(tabletId, block.BlockedGeneration);
+ }
+ }
};
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index f2752fc8ee..43b4d5ceee 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -89,7 +89,7 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepot::TData::PutKey(TKey key, TValue&& data) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "PutKey", (TabletId, Self->TabletID()), (Key, key.ToString(Self->Config)),
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT08, "PutKey", (TabletId, Self->TabletID()), (Key, key.ToString(Self->Config)),
(KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState)));
EnumerateBlobsForValueChain(data.ValueChain, Self->TabletID(), [&](TLogoBlobID id) {
@@ -169,23 +169,54 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::TData::HandleTrash() {
const ui32 generation = Self->Executor()->Generation();
+ THashMap<ui32, std::unique_ptr<TEvBlobDepot::TEvPushNotify>> outbox;
for (TRecordsPerChannelGroup& record : RecordsWithTrash) {
Y_VERIFY(!record.CollectGarbageRequestInFlight);
Y_VERIFY(record.TabletId == Self->TabletID());
Y_VERIFY(!record.Trash.empty());
- ui64 nextGenStep = 0;
-
- auto& channel = Self->PerChannelRecords[record.Channel];
- if (channel.GivenStepIndex.empty()) {
- nextGenStep = GenStep(*--record.Trash.end());
- } else {
- const auto& [leastStep, leastIndex] = *channel.GivenStepIndex.begin();
- const TLogoBlobID maxId(record.TabletId, generation, leastStep, record.Channel, 0, 0);
- const auto it = record.Trash.lower_bound(maxId);
- if (it != record.Trash.begin()) {
- nextGenStep = GenStep(*std::prev(it));
+ ui64 nextGenStep = GenStep(*--record.Trash.end());
+
+ Y_VERIFY(record.Channel < Self->Channels.size());
+ auto& channel = Self->Channels[record.Channel];
+
+ if (!channel.GivenIdRanges.IsEmpty()) {
+ // minimum blob seq id that can still be written by other party
+ const auto minBlobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation,
+ channel.GivenIdRanges.GetMinimumValue());
+
+ // step we are going to invalidate (including blobs with this one)
+ const ui32 invalidatedStep = ui32(nextGenStep);
+
+ if (minBlobSeqId.Step <= invalidatedStep) {
+ const TLogoBlobID maxId(record.TabletId, generation, minBlobSeqId.Step, record.Channel, 0, 0);
+ const auto it = record.Trash.lower_bound(maxId);
+ if (it != record.Trash.begin()) {
+ nextGenStep = GenStep(*std::prev(it));
+ } else {
+ nextGenStep = 0;
+ }
+
+ // issue notifications to agents
+ for (auto& [agentId, agent] : Self->Agents) {
+ if (!agent.ConnectedAgent) {
+ continue;
+ }
+ const auto [it, inserted] = agent.InvalidatedStepInFlight.emplace(record.Channel, invalidatedStep);
+ if (inserted || it->second < invalidatedStep) {
+ it->second = invalidatedStep;
+
+ auto& ev = outbox[agentId];
+ if (!ev) {
+ ev.reset(new TEvBlobDepot::TEvPushNotify);
+ }
+ auto *item = ev->Record.AddInvalidatedSteps();
+ item->SetChannel(record.Channel);
+ item->SetGeneration(generation);
+ item->SetInvalidatedStep(invalidatedStep);
+ }
+ }
}
}
@@ -227,14 +258,15 @@ namespace NKikimr::NBlobDepot {
record.TrashInFlight.insert(record.TrashInFlight.end(), record.Trash.begin(), record.Trash.end());
record.NextGenStep = Max(nextGenStep, record.LastConfirmedGenStep);
- auto& kind = Self->ChannelKinds[Self->ChannelToKind[record.Channel]];
- const auto blobSeqId = TBlobSeqId::FromBinary(generation, kind, kind.NextBlobSeqId);
- Y_VERIFY(record.LastConfirmedGenStep < GenStep(generation, blobSeqId.Step));
- if (GenStep(generation, blobSeqId.Step) <= nextGenStep) {
- kind.NextBlobSeqId = TBlobSeqId{record.Channel, generation, ui32(nextGenStep) + 1, 0}.ToBinary(kind);
+ auto blobSeqId = TBlobSeqId::FromSequentalNumber(record.Channel, generation, channel.NextBlobSeqId);
+ Y_VERIFY(record.LastConfirmedGenStep < GenStep(blobSeqId));
+ if (GenStep(blobSeqId) <= nextGenStep) {
+ blobSeqId.Step = ui32(nextGenStep) + 1;
+ blobSeqId.Index = 0;
+ channel.NextBlobSeqId = blobSeqId.ToSequentialNumber();
}
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "issuing TEvCollectGarbage", (TabletId, Self->TabletID()),
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT09, "issuing TEvCollectGarbage", (TabletId, Self->TabletID()),
(Channel, record.Channel), (GroupId, record.GroupId), (Msg, ev->ToString()),
(LastConfirmedGenStep, record.LastConfirmedGenStep), (NextGenStep, record.NextGenStep),
(TrashInFlight.size, record.TrashInFlight.size()));
@@ -243,10 +275,22 @@ namespace NKikimr::NBlobDepot {
}
RecordsWithTrash.Clear();
+
+ for (auto& [agentId, ev] : outbox) {
+ TAgent& agent = Self->GetAgent(agentId);
+ const ui64 id = ++agent.LastRequestId;
+ auto& request = agent.InvalidateStepRequests[id];
+ for (const auto& item : ev->Record.GetInvalidatedSteps()) {
+ request[item.GetChannel()] = item.GetInvalidatedStep();
+ }
+ if (const auto& actorId = agent.ConnectedAgent) {
+ TActivationContext::Send(new IEventHandle(*actorId, Self->SelfId(), ev.release(), 0, id));
+ }
+ }
}
void TBlobDepot::TData::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvCollectGarbageResult", (TabletId, ev->Get()->TabletId),
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "TEvCollectGarbageResult", (TabletId, ev->Get()->TabletId),
(Channel, ev->Get()->Channel), (GroupId, ev->Cookie), (Msg, ev->Get()->ToString()));
const auto& key = std::make_tuple(ev->Get()->TabletId, ev->Get()->Channel, ev->Cookie);
const auto it = RecordsPerChannelGroup.find(key);
@@ -267,6 +311,24 @@ namespace NKikimr::NBlobDepot {
}
}
+ // Processes an agent's reply to a TEvPushNotify carrying invalidated steps.
+ // The request is looked up by ev->Cookie (the id stored in agent.InvalidateStepRequests when it was sent).
+ void TBlobDepot::TData::Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) {
+ TAgent& agent = Self->GetAgent(ev->Recipient);
+ const ui32 generation = Self->Executor()->Generation();
+ if (const auto it = agent.InvalidateStepRequests.find(ev->Cookie); it != agent.InvalidateStepRequests.end()) {
+ // drop given ids up to the acknowledged step from both the per-channel and the per-agent bookkeeping
+ for (const auto& [channel, invalidatedStep] : it->second) {
+ Self->Channels[channel].GivenIdRanges.Trim(channel, generation, invalidatedStep);
+ agent.GivenIdRanges[channel].Trim(channel, generation, invalidatedStep);
+ }
+ agent.InvalidateStepRequests.erase(it);
+ }
+ // ids the agent reports as writes still in flight are put back into both range sets
+ // NOTE(review): blobSeqId.Channel comes from the message and is not range-checked before indexing Channels -- confirm
+ for (const auto& item : ev->Get()->Record.GetWritesInFlight()) {
+ const auto blobSeqId = TBlobSeqId::FromProto(item);
+ Self->Channels[blobSeqId.Channel].GivenIdRanges.AddPoint(blobSeqId.ToSequentialNumber());
+ agent.GivenIdRanges[blobSeqId.Channel].AddPoint(blobSeqId.ToSequentialNumber());
+ }
+ // re-run trash handling now that the given id ranges have changed
+ HandleTrash();
+ }
+
void TBlobDepot::TData::OnCommitConfirmedGC(ui8 channel, ui32 groupId) {
const auto& key = std::make_tuple(Self->TabletID(), channel, groupId);
const auto it = RecordsPerChannelGroup.find(key);
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 22515c8273..3d092bd995 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -317,6 +317,7 @@ namespace NKikimr::NBlobDepot {
void CommitTrash(void *cookie);
void HandleTrash();
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev);
+ void Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev);
void OnCommitConfirmedGC(ui8 channel, ui32 groupId);
static TString ToValueProto(const TValue& value);
@@ -333,7 +334,7 @@ namespace NKikimr::NBlobDepot {
for (const auto& [key, record] : RecordsPerChannelGroup) {
THashSet<TLogoBlobID> inFlight(record.TrashInFlight.begin(), record.TrashInFlight.end());
for (const TLogoBlobID& id : record.Trash) {
- callback(record.TabletId, record.Channel, record.GroupId, id, inFlight.contains(id));
+ callback(record.GroupId, id, inFlight.contains(id));
}
}
}
diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h
index 4532ec72b8..ce20efa2ec 100644
--- a/ydb/core/blob_depot/events.h
+++ b/ydb/core/blob_depot/events.h
@@ -53,7 +53,7 @@ namespace NKikimr {
BLOBDEPOT_EVENT_PB(EvApplyConfig, TxId);
BLOBDEPOT_EVENT_PB(EvApplyConfigResult, TabletId, TxId);
- BLOBDEPOT_EVENT_PB(EvRegisterAgent, VirtualGroupId);
+ BLOBDEPOT_EVENT_PB(EvRegisterAgent, VirtualGroupId, AgentInstanceId);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvRegisterAgentResult);
BLOBDEPOT_EVENT_PB(EvAllocateIds, ChannelKind, Count);
BLOBDEPOT_EVENT_PB(EvAllocateIdsResult, ChannelKind, Generation);
diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp
index 6d770624dc..7e69dfa76f 100644
--- a/ydb/core/blob_depot/garbage_collection.cpp
+++ b/ydb/core/blob_depot/garbage_collection.cpp
@@ -106,7 +106,7 @@ namespace NKikimr::NBlobDepot {
auto processKey = [&](const TData::TKey& key, const TData::TValue& value) {
if (value.KeepState != NKikimrBlobDepot::EKeepState::Keep || hard) {
const TLogoBlobID id(key.GetBlobId());
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "DeleteKey", (TabletId, Self->TabletID()), (BlobId, id));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT11, "DeleteKey", (TabletId, Self->TabletID()), (BlobId, id));
db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Delete();
auto updateTrash = [&](TLogoBlobID id) {
db.Table<Schema::Trash>().Key(TString(id.AsBinaryString())).Update();
diff --git a/ydb/core/blob_depot/given_id_range.cpp b/ydb/core/blob_depot/given_id_range.cpp
index 2d78ac0970..9186465375 100644
--- a/ydb/core/blob_depot/given_id_range.cpp
+++ b/ydb/core/blob_depot/given_id_range.cpp
@@ -2,83 +2,99 @@
namespace NKikimr::NBlobDepot {
- TGivenIdRange::TGivenIdRange(const NKikimrBlobDepot::TGivenIdRange& proto) {
- for (const auto& range : proto.GetRanges()) {
- const ui64 begin = range.GetBegin();
- const ui32 len = range.GetLen();
- TRange& r = InsertNewRange(begin, begin + len);
- ui64 *chunks = const_cast<ui64*>(r.Bits.GetChunks());
- const ui32 count = r.Bits.GetChunkCount();
- Y_VERIFY(range.OffsetsSize() == range.BitMasksSize());
- for (ui32 i = 0; i < range.OffsetsSize(); ++i) {
- const ui32 offset = range.GetOffsets(i);
- const ui64 bitMask = range.GetBitMasks(i);
- NumAvailableItems += PopCount(bitMask) - 64;
- Y_VERIFY(offset < count);
- chunks[offset] = bitMask;
- }
+ // Registers the half-open id range [begin, end) with every id marked available.
+ // Aborts (Y_VERIFY) if the range is empty, if a range with the same Begin already exists, or if it overlaps
+ // either neighbouring range.
+ void TGivenIdRange::IssueNewRange(ui64 begin, ui64 end) {
+ Y_VERIFY(begin < end);
+ const auto [it, inserted] = Ranges.emplace(begin, end);
+ Y_VERIFY(inserted);
+ if (it != Ranges.begin()) {
+ const auto& prev = *std::prev(it);
+ Y_VERIFY(prev.End <= begin);
+ }
+ if (std::next(it) != Ranges.end()) {
+ const auto& next = *std::next(it);
+ Y_VERIFY(end <= next.Begin);
}
+ NumAvailableItems += end - begin;
}
- TGivenIdRange::TRange& TGivenIdRange::InsertNewRange(ui64 begin, ui64 end) {
- const ui64 len = end - begin;
- Y_VERIFY(len);
- const auto it = Ranges.lower_bound(begin);
- if (it != Ranges.begin()) {
- const auto& [prevBegin, prev] = *std::prev(it);
- Y_VERIFY(prevBegin + prev.Len <= begin);
- }
- if (it != Ranges.end()) {
- Y_VERIFY(end <= it->first);
+ // Marks a single id as available.  If the id falls inside a range that is still tracked (for example, one whose
+ // leading bits were cleared by Trim), its bit is set in place; otherwise a new one-element range is issued.
+ // Aborts (Y_VERIFY) if the id is already marked available.
+ void TGivenIdRange::AddPoint(ui64 value) {
+ // upper_bound yields the first range starting after value, so its predecessor is the only range that can cover it
+ const auto it = Ranges.upper_bound(value);
+ if (it != Ranges.begin()) {
+ // Bits does not take part in set ordering (the comparator looks at Begin only), so it can be changed in place
+ auto& range = const_cast<TRange&>(*std::prev(it));
+ if (value < range.End) {
+ // IssueNewRange would trip its non-overlap Y_VERIFY here, so reuse the covering range instead
+ Y_VERIFY(!range.Bits[value - range.Begin]);
+ range.Bits.Set(value - range.Begin);
+ ++NumAvailableItems;
+ return;
+ }
+ }
+ IssueNewRange(value, value + 1);
+ }
+
+ // Marks a single id as no longer available (MarkGivenIdCommitted calls this for committed blob seq ids).
+ // The id must lie inside a tracked range and must currently be marked available; both are enforced with Y_VERIFY.
+ // A range whose last available id is removed is dropped from the set.
+ void TGivenIdRange::RemovePoint(ui64 value) {
+ // upper_bound yields the first range starting after value, so the range that may contain value is its predecessor
+ const auto it = Ranges.upper_bound(value);
+ Y_VERIFY(it != Ranges.begin());
+ const auto rangeIt = std::prev(it);
+ // Bits does not take part in set ordering (the comparator looks at Begin only), so it can be changed in place
+ auto& range = const_cast<TRange&>(*rangeIt);
+ Y_VERIFY(range.Begin <= value && value < range.End);
+ Y_VERIFY(range.Bits[value - range.Begin]);
+ range.Bits.Reset(value - range.Begin);
+ if (range.Bits.Empty()) {
+ // erase the range that held the value; 'it' is its successor and may even be Ranges.end()
+ Ranges.erase(rangeIt);
}
- NumAvailableItems += len;
- const auto newIt = Ranges.emplace_hint(it, begin, len);
- return newIt->second;
+ --NumAvailableItems;
}
- void TGivenIdRange::IssueNewRange(ui64 begin, ui64 end) {
- InsertNewRange(begin, end);
+ // True when no ranges are tracked.
+ bool TGivenIdRange::IsEmpty() const {
+ return Ranges.empty();
+ }
+
+ // Returns the running counter of ids currently marked available.
+ ui32 TGivenIdRange::GetNumAvailableItems() const {
+ return NumAvailableItems;
}
- void TGivenIdRange::Join(TGivenIdRange&& other) {
- Ranges.merge(std::move(other.Ranges));
- if (Ranges.size() > 1) {
- for (auto it = std::next(Ranges.begin()); it != Ranges.end(); ++it) {
- const auto& [prevBegin, prev] = *std::prev(it);
- const auto& [begin, range] = *it;
- Y_VERIFY(prevBegin + prev.Len <= begin);
- }
+ // Returns the smallest id still marked available: the first set bit of the first range.
+ // Aborts (Y_VERIFY) when there are no ranges or the first range has no set bit.
+ ui64 TGivenIdRange::GetMinimumValue() const {
+ Y_VERIFY(!Ranges.empty());
+ const auto& range = *Ranges.begin();
+ size_t offset = range.Bits.FirstNonZeroBit();
+ // presumably FirstNonZeroBit() returns Size() when no bit is set -- confirm against TDynBitMap
+ Y_VERIFY(offset != range.Bits.Size());
+ return range.Begin + offset;
+ }
+
+ // Takes the smallest available id: clears its bit, drops the first range if that was its last set bit,
+ // decrements the available counter and returns the id.  Aborts (Y_VERIFY) when nothing is available.
+ ui64 TGivenIdRange::Allocate() {
+ Y_VERIFY(!Ranges.empty());
+ const auto it = Ranges.begin();
+ // Bits does not take part in set ordering (the comparator looks at Begin only), so it can be changed in place
+ auto& range = const_cast<TRange&>(*it);
+ size_t offset = range.Bits.FirstNonZeroBit();
+ Y_VERIFY(offset != range.Bits.Size());
+ range.Bits.Reset(offset);
+ // compute the result before the range is possibly erased
+ const ui64 value = range.Begin + offset;
+ if (range.Bits.Empty()) {
+ Ranges.erase(it);
}
- NumAvailableItems += std::exchange(other.NumAvailableItems, 0);
+ --NumAvailableItems;
+ return value;
}
- void TGivenIdRange::ToProto(NKikimrBlobDepot::TGivenIdRange *proto) {
- for (const auto& [begin, range] : Ranges) {
- auto *r = proto->AddRanges();
- r->SetBegin(begin);
- r->SetLen(range.Len);
- const ui64 *chunks = range.Bits.GetChunks();
- const ui32 count = range.Bits.GetChunkCount();
- for (ui32 i = 0; i < count; ++i) {
- if (chunks[i] != ~ui64(0)) {
- r->AddOffsets(i);
- r->AddBitMasks(chunks[i]);
+ // Drops every available id whose sequential number does not exceed that of {invalidatedStep, MaxIndex}.
+ // ToSequentialNumber() is built from Step and Index only, so channel and generation do not affect the boundary.
+ // Ranges lying entirely below the boundary are erased; a range straddling it only loses its leading bits.
+ void TGivenIdRange::Trim(ui8 channel, ui32 generation, ui32 invalidatedStep) {
+ // first sequential number that stays valid: one past the largest id of the invalidated step
+ const ui64 validSince = 1 + TBlobSeqId{channel, generation, invalidatedStep, TBlobSeqId::MaxIndex}.ToSequentialNumber();
+
+ while (!Ranges.empty()) {
+ const auto it = Ranges.begin();
+ // Bits does not take part in set ordering (the comparator looks at Begin only), so it can be changed in place
+ auto& range = const_cast<TRange&>(*it);
+ if (range.End <= validSince) {
+ // the whole range is invalidated
+ NumAvailableItems -= range.Bits.Count();
+ Ranges.erase(it);
+ } else if (range.Begin < validSince) {
+ // the range straddles the boundary: clear only its invalidated prefix
+ const ui32 len = validSince - range.Begin;
+ for (ui32 i = 0; i < len; ++i) {
+ NumAvailableItems -= range.Bits[i];
}
+ range.Bits.Reset(0, len);
+ if (range.Bits.Empty()) {
+ // no available ids remain; the other methods expect every stored range to have at least one set bit
+ Ranges.erase(it);
+ }
+ // Begin never changes, so re-examining this range would take the same branch forever; ranges do not
+ // overlap, hence every following range starts at or after validSince and needs no trimming
+ break;
+ } else {
+ break;
}
}
}
- void TGivenIdRange::RemovePoint(ui64 value) {
- auto it = Ranges.upper_bound(value);
- Y_VERIFY(it != Ranges.begin());
- auto& [begin, range] = *--it;
- Y_VERIFY(begin <= value);
- const ui64 offset = value - begin;
- Y_VERIFY(offset < range.Len);
- Y_VERIFY(range.Bits[offset]);
- range.Bits.Reset(offset);
- --NumAvailableItems;
- // FIXME: reduce range (maybe)?
+ // Removes from this object every range present in other.
+ // Each range of other must exist here with the same Begin, End and bitmap; any mismatch aborts (Y_VERIFY).
+ void TGivenIdRange::Subtract(const TGivenIdRange& other) {
+ for (const TRange& range : other.Ranges) {
+ // the transparent comparator allows lookup by the ui64 Begin value
+ const auto it = Ranges.find(range.Begin);
+ Y_VERIFY(it != Ranges.end());
+ Y_VERIFY(range.End == it->End);
+ Y_VERIFY(range.Bits == it->Bits);
+ NumAvailableItems -= range.Bits.Count();
+ Ranges.erase(it);
+ }
}
void TGivenIdRange::Output(IOutputStream& s) const {
@@ -87,13 +103,13 @@ namespace NKikimr::NBlobDepot {
if (it != Ranges.begin()) {
s << " ";
}
- const auto& [begin, range] = *it;
- s << begin << ":" << range.Len << "[";
- for (ui32 i = 0; i < range.Len; ++i) {
- s << static_cast<int>(range.Bits[i]);
+ s << it->Begin << "-" << it->End << "[";
+ for (ui32 i = 0, count = it->End - it->Begin; i < count; ++i) {
+ s << int(it->Bits[i]);
}
s << "]";
}
+ s << "}";
}
TString TGivenIdRange::ToString() const {
@@ -102,26 +118,4 @@ namespace NKikimr::NBlobDepot {
return s.Str();
}
- bool TGivenIdRange::IsEmpty() const {
- return Ranges.empty();
- }
-
- ui32 TGivenIdRange::GetNumAvailableItems() const {
- return NumAvailableItems;
- }
-
- ui64 TGivenIdRange::Allocate() {
- Y_VERIFY(!Ranges.empty());
- const auto it = Ranges.begin();
- auto& [begin, range] = *it;
- const size_t offset = range.Bits.FirstNonZeroBit();
- const ui64 res = begin + offset;
- range.Bits.Reset(offset);
- if (range.Bits.NextNonZeroBit(offset) == range.Bits.Size()) {
- Ranges.erase(it);
- }
- --NumAvailableItems;
- return res;
- }
-
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/mon_main.cpp b/ydb/core/blob_depot/mon_main.cpp
index af2c79405f..f32db51082 100644
--- a/ydb/core/blob_depot/mon_main.cpp
+++ b/ydb/core/blob_depot/mon_main.cpp
@@ -1,6 +1,7 @@
#include "blob_depot_tablet.h"
#include "data.h"
#include "garbage_collection.h"
+#include "blocks.h"
namespace NKikimr::NBlobDepot {
@@ -173,16 +174,12 @@ namespace NKikimr::NBlobDepot {
void RenderTrashTable(bool header) {
HTML(Stream) {
if (header) {
- TABLEH() { Stream << "tablet id"; }
- TABLEH() { Stream << "channel"; }
TABLEH() { Stream << "group id"; }
TABLEH() { Stream << "blob id"; }
TABLEH() { Stream << "in flight"; }
} else {
- Self->Data->EnumerateTrash([&](ui64 tabletId, ui8 channel, ui32 groupId, TLogoBlobID blobId, bool inFlight) {
+ Self->Data->EnumerateTrash([&](ui32 groupId, TLogoBlobID blobId, bool inFlight) {
TABLER() {
- TABLED() { Stream << tabletId; }
- TABLED() { Stream << int(channel); }
TABLED() { Stream << groupId; }
TABLED() { Stream << blobId; }
TABLED() { Stream << (inFlight ? "*" : ""); }
@@ -218,7 +215,15 @@ namespace NKikimr::NBlobDepot {
+ // Renders the blocks table: the two column headers when header is true, otherwise one row per block
+ // as reported by BlocksManager->Enumerate().
void RenderBlocksTable(bool header) {
HTML(Stream) {
if (header) {
+ TABLEH() { Stream << "tablet id"; }
+ TABLEH() { Stream << "blocked generation"; }
} else {
+ // one table row per (tabletId, blockedGeneration) pair
+ Self->BlocksManager->Enumerate([&](ui64 tabletId, ui32 blockedGeneration) {
+ TABLER() {
+ TABLED() { Stream << tabletId; }
+ TABLED() { Stream << blockedGeneration; }
+ }
+ });
}
}
}
diff --git a/ydb/core/blob_depot/op_apply_config.cpp b/ydb/core/blob_depot/op_apply_config.cpp
index fdfb9c70d7..61402e4e04 100644
--- a/ydb/core/blob_depot/op_apply_config.cpp
+++ b/ydb/core/blob_depot/op_apply_config.cpp
@@ -4,7 +4,7 @@
namespace NKikimr::NBlobDepot {
void TBlobDepot::Handle(TEvBlobDepot::TEvApplyConfig::TPtr ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT06, "TEvApplyConfig", (TabletId, TabletID()), (Msg, ev->Get()->Record));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT12, "TEvApplyConfig", (TabletId, TabletID()), (Msg, ev->Get()->Record));
class TTxApplyConfig : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
std::unique_ptr<IEventHandle> Response;
@@ -25,26 +25,30 @@ namespace NKikimr::NBlobDepot {
}
+ // Stores ConfigProtobuf in the Config table and parses it into Self->Config.
+ // Returns false when the Config row is not ready; records in WasConfigured whether a config was already stored.
bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT13, "TTxApplyConfig::Execute", (TabletId, Self->TabletID()));
+
NIceDb::TNiceDb db(txc.DB);
auto table = db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Select();
if (!table.IsReady()) {
return false;
- } else if (table.IsValid()) {
- WasConfigured = table.HaveValue<Schema::Config::ConfigProtobuf>();
- } else {
- WasConfigured = false;
}
+ // configured before == the row exists and already carries a ConfigProtobuf value
+ WasConfigured = table.IsValid() && table.HaveValue<Schema::Config::ConfigProtobuf>();
db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update(
NIceDb::TUpdate<Schema::Config::ConfigProtobuf>(ConfigProtobuf)
);
+
+ // the in-memory config is replaced here, in Execute; a config that fails to parse aborts
+ const bool success = Self->Config.ParseFromString(ConfigProtobuf);
+ Y_VERIFY(success);
+
return true;
}
void Complete(const TActorContext&) override {
- const bool success = Self->Config.ParseFromString(ConfigProtobuf);
- Y_VERIFY(success);
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT14, "TTxApplyConfig::Complete", (TabletId, Self->TabletID()),
+ (WasConfigured, WasConfigured));
+
if (!WasConfigured) {
Self->InitChannelKinds();
}
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index 67182f2236..1c65c6a19c 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -36,9 +36,8 @@ namespace NKikimr::NBlobDepot {
auto *locator = chain->MutableLocator();
locator->CopyFrom(item.GetBlobLocator());
- if (!MarkGivenIdCommitted(agent, TBlobSeqId::FromProto(locator->GetBlobSeqId()), responseItem)) {
- continue;
- }
+ MarkGivenIdCommitted(agent, TBlobSeqId::FromProto(locator->GetBlobSeqId()));
+
if (!CheckKeyAgainstBarrier(item.GetKey(), responseItem)) {
continue;
}
@@ -60,28 +59,15 @@ namespace NKikimr::NBlobDepot {
return true;
}
- bool MarkGivenIdCommitted(TAgent& agent, const TBlobSeqId& blobSeqId,
- NKikimrBlobDepot::TEvCommitBlobSeqResult::TItem *responseItem) {
- const NKikimrBlobDepot::TChannelKind::E kind = blobSeqId.Channel < Self->ChannelToKind.size()
- ? Self->ChannelToKind[blobSeqId.Channel] : NKikimrBlobDepot::TChannelKind::System;
- if (kind == NKikimrBlobDepot::TChannelKind::System) {
- responseItem->SetStatus(NKikimrProto::ERROR);
- responseItem->SetErrorReason("incorrect Channel for blob");
- return false;
- }
+ // Removes the committed blob seq id from the given-id bookkeeping of both the agent and the channel.
+ // NOTE(review): the caller builds blobSeqId from the request's blob locator; a bad generation or channel now
+ // aborts via Y_VERIFY instead of producing an ERROR response item -- confirm this is intended.
+ void MarkGivenIdCommitted(TAgent& agent, const TBlobSeqId& blobSeqId) {
+ Y_VERIFY(blobSeqId.Generation == Self->Executor()->Generation());
+ Y_VERIFY(blobSeqId.Channel < Self->Channels.size());
- const auto channelKindIt = Self->ChannelKinds.find(kind);
- Y_VERIFY(channelKindIt != Self->ChannelKinds.end());
- auto& ck = channelKindIt->second;
+ auto& channel = Self->Channels[blobSeqId.Channel];
- const ui64 value = blobSeqId.ToBinary(ck);
- agent.ChannelKinds[kind].GivenIdRanges.RemovePoint(value);
- ck.GivenIdRanges.RemovePoint(value);
-
- auto& channel = Self->PerChannelRecords[blobSeqId.Channel];
- channel.GivenStepIndex.erase(std::make_pair(blobSeqId.Step, blobSeqId.Index));
-
- return true;
+ // RemovePoint aborts if the id is not currently marked as given in the respective range set
+ const ui64 value = blobSeqId.ToSequentialNumber();
+ agent.GivenIdRanges[blobSeqId.Channel].RemovePoint(value);
+ channel.GivenIdRanges.RemovePoint(value);
}
bool CheckKeyAgainstBarrier(const TString& key, NKikimrBlobDepot::TEvCommitBlobSeqResult::TItem *responseItem) {
diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp
index e48ce2802e..d2db397b99 100644
--- a/ydb/core/blob_depot/op_load.cpp
+++ b/ydb/core/blob_depot/op_load.cpp
@@ -16,6 +16,8 @@ namespace NKikimr::NBlobDepot {
{}
bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT15, "TTxLoad::Execute", (TabletId, Self->TabletID()));
+
NIceDb::TNiceDb db(txc.DB);
if (!Precharge(db)) {
@@ -29,9 +31,8 @@ namespace NKikimr::NBlobDepot {
return false;
} else if (table.IsValid()) {
if (table.HaveValue<Schema::Config::ConfigProtobuf>()) {
- const bool success = Self->Config.ParseFromString(table.GetValue<Schema::Config::ConfigProtobuf>());
- Y_VERIFY(success);
- Configured = true;
+ Configured = Self->Config.ParseFromString(table.GetValue<Schema::Config::ConfigProtobuf>());
+ Y_VERIFY(Configured);
}
}
}
@@ -136,10 +137,15 @@ namespace NKikimr::NBlobDepot {
}
+ // Runs after the load transaction: when a config was loaded, initializes channel kinds and processes trash;
+ // in every case notifies the tablet that loading has finished.
void Complete(const TActorContext&) override {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT16, "TTxLoad::Complete", (TabletId, Self->TabletID()),
+ (Configured, Configured));
+
if (Configured) {
Self->InitChannelKinds();
Self->Data->HandleTrash();
}
+
+ // OnLoadFinished() switches the tablet to StateWork and re-sends the events queued in InitialEventsQ
+ Self->OnLoadFinished();
}
};
diff --git a/ydb/core/blob_depot/op_resolve.cpp b/ydb/core/blob_depot/op_resolve.cpp
index 0b792f7c71..87fddfaae0 100644
--- a/ydb/core/blob_depot/op_resolve.cpp
+++ b/ydb/core/blob_depot/op_resolve.cpp
@@ -4,7 +4,7 @@
namespace NKikimr::NBlobDepot {
void TBlobDepot::Handle(TEvBlobDepot::TEvResolve::TPtr ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT07, "TEvResolve", (TabletId, TabletID()), (Msg, ev->Get()->ToString()),
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT17, "TEvResolve", (TabletId, TabletID()), (Msg, ev->Get()->ToString()),
(Sender, ev->Sender), (Recipient, ev->Recipient), (Cookie, ev->Cookie));
// collect records if needed
@@ -16,7 +16,7 @@ namespace NKikimr::NBlobDepot {
response->Record.SetStatus(NKikimrProto::OVERRUN);
}
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT08, "sending TEvResolveResult", (TabletId, TabletID()), (Msg, response->Record));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT18, "sending TEvResolveResult", (TabletId, TabletID()), (Msg, response->Record));
auto handle = std::make_unique<IEventHandle>(ev->Sender, SelfId(), response.release(), 0, ev->Cookie);
if (ev->InterconnectSession) {
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index 763289ec8f..147c61a6f6 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -37,8 +37,13 @@ namespace NKikimr::NBlobDepot {
ui32 Index = 0;
auto AsTuple() const { return std::make_tuple(Channel, Generation, Step, Index); }
+
friend bool operator ==(const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() == y.AsTuple(); }
friend bool operator !=(const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() != y.AsTuple(); }
+ friend bool operator < (const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() < y.AsTuple(); }
+ friend bool operator <=(const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() <= y.AsTuple(); }
+ friend bool operator > (const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() > y.AsTuple(); }
+ friend bool operator >=(const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() >= y.AsTuple(); }
TString ToString() const {
return TStringBuilder() << "{" << Channel << ":" << Generation << ":" << Step << ":" << Index << "}";
@@ -48,23 +53,12 @@ namespace NKikimr::NBlobDepot {
return *this != TBlobSeqId();
}
- ui64 ToBinary(const TChannelKind& kind) const {
- Y_VERIFY_DEBUG(Index <= MaxIndex);
- Y_VERIFY(Channel < kind.ChannelToIndex.size());
- return (static_cast<ui64>(Step) << IndexBits | Index) * kind.ChannelGroups.size() + kind.ChannelToIndex[Channel];
+ // Packs Step and Index into a single number: Step in the bits above IndexBits, Index below.
+ // Channel and Generation are not encoded.
+ // NOTE(review): unlike the replaced ToBinary(), Index <= MaxIndex is no longer checked here -- confirm callers guarantee it
+ ui64 ToSequentialNumber() const {
+ return ui64(Step) << IndexBits | Index;
}
- static TBlobSeqId FromBinary(ui32 generation, const TChannelKind& kind, ui64 value) {
- static_assert(sizeof(long long) >= sizeof(ui64));
- Y_VERIFY(!kind.ChannelGroups.empty());
- auto res = std::lldiv(value, kind.ChannelGroups.size());
-
- return TBlobSeqId{
- .Channel = kind.ChannelGroups[res.rem].first,
- .Generation = generation,
- .Step = static_cast<ui32>(res.quot >> IndexBits),
- .Index = static_cast<ui32>(res.quot) & MaxIndex
- };
+ // Inverse of ToSequentialNumber(): splits value into Step (bits above IndexBits) and Index (value & MaxIndex);
+ // channel and generation are taken from the arguments.
+ // NOTE(review): the name is misspelled ("Sequental"); renaming it requires updating every caller
+ static TBlobSeqId FromSequentalNumber(ui32 channel, ui32 generation, ui64 value) {
+ return {channel, generation, ui32(value >> IndexBits), ui32(value & MaxIndex)};
}
static TBlobSeqId FromProto(const NKikimrBlobDepot::TBlobSeqId& proto) {
@@ -105,38 +99,44 @@ namespace NKikimr::NBlobDepot {
+ // Tracks ids that have been given out, as a set of non-overlapping half-open ranges with a per-id bitmap.
class TGivenIdRange {
struct TRange {
- ui32 Len;
+ // [Begin, End) bounds of the range; fixed for the lifetime of the object
+ const ui64 Begin;
+ const ui64 End;
+ // bit i set <=> id (Begin + i) is currently marked available
TDynBitMap Bits;
- TRange(ui32 len)
- : Len(len)
+ // a fresh range starts with all of its ids marked available
+ TRange(ui64 begin, ui64 end)
+ : Begin(begin)
+ , End(end)
{
- Bits.Set(0, len);
+ Bits.Set(0, end - begin);
}
+
+ // orders ranges by Begin only; the ui64 overloads plus is_transparent allow lookups by a plain id value
+ struct TCompare {
+ bool operator ()(const TRange& x, const TRange& y) const { return x.Begin < y.Begin; }
+ bool operator ()(const TRange& x, ui64 y) const { return x.Begin < y; }
+ bool operator ()(ui64 x, const TRange& y) const { return x < y.Begin; }
+ using is_transparent = void;
+ };
};
- std::map<ui64, TRange> Ranges; // range.begin -> range
+
+ // the implementation mutates Bits of stored elements through const_cast; ordering depends on Begin alone
+ std::set<TRange, TRange::TCompare> Ranges;
+ // NOTE(review): 32-bit counter, while IssueNewRange adds a 64-bit (end - begin) -- confirm ranges stay small
ui32 NumAvailableItems = 0;
public:
- TGivenIdRange() = default;
- TGivenIdRange(const NKikimrBlobDepot::TGivenIdRange& proto);
-
- void ToProto(NKikimrBlobDepot::TGivenIdRange *proto);
-
- void Join(TGivenIdRange&& other);
-
void IssueNewRange(ui64 begin, ui64 end);
+ void AddPoint(ui64 value);
void RemovePoint(ui64 value);
bool IsEmpty() const;
ui32 GetNumAvailableItems() const;
+ ui64 GetMinimumValue() const;
ui64 Allocate();
+ void Subtract(const TGivenIdRange& other);
+
+ void Trim(ui8 channel, ui32 generation, ui32 invalidatedStep);
+
void Output(IOutputStream& s) const;
TString ToString() const;
-
- private:
- TRange& InsertNewRange(ui64 begin, ui64 len);
};
using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>;
@@ -164,4 +164,8 @@ namespace NKikimr::NBlobDepot {
return GenStep(id.Generation(), id.Step());
}
+ // Convenience overload: forwards the Generation and Step of a blob seq id to GenStep(generation, step).
+ inline ui64 GenStep(TBlobSeqId id) {
+ return GenStep(id.Generation, id.Step);
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/mind/bscontroller/bsc.cpp b/ydb/core/mind/bscontroller/bsc.cpp
index b6bd67b278..c1455f7dbb 100644
--- a/ydb/core/mind/bscontroller/bsc.cpp
+++ b/ydb/core/mind/bscontroller/bsc.cpp
@@ -57,6 +57,15 @@ TBlobStorageController::TVSlotInfo::TVSlotInfo(TVSlotId vSlotId, TPDiskInfo *pdi
}
void TBlobStorageController::TGroupInfo::CalculateGroupStatus() {
+ if (VirtualGroupState) {
+ if (VirtualGroupState == NKikimrBlobStorage::EVirtualGroupState::WORKING) {
+ Status = {NKikimrBlobStorage::TGroupStatus::FULL, NKikimrBlobStorage::TGroupStatus::FULL};
+ } else {
+ Status = {NKikimrBlobStorage::TGroupStatus::DISINTEGRATED, NKikimrBlobStorage::TGroupStatus::DISINTEGRATED};
+ }
+ return;
+ }
+
TBlobStorageGroupInfo::TGroupVDisks failed(Topology.get());
TBlobStorageGroupInfo::TGroupVDisks failedByPDisk(Topology.get());
for (const TVSlotInfo *slot : VDisksInGroup) {
diff --git a/ydb/core/mind/bscontroller/config_fit_groups.cpp b/ydb/core/mind/bscontroller/config_fit_groups.cpp
index e3a97b5414..e86d44bae7 100644
--- a/ydb/core/mind/bscontroller/config_fit_groups.cpp
+++ b/ydb/core/mind/bscontroller/config_fit_groups.cpp
@@ -131,7 +131,6 @@ namespace NKikimr {
Geometry.GetNumFailDomainsPerFailRealm(), Geometry.GetNumVDisksPerFailDomain());
// bind group to storage pool
- groupInfo->StoragePoolId = StoragePoolId;
State.StoragePoolGroups.Unshare().emplace(StoragePoolId, groupId);
const TGroupSpecies species = groupInfo->GetGroupSpecies();
diff --git a/ydb/core/mind/bscontroller/impl.h b/ydb/core/mind/bscontroller/impl.h
index 0e9bcac498..64514b032d 100644
--- a/ydb/core/mind/bscontroller/impl.h
+++ b/ydb/core/mind/bscontroller/impl.h
@@ -694,9 +694,21 @@ public:
}
+ // Fills group parameters for a TEvControllerSelectGroupsResult entry.
+ // Groups with VirtualGroupState set get fixed values; all other groups are computed by
+ // FillInResources()/FillInVDiskResources().
void FillInGroupParameters(NKikimrBlobStorage::TEvControllerSelectGroupsResult::TGroupParameters *params) const {
- FillInResources(params->MutableAssuredResources(), true);
- FillInResources(params->MutableCurrentResources(), false);
- FillInVDiskResources(params);
+ if (VirtualGroupState) {
+ // NOTE(review): hard-coded figures, presumably placeholders for virtual groups -- confirm units and intent
+ for (auto *p : {params->MutableAssuredResources(), params->MutableCurrentResources()}) {
+ p->SetSpace(1'000'000'000'000);
+ p->SetIOPS(1'000);
+ p->SetReadThroughput(100'000'000);
+ p->SetWriteThroughput(100'000'000);
+ }
+ params->SetAllocatedSize(1'000'000'000'000);
+ params->SetAvailableSize(1'000'000'000'000);
+ params->SetSpaceColor(NKikimrBlobStorage::TPDiskSpaceColor::GREEN);
+ } else {
+ FillInResources(params->MutableAssuredResources(), true);
+ FillInResources(params->MutableCurrentResources(), false);
+ FillInVDiskResources(params);
+ }
}
void FillInResources(NKikimrBlobStorage::TEvControllerSelectGroupsResult::TGroupParameters::TResources *pb, bool countMaxSlots) const {
diff --git a/ydb/core/mind/bscontroller/virtual_group.cpp b/ydb/core/mind/bscontroller/virtual_group.cpp
index f85bf18e7e..a905aafcb0 100644
--- a/ydb/core/mind/bscontroller/virtual_group.cpp
+++ b/ydb/core/mind/bscontroller/virtual_group.cpp
@@ -22,7 +22,7 @@ namespace NKikimr::NBsController {
// determine storage pool that will contain newly created virtual group
TBoxStoragePoolId storagePoolId;
- auto& pools = StoragePools.Get();
+ auto& pools = StoragePools.Unshare();
switch (cmd.GetStoragePoolCase()) {
case NKikimrBlobStorage::TAllocateVirtualGroup::kStoragePoolName: {
ui32 found = 0;
@@ -62,6 +62,10 @@ namespace NKikimr::NBsController {
pool.EncryptionMode.GetOrElse(TBlobStorageGroupInfo::EEM_NONE), TBlobStorageGroupInfo::ELCP_INITIAL,
TString(), TString(), 0u, 0u, false, false, storagePoolId, 0u, 0u, 0u);
+ // bind group to storage pool
+ ++pool.NumGroups;
+ StoragePoolGroups.Unshare().emplace(storagePoolId, group->ID);
+
group->VirtualGroupPool = id;
group->VirtualGroupState = NKikimrBlobStorage::EVirtualGroupState::CREATED;
group->ParentDir = cmd.GetParentDir();
@@ -70,8 +74,11 @@ namespace NKikimr::NBsController {
if (cmd.GetBlobDepotId()) {
group->VirtualGroupState = NKikimrBlobStorage::EVirtualGroupState::WORKING;
group->BlobDepotId = cmd.GetBlobDepotId();
+ group->SeenOperational = true;
}
+ group->CalculateGroupStatus();
+
NKikimrBlobDepot::TBlobDepotConfig config;
config.SetOperationMode(NKikimrBlobDepot::EOperationMode::VirtualGroup);
config.MutableChannelProfiles()->CopyFrom(cmd.GetChannelProfiles());
@@ -129,6 +136,9 @@ namespace NKikimr::NBsController {
PARAM(BlobDepotId)
PARAM(ErrorReason)
#undef PARAM
+ if (group->SeenOperational) {
+ row.Update<T::SeenOperational>(true);
+ }
return true;
}
@@ -232,6 +242,7 @@ namespace NKikimr::NBsController {
group->VirtualGroupState = NKikimrBlobStorage::EVirtualGroupState::CREATE_FAILED;
group->ErrorReason = record.GetSchemeShardReason();
}
+ group->CalculateGroupStatus();
Self->Execute(new TTxUpdateGroup(this));
}
@@ -290,6 +301,8 @@ namespace NKikimr::NBsController {
const auto& desc = record.GetPathDescription().GetBlobDepotDescription();
group->VirtualGroupState = NKikimrBlobStorage::EVirtualGroupState::WORKING;
group->BlobDepotId = desc.GetTabletId();
+ group->SeenOperational = true;
+ group->CalculateGroupStatus();
Y_VERIFY(*group->BlobDepotId);
Self->Execute(new TTxUpdateGroup(this));
diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto
index 555430f272..15f9c568f5 100644
--- a/ydb/core/protos/blob_depot.proto
+++ b/ydb/core/protos/blob_depot.proto
@@ -38,13 +38,12 @@ message TValue {
}
message TGivenIdRange {
- message TRange {
- optional uint64 Begin = 1;
- optional uint32 Len = 2;
- repeated uint32 Offsets = 3;
- repeated fixed64 BitMasks = 4;
+ message TChannelRange { // range of ids for one channel, delimited by Begin and End
+ optional uint32 Channel = 1;
+ optional uint64 Begin = 2;
+ optional uint64 End = 3; // NOTE(review): confirm whether End is inclusive or exclusive
}
- repeated TRange Ranges = 1;
+ repeated TChannelRange ChannelRanges = 1; // NOTE(review): field numbers 1-3 are reused with different types than the former TRange layout -- not wire-compatible with previously serialized TGivenIdRange data
}
@@ -62,6 +61,7 @@ message TEvApplyConfigResult {
message TEvRegisterAgent {
optional uint32 VirtualGroupId = 1; // for validation purposes
+ optional fixed64 AgentInstanceId = 2; // randomly generated number every time agent starts from scratch
}
message TEvRegisterAgentResult {
@@ -104,10 +104,17 @@ message TEvPushNotify {
optional fixed64 TabletId = 1;
optional uint32 BlockedGeneration = 2;
}
+ message TInvalidatedStep { // element type of TEvPushNotify.InvalidatedSteps: a (Channel, Generation, InvalidatedStep) triple
+ optional uint32 Channel = 1;
+ optional uint32 Generation = 2; // for validation purposes
+ optional uint32 InvalidatedStep = 3;
+ }
repeated TBlockedTablet BlockedTablets = 1;
+ repeated TInvalidatedStep InvalidatedSteps = 2;
}
message TEvPushNotifyResult {
+ repeated TBlobSeqId WritesInFlight = 1; // NOTE(review): presumably the blob sequence ids of writes still in flight when the notification is answered -- confirm against the agent code
}
message TEvQueryBlocks {