diff options
author | alexvru <alexvru@ydb.tech> | 2022-07-07 21:08:05 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-07-07 21:08:05 +0300 |
commit | 996ba6df0ac5b5f030b5608ea4221286c8b8c122 (patch) | |
tree | ff422dec2f821585a849b12927aff0ec55828186 | |
parent | 95d4076addb1b5489601ccf87d38119b3a651260 (diff) | |
download | ydb-996ba6df0ac5b5f030b5608ea4221286c8b8c122.tar.gz |
Support proper garbage collection in BlobDepot
24 files changed, 1793 insertions, 662 deletions
diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt index f393be16f28..236ee49e557 100644 --- a/ydb/core/blob_depot/CMakeLists.txt +++ b/ydb/core/blob_depot/CMakeLists.txt @@ -20,6 +20,8 @@ target_sources(ydb-core-blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/garbage_collection.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/given_id_range.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/mon_main.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_apply_config.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_init_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_load.cpp diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp index ff78ad2e75b..00d18d2168b 100644 --- a/ydb/core/blob_depot/agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -14,7 +14,7 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(it != PipeServerToNode.end()); if (const auto& nodeId = it->second) { if (const auto agentIt = Agents.find(*nodeId); agentIt != Agents.end()) { - if (TAgentInfo& agent = agentIt->second; agent.ConnectedAgent == it->first) { + if (TAgent& agent = agentIt->second; agent.ConnectedAgent == it->first) { OnAgentDisconnect(agent); agent.ConnectedAgent.reset(); agent.ConnectedNodeId = 0; @@ -25,7 +25,7 @@ namespace NKikimr::NBlobDepot { PipeServerToNode.erase(it); } - void TBlobDepot::OnAgentDisconnect(TAgentInfo& /*agent*/) { + void TBlobDepot::OnAgentDisconnect(TAgent& /*agent*/) { } void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) { @@ -58,36 +58,52 @@ namespace NKikimr::NBlobDepot { TActivationContext::Send(response.release()); } - void TBlobDepot::OnAgentConnect(TAgentInfo& /*agent*/) { + void TBlobDepot::OnAgentConnect(TAgent& /*agent*/) { } void TBlobDepot::Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev) { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT04, "TEvAllocateIds", (TabletId, TabletID()), (Msg, ev->Get()->Record), (PipeServerId, ev->Recipient)); - auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId(), ev->Get()->Record.GetChannelKind(), - Executor()->Generation()); + const ui32 generation = Executor()->Generation(); + + auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId(), ev->Get()->Record.GetChannelKind(), generation); if (const auto it = ChannelKinds.find(record->GetChannelKind()); it != ChannelKinds.end()) { - auto& nextBlobSeqId = it->second.NextBlobSeqId; - record->SetRangeBegin(nextBlobSeqId); - nextBlobSeqId += PreallocatedIdCount; - record->SetRangeEnd(nextBlobSeqId); + auto& kind = it->second; + + const ui64 rangeBegin = kind.NextBlobSeqId; + kind.NextBlobSeqId += ev->Get()->Record.GetCount(); + const ui64 rangeEnd = kind.NextBlobSeqId; + + TGivenIdRange range; + range.IssueNewRange(rangeBegin, rangeEnd); + + range.ToProto(record->MutableGivenIdRange()); + + TAgent& agent = GetAgent(ev->Recipient); + agent.ChannelKinds[it->first].GivenIdRanges.Join(TGivenIdRange(range)); + kind.GivenIdRanges.Join(std::move(range)); + + for (ui64 value = rangeBegin; value < rangeEnd; ++value) { + const auto blobSeqId = TBlobSeqId::FromBinary(generation, kind, value); + PerChannelRecords[blobSeqId.Channel].GivenStepIndex.emplace(blobSeqId.Step, blobSeqId.Index); + } } TActivationContext::Send(response.release()); } - TBlobDepot::TAgentInfo& TBlobDepot::GetAgent(const TActorId& pipeServerId) { + TBlobDepot::TAgent& TBlobDepot::GetAgent(const TActorId& pipeServerId) { const auto it = PipeServerToNode.find(pipeServerId); Y_VERIFY(it != PipeServerToNode.end()); Y_VERIFY(it->second); - TAgentInfo& agent = GetAgent(*it->second); + TAgent& agent = GetAgent(*it->second); Y_VERIFY(agent.ConnectedAgent == pipeServerId); return agent; } - TBlobDepot::TAgentInfo& TBlobDepot::GetAgent(ui32 nodeId) { + TBlobDepot::TAgent& TBlobDepot::GetAgent(ui32 nodeId) { const auto agentIt = Agents.find(nodeId); Y_VERIFY(agentIt != Agents.end()); return agentIt->second; @@ -97,6 +113,9 @@ namespace NKikimr::NBlobDepot { TTabletStorageInfo *info = Info(); const ui32 generation = Executor()->Generation(); + Y_VERIFY(ChannelToKind.empty()); + ChannelToKind.resize(info->Channels.size(), NKikimrBlobDepot::TChannelKind::System); + ui32 channel = 0; for (const auto& profile : Config.GetChannelProfiles()) { for (ui32 i = 0, count = profile.GetCount(); i < count; ++i, ++channel) { @@ -105,9 +124,16 @@ namespace NKikimr::NBlobDepot { auto& p = ChannelKinds[kind]; p.ChannelToIndex[channel] = p.ChannelGroups.size(); p.ChannelGroups.emplace_back(channel, info->GroupFor(channel, generation)); + + Y_VERIFY(channel < ChannelToKind.size()); + ChannelToKind[channel] = kind; } } } + + Y_VERIFY_S(channel == ChannelToKind.size(), "channel# " << channel + << " ChannelToKind.size# " << ChannelToKind.size()); + for (auto& [k, v] : ChannelKinds) { v.NextBlobSeqId = TBlobSeqId{v.ChannelGroups.front().first, generation, 1, 0}.ToBinary(v); } diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index 95216de7d64..75c4b88bdbb 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -246,16 +246,10 @@ namespace NKikimr::NBlobDepot { struct TChannelKind : NBlobDepot::TChannelKind { - struct TAllocatedId { - ui32 Generation; - ui64 Begin; - ui64 End; - }; - const NKikimrBlobDepot::TChannelKind::E Kind; bool IdAllocInFlight = false; - std::deque<TAllocatedId> IdQ; + TGivenIdRange GivenIdRange; static constexpr size_t PreallocatedIdCount = 2; TIntrusiveList<TQuery, TPendingId> QueriesWaitingForId; @@ -265,17 +259,11 @@ namespace NKikimr::NBlobDepot { {} std::optional<TBlobSeqId> Allocate(TBlobDepotAgent& agent) { - if (IdQ.empty()) { + if (GivenIdRange.IsEmpty()) { return std::nullopt; } - - auto& item = IdQ.front(); - auto blobSeqId = TBlobSeqId::FromBinary(item.Generation, *this, item.Begin++); - if (item.Begin == item.End) { - IdQ.pop_front(); - agent.IssueAllocateIdsIfNeeded(*this); - } - + auto blobSeqId = TBlobSeqId::FromBinary(agent.BlobDepotGeneration, *this, GivenIdRange.Allocate()); + agent.IssueAllocateIdsIfNeeded(*this); return blobSeqId; } diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index aa8abeff2ba..8aa8edf9dc3 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -18,6 +18,8 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::ConnectToBlobDepot() { PipeId = Register(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy::WithRetries())); const ui64 id = NextRequestId++; + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA04, "ConnectToBlobDepot", (VirtualGroupId, VirtualGroupId), + (PipeId, PipeId), (RequestId, id)); NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId), id); RegisterRequest(id, this, nullptr, {}, true); } @@ -60,13 +62,13 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::IssueAllocateIdsIfNeeded(TChannelKind& kind) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA05, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId), - (ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(kind.Kind)), - (IdAllocInFlight, kind.IdAllocInFlight), (IdQ.size, kind.IdQ.size()), - (PreallocatedIdCount, kind.PreallocatedIdCount), (PipeId, PipeId)); - if (!kind.IdAllocInFlight && kind.IdQ.size() < kind.PreallocatedIdCount && PipeId) { + if (!kind.IdAllocInFlight && kind.GivenIdRange.GetNumAvailableItems() < 100 && PipeId) { const ui64 id = NextRequestId++; - NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds(kind.Kind), id); + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA05, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId), + (ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(kind.Kind)), + (IdAllocInFlight, kind.IdAllocInFlight), (NumAvailableItems, kind.GivenIdRange.GetNumAvailableItems()), + (PreallocatedIdCount, kind.PreallocatedIdCount), (RequestId, id)); + NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds(kind.Kind, 100), id); RegisterRequest(id, this, std::make_shared<TAllocateIdsContext>(kind.Kind), {}, true); kind.IdAllocInFlight = true; } @@ -78,7 +80,8 @@ namespace NKikimr::NBlobDepot { auto& allocateIdsContext = context->Obtain<TAllocateIdsContext>(); const auto it = ChannelKinds.find(allocateIdsContext.ChannelKind); - Y_VERIFY_S(it != ChannelKinds.end(), "Kind# " << NKikimrBlobDepot::TChannelKind::E_Name(allocateIdsContext.ChannelKind)); + Y_VERIFY_S(it != ChannelKinds.end(), "Kind# " << NKikimrBlobDepot::TChannelKind::E_Name(allocateIdsContext.ChannelKind) + << " Msg# " << SingleLineProto(msg)); auto& kind = it->second; Y_VERIFY(kind.IdAllocInFlight); @@ -87,12 +90,11 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(msg.GetChannelKind() == allocateIdsContext.ChannelKind); Y_VERIFY(msg.GetGeneration() == BlobDepotGeneration); - if (msg.HasRangeBegin() && msg.HasRangeEnd()) { - kind.IdQ.push_back({BlobDepotGeneration, msg.GetRangeBegin(), msg.GetRangeEnd()}); + if (msg.HasGivenIdRange()) { + TGivenIdRange range(msg.GetGivenIdRange()); + kind.GivenIdRange.Join(std::move(range)); kind.ProcessQueriesWaitingForId(); IssueAllocateIdsIfNeeded(kind); - } else { - // no such channel allocated } } diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp index ee70e4b323a..7995c10aadd 100644 --- a/ydb/core/blob_depot/blob_depot.cpp +++ b/ydb/core/blob_depot/blob_depot.cpp @@ -1,8 +1,54 @@ #include "blob_depot.h" #include "blob_depot_tablet.h" +#include "blocks.h" +#include "garbage_collection.h" +#include "data.h" namespace NKikimr::NBlobDepot { + TBlobDepot::TBlobDepot(TActorId tablet, TTabletStorageInfo *info) + : TActor(&TThis::StateInit) + , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory) + , BlocksManager(new TBlocksManager(this)) + , BarrierServer(new TBarrierServer(this)) + , Data(new TData(this)) + {} + + TBlobDepot::~TBlobDepot() + {} + + STFUNC(TBlobDepot::StateWork) { + try { + switch (const ui32 type = ev->GetTypeRewrite()) { + cFunc(TEvents::TSystem::Poison, HandlePoison); + + hFunc(TEvBlobDepot::TEvApplyConfig, Handle); + hFunc(TEvBlobDepot::TEvRegisterAgent, Handle); + hFunc(TEvBlobDepot::TEvAllocateIds, Handle); + hFunc(TEvBlobDepot::TEvCommitBlobSeq, Handle); + hFunc(TEvBlobDepot::TEvResolve, Handle); + + hFunc(TEvBlobDepot::TEvBlock, BlocksManager->Handle); + hFunc(TEvBlobDepot::TEvQueryBlocks, BlocksManager->Handle); + + hFunc(TEvBlobDepot::TEvCollectGarbage, BarrierServer->Handle); + + hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle); + + hFunc(TEvTabletPipe::TEvServerConnected, Handle); + hFunc(TEvTabletPipe::TEvServerDisconnected, Handle); + + default: + if (!HandleDefaultEvents(ev, ctx)) { + Y_FAIL("unexpected event Type# 0x%08" PRIx32, type); + } + break; + } + } catch (...) { + Y_FAIL_S("unexpected exception# " << CurrentExceptionMessage()); + } + } + IActor *CreateBlobDepot(const TActorId& tablet, TTabletStorageInfo *info) { return new TBlobDepot(tablet, info); } diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index b1ef30bf076..25374384e77 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -19,13 +19,8 @@ namespace NKikimr::NBlobDepot { }; public: - TBlobDepot(TActorId tablet, TTabletStorageInfo *info) - : TActor(&TThis::StateInit) - , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory) - , BlocksManager(CreateBlocksManager()) - , GarbageCollectionManager(CreateGarbageCollectionManager()) - , DataManager(CreateDataManager()) - {} + TBlobDepot(TActorId tablet, TTabletStorageInfo *info); + ~TBlobDepot(); void HandlePoison() { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT09, "HandlePoison", (TabletId, TabletID())); @@ -36,33 +31,45 @@ namespace NKikimr::NBlobDepot { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// static constexpr TDuration ExpirationTimeout = TDuration::Minutes(1); - static constexpr ui32 PreallocatedIdCount = 100; - struct TAgentInfo { + struct TAgent { std::optional<TActorId> ConnectedAgent; ui32 ConnectedNodeId; TInstant ExpirationTimestamp; + + struct TChannelKind { + TGivenIdRange GivenIdRanges; // updated on AllocateIds and when BlobSeqIds are found in any way + }; + + THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds; }; THashMap<TActorId, std::optional<ui32>> PipeServerToNode; - THashMap<ui32, TAgentInfo> Agents; // NodeId -> Agent + THashMap<ui32, TAgent> Agents; // NodeId -> Agent struct TChannelKind : NBlobDepot::TChannelKind { ui64 NextBlobSeqId = 0; + TGivenIdRange GivenIdRanges; // for all agents, including disconnected ones }; THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds; + std::vector<NKikimrBlobDepot::TChannelKind::E> ChannelToKind; + + struct TPerChannelRecord { + std::set<std::tuple<ui32, ui32>> GivenStepIndex; + }; + THashMap<ui8, TPerChannelRecord> PerChannelRecords; void Handle(TEvTabletPipe::TEvServerConnected::TPtr ev); void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev); - void OnAgentDisconnect(TAgentInfo& agent); + void OnAgentDisconnect(TAgent& agent); void Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev); - void OnAgentConnect(TAgentInfo& agent); + void OnAgentConnect(TAgent& agent); void Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev); - TAgentInfo& GetAgent(const TActorId& pipeServerId); - TAgentInfo& GetAgent(ui32 nodeId); + TAgent& GetAgent(const TActorId& pipeServerId); + TAgent& GetAgent(ui32 nodeId); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -111,35 +118,7 @@ namespace NKikimr::NBlobDepot { StateInitImpl(ev, ctx); } - STFUNC(StateWork) { - try { - switch (const ui32 type = ev->GetTypeRewrite()) { - cFunc(TEvents::TSystem::Poison, HandlePoison); - - hFunc(TEvBlobDepot::TEvApplyConfig, Handle); - hFunc(TEvBlobDepot::TEvRegisterAgent, Handle); - hFunc(TEvBlobDepot::TEvAllocateIds, Handle); - hFunc(TEvBlobDepot::TEvCommitBlobSeq, Handle); - hFunc(TEvBlobDepot::TEvResolve, Handle); - - hFunc(TEvBlobDepot::TEvBlock, Handle); - hFunc(TEvBlobDepot::TEvQueryBlocks, Handle); - - hFunc(TEvBlobDepot::TEvCollectGarbage, Handle); - - hFunc(TEvTabletPipe::TEvServerConnected, Handle); - hFunc(TEvTabletPipe::TEvServerDisconnected, Handle); - - default: - if (!HandleDefaultEvents(ev, ctx)) { - Y_FAIL("unexpected event Type# 0x%08" PRIx32, type); - } - break; - } - } catch (...) { - Y_FAIL_S("unexpected exception# " << CurrentExceptionMessage()); - } - } + void StateWork(STFUNC_SIG); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -167,67 +146,31 @@ namespace NKikimr::NBlobDepot { // Blocks class TBlocksManager; - using TBlocksManagerPtr = std::unique_ptr<TBlocksManager, std::function<void(TBlocksManager*)>>; - TBlocksManagerPtr BlocksManager; - - TBlocksManagerPtr CreateBlocksManager(); - - void AddBlockOnLoad(ui64 tabletId, ui32 blockedGeneration); - - void Handle(TEvBlobDepot::TEvBlock::TPtr ev); - void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev); + std::unique_ptr<TBlocksManager> BlocksManager; //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Garbage collection - class TGarbageCollectionManager; - using TGarbageCollectionManagerPtr = std::unique_ptr<TGarbageCollectionManager, std::function<void(TGarbageCollectionManager*)>>; - TGarbageCollectionManagerPtr GarbageCollectionManager; - - TGarbageCollectionManagerPtr CreateGarbageCollectionManager(); - - void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev); - - bool CheckBlobForBarrier(TLogoBlobID id) const; + class TBarrierServer; + std::unique_ptr<TBarrierServer> BarrierServer; //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Data operations - class TDataManager; - using TDataManagerPtr = std::unique_ptr<TDataManager, std::function<void(TDataManager*)>>; - TDataManagerPtr DataManager; - - TDataManagerPtr CreateDataManager(); - - using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>; + class TData; + std::unique_ptr<TData> Data; - struct TDataValue { - TString Meta; - TValueChain ValueChain; - NKikimrBlobDepot::EKeepState KeepState; - bool Public; - }; - - enum EScanFlags : ui32 { - INCLUDE_BEGIN = 1, - INCLUDE_END = 2, - REVERSE = 4, - }; + void Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev); + void Handle(TEvBlobDepot::TEvResolve::TPtr ev); - Y_DECLARE_FLAGS(TScanFlags, EScanFlags) + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Monitoring - std::optional<TDataValue> FindKey(TStringBuf key); - void ScanRange(const std::optional<TStringBuf>& begin, const std::optional<TStringBuf>& end, TScanFlags flags, - const std::function<bool(TStringBuf, const TDataValue&)>& callback); - void DeleteKey(TStringBuf key); - void PutKey(TString key, TDataValue&& data); - void AddDataOnLoad(TString key, TString value); - std::optional<TString> UpdateKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState); + class TTxMonData; + + bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext&) override; - void Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev); - void Handle(TEvBlobDepot::TEvResolve::TPtr ev); + void RenderMainPage(IOutputStream& s); }; - Y_DECLARE_OPERATORS_FOR_FLAGS(TBlobDepot::TScanFlags) - } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp index 2bcb4ce0a77..489b99045ec 100644 --- a/ydb/core/blob_depot/blocks.cpp +++ b/ydb/core/blob_depot/blocks.cpp @@ -1,278 +1,233 @@ -#include "blob_depot_tablet.h" +#include "blocks.h" #include "schema.h" namespace NKikimr::NBlobDepot { - class TBlobDepot::TBlocksManager { - // wait duration before issuing blocks via storage - static constexpr TDuration AgentsWaitTime = TDuration::Seconds(1); + class TBlobDepot::TBlocksManager::TTxUpdateBlock : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + const ui64 TabletId; + const ui32 BlockedGeneration; + const ui32 NodeId; + const TInstant Timestamp; + std::unique_ptr<IEventHandle> Response; - // TTL for block lease - static constexpr TDuration BlockLeaseTime = TDuration::Seconds(60); - - struct TBlock { - struct TPerAgentInfo { - TMonotonic ExpirationTimestamp = TMonotonic::Zero(); - }; - - ui32 BlockedGeneration = 0; - THashMap<ui32, TPerAgentInfo> PerAgentInfo; - }; + public: + TTxUpdateBlock(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, TInstant timestamp, + std::unique_ptr<IEventHandle> response) + : TTransactionBase(self) + , TabletId(tabletId) + , BlockedGeneration(blockedGeneration) + , NodeId(nodeId) + , Timestamp(timestamp) + , Response(std::move(response)) + {} - TBlobDepot* const Self; - THashMap<ui64, TBlock> Blocks; - - private: - class TTxUpdateBlock : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { - const ui64 TabletId; - const ui32 BlockedGeneration; - const ui32 NodeId; - const TInstant Timestamp; - std::unique_ptr<IEventHandle> Response; - - public: - TTxUpdateBlock(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, TInstant timestamp, - std::unique_ptr<IEventHandle> response) - : TTransactionBase(self) - , TabletId(tabletId) - , BlockedGeneration(blockedGeneration) - , NodeId(nodeId) - , Timestamp(timestamp) - , Response(std::move(response)) - {} - - bool Execute(TTransactionContext& txc, const TActorContext&) override { + bool Execute(TTransactionContext& txc, const TActorContext&) override { + auto& block = Self->BlocksManager->Blocks[TabletId]; + if (BlockedGeneration <= block.BlockedGeneration) { + Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.SetStatus(NKikimrProto::ALREADY); + } else { + // update block value in memory auto& block = Self->BlocksManager->Blocks[TabletId]; - if (BlockedGeneration <= block.BlockedGeneration) { - Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.SetStatus(NKikimrProto::ALREADY); - } else { - // update block value in memory - auto& block = Self->BlocksManager->Blocks[TabletId]; - block.BlockedGeneration = BlockedGeneration; - - // and persist it - NIceDb::TNiceDb db(txc.DB); - db.Table<Schema::Blocks>().Key(TabletId).Update( - NIceDb::TUpdate<Schema::Blocks::BlockedGeneration>(BlockedGeneration), - NIceDb::TUpdate<Schema::Blocks::IssuedByNode>(NodeId), - NIceDb::TUpdate<Schema::Blocks::IssueTimestamp>(Timestamp) - ); - } - return true; - } - - void Complete(const TActorContext&) override { - if (Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.GetStatus() != NKikimrProto::OK) { - TActivationContext::Send(Response.release()); - } else { - Self->BlocksManager->OnBlockCommitted(TabletId, BlockedGeneration, NodeId, std::move(Response)); - } - } - }; - - class TBlockProcessorActor : public TActorBootstrapped<TBlockProcessorActor> { - TBlobDepot* const Self; - const ui64 TabletId; - const ui32 BlockedGeneration; - const ui32 NodeId; - std::unique_ptr<IEventHandle> Response; - ui32 BlocksPending = 0; - ui32 RetryCount = 0; - const ui64 IssuerGuid = RandomNumber<ui64>() | 1; - THashSet<ui32> NodesWaitingForPushResult; - - public: - TBlockProcessorActor(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, - std::unique_ptr<IEventHandle> response) - : Self(self) - , TabletId(tabletId) - , BlockedGeneration(blockedGeneration) - , NodeId(nodeId) - , Response(std::move(response)) - {} - - void Bootstrap() { - IssueNotificationsToAgents(); - Become(&TThis::StateFunc, AgentsWaitTime, new TEvents::TEvWakeup); + block.BlockedGeneration = BlockedGeneration; + + // and persist it + NIceDb::TNiceDb db(txc.DB); + db.Table<Schema::Blocks>().Key(TabletId).Update( + NIceDb::TUpdate<Schema::Blocks::BlockedGeneration>(BlockedGeneration), + NIceDb::TUpdate<Schema::Blocks::IssuedByNode>(NodeId), + NIceDb::TUpdate<Schema::Blocks::IssueTimestamp>(Timestamp) + ); } + return true; + } - void IssueNotificationsToAgents() { - const TMonotonic now = TActivationContext::Monotonic(); - auto& block = Self->BlocksManager->Blocks[TabletId]; - for (const auto& [agentId, info] : block.PerAgentInfo) { - if (agentId == NodeId) { - // skip the origin agent - continue; - } - if (info.ExpirationTimestamp <= now) { - SendPushToAgent(agentId); - } - } + void Complete(const TActorContext&) override { + if (Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.GetStatus() != NKikimrProto::OK) { + TActivationContext::Send(Response.release()); + } else { + Self->BlocksManager->OnBlockCommitted(TabletId, BlockedGeneration, NodeId, std::move(Response)); } + } + }; - void SendPushToAgent(ui32 agentId) { - auto ev = std::make_unique<TEvBlobDepot::TEvPushNotify>(); - auto *item = ev->Record.AddBlockedTablets(); - item->SetTabletId(TabletId); - item->SetBlockedGeneration(BlockedGeneration); - - TAgentInfo& agent = Self->GetAgent(agentId); - if (const auto& actorId = agent.ConnectedAgent) { - Send(*actorId, ev.release(), IEventHandle::FlagTrackDelivery, IssuerGuid); - } - NodesWaitingForPushResult.insert(agentId); - } + class TBlobDepot::TBlocksManager::TBlockProcessorActor : public TActorBootstrapped<TBlockProcessorActor> { + TBlobDepot* const Self; + const ui64 TabletId; + const ui32 BlockedGeneration; + const ui32 NodeId; + std::unique_ptr<IEventHandle> Response; + ui32 BlocksPending = 0; + ui32 RetryCount = 0; + const ui64 IssuerGuid = RandomNumber<ui64>() | 1; + THashSet<ui32> NodesWaitingForPushResult; - void Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) { - const ui32 agentId = ev->Sender.NodeId(); - const size_t numErased = NodesWaitingForPushResult.erase(agentId); - Y_VERIFY(numErased == 1 && ev->Cookie == IssuerGuid); + public: + TBlockProcessorActor(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, + std::unique_ptr<IEventHandle> response) + : Self(self) + , TabletId(tabletId) + , BlockedGeneration(blockedGeneration) + , NodeId(nodeId) + , Response(std::move(response)) + {} - // mark lease as successfully revoked one - auto& block = Self->BlocksManager->Blocks[TabletId]; - block.PerAgentInfo.erase(agentId); + void Bootstrap() { + IssueNotificationsToAgents(); + Become(&TThis::StateFunc, AgentsWaitTime, new TEvents::TEvWakeup); + } - if (NodesWaitingForPushResult.empty()) { - Finish(); + void IssueNotificationsToAgents() { + const TMonotonic now = TActivationContext::Monotonic(); + auto& block = Self->BlocksManager->Blocks[TabletId]; + for (const auto& [agentId, info] : block.PerAgentInfo) { + if (agentId == NodeId) { + // skip the origin agent + continue; } - } - - void Handle(TEvents::TEvUndelivered::TPtr /*ev*/) { - // can't reach an agent to notify it about blocked generation change -- we can't do anything here - } - - void IssueBlocksToStorage() { - for (const auto& [_, kind] : Self->ChannelKinds) { - for (const auto& [channel, groupId] : kind.ChannelGroups) { - // FIXME: consider previous group generations (because agent can write in obsolete tablet generation) - // !!!!!!!!!!! - SendBlock(groupId); - ++BlocksPending; - RetryCount += 2; - } + if (info.ExpirationTimestamp <= now) { + SendPushToAgent(agentId); } } + } - void SendBlock(ui32 groupId) { - SendToBSProxy(SelfId(), groupId, new TEvBlobStorage::TEvBlock(TabletId, BlockedGeneration, - TInstant::Max(), IssuerGuid), groupId); - } + void SendPushToAgent(ui32 agentId) { + auto ev = std::make_unique<TEvBlobDepot::TEvPushNotify>(); + auto *item = ev->Record.AddBlockedTablets(); + item->SetTabletId(TabletId); + item->SetBlockedGeneration(BlockedGeneration); - void Handle(TEvBlobStorage::TEvBlockResult::TPtr ev) { - switch (ev->Get()->Status) { - case NKikimrProto::OK: - if (!--BlocksPending) { - Finish(); - } - break; - - case NKikimrProto::ALREADY: - // race, but this is not possible in current implementation - Y_FAIL(); - - case NKikimrProto::ERROR: - default: - if (!--RetryCount) { - auto& r = Response->Get<TEvBlobDepot::TEvBlockResult>()->Record; - r.SetStatus(NKikimrProto::ERROR); - r.SetErrorReason(ev->Get()->ErrorReason); - Finish(); - } else { - SendBlock(ev->Cookie); - } - break; - } + TAgent& agent = Self->GetAgent(agentId); + if (const auto& actorId = agent.ConnectedAgent) { + Send(*actorId, ev.release(), IEventHandle::FlagTrackDelivery, IssuerGuid); } + NodesWaitingForPushResult.insert(agentId); + } - void Finish() { - TActivationContext::Send(Response.release()); - PassAway(); - } + void Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) { + const ui32 agentId = ev->Sender.NodeId(); + const size_t numErased = NodesWaitingForPushResult.erase(agentId); + Y_VERIFY(numErased == 1 && ev->Cookie == IssuerGuid); - STRICT_STFUNC(StateFunc, - hFunc(TEvBlobStorage::TEvBlockResult, Handle); - hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle); - hFunc(TEvents::TEvUndelivered, Handle); - cFunc(TEvents::TSystem::Wakeup, IssueBlocksToStorage); - cFunc(TEvents::TSystem::Poison, PassAway); - ) - }; + // mark lease as successfully revoked one + auto& block = Self->BlocksManager->Blocks[TabletId]; + block.PerAgentInfo.erase(agentId); - public: - TBlocksManager(TBlobDepot *self) - : Self(self) - {} - - void AddBlockOnLoad(ui64 tabletId, ui32 generation) { - Blocks[tabletId].BlockedGeneration = generation; + if (NodesWaitingForPushResult.empty()) { + Finish(); + } } - void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, std::unique_ptr<IEventHandle> response) { - Self->RegisterWithSameMailbox(new TBlockProcessorActor(Self, tabletId, blockedGeneration, nodeId, - std::move(response))); + void Handle(TEvents::TEvUndelivered::TPtr /*ev*/) { + // can't reach an agent to notify it about blocked generation change -- we can't do anything here } - void Handle(TEvBlobDepot::TEvBlock::TPtr ev) { - const auto& record = ev->Get()->Record; - auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId(), NKikimrProto::OK, - std::nullopt, BlockLeaseTime.MilliSeconds()); - - if (!record.HasTabletId() || !record.HasBlockedGeneration()) { - responseRecord->SetStatus(NKikimrProto::ERROR); - responseRecord->SetErrorReason("incorrect protobuf"); - } else { - const ui64 tabletId = record.GetTabletId(); - const ui32 blockedGeneration = record.GetBlockedGeneration(); - if (const auto it = Blocks.find(tabletId); it != Blocks.end() && blockedGeneration <= it->second.BlockedGeneration) { - responseRecord->SetStatus(NKikimrProto::ALREADY); - } else { - TAgentInfo& agent = Self->GetAgent(ev->Recipient); - Self->Execute(std::make_unique<TTxUpdateBlock>(Self, tabletId, blockedGeneration, - agent.ConnectedNodeId, TActivationContext::Now(), std::move(response))); + void IssueBlocksToStorage() { + for (const auto& [_, kind] : Self->ChannelKinds) { + for (const auto& [channel, groupId] : kind.ChannelGroups) { + // FIXME: consider previous group generations (because agent can write in obsolete tablet generation) + // !!!!!!!!!!! + SendBlock(groupId); + ++BlocksPending; + RetryCount += 2; } } - - TActivationContext::Send(response.release()); // not sent if the request got processed and response now is nullptr } - void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) { - TAgentInfo& agent = Self->GetAgent(ev->Recipient); - const ui32 agentId = agent.ConnectedNodeId; - Y_VERIFY(agentId); - - const TMonotonic now = TActivationContext::Monotonic(); - - const auto& record = ev->Get()->Record; - auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId()); - responseRecord->SetTimeToLiveMs(BlockLeaseTime.MilliSeconds()); + void SendBlock(ui32 groupId) { + SendToBSProxy(SelfId(), groupId, new TEvBlobStorage::TEvBlock(TabletId, BlockedGeneration, + TInstant::Max(), IssuerGuid), groupId); + } - for (const ui64 tabletId : record.GetTabletIds()) { - auto& block = Blocks[tabletId]; - responseRecord->AddBlockedGenerations(block.BlockedGeneration); - block.PerAgentInfo[agentId].ExpirationTimestamp = now + BlockLeaseTime; + void Handle(TEvBlobStorage::TEvBlockResult::TPtr ev) { + switch (ev->Get()->Status) { + case NKikimrProto::OK: + if (!--BlocksPending) { + Finish(); + } + break; + + case NKikimrProto::ALREADY: + // race, but this is not possible in current implementation + Y_FAIL(); + + case NKikimrProto::ERROR: + default: + if (!--RetryCount) { + auto& r = Response->Get<TEvBlobDepot::TEvBlockResult>()->Record; + r.SetStatus(NKikimrProto::ERROR); + r.SetErrorReason(ev->Get()->ErrorReason); + Finish(); + } else { + SendBlock(ev->Cookie); + } + break; } + } - TActivationContext::Send(response.release()); + void Finish() { + TActivationContext::Send(Response.release()); + PassAway(); } - }; - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // TBlocksManager wrapper + STRICT_STFUNC(StateFunc, + hFunc(TEvBlobStorage::TEvBlockResult, Handle); + hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + cFunc(TEvents::TSystem::Wakeup, IssueBlocksToStorage); + cFunc(TEvents::TSystem::Poison, PassAway); + ) + }; - TBlobDepot::TBlocksManagerPtr TBlobDepot::CreateBlocksManager() { - return {new TBlocksManager{this}, std::default_delete<TBlocksManager>{}}; + void TBlobDepot::TBlocksManager::AddBlockOnLoad(ui64 tabletId, ui32 generation) { + Blocks[tabletId].BlockedGeneration = generation; } - void TBlobDepot::AddBlockOnLoad(ui64 tabletId, ui32 generation) { - BlocksManager->AddBlockOnLoad(tabletId, generation); + void TBlobDepot::TBlocksManager::OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, std::unique_ptr<IEventHandle> response) { + Self->RegisterWithSameMailbox(new TBlockProcessorActor(Self, tabletId, blockedGeneration, nodeId, + std::move(response))); } - void TBlobDepot::Handle(TEvBlobDepot::TEvBlock::TPtr ev) { - return BlocksManager->Handle(ev); + void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvBlock::TPtr ev) { + const auto& record = ev->Get()->Record; + auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId(), NKikimrProto::OK, + std::nullopt, BlockLeaseTime.MilliSeconds()); + + if (!record.HasTabletId() || !record.HasBlockedGeneration()) { + responseRecord->SetStatus(NKikimrProto::ERROR); + responseRecord->SetErrorReason("incorrect protobuf"); + } else { + const ui64 tabletId = record.GetTabletId(); + const ui32 blockedGeneration = record.GetBlockedGeneration(); + if (const auto it = Blocks.find(tabletId); it != Blocks.end() && blockedGeneration <= it->second.BlockedGeneration) { + responseRecord->SetStatus(NKikimrProto::ALREADY); + } else { + TAgent& agent = Self->GetAgent(ev->Recipient); + Self->Execute(std::make_unique<TTxUpdateBlock>(Self, tabletId, blockedGeneration, + agent.ConnectedNodeId, TActivationContext::Now(), std::move(response))); + } + } + + TActivationContext::Send(response.release()); // not sent if the request got processed and response now is nullptr } - void TBlobDepot::Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) { - return BlocksManager->Handle(ev); + void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) { + TAgent& agent = Self->GetAgent(ev->Recipient); + const ui32 agentId = agent.ConnectedNodeId; + Y_VERIFY(agentId); + + const TMonotonic now = TActivationContext::Monotonic(); + + const auto& record = ev->Get()->Record; + auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId()); + responseRecord->SetTimeToLiveMs(BlockLeaseTime.MilliSeconds()); + + for (const ui64 tabletId : record.GetTabletIds()) { + auto& block = Blocks[tabletId]; + responseRecord->AddBlockedGenerations(block.BlockedGeneration); + block.PerAgentInfo[agentId].ExpirationTimestamp = now + BlockLeaseTime; + } + + TActivationContext::Send(response.release()); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/blocks.h b/ydb/core/blob_depot/blocks.h new file mode 100644 index 00000000000..bd3aba38997 --- /dev/null +++ b/ydb/core/blob_depot/blocks.h @@ -0,0 +1,42 @@ +#pragma once + +#include "defs.h" +#include "blob_depot_tablet.h" + +namespace NKikimr::NBlobDepot { + + class TBlobDepot::TBlocksManager { + // wait duration before issuing blocks via storage + static constexpr TDuration AgentsWaitTime = TDuration::Seconds(1); + + // TTL for block lease + static constexpr TDuration BlockLeaseTime = TDuration::Seconds(60); + + struct TBlock { + struct TPerAgentInfo { + TMonotonic ExpirationTimestamp = TMonotonic::Zero(); + }; + + ui32 BlockedGeneration = 0; + THashMap<ui32, TPerAgentInfo> PerAgentInfo; + }; + + TBlobDepot* const Self; + THashMap<ui64, TBlock> Blocks; + + private: + class TTxUpdateBlock; + class TBlockProcessorActor; + + public: + TBlocksManager(TBlobDepot *self) + : Self(self) + {} + + void AddBlockOnLoad(ui64 tabletId, ui32 generation); + void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, std::unique_ptr<IEventHandle> response); + void Handle(TEvBlobDepot::TEvBlock::TPtr ev); + void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev); + }; + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index 1ecc15a9133..f2752fc8ee4 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -1,150 +1,283 @@ -#include "blob_depot_tablet.h" +#include "data.h" +#include "schema.h" namespace NKikimr::NBlobDepot { - class TBlobDepot::TDataManager { - TBlobDepot* const Self; + class TBlobDepot::TData::TTxConfirmGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + const ui8 Channel; + const ui32 GroupId; + std::vector<TLogoBlobID> TrashDeleted; + const ui64 ConfirmedGenStep; - struct TCompareKey { - bool operator ()(const TString& x, const TString& y) const { return x < y; } - bool operator ()(const TStringBuf& x, const TString& y) const { return x < y; } - bool operator ()(const TString& x, const TStringBuf& y) const { return x < y; } + static constexpr ui32 MaxKeysToProcessAtOnce = 10'000; - using is_transparent = void; - }; + public: + TTxConfirmGC(TBlobDepot *self, ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted, + ui64 confirmedGenStep) + : TTransactionBase(self) + , Channel(channel) + , GroupId(groupId) + , TrashDeleted(std::move(trashDeleted)) + , ConfirmedGenStep(confirmedGenStep) + {} - std::map<TString, TDataValue, TCompareKey> Data; - std::set<TLogoBlobID> DataBlobIds; + bool Execute(TTransactionContext& txc, const TActorContext&) override { + NIceDb::TNiceDb db(txc.DB); - public: - TDataManager(TBlobDepot *self) - : Self(self) - { - (void)Self; + for (ui32 i = 0; i < TrashDeleted.size() && i < MaxKeysToProcessAtOnce; ++i) { + db.Table<Schema::Trash>().Key(TKey(TrashDeleted[i]).MakeBinaryKey()).Delete(); + } + if (TrashDeleted.size() <= MaxKeysToProcessAtOnce) { + TrashDeleted.clear(); + db.Table<Schema::ConfirmedGC>().Key(Channel, GroupId).Update<Schema::ConfirmedGC::ConfirmedGenStep>( + ConfirmedGenStep); + } else { + std::vector<TLogoBlobID> temp; + temp.insert(temp.end(), TrashDeleted.begin() + MaxKeysToProcessAtOnce, TrashDeleted.end()); + temp.swap(TrashDeleted); + } + + return true; } - std::optional<TDataValue> FindKey(TStringBuf key) { - const auto it = Data.find(key); - return it != Data.end() ? std::make_optional(it->second) : std::nullopt; + void Complete(const TActorContext&) override { + if (TrashDeleted.empty()) { + Self->Data->OnCommitConfirmedGC(Channel, GroupId); + } else { // resume transaction + Self->Execute(std::make_unique<TTxConfirmGC>(Self, Channel, GroupId, std::move(TrashDeleted), ConfirmedGenStep)); + } } + }; - void ScanRange(const std::optional<TStringBuf>& begin, const std::optional<TStringBuf>& end, - TScanFlags flags, const std::function<bool(TStringBuf, const TDataValue&)>& callback) { - auto beginIt = !begin ? Data.begin() - : flags & EScanFlags::INCLUDE_BEGIN ? Data.lower_bound(*begin) - : Data.upper_bound(*begin); - - auto endIt = !end ? Data.end() - : flags & EScanFlags::INCLUDE_END ? Data.upper_bound(*end) - : Data.lower_bound(*end); - - if (flags & EScanFlags::REVERSE) { - if (beginIt != endIt) { - --endIt; - do { - auto& current = *endIt--; - if (!callback(current.first, current.second)) { - break; - } - } while (beginIt != endIt); - } - } else { - while (beginIt != endIt) { - auto& current = *beginIt++; - if (!callback(current.first, current.second)) { - break; - } - } + std::optional<TBlobDepot::TData::TValue> TBlobDepot::TData::FindKey(const TKey& key) { + const auto it = Data.find(key); + return it != Data.end() ? std::make_optional(it->second) : std::nullopt; + } + + TBlobDepot::TData::TRecordsPerChannelGroup& TBlobDepot::TData::GetRecordsPerChannelGroup(TLogoBlobID id) { + TTabletStorageInfo *info = Self->Info(); + const ui32 groupId = info->GroupFor(id.Channel(), id.Generation()); + Y_VERIFY(groupId != Max<ui32>()); + const auto& key = std::make_tuple(id.TabletID(), id.Channel(), groupId); + const auto [it, _] = RecordsPerChannelGroup.try_emplace(key, id.TabletID(), id.Channel(), groupId); + return it->second; + } + + void TBlobDepot::TData::AddDataOnLoad(TKey key, TString value) { + NKikimrBlobDepot::TValue proto; + const bool success = proto.ParseFromString(value); + Y_VERIFY(success); + PutKey(std::move(key), { + .Meta = proto.GetMeta(), + .ValueChain = std::move(*proto.MutableValueChain()), + .KeepState = proto.GetKeepState(), + .Public = proto.GetPublic(), + }); + } + + void TBlobDepot::TData::AddTrashOnLoad(TLogoBlobID id) { + auto& record = GetRecordsPerChannelGroup(id); + record.Trash.insert(id); + OnTrashInserted(record); + } + + void TBlobDepot::TData::AddConfirmedGenStepOnLoad(ui8 channel, ui32 groupId, ui64 confirmedGenStep) { + const auto& key = std::make_tuple(Self->TabletID(), channel, groupId); + const auto [it, _] = RecordsPerChannelGroup.try_emplace(key, Self->TabletID(), channel, groupId); + auto& record = it->second; + record.LastConfirmedGenStep = confirmedGenStep; + } + + void TBlobDepot::TData::PutKey(TKey key, TValue&& data) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "PutKey", (TabletId, Self->TabletID()), (Key, key.ToString(Self->Config)), + (KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState))); + + EnumerateBlobsForValueChain(data.ValueChain, Self->TabletID(), [&](TLogoBlobID id) { + if (!RefCount[id]++) { + // first mention of this id + auto& record = GetRecordsPerChannelGroup(id); + const auto [_, inserted] = record.Used.insert(id); + Y_VERIFY(inserted); } + }); + + Data[std::move(key)] = std::move(data); + } + + void TBlobDepot::TData::OnTrashInserted(TRecordsPerChannelGroup& record) { + if (!record.CollectGarbageRequestInFlight && record.TabletId == Self->TabletID()) { + RecordsWithTrash.PushBack(&record); } + } - void DeleteKey(TStringBuf key) { - Data.erase(TString(key)); + std::optional<TString> TBlobDepot::TData::UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState) { + const auto it = Data.find(key); + if (it != Data.end() && keepState <= it->second.KeepState) { + return std::nullopt; } + auto& value = Data[std::move(key)]; + value.KeepState = keepState; + return ToValueProto(value); + } - void PutKey(TString key, TDataValue&& data) { - auto getKeyString = [&] { - if (Self->Config.GetOperationMode() == NKikimrBlobDepot::VirtualGroup) { - return TLogoBlobID(reinterpret_cast<const ui64*>(key.data())).ToString(); - } else { - return key; - } - }; - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "PutKey", (TabletId, Self->TabletID()), (Key, EscapeC(getKeyString())), - (KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState))); + void TBlobDepot::TData::DeleteKey(const TKey& key, const std::function<void(TLogoBlobID)>& updateTrash, void *cookie) { + const auto it = Data.find(key); + Y_VERIFY(it != Data.end()); + TValue& value = it->second; + EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id) { + const auto it = RefCount.find(id); + Y_VERIFY(it != RefCount.end()); + if (!--it->second) { + InFlightTrash.emplace(cookie, id); + RefCount.erase(it); + updateTrash(id); + } + }); + Data.erase(it); + } - Data[std::move(key)] = std::move(data); + void TBlobDepot::TData::CommitTrash(void *cookie) { + auto range = InFlightTrash.equal_range(cookie); + for (auto it = range.first; it != range.second; ++it) { + auto& record = GetRecordsPerChannelGroup(it->second); + const auto usedIt = record.Used.find(it->second); + Y_VERIFY(usedIt != record.Used.end()); + record.Trash.insert(record.Used.extract(usedIt)); + OnTrashInserted(record); } + InFlightTrash.erase(range.first, range.second); + } - void AddDataOnLoad(TString key, TString value) { - NKikimrBlobDepot::TValue proto; - const bool success = proto.ParseFromString(value); - Y_VERIFY(success); - PutKey(std::move(key), { - .Meta = proto.GetMeta(), - .ValueChain = std::move(*proto.MutableValueChain()), - .KeepState = proto.GetKeepState(), - .Public = proto.GetPublic(), - }); + TString TBlobDepot::TData::ToValueProto(const TValue& value) { + NKikimrBlobDepot::TValue proto; + if (value.Meta) { + proto.SetMeta(value.Meta); + } + proto.MutableValueChain()->CopyFrom(value.ValueChain); + if (proto.GetKeepState() != value.KeepState) { + proto.SetKeepState(value.KeepState); + } + if (proto.GetPublic() != value.Public) { + proto.SetPublic(value.Public); } - std::optional<TString> UpdateKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) { - auto& value = Data[TString(key)]; - if (value.KeepState < keepState) { - value.KeepState = keepState; - return ToValueProto(value); + TString s; + const bool success = proto.SerializeToString(&s); + Y_VERIFY(success); + return s; + } + + void TBlobDepot::TData::HandleTrash() { + const ui32 generation = Self->Executor()->Generation(); + + for (TRecordsPerChannelGroup& record : RecordsWithTrash) { + Y_VERIFY(!record.CollectGarbageRequestInFlight); + Y_VERIFY(record.TabletId == Self->TabletID()); + Y_VERIFY(!record.Trash.empty()); + + ui64 nextGenStep = 0; + + auto& channel = Self->PerChannelRecords[record.Channel]; + if (channel.GivenStepIndex.empty()) { + nextGenStep = GenStep(*--record.Trash.end()); } else { - return std::nullopt; + const auto& [leastStep, leastIndex] = *channel.GivenStepIndex.begin(); + const TLogoBlobID maxId(record.TabletId, generation, leastStep, record.Channel, 0, 0); + const auto it = record.Trash.lower_bound(maxId); + if (it != record.Trash.begin()) { + nextGenStep = GenStep(*std::prev(it)); + } + } + + auto keep = std::make_unique<TVector<TLogoBlobID>>(); + auto doNotKeep = std::make_unique<TVector<TLogoBlobID>>(); + + // FIXME: check for blob leaks when LastConfirmedGenStep is not properly persisted + for (auto it = record.Trash.begin(); it != record.Trash.end() && GenStep(*it) <= record.LastConfirmedGenStep; ++it) { + doNotKeep->push_back(*it); } - } - static TString ToValueProto(const TDataValue& value) { - NKikimrBlobDepot::TValue proto; - if (value.Meta) { - proto.SetMeta(value.Meta); + // FIXME: check for blob loss when LastConfirmedGenStep is not properly persisted + const TLogoBlobID keepFrom(record.TabletId, ui32(record.LastConfirmedGenStep >> 32), + ui32(record.LastConfirmedGenStep), record.Channel, 0, 0); + for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && GenStep(*it) <= nextGenStep; ++it) { + keep->push_back(*it); } - proto.MutableValueChain()->CopyFrom(value.ValueChain); - if (proto.GetKeepState() != value.KeepState) { - proto.SetKeepState(value.KeepState); + + if (keep->empty()) { + keep.reset(); } - if (proto.GetPublic() != value.Public) { - proto.SetPublic(value.Public); + if (doNotKeep->empty()) { + doNotKeep.reset(); } + const bool collect = nextGenStep > record.LastConfirmedGenStep; - TString s; - const bool success = proto.SerializeToString(&s); - Y_VERIFY(success); - return s; - } - }; + if (!keep && !doNotKeep && !collect) { + continue; // skip this one + } - TBlobDepot::TDataManagerPtr TBlobDepot::CreateDataManager() { - return {new TDataManager{this}, std::default_delete<TDataManager>{}}; - } + auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(record.TabletId, generation, + record.PerGenerationCounter, record.Channel, collect, ui32(nextGenStep >> 32), ui32(nextGenStep), + keep.get(), doNotKeep.get(), TInstant::Max(), true); + keep.release(); + doNotKeep.release(); - std::optional<TBlobDepot::TDataValue> TBlobDepot::FindKey(TStringBuf key) { - return DataManager->FindKey(key); - } + record.CollectGarbageRequestInFlight = true; + record.PerGenerationCounter += ev->Collect ? ev->PerGenerationCounterStepSize() : 0; + record.TrashInFlight.insert(record.TrashInFlight.end(), record.Trash.begin(), record.Trash.end()); + record.NextGenStep = Max(nextGenStep, record.LastConfirmedGenStep); - void TBlobDepot::ScanRange(const std::optional<TStringBuf>& begin, const std::optional<TStringBuf>& end, - TScanFlags flags, const std::function<bool(TStringBuf, const TDataValue&)>& callback) { - return DataManager->ScanRange(begin, end, flags, callback); - } + auto& kind = Self->ChannelKinds[Self->ChannelToKind[record.Channel]]; + const auto blobSeqId = TBlobSeqId::FromBinary(generation, kind, kind.NextBlobSeqId); + Y_VERIFY(record.LastConfirmedGenStep < GenStep(generation, blobSeqId.Step)); + if (GenStep(generation, blobSeqId.Step) <= nextGenStep) { + kind.NextBlobSeqId = TBlobSeqId{record.Channel, generation, ui32(nextGenStep) + 1, 0}.ToBinary(kind); + } - void TBlobDepot::DeleteKey(TStringBuf key) { - DataManager->DeleteKey(key); - } + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "issuing TEvCollectGarbage", (TabletId, Self->TabletID()), + (Channel, record.Channel), (GroupId, record.GroupId), (Msg, ev->ToString()), + (LastConfirmedGenStep, record.LastConfirmedGenStep), (NextGenStep, record.NextGenStep), + (TrashInFlight.size, record.TrashInFlight.size())); + + SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), record.GroupId); + } - void TBlobDepot::PutKey(TString key, TDataValue&& data) { - DataManager->PutKey(std::move(key), std::move(data)); + RecordsWithTrash.Clear(); } - void TBlobDepot::AddDataOnLoad(TString key, TString value) { - DataManager->AddDataOnLoad(std::move(key), std::move(value)); + void TBlobDepot::TData::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvCollectGarbageResult", (TabletId, ev->Get()->TabletId), + (Channel, ev->Get()->Channel), (GroupId, ev->Cookie), (Msg, ev->Get()->ToString())); + const auto& key = std::make_tuple(ev->Get()->TabletId, ev->Get()->Channel, ev->Cookie); + const auto it = RecordsPerChannelGroup.find(key); + Y_VERIFY(it != RecordsPerChannelGroup.end()); + auto& record = it->second; + Y_VERIFY(record.CollectGarbageRequestInFlight); + if (ev->Get()->Status == NKikimrProto::OK) { + for (const TLogoBlobID& id : record.TrashInFlight) { // make it merge + record.Trash.erase(id); + } + record.LastConfirmedGenStep = record.NextGenStep; + Self->Execute(std::make_unique<TTxConfirmGC>(Self, record.Channel, record.GroupId, + std::exchange(record.TrashInFlight, {}), record.LastConfirmedGenStep)); + } else { + record.CollectGarbageRequestInFlight = false; + OnTrashInserted(record); + HandleTrash(); + } } - std::optional<TString> TBlobDepot::UpdateKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) { - return DataManager->UpdateKeepState(key, keepState); + void TBlobDepot::TData::OnCommitConfirmedGC(ui8 channel, ui32 groupId) { + const auto& key = std::make_tuple(Self->TabletID(), channel, groupId); + const auto it = RecordsPerChannelGroup.find(key); + Y_VERIFY(it != RecordsPerChannelGroup.end()); + auto& record = it->second; + Y_VERIFY(record.CollectGarbageRequestInFlight); + record.CollectGarbageRequestInFlight = false; + if (!record.Trash.empty()) { + OnTrashInserted(record); + HandleTrash(); + } } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h new file mode 100644 index 00000000000..22515c82731 --- /dev/null +++ b/ydb/core/blob_depot/data.h @@ -0,0 +1,344 @@ +#pragma once + +#include "defs.h" +#include "blob_depot_tablet.h" + +namespace NKikimr::NBlobDepot { + + class TBlobDepot::TData { + TBlobDepot* const Self; + + public: + class alignas(TString) TKey { + union { + ui64 Raw64[4]; + ui8 Bytes[32]; + char String[31]; + struct { + ui8 Padding[31]; + ui8 Type; + }; + } Data; + + static constexpr size_t TypeLenByteIdx = 31; + static constexpr size_t MaxInlineStringLen = TypeLenByteIdx; + static constexpr char BlobIdType = 32; + static constexpr char StringType = 33; + + public: + TKey() { + Reset(); + } + + explicit TKey(TLogoBlobID id) { + Data.Type = BlobIdType; + reinterpret_cast<TLogoBlobID&>(Data.Bytes) = id; + } + + explicit TKey(TStringBuf value) { + if (value.size() <= MaxInlineStringLen) { + Data.Type = EncodeInlineStringLenAsTypeByte(value.size()); + memcpy(Data.String, value.data(), value.size()); + Data.String[value.size()] = 0; + } else { + Data.Type = StringType; + new(Data.Bytes) TString(value); + } + } + + explicit TKey(TString value) { + if (value.size() <= MaxInlineStringLen) { + Data.Type = EncodeInlineStringLenAsTypeByte(value.size()); + memcpy(Data.String, value.data(), value.size()); + Data.String[value.size()] = 0; + } else { + Data.Type = StringType; + new(Data.Bytes) TString(std::move(value)); + } + } + + TKey(const TKey& other) { + if (other.Data.Type == StringType) { + Data.Type = StringType; + new(Data.Bytes) TString(other.GetString()); + } else { + Data = other.Data; + } + } + + TKey(TKey&& other) { + if (other.Data.Type == StringType) { + Data.Type = StringType; + new(Data.Bytes) TString(std::move(other.GetString())); + other.Reset(); + } else { + Data = other.Data; + } + } + + ~TKey() { + Reset(); + } + + TKey& operator =(const TKey& other) { + if (this != &other) { + if (Data.Type == StringType && other.Data.Type == StringType) { + GetString() = other.GetString(); + } else if (Data.Type == StringType) { + GetString().~TString(); + Data = other.Data; + } else if (other.Data.Type == StringType) { + Data.Type = StringType; + new(Data.Bytes) TString(other.GetString()); + } else { + Data = other.Data; + } + } + return *this; + } + + TKey& operator =(TKey&& other) { + if (this != &other) { + if (Data.Type == StringType && other.Data.Type == StringType) { + GetString() = std::move(other.GetString()); + other.Reset(); + } else if (Data.Type == StringType) { + GetString().~TString(); + Data = other.Data; + } else if (other.Data.Type == StringType) { + Data.Type = StringType; + new(Data.Bytes) TString(std::move(other.GetString())); + other.Reset(); + } else { + Data = other.Data; + } + } + return *this; + } + + std::variant<TLogoBlobID, TStringBuf> AsVariant() const { + if (Data.Type == BlobIdType) { + return GetBlobId(); + } else { + return GetStringBuf(); + } + } + + TString MakeBinaryKey() const { + if (Data.Type == BlobIdType) { + return TString(GetBlobId().AsBinaryString()); + } else { + return TString(GetStringBuf()); + } + } + + static TKey FromBinaryKey(const TStringBuf& key, const NKikimrBlobDepot::TBlobDepotConfig& config) { + if (config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup) { + Y_VERIFY(key.size() == 3 * sizeof(ui64)); + return TKey(TLogoBlobID(reinterpret_cast<const ui64*>(key.data()))); + } else { + return TKey(key); + } + } + + TString ToString(const NKikimrBlobDepot::TBlobDepotConfig& config) const { + TStringStream s; + Output(s, config); + return s.Str(); + } + + void Output(IOutputStream& s, const NKikimrBlobDepot::TBlobDepotConfig& config) const { + if (config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup) { + s << GetBlobId(); + } else { + s << EscapeC(GetStringBuf()); + } + } + + static int Compare(const TKey& x, const TKey& y) { + if (x.Data.Type == BlobIdType && y.Data.Type == BlobIdType) { + return x.GetBlobId() < y.GetBlobId() ? -1 : y.GetBlobId() < x.GetBlobId() ? 1 : 0; + } else if (x.Data.Type == BlobIdType) { + return -1; + } else if (y.Data.Type == BlobIdType) { + return 1; + } else { + const TStringBuf sbx = x.GetStringBuf(); + const TStringBuf sby = y.GetStringBuf(); + return sbx < sby ? -1 : sby < sbx ? 1 : 0; + } + } + + const TLogoBlobID& GetBlobId() const { + Y_VERIFY_DEBUG(Data.Type == BlobIdType); + return reinterpret_cast<const TLogoBlobID&>(Data.Bytes); + } + + friend bool operator ==(const TKey& x, const TKey& y) { return Compare(x, y) == 0; } + friend bool operator !=(const TKey& x, const TKey& y) { return Compare(x, y) != 0; } + friend bool operator < (const TKey& x, const TKey& y) { return Compare(x, y) < 0; } + friend bool operator <=(const TKey& x, const TKey& y) { return Compare(x, y) <= 0; } + friend bool operator > (const TKey& x, const TKey& y) { return Compare(x, y) > 0; } + friend bool operator >=(const TKey& x, const TKey& y) { return Compare(x, y) >= 0; } + + private: + void Reset() { + if (Data.Type == StringType) { + GetString().~TString(); + } + Data.Type = EncodeInlineStringLenAsTypeByte(0); + } + + TStringBuf GetStringBuf() const { + if (Data.Type == StringType) { + return GetString(); + } else { + return TStringBuf(Data.String, DecodeInlineStringLenFromTypeByte(Data.Type)); + } + } + + const TString& GetString() const { + Y_VERIFY_DEBUG(Data.Type == StringType); + return reinterpret_cast<const TString&>(Data.Bytes); + } + + TString& GetString() { + Y_VERIFY_DEBUG(Data.Type == StringType); + return reinterpret_cast<TString&>(Data.Bytes); + } + + static ui8 EncodeInlineStringLenAsTypeByte(size_t len) { + Y_VERIFY_DEBUG(len <= MaxInlineStringLen); + return len == MaxInlineStringLen ? 0 : len ? len : MaxInlineStringLen; + } + + static size_t DecodeInlineStringLenFromTypeByte(ui8 type) { + return EncodeInlineStringLenAsTypeByte(type); + } + }; + + struct TValue { + TString Meta; + TValueChain ValueChain; + NKikimrBlobDepot::EKeepState KeepState; + bool Public; + }; + + enum EScanFlags : ui32 { + INCLUDE_BEGIN = 1, + INCLUDE_END = 2, + REVERSE = 4, + }; + + Y_DECLARE_FLAGS(TScanFlags, EScanFlags) + + private: + struct TRecordWithTrash {}; + + struct TRecordsPerChannelGroup + : TIntrusiveListItem<TRecordsPerChannelGroup, TRecordWithTrash> + { + const ui64 TabletId; + const ui8 Channel; + const ui32 GroupId; + + std::set<TLogoBlobID> Used; + std::set<TLogoBlobID> Trash; // committed trash + std::vector<TLogoBlobID> TrashInFlight; + ui32 PerGenerationCounter = 1; + ui64 LastConfirmedGenStep = 0; + ui64 NextGenStep = 0; + bool CollectGarbageRequestInFlight = false; + + TRecordsPerChannelGroup(ui64 tabletId, ui8 channel, ui32 groupId) + : TabletId(tabletId) + , Channel(channel) + , GroupId(groupId) + {} + }; + + std::map<TKey, TValue> Data; + THashMap<TLogoBlobID, ui32> RefCount; + THashMap<std::tuple<ui64, ui8, ui32>, TRecordsPerChannelGroup> RecordsPerChannelGroup; + TIntrusiveList<TRecordsPerChannelGroup, TRecordWithTrash> RecordsWithTrash; + + THashMultiMap<void*, TLogoBlobID> InFlightTrash; // being committed, but not yet confirmed + + class TTxConfirmGC; + + public: + TData(TBlobDepot *self) + : Self(self) + {} + + std::optional<TValue> FindKey(const TKey& key); + + template<typename TCallback> + void ScanRange(const TKey *begin, const TKey *end, TScanFlags flags, TCallback&& callback) { + auto beginIt = !begin ? Data.begin() + : flags & EScanFlags::INCLUDE_BEGIN ? Data.lower_bound(*begin) + : Data.upper_bound(*begin); + + auto endIt = !end ? Data.end() + : flags & EScanFlags::INCLUDE_END ? Data.upper_bound(*end) + : Data.lower_bound(*end); + + if (flags & EScanFlags::REVERSE) { + if (beginIt != endIt) { + --endIt; + do { + auto& current = *endIt--; + if (!callback(current.first, current.second)) { + break; + } + } while (beginIt != endIt); + } + } else { + while (beginIt != endIt) { + auto& current = *beginIt++; + if (!callback(current.first, current.second)) { + break; + } + } + } + } + + TRecordsPerChannelGroup& GetRecordsPerChannelGroup(TLogoBlobID id); + + void AddDataOnLoad(TKey key, TString value); + void AddTrashOnLoad(TLogoBlobID id); + void AddConfirmedGenStepOnLoad(ui8 channel, ui32 groupId, ui64 confirmedGenStep); + + void PutKey(TKey key, TValue&& data); + + void OnTrashInserted(TRecordsPerChannelGroup& record); + std::optional<TString> UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState); + void DeleteKey(const TKey& key, const std::function<void(TLogoBlobID)>& updateTrash, void *cookie); + void CommitTrash(void *cookie); + void HandleTrash(); + void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev); + void OnCommitConfirmedGC(ui8 channel, ui32 groupId); + + static TString ToValueProto(const TValue& value); + + template<typename TCallback> + void EnumerateRefCount(TCallback&& callback) { + for (const auto& [key, value] : RefCount) { + callback(key, value); + } + } + + template<typename TCallback> + void EnumerateTrash(TCallback&& callback) { + for (const auto& [key, record] : RecordsPerChannelGroup) { + THashSet<TLogoBlobID> inFlight(record.TrashInFlight.begin(), record.TrashInFlight.end()); + for (const TLogoBlobID& id : record.Trash) { + callback(record.TabletId, record.Channel, record.GroupId, id, inFlight.contains(id)); + } + } + } + }; + + Y_DECLARE_OPERATORS_FOR_FLAGS(TBlobDepot::TData::TScanFlags) + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/defs.h b/ydb/core/blob_depot/defs.h index bdb31cf8618..ce566644ccf 100644 --- a/ydb/core/blob_depot/defs.h +++ b/ydb/core/blob_depot/defs.h @@ -8,4 +8,6 @@ #include <ydb/core/protos/blob_depot.pb.h> #include <ydb/core/util/stlog.h> +#include <library/cpp/monlib/service/pages/templates.h> + #include <util/generic/va_args.h> diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h index 01212d9e5fa..4532ec72b8e 100644 --- a/ydb/core/blob_depot/events.h +++ b/ydb/core/blob_depot/events.h @@ -55,7 +55,7 @@ namespace NKikimr { BLOBDEPOT_EVENT_PB(EvApplyConfigResult, TabletId, TxId); BLOBDEPOT_EVENT_PB(EvRegisterAgent, VirtualGroupId); BLOBDEPOT_EVENT_PB_NO_ARGS(EvRegisterAgentResult); - BLOBDEPOT_EVENT_PB(EvAllocateIds, ChannelKind); + BLOBDEPOT_EVENT_PB(EvAllocateIds, ChannelKind, Count); BLOBDEPOT_EVENT_PB(EvAllocateIdsResult, ChannelKind, Generation); BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotify); BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotifyResult); diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp index 3bd4da5c743..6d770624dc8 100644 --- a/ydb/core/blob_depot/garbage_collection.cpp +++ b/ydb/core/blob_depot/garbage_collection.cpp @@ -1,196 +1,183 @@ -#include "blob_depot_tablet.h" +#include "garbage_collection.h" #include "schema.h" +#include "data.h" namespace NKikimr::NBlobDepot { - namespace { - static ui64 GenStep(ui32 gen, ui32 step) { - return static_cast<ui64>(gen) << 32 | step; - } - } + class TBlobDepot::TBarrierServer::TTxCollectGarbage : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + std::optional<TString> Error; - class TBlobDepot::TGarbageCollectionManager { - TBlobDepot* const Self; + std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> Request; + int KeepIndex = 0; + int DoNotKeepIndex = 0; + ui32 NumKeysProcessed = 0; + bool Done = false; - struct TBarrier { - ui64 LastRecordGenStep = 0; - ui64 Soft = 0; - ui64 Hard = 0; - }; + static constexpr ui32 MaxKeysToProcessAtOnce = 10'000; - THashMap<std::pair<ui64, ui8>, TBarrier> Barriers; - - private: - class TTxCollectGarbage : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { - std::optional<TString> Error; - - std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> Request; - int KeepIndex = 0; - int DoNotKeepIndex = 0; - ui32 NumKeysProcessed = 0; - bool Done = false; - - static constexpr ui32 MaxKeysToProcessAtOnce = 10'000; - - public: - TTxCollectGarbage(TBlobDepot *self, std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> request, - ui32 keepIndex = 0, ui32 doNotKeepIndex = 0) - : TTransactionBase(self) - , Request(std::move(request)) - , KeepIndex(keepIndex) - , DoNotKeepIndex(doNotKeepIndex) - {} - - bool Execute(TTransactionContext& txc, const TActorContext&) override { - Validate(); - if (!Error) { - auto& record = Request->Get()->Record; - Done = ProcessFlags(txc, KeepIndex, record.GetKeep(), NKikimrBlobDepot::EKeepState::Keep) - && ProcessFlags(txc, DoNotKeepIndex, record.GetDoNotKeep(), NKikimrBlobDepot::EKeepState::DoNotKeep) - && ProcessBarrier(txc); - } - return true; + public: + TTxCollectGarbage(TBlobDepot *self, std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> request, + ui32 keepIndex = 0, ui32 doNotKeepIndex = 0) + : TTransactionBase(self) + , Request(std::move(request)) + , KeepIndex(keepIndex) + , DoNotKeepIndex(doNotKeepIndex) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + Validate(); + if (!Error) { + auto& record = Request->Get()->Record; + Done = ProcessFlags(txc, KeepIndex, record.GetKeep(), NKikimrBlobDepot::EKeepState::Keep) + && ProcessFlags(txc, DoNotKeepIndex, record.GetDoNotKeep(), NKikimrBlobDepot::EKeepState::DoNotKeep) + && ProcessBarrier(txc); } + return true; + } - void Complete(const TActorContext&) override { - if (Done || Error) { - auto [response, _] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId(), - Error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(Error)); - TActivationContext::Send(response.release()); - } else { - Self->Execute(std::make_unique<TTxCollectGarbage>(Self, std::move(Request), KeepIndex, DoNotKeepIndex)); - } + void Complete(const TActorContext&) override { + Self->Data->CommitTrash(this); + + if (Done || Error) { + auto [response, _] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId(), + Error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(Error)); + TActivationContext::Send(response.release()); + Self->Data->HandleTrash(); + } else { + Self->Execute(std::make_unique<TTxCollectGarbage>(Self, std::move(Request), KeepIndex, DoNotKeepIndex)); } + } - void Validate() { - // validate the command first - auto& record = Request->Get()->Record; - if (record.HasCollectGeneration() && record.HasCollectStep()) { - if (!record.HasTabletId() || !record.HasChannel() || record.GetChannel() > TLogoBlobID::MaxChannel) { - Error = "TabletId/Channel are either not set or invalid"; - } else if (!record.HasGeneration() || !record.HasPerGenerationCounter()) { - Error = "Generation/PerGenerationCounter are not set"; - } else { - const auto key = std::make_pair(record.GetTabletId(), record.GetChannel()); - auto& barriers = Self->GarbageCollectionManager->Barriers; - if (const auto it = barriers.find(key); it != barriers.end()) { - auto& barrier = it->second; - const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter()); - const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep()); - ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft; - if (recordGenStep < barrier.LastRecordGenStep) { - Error = "record generation:counter is obsolete"; - } else if (recordGenStep == barrier.LastRecordGenStep) { - if (currentGenStep != collectGenStep) { - Error = "repeated command with different collect parameters received"; - } - } else if (collectGenStep < currentGenStep) { - Error = "decreasing barrier"; + void Validate() { + // validate the command first + auto& record = Request->Get()->Record; + if (record.HasCollectGeneration() && record.HasCollectStep()) { + if (!record.HasTabletId() || !record.HasChannel() || record.GetChannel() > TLogoBlobID::MaxChannel) { + Error = "TabletId/Channel are either not set or invalid"; + } else if (!record.HasGeneration() || !record.HasPerGenerationCounter()) { + Error = "Generation/PerGenerationCounter are not set"; + } else { + const auto key = std::make_pair(record.GetTabletId(), record.GetChannel()); + auto& barriers = Self->BarrierServer->Barriers; + if (const auto it = barriers.find(key); it != barriers.end()) { + auto& barrier = it->second; + const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter()); + const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep()); + ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft; + if (recordGenStep < barrier.LastRecordGenStep) { + Error = "record generation:counter is obsolete"; + } else if (recordGenStep == barrier.LastRecordGenStep) { + if (currentGenStep != collectGenStep) { + Error = "repeated command with different collect parameters received"; } + } else if (collectGenStep < currentGenStep) { + Error = "decreasing barrier"; } } - } else if (record.HasCollectGeneration() || record.HasCollectStep()) { - Error = "CollectGeneration/CollectStep set incorrectly"; } + } else if (record.HasCollectGeneration() || record.HasCollectStep()) { + Error = "CollectGeneration/CollectStep set incorrectly"; } + } - bool ProcessFlags(TTransactionContext& txc, int& index, - const NProtoBuf::RepeatedPtrField<NKikimrProto::TLogoBlobID>& items, - NKikimrBlobDepot::EKeepState state) { - NIceDb::TNiceDb db(txc.DB); - for (; index < items.size() && NumKeysProcessed < MaxKeysToProcessAtOnce; ++index) { - const auto id = LogoBlobIDFromLogoBlobID(items[index]); - const TStringBuf key = id.AsBinaryString(); - if (const auto& value = Self->UpdateKeepState(key, state)) { - db.Table<Schema::Data>().Key(TString(key)).Update<Schema::Data::Value>(*value); - ++NumKeysProcessed; - } + bool ProcessFlags(TTransactionContext& txc, int& index, + const NProtoBuf::RepeatedPtrField<NKikimrProto::TLogoBlobID>& items, + NKikimrBlobDepot::EKeepState state) { + NIceDb::TNiceDb db(txc.DB); + for (; index < items.size() && NumKeysProcessed < MaxKeysToProcessAtOnce; ++index) { + const auto id = LogoBlobIDFromLogoBlobID(items[index]); + TData::TKey key(id); + if (const auto& value = Self->Data->UpdateKeepState(key, state)) { + db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Update<Schema::Data::Value>(*value); + ++NumKeysProcessed; } - - return index == items.size(); } - bool ProcessBarrier(TTransactionContext& txc) { - NIceDb::TNiceDb db(txc.DB); - - const auto& record = Request->Get()->Record; - if (record.HasCollectGeneration() && record.HasCollectStep()) { - const TLogoBlobID first(record.GetTabletId(), 0, 0, record.GetChannel(), 0, 0); - const TLogoBlobID last(record.GetTabletId(), record.GetCollectGeneration(), record.GetCollectStep(), - record.GetChannel(), TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId, - TLogoBlobID::MaxCrcMode); - const bool hard = record.GetHard(); - - auto processKey = [&](TStringBuf key, const TDataValue& value) { - if (value.KeepState != NKikimrBlobDepot::EKeepState::Keep || hard) { - const TLogoBlobID id(reinterpret_cast<const ui64*>(key.data())); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "DeleteKey", (TabletId, Self->TabletID()), - (BlobId, id)); - db.Table<Schema::Data>().Key(TString(key)).Delete(); - Self->DeleteKey(key); - ++NumKeysProcessed; - } + return index == items.size(); + } - return NumKeysProcessed < MaxKeysToProcessAtOnce; - }; + bool ProcessBarrier(TTransactionContext& txc) { + NIceDb::TNiceDb db(txc.DB); + + const auto& record = Request->Get()->Record; + if (record.HasCollectGeneration() && record.HasCollectStep()) { + const bool hard = record.GetHard(); + + auto processKey = [&](const TData::TKey& key, const TData::TValue& value) { + if (value.KeepState != NKikimrBlobDepot::EKeepState::Keep || hard) { + const TLogoBlobID id(key.GetBlobId()); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "DeleteKey", (TabletId, Self->TabletID()), (BlobId, id)); + db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Delete(); + auto updateTrash = [&](TLogoBlobID id) { + db.Table<Schema::Trash>().Key(TString(id.AsBinaryString())).Update(); + }; + Self->Data->DeleteKey(key, updateTrash, this); + ++NumKeysProcessed; + } - Self->ScanRange(first.AsBinaryString(), last.AsBinaryString(), - EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END, processKey); + return NumKeysProcessed < MaxKeysToProcessAtOnce; + }; - if (NumKeysProcessed == MaxKeysToProcessAtOnce) { - return false; - } + const TData::TKey first(TLogoBlobID(record.GetTabletId(), 0, 0, record.GetChannel(), 0, 0)); + const TData::TKey last(TLogoBlobID(record.GetTabletId(), record.GetCollectGeneration(), + record.GetCollectStep(), record.GetChannel(), TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, + TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode)); - const auto key = std::make_pair(record.GetTabletId(), record.GetChannel()); - auto& barriers = Self->GarbageCollectionManager->Barriers; - auto& barrier = barriers[key]; - const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter()); - const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep()); - ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft; - Y_VERIFY(barrier.LastRecordGenStep <= recordGenStep); - barrier.LastRecordGenStep = recordGenStep; - Y_VERIFY(currentGenStep <= collectGenStep); - currentGenStep = collectGenStep; - - db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel()).Update( - NIceDb::TUpdate<Schema::Barriers::LastRecordGenStep>(recordGenStep), - NIceDb::TUpdate<Schema::Barriers::Soft>(barrier.Soft), - NIceDb::TUpdate<Schema::Barriers::Hard>(barrier.Hard) - ); + Self->Data->ScanRange(&first, &last, TData::EScanFlags::INCLUDE_BEGIN | TData::EScanFlags::INCLUDE_END, + processKey); + + if (NumKeysProcessed == MaxKeysToProcessAtOnce) { + return false; } - return true; + const auto key = std::make_pair(record.GetTabletId(), record.GetChannel()); + auto& barriers = Self->BarrierServer->Barriers; + auto& barrier = barriers[key]; + const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter()); + const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep()); + ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft; + Y_VERIFY(barrier.LastRecordGenStep <= recordGenStep); + barrier.LastRecordGenStep = recordGenStep; + Y_VERIFY(currentGenStep <= collectGenStep); + currentGenStep = collectGenStep; + + db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel()).Update( + NIceDb::TUpdate<Schema::Barriers::LastRecordGenStep>(recordGenStep), + NIceDb::TUpdate<Schema::Barriers::Soft>(barrier.Soft), + NIceDb::TUpdate<Schema::Barriers::Hard>(barrier.Hard) + ); } - }; - public: - TGarbageCollectionManager(TBlobDepot *self) - : Self(self) - {} - - void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev) { - std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> uniq(ev.Release()); - Self->Execute(std::make_unique<TTxCollectGarbage>(Self, std::move(uniq))); - } - - bool CheckBlobForBarrier(TLogoBlobID id) const { - const auto key = std::make_pair(id.TabletID(), id.Channel()); - const auto it = Barriers.find(key); - const ui64 genstep = static_cast<ui64>(id.Generation()) << 32 | id.Step(); - return it == Barriers.end() || genstep > Max(it->second.Soft, it->second.Hard); + return true; } }; - TBlobDepot::TGarbageCollectionManagerPtr TBlobDepot::CreateGarbageCollectionManager() { - return {new TGarbageCollectionManager{this}, std::default_delete<TGarbageCollectionManager>{}}; + void TBlobDepot::TBarrierServer::AddBarrierOnLoad(ui64 tabletId, ui8 channel, ui64 lastRecordGenStep, + ui64 soft, ui64 hard) { + Barriers[std::make_pair(tabletId, channel)] = { + .LastRecordGenStep = lastRecordGenStep, + .Soft = soft, + .Hard = hard, + }; + } + + void TBlobDepot::TBarrierServer::Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev) { + Self->Execute(std::make_unique<TTxCollectGarbage>(Self, + std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle>(ev.Release()))); } - void TBlobDepot::Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev) { - GarbageCollectionManager->Handle(ev); + bool TBlobDepot::TBarrierServer::CheckBlobForBarrier(TLogoBlobID id) const { + const auto key = std::make_pair(id.TabletID(), id.Channel()); + const auto it = Barriers.find(key); + return it == Barriers.end() || GenStep(id) > Max(it->second.Soft, it->second.Hard); } - bool TBlobDepot::CheckBlobForBarrier(TLogoBlobID id) const { - return GarbageCollectionManager->CheckBlobForBarrier(id); + void TBlobDepot::TBarrierServer::GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const { + const auto key = std::make_pair(id.TabletID(), id.Channel()); + const auto it = Barriers.find(key); + const ui64 genStep = GenStep(id); + *underSoft = it == Barriers.end() ? false : genStep <= it->second.Soft; + *underHard = it == Barriers.end() ? false : genStep <= it->second.Hard; } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/garbage_collection.h b/ydb/core/blob_depot/garbage_collection.h new file mode 100644 index 00000000000..c6c6a4ac2b8 --- /dev/null +++ b/ydb/core/blob_depot/garbage_collection.h @@ -0,0 +1,42 @@ +#pragma once + +#include "defs.h" +#include "blob_depot_tablet.h" + +namespace NKikimr::NBlobDepot { + + class TBlobDepot::TBarrierServer { + TBlobDepot* const Self; + + struct TBarrier { + ui64 LastRecordGenStep = 0; + ui64 Soft = 0; + ui64 Hard = 0; + }; + + THashMap<std::pair<ui64, ui8>, TBarrier> Barriers; + + private: + class TTxCollectGarbage; + + public: + TBarrierServer(TBlobDepot *self) + : Self(self) + {} + + void AddBarrierOnLoad(ui64 tabletId, ui8 channel, ui64 lastRecordGenStep, ui64 soft, ui64 hard); + void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev); + bool CheckBlobForBarrier(TLogoBlobID id) const; + void GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const; + + template<typename TCallback> + void Enumerate(TCallback&& callback) { + for (const auto& [key, value] : Barriers) { + callback(key.first, key.second, static_cast<ui32>(value.LastRecordGenStep >> 32), + static_cast<ui32>(value.LastRecordGenStep), static_cast<ui32>(value.Soft >> 32), + static_cast<ui32>(value.Soft), static_cast<ui32>(value.Hard >> 32), static_cast<ui32>(value.Hard)); + } + } + }; + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/given_id_range.cpp b/ydb/core/blob_depot/given_id_range.cpp new file mode 100644 index 00000000000..2d78ac0970c --- /dev/null +++ b/ydb/core/blob_depot/given_id_range.cpp @@ -0,0 +1,127 @@ +#include "blob_depot_tablet.h" + +namespace NKikimr::NBlobDepot { + + TGivenIdRange::TGivenIdRange(const NKikimrBlobDepot::TGivenIdRange& proto) { + for (const auto& range : proto.GetRanges()) { + const ui64 begin = range.GetBegin(); + const ui32 len = range.GetLen(); + TRange& r = InsertNewRange(begin, begin + len); + ui64 *chunks = const_cast<ui64*>(r.Bits.GetChunks()); + const ui32 count = r.Bits.GetChunkCount(); + Y_VERIFY(range.OffsetsSize() == range.BitMasksSize()); + for (ui32 i = 0; i < range.OffsetsSize(); ++i) { + const ui32 offset = range.GetOffsets(i); + const ui64 bitMask = range.GetBitMasks(i); + NumAvailableItems += PopCount(bitMask) - 64; + Y_VERIFY(offset < count); + chunks[offset] = bitMask; + } + } + } + + TGivenIdRange::TRange& TGivenIdRange::InsertNewRange(ui64 begin, ui64 end) { + const ui64 len = end - begin; + Y_VERIFY(len); + const auto it = Ranges.lower_bound(begin); + if (it != Ranges.begin()) { + const auto& [prevBegin, prev] = *std::prev(it); + Y_VERIFY(prevBegin + prev.Len <= begin); + } + if (it != Ranges.end()) { + Y_VERIFY(end <= it->first); + } + NumAvailableItems += len; + const auto newIt = Ranges.emplace_hint(it, begin, len); + return newIt->second; + } + + void TGivenIdRange::IssueNewRange(ui64 begin, ui64 end) { + InsertNewRange(begin, end); + } + + void TGivenIdRange::Join(TGivenIdRange&& other) { + Ranges.merge(std::move(other.Ranges)); + if (Ranges.size() > 1) { + for (auto it = std::next(Ranges.begin()); it != Ranges.end(); ++it) { + const auto& [prevBegin, prev] = *std::prev(it); + const auto& [begin, range] = *it; + Y_VERIFY(prevBegin + prev.Len <= begin); + } + } + NumAvailableItems += std::exchange(other.NumAvailableItems, 0); + } + + void TGivenIdRange::ToProto(NKikimrBlobDepot::TGivenIdRange *proto) { + for (const auto& [begin, range] : Ranges) { + auto *r = proto->AddRanges(); + r->SetBegin(begin); + r->SetLen(range.Len); + const ui64 *chunks = range.Bits.GetChunks(); + const ui32 count = range.Bits.GetChunkCount(); + for (ui32 i = 0; i < count; ++i) { + if (chunks[i] != ~ui64(0)) { + r->AddOffsets(i); + r->AddBitMasks(chunks[i]); + } + } + } + } + + void TGivenIdRange::RemovePoint(ui64 value) { + auto it = Ranges.upper_bound(value); + Y_VERIFY(it != Ranges.begin()); + auto& [begin, range] = *--it; + Y_VERIFY(begin <= value); + const ui64 offset = value - begin; + Y_VERIFY(offset < range.Len); + Y_VERIFY(range.Bits[offset]); + range.Bits.Reset(offset); + --NumAvailableItems; + // FIXME: reduce range (maybe)? + } + + void TGivenIdRange::Output(IOutputStream& s) const { + s << "{"; + for (auto it = Ranges.begin(); it != Ranges.end(); ++it) { + if (it != Ranges.begin()) { + s << " "; + } + const auto& [begin, range] = *it; + s << begin << ":" << range.Len << "["; + for (ui32 i = 0; i < range.Len; ++i) { + s << static_cast<int>(range.Bits[i]); + } + s << "]"; + } + } + + TString TGivenIdRange::ToString() const { + TStringStream s; + Output(s); + return s.Str(); + } + + bool TGivenIdRange::IsEmpty() const { + return Ranges.empty(); + } + + ui32 TGivenIdRange::GetNumAvailableItems() const { + return NumAvailableItems; + } + + ui64 TGivenIdRange::Allocate() { + Y_VERIFY(!Ranges.empty()); + const auto it = Ranges.begin(); + auto& [begin, range] = *it; + const size_t offset = range.Bits.FirstNonZeroBit(); + const ui64 res = begin + offset; + range.Bits.Reset(offset); + if (range.Bits.NextNonZeroBit(offset) == range.Bits.Size()) { + Ranges.erase(it); + } + --NumAvailableItems; + return res; + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/mon_main.cpp b/ydb/core/blob_depot/mon_main.cpp new file mode 100644 index 00000000000..af2c79405f4 --- /dev/null +++ b/ydb/core/blob_depot/mon_main.cpp @@ -0,0 +1,271 @@ +#include "blob_depot_tablet.h" +#include "data.h" +#include "garbage_collection.h" + +namespace NKikimr::NBlobDepot { + + class TBlobDepot::TTxMonData : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + std::unique_ptr<NMon::TEvRemoteHttpInfo::THandle> Request; + TStringStream Stream; + + enum class ETable { + Data, + RefCount, + Trash, + Barriers, + Blocks, + }; + + static constexpr const char *TableName(ETable table) { + switch (table) { + case ETable::Data: return "data"; + case ETable::RefCount: return "refcount"; + case ETable::Trash: return "trash"; + case ETable::Barriers: return "barriers"; + case ETable::Blocks: return "blocks"; + } + } + + static ETable TableByName(const TString& name) { + for (const ETable table : {ETable::Data, ETable::RefCount, ETable::Trash, ETable::Barriers, ETable::Blocks}) { + if (name == TableName(table)) { + return table; + } + } + return ETable::Data; + } + + public: + TTxMonData(TBlobDepot *self, NMon::TEvRemoteHttpInfo::TPtr ev) + : TTransactionBase(self) + , Request(ev.Release()) + {} + + bool Execute(TTransactionContext& /*txc*/, const TActorContext&) override { + const TCgiParameters& cgi = Request->Get()->Cgi(); + const ETable table = TableByName(cgi.Get("table")); + + void (TTxMonData::*render)(bool) = nullptr; + switch (table) { + case ETable::Data: + render = &TTxMonData::RenderDataTable; + break; + + case ETable::RefCount: + render = &TTxMonData::RenderRefCountTable; + break; + + case ETable::Trash: + render = &TTxMonData::RenderTrashTable; + break; + + case ETable::Barriers: + render = &TTxMonData::RenderBarriersTable; + break; + + case ETable::Blocks: + render = &TTxMonData::RenderBlocksTable; + break; + } + if (!render) { + Y_FAIL(); + } + + HTML(Stream) { + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + Stream << "Data"; + } + DIV_CLASS("panel-body") { + Stream << "<ul class='nav nav-tabs'>"; + for (const ETable tab : {ETable::Data, ETable::RefCount, ETable::Trash, ETable::Barriers, ETable::Blocks}) { + Stream << "<li" << (table == tab ? " class='active'" : "") << ">" + << "<a href='app?TabletID=" << Self->TabletID() << "&page=data&table=" + << TableName(tab) << "'>" << TableName(tab) << "</a></li>"; + } + Stream << "</ul>"; + + TABLE_CLASS("table") { + TABLEHEAD() { + TABLER() { + (this->*render)(true); + } + } + TABLEBODY() { + (this->*render)(false); + } + } + } + } + } + + return true; + } + + void RenderDataTable(bool header) { + HTML(Stream) { + if (header) { + TABLEH() { Stream << "key"; } + TABLEH() { Stream << "value chain"; } + TABLEH() { Stream << "keep state"; } + TABLEH() { Stream << "barrier"; } + } else { + Self->Data->ScanRange(nullptr, nullptr, 0, [&](const TData::TKey& key, const TData::TValue& value) { + TABLER() { + TABLED() { + key.Output(Stream, Self->Config); + } + TABLED() { + bool first = true; + for (const auto& item : value.ValueChain) { + if (first) { + first = false; + } else { + Stream << "<br/>"; + } + Stream << TBlobSeqId::FromProto(item.GetLocator().GetBlobSeqId()).ToString(); + if (item.HasSubrangeBegin() || item.HasSubrangeEnd()) { + Stream << "["; + if (item.HasSubrangeBegin()) { + Stream << item.GetSubrangeBegin(); + } + Stream << ":"; + if (item.HasSubrangeEnd()) { + Stream << item.GetSubrangeEnd(); + } + Stream << "]"; + } + } + } + TABLED() { + Stream << NKikimrBlobDepot::EKeepState_Name(value.KeepState); + } + TABLED() { + if (Self->Config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup) { + bool underSoft, underHard; + Self->BarrierServer->GetBlobBarrierRelation(key.GetBlobId(), &underSoft, &underHard); + Stream << (underSoft ? 'S' : '-') << (underHard ? 'H' : '-'); + } + } + } + return true; + }); + } + } + } + + void RenderRefCountTable(bool header) { + HTML(Stream) { + if (header) { + TABLEH() { Stream << "blob id"; } + TABLEH() { Stream << "refcount"; } + } else { + Self->Data->EnumerateRefCount([&](TLogoBlobID id, ui32 count) { + TABLER() { + TABLED() { Stream << id; } + TABLED() { Stream << count; } + } + }); + } + } + } + + void RenderTrashTable(bool header) { + HTML(Stream) { + if (header) { + TABLEH() { Stream << "tablet id"; } + TABLEH() { Stream << "channel"; } + TABLEH() { Stream << "group id"; } + TABLEH() { Stream << "blob id"; } + TABLEH() { Stream << "in flight"; } + } else { + Self->Data->EnumerateTrash([&](ui64 tabletId, ui8 channel, ui32 groupId, TLogoBlobID blobId, bool inFlight) { + TABLER() { + TABLED() { Stream << tabletId; } + TABLED() { Stream << int(channel); } + TABLED() { Stream << groupId; } + TABLED() { Stream << blobId; } + TABLED() { Stream << (inFlight ? "*" : ""); } + } + }); + } + } + } + + void RenderBarriersTable(bool header) { + HTML(Stream) { + if (header) { + TABLEH() { Stream << "tablet id"; } + TABLEH() { Stream << "channel"; } + TABLEH() { Stream << "last record"; } + TABLEH() { Stream << "soft"; } + TABLEH() { Stream << "hard"; } + } else { + Self->BarrierServer->Enumerate([&](ui64 tabletId, ui8 channel, ui32 recordGen, ui32 recordCounter, + ui32 softGen, ui32 softStep, ui32 hardGen, ui32 hardStep) { + TABLER() { + TABLED() { Stream << tabletId; } + TABLED() { Stream << int(channel); } + TABLED() { Stream << recordGen << ":" << recordCounter; } + TABLED() { Stream << softGen << ":" << softStep; } + TABLED() { Stream << hardGen << ":" << hardStep; } + } + }); + } + } + } + + void RenderBlocksTable(bool header) { + HTML(Stream) { + if (header) { + } else { + } + } + } + + void Complete(const TActorContext&) override { + TActivationContext::Send(new IEventHandle(Request->Sender, Self->SelfId(), new NMon::TEvRemoteHttpInfoRes( + Stream.Str()), 0, Request->Cookie)); + } + }; + + bool TBlobDepot::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext&) { + if (!Executor() || !Executor()->GetStats().IsActive) { + return false; + } else if (!ev) { + return true; + } + + TStringStream s; + + const TCgiParameters& cgi = ev->Get()->Cgi(); + if (cgi.Has("page")) { + const TString& page = cgi.Get("page"); + if (page == "data") { + Execute(std::make_unique<TTxMonData>(this, ev)); + return true; + } else { + Send(ev->Sender, new NMon::TEvRemoteBinaryInfoRes(TStringBuilder() + << "HTTP/1.1 403 Page not found\r\n" + << "Content-Type: text/html\r\n" + << "Connection: close\r\n" + << "\r\n" + << "<html><body>Page " << page << " is not found</body></html>"), ev->Cookie); + return true; + } + } else { + RenderMainPage(s); + } + + Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(s.Str()), 0, ev->Cookie); + return true; + } + + + void TBlobDepot::RenderMainPage(IOutputStream& s) { + HTML(s) { + s << "<a href='app?TabletID=" << TabletID() << "&page=data'>Contained data</a><br>"; + } + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/op_apply_config.cpp b/ydb/core/blob_depot/op_apply_config.cpp index 57e161acea5..fdfb9c70d72 100644 --- a/ydb/core/blob_depot/op_apply_config.cpp +++ b/ydb/core/blob_depot/op_apply_config.cpp @@ -9,6 +9,7 @@ namespace NKikimr::NBlobDepot { class TTxApplyConfig : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { std::unique_ptr<IEventHandle> Response; TString ConfigProtobuf; + bool WasConfigured = false; public: TTxApplyConfig(TBlobDepot *self, TEvBlobDepot::TEvApplyConfig& ev, std::unique_ptr<IEventHandle> response, @@ -25,6 +26,16 @@ namespace NKikimr::NBlobDepot { bool Execute(TTransactionContext& txc, const TActorContext&) override { NIceDb::TNiceDb db(txc.DB); + + auto table = db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Select(); + if (!table.IsReady()) { + return false; + } else if (table.IsValid()) { + WasConfigured = table.HaveValue<Schema::Config::ConfigProtobuf>(); + } else { + WasConfigured = false; + } + db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update( NIceDb::TUpdate<Schema::Config::ConfigProtobuf>(ConfigProtobuf) ); @@ -32,10 +43,9 @@ namespace NKikimr::NBlobDepot { } void Complete(const TActorContext&) override { - const bool wasEmpty = !Self->Config.ByteSizeLong(); const bool success = Self->Config.ParseFromString(ConfigProtobuf); Y_VERIFY(success); - if (wasEmpty) { + if (!WasConfigured) { Self->InitChannelKinds(); } TActivationContext::Send(Response.release()); diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index 56b9db28498..67182f22369 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -1,5 +1,7 @@ #include "blob_depot_tablet.h" #include "schema.h" +#include "data.h" +#include "garbage_collection.h" namespace NKikimr::NBlobDepot { @@ -20,6 +22,8 @@ namespace NKikimr::NBlobDepot { NKikimrBlobDepot::TEvCommitBlobSeqResult *responseRecord; std::tie(Response, responseRecord) = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId()); + TAgent& agent = Self->GetAgent(Request->Recipient); + for (const auto& item : Request->Get()->Record.GetItems()) { auto *responseItem = responseRecord->AddItems(); responseItem->SetStatus(NKikimrProto::OK); @@ -29,19 +33,17 @@ namespace NKikimr::NBlobDepot { value.SetMeta(item.GetMeta()); } auto *chain = value.AddValueChain(); - chain->MutableLocator()->CopyFrom(item.GetBlobLocator()); - - const TString& key = item.GetKey(); - if (key.size() == 3 * sizeof(ui64)) { - const TLogoBlobID id(reinterpret_cast<const ui64*>(key.data())); - if (!Self->CheckBlobForBarrier(id)) { - responseItem->SetStatus(NKikimrProto::ERROR); - responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << id << " is being put beyond the barrier"); - continue; - } + auto *locator = chain->MutableLocator(); + locator->CopyFrom(item.GetBlobLocator()); + + if (!MarkGivenIdCommitted(agent, TBlobSeqId::FromProto(locator->GetBlobSeqId()), responseItem)) { + continue; + } + if (!CheckKeyAgainstBarrier(item.GetKey(), responseItem)) { + continue; } - Self->PutKey(std::move(key), { + Self->Data->PutKey(TData::TKey::FromBinaryKey(item.GetKey(), Self->Config), { .Meta = value.GetMeta(), .ValueChain = std::move(*value.MutableValueChain()), .KeepState = value.GetKeepState(), @@ -58,7 +60,51 @@ namespace NKikimr::NBlobDepot { return true; } + bool MarkGivenIdCommitted(TAgent& agent, const TBlobSeqId& blobSeqId, + NKikimrBlobDepot::TEvCommitBlobSeqResult::TItem *responseItem) { + const NKikimrBlobDepot::TChannelKind::E kind = blobSeqId.Channel < Self->ChannelToKind.size() + ? Self->ChannelToKind[blobSeqId.Channel] : NKikimrBlobDepot::TChannelKind::System; + if (kind == NKikimrBlobDepot::TChannelKind::System) { + responseItem->SetStatus(NKikimrProto::ERROR); + responseItem->SetErrorReason("incorrect Channel for blob"); + return false; + } + + const auto channelKindIt = Self->ChannelKinds.find(kind); + Y_VERIFY(channelKindIt != Self->ChannelKinds.end()); + auto& ck = channelKindIt->second; + + const ui64 value = blobSeqId.ToBinary(ck); + agent.ChannelKinds[kind].GivenIdRanges.RemovePoint(value); + ck.GivenIdRanges.RemovePoint(value); + + auto& channel = Self->PerChannelRecords[blobSeqId.Channel]; + channel.GivenStepIndex.erase(std::make_pair(blobSeqId.Step, blobSeqId.Index)); + + return true; + } + + bool CheckKeyAgainstBarrier(const TString& key, NKikimrBlobDepot::TEvCommitBlobSeqResult::TItem *responseItem) { + if (Self->Config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup) { + if (key.size() != 3 * sizeof(ui64)) { + responseItem->SetStatus(NKikimrProto::ERROR); + responseItem->SetErrorReason("incorrect BlobId format"); + return false; + } + + const TLogoBlobID id(reinterpret_cast<const ui64*>(key.data())); + if (!Self->BarrierServer->CheckBlobForBarrier(id)) { + responseItem->SetStatus(NKikimrProto::ERROR); + responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << id << " is being put beyond the barrier"); + return false; + } + } + + return true; + } + void Complete(const TActorContext&) override { + Self->Data->HandleTrash(); TActivationContext::Send(Response.release()); } }; diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp index 1ec538d0f07..e48ce2802ee 100644 --- a/ydb/core/blob_depot/op_load.cpp +++ b/ydb/core/blob_depot/op_load.cpp @@ -1,10 +1,15 @@ #include "blob_depot_tablet.h" #include "schema.h" +#include "blocks.h" +#include "data.h" +#include "garbage_collection.h" namespace NKikimr::NBlobDepot { void TBlobDepot::ExecuteTxLoad() { class TTxLoad : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + bool Configured = false; + public: TTxLoad(TBlobDepot *self) : TTransactionBase(self) @@ -26,6 +31,7 @@ namespace NKikimr::NBlobDepot { if (table.HaveValue<Schema::Config::ConfigProtobuf>()) { const bool success = Self->Config.ParseFromString(table.GetValue<Schema::Config::ConfigProtobuf>()); Y_VERIFY(success); + Configured = true; } } } @@ -37,7 +43,7 @@ namespace NKikimr::NBlobDepot { return false; } while (table.IsValid()) { - Self->AddBlockOnLoad( + Self->BlocksManager->AddBlockOnLoad( table.GetValue<Schema::Blocks::TabletId>(), table.GetValue<Schema::Blocks::BlockedGeneration>() ); @@ -47,6 +53,26 @@ namespace NKikimr::NBlobDepot { } } + // Barriers + { + auto table = db.Table<Schema::Barriers>().Select(); + if (!table.IsReady()) { + return false; + } + while (table.IsValid()) { + Self->BarrierServer->AddBarrierOnLoad( + table.GetValue<Schema::Barriers::TabletId>(), + table.GetValue<Schema::Barriers::Channel>(), + table.GetValue<Schema::Barriers::LastRecordGenStep>(), + table.GetValue<Schema::Barriers::Soft>(), + table.GetValue<Schema::Barriers::Hard>() + ); + if (!table.Next()) { + return false; + } + } + } + // Data { auto table = db.Table<Schema::Data>().Select(); @@ -54,8 +80,8 @@ namespace NKikimr::NBlobDepot { return false; } while (table.IsValid()) { - Self->AddDataOnLoad( - table.GetValue<Schema::Data::Key>(), + Self->Data->AddDataOnLoad( + TData::TKey::FromBinaryKey(table.GetValue<Schema::Data::Key>(), Self->Config), table.GetValue<Schema::Data::Value>() ); if (!table.Next()) { @@ -64,17 +90,56 @@ namespace NKikimr::NBlobDepot { } } + // Trash + { + auto table = db.Table<Schema::Trash>().Select(); + if (!table.IsReady()) { + return false; + } + while (table.IsValid()) { + const TString& blobId = table.GetValue<Schema::Trash::BlobId>(); + Self->Data->AddTrashOnLoad(TLogoBlobID(reinterpret_cast<const ui64*>(blobId.data()))); + if (!table.Next()) { + return false; + } + } + } + + // ConfirmedGC + { + auto table = db.Table<Schema::ConfirmedGC>().Select(); + if (!table.IsReady()) { + return false; + } + while (table.IsValid()) { + Self->Data->AddConfirmedGenStepOnLoad(table.GetValue<Schema::ConfirmedGC::Channel>(), + table.GetValue<Schema::ConfirmedGC::GroupId>(), + table.GetValue<Schema::ConfirmedGC::ConfirmedGenStep>()); + if (!table.Next()) { + return false; + } + } + } + return true; } bool Precharge(NIceDb::TNiceDb& db) { auto config = db.Table<Schema::Config>().Select(); auto blocks = db.Table<Schema::Blocks>().Select(); - return config.IsReady() && blocks.IsReady(); + auto barriers = db.Table<Schema::Barriers>().Select(); + auto data = db.Table<Schema::Data>().Select(); + auto trash = db.Table<Schema::Trash>().Select(); + auto confirmedGC = db.Table<Schema::ConfirmedGC>().Select(); + return config.IsReady() && blocks.IsReady() && barriers.IsReady() && data.IsReady() && trash.IsReady() && + confirmedGC.IsReady(); } void Complete(const TActorContext&) override { - Self->InitChannelKinds(); + if (Configured) { + Self->InitChannelKinds(); + Self->Data->HandleTrash(); + } } }; diff --git a/ydb/core/blob_depot/op_resolve.cpp b/ydb/core/blob_depot/op_resolve.cpp index 4904339756f..0b792f7c714 100644 --- a/ydb/core/blob_depot/op_resolve.cpp +++ b/ydb/core/blob_depot/op_resolve.cpp @@ -1,4 +1,5 @@ #include "blob_depot_tablet.h" +#include "data.h" namespace NKikimr::NBlobDepot { @@ -31,14 +32,21 @@ namespace NKikimr::NBlobDepot { ui32 itemIndex = 0; for (const auto& item : ev->Get()->Record.GetItems()) { - std::optional<TStringBuf> begin = item.HasBeginningKey() ? std::make_optional(item.GetBeginningKey()) : std::nullopt; - std::optional<TStringBuf> end = item.HasEndingKey() ? std::make_optional(item.GetEndingKey()) : std::nullopt; + TData::TKey begin; + TData::TKey end; + + if (item.HasBeginningKey()) { + begin = TData::TKey::FromBinaryKey(item.GetBeginningKey(), Config); + } + if (item.HasEndingKey()) { + end = TData::TKey::FromBinaryKey(item.GetEndingKey(), Config); + } ui32 numItems = 0; - auto addKey = [&](TStringBuf key, const TDataValue& value) { + auto addKey = [&](const TData::TKey& key, const TData::TValue& value) { NKikimrBlobDepot::TEvResolveResult::TResolvedKey resolvedKey; resolvedKey.SetItemIndex(itemIndex); - resolvedKey.SetKey(key.data(), key.size()); + resolvedKey.SetKey(key.MakeBinaryKey()); resolvedKey.MutableValueChain()->CopyFrom(value.ValueChain); if (value.Meta) { @@ -58,18 +66,18 @@ namespace NKikimr::NBlobDepot { return !item.HasMaxKeys() || numItems != item.GetMaxKeys(); }; - TScanFlags flags; + TData::TScanFlags flags; if (item.GetIncludeBeginning()) { - flags |= EScanFlags::INCLUDE_BEGIN; + flags |= TData::EScanFlags::INCLUDE_BEGIN; } if (item.GetIncludeEnding()) { - flags |= EScanFlags::INCLUDE_END; + flags |= TData::EScanFlags::INCLUDE_END; } if (item.GetReverse()) { - flags |= EScanFlags::REVERSE; + flags |= TData::EScanFlags::REVERSE; } - ScanRange(begin, end, flags, addKey); + Data->ScanRange(item.HasBeginningKey() ? &begin : nullptr, item.HasEndingKey() ? &end : nullptr, flags, addKey); ++itemIndex; } diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h index f5b2f123bb2..beeba0585d0 100644 --- a/ydb/core/blob_depot/schema.h +++ b/ydb/core/blob_depot/schema.h @@ -66,11 +66,29 @@ namespace NKikimr::NBlobDepot { >; }; + struct Trash : Table<5> { + struct BlobId : Column<1, NScheme::NTypeIds::String> {}; + + using TKey = TableKey<BlobId>; + using TColumns = TableColumns<BlobId>; + }; + + struct ConfirmedGC : Table<6> { + struct Channel : Column<1, NScheme::NTypeIds::Uint8> {}; + struct GroupId : Column<2, NScheme::NTypeIds::Uint32> {}; + struct ConfirmedGenStep : Column<3, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey<Channel, GroupId>; + using TColumns = TableColumns<Channel, GroupId, ConfirmedGenStep>; + }; + using TTables = SchemaTables< Config, Blocks, Barriers, - Data + Data, + Trash, + ConfirmedGC >; using TSettings = SchemaSettings< diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index c3e65dd220b..763289ec8f3 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -103,4 +103,65 @@ namespace NKikimr::NBlobDepot { } }; + class TGivenIdRange { + struct TRange { + ui32 Len; + TDynBitMap Bits; + + TRange(ui32 len) + : Len(len) + { + Bits.Set(0, len); + } + }; + std::map<ui64, TRange> Ranges; // range.begin -> range + ui32 NumAvailableItems = 0; + + public: + TGivenIdRange() = default; + TGivenIdRange(const NKikimrBlobDepot::TGivenIdRange& proto); + + void ToProto(NKikimrBlobDepot::TGivenIdRange *proto); + + void Join(TGivenIdRange&& other); + + void IssueNewRange(ui64 begin, ui64 end); + void RemovePoint(ui64 value); + + bool IsEmpty() const; + ui32 GetNumAvailableItems() const; + ui64 Allocate(); + + void Output(IOutputStream& s) const; + TString ToString() const; + + private: + TRange& InsertNewRange(ui64 begin, ui64 len); + }; + + using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>; + + template<typename TCallback> + void EnumerateBlobsForValueChain(const TValueChain& valueChain, ui64 tabletId, TCallback&& callback) { + for (const auto& item : valueChain) { + const auto& locator = item.GetLocator(); + const auto& blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId()); + if (locator.GetTotalDataLen() + locator.GetFooterLen() > MaxBlobSize) { + callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen())); + callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_FOOTER_BLOB, 0, locator.GetFooterLen())); + } else { + callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_COMPOSITE_BLOB, 0, locator.GetTotalDataLen() + + locator.GetFooterLen())); + } + } + } + + inline ui64 GenStep(ui32 gen, ui32 step) { + return static_cast<ui64>(gen) << 32 | step; + } + + inline ui64 GenStep(TLogoBlobID id) { + return GenStep(id.Generation(), id.Step()); + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index 46ec82b9ec0..555430f272d 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -37,6 +37,18 @@ message TValue { optional bool Public = 4; } +message TGivenIdRange { + message TRange { + optional uint64 Begin = 1; + optional uint32 Len = 2; + repeated uint32 Offsets = 3; + repeated fixed64 BitMasks = 4; + } + repeated TRange Ranges = 1; +} + + + message TEvApplyConfig { optional uint64 TxId = 1; @@ -67,13 +79,13 @@ message TEvRegisterAgentResult { message TEvAllocateIds { optional NKikimrBlobDepot.TChannelKind.E ChannelKind = 1; + optional uint32 Count = 2; } message TEvAllocateIdsResult { optional NKikimrBlobDepot.TChannelKind.E ChannelKind = 1; optional uint32 Generation = 2; // executor generation, for validation purposes - optional uint64 RangeBegin = 3; // <Generation> <Step> <Channel> - optional uint64 RangeEnd = 4; + optional TGivenIdRange GivenIdRange = 3; } message TEvBlock { diff --git a/ydb/core/protos/blob_depot_config.proto b/ydb/core/protos/blob_depot_config.proto index 6858a242c20..4b77e4e0a7d 100644 --- a/ydb/core/protos/blob_depot_config.proto +++ b/ydb/core/protos/blob_depot_config.proto @@ -2,6 +2,7 @@ package NKikimrBlobDepot; message TChannelKind { enum E { + System = 0; Data = 1; Log = 2; } |