aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-07-07 21:08:05 +0300
committeralexvru <alexvru@ydb.tech>2022-07-07 21:08:05 +0300
commit996ba6df0ac5b5f030b5608ea4221286c8b8c122 (patch)
treeff422dec2f821585a849b12927aff0ec55828186
parent95d4076addb1b5489601ccf87d38119b3a651260 (diff)
downloadydb-996ba6df0ac5b5f030b5608ea4221286c8b8c122.tar.gz
Support proper garbage collection in BlobDepot
-rw-r--r--ydb/core/blob_depot/CMakeLists.txt2
-rw-r--r--ydb/core/blob_depot/agent.cpp50
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h20
-rw-r--r--ydb/core/blob_depot/agent/comm.cpp24
-rw-r--r--ydb/core/blob_depot/blob_depot.cpp46
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h127
-rw-r--r--ydb/core/blob_depot/blocks.cpp419
-rw-r--r--ydb/core/blob_depot/blocks.h42
-rw-r--r--ydb/core/blob_depot/data.cpp353
-rw-r--r--ydb/core/blob_depot/data.h344
-rw-r--r--ydb/core/blob_depot/defs.h2
-rw-r--r--ydb/core/blob_depot/events.h2
-rw-r--r--ydb/core/blob_depot/garbage_collection.cpp303
-rw-r--r--ydb/core/blob_depot/garbage_collection.h42
-rw-r--r--ydb/core/blob_depot/given_id_range.cpp127
-rw-r--r--ydb/core/blob_depot/mon_main.cpp271
-rw-r--r--ydb/core/blob_depot/op_apply_config.cpp14
-rw-r--r--ydb/core/blob_depot/op_commit_blob_seq.cpp68
-rw-r--r--ydb/core/blob_depot/op_load.cpp75
-rw-r--r--ydb/core/blob_depot/op_resolve.cpp26
-rw-r--r--ydb/core/blob_depot/schema.h20
-rw-r--r--ydb/core/blob_depot/types.h61
-rw-r--r--ydb/core/protos/blob_depot.proto16
-rw-r--r--ydb/core/protos/blob_depot_config.proto1
24 files changed, 1793 insertions, 662 deletions
diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt
index f393be16f28..236ee49e557 100644
--- a/ydb/core/blob_depot/CMakeLists.txt
+++ b/ydb/core/blob_depot/CMakeLists.txt
@@ -20,6 +20,8 @@ target_sources(ydb-core-blob_depot PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/garbage_collection.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/given_id_range.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/mon_main.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_apply_config.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_init_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_load.cpp
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp
index ff78ad2e75b..00d18d2168b 100644
--- a/ydb/core/blob_depot/agent.cpp
+++ b/ydb/core/blob_depot/agent.cpp
@@ -14,7 +14,7 @@ namespace NKikimr::NBlobDepot {
Y_VERIFY(it != PipeServerToNode.end());
if (const auto& nodeId = it->second) {
if (const auto agentIt = Agents.find(*nodeId); agentIt != Agents.end()) {
- if (TAgentInfo& agent = agentIt->second; agent.ConnectedAgent == it->first) {
+ if (TAgent& agent = agentIt->second; agent.ConnectedAgent == it->first) {
OnAgentDisconnect(agent);
agent.ConnectedAgent.reset();
agent.ConnectedNodeId = 0;
@@ -25,7 +25,7 @@ namespace NKikimr::NBlobDepot {
PipeServerToNode.erase(it);
}
- void TBlobDepot::OnAgentDisconnect(TAgentInfo& /*agent*/) {
+ void TBlobDepot::OnAgentDisconnect(TAgent& /*agent*/) {
}
void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) {
@@ -58,36 +58,52 @@ namespace NKikimr::NBlobDepot {
TActivationContext::Send(response.release());
}
- void TBlobDepot::OnAgentConnect(TAgentInfo& /*agent*/) {
+ void TBlobDepot::OnAgentConnect(TAgent& /*agent*/) {
}
void TBlobDepot::Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT04, "TEvAllocateIds", (TabletId, TabletID()), (Msg, ev->Get()->Record),
(PipeServerId, ev->Recipient));
- auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId(), ev->Get()->Record.GetChannelKind(),
- Executor()->Generation());
+ const ui32 generation = Executor()->Generation(); // read once; used for the response and for decoding the issued ids below
+
+ auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId(), ev->Get()->Record.GetChannelKind(), generation);
if (const auto it = ChannelKinds.find(record->GetChannelKind()); it != ChannelKinds.end()) {
- auto& nextBlobSeqId = it->second.NextBlobSeqId;
- record->SetRangeBegin(nextBlobSeqId);
- nextBlobSeqId += PreallocatedIdCount;
- record->SetRangeEnd(nextBlobSeqId);
+ auto& kind = it->second;
+
+ const ui64 rangeBegin = kind.NextBlobSeqId; // the issued range is [rangeBegin, rangeEnd)
+ kind.NextBlobSeqId += ev->Get()->Record.GetCount(); // NOTE(review): count is taken from the request as-is; no zero/upper-bound check is visible here
+ const ui64 rangeEnd = kind.NextBlobSeqId;
+
+ TGivenIdRange range;
+ range.IssueNewRange(rangeBegin, rangeEnd);
+
+ range.ToProto(record->MutableGivenIdRange()); // the reply carries the range as a GivenIdRange message
+
+ TAgent& agent = GetAgent(ev->Recipient);
+ agent.ChannelKinds[it->first].GivenIdRanges.Join(TGivenIdRange(range)); // a copy is remembered for the requesting agent
+ kind.GivenIdRanges.Join(std::move(range)); // the original is merged into the per-kind set
+
+ for (ui64 value = rangeBegin; value < rangeEnd; ++value) { // record every issued id individually
+ const auto blobSeqId = TBlobSeqId::FromBinary(generation, kind, value);
+ PerChannelRecords[blobSeqId.Channel].GivenStepIndex.emplace(blobSeqId.Step, blobSeqId.Index); // grouped by channel as (Step, Index)
+ }
}
TActivationContext::Send(response.release());
}
- TBlobDepot::TAgentInfo& TBlobDepot::GetAgent(const TActorId& pipeServerId) {
+ TBlobDepot::TAgent& TBlobDepot::GetAgent(const TActorId& pipeServerId) {
const auto it = PipeServerToNode.find(pipeServerId);
Y_VERIFY(it != PipeServerToNode.end());
Y_VERIFY(it->second);
- TAgentInfo& agent = GetAgent(*it->second);
+ TAgent& agent = GetAgent(*it->second);
Y_VERIFY(agent.ConnectedAgent == pipeServerId);
return agent;
}
- TBlobDepot::TAgentInfo& TBlobDepot::GetAgent(ui32 nodeId) {
+ TBlobDepot::TAgent& TBlobDepot::GetAgent(ui32 nodeId) {
const auto agentIt = Agents.find(nodeId);
Y_VERIFY(agentIt != Agents.end());
return agentIt->second;
@@ -97,6 +113,9 @@ namespace NKikimr::NBlobDepot {
TTabletStorageInfo *info = Info();
const ui32 generation = Executor()->Generation();
+ Y_VERIFY(ChannelToKind.empty());
+ ChannelToKind.resize(info->Channels.size(), NKikimrBlobDepot::TChannelKind::System);
+
ui32 channel = 0;
for (const auto& profile : Config.GetChannelProfiles()) {
for (ui32 i = 0, count = profile.GetCount(); i < count; ++i, ++channel) {
@@ -105,9 +124,16 @@ namespace NKikimr::NBlobDepot {
auto& p = ChannelKinds[kind];
p.ChannelToIndex[channel] = p.ChannelGroups.size();
p.ChannelGroups.emplace_back(channel, info->GroupFor(channel, generation));
+
+ Y_VERIFY(channel < ChannelToKind.size());
+ ChannelToKind[channel] = kind;
}
}
}
+
+ Y_VERIFY_S(channel == ChannelToKind.size(), "channel# " << channel
+ << " ChannelToKind.size# " << ChannelToKind.size());
+
for (auto& [k, v] : ChannelKinds) {
v.NextBlobSeqId = TBlobSeqId{v.ChannelGroups.front().first, generation, 1, 0}.ToBinary(v);
}
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index 95216de7d64..75c4b88bdbb 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -246,16 +246,10 @@ namespace NKikimr::NBlobDepot {
struct TChannelKind
: NBlobDepot::TChannelKind
{
- struct TAllocatedId {
- ui32 Generation;
- ui64 Begin;
- ui64 End;
- };
-
const NKikimrBlobDepot::TChannelKind::E Kind;
bool IdAllocInFlight = false;
- std::deque<TAllocatedId> IdQ;
+ TGivenIdRange GivenIdRange;
static constexpr size_t PreallocatedIdCount = 2;
TIntrusiveList<TQuery, TPendingId> QueriesWaitingForId;
@@ -265,17 +259,11 @@ namespace NKikimr::NBlobDepot {
{}
std::optional<TBlobSeqId> Allocate(TBlobDepotAgent& agent) {
- if (IdQ.empty()) {
+ if (GivenIdRange.IsEmpty()) { // nothing left to hand out; the caller gets nullopt
return std::nullopt;
}
-
- auto& item = IdQ.front();
- auto blobSeqId = TBlobSeqId::FromBinary(item.Generation, *this, item.Begin++);
- if (item.Begin == item.End) {
- IdQ.pop_front();
- agent.IssueAllocateIdsIfNeeded(*this);
- }
-
+ auto blobSeqId = TBlobSeqId::FromBinary(agent.BlobDepotGeneration, *this, GivenIdRange.Allocate()); // decoded with the agent's BlobDepotGeneration
+ agent.IssueAllocateIdsIfNeeded(*this); // called after every allocation; the callee decides whether a refill request is sent
return blobSeqId;
}
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp
index aa8abeff2ba..8aa8edf9dc3 100644
--- a/ydb/core/blob_depot/agent/comm.cpp
+++ b/ydb/core/blob_depot/agent/comm.cpp
@@ -18,6 +18,8 @@ namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::ConnectToBlobDepot() {
PipeId = Register(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy::WithRetries()));
const ui64 id = NextRequestId++;
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA04, "ConnectToBlobDepot", (VirtualGroupId, VirtualGroupId),
+ (PipeId, PipeId), (RequestId, id));
NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId), id);
RegisterRequest(id, this, nullptr, {}, true);
}
@@ -60,13 +62,13 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepotAgent::IssueAllocateIdsIfNeeded(TChannelKind& kind) {
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA05, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId),
- (ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(kind.Kind)),
- (IdAllocInFlight, kind.IdAllocInFlight), (IdQ.size, kind.IdQ.size()),
- (PreallocatedIdCount, kind.PreallocatedIdCount), (PipeId, PipeId));
- if (!kind.IdAllocInFlight && kind.IdQ.size() < kind.PreallocatedIdCount && PipeId) {
+ if (!kind.IdAllocInFlight && kind.GivenIdRange.GetNumAvailableItems() < 100 && PipeId) { // request only when none is in flight, fewer than 100 ids remain and PipeId is set
const ui64 id = NextRequestId++;
- NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds(kind.Kind), id);
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA05, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId),
+ (ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(kind.Kind)),
+ (IdAllocInFlight, kind.IdAllocInFlight), (NumAvailableItems, kind.GivenIdRange.GetNumAvailableItems()),
+ (PreallocatedIdCount, kind.PreallocatedIdCount), (RequestId, id));
+ NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds(kind.Kind, 100), id); // asks for 100 ids; NOTE(review): same literal as the threshold above, while the logged PreallocatedIdCount no longer takes part in the check
RegisterRequest(id, this, std::make_shared<TAllocateIdsContext>(kind.Kind), {}, true);
kind.IdAllocInFlight = true;
}
@@ -78,7 +80,8 @@ namespace NKikimr::NBlobDepot {
auto& allocateIdsContext = context->Obtain<TAllocateIdsContext>();
const auto it = ChannelKinds.find(allocateIdsContext.ChannelKind);
- Y_VERIFY_S(it != ChannelKinds.end(), "Kind# " << NKikimrBlobDepot::TChannelKind::E_Name(allocateIdsContext.ChannelKind));
+ Y_VERIFY_S(it != ChannelKinds.end(), "Kind# " << NKikimrBlobDepot::TChannelKind::E_Name(allocateIdsContext.ChannelKind)
+ << " Msg# " << SingleLineProto(msg));
auto& kind = it->second;
Y_VERIFY(kind.IdAllocInFlight);
@@ -87,12 +90,11 @@ namespace NKikimr::NBlobDepot {
Y_VERIFY(msg.GetChannelKind() == allocateIdsContext.ChannelKind);
Y_VERIFY(msg.GetGeneration() == BlobDepotGeneration);
- if (msg.HasRangeBegin() && msg.HasRangeEnd()) {
- kind.IdQ.push_back({BlobDepotGeneration, msg.GetRangeBegin(), msg.GetRangeEnd()});
+ if (msg.HasGivenIdRange()) {
+ TGivenIdRange range(msg.GetGivenIdRange());
+ kind.GivenIdRange.Join(std::move(range));
kind.ProcessQueriesWaitingForId();
IssueAllocateIdsIfNeeded(kind);
- } else {
- // no such channel allocated
}
}
diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp
index ee70e4b323a..7995c10aadd 100644
--- a/ydb/core/blob_depot/blob_depot.cpp
+++ b/ydb/core/blob_depot/blob_depot.cpp
@@ -1,8 +1,54 @@
#include "blob_depot.h"
#include "blob_depot_tablet.h"
+#include "blocks.h"
+#include "garbage_collection.h"
+#include "data.h"
namespace NKikimr::NBlobDepot {
+ TBlobDepot::TBlobDepot(TActorId tablet, TTabletStorageInfo *info)
+ : TActor(&TThis::StateInit) // the actor starts in StateInit
+ , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory)
+ , BlocksManager(new TBlocksManager(this)) // each subsystem object receives a pointer to this tablet
+ , BarrierServer(new TBarrierServer(this))
+ , Data(new TData(this))
+ {}
+
+ TBlobDepot::~TBlobDepot() // NOTE(review): presumably defined out of line so the unique_ptr members are destroyed where the subsystem classes are complete types -- confirm
+ {}
+
+ STFUNC(TBlobDepot::StateWork) { // event dispatch for the working state; moved here from the class body in blob_depot_tablet.h
+ try {
+ switch (const ui32 type = ev->GetTypeRewrite()) {
+ cFunc(TEvents::TSystem::Poison, HandlePoison);
+
+ hFunc(TEvBlobDepot::TEvApplyConfig, Handle); // these five are handled by TBlobDepot's own Handle overloads
+ hFunc(TEvBlobDepot::TEvRegisterAgent, Handle);
+ hFunc(TEvBlobDepot::TEvAllocateIds, Handle);
+ hFunc(TEvBlobDepot::TEvCommitBlobSeq, Handle);
+ hFunc(TEvBlobDepot::TEvResolve, Handle);
+
+ hFunc(TEvBlobDepot::TEvBlock, BlocksManager->Handle); // block events are dispatched directly to the blocks manager
+ hFunc(TEvBlobDepot::TEvQueryBlocks, BlocksManager->Handle);
+
+ hFunc(TEvBlobDepot::TEvCollectGarbage, BarrierServer->Handle); // collect-garbage requests are dispatched to the barrier server
+
+ hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle); // collect-garbage results are dispatched to the data subsystem
+
+ hFunc(TEvTabletPipe::TEvServerConnected, Handle);
+ hFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
+
+ default:
+ if (!HandleDefaultEvents(ev, ctx)) { // anything not handled by the default handler is reported via Y_FAIL
+ Y_FAIL("unexpected event Type# 0x%08" PRIx32, type);
+ }
+ break;
+ }
+ } catch (...) { // an exception escaping any handler is reported via Y_FAIL_S
+ Y_FAIL_S("unexpected exception# " << CurrentExceptionMessage());
+ }
+ }
+
IActor *CreateBlobDepot(const TActorId& tablet, TTabletStorageInfo *info) {
return new TBlobDepot(tablet, info);
}
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index b1ef30bf076..25374384e77 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -19,13 +19,8 @@ namespace NKikimr::NBlobDepot {
};
public:
- TBlobDepot(TActorId tablet, TTabletStorageInfo *info)
- : TActor(&TThis::StateInit)
- , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory)
- , BlocksManager(CreateBlocksManager())
- , GarbageCollectionManager(CreateGarbageCollectionManager())
- , DataManager(CreateDataManager())
- {}
+ TBlobDepot(TActorId tablet, TTabletStorageInfo *info);
+ ~TBlobDepot();
void HandlePoison() {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT09, "HandlePoison", (TabletId, TabletID()));
@@ -36,33 +31,45 @@ namespace NKikimr::NBlobDepot {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
static constexpr TDuration ExpirationTimeout = TDuration::Minutes(1);
- static constexpr ui32 PreallocatedIdCount = 100;
- struct TAgentInfo {
+ struct TAgent { // per-agent state, stored in Agents keyed by node id (renamed from TAgentInfo)
std::optional<TActorId> ConnectedAgent;
ui32 ConnectedNodeId;
TInstant ExpirationTimestamp;
+
+ struct TChannelKind { // id bookkeeping for this agent, kept separately for each channel kind
+ TGivenIdRange GivenIdRanges; // updated on AllocateIds and when BlobSeqIds are found in any way
+ };
+
+ THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds; // keyed by channel kind
};
THashMap<TActorId, std::optional<ui32>> PipeServerToNode;
- THashMap<ui32, TAgentInfo> Agents; // NodeId -> Agent
+ THashMap<ui32, TAgent> Agents; // NodeId -> Agent
struct TChannelKind
: NBlobDepot::TChannelKind
{
ui64 NextBlobSeqId = 0;
+ TGivenIdRange GivenIdRanges; // for all agents, including disconnected ones
};
THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds;
+ std::vector<NKikimrBlobDepot::TChannelKind::E> ChannelToKind; // indexed by channel number; entries start as System until a channel profile assigns a kind
+
+ struct TPerChannelRecord {
+ std::set<std::tuple<ui32, ui32>> GivenStepIndex; // (Step, Index) of each BlobSeqId issued on this channel by TEvAllocateIds handling
+ };
+ THashMap<ui8, TPerChannelRecord> PerChannelRecords; // keyed by channel number
void Handle(TEvTabletPipe::TEvServerConnected::TPtr ev);
void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev);
- void OnAgentDisconnect(TAgentInfo& agent);
+ void OnAgentDisconnect(TAgent& agent);
void Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev);
- void OnAgentConnect(TAgentInfo& agent);
+ void OnAgentConnect(TAgent& agent);
void Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev);
- TAgentInfo& GetAgent(const TActorId& pipeServerId);
- TAgentInfo& GetAgent(ui32 nodeId);
+ TAgent& GetAgent(const TActorId& pipeServerId);
+ TAgent& GetAgent(ui32 nodeId);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -111,35 +118,7 @@ namespace NKikimr::NBlobDepot {
StateInitImpl(ev, ctx);
}
- STFUNC(StateWork) {
- try {
- switch (const ui32 type = ev->GetTypeRewrite()) {
- cFunc(TEvents::TSystem::Poison, HandlePoison);
-
- hFunc(TEvBlobDepot::TEvApplyConfig, Handle);
- hFunc(TEvBlobDepot::TEvRegisterAgent, Handle);
- hFunc(TEvBlobDepot::TEvAllocateIds, Handle);
- hFunc(TEvBlobDepot::TEvCommitBlobSeq, Handle);
- hFunc(TEvBlobDepot::TEvResolve, Handle);
-
- hFunc(TEvBlobDepot::TEvBlock, Handle);
- hFunc(TEvBlobDepot::TEvQueryBlocks, Handle);
-
- hFunc(TEvBlobDepot::TEvCollectGarbage, Handle);
-
- hFunc(TEvTabletPipe::TEvServerConnected, Handle);
- hFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
-
- default:
- if (!HandleDefaultEvents(ev, ctx)) {
- Y_FAIL("unexpected event Type# 0x%08" PRIx32, type);
- }
- break;
- }
- } catch (...) {
- Y_FAIL_S("unexpected exception# " << CurrentExceptionMessage());
- }
- }
+ void StateWork(STFUNC_SIG);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -167,67 +146,31 @@ namespace NKikimr::NBlobDepot {
// Blocks
class TBlocksManager;
- using TBlocksManagerPtr = std::unique_ptr<TBlocksManager, std::function<void(TBlocksManager*)>>;
- TBlocksManagerPtr BlocksManager;
-
- TBlocksManagerPtr CreateBlocksManager();
-
- void AddBlockOnLoad(ui64 tabletId, ui32 blockedGeneration);
-
- void Handle(TEvBlobDepot::TEvBlock::TPtr ev);
- void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev);
+ std::unique_ptr<TBlocksManager> BlocksManager;
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Garbage collection
- class TGarbageCollectionManager;
- using TGarbageCollectionManagerPtr = std::unique_ptr<TGarbageCollectionManager, std::function<void(TGarbageCollectionManager*)>>;
- TGarbageCollectionManagerPtr GarbageCollectionManager;
-
- TGarbageCollectionManagerPtr CreateGarbageCollectionManager();
-
- void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev);
-
- bool CheckBlobForBarrier(TLogoBlobID id) const;
+ class TBarrierServer;
+ std::unique_ptr<TBarrierServer> BarrierServer;
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Data operations
- class TDataManager;
- using TDataManagerPtr = std::unique_ptr<TDataManager, std::function<void(TDataManager*)>>;
- TDataManagerPtr DataManager;
-
- TDataManagerPtr CreateDataManager();
-
- using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>;
+ class TData;
+ std::unique_ptr<TData> Data;
- struct TDataValue {
- TString Meta;
- TValueChain ValueChain;
- NKikimrBlobDepot::EKeepState KeepState;
- bool Public;
- };
-
- enum EScanFlags : ui32 {
- INCLUDE_BEGIN = 1,
- INCLUDE_END = 2,
- REVERSE = 4,
- };
+ void Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev);
+ void Handle(TEvBlobDepot::TEvResolve::TPtr ev);
- Y_DECLARE_FLAGS(TScanFlags, EScanFlags)
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Monitoring
- std::optional<TDataValue> FindKey(TStringBuf key);
- void ScanRange(const std::optional<TStringBuf>& begin, const std::optional<TStringBuf>& end, TScanFlags flags,
- const std::function<bool(TStringBuf, const TDataValue&)>& callback);
- void DeleteKey(TStringBuf key);
- void PutKey(TString key, TDataValue&& data);
- void AddDataOnLoad(TString key, TString value);
- std::optional<TString> UpdateKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState);
+ class TTxMonData;
+
+ bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext&) override;
- void Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev);
- void Handle(TEvBlobDepot::TEvResolve::TPtr ev);
+ void RenderMainPage(IOutputStream& s);
};
- Y_DECLARE_OPERATORS_FOR_FLAGS(TBlobDepot::TScanFlags)
-
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp
index 2bcb4ce0a77..489b99045ec 100644
--- a/ydb/core/blob_depot/blocks.cpp
+++ b/ydb/core/blob_depot/blocks.cpp
@@ -1,278 +1,233 @@
-#include "blob_depot_tablet.h"
+#include "blocks.h"
#include "schema.h"
namespace NKikimr::NBlobDepot {
- class TBlobDepot::TBlocksManager {
- // wait duration before issuing blocks via storage
- static constexpr TDuration AgentsWaitTime = TDuration::Seconds(1);
+ class TBlobDepot::TBlocksManager::TTxUpdateBlock : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ const ui64 TabletId;
+ const ui32 BlockedGeneration;
+ const ui32 NodeId;
+ const TInstant Timestamp;
+ std::unique_ptr<IEventHandle> Response;
- // TTL for block lease
- static constexpr TDuration BlockLeaseTime = TDuration::Seconds(60);
-
- struct TBlock {
- struct TPerAgentInfo {
- TMonotonic ExpirationTimestamp = TMonotonic::Zero();
- };
-
- ui32 BlockedGeneration = 0;
- THashMap<ui32, TPerAgentInfo> PerAgentInfo;
- };
+ public:
+ TTxUpdateBlock(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, TInstant timestamp,
+ std::unique_ptr<IEventHandle> response)
+ : TTransactionBase(self)
+ , TabletId(tabletId)
+ , BlockedGeneration(blockedGeneration)
+ , NodeId(nodeId)
+ , Timestamp(timestamp)
+ , Response(std::move(response))
+ {}
- TBlobDepot* const Self;
- THashMap<ui64, TBlock> Blocks;
-
- private:
- class TTxUpdateBlock : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
- const ui64 TabletId;
- const ui32 BlockedGeneration;
- const ui32 NodeId;
- const TInstant Timestamp;
- std::unique_ptr<IEventHandle> Response;
-
- public:
- TTxUpdateBlock(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, TInstant timestamp,
- std::unique_ptr<IEventHandle> response)
- : TTransactionBase(self)
- , TabletId(tabletId)
- , BlockedGeneration(blockedGeneration)
- , NodeId(nodeId)
- , Timestamp(timestamp)
- , Response(std::move(response))
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ auto& block = Self->BlocksManager->Blocks[TabletId];
+ if (BlockedGeneration <= block.BlockedGeneration) {
+ Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.SetStatus(NKikimrProto::ALREADY);
+ } else {
+ // update block value in memory
auto& block = Self->BlocksManager->Blocks[TabletId];
- if (BlockedGeneration <= block.BlockedGeneration) {
- Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.SetStatus(NKikimrProto::ALREADY);
- } else {
- // update block value in memory
- auto& block = Self->BlocksManager->Blocks[TabletId];
- block.BlockedGeneration = BlockedGeneration;
-
- // and persist it
- NIceDb::TNiceDb db(txc.DB);
- db.Table<Schema::Blocks>().Key(TabletId).Update(
- NIceDb::TUpdate<Schema::Blocks::BlockedGeneration>(BlockedGeneration),
- NIceDb::TUpdate<Schema::Blocks::IssuedByNode>(NodeId),
- NIceDb::TUpdate<Schema::Blocks::IssueTimestamp>(Timestamp)
- );
- }
- return true;
- }
-
- void Complete(const TActorContext&) override {
- if (Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.GetStatus() != NKikimrProto::OK) {
- TActivationContext::Send(Response.release());
- } else {
- Self->BlocksManager->OnBlockCommitted(TabletId, BlockedGeneration, NodeId, std::move(Response));
- }
- }
- };
-
- class TBlockProcessorActor : public TActorBootstrapped<TBlockProcessorActor> {
- TBlobDepot* const Self;
- const ui64 TabletId;
- const ui32 BlockedGeneration;
- const ui32 NodeId;
- std::unique_ptr<IEventHandle> Response;
- ui32 BlocksPending = 0;
- ui32 RetryCount = 0;
- const ui64 IssuerGuid = RandomNumber<ui64>() | 1;
- THashSet<ui32> NodesWaitingForPushResult;
-
- public:
- TBlockProcessorActor(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId,
- std::unique_ptr<IEventHandle> response)
- : Self(self)
- , TabletId(tabletId)
- , BlockedGeneration(blockedGeneration)
- , NodeId(nodeId)
- , Response(std::move(response))
- {}
-
- void Bootstrap() {
- IssueNotificationsToAgents();
- Become(&TThis::StateFunc, AgentsWaitTime, new TEvents::TEvWakeup);
+ block.BlockedGeneration = BlockedGeneration;
+
+ // and persist it
+ NIceDb::TNiceDb db(txc.DB);
+ db.Table<Schema::Blocks>().Key(TabletId).Update(
+ NIceDb::TUpdate<Schema::Blocks::BlockedGeneration>(BlockedGeneration),
+ NIceDb::TUpdate<Schema::Blocks::IssuedByNode>(NodeId),
+ NIceDb::TUpdate<Schema::Blocks::IssueTimestamp>(Timestamp)
+ );
}
+ return true;
+ }
- void IssueNotificationsToAgents() {
- const TMonotonic now = TActivationContext::Monotonic();
- auto& block = Self->BlocksManager->Blocks[TabletId];
- for (const auto& [agentId, info] : block.PerAgentInfo) {
- if (agentId == NodeId) {
- // skip the origin agent
- continue;
- }
- if (info.ExpirationTimestamp <= now) {
- SendPushToAgent(agentId);
- }
- }
+ void Complete(const TActorContext&) override {
+ if (Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.GetStatus() != NKikimrProto::OK) {
+ TActivationContext::Send(Response.release());
+ } else {
+ Self->BlocksManager->OnBlockCommitted(TabletId, BlockedGeneration, NodeId, std::move(Response));
}
+ }
+ };
- void SendPushToAgent(ui32 agentId) {
- auto ev = std::make_unique<TEvBlobDepot::TEvPushNotify>();
- auto *item = ev->Record.AddBlockedTablets();
- item->SetTabletId(TabletId);
- item->SetBlockedGeneration(BlockedGeneration);
-
- TAgentInfo& agent = Self->GetAgent(agentId);
- if (const auto& actorId = agent.ConnectedAgent) {
- Send(*actorId, ev.release(), IEventHandle::FlagTrackDelivery, IssuerGuid);
- }
- NodesWaitingForPushResult.insert(agentId);
- }
+ class TBlobDepot::TBlocksManager::TBlockProcessorActor : public TActorBootstrapped<TBlockProcessorActor> {
+ TBlobDepot* const Self;
+ const ui64 TabletId;
+ const ui32 BlockedGeneration;
+ const ui32 NodeId;
+ std::unique_ptr<IEventHandle> Response;
+ ui32 BlocksPending = 0;
+ ui32 RetryCount = 0;
+ const ui64 IssuerGuid = RandomNumber<ui64>() | 1;
+ THashSet<ui32> NodesWaitingForPushResult;
- void Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) {
- const ui32 agentId = ev->Sender.NodeId();
- const size_t numErased = NodesWaitingForPushResult.erase(agentId);
- Y_VERIFY(numErased == 1 && ev->Cookie == IssuerGuid);
+ public:
+ TBlockProcessorActor(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId,
+ std::unique_ptr<IEventHandle> response)
+ : Self(self)
+ , TabletId(tabletId)
+ , BlockedGeneration(blockedGeneration)
+ , NodeId(nodeId)
+ , Response(std::move(response))
+ {}
- // mark lease as successfully revoked one
- auto& block = Self->BlocksManager->Blocks[TabletId];
- block.PerAgentInfo.erase(agentId);
+ void Bootstrap() {
+ IssueNotificationsToAgents();
+ Become(&TThis::StateFunc, AgentsWaitTime, new TEvents::TEvWakeup);
+ }
- if (NodesWaitingForPushResult.empty()) {
- Finish();
+ void IssueNotificationsToAgents() {
+ const TMonotonic now = TActivationContext::Monotonic();
+ auto& block = Self->BlocksManager->Blocks[TabletId];
+ for (const auto& [agentId, info] : block.PerAgentInfo) {
+ if (agentId == NodeId) {
+ // skip the origin agent
+ continue;
}
- }
-
- void Handle(TEvents::TEvUndelivered::TPtr /*ev*/) {
- // can't reach an agent to notify it about blocked generation change -- we can't do anything here
- }
-
- void IssueBlocksToStorage() {
- for (const auto& [_, kind] : Self->ChannelKinds) {
- for (const auto& [channel, groupId] : kind.ChannelGroups) {
- // FIXME: consider previous group generations (because agent can write in obsolete tablet generation)
- // !!!!!!!!!!!
- SendBlock(groupId);
- ++BlocksPending;
- RetryCount += 2;
- }
+ if (info.ExpirationTimestamp <= now) {
+ SendPushToAgent(agentId);
}
}
+ }
- void SendBlock(ui32 groupId) {
- SendToBSProxy(SelfId(), groupId, new TEvBlobStorage::TEvBlock(TabletId, BlockedGeneration,
- TInstant::Max(), IssuerGuid), groupId);
- }
+ void SendPushToAgent(ui32 agentId) {
+ auto ev = std::make_unique<TEvBlobDepot::TEvPushNotify>();
+ auto *item = ev->Record.AddBlockedTablets();
+ item->SetTabletId(TabletId);
+ item->SetBlockedGeneration(BlockedGeneration);
- void Handle(TEvBlobStorage::TEvBlockResult::TPtr ev) {
- switch (ev->Get()->Status) {
- case NKikimrProto::OK:
- if (!--BlocksPending) {
- Finish();
- }
- break;
-
- case NKikimrProto::ALREADY:
- // race, but this is not possible in current implementation
- Y_FAIL();
-
- case NKikimrProto::ERROR:
- default:
- if (!--RetryCount) {
- auto& r = Response->Get<TEvBlobDepot::TEvBlockResult>()->Record;
- r.SetStatus(NKikimrProto::ERROR);
- r.SetErrorReason(ev->Get()->ErrorReason);
- Finish();
- } else {
- SendBlock(ev->Cookie);
- }
- break;
- }
+ TAgent& agent = Self->GetAgent(agentId);
+ if (const auto& actorId = agent.ConnectedAgent) {
+ Send(*actorId, ev.release(), IEventHandle::FlagTrackDelivery, IssuerGuid);
}
+ NodesWaitingForPushResult.insert(agentId);
+ }
- void Finish() {
- TActivationContext::Send(Response.release());
- PassAway();
- }
+ void Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) {
+ const ui32 agentId = ev->Sender.NodeId();
+ const size_t numErased = NodesWaitingForPushResult.erase(agentId);
+ Y_VERIFY(numErased == 1 && ev->Cookie == IssuerGuid);
- STRICT_STFUNC(StateFunc,
- hFunc(TEvBlobStorage::TEvBlockResult, Handle);
- hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle);
- hFunc(TEvents::TEvUndelivered, Handle);
- cFunc(TEvents::TSystem::Wakeup, IssueBlocksToStorage);
- cFunc(TEvents::TSystem::Poison, PassAway);
- )
- };
+ // mark lease as successfully revoked one
+ auto& block = Self->BlocksManager->Blocks[TabletId];
+ block.PerAgentInfo.erase(agentId);
- public:
- TBlocksManager(TBlobDepot *self)
- : Self(self)
- {}
-
- void AddBlockOnLoad(ui64 tabletId, ui32 generation) {
- Blocks[tabletId].BlockedGeneration = generation;
+ if (NodesWaitingForPushResult.empty()) {
+ Finish();
+ }
}
- void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, std::unique_ptr<IEventHandle> response) {
- Self->RegisterWithSameMailbox(new TBlockProcessorActor(Self, tabletId, blockedGeneration, nodeId,
- std::move(response)));
+ void Handle(TEvents::TEvUndelivered::TPtr /*ev*/) {
+ // can't reach an agent to notify it about blocked generation change -- we can't do anything here
}
- void Handle(TEvBlobDepot::TEvBlock::TPtr ev) {
- const auto& record = ev->Get()->Record;
- auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId(), NKikimrProto::OK,
- std::nullopt, BlockLeaseTime.MilliSeconds());
-
- if (!record.HasTabletId() || !record.HasBlockedGeneration()) {
- responseRecord->SetStatus(NKikimrProto::ERROR);
- responseRecord->SetErrorReason("incorrect protobuf");
- } else {
- const ui64 tabletId = record.GetTabletId();
- const ui32 blockedGeneration = record.GetBlockedGeneration();
- if (const auto it = Blocks.find(tabletId); it != Blocks.end() && blockedGeneration <= it->second.BlockedGeneration) {
- responseRecord->SetStatus(NKikimrProto::ALREADY);
- } else {
- TAgentInfo& agent = Self->GetAgent(ev->Recipient);
- Self->Execute(std::make_unique<TTxUpdateBlock>(Self, tabletId, blockedGeneration,
- agent.ConnectedNodeId, TActivationContext::Now(), std::move(response)));
+ void IssueBlocksToStorage() {
+ for (const auto& [_, kind] : Self->ChannelKinds) {
+ for (const auto& [channel, groupId] : kind.ChannelGroups) {
+ // FIXME: consider previous group generations (because agent can write in obsolete tablet generation)
+ // !!!!!!!!!!!
+ SendBlock(groupId);
+ ++BlocksPending;
+ RetryCount += 2;
}
}
-
- TActivationContext::Send(response.release()); // not sent if the request got processed and response now is nullptr
}
- void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) {
- TAgentInfo& agent = Self->GetAgent(ev->Recipient);
- const ui32 agentId = agent.ConnectedNodeId;
- Y_VERIFY(agentId);
-
- const TMonotonic now = TActivationContext::Monotonic();
-
- const auto& record = ev->Get()->Record;
- auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId());
- responseRecord->SetTimeToLiveMs(BlockLeaseTime.MilliSeconds());
+ void SendBlock(ui32 groupId) {
+ SendToBSProxy(SelfId(), groupId, new TEvBlobStorage::TEvBlock(TabletId, BlockedGeneration,
+ TInstant::Max(), IssuerGuid), groupId);
+ }
- for (const ui64 tabletId : record.GetTabletIds()) {
- auto& block = Blocks[tabletId];
- responseRecord->AddBlockedGenerations(block.BlockedGeneration);
- block.PerAgentInfo[agentId].ExpirationTimestamp = now + BlockLeaseTime;
+ void Handle(TEvBlobStorage::TEvBlockResult::TPtr ev) {
+ switch (ev->Get()->Status) {
+ case NKikimrProto::OK:
+ if (!--BlocksPending) {
+ Finish();
+ }
+ break;
+
+ case NKikimrProto::ALREADY:
+ // race, but this is not possible in current implementation
+ Y_FAIL();
+
+ case NKikimrProto::ERROR:
+ default:
+ if (!--RetryCount) {
+ auto& r = Response->Get<TEvBlobDepot::TEvBlockResult>()->Record;
+ r.SetStatus(NKikimrProto::ERROR);
+ r.SetErrorReason(ev->Get()->ErrorReason);
+ Finish();
+ } else {
+ SendBlock(ev->Cookie);
+ }
+ break;
}
+ }
- TActivationContext::Send(response.release());
+ void Finish() {
+ TActivationContext::Send(Response.release());
+ PassAway();
}
- };
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // TBlocksManager wrapper
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvBlobStorage::TEvBlockResult, Handle);
+ hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ cFunc(TEvents::TSystem::Wakeup, IssueBlocksToStorage);
+ cFunc(TEvents::TSystem::Poison, PassAway);
+ )
+ };
- TBlobDepot::TBlocksManagerPtr TBlobDepot::CreateBlocksManager() {
- return {new TBlocksManager{this}, std::default_delete<TBlocksManager>{}};
+ void TBlobDepot::TBlocksManager::AddBlockOnLoad(ui64 tabletId, ui32 generation) { // now a TBlocksManager member; the TBlobDepot forwarding wrapper is removed
+ Blocks[tabletId].BlockedGeneration = generation; // in-memory state only; nothing is written to the database here
}
- void TBlobDepot::AddBlockOnLoad(ui64 tabletId, ui32 generation) {
- BlocksManager->AddBlockOnLoad(tabletId, generation);
+ void TBlobDepot::TBlocksManager::OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, std::unique_ptr<IEventHandle> response) { // called from TTxUpdateBlock::Complete when the status is OK
+ Self->RegisterWithSameMailbox(new TBlockProcessorActor(Self, tabletId, blockedGeneration, nodeId, // the actor takes over the pending response
+ std::move(response)));
}
- void TBlobDepot::Handle(TEvBlobDepot::TEvBlock::TPtr ev) {
- return BlocksManager->Handle(ev);
+ void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvBlock::TPtr ev) { // validates a block request and, if it raises the blocked generation, runs TTxUpdateBlock
+ const auto& record = ev->Get()->Record;
+ auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId(), NKikimrProto::OK,
+ std::nullopt, BlockLeaseTime.MilliSeconds());
+
+ if (!record.HasTabletId() || !record.HasBlockedGeneration()) { // both fields must be present in the request
+ responseRecord->SetStatus(NKikimrProto::ERROR);
+ responseRecord->SetErrorReason("incorrect protobuf");
+ } else {
+ const ui64 tabletId = record.GetTabletId();
+ const ui32 blockedGeneration = record.GetBlockedGeneration();
+ if (const auto it = Blocks.find(tabletId); it != Blocks.end() && blockedGeneration <= it->second.BlockedGeneration) { // not above the known blocked generation: reply ALREADY without a transaction
+ responseRecord->SetStatus(NKikimrProto::ALREADY);
+ } else {
+ TAgent& agent = Self->GetAgent(ev->Recipient);
+ Self->Execute(std::make_unique<TTxUpdateBlock>(Self, tabletId, blockedGeneration, // the response is moved into the transaction
+ agent.ConnectedNodeId, TActivationContext::Now(), std::move(response)));
+ }
+ }
+
+ TActivationContext::Send(response.release()); // not sent if the request got processed and response now is nullptr; NOTE(review): relies on Send() tolerating a null handle -- confirm
}
- void TBlobDepot::Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) {
- return BlocksManager->Handle(ev);
+ void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) { // reports blocked generations for the requested tablets and stamps a lease for the asking agent
+ TAgent& agent = Self->GetAgent(ev->Recipient);
+ const ui32 agentId = agent.ConnectedNodeId; // leases are tracked by the agent's node id
+ Y_VERIFY(agentId);
+
+ const TMonotonic now = TActivationContext::Monotonic();
+
+ const auto& record = ev->Get()->Record;
+ auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId());
+ responseRecord->SetTimeToLiveMs(BlockLeaseTime.MilliSeconds()); // the lease duration is reported to the agent in milliseconds
+
+ for (const ui64 tabletId : record.GetTabletIds()) { // one BlockedGenerations entry is appended per requested tablet, in request order
+ auto& block = Blocks[tabletId]; // NOTE(review): operator[] presumably default-inserts an entry (BlockedGeneration 0) for unknown tablets -- confirm
+ responseRecord->AddBlockedGenerations(block.BlockedGeneration);
+ block.PerAgentInfo[agentId].ExpirationTimestamp = now + BlockLeaseTime; // lease expiry for this agent is set to now + BlockLeaseTime
+ }
+
+ TActivationContext::Send(response.release());
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/blocks.h b/ydb/core/blob_depot/blocks.h
new file mode 100644
index 00000000000..bd3aba38997
--- /dev/null
+++ b/ydb/core/blob_depot/blocks.h
@@ -0,0 +1,42 @@
+#pragma once
+
+#include "defs.h"
+#include "blob_depot_tablet.h"
+
+namespace NKikimr::NBlobDepot {
+
+ class TBlobDepot::TBlocksManager { // keeps the blocked generation per tablet together with per-agent lease expiration times
+ // wait duration before issuing blocks via storage
+ static constexpr TDuration AgentsWaitTime = TDuration::Seconds(1);
+
+ // TTL for block lease
+ static constexpr TDuration BlockLeaseTime = TDuration::Seconds(60);
+
+ struct TBlock {
+ struct TPerAgentInfo {
+ TMonotonic ExpirationTimestamp = TMonotonic::Zero(); // monotonic time up to which the agent's lease is valid; set to now + BlockLeaseTime on query
+ };
+
+ ui32 BlockedGeneration = 0; // defaults to 0 for a freshly created entry
+ THashMap<ui32, TPerAgentInfo> PerAgentInfo; // keyed by the agent's connected node id
+ };
+
+ TBlobDepot* const Self;
+ THashMap<ui64, TBlock> Blocks; // keyed by tablet id
+
+ private:
+ class TTxUpdateBlock;
+ class TBlockProcessorActor;
+
+ public:
+ TBlocksManager(TBlobDepot *self)
+ : Self(self)
+ {}
+
+ void AddBlockOnLoad(ui64 tabletId, ui32 generation);
+ void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, std::unique_ptr<IEventHandle> response);
+ void Handle(TEvBlobDepot::TEvBlock::TPtr ev);
+ void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev);
+ };
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index 1ecc15a9133..f2752fc8ee4 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -1,150 +1,283 @@
-#include "blob_depot_tablet.h"
+#include "data.h"
+#include "schema.h"
namespace NKikimr::NBlobDepot {
- class TBlobDepot::TDataManager {
- TBlobDepot* const Self;
+ class TBlobDepot::TData::TTxConfirmGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { // removes collected ids from the Trash table in batches, then persists the confirmed gen:step
+ const ui8 Channel;
+ const ui32 GroupId;
+ std::vector<TLogoBlobID> TrashDeleted; // ids still to be deleted from the Trash table; shrinks after every batch
+ const ui64 ConfirmedGenStep;
- struct TCompareKey {
- bool operator ()(const TString& x, const TString& y) const { return x < y; }
- bool operator ()(const TStringBuf& x, const TString& y) const { return x < y; }
- bool operator ()(const TString& x, const TStringBuf& y) const { return x < y; }
+ static constexpr ui32 MaxKeysToProcessAtOnce = 10'000; // upper bound on deletions issued by one transaction
- using is_transparent = void;
- };
+ public:
+ TTxConfirmGC(TBlobDepot *self, ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted,
+ ui64 confirmedGenStep)
+ : TTransactionBase(self)
+ , Channel(channel)
+ , GroupId(groupId)
+ , TrashDeleted(std::move(trashDeleted))
+ , ConfirmedGenStep(confirmedGenStep)
+ {}
- std::map<TString, TDataValue, TCompareKey> Data;
- std::set<TLogoBlobID> DataBlobIds;
+ bool Execute(TTransactionContext& txc, const TActorContext&) override { // processes one batch; always returns true
+ NIceDb::TNiceDb db(txc.DB);
- public:
- TDataManager(TBlobDepot *self)
- : Self(self)
- {
- (void)Self;
+ for (ui32 i = 0; i < TrashDeleted.size() && i < MaxKeysToProcessAtOnce; ++i) { // at most MaxKeysToProcessAtOnce deletions per transaction
+ db.Table<Schema::Trash>().Key(TKey(TrashDeleted[i]).MakeBinaryKey()).Delete();
+ }
+ if (TrashDeleted.size() <= MaxKeysToProcessAtOnce) { // this was the final batch: store the confirmed gen:step in the same transaction
+ TrashDeleted.clear();
+ db.Table<Schema::ConfirmedGC>().Key(Channel, GroupId).Update<Schema::ConfirmedGC::ConfirmedGenStep>(
+ ConfirmedGenStep);
+ } else { // more ids remain: keep only the unprocessed tail for the follow-up transaction
+ std::vector<TLogoBlobID> temp;
+ temp.insert(temp.end(), TrashDeleted.begin() + MaxKeysToProcessAtOnce, TrashDeleted.end());
+ temp.swap(TrashDeleted);
+ }
+
+ return true;
}
- std::optional<TDataValue> FindKey(TStringBuf key) {
- const auto it = Data.find(key);
- return it != Data.end() ? std::make_optional(it->second) : std::nullopt;
+ void Complete(const TActorContext&) override { // either reports completion or schedules the next batch
+ if (TrashDeleted.empty()) {
+ Self->Data->OnCommitConfirmedGC(Channel, GroupId);
+ } else { // resume transaction
+ Self->Execute(std::make_unique<TTxConfirmGC>(Self, Channel, GroupId, std::move(TrashDeleted), ConfirmedGenStep));
+ }
}
+ };
- void ScanRange(const std::optional<TStringBuf>& begin, const std::optional<TStringBuf>& end,
- TScanFlags flags, const std::function<bool(TStringBuf, const TDataValue&)>& callback) {
- auto beginIt = !begin ? Data.begin()
- : flags & EScanFlags::INCLUDE_BEGIN ? Data.lower_bound(*begin)
- : Data.upper_bound(*begin);
-
- auto endIt = !end ? Data.end()
- : flags & EScanFlags::INCLUDE_END ? Data.upper_bound(*end)
- : Data.lower_bound(*end);
-
- if (flags & EScanFlags::REVERSE) {
- if (beginIt != endIt) {
- --endIt;
- do {
- auto& current = *endIt--;
- if (!callback(current.first, current.second)) {
- break;
- }
- } while (beginIt != endIt);
- }
- } else {
- while (beginIt != endIt) {
- auto& current = *beginIt++;
- if (!callback(current.first, current.second)) {
- break;
- }
- }
+ std::optional<TBlobDepot::TData::TValue> TBlobDepot::TData::FindKey(const TKey& key) { // returns a copy of the stored value, or std::nullopt when the key is absent
+ const auto it = Data.find(key);
+ return it != Data.end() ? std::make_optional(it->second) : std::nullopt;
+ }
+
+ TBlobDepot::TData::TRecordsPerChannelGroup& TBlobDepot::TData::GetRecordsPerChannelGroup(TLogoBlobID id) { // finds or creates the record keyed by (tablet id, channel, group id) of the given blob
+ TTabletStorageInfo *info = Self->Info();
+ const ui32 groupId = info->GroupFor(id.Channel(), id.Generation()); // the group is resolved through this tablet's own storage info
+ Y_VERIFY(groupId != Max<ui32>()); // Max<ui32>() is not accepted as a group id here; hitting it is fatal
+ const auto& key = std::make_tuple(id.TabletID(), id.Channel(), groupId);
+ const auto [it, _] = RecordsPerChannelGroup.try_emplace(key, id.TabletID(), id.Channel(), groupId); // the record is constructed only when the key is new
+ return it->second;
+ }
+
+ void TBlobDepot::TData::AddDataOnLoad(TKey key, TString value) { // parses a serialized NKikimrBlobDepot::TValue and registers it through PutKey
+ NKikimrBlobDepot::TValue proto;
+ const bool success = proto.ParseFromString(value);
+ Y_VERIFY(success); // a value that fails to parse is treated as fatal
+ PutKey(std::move(key), {
+ .Meta = proto.GetMeta(),
+ .ValueChain = std::move(*proto.MutableValueChain()),
+ .KeepState = proto.GetKeepState(),
+ .Public = proto.GetPublic(),
+ });
+ }
+
+ void TBlobDepot::TData::AddTrashOnLoad(TLogoBlobID id) { // puts the id into the Trash set of its channel/group record
+ auto& record = GetRecordsPerChannelGroup(id);
+ record.Trash.insert(id);
+ OnTrashInserted(record); // queues the record for HandleTrash() when it is eligible
+ }
+
+ void TBlobDepot::TData::AddConfirmedGenStepOnLoad(ui8 channel, ui32 groupId, ui64 confirmedGenStep) { // sets LastConfirmedGenStep on this tablet's record for (channel, groupId), creating the record if needed
+ const auto& key = std::make_tuple(Self->TabletID(), channel, groupId);
+ const auto [it, _] = RecordsPerChannelGroup.try_emplace(key, Self->TabletID(), channel, groupId);
+ auto& record = it->second;
+ record.LastConfirmedGenStep = confirmedGenStep;
+ }
+
+ void TBlobDepot::TData::PutKey(TKey key, TValue&& data) { // stores the value under the key and takes a reference on every blob enumerated from its value chain
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "PutKey", (TabletId, Self->TabletID()), (Key, key.ToString(Self->Config)),
+ (KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState)));
+
+ EnumerateBlobsForValueChain(data.ValueChain, Self->TabletID(), [&](TLogoBlobID id) {
+ if (!RefCount[id]++) { // the counter was zero (or absent) before this increment
+ // first mention of this id
+ auto& record = GetRecordsPerChannelGroup(id);
+ const auto [_, inserted] = record.Used.insert(id);
+ Y_VERIFY(inserted); // an id with zero references must not already be in Used
}
+ });
+
+ Data[std::move(key)] = std::move(data); // NOTE(review): an existing value is overwritten without releasing the references of its old value chain — confirm callers never overwrite
+ }
+
+ void TBlobDepot::TData::OnTrashInserted(TRecordsPerChannelGroup& record) { // queues the record for HandleTrash() unless a request is in flight or the record belongs to another tablet id
+ if (!record.CollectGarbageRequestInFlight && record.TabletId == Self->TabletID()) {
+ RecordsWithTrash.PushBack(&record);
}
+ }
- void DeleteKey(TStringBuf key) {
- Data.erase(TString(key));
+ std::optional<TString> TBlobDepot::TData::UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState) { // raises the keep state of a key (creating the entry if absent); returns the serialized value, or std::nullopt when nothing changed
+ const auto [it, inserted] = Data.try_emplace(std::move(key)); // single lookup; try_emplace leaves 'key' untouched when the entry already exists
+ if (!inserted && keepState <= it->second.KeepState) { // an existing entry is only ever raised, never lowered or rewritten with the same state
+ return std::nullopt;
}
+ auto& value = it->second;
+ value.KeepState = keepState;
+ return ToValueProto(value);
+ }
- void PutKey(TString key, TDataValue&& data) {
- auto getKeyString = [&] {
- if (Self->Config.GetOperationMode() == NKikimrBlobDepot::VirtualGroup) {
- return TLogoBlobID(reinterpret_cast<const ui64*>(key.data())).ToString();
- } else {
- return key;
- }
- };
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "PutKey", (TabletId, Self->TabletID()), (Key, EscapeC(getKeyString())),
- (KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState)));
+ void TBlobDepot::TData::DeleteKey(const TKey& key, const std::function<void(TLogoBlobID)>& updateTrash, void *cookie) { // erases the key and drops one reference from each blob of its value chain
+ const auto it = Data.find(key);
+ Y_VERIFY(it != Data.end()); // deleting a key that is not present is fatal
+ TValue& value = it->second;
+ EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id) {
+ const auto it = RefCount.find(id);
+ Y_VERIFY(it != RefCount.end());
+ if (!--it->second) { // last reference dropped
+ InFlightTrash.emplace(cookie, id); // remembered under the caller's cookie until CommitTrash(cookie)
+ RefCount.erase(it);
+ updateTrash(id); // caller-supplied hook invoked once per blob that became unreferenced
+ }
+ });
+ Data.erase(it);
+ }
- Data[std::move(key)] = std::move(data);
+ void TBlobDepot::TData::CommitTrash(void *cookie) { // moves every blob recorded under the cookie by DeleteKey from Used to Trash
+ auto range = InFlightTrash.equal_range(cookie);
+ for (auto it = range.first; it != range.second; ++it) {
+ auto& record = GetRecordsPerChannelGroup(it->second);
+ const auto usedIt = record.Used.find(it->second);
+ Y_VERIFY(usedIt != record.Used.end()); // a blob being trashed must still be listed as used
+ record.Trash.insert(record.Used.extract(usedIt)); // the set node is transferred from Used to Trash
+ OnTrashInserted(record);
}
+ InFlightTrash.erase(range.first, range.second);
+ }
- void AddDataOnLoad(TString key, TString value) {
- NKikimrBlobDepot::TValue proto;
- const bool success = proto.ParseFromString(value);
- Y_VERIFY(success);
- PutKey(std::move(key), {
- .Meta = proto.GetMeta(),
- .ValueChain = std::move(*proto.MutableValueChain()),
- .KeepState = proto.GetKeepState(),
- .Public = proto.GetPublic(),
- });
+ TString TBlobDepot::TData::ToValueProto(const TValue& value) { // serializes a TValue into an NKikimrBlobDepot::TValue protobuf string
+ NKikimrBlobDepot::TValue proto;
+ if (value.Meta) { // Meta is written only when non-empty
+ proto.SetMeta(value.Meta);
+ }
+ proto.MutableValueChain()->CopyFrom(value.ValueChain);
+ if (proto.GetKeepState() != value.KeepState) { // proto is fresh here, so the field is set only when it differs from the protobuf default
+ proto.SetKeepState(value.KeepState);
+ }
+ if (proto.GetPublic() != value.Public) { // same rule as for KeepState above
+ proto.SetPublic(value.Public);
}
- std::optional<TString> UpdateKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) {
- auto& value = Data[TString(key)];
- if (value.KeepState < keepState) {
- value.KeepState = keepState;
- return ToValueProto(value);
+ TString s;
+ const bool success = proto.SerializeToString(&s);
+ Y_VERIFY(success); // a serialization failure is treated as fatal
+ return s;
+ }
+
+ void TBlobDepot::TData::HandleTrash() { // issues one TEvCollectGarbage per record queued in RecordsWithTrash, then empties the queue
+ const ui32 generation = Self->Executor()->Generation();
+
+ for (TRecordsPerChannelGroup& record : RecordsWithTrash) {
+ Y_VERIFY(!record.CollectGarbageRequestInFlight);
+ Y_VERIFY(record.TabletId == Self->TabletID());
+ Y_VERIFY(!record.Trash.empty());
+
+ ui64 nextGenStep = 0; // stays 0 when no trash id lies below the bound computed in the else branch
+
+ auto& channel = Self->PerChannelRecords[record.Channel];
+ if (channel.GivenStepIndex.empty()) {
+ nextGenStep = GenStep(*--record.Trash.end()); // GivenStepIndex is empty: use the gen:step of the greatest trash id
} else {
- return std::nullopt;
+ const auto& [leastStep, leastIndex] = *channel.GivenStepIndex.begin();
+ const TLogoBlobID maxId(record.TabletId, generation, leastStep, record.Channel, 0, 0);
+ const auto it = record.Trash.lower_bound(maxId);
+ if (it != record.Trash.begin()) {
+ nextGenStep = GenStep(*std::prev(it)); // gen:step of the last trash id ordered before (generation, leastStep)
+ }
+ }
+
+ auto keep = std::make_unique<TVector<TLogoBlobID>>();
+ auto doNotKeep = std::make_unique<TVector<TLogoBlobID>>();
+
+ // FIXME: check for blob leaks when LastConfirmedGenStep is not properly persisted
+ for (auto it = record.Trash.begin(); it != record.Trash.end() && GenStep(*it) <= record.LastConfirmedGenStep; ++it) {
+ doNotKeep->push_back(*it); // trash at or below the already confirmed gen:step goes into the do-not-keep list
}
- }
- static TString ToValueProto(const TDataValue& value) {
- NKikimrBlobDepot::TValue proto;
- if (value.Meta) {
- proto.SetMeta(value.Meta);
+ // FIXME: check for blob loss when LastConfirmedGenStep is not properly persisted
+ const TLogoBlobID keepFrom(record.TabletId, ui32(record.LastConfirmedGenStep >> 32),
+ ui32(record.LastConfirmedGenStep), record.Channel, 0, 0);
+ for (auto it = record.Used.upper_bound(keepFrom); it != record.Used.end() && GenStep(*it) <= nextGenStep; ++it) {
+ keep->push_back(*it); // used blobs ordered after keepFrom and not above nextGenStep go into the keep list
}
- proto.MutableValueChain()->CopyFrom(value.ValueChain);
- if (proto.GetKeepState() != value.KeepState) {
- proto.SetKeepState(value.KeepState);
+
+ if (keep->empty()) {
+ keep.reset();
}
- if (proto.GetPublic() != value.Public) {
- proto.SetPublic(value.Public);
+ if (doNotKeep->empty()) {
+ doNotKeep.reset();
}
+ const bool collect = nextGenStep > record.LastConfirmedGenStep; // passed to the event as its collect argument; true only when the gen:step moves forward
- TString s;
- const bool success = proto.SerializeToString(&s);
- Y_VERIFY(success);
- return s;
- }
- };
+ if (!keep && !doNotKeep && !collect) {
+ continue; // skip this one
+ }
- TBlobDepot::TDataManagerPtr TBlobDepot::CreateDataManager() {
- return {new TDataManager{this}, std::default_delete<TDataManager>{}};
- }
+ auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(record.TabletId, generation,
+ record.PerGenerationCounter, record.Channel, collect, ui32(nextGenStep >> 32), ui32(nextGenStep),
+ keep.get(), doNotKeep.get(), TInstant::Max(), true);
+ keep.release(); // the unique_ptr gives up the vector here; presumably the event owns it from now on — confirm against TEvCollectGarbage
+ doNotKeep.release();
- std::optional<TBlobDepot::TDataValue> TBlobDepot::FindKey(TStringBuf key) {
- return DataManager->FindKey(key);
- }
+ record.CollectGarbageRequestInFlight = true;
+ record.PerGenerationCounter += ev->Collect ? ev->PerGenerationCounterStepSize() : 0;
+ record.TrashInFlight.insert(record.TrashInFlight.end(), record.Trash.begin(), record.Trash.end()); // every currently known trash id is appended as in flight for this request
+ record.NextGenStep = Max(nextGenStep, record.LastConfirmedGenStep);
- void TBlobDepot::ScanRange(const std::optional<TStringBuf>& begin, const std::optional<TStringBuf>& end,
- TScanFlags flags, const std::function<bool(TStringBuf, const TDataValue&)>& callback) {
- return DataManager->ScanRange(begin, end, flags, callback);
- }
+ auto& kind = Self->ChannelKinds[Self->ChannelToKind[record.Channel]];
+ const auto blobSeqId = TBlobSeqId::FromBinary(generation, kind, kind.NextBlobSeqId);
+ Y_VERIFY(record.LastConfirmedGenStep < GenStep(generation, blobSeqId.Step));
+ if (GenStep(generation, blobSeqId.Step) <= nextGenStep) { // the next id to be issued would not be above nextGenStep: bump it to step nextGenStep + 1
+ kind.NextBlobSeqId = TBlobSeqId{record.Channel, generation, ui32(nextGenStep) + 1, 0}.ToBinary(kind);
+ }
- void TBlobDepot::DeleteKey(TStringBuf key) {
- DataManager->DeleteKey(key);
- }
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "issuing TEvCollectGarbage", (TabletId, Self->TabletID()),
+ (Channel, record.Channel), (GroupId, record.GroupId), (Msg, ev->ToString()),
+ (LastConfirmedGenStep, record.LastConfirmedGenStep), (NextGenStep, record.NextGenStep),
+ (TrashInFlight.size, record.TrashInFlight.size()));
+
+ SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), record.GroupId); // the group id doubles as the cookie; the result handler uses it to find the record
+ }
- void TBlobDepot::PutKey(TString key, TDataValue&& data) {
- DataManager->PutKey(std::move(key), std::move(data));
+ RecordsWithTrash.Clear(); // skipped records are dropped from the queue too; they are queued again by OnTrashInserted
}
- void TBlobDepot::AddDataOnLoad(TString key, TString value) {
- DataManager->AddDataOnLoad(std::move(key), std::move(value));
+ void TBlobDepot::TData::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) { // completes a TEvCollectGarbage request issued by HandleTrash(): confirms it on OK, retries otherwise
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvCollectGarbageResult", (TabletId, ev->Get()->TabletId),
+ (Channel, ev->Get()->Channel), (GroupId, ev->Cookie), (Msg, ev->Get()->ToString()));
+ const auto& key = std::make_tuple(ev->Get()->TabletId, ev->Get()->Channel, ev->Cookie); // the cookie carries the group id set when the request was sent
+ const auto it = RecordsPerChannelGroup.find(key);
+ Y_VERIFY(it != RecordsPerChannelGroup.end());
+ auto& record = it->second;
+ Y_VERIFY(record.CollectGarbageRequestInFlight);
+ if (ev->Get()->Status == NKikimrProto::OK) {
+ for (const TLogoBlobID& id : record.TrashInFlight) { // make it merge
+ record.Trash.erase(id);
+ }
+ record.LastConfirmedGenStep = record.NextGenStep;
+ Self->Execute(std::make_unique<TTxConfirmGC>(Self, record.Channel, record.GroupId,
+ std::exchange(record.TrashInFlight, {}), record.LastConfirmedGenStep)); // the in-flight flag stays set until OnCommitConfirmedGC
+ } else {
+ record.TrashInFlight.clear(); // HandleTrash() appends the whole Trash set again on retry; without this, failed attempts pile up duplicate ids
+ record.CollectGarbageRequestInFlight = false;
+ OnTrashInserted(record);
+ HandleTrash();
+ }
}
- std::optional<TString> TBlobDepot::UpdateKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) {
- return DataManager->UpdateKeepState(key, keepState);
+ void TBlobDepot::TData::OnCommitConfirmedGC(ui8 channel, ui32 groupId) { // called from TTxConfirmGC::Complete once all batches are done: clears the in-flight flag and restarts collection if trash remains
+ const auto& key = std::make_tuple(Self->TabletID(), channel, groupId);
+ const auto it = RecordsPerChannelGroup.find(key);
+ Y_VERIFY(it != RecordsPerChannelGroup.end());
+ auto& record = it->second;
+ Y_VERIFY(record.CollectGarbageRequestInFlight);
+ record.CollectGarbageRequestInFlight = false;
+ if (!record.Trash.empty()) { // HandleTrash() verifies that queued records have non-empty Trash, hence the guard
+ OnTrashInserted(record);
+ HandleTrash();
+ }
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
new file mode 100644
index 00000000000..22515c82731
--- /dev/null
+++ b/ydb/core/blob_depot/data.h
@@ -0,0 +1,344 @@
+#pragma once
+
+#include "defs.h"
+#include "blob_depot_tablet.h"
+
+namespace NKikimr::NBlobDepot {
+
+ class TBlobDepot::TData {
+ TBlobDepot* const Self;
+
+ public:
+ class alignas(TString) TKey {
+ union {
+ ui64 Raw64[4];
+ ui8 Bytes[32];
+ char String[31];
+ struct {
+ ui8 Padding[31];
+ ui8 Type;
+ };
+ } Data;
+
+ static constexpr size_t TypeLenByteIdx = 31;
+ static constexpr size_t MaxInlineStringLen = TypeLenByteIdx;
+ static constexpr char BlobIdType = 32;
+ static constexpr char StringType = 33;
+
+ public:
+ TKey() { // constructs an empty inline-string key
+ Data.Type = EncodeInlineStringLenAsTypeByte(0); // set the tag directly: Reset() reads Data.Type, which is still uninitialized at this point
+ }
+
+ explicit TKey(TLogoBlobID id) {
+ Data.Type = BlobIdType;
+ reinterpret_cast<TLogoBlobID&>(Data.Bytes) = id;
+ }
+
+ explicit TKey(TStringBuf value) { // keeps values of up to MaxInlineStringLen bytes inline; longer ones live in a TString constructed inside the same storage
+ if (value.size() <= MaxInlineStringLen) {
+ Data.Type = EncodeInlineStringLenAsTypeByte(value.size());
+ memcpy(Data.String, value.data(), value.size());
+ Data.String[value.size()] = 0; // for a 31-byte value this index is byte 31 of the union, i.e. the Type byte, which the encoding has already set to 0
+ } else {
+ Data.Type = StringType;
+ new(Data.Bytes) TString(value); // placement-new into the union storage; the class is declared alignas(TString)
+ }
+ }
+
+ explicit TKey(TString value) {
+ if (value.size() <= MaxInlineStringLen) {
+ Data.Type = EncodeInlineStringLenAsTypeByte(value.size());
+ memcpy(Data.String, value.data(), value.size());
+ Data.String[value.size()] = 0;
+ } else {
+ Data.Type = StringType;
+ new(Data.Bytes) TString(std::move(value));
+ }
+ }
+
+ TKey(const TKey& other) {
+ if (other.Data.Type == StringType) {
+ Data.Type = StringType;
+ new(Data.Bytes) TString(other.GetString());
+ } else {
+ Data = other.Data;
+ }
+ }
+
+ TKey(TKey&& other) {
+ if (other.Data.Type == StringType) {
+ Data.Type = StringType;
+ new(Data.Bytes) TString(std::move(other.GetString()));
+ other.Reset();
+ } else {
+ Data = other.Data;
+ }
+ }
+
+ ~TKey() {
+ Reset();
+ }
+
+ TKey& operator =(const TKey& other) {
+ if (this != &other) {
+ if (Data.Type == StringType && other.Data.Type == StringType) {
+ GetString() = other.GetString();
+ } else if (Data.Type == StringType) {
+ GetString().~TString();
+ Data = other.Data;
+ } else if (other.Data.Type == StringType) {
+ Data.Type = StringType;
+ new(Data.Bytes) TString(other.GetString());
+ } else {
+ Data = other.Data;
+ }
+ }
+ return *this;
+ }
+
+ TKey& operator =(TKey&& other) {
+ if (this != &other) {
+ if (Data.Type == StringType && other.Data.Type == StringType) {
+ GetString() = std::move(other.GetString());
+ other.Reset();
+ } else if (Data.Type == StringType) {
+ GetString().~TString();
+ Data = other.Data;
+ } else if (other.Data.Type == StringType) {
+ Data.Type = StringType;
+ new(Data.Bytes) TString(std::move(other.GetString()));
+ other.Reset();
+ } else {
+ Data = other.Data;
+ }
+ }
+ return *this;
+ }
+
+ std::variant<TLogoBlobID, TStringBuf> AsVariant() const {
+ if (Data.Type == BlobIdType) {
+ return GetBlobId();
+ } else {
+ return GetStringBuf();
+ }
+ }
+
+ TString MakeBinaryKey() const {
+ if (Data.Type == BlobIdType) {
+ return TString(GetBlobId().AsBinaryString());
+ } else {
+ return TString(GetStringBuf());
+ }
+ }
+
+ static TKey FromBinaryKey(const TStringBuf& key, const NKikimrBlobDepot::TBlobDepotConfig& config) {
+ if (config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup) {
+ Y_VERIFY(key.size() == 3 * sizeof(ui64));
+ return TKey(TLogoBlobID(reinterpret_cast<const ui64*>(key.data())));
+ } else {
+ return TKey(key);
+ }
+ }
+
+ TString ToString(const NKikimrBlobDepot::TBlobDepotConfig& config) const {
+ TStringStream s;
+ Output(s, config);
+ return s.Str();
+ }
+
+ void Output(IOutputStream& s, const NKikimrBlobDepot::TBlobDepotConfig& config) const {
+ if (config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup) {
+ s << GetBlobId();
+ } else {
+ s << EscapeC(GetStringBuf());
+ }
+ }
+
+ static int Compare(const TKey& x, const TKey& y) { // three-way comparison returning -1, 0 or 1; blob-id keys order before all string keys
+ if (x.Data.Type == BlobIdType && y.Data.Type == BlobIdType) {
+ return x.GetBlobId() < y.GetBlobId() ? -1 : y.GetBlobId() < x.GetBlobId() ? 1 : 0;
+ } else if (x.Data.Type == BlobIdType) {
+ return -1;
+ } else if (y.Data.Type == BlobIdType) {
+ return 1;
+ } else {
+ const TStringBuf sbx = x.GetStringBuf(); // inline and heap-backed strings are compared uniformly through TStringBuf
+ const TStringBuf sby = y.GetStringBuf();
+ return sbx < sby ? -1 : sby < sbx ? 1 : 0;
+ }
+ }
+
+ const TLogoBlobID& GetBlobId() const {
+ Y_VERIFY_DEBUG(Data.Type == BlobIdType);
+ return reinterpret_cast<const TLogoBlobID&>(Data.Bytes);
+ }
+
+ friend bool operator ==(const TKey& x, const TKey& y) { return Compare(x, y) == 0; }
+ friend bool operator !=(const TKey& x, const TKey& y) { return Compare(x, y) != 0; }
+ friend bool operator < (const TKey& x, const TKey& y) { return Compare(x, y) < 0; }
+ friend bool operator <=(const TKey& x, const TKey& y) { return Compare(x, y) <= 0; }
+ friend bool operator > (const TKey& x, const TKey& y) { return Compare(x, y) > 0; }
+ friend bool operator >=(const TKey& x, const TKey& y) { return Compare(x, y) >= 0; }
+
+ private:
+ void Reset() {
+ if (Data.Type == StringType) {
+ GetString().~TString();
+ }
+ Data.Type = EncodeInlineStringLenAsTypeByte(0);
+ }
+
+ TStringBuf GetStringBuf() const {
+ if (Data.Type == StringType) {
+ return GetString();
+ } else {
+ return TStringBuf(Data.String, DecodeInlineStringLenFromTypeByte(Data.Type));
+ }
+ }
+
+ const TString& GetString() const {
+ Y_VERIFY_DEBUG(Data.Type == StringType);
+ return reinterpret_cast<const TString&>(Data.Bytes);
+ }
+
+ TString& GetString() {
+ Y_VERIFY_DEBUG(Data.Type == StringType);
+ return reinterpret_cast<TString&>(Data.Bytes);
+ }
+
+ static ui8 EncodeInlineStringLenAsTypeByte(size_t len) { // maps an inline length to the tag byte: 31 -> 0, 0 -> 31, any other length to itself
+ Y_VERIFY_DEBUG(len <= MaxInlineStringLen);
+ return len == MaxInlineStringLen ? 0 : len ? len : MaxInlineStringLen; // swapping 0 and 31 makes the tag of a full 31-byte string equal to 0
+ }
+
+ static size_t DecodeInlineStringLenFromTypeByte(ui8 type) { // inverse of the encoding; the mapping only swaps 0 and 31, so applying it again restores the length
+ return EncodeInlineStringLenAsTypeByte(type);
+ }
+ };
+
+ struct TValue { // in-memory form of a stored value; ToValueProto() serializes it
+ TString Meta;
+ TValueChain ValueChain;
+ NKikimrBlobDepot::EKeepState KeepState = {}; // zero-initialized so a default-initialized TValue never holds an indeterminate enum
+ bool Public = false; // same reason: ToValueProto() reads this field unconditionally
+ };
+
+ enum EScanFlags : ui32 {
+ INCLUDE_BEGIN = 1,
+ INCLUDE_END = 2,
+ REVERSE = 4,
+ };
+
+ Y_DECLARE_FLAGS(TScanFlags, EScanFlags)
+
+ private:
+ struct TRecordWithTrash {};
+
+ struct TRecordsPerChannelGroup // garbage collection state for one (tablet id, channel, group id) combination
+ : TIntrusiveListItem<TRecordsPerChannelGroup, TRecordWithTrash>
+ {
+ const ui64 TabletId;
+ const ui8 Channel;
+ const ui32 GroupId;
+
+ std::set<TLogoBlobID> Used; // blobs with a non-zero reference count
+ std::set<TLogoBlobID> Trash; // committed trash
+ std::vector<TLogoBlobID> TrashInFlight; // trash ids attached to the outstanding collect request
+ ui32 PerGenerationCounter = 1; // passed to TEvCollectGarbage and advanced when a request collects
+ ui64 LastConfirmedGenStep = 0; // gen:step of the last successful collect request
+ ui64 NextGenStep = 0; // gen:step that becomes confirmed if the outstanding request succeeds
+ bool CollectGarbageRequestInFlight = false; // set when a request is sent, cleared on failure or in OnCommitConfirmedGC
+
+ TRecordsPerChannelGroup(ui64 tabletId, ui8 channel, ui32 groupId)
+ : TabletId(tabletId)
+ , Channel(channel)
+ , GroupId(groupId)
+ {}
+ };
+
+ std::map<TKey, TValue> Data;
+ THashMap<TLogoBlobID, ui32> RefCount;
+ THashMap<std::tuple<ui64, ui8, ui32>, TRecordsPerChannelGroup> RecordsPerChannelGroup;
+ TIntrusiveList<TRecordsPerChannelGroup, TRecordWithTrash> RecordsWithTrash;
+
+ THashMultiMap<void*, TLogoBlobID> InFlightTrash; // being committed, but not yet confirmed
+
+ class TTxConfirmGC;
+
+ public:
+ TData(TBlobDepot *self)
+ : Self(self)
+ {}
+
+ std::optional<TValue> FindKey(const TKey& key);
+
+ template<typename TCallback>
+ void ScanRange(const TKey *begin, const TKey *end, TScanFlags flags, TCallback&& callback) { // visits keys between *begin and *end (nullptr means unbounded) until the callback returns false
+ auto beginIt = !begin ? Data.begin()
+ : flags & EScanFlags::INCLUDE_BEGIN ? Data.lower_bound(*begin)
+ : Data.upper_bound(*begin);
+
+ auto endIt = !end ? Data.end()
+ : flags & EScanFlags::INCLUDE_END ? Data.upper_bound(*end)
+ : Data.lower_bound(*end);
+
+ if (flags & EScanFlags::REVERSE) {
+ if (beginIt != endIt) {
+ --endIt; // endIt now designates the element to report next
+ for (;;) {
+ const bool atFirst = endIt == beginIt; // decided up front: the first element must be reported, but never stepped past
+ auto& current = *endIt;
+ if (!atFirst) {
+ --endIt; // step before invoking the callback, exactly as the forward branch does
+ }
+ if (!callback(current.first, current.second) || atFirst) {
+ break;
+ }
+ }
+ }
+ } else {
+ while (beginIt != endIt) {
+ auto& current = *beginIt++;
+ if (!callback(current.first, current.second)) {
+ break;
+ }
+ }
+ }
+ }
+
+ TRecordsPerChannelGroup& GetRecordsPerChannelGroup(TLogoBlobID id);
+
+ void AddDataOnLoad(TKey key, TString value);
+ void AddTrashOnLoad(TLogoBlobID id);
+ void AddConfirmedGenStepOnLoad(ui8 channel, ui32 groupId, ui64 confirmedGenStep);
+
+ void PutKey(TKey key, TValue&& data);
+
+ void OnTrashInserted(TRecordsPerChannelGroup& record);
+ std::optional<TString> UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState);
+ void DeleteKey(const TKey& key, const std::function<void(TLogoBlobID)>& updateTrash, void *cookie);
+ void CommitTrash(void *cookie);
+ void HandleTrash();
+ void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev);
+ void OnCommitConfirmedGC(ui8 channel, ui32 groupId);
+
+ static TString ToValueProto(const TValue& value);
+
+ template<typename TCallback>
+ void EnumerateRefCount(TCallback&& callback) {
+ for (const auto& [key, value] : RefCount) {
+ callback(key, value);
+ }
+ }
+
+ template<typename TCallback>
+ void EnumerateTrash(TCallback&& callback) { // invokes callback(tabletId, channel, groupId, id, inFlight) for every committed trash id of every record
+ for (const auto& [key, record] : RecordsPerChannelGroup) {
+ THashSet<TLogoBlobID> inFlight(record.TrashInFlight.begin(), record.TrashInFlight.end()); // rebuilt per record to answer the in-flight membership test below
+ for (const TLogoBlobID& id : record.Trash) {
+ callback(record.TabletId, record.Channel, record.GroupId, id, inFlight.contains(id));
+ }
+ }
+ }
+ };
+
+ Y_DECLARE_OPERATORS_FOR_FLAGS(TBlobDepot::TData::TScanFlags)
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/defs.h b/ydb/core/blob_depot/defs.h
index bdb31cf8618..ce566644ccf 100644
--- a/ydb/core/blob_depot/defs.h
+++ b/ydb/core/blob_depot/defs.h
@@ -8,4 +8,6 @@
#include <ydb/core/protos/blob_depot.pb.h>
#include <ydb/core/util/stlog.h>
+#include <library/cpp/monlib/service/pages/templates.h>
+
#include <util/generic/va_args.h>
diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h
index 01212d9e5fa..4532ec72b8e 100644
--- a/ydb/core/blob_depot/events.h
+++ b/ydb/core/blob_depot/events.h
@@ -55,7 +55,7 @@ namespace NKikimr {
BLOBDEPOT_EVENT_PB(EvApplyConfigResult, TabletId, TxId);
BLOBDEPOT_EVENT_PB(EvRegisterAgent, VirtualGroupId);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvRegisterAgentResult);
- BLOBDEPOT_EVENT_PB(EvAllocateIds, ChannelKind);
+ BLOBDEPOT_EVENT_PB(EvAllocateIds, ChannelKind, Count);
BLOBDEPOT_EVENT_PB(EvAllocateIdsResult, ChannelKind, Generation);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotify);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotifyResult);
diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp
index 3bd4da5c743..6d770624dc8 100644
--- a/ydb/core/blob_depot/garbage_collection.cpp
+++ b/ydb/core/blob_depot/garbage_collection.cpp
@@ -1,196 +1,183 @@
-#include "blob_depot_tablet.h"
+#include "garbage_collection.h"
#include "schema.h"
+#include "data.h"
namespace NKikimr::NBlobDepot {
- namespace {
- static ui64 GenStep(ui32 gen, ui32 step) {
- return static_cast<ui64>(gen) << 32 | step;
- }
- }
+ class TBlobDepot::TBarrierServer::TTxCollectGarbage : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ std::optional<TString> Error;
- class TBlobDepot::TGarbageCollectionManager {
- TBlobDepot* const Self;
+ std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> Request;
+ int KeepIndex = 0;
+ int DoNotKeepIndex = 0;
+ ui32 NumKeysProcessed = 0;
+ bool Done = false;
- struct TBarrier {
- ui64 LastRecordGenStep = 0;
- ui64 Soft = 0;
- ui64 Hard = 0;
- };
+ static constexpr ui32 MaxKeysToProcessAtOnce = 10'000;
- THashMap<std::pair<ui64, ui8>, TBarrier> Barriers;
-
- private:
- class TTxCollectGarbage : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
- std::optional<TString> Error;
-
- std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> Request;
- int KeepIndex = 0;
- int DoNotKeepIndex = 0;
- ui32 NumKeysProcessed = 0;
- bool Done = false;
-
- static constexpr ui32 MaxKeysToProcessAtOnce = 10'000;
-
- public:
- TTxCollectGarbage(TBlobDepot *self, std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> request,
- ui32 keepIndex = 0, ui32 doNotKeepIndex = 0)
- : TTransactionBase(self)
- , Request(std::move(request))
- , KeepIndex(keepIndex)
- , DoNotKeepIndex(doNotKeepIndex)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext&) override {
- Validate();
- if (!Error) {
- auto& record = Request->Get()->Record;
- Done = ProcessFlags(txc, KeepIndex, record.GetKeep(), NKikimrBlobDepot::EKeepState::Keep)
- && ProcessFlags(txc, DoNotKeepIndex, record.GetDoNotKeep(), NKikimrBlobDepot::EKeepState::DoNotKeep)
- && ProcessBarrier(txc);
- }
- return true;
+ public:
+ TTxCollectGarbage(TBlobDepot *self, std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> request,
+ ui32 keepIndex = 0, ui32 doNotKeepIndex = 0)
+ : TTransactionBase(self)
+ , Request(std::move(request))
+ , KeepIndex(keepIndex)
+ , DoNotKeepIndex(doNotKeepIndex)
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ Validate();
+ if (!Error) {
+ auto& record = Request->Get()->Record;
+ Done = ProcessFlags(txc, KeepIndex, record.GetKeep(), NKikimrBlobDepot::EKeepState::Keep)
+ && ProcessFlags(txc, DoNotKeepIndex, record.GetDoNotKeep(), NKikimrBlobDepot::EKeepState::DoNotKeep)
+ && ProcessBarrier(txc);
}
+ return true;
+ }
- void Complete(const TActorContext&) override {
- if (Done || Error) {
- auto [response, _] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId(),
- Error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(Error));
- TActivationContext::Send(response.release());
- } else {
- Self->Execute(std::make_unique<TTxCollectGarbage>(Self, std::move(Request), KeepIndex, DoNotKeepIndex));
- }
+ void Complete(const TActorContext&) override {
+ Self->Data->CommitTrash(this);
+
+ if (Done || Error) {
+ auto [response, _] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId(),
+ Error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(Error));
+ TActivationContext::Send(response.release());
+ Self->Data->HandleTrash();
+ } else {
+ Self->Execute(std::make_unique<TTxCollectGarbage>(Self, std::move(Request), KeepIndex, DoNotKeepIndex));
}
+ }
- void Validate() {
- // validate the command first
- auto& record = Request->Get()->Record;
- if (record.HasCollectGeneration() && record.HasCollectStep()) {
- if (!record.HasTabletId() || !record.HasChannel() || record.GetChannel() > TLogoBlobID::MaxChannel) {
- Error = "TabletId/Channel are either not set or invalid";
- } else if (!record.HasGeneration() || !record.HasPerGenerationCounter()) {
- Error = "Generation/PerGenerationCounter are not set";
- } else {
- const auto key = std::make_pair(record.GetTabletId(), record.GetChannel());
- auto& barriers = Self->GarbageCollectionManager->Barriers;
- if (const auto it = barriers.find(key); it != barriers.end()) {
- auto& barrier = it->second;
- const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter());
- const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep());
- ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft;
- if (recordGenStep < barrier.LastRecordGenStep) {
- Error = "record generation:counter is obsolete";
- } else if (recordGenStep == barrier.LastRecordGenStep) {
- if (currentGenStep != collectGenStep) {
- Error = "repeated command with different collect parameters received";
- }
- } else if (collectGenStep < currentGenStep) {
- Error = "decreasing barrier";
+ void Validate() {
+ // validate the command first
+ auto& record = Request->Get()->Record;
+ if (record.HasCollectGeneration() && record.HasCollectStep()) {
+ if (!record.HasTabletId() || !record.HasChannel() || record.GetChannel() > TLogoBlobID::MaxChannel) {
+ Error = "TabletId/Channel are either not set or invalid";
+ } else if (!record.HasGeneration() || !record.HasPerGenerationCounter()) {
+ Error = "Generation/PerGenerationCounter are not set";
+ } else {
+ const auto key = std::make_pair(record.GetTabletId(), record.GetChannel());
+ auto& barriers = Self->BarrierServer->Barriers;
+ if (const auto it = barriers.find(key); it != barriers.end()) {
+ auto& barrier = it->second;
+ const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter());
+ const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep());
+ ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft;
+ if (recordGenStep < barrier.LastRecordGenStep) {
+ Error = "record generation:counter is obsolete";
+ } else if (recordGenStep == barrier.LastRecordGenStep) {
+ if (currentGenStep != collectGenStep) {
+ Error = "repeated command with different collect parameters received";
}
+ } else if (collectGenStep < currentGenStep) {
+ Error = "decreasing barrier";
}
}
- } else if (record.HasCollectGeneration() || record.HasCollectStep()) {
- Error = "CollectGeneration/CollectStep set incorrectly";
}
+ } else if (record.HasCollectGeneration() || record.HasCollectStep()) {
+ Error = "CollectGeneration/CollectStep set incorrectly";
}
+ }
- bool ProcessFlags(TTransactionContext& txc, int& index,
- const NProtoBuf::RepeatedPtrField<NKikimrProto::TLogoBlobID>& items,
- NKikimrBlobDepot::EKeepState state) {
- NIceDb::TNiceDb db(txc.DB);
- for (; index < items.size() && NumKeysProcessed < MaxKeysToProcessAtOnce; ++index) {
- const auto id = LogoBlobIDFromLogoBlobID(items[index]);
- const TStringBuf key = id.AsBinaryString();
- if (const auto& value = Self->UpdateKeepState(key, state)) {
- db.Table<Schema::Data>().Key(TString(key)).Update<Schema::Data::Value>(*value);
- ++NumKeysProcessed;
- }
+ bool ProcessFlags(TTransactionContext& txc, int& index,
+ const NProtoBuf::RepeatedPtrField<NKikimrProto::TLogoBlobID>& items,
+ NKikimrBlobDepot::EKeepState state) {
+ NIceDb::TNiceDb db(txc.DB);
+ for (; index < items.size() && NumKeysProcessed < MaxKeysToProcessAtOnce; ++index) {
+ const auto id = LogoBlobIDFromLogoBlobID(items[index]);
+ TData::TKey key(id);
+ if (const auto& value = Self->Data->UpdateKeepState(key, state)) {
+ db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Update<Schema::Data::Value>(*value);
+ ++NumKeysProcessed;
}
-
- return index == items.size();
}
- bool ProcessBarrier(TTransactionContext& txc) {
- NIceDb::TNiceDb db(txc.DB);
-
- const auto& record = Request->Get()->Record;
- if (record.HasCollectGeneration() && record.HasCollectStep()) {
- const TLogoBlobID first(record.GetTabletId(), 0, 0, record.GetChannel(), 0, 0);
- const TLogoBlobID last(record.GetTabletId(), record.GetCollectGeneration(), record.GetCollectStep(),
- record.GetChannel(), TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId,
- TLogoBlobID::MaxCrcMode);
- const bool hard = record.GetHard();
-
- auto processKey = [&](TStringBuf key, const TDataValue& value) {
- if (value.KeepState != NKikimrBlobDepot::EKeepState::Keep || hard) {
- const TLogoBlobID id(reinterpret_cast<const ui64*>(key.data()));
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "DeleteKey", (TabletId, Self->TabletID()),
- (BlobId, id));
- db.Table<Schema::Data>().Key(TString(key)).Delete();
- Self->DeleteKey(key);
- ++NumKeysProcessed;
- }
+ return index == items.size();
+ }
- return NumKeysProcessed < MaxKeysToProcessAtOnce;
- };
+ bool ProcessBarrier(TTransactionContext& txc) {
+ NIceDb::TNiceDb db(txc.DB);
+
+ const auto& record = Request->Get()->Record;
+ if (record.HasCollectGeneration() && record.HasCollectStep()) {
+ const bool hard = record.GetHard();
+
+ auto processKey = [&](const TData::TKey& key, const TData::TValue& value) {
+ if (value.KeepState != NKikimrBlobDepot::EKeepState::Keep || hard) {
+ const TLogoBlobID id(key.GetBlobId());
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "DeleteKey", (TabletId, Self->TabletID()), (BlobId, id));
+ db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Delete();
+ auto updateTrash = [&](TLogoBlobID id) {
+ db.Table<Schema::Trash>().Key(TString(id.AsBinaryString())).Update();
+ };
+ Self->Data->DeleteKey(key, updateTrash, this);
+ ++NumKeysProcessed;
+ }
- Self->ScanRange(first.AsBinaryString(), last.AsBinaryString(),
- EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END, processKey);
+ return NumKeysProcessed < MaxKeysToProcessAtOnce;
+ };
- if (NumKeysProcessed == MaxKeysToProcessAtOnce) {
- return false;
- }
+ const TData::TKey first(TLogoBlobID(record.GetTabletId(), 0, 0, record.GetChannel(), 0, 0));
+ const TData::TKey last(TLogoBlobID(record.GetTabletId(), record.GetCollectGeneration(),
+ record.GetCollectStep(), record.GetChannel(), TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie,
+ TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode));
- const auto key = std::make_pair(record.GetTabletId(), record.GetChannel());
- auto& barriers = Self->GarbageCollectionManager->Barriers;
- auto& barrier = barriers[key];
- const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter());
- const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep());
- ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft;
- Y_VERIFY(barrier.LastRecordGenStep <= recordGenStep);
- barrier.LastRecordGenStep = recordGenStep;
- Y_VERIFY(currentGenStep <= collectGenStep);
- currentGenStep = collectGenStep;
-
- db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel()).Update(
- NIceDb::TUpdate<Schema::Barriers::LastRecordGenStep>(recordGenStep),
- NIceDb::TUpdate<Schema::Barriers::Soft>(barrier.Soft),
- NIceDb::TUpdate<Schema::Barriers::Hard>(barrier.Hard)
- );
+ Self->Data->ScanRange(&first, &last, TData::EScanFlags::INCLUDE_BEGIN | TData::EScanFlags::INCLUDE_END,
+ processKey);
+
+ if (NumKeysProcessed == MaxKeysToProcessAtOnce) {
+ return false;
}
- return true;
+ const auto key = std::make_pair(record.GetTabletId(), record.GetChannel());
+ auto& barriers = Self->BarrierServer->Barriers;
+ auto& barrier = barriers[key];
+ const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter());
+ const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep());
+ ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft;
+ Y_VERIFY(barrier.LastRecordGenStep <= recordGenStep);
+ barrier.LastRecordGenStep = recordGenStep;
+ Y_VERIFY(currentGenStep <= collectGenStep);
+ currentGenStep = collectGenStep;
+
+ db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel()).Update(
+ NIceDb::TUpdate<Schema::Barriers::LastRecordGenStep>(recordGenStep),
+ NIceDb::TUpdate<Schema::Barriers::Soft>(barrier.Soft),
+ NIceDb::TUpdate<Schema::Barriers::Hard>(barrier.Hard)
+ );
}
- };
- public:
- TGarbageCollectionManager(TBlobDepot *self)
- : Self(self)
- {}
-
- void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev) {
- std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> uniq(ev.Release());
- Self->Execute(std::make_unique<TTxCollectGarbage>(Self, std::move(uniq)));
- }
-
- bool CheckBlobForBarrier(TLogoBlobID id) const {
- const auto key = std::make_pair(id.TabletID(), id.Channel());
- const auto it = Barriers.find(key);
- const ui64 genstep = static_cast<ui64>(id.Generation()) << 32 | id.Step();
- return it == Barriers.end() || genstep > Max(it->second.Soft, it->second.Hard);
+ return true;
}
};
- TBlobDepot::TGarbageCollectionManagerPtr TBlobDepot::CreateGarbageCollectionManager() {
- return {new TGarbageCollectionManager{this}, std::default_delete<TGarbageCollectionManager>{}};
+ // Stores (or overwrites) the in-memory barrier of the (tabletId, channel) pair with the supplied
+ // last-record / soft / hard values.
+ void TBlobDepot::TBarrierServer::AddBarrierOnLoad(ui64 tabletId, ui8 channel, ui64 lastRecordGenStep,
+         ui64 soft, ui64 hard) {
+     TBarrier& barrier = Barriers[std::make_pair(tabletId, channel)];
+     barrier.LastRecordGenStep = lastRecordGenStep;
+     barrier.Soft = soft;
+     barrier.Hard = hard;
+ }
+
+ // Takes ownership of the incoming TEvCollectGarbage event handle and submits a TTxCollectGarbage
+ // transaction that will process it.
+ void TBlobDepot::TBarrierServer::Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev) {
+ Self->Execute(std::make_unique<TTxCollectGarbage>(Self,
+ std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle>(ev.Release())));
}
- void TBlobDepot::Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev) {
- GarbageCollectionManager->Handle(ev);
+ // Returns true when the blob is not covered by any barrier: either there is no barrier record for
+ // the blob's (tablet id, channel) pair, or the blob's GenStep is strictly greater than both the soft
+ // and the hard barrier values.
+ bool TBlobDepot::TBarrierServer::CheckBlobForBarrier(TLogoBlobID id) const {
+ const auto key = std::make_pair(id.TabletID(), id.Channel());
+ const auto it = Barriers.find(key);
+ return it == Barriers.end() || GenStep(id) > Max(it->second.Soft, it->second.Hard);
}
- bool TBlobDepot::CheckBlobForBarrier(TLogoBlobID id) const {
- return GarbageCollectionManager->CheckBlobForBarrier(id);
+ // Reports how the blob relates to the barriers of its (tablet id, channel) pair.
+ // *underSoft / *underHard are set to true when the blob's GenStep is less than or equal to the
+ // respective barrier value; both are set to false when no barrier record exists for the pair.
+ void TBlobDepot::TBarrierServer::GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const {
+ const auto key = std::make_pair(id.TabletID(), id.Channel());
+ const auto it = Barriers.find(key);
+ const ui64 genStep = GenStep(id);
+ *underSoft = it == Barriers.end() ? false : genStep <= it->second.Soft;
+ *underHard = it == Barriers.end() ? false : genStep <= it->second.Hard;
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/garbage_collection.h b/ydb/core/blob_depot/garbage_collection.h
new file mode 100644
index 00000000000..c6c6a4ac2b8
--- /dev/null
+++ b/ydb/core/blob_depot/garbage_collection.h
@@ -0,0 +1,42 @@
+#pragma once
+
+#include "defs.h"
+#include "blob_depot_tablet.h"
+
+namespace NKikimr::NBlobDepot {
+
+ // Keeps the per-(tablet id, channel) garbage collection barriers of the BlobDepot tablet and
+ // handles TEvCollectGarbage requests.
+ class TBlobDepot::TBarrierServer {
+ TBlobDepot* const Self;
+
+ // Barrier state of a single (tablet id, channel) pair. Each field packs two 32-bit numbers into
+ // one ui64: the upper 32 bits and the lower 32 bits are reported separately by Enumerate().
+ struct TBarrier {
+ ui64 LastRecordGenStep = 0;
+ ui64 Soft = 0;
+ ui64 Hard = 0;
+ };
+
+ // key is (tablet id, channel)
+ THashMap<std::pair<ui64, ui8>, TBarrier> Barriers;
+
+ private:
+ class TTxCollectGarbage;
+
+ public:
+ TBarrierServer(TBlobDepot *self)
+ : Self(self)
+ {}
+
+ // Stores a barrier record for the (tabletId, channel) pair.
+ void AddBarrierOnLoad(ui64 tabletId, ui8 channel, ui64 lastRecordGenStep, ui64 soft, ui64 hard);
+ // Processes a garbage collection request.
+ void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev);
+ // Tells whether the blob lies above both barriers of its (tablet id, channel) pair.
+ bool CheckBlobForBarrier(TLogoBlobID id) const;
+ // Reports separately whether the blob is under the soft and under the hard barrier.
+ void GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const;
+
+ // Invokes callback(tabletId, channel, recordHi, recordLo, softHi, softLo, hardHi, hardLo) for every
+ // stored barrier, where Hi/Lo are the upper and lower 32 bits of the corresponding packed value.
+ template<typename TCallback>
+ void Enumerate(TCallback&& callback) {
+ for (const auto& [key, value] : Barriers) {
+ callback(key.first, key.second, static_cast<ui32>(value.LastRecordGenStep >> 32),
+ static_cast<ui32>(value.LastRecordGenStep), static_cast<ui32>(value.Soft >> 32),
+ static_cast<ui32>(value.Soft), static_cast<ui32>(value.Hard >> 32), static_cast<ui32>(value.Hard));
+ }
+ }
+ };
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/given_id_range.cpp b/ydb/core/blob_depot/given_id_range.cpp
new file mode 100644
index 00000000000..2d78ac0970c
--- /dev/null
+++ b/ydb/core/blob_depot/given_id_range.cpp
@@ -0,0 +1,127 @@
+#include "blob_depot_tablet.h"
+
+namespace NKikimr::NBlobDepot {
+
+ // Restores the range set from its protobuf form. Every serialized range is first inserted through
+ // InsertNewRange() (which counts all of its 'len' items as available), then each explicitly listed
+ // 64-bit chunk is overwritten with its stored bit mask and the counter is corrected accordingly.
+ TGivenIdRange::TGivenIdRange(const NKikimrBlobDepot::TGivenIdRange& proto) {
+     for (const auto& range : proto.GetRanges()) {
+         const ui64 begin = range.GetBegin();
+         const ui32 len = range.GetLen();
+         TRange& r = InsertNewRange(begin, begin + len);
+         ui64 *chunks = const_cast<ui64*>(r.Bits.GetChunks());
+         const ui32 count = r.Bits.GetChunkCount();
+         Y_VERIFY(range.OffsetsSize() == range.BitMasksSize());
+         for (ui32 i = 0; i < range.OffsetsSize(); ++i) {
+             const ui32 offset = range.GetOffsets(i);
+             const ui64 bitMask = range.GetBitMasks(i);
+             Y_VERIFY(offset < count);
+             // Adjust by the real difference between the chunk being replaced and the stored mask.
+             // Assuming a fixed 64 available bits per chunk is wrong for the last chunk of a range
+             // whose length is not a multiple of 64: ToProto() always serializes such a chunk because
+             // it never equals ~0, and the counter would then be reduced by bits that never existed.
+             // assumes a freshly inserted range has exactly 'len' bits set — TODO confirm against TRange
+             NumAvailableItems -= PopCount(chunks[offset]);
+             NumAvailableItems += PopCount(bitMask);
+             chunks[offset] = bitMask;
+         }
+     }
+ }
+
+ // Registers the non-empty interval [begin, end) as a new range, verifying that it does not overlap
+ // its neighbours, adds its length to the available item counter and returns the stored range.
+ TGivenIdRange::TRange& TGivenIdRange::InsertNewRange(ui64 begin, ui64 end) {
+     const ui64 count = end - begin;
+     Y_VERIFY(count);
+     const auto next = Ranges.lower_bound(begin);
+     // the preceding range (if any) must finish no later than the new one starts
+     if (next != Ranges.begin()) {
+         const auto before = std::prev(next);
+         Y_VERIFY(before->first + before->second.Len <= begin);
+     }
+     // the following range (if any) must start no earlier than the new one ends
+     if (next != Ranges.end()) {
+         Y_VERIFY(end <= next->first);
+     }
+     NumAvailableItems += count;
+     return Ranges.emplace_hint(next, begin, count)->second;
+ }
+
+ // Public wrapper over InsertNewRange() that adds the interval [begin, end) and discards the
+ // returned range reference.
+ void TGivenIdRange::IssueNewRange(ui64 begin, ui64 end) {
+ InsertNewRange(begin, end);
+ }
+
+ // Moves the ranges of 'other' into this set, verifies that no two neighbouring ranges overlap
+ // afterwards and transfers the available item counter (leaving other's counter at zero).
+ void TGivenIdRange::Join(TGivenIdRange&& other) {
+     Ranges.merge(std::move(other.Ranges));
+     auto prevIt = Ranges.begin();
+     if (prevIt != Ranges.end()) {
+         // walk adjacent pairs; with a single range the loop body never runs
+         for (auto curIt = std::next(prevIt); curIt != Ranges.end(); prevIt = curIt++) {
+             Y_VERIFY(prevIt->first + prevIt->second.Len <= curIt->first);
+         }
+     }
+     NumAvailableItems += std::exchange(other.NumAvailableItems, 0);
+ }
+
+ // Serializes all ranges into 'proto'. For each range the begin and length are written, followed by
+ // (offset, bit mask) pairs for only those 64-bit chunks that are not entirely set.
+ void TGivenIdRange::ToProto(NKikimrBlobDepot::TGivenIdRange *proto) {
+     constexpr ui64 allSet = ~ui64(0);
+     for (const auto& item : Ranges) {
+         const TRange& range = item.second;
+         auto *out = proto->AddRanges();
+         out->SetBegin(item.first);
+         out->SetLen(range.Len);
+         const ui64 *chunk = range.Bits.GetChunks();
+         const ui64 *chunkEnd = chunk + range.Bits.GetChunkCount();
+         for (ui32 index = 0; chunk != chunkEnd; ++chunk, ++index) {
+             if (*chunk == allSet) {
+                 continue; // fully available chunks are implied and not stored
+             }
+             out->AddOffsets(index);
+             out->AddBitMasks(*chunk);
+         }
+     }
+ }
+
+ // Marks a single id as no longer available. The id must belong to one of the stored ranges and
+ // must still be marked available there; otherwise verification fails.
+ void TGivenIdRange::RemovePoint(ui64 value) {
+     const auto after = Ranges.upper_bound(value);
+     Y_VERIFY(after != Ranges.begin());
+     // the candidate owner is the last range starting at or before 'value'
+     const auto owner = std::prev(after);
+     const ui64 rangeBegin = owner->first;
+     TRange& range = owner->second;
+     Y_VERIFY(rangeBegin <= value);
+     const ui64 index = value - rangeBegin;
+     Y_VERIFY(index < range.Len);
+     Y_VERIFY(range.Bits[index]);
+     range.Bits.Reset(index);
+     --NumAvailableItems;
+     // FIXME: reduce range (maybe)?
+ }
+
+ // Writes a textual dump of the set in the form "{begin:len[bits] begin:len[bits] ...}", where 'bits'
+ // is one 0/1 digit per item of the range (1 meaning the item is still available).
+ void TGivenIdRange::Output(IOutputStream& s) const {
+     s << "{";
+     for (auto it = Ranges.begin(); it != Ranges.end(); ++it) {
+         if (it != Ranges.begin()) {
+             s << " ";
+         }
+         const auto& [begin, range] = *it;
+         s << begin << ":" << range.Len << "[";
+         for (ui32 i = 0; i < range.Len; ++i) {
+             s << static_cast<int>(range.Bits[i]);
+         }
+         s << "]";
+     }
+     // close the brace opened above; it was previously left unbalanced
+     s << "}";
+ }
+
+ // Returns the same textual dump that Output() produces, as a string.
+ TString TGivenIdRange::ToString() const {
+ TStringStream s;
+ Output(s);
+ return s.Str();
+ }
+
+ // Returns true when no ranges are stored.
+ // NOTE(review): RemovePoint() clears bits without erasing a range, so this can presumably return
+ // false while GetNumAvailableItems() is already zero — confirm callers expect that.
+ bool TGivenIdRange::IsEmpty() const {
+ return Ranges.empty();
+ }
+
+ // Returns the running counter of items that are still available across all ranges.
+ ui32 TGivenIdRange::GetNumAvailableItems() const {
+ return NumAvailableItems;
+ }
+
+ // Takes the first set bit of the first stored range, clears it and returns the matching id
+ // (range begin + bit offset). The set must contain at least one range.
+ // NOTE(review): if the first range has no set bits left (RemovePoint() does not erase exhausted
+ // ranges), FirstNonZeroBit() presumably yields an out-of-range position — confirm this cannot
+ // happen for the instances Allocate() is called on.
+ ui64 TGivenIdRange::Allocate() {
+ Y_VERIFY(!Ranges.empty());
+ const auto it = Ranges.begin();
+ auto& [begin, range] = *it;
+ const size_t offset = range.Bits.FirstNonZeroBit();
+ const ui64 res = begin + offset;
+ range.Bits.Reset(offset);
+ // drop the range once nothing remains set after the position just taken
+ if (range.Bits.NextNonZeroBit(offset) == range.Bits.Size()) {
+ Ranges.erase(it);
+ }
+ --NumAvailableItems;
+ return res;
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/mon_main.cpp b/ydb/core/blob_depot/mon_main.cpp
new file mode 100644
index 00000000000..af2c79405f4
--- /dev/null
+++ b/ydb/core/blob_depot/mon_main.cpp
@@ -0,0 +1,271 @@
+#include "blob_depot_tablet.h"
+#include "data.h"
+#include "garbage_collection.h"
+
+namespace NKikimr::NBlobDepot {
+
+ // Transaction that renders the "data" monitoring page. Execute() builds the HTML for the table
+ // selected by the "table" CGI parameter into Stream; Complete() sends the result to the requester.
+ class TBlobDepot::TTxMonData : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ std::unique_ptr<NMon::TEvRemoteHttpInfo::THandle> Request;
+ TStringStream Stream;
+
+ // tables that can be shown on the page (one tab per entry)
+ enum class ETable {
+ Data,
+ RefCount,
+ Trash,
+ Barriers,
+ Blocks,
+ };
+
+ // Maps a table to the name used both in the "table" URL parameter and as the tab caption.
+ static constexpr const char *TableName(ETable table) {
+ switch (table) {
+ case ETable::Data: return "data";
+ case ETable::RefCount: return "refcount";
+ case ETable::Trash: return "trash";
+ case ETable::Barriers: return "barriers";
+ case ETable::Blocks: return "blocks";
+ }
+ }
+
+ // Reverse of TableName(); any unknown (or empty) name selects the Data table.
+ static ETable TableByName(const TString& name) {
+ for (const ETable table : {ETable::Data, ETable::RefCount, ETable::Trash, ETable::Barriers, ETable::Blocks}) {
+ if (name == TableName(table)) {
+ return table;
+ }
+ }
+ return ETable::Data;
+ }
+
+ public:
+ TTxMonData(TBlobDepot *self, NMon::TEvRemoteHttpInfo::TPtr ev)
+ : TTransactionBase(self)
+ , Request(ev.Release())
+ {}
+
+ // Selects the renderer for the requested table and emits the page: a tab bar linking to every
+ // table followed by one HTML table. The renderer is called twice — with header=true to emit the
+ // header cells and with header=false to emit the body rows. The database is not accessed.
+ bool Execute(TTransactionContext& /*txc*/, const TActorContext&) override {
+ const TCgiParameters& cgi = Request->Get()->Cgi();
+ const ETable table = TableByName(cgi.Get("table"));
+
+ void (TTxMonData::*render)(bool) = nullptr;
+ switch (table) {
+ case ETable::Data:
+ render = &TTxMonData::RenderDataTable;
+ break;
+
+ case ETable::RefCount:
+ render = &TTxMonData::RenderRefCountTable;
+ break;
+
+ case ETable::Trash:
+ render = &TTxMonData::RenderTrashTable;
+ break;
+
+ case ETable::Barriers:
+ render = &TTxMonData::RenderBarriersTable;
+ break;
+
+ case ETable::Blocks:
+ render = &TTxMonData::RenderBlocksTable;
+ break;
+ }
+ if (!render) {
+ Y_FAIL();
+ }
+
+ HTML(Stream) {
+ DIV_CLASS("panel panel-info") {
+ DIV_CLASS("panel-heading") {
+ Stream << "Data";
+ }
+ DIV_CLASS("panel-body") {
+ Stream << "<ul class='nav nav-tabs'>";
+ for (const ETable tab : {ETable::Data, ETable::RefCount, ETable::Trash, ETable::Barriers, ETable::Blocks}) {
+ Stream << "<li" << (table == tab ? " class='active'" : "") << ">"
+ << "<a href='app?TabletID=" << Self->TabletID() << "&page=data&table="
+ << TableName(tab) << "'>" << TableName(tab) << "</a></li>";
+ }
+ Stream << "</ul>";
+
+ TABLE_CLASS("table") {
+ TABLEHEAD() {
+ TABLER() {
+ (this->*render)(true);
+ }
+ }
+ TABLEBODY() {
+ (this->*render)(false);
+ }
+ }
+ }
+ }
+ }
+
+ return true;
+ }
+
+ // Data table: one row per stored key with its value chain (blob sequence ids and optional
+ // subranges), keep state and — in VirtualGroup operation mode only — soft/hard barrier marks.
+ void RenderDataTable(bool header) {
+ HTML(Stream) {
+ if (header) {
+ TABLEH() { Stream << "key"; }
+ TABLEH() { Stream << "value chain"; }
+ TABLEH() { Stream << "keep state"; }
+ TABLEH() { Stream << "barrier"; }
+ } else {
+ Self->Data->ScanRange(nullptr, nullptr, 0, [&](const TData::TKey& key, const TData::TValue& value) {
+ TABLER() {
+ TABLED() {
+ key.Output(Stream, Self->Config);
+ }
+ TABLED() {
+ bool first = true;
+ for (const auto& item : value.ValueChain) {
+ if (first) {
+ first = false;
+ } else {
+ Stream << "<br/>";
+ }
+ Stream << TBlobSeqId::FromProto(item.GetLocator().GetBlobSeqId()).ToString();
+ if (item.HasSubrangeBegin() || item.HasSubrangeEnd()) {
+ Stream << "[";
+ if (item.HasSubrangeBegin()) {
+ Stream << item.GetSubrangeBegin();
+ }
+ Stream << ":";
+ if (item.HasSubrangeEnd()) {
+ Stream << item.GetSubrangeEnd();
+ }
+ Stream << "]";
+ }
+ }
+ }
+ TABLED() {
+ Stream << NKikimrBlobDepot::EKeepState_Name(value.KeepState);
+ }
+ TABLED() {
+ if (Self->Config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup) {
+ bool underSoft, underHard;
+ Self->BarrierServer->GetBlobBarrierRelation(key.GetBlobId(), &underSoft, &underHard);
+ Stream << (underSoft ? 'S' : '-') << (underHard ? 'H' : '-');
+ }
+ }
+ }
+ // returning true from the callback — presumably tells ScanRange to continue; verify against ScanRange
+ return true;
+ });
+ }
+ }
+ }
+
+ // RefCount table: one row per blob id with its reference count.
+ void RenderRefCountTable(bool header) {
+ HTML(Stream) {
+ if (header) {
+ TABLEH() { Stream << "blob id"; }
+ TABLEH() { Stream << "refcount"; }
+ } else {
+ Self->Data->EnumerateRefCount([&](TLogoBlobID id, ui32 count) {
+ TABLER() {
+ TABLED() { Stream << id; }
+ TABLED() { Stream << count; }
+ }
+ });
+ }
+ }
+ }
+
+ // Trash table: one row per trash blob; the last column shows "*" for blobs reported as in flight.
+ void RenderTrashTable(bool header) {
+ HTML(Stream) {
+ if (header) {
+ TABLEH() { Stream << "tablet id"; }
+ TABLEH() { Stream << "channel"; }
+ TABLEH() { Stream << "group id"; }
+ TABLEH() { Stream << "blob id"; }
+ TABLEH() { Stream << "in flight"; }
+ } else {
+ Self->Data->EnumerateTrash([&](ui64 tabletId, ui8 channel, ui32 groupId, TLogoBlobID blobId, bool inFlight) {
+ TABLER() {
+ TABLED() { Stream << tabletId; }
+ // channel is ui8; cast so that it is printed as a number rather than a character
+ TABLED() { Stream << int(channel); }
+ TABLED() { Stream << groupId; }
+ TABLED() { Stream << blobId; }
+ TABLED() { Stream << (inFlight ? "*" : ""); }
+ }
+ });
+ }
+ }
+ }
+
+ // Barriers table: one row per (tablet id, channel) with the last record, soft and hard barrier
+ // values, each printed as "upper:lower" 32-bit halves.
+ void RenderBarriersTable(bool header) {
+ HTML(Stream) {
+ if (header) {
+ TABLEH() { Stream << "tablet id"; }
+ TABLEH() { Stream << "channel"; }
+ TABLEH() { Stream << "last record"; }
+ TABLEH() { Stream << "soft"; }
+ TABLEH() { Stream << "hard"; }
+ } else {
+ Self->BarrierServer->Enumerate([&](ui64 tabletId, ui8 channel, ui32 recordGen, ui32 recordCounter,
+ ui32 softGen, ui32 softStep, ui32 hardGen, ui32 hardStep) {
+ TABLER() {
+ TABLED() { Stream << tabletId; }
+ TABLED() { Stream << int(channel); }
+ TABLED() { Stream << recordGen << ":" << recordCounter; }
+ TABLED() { Stream << softGen << ":" << softStep; }
+ TABLED() { Stream << hardGen << ":" << hardStep; }
+ }
+ });
+ }
+ }
+ }
+
+ // Blocks table: placeholder — currently emits neither header cells nor rows.
+ void RenderBlocksTable(bool header) {
+ HTML(Stream) {
+ if (header) {
+ } else {
+ }
+ }
+ }
+
+ // Sends the accumulated HTML back to the original requester, preserving the request cookie.
+ void Complete(const TActorContext&) override {
+ TActivationContext::Send(new IEventHandle(Request->Sender, Self->SelfId(), new NMon::TEvRemoteHttpInfoRes(
+ Stream.Str()), 0, Request->Cookie));
+ }
+ };
+
+ // Entry point for monitoring page requests of the tablet.
+ // Returns false while the executor is missing or not active; returns true otherwise (a null 'ev'
+ // only probes availability). With a request present:
+ //   - "page=data" is rendered by a TTxMonData transaction;
+ //   - any other "page" value gets a raw HTTP "not found" reply;
+ //   - no "page" parameter yields the main page.
+ bool TBlobDepot::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext&) {
+     if (!Executor() || !Executor()->GetStats().IsActive) {
+         return false;
+     } else if (!ev) {
+         return true;
+     }
+
+     TStringStream s;
+
+     const TCgiParameters& cgi = ev->Get()->Cgi();
+     if (cgi.Has("page")) {
+         const TString& page = cgi.Get("page");
+         if (page == "data") {
+             Execute(std::make_unique<TTxMonData>(this, ev));
+             return true;
+         } else {
+             // The cookie goes into the cookie argument (after flags), exactly as in the regular reply
+             // below; previously it was passed in the flags position. The status code is 404 to match
+             // the "not found" reason phrase (403 means Forbidden).
+             // NOTE(review): 'page' comes from the request and is written into the HTML body
+             // unescaped — consider HTML-escaping it.
+             Send(ev->Sender, new NMon::TEvRemoteBinaryInfoRes(TStringBuilder()
+                 << "HTTP/1.1 404 Page not found\r\n"
+                 << "Content-Type: text/html\r\n"
+                 << "Connection: close\r\n"
+                 << "\r\n"
+                 << "<html><body>Page " << page << " is not found</body></html>"), 0, ev->Cookie);
+             return true;
+         }
+     } else {
+         RenderMainPage(s);
+     }
+
+     Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(s.Str()), 0, ev->Cookie);
+     return true;
+ }
+
+
+ // Renders the main monitoring page: a single link to the "data" page of this tablet.
+ void TBlobDepot::RenderMainPage(IOutputStream& s) {
+ HTML(s) {
+ s << "<a href='app?TabletID=" << TabletID() << "&page=data'>Contained data</a><br>";
+ }
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/op_apply_config.cpp b/ydb/core/blob_depot/op_apply_config.cpp
index 57e161acea5..fdfb9c70d72 100644
--- a/ydb/core/blob_depot/op_apply_config.cpp
+++ b/ydb/core/blob_depot/op_apply_config.cpp
@@ -9,6 +9,7 @@ namespace NKikimr::NBlobDepot {
class TTxApplyConfig : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
std::unique_ptr<IEventHandle> Response;
TString ConfigProtobuf;
+ bool WasConfigured = false;
public:
TTxApplyConfig(TBlobDepot *self, TEvBlobDepot::TEvApplyConfig& ev, std::unique_ptr<IEventHandle> response,
@@ -25,6 +26,16 @@ namespace NKikimr::NBlobDepot {
bool Execute(TTransactionContext& txc, const TActorContext&) override {
NIceDb::TNiceDb db(txc.DB);
+
+ auto table = db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Select();
+ if (!table.IsReady()) {
+ return false;
+ } else if (table.IsValid()) {
+ WasConfigured = table.HaveValue<Schema::Config::ConfigProtobuf>();
+ } else {
+ WasConfigured = false;
+ }
+
db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update(
NIceDb::TUpdate<Schema::Config::ConfigProtobuf>(ConfigProtobuf)
);
@@ -32,10 +43,9 @@ namespace NKikimr::NBlobDepot {
}
void Complete(const TActorContext&) override {
- const bool wasEmpty = !Self->Config.ByteSizeLong();
const bool success = Self->Config.ParseFromString(ConfigProtobuf);
Y_VERIFY(success);
- if (wasEmpty) {
+ if (!WasConfigured) {
Self->InitChannelKinds();
}
TActivationContext::Send(Response.release());
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index 56b9db28498..67182f22369 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -1,5 +1,7 @@
#include "blob_depot_tablet.h"
#include "schema.h"
+#include "data.h"
+#include "garbage_collection.h"
namespace NKikimr::NBlobDepot {
@@ -20,6 +22,8 @@ namespace NKikimr::NBlobDepot {
NKikimrBlobDepot::TEvCommitBlobSeqResult *responseRecord;
std::tie(Response, responseRecord) = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId());
+ TAgent& agent = Self->GetAgent(Request->Recipient);
+
for (const auto& item : Request->Get()->Record.GetItems()) {
auto *responseItem = responseRecord->AddItems();
responseItem->SetStatus(NKikimrProto::OK);
@@ -29,19 +33,17 @@ namespace NKikimr::NBlobDepot {
value.SetMeta(item.GetMeta());
}
auto *chain = value.AddValueChain();
- chain->MutableLocator()->CopyFrom(item.GetBlobLocator());
-
- const TString& key = item.GetKey();
- if (key.size() == 3 * sizeof(ui64)) {
- const TLogoBlobID id(reinterpret_cast<const ui64*>(key.data()));
- if (!Self->CheckBlobForBarrier(id)) {
- responseItem->SetStatus(NKikimrProto::ERROR);
- responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << id << " is being put beyond the barrier");
- continue;
- }
+ auto *locator = chain->MutableLocator();
+ locator->CopyFrom(item.GetBlobLocator());
+
+ if (!MarkGivenIdCommitted(agent, TBlobSeqId::FromProto(locator->GetBlobSeqId()), responseItem)) {
+ continue;
+ }
+ if (!CheckKeyAgainstBarrier(item.GetKey(), responseItem)) {
+ continue;
}
- Self->PutKey(std::move(key), {
+ Self->Data->PutKey(TData::TKey::FromBinaryKey(item.GetKey(), Self->Config), {
.Meta = value.GetMeta(),
.ValueChain = std::move(*value.MutableValueChain()),
.KeepState = value.GetKeepState(),
@@ -58,7 +60,51 @@ namespace NKikimr::NBlobDepot {
return true;
}
+ // Marks the blob sequence id as committed: it is removed from the given-id ranges of both the agent
+ // and the tablet-wide channel kind, and from the per-channel GivenStepIndex set.
+ // Returns false (after filling an ERROR status into responseItem) when the id's channel is out of
+ // range of ChannelToKind or maps to the System kind; returns true otherwise.
+ bool MarkGivenIdCommitted(TAgent& agent, const TBlobSeqId& blobSeqId,
+ NKikimrBlobDepot::TEvCommitBlobSeqResult::TItem *responseItem) {
+ // channels outside of the known table are treated the same way as System ones
+ const NKikimrBlobDepot::TChannelKind::E kind = blobSeqId.Channel < Self->ChannelToKind.size()
+ ? Self->ChannelToKind[blobSeqId.Channel] : NKikimrBlobDepot::TChannelKind::System;
+ if (kind == NKikimrBlobDepot::TChannelKind::System) {
+ responseItem->SetStatus(NKikimrProto::ERROR);
+ responseItem->SetErrorReason("incorrect Channel for blob");
+ return false;
+ }
+
+ const auto channelKindIt = Self->ChannelKinds.find(kind);
+ Y_VERIFY(channelKindIt != Self->ChannelKinds.end());
+ auto& ck = channelKindIt->second;
+
+ // NOTE(review): RemovePoint() verifies that the id is present in the range, so an id that was
+ // never issued to this agent presumably aborts the process rather than producing an error — confirm.
+ const ui64 value = blobSeqId.ToBinary(ck);
+ agent.ChannelKinds[kind].GivenIdRanges.RemovePoint(value);
+ ck.GivenIdRanges.RemovePoint(value);
+
+ auto& channel = Self->PerChannelRecords[blobSeqId.Channel];
+ channel.GivenStepIndex.erase(std::make_pair(blobSeqId.Step, blobSeqId.Index));
+
+ return true;
+ }
+
+ // Validates a key against the garbage collection barriers. The check applies only in VirtualGroup
+ // operation mode: the key must then be exactly three ui64 words holding a blob id, and that blob
+ // must pass CheckBlobForBarrier(). On failure an ERROR status with a reason is stored into
+ // responseItem and false is returned; in every other case the result is true.
+ bool CheckKeyAgainstBarrier(const TString& key, NKikimrBlobDepot::TEvCommitBlobSeqResult::TItem *responseItem) {
+     if (Self->Config.GetOperationMode() != NKikimrBlobDepot::EOperationMode::VirtualGroup) {
+         return true;
+     }
+
+     if (key.size() != 3 * sizeof(ui64)) {
+         responseItem->SetStatus(NKikimrProto::ERROR);
+         responseItem->SetErrorReason("incorrect BlobId format");
+         return false;
+     }
+
+     const TLogoBlobID blobId(reinterpret_cast<const ui64*>(key.data()));
+     if (Self->BarrierServer->CheckBlobForBarrier(blobId)) {
+         return true;
+     }
+
+     responseItem->SetStatus(NKikimrProto::ERROR);
+     responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << blobId << " is being put beyond the barrier");
+     return false;
+ }
+
void Complete(const TActorContext&) override {
+ Self->Data->HandleTrash();
TActivationContext::Send(Response.release());
}
};
diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp
index 1ec538d0f07..e48ce2802ee 100644
--- a/ydb/core/blob_depot/op_load.cpp
+++ b/ydb/core/blob_depot/op_load.cpp
@@ -1,10 +1,15 @@
#include "blob_depot_tablet.h"
#include "schema.h"
+#include "blocks.h"
+#include "data.h"
+#include "garbage_collection.h"
namespace NKikimr::NBlobDepot {
void TBlobDepot::ExecuteTxLoad() {
class TTxLoad : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ bool Configured = false;
+
public:
TTxLoad(TBlobDepot *self)
: TTransactionBase(self)
@@ -26,6 +31,7 @@ namespace NKikimr::NBlobDepot {
if (table.HaveValue<Schema::Config::ConfigProtobuf>()) {
const bool success = Self->Config.ParseFromString(table.GetValue<Schema::Config::ConfigProtobuf>());
Y_VERIFY(success);
+ Configured = true;
}
}
}
@@ -37,7 +43,7 @@ namespace NKikimr::NBlobDepot {
return false;
}
while (table.IsValid()) {
- Self->AddBlockOnLoad(
+ Self->BlocksManager->AddBlockOnLoad(
table.GetValue<Schema::Blocks::TabletId>(),
table.GetValue<Schema::Blocks::BlockedGeneration>()
);
@@ -47,6 +53,26 @@ namespace NKikimr::NBlobDepot {
}
}
+ // Barriers: pass every row of Schema::Barriers to BarrierServer->AddBarrierOnLoad
+ {
+ auto table = db.Table<Schema::Barriers>().Select();
+ if (!table.IsReady()) { // selection is not ready: leave the enclosing method with false
+ return false;
+ }
+ while (table.IsValid()) {
+ Self->BarrierServer->AddBarrierOnLoad(
+ table.GetValue<Schema::Barriers::TabletId>(),
+ table.GetValue<Schema::Barriers::Channel>(),
+ table.GetValue<Schema::Barriers::LastRecordGenStep>(),
+ table.GetValue<Schema::Barriers::Soft>(),
+ table.GetValue<Schema::Barriers::Hard>()
+ );
+ if (!table.Next()) { // advancing to the next row failed: leave the enclosing method with false
+ return false;
+ }
+ }
+ }
+
// Data
{
auto table = db.Table<Schema::Data>().Select();
@@ -54,8 +80,8 @@ namespace NKikimr::NBlobDepot {
return false;
}
while (table.IsValid()) {
- Self->AddDataOnLoad(
- table.GetValue<Schema::Data::Key>(),
+ Self->Data->AddDataOnLoad(
+ TData::TKey::FromBinaryKey(table.GetValue<Schema::Data::Key>(), Self->Config),
table.GetValue<Schema::Data::Value>()
);
if (!table.Next()) {
@@ -64,17 +90,56 @@ namespace NKikimr::NBlobDepot {
}
}
+ // Trash: every row holds a raw blob id that is handed over to Data->AddTrashOnLoad
+ {
+ auto table = db.Table<Schema::Trash>().Select();
+ if (!table.IsReady()) {
+ return false;
+ }
+ while (table.IsValid()) {
+ const TString& blobId = table.GetValue<Schema::Trash::BlobId>();
+ // The bytes are reinterpreted as three ui64 words right below, so a row of any other
+ // length must not get through (same size rule as in CheckKeyAgainstBarrier).
+ Y_VERIFY(blobId.size() == 3 * sizeof(ui64));
+ Self->Data->AddTrashOnLoad(TLogoBlobID(reinterpret_cast<const ui64*>(blobId.data())));
+ if (!table.Next()) {
+ return false;
+ }
+ }
+ }
+
+ // ConfirmedGC: pass every (Channel, GroupId, ConfirmedGenStep) row to Data->AddConfirmedGenStepOnLoad
+ {
+ auto table = db.Table<Schema::ConfirmedGC>().Select();
+ if (!table.IsReady()) { // selection is not ready: leave the enclosing method with false
+ return false;
+ }
+ while (table.IsValid()) {
+ Self->Data->AddConfirmedGenStepOnLoad(table.GetValue<Schema::ConfirmedGC::Channel>(),
+ table.GetValue<Schema::ConfirmedGC::GroupId>(),
+ table.GetValue<Schema::ConfirmedGC::ConfirmedGenStep>());
+ if (!table.Next()) { // advancing to the next row failed: leave the enclosing method with false
+ return false;
+ }
+ }
+ }
+
return true;
}
bool Precharge(NIceDb::TNiceDb& db) {
auto config = db.Table<Schema::Config>().Select();
auto blocks = db.Table<Schema::Blocks>().Select();
- return config.IsReady() && blocks.IsReady();
+ auto barriers = db.Table<Schema::Barriers>().Select(); // the four selections added here cover the Barriers, Data, Trash and ConfirmedGC load sections
+ auto data = db.Table<Schema::Data>().Select();
+ auto trash = db.Table<Schema::Trash>().Select();
+ auto confirmedGC = db.Table<Schema::ConfirmedGC>().Select();
+ return config.IsReady() && blocks.IsReady() && barriers.IsReady() && data.IsReady() && trash.IsReady() && // every Select() was already issued above, so short-circuiting skips only IsReady() calls
+ confirmedGC.IsReady();
}
void Complete(const TActorContext&) override {
- Self->InitChannelKinds();
+ if (Configured) { // Configured is set only after a stored ConfigProtobuf has been parsed into Self->Config
+ Self->InitChannelKinds();
+ Self->Data->HandleTrash(); // NOTE(review): presumably resumes processing of the trash loaded from Schema::Trash — confirm in data.cpp
+ }
}
};
diff --git a/ydb/core/blob_depot/op_resolve.cpp b/ydb/core/blob_depot/op_resolve.cpp
index 4904339756f..0b792f7c714 100644
--- a/ydb/core/blob_depot/op_resolve.cpp
+++ b/ydb/core/blob_depot/op_resolve.cpp
@@ -1,4 +1,5 @@
#include "blob_depot_tablet.h"
+#include "data.h"
namespace NKikimr::NBlobDepot {
@@ -31,14 +32,21 @@ namespace NKikimr::NBlobDepot {
ui32 itemIndex = 0;
for (const auto& item : ev->Get()->Record.GetItems()) {
- std::optional<TStringBuf> begin = item.HasBeginningKey() ? std::make_optional(item.GetBeginningKey()) : std::nullopt;
- std::optional<TStringBuf> end = item.HasEndingKey() ? std::make_optional(item.GetEndingKey()) : std::nullopt;
+ TData::TKey begin;
+ TData::TKey end;
+
+ if (item.HasBeginningKey()) {
+ begin = TData::TKey::FromBinaryKey(item.GetBeginningKey(), Config);
+ }
+ if (item.HasEndingKey()) {
+ end = TData::TKey::FromBinaryKey(item.GetEndingKey(), Config);
+ }
ui32 numItems = 0;
- auto addKey = [&](TStringBuf key, const TDataValue& value) {
+ auto addKey = [&](const TData::TKey& key, const TData::TValue& value) {
NKikimrBlobDepot::TEvResolveResult::TResolvedKey resolvedKey;
resolvedKey.SetItemIndex(itemIndex);
- resolvedKey.SetKey(key.data(), key.size());
+ resolvedKey.SetKey(key.MakeBinaryKey());
resolvedKey.MutableValueChain()->CopyFrom(value.ValueChain);
if (value.Meta) {
@@ -58,18 +66,18 @@ namespace NKikimr::NBlobDepot {
return !item.HasMaxKeys() || numItems != item.GetMaxKeys();
};
- TScanFlags flags;
+ TData::TScanFlags flags;
if (item.GetIncludeBeginning()) {
- flags |= EScanFlags::INCLUDE_BEGIN;
+ flags |= TData::EScanFlags::INCLUDE_BEGIN;
}
if (item.GetIncludeEnding()) {
- flags |= EScanFlags::INCLUDE_END;
+ flags |= TData::EScanFlags::INCLUDE_END;
}
if (item.GetReverse()) {
- flags |= EScanFlags::REVERSE;
+ flags |= TData::EScanFlags::REVERSE;
}
- ScanRange(begin, end, flags, addKey);
+ Data->ScanRange(item.HasBeginningKey() ? &begin : nullptr, item.HasEndingKey() ? &end : nullptr, flags, addKey);
++itemIndex;
}
diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h
index f5b2f123bb2..beeba0585d0 100644
--- a/ydb/core/blob_depot/schema.h
+++ b/ydb/core/blob_depot/schema.h
@@ -66,11 +66,29 @@ namespace NKikimr::NBlobDepot {
>;
};
+ struct Trash : Table<5> { // table id 5; a row consists of its key column only
+ struct BlobId : Column<1, NScheme::NTypeIds::String> {}; // op_load.cpp reads this value as the raw ui64 words of a TLogoBlobID
+
+ using TKey = TableKey<BlobId>;
+ using TColumns = TableColumns<BlobId>;
+ };
+
+ struct ConfirmedGC : Table<6> { // table id 6; one ConfirmedGenStep value per (Channel, GroupId) key
+ struct Channel : Column<1, NScheme::NTypeIds::Uint8> {};
+ struct GroupId : Column<2, NScheme::NTypeIds::Uint32> {};
+ struct ConfirmedGenStep : Column<3, NScheme::NTypeIds::Uint64> {}; // NOTE(review): presumably packed like GenStep() in types.h (generation in the high 32 bits) — confirm
+
+ using TKey = TableKey<Channel, GroupId>;
+ using TColumns = TableColumns<Channel, GroupId, ConfirmedGenStep>;
+ };
+
using TTables = SchemaTables<
Config,
Blocks,
Barriers,
- Data
+ Data,
+ Trash, // the two tables introduced by this change are appended after the existing ones
+ ConfirmedGC
>;
using TSettings = SchemaSettings<
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index c3e65dd220b..763289ec8f3 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -103,4 +103,65 @@ namespace NKikimr::NBlobDepot {
}
};
+ class TGivenIdRange { // collection of id ranges keyed by their first value; member functions are only declared here (given_id_range.cpp is part of the same change)
+ struct TRange {
+ ui32 Len; // length given to the constructor
+ TDynBitMap Bits; // NOTE(review): presumably one bit per position of the range — confirm in given_id_range.cpp
+
+ TRange(ui32 len)
+ : Len(len)
+ {
+ Bits.Set(0, len); // NOTE(review): presumably sets bits [0, len), i.e. every position starts marked — confirm against TDynBitMap::Set
+ }
+ };
+ std::map<ui64, TRange> Ranges; // range.begin -> range
+ ui32 NumAvailableItems = 0; // NOTE(review): presumably the number of ids still present across all ranges — confirm in given_id_range.cpp
+
+ public:
+ TGivenIdRange() = default;
+ TGivenIdRange(const NKikimrBlobDepot::TGivenIdRange& proto); // builds the object from its protobuf message
+
+ void ToProto(NKikimrBlobDepot::TGivenIdRange *proto); // writes into the protobuf message; not const-qualified
+
+ void Join(TGivenIdRange&& other); // takes the other set by rvalue reference
+
+ void IssueNewRange(ui64 begin, ui64 end); // NOTE(review): whether end is inclusive is not visible here — confirm in given_id_range.cpp
+ void RemovePoint(ui64 value); // op_commit_blob_seq.cpp calls this with blobSeqId.ToBinary(ck) for a committed blob
+
+ bool IsEmpty() const;
+ ui32 GetNumAvailableItems() const;
+ ui64 Allocate();
+
+ void Output(IOutputStream& s) const;
+ TString ToString() const;
+
+ private:
+ TRange& InsertNewRange(ui64 begin, ui64 len); // note: takes a length, while IssueNewRange takes (begin, end)
+ };
+
+ // Repeated protobuf field that holds the items of a value chain.
+ using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>;
+
+ // Invokes callback with the id of every blob referenced by valueChain.
+ // For each item the combined data+footer length decides the layout: when it does not exceed
+ // MaxBlobSize a single VG_COMPOSITE_BLOB id is reported, otherwise a VG_DATA_BLOB id followed by a
+ // VG_FOOTER_BLOB id.
+ template<typename TCallback>
+ void EnumerateBlobsForValueChain(const TValueChain& valueChain, ui64 tabletId, TCallback&& callback) {
+ for (const auto& item : valueChain) {
+ const auto& locator = item.GetLocator();
+ const auto& blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId());
+ const auto dataLen = locator.GetTotalDataLen();
+ const auto footerLen = locator.GetFooterLen();
+ const auto combinedLen = dataLen + footerLen;
+
+ if (combinedLen <= MaxBlobSize) {
+ // combined length is within MaxBlobSize: one composite blob
+ callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_COMPOSITE_BLOB, 0, combinedLen));
+ continue;
+ }
+
+ // combined length exceeds MaxBlobSize: data and footer are reported as separate blobs
+ callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, dataLen));
+ callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_FOOTER_BLOB, 0, footerLen));
+ }
+ }
+
+ // Packs a generation/step pair into one 64-bit value: generation in the upper 32 bits,
+ // step in the lower 32 bits.
+ inline ui64 GenStep(ui32 gen, ui32 step) {
+ const ui64 upper = static_cast<ui64>(gen) << 32;
+ const ui64 lower = step;
+ return upper | lower;
+ }
+
+ // Same packing, taking generation and step from a blob id.
+ inline ui64 GenStep(TLogoBlobID id) {
+ const ui32 gen = id.Generation();
+ const ui32 step = id.Step();
+ return GenStep(gen, step);
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto
index 46ec82b9ec0..555430f272d 100644
--- a/ydb/core/protos/blob_depot.proto
+++ b/ydb/core/protos/blob_depot.proto
@@ -37,6 +37,18 @@ message TValue {
optional bool Public = 4;
}
+message TGivenIdRange { // protobuf form of the C++ TGivenIdRange from types.h, which has a constructor from and a ToProto() to this message
+ message TRange {
+ optional uint64 Begin = 1; // NOTE(review): presumably the first id of the range (the C++ map is keyed by range.begin) — confirm
+ optional uint32 Len = 2;
+ repeated uint32 Offsets = 3; // NOTE(review): the encoding of Offsets/BitMasks is not visible here — see given_id_range.cpp
+ repeated fixed64 BitMasks = 4;
+ }
+ repeated TRange Ranges = 1;
+}
+
+
+
message TEvApplyConfig {
optional uint64 TxId = 1;
@@ -67,13 +79,13 @@ message TEvRegisterAgentResult {
message TEvAllocateIds {
optional NKikimrBlobDepot.TChannelKind.E ChannelKind = 1;
+ optional uint32 Count = 2; // NOTE(review): presumably the number of ids being requested — confirm against the handler
}
message TEvAllocateIdsResult {
optional NKikimrBlobDepot.TChannelKind.E ChannelKind = 1;
optional uint32 Generation = 2; // executor generation, for validation purposes
- optional uint64 RangeBegin = 3; // <Generation> <Step> <Channel>
- optional uint64 RangeEnd = 4;
+ optional TGivenIdRange GivenIdRange = 3; // NOTE(review): takes over tag 3, which was the uint64 RangeBegin, with a different wire type — confirm no peer still uses the old layout
}
message TEvBlock {
diff --git a/ydb/core/protos/blob_depot_config.proto b/ydb/core/protos/blob_depot_config.proto
index 6858a242c20..4b77e4e0a7d 100644
--- a/ydb/core/protos/blob_depot_config.proto
+++ b/ydb/core/protos/blob_depot_config.proto
@@ -2,6 +2,7 @@ package NKikimrBlobDepot;
message TChannelKind {
enum E {
+ System = 0; // NOTE(review): now the first enumerator; in proto2 that is the default for unset fields of this enum type — confirm this is intended
Data = 1;
Log = 2;
}