summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <[email protected]>2022-06-23 12:52:25 +0300
committerAlexander Rutkovsky <[email protected]>2022-06-23 12:52:25 +0300
commitaecdb821de0afc13c27eee8efaa50521021294c3 (patch)
tree530771e8183b9345e1e1e0483d18be2496fce9a7
parent426b0be91fe907e8ebb94ad7fb397f620184d43e (diff)
BlobDepot work in progress KIKIMR-14867
ref:0290e81dc459353c3d0ce554ade5fcda36af845c
-rw-r--r--ydb/core/blob_depot/agent/CMakeLists.txt24
-rw-r--r--ydb/core/blob_depot/agent/agent_comm.cpp194
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h207
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_block.cpp55
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp10
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_discover.cpp76
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_get.cpp10
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_patch.cpp10
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_put.cpp46
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_range.cpp10
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_status.cpp10
-rw-r--r--ydb/core/blob_depot/agent/blocks.cpp125
-rw-r--r--ydb/core/blob_depot/agent/comm.cpp131
-rw-r--r--ydb/core/blob_depot/agent/query.cpp (renamed from ydb/core/blob_depot/agent/agent_storage_query.cpp)12
-rw-r--r--ydb/core/blob_depot/agent/read.cpp128
-rw-r--r--ydb/core/blob_depot/agent/request.cpp75
-rw-r--r--ydb/core/blob_depot/agent/status.cpp9
-rw-r--r--ydb/core/blob_depot/agent/storage_block.cpp65
-rw-r--r--ydb/core/blob_depot/agent/storage_collect_garbage.cpp38
-rw-r--r--ydb/core/blob_depot/agent/storage_discover.cpp154
-rw-r--r--ydb/core/blob_depot/agent/storage_get.cpp24
-rw-r--r--ydb/core/blob_depot/agent/storage_patch.cpp24
-rw-r--r--ydb/core/blob_depot/agent/storage_put.cpp47
-rw-r--r--ydb/core/blob_depot/agent/storage_range.cpp24
-rw-r--r--ydb/core/blob_depot/agent/storage_status.cpp25
-rw-r--r--ydb/core/blob_depot/blob_depot_agent.cpp32
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h2
-rw-r--r--ydb/core/blob_depot/blocks.cpp26
-rw-r--r--ydb/core/blob_depot/defs.h1
-rw-r--r--ydb/core/blob_depot/events.h22
-rw-r--r--ydb/core/blob_depot/types.h42
-rw-r--r--ydb/core/protos/blob_depot.proto1
32 files changed, 1124 insertions, 535 deletions
diff --git a/ydb/core/blob_depot/agent/CMakeLists.txt b/ydb/core/blob_depot/agent/CMakeLists.txt
index 76f8566f83f..29aca02aa67 100644
--- a/ydb/core/blob_depot/agent/CMakeLists.txt
+++ b/ydb/core/blob_depot/agent/CMakeLists.txt
@@ -16,14 +16,18 @@ target_link_libraries(core-blob_depot-agent PUBLIC
)
target_sources(core-blob_depot-agent PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_comm.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_query.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_put.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_get.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_block.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_discover.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_range.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_status.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_patch.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/blocks.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/comm.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/read.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/request.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/status.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_put.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_get.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_block.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_discover.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_range.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_collect_garbage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_status.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_patch.cpp
)
diff --git a/ydb/core/blob_depot/agent/agent_comm.cpp b/ydb/core/blob_depot/agent/agent_comm.cpp
deleted file mode 100644
index cf884280e45..00000000000
--- a/ydb/core/blob_depot/agent/agent_comm.cpp
+++ /dev/null
@@ -1,194 +0,0 @@
-#include "agent_impl.h"
-
-namespace NKikimr::NBlobDepot {
-
- void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC01, "TEvClientConnected", (VirtualGroupId, VirtualGroupId),
- (Msg, ev->Get()->ToString()));
- }
-
- void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) {
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC02, "TEvClientDestroyed", (VirtualGroupId, VirtualGroupId),
- (Msg, ev->Get()->ToString()));
- PipeId = {};
- OnDisconnect();
- ConnectToBlobDepot();
- }
-
- void TBlobDepotAgent::ConnectToBlobDepot() {
- PipeId = Register(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy::WithRetries()));
- const ui64 id = NextRequestId++;
- NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId), id);
- RegisterRequest(id, nullptr, [this](IEventBase *ev) {
- if (ev) {
- auto& msg = *static_cast<TEvBlobDepot::TEvRegisterAgentResult*>(ev);
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId),
- (Msg, msg.Record));
- Registered = true;
- BlobDepotGeneration = msg.Record.GetGeneration();
- for (const auto& kind : msg.Record.GetChannelKinds()) {
- auto& v = ChannelKinds[kind.GetChannelKind()];
- v.ChannelGroups.clear();
- v.IndexToChannel.clear();
- for (const auto& channelGroup : kind.GetChannelGroups()) {
- const ui8 channel = channelGroup.GetChannel();
- const ui32 groupId = channelGroup.GetGroupId();
- v.ChannelGroups.emplace_back(channel, groupId);
- v.ChannelToIndex[channel] = v.IndexToChannel.size();
- v.IndexToChannel.push_back(channel);
- }
- }
- }
- return true;
- });
- IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::Data);
- IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::Log);
- }
-
- void TBlobDepotAgent::IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::E channelKind) {
- auto& kind = ChannelKinds[channelKind];
-
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC09, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId),
- (ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(channelKind)),
- (IdAllocInFlight, kind.IdAllocInFlight), (IdQ.size, kind.IdQ.size()),
- (PreallocatedIdCount, kind.PreallocatedIdCount), (PipeId, PipeId));
- if (!kind.IdAllocInFlight && kind.IdQ.size() < kind.PreallocatedIdCount && PipeId) {
- const ui64 id = NextRequestId++;
- NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds(channelKind), id);
- kind.IdAllocInFlight = true;
-
- RegisterRequest(id, nullptr, [this, channelKind](IEventBase *ev) {
- auto& kind = ChannelKinds[channelKind];
-
- Y_VERIFY(kind.IdAllocInFlight);
- kind.IdAllocInFlight = false;
-
- if (ev) {
- auto& msg = *static_cast<TEvBlobDepot::TEvAllocateIdsResult*>(ev);
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC07, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId),
- (Msg, msg.Record));
- Y_VERIFY(msg.Record.GetChannelKind() == channelKind);
- Y_VERIFY(msg.Record.GetGeneration() == BlobDepotGeneration);
-
- if (msg.Record.HasRangeBegin() && msg.Record.HasRangeEnd()) {
- kind.IdQ.push_back({BlobDepotGeneration, msg.Record.GetRangeBegin(), msg.Record.GetRangeEnd()});
-
- // FIXME notify waiting requests about new ids
-
- // ask for more ids if needed
- IssueAllocateIdsIfNeeded(channelKind);
- } else {
- // no such channel allocated
- }
- }
-
- return true; // request complete, remove from queue
- });
- }
- }
-
- void TBlobDepotAgent::OnDisconnect() {
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC04, "OnDisconnect", (VirtualGroupId, VirtualGroupId));
-
- for (auto& [id, item] : std::exchange(RequestInFlight, {})) {
- if (item.Sender) {
- item.Sender->OnRequestComplete(id);
- }
- const bool done = item.Callback(nullptr);
- Y_VERIFY(done);
- }
-
- Registered = false;
- }
-
- void TBlobDepotAgent::RegisterRequest(ui64 id, TRequestSender *sender, TRequestCompleteCallback callback) {
- const auto [_, inserted] = RequestInFlight.emplace(id, TRequestInFlight{sender, std::move(callback)});
- Y_VERIFY(inserted);
- if (sender) {
- sender->RegisterRequest(id);
- }
- }
-
- template<typename TEvent>
- void TBlobDepotAgent::HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC05, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId),
- (Id, ev->Cookie), (Type, TypeName<TEvent>()));
- if (const auto it = RequestInFlight.find(ev->Cookie); it != RequestInFlight.end()) {
- auto& [id, item] = *it;
- if (item.Sender) {
- item.Sender->OnRequestComplete(id);
- }
- if (item.Callback(ev->Get())) {
- RequestInFlight.erase(it);
- }
- } else {
- Y_FAIL(); // don't know how this can happen without logic error
- }
- }
-
- template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvRegisterAgentResult::TPtr ev);
- template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvAllocateIdsResult::TPtr ev);
- template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvBlockResult::TPtr ev);
- template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvQueryBlocksResult::TPtr ev);
- template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev);
- template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvResolveResult::TPtr ev);
-
- void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestCompleteCallback callback) {
- auto ev = std::make_unique<TEvBlobDepot::TEvBlock>();
- msg.Swap(&ev->Record);
- Issue(std::move(ev), sender, std::move(callback));
- }
-
- void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestCompleteCallback callback) {
- auto ev = std::make_unique<TEvBlobDepot::TEvResolve>();
- msg.Swap(&ev->Record);
- Issue(std::move(ev), sender, std::move(callback));
- }
-
- void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestCompleteCallback callback) {
- auto ev = std::make_unique<TEvBlobDepot::TEvQueryBlocks>();
- msg.Swap(&ev->Record);
- Issue(std::move(ev), sender, std::move(callback));
- }
-
- void TBlobDepotAgent::Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestCompleteCallback callback) {
- const ui64 id = NextRequestId++;
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC03, "Issue", (VirtualGroupId, VirtualGroupId), (Id, id), (Msg, ev->ToString()));
- NTabletPipe::SendData(SelfId(), PipeId, ev.release(), id);
- RegisterRequest(id, sender, std::move(callback));
- }
-
- NKikimrProto::EReplyStatus TBlobDepotAgent::CheckBlockForTablet(ui64 tabletId, ui32 generation, TExecutingQuery *query) {
- auto& block = Blocks[tabletId];
- const TMonotonic issueTime = TActivationContext::Monotonic();
- if (generation <= block.BlockedGeneration) {
- return NKikimrProto::RACE;
- } else if (issueTime< block.ExpirationTimestamp) {
- return NKikimrProto::OK;
- } else if (!block.RefreshInFlight) {
- NKikimrBlobDepot::TEvQueryBlocks queryBlocks;
- queryBlocks.AddTabletIds(tabletId);
- Issue(std::move(queryBlocks), nullptr, [=](IEventBase *ev) {
- if (ev) {
- auto& msg = *static_cast<TEvBlobDepot::TEvQueryBlocksResult*>(ev);
- const ui64 tabletId_ = tabletId;
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC08, "TEvQueryBlocksResult", (VirtualGroupId, VirtualGroupId),
- (Msg, msg.Record), (TabletId, tabletId_));
- Y_VERIFY(msg.Record.BlockedGenerationsSize() == 1);
- auto& block = Blocks[tabletId];
- Y_VERIFY(block.BlockedGeneration < generation);
- block.BlockedGeneration = msg.Record.GetBlockedGenerations(1);
- block.ExpirationTimestamp = issueTime + TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs());
- for (auto& query : block.PendingBlockChecks) {
- query.OnUpdateBlock();
- }
- }
- return true;
- });
- block.RefreshInFlight = true;
- block.PendingBlockChecks.PushBack(query);
- }
- return NKikimrProto::NOTREADY;
- }
-
-} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index 1a27fbd921f..ff0f9848e55 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -15,7 +15,60 @@ namespace NKikimr::NBlobDepot {
XX(EvPatch) \
// END
- class TBlobDepotAgent : public TActor<TBlobDepotAgent> {
+ class TBlobDepotAgent;
+
+ struct TRequestContext {
+ virtual ~TRequestContext() = default;
+
+ template<typename T>
+ T& Obtain() {
+ T *sp = static_cast<T*>(this);
+ Y_VERIFY_DEBUG(sp == dynamic_cast<T*>(this));
+ return *sp;
+ }
+
+ using TPtr = std::shared_ptr<TRequestContext>;
+ };
+
+ struct TTabletDisconnected {};
+
+ class TRequestSender {
+ THashMap<ui64, TRequestContext::TPtr> RequestsInFlight;
+
+ protected:
+ TBlobDepotAgent& Agent;
+
+ public:
+ using TResponse = std::variant<
+ // internal events
+ TTabletDisconnected,
+
+ // tablet responses
+ TEvBlobDepot::TEvRegisterAgentResult*,
+ TEvBlobDepot::TEvAllocateIdsResult*,
+ TEvBlobDepot::TEvBlockResult*,
+ TEvBlobDepot::TEvQueryBlocksResult*,
+ TEvBlobDepot::TEvCommitBlobSeqResult*,
+ TEvBlobDepot::TEvResolveResult*,
+
+ // underlying DS proxy responses
+ TEvBlobStorage::TEvGetResult*
+ >;
+
+ public:
+ TRequestSender(TBlobDepotAgent& agent);
+ virtual ~TRequestSender();
+ void RegisterRequest(ui64 id, TRequestContext::TPtr context);
+ void OnRequestComplete(ui64 id, TResponse response);
+
+ protected:
+ virtual void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) = 0;
+ };
+
+ class TBlobDepotAgent
+ : public TActor<TBlobDepotAgent>
+ , public TRequestSender
+ {
const ui32 VirtualGroupId;
ui64 TabletId = Max<ui64>();
TActorId PipeId;
@@ -23,7 +76,9 @@ namespace NKikimr::NBlobDepot {
public:
TBlobDepotAgent(ui32 virtualGroupId)
: TActor(&TThis::StateFunc)
+ , TRequestSender(*this)
, VirtualGroupId(virtualGroupId)
+ , BlocksManager(CreateBlocksManager())
{
Y_VERIFY(TGroupID(VirtualGroupId).ConfigurationType() == EGroupConfigurationType::Virtual);
}
@@ -43,6 +98,8 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvBlobDepot::TEvCommitBlobSeqResult, HandleTabletResponse);
hFunc(TEvBlobDepot::TEvResolveResult, HandleTabletResponse);
+ hFunc(TEvBlobStorage::TEvGetResult, Handle);
+
ENUMERATE_INCOMING_EVENTS(FORWARD_STORAGE_PROXY)
);
#undef FORWARD_STORAGE_PROXY
@@ -67,54 +124,32 @@ namespace NKikimr::NBlobDepot {
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // BlobDepot communications
+ // BlobDepot and other actor communications
- using TRequestCompleteCallback = std::function<bool(IEventBase*)>;
+ ui64 NextRequestId = 1;
+ THashMap<ui64, TRequestSender*> TabletRequestInFlight;
+ THashMap<ui64, TRequestSender*> OtherRequestInFlight;
- class TRequestSender {
- THashSet<ui64> IdsInFlight;
+ void RegisterRequest(ui64 id, TRequestSender *sender, TRequestContext::TPtr context, bool toBlobDepotTablet);
- protected:
- TBlobDepotAgent& Agent;
+ template<typename TEvent>
+ void HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev);
- public:
- TRequestSender(TBlobDepotAgent& agent)
- : Agent(agent)
- {}
+ template<typename TEvent>
+ void HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev);
- ~TRequestSender() {
- for (const ui64 id : IdsInFlight) {
- const size_t num = Agent.RequestInFlight.erase(id);
- Y_VERIFY(num);
- }
- }
+ void HandleResponse(TAutoPtr<IEventHandle> ev, TRequestSender::TResponse response, THashMap<ui64, TRequestSender*>& map);
- void RegisterRequest(ui64 id) {
- const auto [_, inserted] = IdsInFlight.insert(id);
- Y_VERIFY(inserted);
- }
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- void OnRequestComplete(ui64 id) {
- const size_t num = IdsInFlight.erase(id);
- Y_VERIFY(num);
- }
- };
+ struct TAllocateIdsContext : TRequestContext {
+ NKikimrBlobDepot::TChannelKind::E ChannelKind;
- struct TRequestInFlight {
- TRequestSender *Sender;
- TRequestCompleteCallback Callback;
+ TAllocateIdsContext(NKikimrBlobDepot::TChannelKind::E channelKind)
+ : ChannelKind(channelKind)
+ {}
};
- ui64 NextRequestId = 1;
- THashMap<ui64, TRequestInFlight> RequestInFlight;
-
- void RegisterRequest(ui64 id, TRequestSender *sender, TRequestCompleteCallback callback);
-
- template<typename TEvent>
- void HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev);
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
bool Registered = false;
ui32 BlobDepotGeneration = 0;
@@ -123,11 +158,15 @@ namespace NKikimr::NBlobDepot {
void ConnectToBlobDepot();
void OnDisconnect();
- void Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestCompleteCallback callback);
- void Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestCompleteCallback callback);
- void Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestCompleteCallback callback);
+ void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) override;
+ void HandleRegisterAgentResult(TRequestContext::TPtr context, TEvBlobDepot::TEvRegisterAgentResult& msg);
+ void HandleAllocateIdsResult(TRequestContext::TPtr context, TEvBlobDepot::TEvAllocateIdsResult& msg);
- void Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestCompleteCallback callback);
+ void Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestContext::TPtr context);
+ void Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestContext::TPtr context);
+ void Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestContext::TPtr context);
+
+ void Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestContext::TPtr context);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -146,7 +185,7 @@ namespace NKikimr::NBlobDepot {
std::deque<TAllocatedId> IdQ;
static constexpr size_t PreallocatedIdCount = 2;
- std::pair<TLogoBlobID, ui32> Allocate(TBlobDepotAgent& agent, ui32 size, ui32 type) {
+ std::pair<TLogoBlobID, ui32> Allocate(TBlobDepotAgent& agent, EBlobType type, ui32 part, ui32 size) {
if (IdQ.empty()) {
return {};
}
@@ -156,10 +195,7 @@ namespace NKikimr::NBlobDepot {
if (item.Begin == item.End) {
IdQ.pop_front();
}
- static constexpr ui32 typeBits = 24 - TCGSI::IndexBits;
- Y_VERIFY(type < (1 << typeBits));
- const ui32 cookie = cgsi.Index << typeBits | type;
- const TLogoBlobID id(agent.TabletId, cgsi.Generation, cgsi.Step, cgsi.Channel, size, cookie);
+ auto id = cgsi.MakeBlobId(agent.TabletId, type, part, size);
const auto [channel, groupId] = ChannelGroups[ChannelToIndex[cgsi.Channel]];
Y_VERIFY_DEBUG(channel == cgsi.Channel);
return {id, groupId};
@@ -175,23 +211,41 @@ namespace NKikimr::NBlobDepot {
struct TExecutingQueries {};
struct TPendingBlockChecks {};
- class TExecutingQuery
- : public TIntrusiveListItem<TExecutingQuery, TExecutingQueries>
- , public TIntrusiveListItem<TExecutingQuery, TPendingBlockChecks>
+ class TQuery
+ : public TIntrusiveListItem<TQuery, TExecutingQueries>
+ , public TIntrusiveListItem<TQuery, TPendingBlockChecks>
, public TRequestSender
{
+ friend class TBlobDepotAgent;
+
+ struct TReadContext {
+ TQuery *Query;
+ ui64 Tag;
+ TString Buffer;
+ ui64 Size;
+ NKikimrProto::EReplyStatus Status = NKikimrProto::OK;
+ THashMap<ui64, ui64> ReadsToOffset;
+ };
+ std::list<TReadContext> Reads;
+
protected:
std::unique_ptr<IEventHandle> Event; // original query event
const ui64 QueryId;
public:
- TExecutingQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event)
+ TQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event)
: TRequestSender(agent)
, Event(std::move(event))
, QueryId(RandomNumber<ui64>())
{}
- virtual ~TExecutingQuery() = default;
+ virtual ~TQuery() {
+ for (const auto& read : Reads) {
+ for (const auto& [id, _] : read.ReadsToOffset) {
+ Agent.ReadsInFlight.erase(id);
+ }
+ }
+ }
void EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason);
void EndWithSuccess(std::unique_ptr<IEventBase> response);
@@ -199,34 +253,55 @@ namespace NKikimr::NBlobDepot {
ui64 GetQueryId() const { return QueryId; }
virtual void Initiate() = 0;
- virtual void OnUpdateBlock() {}
+ virtual void OnUpdateBlock(bool /*success*/) {}
+ virtual void OnRead(ui64 /*tag*/, NKikimrProto::EReplyStatus /*status*/, TString /*dataOrErrorReason*/) {}
public:
struct TDeleter {
- static void Destroy(TExecutingQuery *query) { delete query; }
+ static void Destroy(TQuery *query) { delete query; }
};
};
std::deque<std::unique_ptr<IEventHandle>> PendingEventQ;
- TIntrusiveListWithAutoDelete<TExecutingQuery, TExecutingQuery::TDeleter, TExecutingQueries> ExecutingQueries;
+ TIntrusiveListWithAutoDelete<TQuery, TQuery::TDeleter, TExecutingQueries> ExecutingQueries;
void HandleStorageProxy(TAutoPtr<IEventHandle> ev);
- TExecutingQuery *CreateExecutingQuery(TAutoPtr<IEventHandle> ev);
- template<ui32 EventType> TExecutingQuery *CreateExecutingQuery(std::unique_ptr<IEventHandle> ev);
+ TQuery *CreateQuery(TAutoPtr<IEventHandle> ev);
+ template<ui32 EventType> TQuery *CreateQuery(std::unique_ptr<IEventHandle> ev);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Blocks
- struct TBlockInfo {
- ui32 BlockedGeneration;
- TMonotonic ExpirationTimestamp; // not valid after
- bool RefreshInFlight = false;
- TIntrusiveList<TExecutingQuery, TPendingBlockChecks> PendingBlockChecks;
- };
+ class TBlocksManager;
+ struct TBlocksManagerDeleter { void operator ()(TBlocksManager*) const; };
+ using TBlocksManagerPtr = std::unique_ptr<TBlocksManager, TBlocksManagerDeleter>;
+ TBlocksManagerPtr BlocksManager;
+
+ TBlocksManagerPtr CreateBlocksManager();
+
+ NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, ui32 generation, TQuery *query,
+ ui32 *blockedGeneration = nullptr);
+
+ ui32 GetBlockForTablet(ui64 tabletId);
+
+ void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic expirationTimestamp);
- std::unordered_map<ui64, TBlockInfo> Blocks;
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Reading
+
+ ui64 NextReadId = 1;
+ THashMap<ui64, std::list<TQuery::TReadContext>::iterator> ReadsInFlight;
+
+ bool IssueRead(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>& values, ui64 offset, ui64 size,
+ NKikimrBlobStorage::EGetHandleClass getHandleClass, bool mustRestoreFirst, TQuery *query, ui64 tag,
+ bool vg, TString *error);
+
+ void Handle(TEvBlobStorage::TEvGetResult::TPtr ev);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Status flags
- NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, ui32 generation, TExecutingQuery *query);
+ TStorageStatusFlags GetStorageStatusFlags() const;
};
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_block.cpp b/ydb/core/blob_depot/agent/agent_storage_block.cpp
deleted file mode 100644
index 1d90be19463..00000000000
--- a/ydb/core/blob_depot/agent/agent_storage_block.cpp
+++ /dev/null
@@ -1,55 +0,0 @@
-#include "agent_impl.h"
-
-namespace NKikimr::NBlobDepot {
-
- template<>
- TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvBlock>(std::unique_ptr<IEventHandle> ev) {
- class TBlockExecutingQuery : public TExecutingQuery {
- public:
- using TExecutingQuery::TExecutingQuery;
-
- void Initiate() override {
- auto& msg = *Event->Get<TEvBlobStorage::TEvBlock>();
-
- // lookup existing blocks to try fail-fast
- if (const auto it = Agent.Blocks.find(msg.TabletId); it != Agent.Blocks.end()) {
- const TBlockInfo& block = it->second;
- if (msg.Generation <= block.BlockedGeneration) {
- // we don't consider ExpirationTimestamp here because blocked generation may only increase
- return EndWithError(NKikimrProto::RACE, "block race detected");
- }
- }
-
- // issue request to the tablet
- NKikimrBlobDepot::TEvBlock block;
- block.SetTabletId(msg.TabletId);
- block.SetBlockedGeneration(msg.Generation);
- Agent.Issue(std::move(block), this, std::bind(&TBlockExecutingQuery::HandleBlockResult, this,
- std::placeholders::_1, TActivationContext::Monotonic()));
- }
-
- bool HandleBlockResult(IEventBase *event, TMonotonic issueTimestamp) {
- if (!event) {
- EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
- } else if (const auto& msg = static_cast<TEvBlobDepot::TEvBlockResult&>(*event); !msg.Record.HasStatus()) {
- EndWithError(NKikimrProto::ERROR, "incorrect TEvBlockResult response");
- } else if (const auto status = msg.Record.GetStatus(); status != NKikimrProto::OK) {
- EndWithError(status, msg.Record.GetErrorReason());
- } else {
- // update blocks cache
- auto& query = *Event->Get<TEvBlobStorage::TEvBlock>();
- auto& block = Agent.Blocks[query.TabletId];
- Y_VERIFY(block.BlockedGeneration <= query.Generation); // TODO: QueryBlocks can race with Block?
- block.BlockedGeneration = query.Generation;
- block.ExpirationTimestamp = issueTimestamp + TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs());
-
- EndWithSuccess(std::make_unique<TEvBlobStorage::TEvBlockResult>(NKikimrProto::OK));
- }
- return true;
- }
- };
-
- return new TBlockExecutingQuery(*this, std::move(ev));
- }
-
-} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp b/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp
deleted file mode 100644
index 8a411246a52..00000000000
--- a/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp
+++ /dev/null
@@ -1,10 +0,0 @@
-#include "agent_impl.h"
-
-namespace NKikimr::NBlobDepot {
-
- template<>
- TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvCollectGarbage>(std::unique_ptr<IEventHandle> ev) {
- return (void)ev, nullptr;
- }
-
-} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_discover.cpp b/ydb/core/blob_depot/agent/agent_storage_discover.cpp
deleted file mode 100644
index 6cce45058a6..00000000000
--- a/ydb/core/blob_depot/agent/agent_storage_discover.cpp
+++ /dev/null
@@ -1,76 +0,0 @@
-#include "agent_impl.h"
-
-namespace NKikimr::NBlobDepot {
-
- template<>
- TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvDiscover>(std::unique_ptr<IEventHandle> ev) {
- class TDiscoverExecutingQuery : public TExecutingQuery {
- public:
- using TExecutingQuery::TExecutingQuery;
-
- void Initiate() override {
- auto& msg = *Event->Get<TEvBlobStorage::TEvDiscover>();
-
- const TLogoBlobID from(msg.TabletId, Max<ui32>(), Max<ui32>(), 0, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie);
- const TLogoBlobID to(msg.TabletId, 0, 0, 0, 0, 0);
-
- NKikimrBlobDepot::TEvResolve resolve;
- auto *item = resolve.AddItems();
- item->SetBeginningKey(from.GetRaw(), 3 * sizeof(ui64));
- item->SetIncludeBeginning(true);
- item->SetEndingKey(to.GetRaw(), 3 * sizeof(ui64));
- item->SetIncludeEnding(true);
- item->SetMaxKeys(1);
- item->SetReverse(true);
-
- Agent.Issue(std::move(resolve), this, std::bind(&TDiscoverExecutingQuery::HandleResolveResult,
- this, std::placeholders::_1));
-
- if (msg.DiscoverBlockedGeneration) {
- NKikimrBlobDepot::TEvQueryBlocks queryBlocks;
- queryBlocks.AddTabletIds(msg.TabletId);
- Agent.Issue(std::move(queryBlocks), this, std::bind(
- &TDiscoverExecutingQuery::HandleQueryBlocksResult, this,
- std::placeholders::_1, TActivationContext::Now()));
- }
- }
-
- bool HandleResolveResult(IEventBase *result) {
- if (!result) { // server has disconnected before request got processed
- EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
- return true;
- }
-
- auto& msg = static_cast<TEvBlobDepot::TEvResolveResult&>(*result);
-
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDASD01, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId),
- (QueryId, QueryId), (Msg, msg.Record));
-
- const NKikimrProto::EReplyStatus status = msg.Record.GetStatus();
- if (status != NKikimrProto::OK && status != NKikimrProto::OVERRUN) {
- EndWithError(status, msg.Record.GetErrorReason());
- return true;
- }
-
- return status != NKikimrProto::OVERRUN;
- }
-
- bool HandleQueryBlocksResult(IEventBase *result, TInstant /*issueTimestamp*/) {
- if (!result) {
- EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
- return true;
- }
-
- auto& msg = static_cast<TEvBlobDepot::TEvQueryBlocksResult&>(*result);
-
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDASD02, "HandleQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId),
- (QueryId, QueryId), (Msg, msg.Record));
-
- return true;
- }
- };
-
- return new TDiscoverExecutingQuery(*this, std::move(ev));
- }
-
-} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_get.cpp b/ydb/core/blob_depot/agent/agent_storage_get.cpp
deleted file mode 100644
index 55d4e166904..00000000000
--- a/ydb/core/blob_depot/agent/agent_storage_get.cpp
+++ /dev/null
@@ -1,10 +0,0 @@
-#include "agent_impl.h"
-
-namespace NKikimr::NBlobDepot {
-
- template<>
- TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvGet>(std::unique_ptr<IEventHandle> ev) {
- return (void)ev, nullptr;
- }
-
-} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_patch.cpp b/ydb/core/blob_depot/agent/agent_storage_patch.cpp
deleted file mode 100644
index 92bc96c9d01..00000000000
--- a/ydb/core/blob_depot/agent/agent_storage_patch.cpp
+++ /dev/null
@@ -1,10 +0,0 @@
-#include "agent_impl.h"
-
-namespace NKikimr::NBlobDepot {
-
- template<>
- TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvPatch>(std::unique_ptr<IEventHandle> ev) {
- return (void)ev, nullptr;
- }
-
-} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_put.cpp b/ydb/core/blob_depot/agent/agent_storage_put.cpp
deleted file mode 100644
index 9628abdf27b..00000000000
--- a/ydb/core/blob_depot/agent/agent_storage_put.cpp
+++ /dev/null
@@ -1,46 +0,0 @@
-#include "agent_impl.h"
-
-namespace NKikimr::NBlobDepot {
-
- template<>
- TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvPut>(std::unique_ptr<IEventHandle> ev) {
- class TPutExecutingQuery : public TExecutingQuery {
- ui32 BlockChecksRemain = 3;
-
- public:
- using TExecutingQuery::TExecutingQuery;
-
- void Initiate() override {
- auto& msg = *Event->Get<TEvBlobStorage::TEvPut>();
-
- // first step -- check blocks
- switch (Agent.CheckBlockForTablet(msg.Id.TabletID(), msg.Id.Generation(), this)) {
- case NKikimrProto::OK:
- return IssuePuts();
-
- case NKikimrProto::RACE:
- return EndWithError(NKikimrProto::RACE, "block race detected");
-
- case NKikimrProto::NOTREADY:
- if (!--BlockChecksRemain) {
- EndWithError(NKikimrProto::ERROR, "failed to acquire blocks");
- }
- return;
-
- default:
- Y_FAIL();
- }
- }
-
- void IssuePuts() {
- }
-
- void OnUpdateBlock() override {
- Initiate(); // just restart request
- }
- };
-
- return new TPutExecutingQuery(*this, std::move(ev));
- }
-
-} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_range.cpp b/ydb/core/blob_depot/agent/agent_storage_range.cpp
deleted file mode 100644
index d4dec509ee0..00000000000
--- a/ydb/core/blob_depot/agent/agent_storage_range.cpp
+++ /dev/null
@@ -1,10 +0,0 @@
-#include "agent_impl.h"
-
-namespace NKikimr::NBlobDepot {
-
- template<>
- TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvRange>(std::unique_ptr<IEventHandle> ev) {
- return (void)ev, nullptr;
- }
-
-} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_status.cpp b/ydb/core/blob_depot/agent/agent_storage_status.cpp
deleted file mode 100644
index 36dcdf209bf..00000000000
--- a/ydb/core/blob_depot/agent/agent_storage_status.cpp
+++ /dev/null
@@ -1,10 +0,0 @@
-#include "agent_impl.h"
-
-namespace NKikimr::NBlobDepot {
-
- template<>
- TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvStatus>(std::unique_ptr<IEventHandle> ev) {
- return (void)ev, nullptr;
- }
-
-} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp
new file mode 100644
index 00000000000..86b49bc342f
--- /dev/null
+++ b/ydb/core/blob_depot/agent/blocks.cpp
@@ -0,0 +1,125 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ class TBlobDepotAgent::TBlocksManager
+ : public TRequestSender
+ {
+ struct TBlockInfo {
+ ui32 BlockedGeneration;
+ TMonotonic ExpirationTimestamp; // not valid after
+ bool RefreshInFlight = false;
+ TIntrusiveList<TQuery, TPendingBlockChecks> PendingBlockChecks;
+ };
+
+ THashMap<ui64, TBlockInfo> Blocks;
+
+ public:
+ TBlocksManager(TBlobDepotAgent& agent)
+ : TRequestSender(agent)
+ {}
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ struct TQueryBlockContext : TRequestContext {
+ TMonotonic Timestamp;
+ ui64 TabletId;
+
+ TQueryBlockContext(TMonotonic timestamp, ui64 tabletId)
+ : Timestamp(timestamp)
+ , TabletId(tabletId)
+ {}
+ };
+
+ NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, ui32 generation, TQuery *query,
+ ui32 *blockedGeneration) {
+ auto& block = Blocks[tabletId];
+ const TMonotonic issueTime = TActivationContext::Monotonic();
+ if (generation <= block.BlockedGeneration) {
+ return NKikimrProto::RACE;
+ } else if (issueTime < block.ExpirationTimestamp) {
+ if (blockedGeneration) {
+ *blockedGeneration = block.BlockedGeneration;
+ }
+ return NKikimrProto::OK;
+ } else if (!block.RefreshInFlight) {
+ NKikimrBlobDepot::TEvQueryBlocks queryBlocks;
+ queryBlocks.AddTabletIds(tabletId);
+ Agent.Issue(std::move(queryBlocks), this, std::make_shared<TQueryBlockContext>(
+ TActivationContext::Monotonic(), tabletId));
+ block.RefreshInFlight = true;
+ block.PendingBlockChecks.PushBack(query);
+ }
+ return NKikimrProto::UNKNOWN;
+ }
+
+ void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override {
+ auto& queryBlockContext = context->Obtain<TQueryBlockContext>();
+ auto& block = Blocks[queryBlockContext.TabletId];
+
+ if (auto *p = std::get_if<TEvBlobDepot::TEvQueryBlocksResult*>(&response)) {
+ auto& msg = **p;
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC08, "TEvQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId),
+ (Msg, msg.Record), (TabletId, queryBlockContext.TabletId));
+ Y_VERIFY(msg.Record.BlockedGenerationsSize() == 1);
+ const ui32 newBlockedGeneration = msg.Record.GetBlockedGenerations(0);
+ Y_VERIFY(block.BlockedGeneration <= newBlockedGeneration);
+ block.BlockedGeneration = newBlockedGeneration;
+ block.ExpirationTimestamp = queryBlockContext.Timestamp + TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs());
+ } else {
+ Y_VERIFY(std::holds_alternative<TTabletDisconnected>(response));
+ }
+
+ TIntrusiveList<TQuery, TPendingBlockChecks> temp;
+ temp.Swap(block.PendingBlockChecks);
+ for (auto it = temp.begin(); it != temp.end(); ) {
+ const auto current = it++;
+ current->OnUpdateBlock(!std::holds_alternative<TTabletDisconnected>(response));
+ }
+ }
+
+ ui32 GetBlockForTablet(ui64 tabletId) {
+ const auto it = Blocks.find(tabletId);
+ return it != Blocks.end() ? it->second.BlockedGeneration : 0;
+ }
+
+ void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic expirationTimestamp) {
+ auto& block = Blocks[tabletId];
+ Y_VERIFY(block.BlockedGeneration <= blockedGeneration);
+ if (block.BlockedGeneration < blockedGeneration) {
+ block.BlockedGeneration = blockedGeneration;
+ block.ExpirationTimestamp = expirationTimestamp;
+ } else if (block.ExpirationTimestamp < expirationTimestamp) {
+ block.ExpirationTimestamp = expirationTimestamp;
+ }
+ }
+ };
+
+ void TBlobDepotAgent::TBlocksManagerDeleter::operator ()(TBlocksManager *object) const {
+ delete object;
+ }
+
+ TBlobDepotAgent::TBlocksManagerPtr TBlobDepotAgent::CreateBlocksManager() {
+ return TBlocksManagerPtr(new TBlocksManager(*this));
+ }
+
+ void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestContext::TPtr context) {
+ auto ev = std::make_unique<TEvBlobDepot::TEvQueryBlocks>();
+ msg.Swap(&ev->Record);
+ Issue(std::move(ev), sender, std::move(context));
+ }
+
+ NKikimrProto::EReplyStatus TBlobDepotAgent::CheckBlockForTablet(ui64 tabletId, ui32 generation, TQuery *query,
+ ui32 *blockedGeneration) {
+ return BlocksManager->CheckBlockForTablet(tabletId, generation, query, blockedGeneration);
+ }
+
+ ui32 TBlobDepotAgent::GetBlockForTablet(ui64 tabletId) {
+ return BlocksManager->GetBlockForTablet(tabletId);
+ }
+
+ void TBlobDepotAgent::SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic expirationTimestamp) {
+ BlocksManager->SetBlockForTablet(tabletId, blockedGeneration, expirationTimestamp);
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp
new file mode 100644
index 00000000000..480504db430
--- /dev/null
+++ b/ydb/core/blob_depot/agent/comm.cpp
@@ -0,0 +1,131 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC01, "TEvClientConnected", (VirtualGroupId, VirtualGroupId),
+ (Msg, ev->Get()->ToString()));
+ }
+
+ void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) {
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC02, "TEvClientDestroyed", (VirtualGroupId, VirtualGroupId),
+ (Msg, ev->Get()->ToString()));
+ PipeId = {};
+ OnDisconnect();
+ ConnectToBlobDepot();
+ }
+
+ void TBlobDepotAgent::ConnectToBlobDepot() {
+ PipeId = Register(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy::WithRetries()));
+ const ui64 id = NextRequestId++;
+ NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId), id);
+ RegisterRequest(id, this, nullptr, true);
+ IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::Data);
+ IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::Log);
+ }
+
+ void TBlobDepotAgent::HandleRegisterAgentResult(TRequestContext::TPtr /*context*/, TEvBlobDepot::TEvRegisterAgentResult& msg) {
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId),
+ (Msg, msg.Record));
+ Registered = true;
+ BlobDepotGeneration = msg.Record.GetGeneration();
+ for (const auto& kind : msg.Record.GetChannelKinds()) {
+ auto& v = ChannelKinds[kind.GetChannelKind()];
+ v.ChannelGroups.clear();
+ v.IndexToChannel.clear();
+ for (const auto& channelGroup : kind.GetChannelGroups()) {
+ const ui8 channel = channelGroup.GetChannel();
+ const ui32 groupId = channelGroup.GetGroupId();
+ v.ChannelGroups.emplace_back(channel, groupId);
+ v.ChannelToIndex[channel] = v.IndexToChannel.size();
+ v.IndexToChannel.push_back(channel);
+ }
+ }
+ }
+
+ void TBlobDepotAgent::IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::E channelKind) {
+ auto& kind = ChannelKinds[channelKind];
+
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC09, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId),
+ (ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(channelKind)),
+ (IdAllocInFlight, kind.IdAllocInFlight), (IdQ.size, kind.IdQ.size()),
+ (PreallocatedIdCount, kind.PreallocatedIdCount), (PipeId, PipeId));
+ if (!kind.IdAllocInFlight && kind.IdQ.size() < kind.PreallocatedIdCount && PipeId) {
+ const ui64 id = NextRequestId++;
+ NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds(channelKind), id);
+ RegisterRequest(id, this, std::make_shared<TAllocateIdsContext>(channelKind), true);
+ kind.IdAllocInFlight = true;
+ }
+ }
+
+ void TBlobDepotAgent::HandleAllocateIdsResult(TRequestContext::TPtr context, TEvBlobDepot::TEvAllocateIdsResult& msg) {
+ auto& allocateIdsContext = context->Obtain<TAllocateIdsContext>();
+ auto& kind = ChannelKinds[allocateIdsContext.ChannelKind];
+
+ Y_VERIFY(kind.IdAllocInFlight);
+ kind.IdAllocInFlight = false;
+
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC07, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId),
+ (Msg, msg.Record));
+ Y_VERIFY(msg.Record.GetChannelKind() == allocateIdsContext.ChannelKind);
+ Y_VERIFY(msg.Record.GetGeneration() == BlobDepotGeneration);
+
+ if (msg.Record.HasRangeBegin() && msg.Record.HasRangeEnd()) {
+ kind.IdQ.push_back({BlobDepotGeneration, msg.Record.GetRangeBegin(), msg.Record.GetRangeEnd()});
+
+ // FIXME notify waiting requests about new ids
+
+ // ask for more ids if needed
+ IssueAllocateIdsIfNeeded(allocateIdsContext.ChannelKind);
+ } else {
+ // no such channel allocated
+ }
+ }
+
+ void TBlobDepotAgent::OnDisconnect() {
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC04, "OnDisconnect", (VirtualGroupId, VirtualGroupId));
+
+ for (auto& [id, sender] : std::exchange(TabletRequestInFlight, {})) {
+ sender->OnRequestComplete(id, TTabletDisconnected{});
+ }
+
+ for (auto& [_, kind] : ChannelKinds) {
+ kind.IdAllocInFlight = false;
+ }
+
+ Registered = false;
+ }
+
+ void TBlobDepotAgent::ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) {
+ std::visit([&](auto&& item) {
+ using T = std::decay_t<decltype(item)>;
+ if constexpr (std::is_same_v<T, TEvBlobDepot::TEvRegisterAgentResult*>) {
+ HandleRegisterAgentResult(std::move(context), *item);
+ } else if constexpr (std::is_same_v<T, TEvBlobDepot::TEvAllocateIdsResult*>) {
+ HandleAllocateIdsResult(std::move(context), *item);
+ } else if constexpr (!std::is_same_v<T, TTabletDisconnected>) {
+ Y_FAIL();
+ }
+ }, response);
+ }
+
+ void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestContext::TPtr context) {
+ auto ev = std::make_unique<TEvBlobDepot::TEvBlock>();
+ msg.Swap(&ev->Record);
+ Issue(std::move(ev), sender, std::move(context));
+ }
+
+ void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestContext::TPtr context) {
+ auto ev = std::make_unique<TEvBlobDepot::TEvResolve>();
+ msg.Swap(&ev->Record);
+ Issue(std::move(ev), sender, std::move(context));
+ }
+
+ void TBlobDepotAgent::Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestContext::TPtr context) {
+ const ui64 id = NextRequestId++;
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC03, "Issue", (VirtualGroupId, VirtualGroupId), (Id, id), (Msg, ev->ToString()));
+ NTabletPipe::SendData(SelfId(), PipeId, ev.release(), id);
+ RegisterRequest(id, sender, std::move(context), true);
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_query.cpp b/ydb/core/blob_depot/agent/query.cpp
index a0dc670d536..533a793077e 100644
--- a/ydb/core/blob_depot/agent/agent_storage_query.cpp
+++ b/ydb/core/blob_depot/agent/query.cpp
@@ -7,7 +7,7 @@ namespace NKikimr::NBlobDepot {
// TODO: memory usage control
PendingEventQ.emplace_back(ev.Release());
} else {
- auto *query = CreateExecutingQuery(ev);
+ auto *query = CreateQuery(ev);
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "new query", (VirtualGroupId, VirtualGroupId),
(QueryId, query->GetQueryId()), (Name, query->GetName()));
if (!TabletId) {
@@ -18,10 +18,10 @@ namespace NKikimr::NBlobDepot {
}
}
- TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery(TAutoPtr<IEventHandle> ev) {
+ TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(TAutoPtr<IEventHandle> ev) {
switch (ev->GetTypeRewrite()) {
#define XX(TYPE) \
- case TEvBlobStorage::TYPE: return CreateExecutingQuery<TEvBlobStorage::TYPE>(std::unique_ptr<IEventHandle>(ev.Release()));
+ case TEvBlobStorage::TYPE: return CreateQuery<TEvBlobStorage::TYPE>(std::unique_ptr<IEventHandle>(ev.Release()));
ENUMERATE_INCOMING_EVENTS(XX)
#undef XX
@@ -29,7 +29,7 @@ namespace NKikimr::NBlobDepot {
Y_FAIL();
}
- void TBlobDepotAgent::TExecutingQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) {
+ void TBlobDepotAgent::TQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) {
STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "query ends with error", (VirtualGroupId, Agent.VirtualGroupId),
(QueryId, QueryId), (Status, status), (ErrorReason, errorReason));
@@ -48,14 +48,14 @@ namespace NKikimr::NBlobDepot {
delete this;
}
- void TBlobDepotAgent::TExecutingQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) {
+ void TBlobDepotAgent::TQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) {
STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId),
(QueryId, QueryId), (Response, response->ToString()));
Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie);
delete this;
}
- TString TBlobDepotAgent::TExecutingQuery::GetName() const {
+ TString TBlobDepotAgent::TQuery::GetName() const {
switch (Event->GetTypeRewrite()) {
#define XX(TYPE) case TEvBlobStorage::TYPE: return #TYPE;
ENUMERATE_INCOMING_EVENTS(XX)
diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp
new file mode 100644
index 00000000000..53e7f0cef24
--- /dev/null
+++ b/ydb/core/blob_depot/agent/read.cpp
@@ -0,0 +1,128 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ bool TBlobDepotAgent::IssueRead(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>& values, ui64 offset,
+ ui64 size, NKikimrBlobStorage::EGetHandleClass getHandleClass, bool mustRestoreFirst, TQuery *query,
+ ui64 tag, bool vg, TString *error) {
+ ui64 outputOffset = 0;
+
+ struct TReadItem {
+ ui32 GroupId;
+ TLogoBlobID Id;
+ ui32 Offset;
+ ui32 Size;
+ ui64 OutputOffset;
+ };
+ std::vector<TReadItem> items;
+
+ for (const auto& value : values) {
+ if (!value.HasLocator()) {
+ *error = "TValueChain.Locator is missing";
+ return false;
+ }
+ const auto& locator = value.GetLocator();
+ const ui64 totalDataLen = locator.GetTotalDataLen();
+ if (!totalDataLen) {
+ *error = "TBlobLocator.TotalDataLen is missing or zero";
+ return false;
+ }
+ const ui64 begin = value.GetSubrangeBegin();
+ const ui64 end = value.HasSubrangeEnd() ? value.GetSubrangeEnd() : totalDataLen;
+ if (end <= begin || totalDataLen < end) {
+ *error = "incorrect SubrangeBegin/SubrangeEnd pair";
+ return false;
+ }
+
+ const ui64 partLen = end - begin;
+ if (offset >= partLen) {
+ // just skip this part
+ offset -= partLen;
+ continue;
+ }
+
+ const ui64 partSize = Min(size ? size : Max<ui64>(), partLen - offset);
+
+ auto cgsi = TCGSI::FromProto(locator.GetBlobSeqId());
+
+ if (vg) {
+ const bool composite = totalDataLen + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize;
+ const EBlobType type = composite ? EBlobType::VG_COMPOSITE_BLOB : EBlobType::VG_DATA_BLOB;
+ const ui32 blobSize = totalDataLen + (composite ? sizeof(TVirtualGroupBlobFooter) : 0);
+ const auto id = cgsi.MakeBlobId(TabletId, type, 0, blobSize);
+ items.push_back(TReadItem{locator.GetGroupId(), id, static_cast<ui32>(offset + begin),
+ static_cast<ui32>(partSize), outputOffset});
+ } else {
+ Y_FAIL();
+ }
+
+ if (size) {
+ size -= partSize;
+ if (!size) {
+ break;
+ }
+ }
+ offset = 0;
+ outputOffset += partSize;
+ }
+
+ if (size) {
+ *error = "incorrect offset/size provided";
+ return false;
+ }
+
+ auto& reads = query->Reads;
+ auto iter = reads.insert(reads.end(), TQuery::TReadContext{
+ .Query = query,
+ .Tag = tag,
+ .Size = outputOffset,
+ });
+
+ for (const TReadItem& item : items) {
+ const ui64 id = NextReadId++;
+ SendToBSProxy(SelfId(), item.GroupId, new TEvBlobStorage::TEvGet(item.Id, item.Offset, item.Size,
+ TInstant::Max(), getHandleClass, mustRestoreFirst), id);
+ ReadsInFlight.emplace(id, iter);
+ iter->ReadsToOffset.emplace(id, item.OutputOffset);
+ }
+
+ return true;
+ }
+
+ void TBlobDepotAgent::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) {
+ if (const auto it = ReadsInFlight.find(ev->Cookie); it != ReadsInFlight.end()) {
+ auto ctx = it->second;
+ ReadsInFlight.erase(it);
+
+ const auto offsetIt = ctx->ReadsToOffset.find(ev->Cookie);
+ Y_VERIFY(offsetIt != ctx->ReadsToOffset.end());
+ const ui64 offset = offsetIt->second;
+ ctx->ReadsToOffset.erase(offsetIt);
+
+ auto& msg = *ev->Get();
+ if (msg.Status != NKikimrProto::OK) {
+ ctx->Status = msg.Status;
+ ctx->Buffer = std::move(msg.ErrorReason);
+ } else if (ctx->Status == NKikimrProto::OK) {
+ Y_VERIFY(msg.ResponseSz == 1);
+ auto& blob = msg.Responses[1];
+ Y_VERIFY(offset + blob.Buffer.size() <= ctx->Size);
+ if (!ctx->Buffer && !offset) {
+ ctx->Buffer = std::move(blob.Buffer);
+ ctx->Buffer.resize(ctx->Size);
+ } else {
+ if (!ctx->Buffer) {
+ ctx->Buffer = TString::Uninitialized(ctx->Size);
+ }
+ memcpy(ctx->Buffer.Detach() + offset, blob.Buffer.data(), blob.Buffer.size());
+ }
+ }
+
+ if (ctx->ReadsToOffset.empty()) {
+ ctx->Query->OnRead(ctx->Tag, ctx->Status, std::move(ctx->Buffer));
+ ctx->Query->Reads.erase(ctx);
+ }
+ }
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp
new file mode 100644
index 00000000000..46639c9b379
--- /dev/null
+++ b/ydb/core/blob_depot/agent/request.cpp
@@ -0,0 +1,75 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // TRequestSender class
+
+ TRequestSender::TRequestSender(TBlobDepotAgent& agent)
+ : Agent(agent)
+ {}
+
+ TRequestSender::~TRequestSender() {
+ if (this != &Agent) {
+ for (const auto& [id, context] : RequestsInFlight) {
+ const size_t numErased = Agent.TabletRequestInFlight.erase(id) + Agent.OtherRequestInFlight.erase(id);
+ Y_VERIFY(numErased == 1);
+ }
+ }
+ }
+
+ void TRequestSender::RegisterRequest(ui64 id, TRequestContext::TPtr context) {
+ const auto [_, inserted] = RequestsInFlight.emplace(id, std::move(context));
+ Y_VERIFY(inserted);
+ }
+
+ void TRequestSender::OnRequestComplete(ui64 id, TResponse response) {
+ const auto it = RequestsInFlight.find(id);
+ Y_VERIFY(it != RequestsInFlight.end());
+ TRequestContext::TPtr context = std::move(it->second);
+ RequestsInFlight.erase(it);
+ ProcessResponse(id, std::move(context), std::move(response));
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // TBlobDepotAgent machinery
+
+ void TBlobDepotAgent::RegisterRequest(ui64 id, TRequestSender *sender, TRequestContext::TPtr context, bool toBlobDepotTablet) {
+ auto& map = toBlobDepotTablet ? TabletRequestInFlight : OtherRequestInFlight;
+ const auto [_, inserted] = map.emplace(id, sender);
+ Y_VERIFY(inserted);
+ sender->RegisterRequest(id, std::move(context));
+ }
+
+ template<typename TEvent>
+ void TBlobDepotAgent::HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId),
+ (Id, ev->Cookie), (Type, TypeName<TEvent>()));
+ auto *event = ev->Get();
+ HandleResponse(reinterpret_cast<TAutoPtr<IEventHandle>&>(ev), event, TabletRequestInFlight);
+ }
+
+ template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvRegisterAgentResult::TPtr ev);
+ template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvAllocateIdsResult::TPtr ev);
+ template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvBlockResult::TPtr ev);
+ template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvQueryBlocksResult::TPtr ev);
+ template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev);
+ template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvResolveResult::TPtr ev);
+
+ template<typename TEvent>
+ void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA02, "HandleOtherResponse", (VirtualGroupId, VirtualGroupId),
+ (Id, ev->Cookie), (Type, TypeName<TEvent>()));
+ auto *event = ev->Get();
+ HandleResponse(ev, event, OtherRequestInFlight);
+ }
+
+ void TBlobDepotAgent::HandleResponse(TAutoPtr<IEventHandle> ev, TResponse response, THashMap<ui64, TRequestSender*>& map) {
+ const auto it = map.find(ev->Cookie);
+ Y_VERIFY(it != map.end());
+ const auto [id, sender] = *it;
+ map.erase(it);
+ sender->OnRequestComplete(id, std::move(response));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/status.cpp b/ydb/core/blob_depot/agent/status.cpp
new file mode 100644
index 00000000000..b1e57338bd6
--- /dev/null
+++ b/ydb/core/blob_depot/agent/status.cpp
@@ -0,0 +1,9 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ TStorageStatusFlags TBlobDepotAgent::GetStorageStatusFlags() const {
+ return NKikimrBlobStorage::StatusIsValid; // FIXME: implement
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/storage_block.cpp b/ydb/core/blob_depot/agent/storage_block.cpp
new file mode 100644
index 00000000000..9503615f912
--- /dev/null
+++ b/ydb/core/blob_depot/agent/storage_block.cpp
@@ -0,0 +1,65 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ template<>
+ TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvBlock>(std::unique_ptr<IEventHandle> ev) {
+ class TBlockQuery : public TQuery {
+ struct TBlockContext : TRequestContext {
+ TMonotonic Timestamp;
+
+ TBlockContext(TMonotonic timestamp)
+ : Timestamp(timestamp)
+ {}
+ };
+
+ public:
+ using TQuery::TQuery;
+
+ void Initiate() override {
+ auto& msg = *Event->Get<TEvBlobStorage::TEvBlock>();
+
+ // lookup existing blocks to try fail-fast
+ const ui32 blockedGeneration = Agent.GetBlockForTablet(msg.TabletId);
+ if (msg.Generation <= blockedGeneration) {
+ // we don't consider ExpirationTimestamp here because blocked generation may only increase
+ return EndWithError(NKikimrProto::RACE, "block race detected");
+ }
+
+ // issue request to the tablet
+ NKikimrBlobDepot::TEvBlock block;
+ block.SetTabletId(msg.TabletId);
+ block.SetBlockedGeneration(msg.Generation);
+ Agent.Issue(std::move(block), this, std::make_shared<TBlockContext>(TActivationContext::Monotonic()));
+ }
+
+ void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override {
+ if (std::holds_alternative<TTabletDisconnected>(response)) {
+ EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
+ } else if (auto *p = std::get_if<TEvBlobDepot::TEvBlockResult*>(&response)) {
+ return HandleBlockResult(std::move(context), **p);
+ } else {
+ Y_FAIL();
+ }
+ }
+
+ void HandleBlockResult(TRequestContext::TPtr context, TEvBlobDepot::TEvBlockResult& msg) {
+ if (!msg.Record.HasStatus()) {
+ EndWithError(NKikimrProto::ERROR, "incorrect TEvBlockResult response");
+ } else if (const auto status = msg.Record.GetStatus(); status != NKikimrProto::OK) {
+ EndWithError(status, msg.Record.GetErrorReason());
+ } else {
+ // update blocks cache
+ auto& blockContext = context->Obtain<TBlockContext>();
+ auto& query = *Event->Get<TEvBlobStorage::TEvBlock>();
+ Agent.SetBlockForTablet(query.TabletId, query.Generation, blockContext.Timestamp +
+ TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs()));
+ EndWithSuccess(std::make_unique<TEvBlobStorage::TEvBlockResult>(NKikimrProto::OK));
+ }
+ }
+ };
+
+ return new TBlockQuery(*this, std::move(ev));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp
new file mode 100644
index 00000000000..eeaa7de4368
--- /dev/null
+++ b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp
@@ -0,0 +1,38 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ template<>
+ TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvCollectGarbage>(std::unique_ptr<IEventHandle> ev) {
+ class TCollectGarbageQuery : public TQuery {
+ ui32 BlockChecksRemain = 3;
+
+ public:
+ using TQuery::TQuery;
+
+ void Initiate() override {
+ auto& msg = *Event->Get<TEvBlobStorage::TEvCollectGarbage>();
+
+ const auto status = Agent.CheckBlockForTablet(msg.TabletId, msg.RecordGeneration, this);
+ if (status == NKikimrProto::OK) {
+ IssueCollectGarbage();
+ } else if (status != NKikimrProto::UNKNOWN) {
+ EndWithError(status, "block race detected");
+ } else if (!--BlockChecksRemain) {
+ EndWithError(NKikimrProto::ERROR, "failed to acquire blocks");
+ }
+ }
+
+ void IssueCollectGarbage() {
+ }
+
+ void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse response) override {
+ (void)response;
+ Y_FAIL();
+ }
+ };
+
+ return new TCollectGarbageQuery(*this, std::move(ev));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp
new file mode 100644
index 00000000000..61e22a3c578
--- /dev/null
+++ b/ydb/core/blob_depot/agent/storage_discover.cpp
@@ -0,0 +1,154 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ template<>
+ TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvDiscover>(std::unique_ptr<IEventHandle> ev) {
+ class TDiscoverQuery : public TQuery {
+ ui64 TabletId = 0;
+ bool ReadBody;
+ ui32 MinGeneration = 0;
+
+ bool DoneWithBlockedGeneration = false;
+ bool DoneWithData = false;
+
+ TLogoBlobID Id;
+ TString Buffer;
+ ui32 BlockedGeneration = 0;
+
+ public:
+ using TQuery::TQuery;
+
+ void Initiate() override {
+ auto& msg = *Event->Get<TEvBlobStorage::TEvDiscover>();
+
+ TabletId = msg.TabletId;
+ ReadBody = msg.ReadBody;
+ MinGeneration = msg.MinGeneration;
+
+ IssueResolve();
+
+ if (msg.DiscoverBlockedGeneration) {
+ const auto status = Agent.CheckBlockForTablet(TabletId, Max<ui32>(), this, &BlockedGeneration);
+ if (status == NKikimrProto::OK) {
+ DoneWithBlockedGeneration = true;
+ } else if (status != NKikimrProto::UNKNOWN) {
+ EndWithError(status, "tablet was deleted");
+ }
+ } else {
+ DoneWithBlockedGeneration = true;
+ }
+ }
+
+ void IssueResolve() {
+ const ui8 channel = 0;
+ const TLogoBlobID from(TabletId, MinGeneration, 0, channel, 0, 0);
+ const TLogoBlobID to(TabletId, Max<ui32>(), Max<ui32>(), channel, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie);
+
+ NKikimrBlobDepot::TEvResolve resolve;
+ auto *item = resolve.AddItems();
+ item->SetBeginningKey(from.GetRaw(), 3 * sizeof(ui64));
+ item->SetIncludeBeginning(true);
+ item->SetEndingKey(to.GetRaw(), 3 * sizeof(ui64));
+ item->SetIncludeEnding(true);
+ item->SetMaxKeys(1);
+ item->SetReverse(true);
+
+ Agent.Issue(std::move(resolve), this, nullptr);
+ }
+
+ void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) override {
+ if (std::holds_alternative<TTabletDisconnected>(response)) {
+ return EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
+ } else if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) {
+ HandleResolveResult(id, std::move(context), **p);
+ } else {
+ Y_FAIL();
+ }
+ }
+
+ void OnUpdateBlock(bool success) override {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDASD02, "OnUpdateBlock", (VirtualGroupId, Agent.VirtualGroupId),
+ (QueryId, QueryId), (Success, success));
+
+ if (!success) {
+ return EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
+ }
+
+ const auto status = Agent.CheckBlockForTablet(TabletId, Max<ui32>(), this, &BlockedGeneration);
+ if (status == NKikimrProto::OK) {
+ DoneWithBlockedGeneration = true;
+ CheckIfDone();
+ } else if (status != NKikimrProto::UNKNOWN) {
+ EndWithError(status, "tablet was deleted");
+ } else {
+ Y_FAIL();
+ }
+ }
+
+ void HandleResolveResult(ui64 id, TRequestContext::TPtr context, TEvBlobDepot::TEvResolveResult& msg) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDASD01, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId),
+ (QueryId, QueryId), (Msg, msg.Record));
+
+ const NKikimrProto::EReplyStatus status = msg.Record.GetStatus();
+ if (status != NKikimrProto::OK && status != NKikimrProto::OVERRUN) {
+ return EndWithError(status, msg.Record.GetErrorReason());
+ }
+
+ if (status == NKikimrProto::OK) {
+ for (const auto& item : msg.Record.GetResolvedKeys()) {
+ const TString& id = item.GetKey();
+ Y_VERIFY(id.size() == 3 * sizeof(ui64));
+ Y_VERIFY(!Id);
+ Id = TLogoBlobID(reinterpret_cast<const ui64*>(id.data()));
+ Y_VERIFY(item.ValueChainSize() == 1);
+ if (ReadBody) {
+ TString error;
+ if (!Agent.IssueRead(item.GetValueChain(), 0, 0, NKikimrBlobStorage::Discover, true, this, 0,
+ true, &error)) {
+ return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to read discovered blob: "
+ << error);
+ }
+ }
+ }
+
+ if (!ReadBody) {
+ DoneWithData = true;
+ CheckIfDone();
+ }
+ } else {
+ Y_FAIL(); // do not expect to return single key in few messages
+ }
+
+ if (status == NKikimrProto::OVERRUN) { // there will be extra message with data
+ Agent.RegisterRequest(id, this, std::move(context), true);
+ }
+ }
+
+ void OnRead(ui64 /*tag*/, NKikimrProto::EReplyStatus status, TString dataOrErrorReason) override {
+ if (status == NKikimrProto::OK) {
+ Buffer = std::move(dataOrErrorReason);
+ DoneWithData = true;
+ CheckIfDone();
+ } else if (status == NKikimrProto::NODATA) {
+ // this may indicate a data race between locator and key value, we have to restart our resolution query
+ IssueResolve();
+ // FIXME: infinite cycle?
+ } else {
+ EndWithError(status, dataOrErrorReason);
+ }
+ }
+
+ void CheckIfDone() {
+ if (DoneWithBlockedGeneration && DoneWithData) {
+ EndWithSuccess(Id
+ ? std::make_unique<TEvBlobStorage::TEvDiscoverResult>(Id, MinGeneration, Buffer, BlockedGeneration)
+ : std::make_unique<TEvBlobStorage::TEvDiscoverResult>(NKikimrProto::NODATA, MinGeneration, BlockedGeneration));
+ }
+ }
+ };
+
+ return new TDiscoverQuery(*this, std::move(ev));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp
new file mode 100644
index 00000000000..2857441f0a9
--- /dev/null
+++ b/ydb/core/blob_depot/agent/storage_get.cpp
@@ -0,0 +1,24 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ template<>
+ TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvGet>(std::unique_ptr<IEventHandle> ev) {
+ class TGetQuery : public TQuery {
+ public:
+ using TQuery::TQuery;
+
+ void Initiate() override {
+ EndWithError(NKikimrProto::ERROR, "not implemented");
+ }
+
+ void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse response) override {
+ (void)response;
+ Y_FAIL();
+ }
+ };
+
+ return new TGetQuery(*this, std::move(ev));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/storage_patch.cpp b/ydb/core/blob_depot/agent/storage_patch.cpp
new file mode 100644
index 00000000000..bc6cdfcf911
--- /dev/null
+++ b/ydb/core/blob_depot/agent/storage_patch.cpp
@@ -0,0 +1,24 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ template<>
+ TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvPatch>(std::unique_ptr<IEventHandle> ev) {
+ class TPatchQuery : public TQuery {
+ public:
+ using TQuery::TQuery;
+
+ void Initiate() override {
+ EndWithError(NKikimrProto::ERROR, "not implemented");
+ }
+
+ void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse response) override {
+ (void)response;
+ Y_FAIL();
+ }
+ };
+
+ return new TPatchQuery(*this, std::move(ev));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
new file mode 100644
index 00000000000..8c7d4045300
--- /dev/null
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -0,0 +1,47 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ template<>
+ TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvPut>(std::unique_ptr<IEventHandle> ev) {
+ class TPutQuery : public TQuery {
+ ui32 BlockChecksRemain = 3;
+
+ public:
+ using TQuery::TQuery;
+
+ void Initiate() override {
+ auto& msg = *Event->Get<TEvBlobStorage::TEvPut>();
+
+ // first step -- check blocks
+ const auto status = Agent.CheckBlockForTablet(msg.Id.TabletID(), msg.Id.Generation(), this);
+ if (status == NKikimrProto::OK) {
+ IssuePuts();
+ } else if (status != NKikimrProto::UNKNOWN) {
+ EndWithError(status, "block race detected");
+ } else if (!--BlockChecksRemain) {
+ EndWithError(NKikimrProto::ERROR, "failed to acquire blocks");
+ }
+ }
+
+ void IssuePuts() {
+ }
+
+ void OnUpdateBlock(bool success) override {
+ if (success) {
+ Initiate(); // just restart request
+ } else {
+ EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
+ }
+ }
+
+ void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse response) override {
+ (void)response;
+ Y_FAIL();
+ }
+ };
+
+ return new TPutQuery(*this, std::move(ev));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp
new file mode 100644
index 00000000000..95049a2401b
--- /dev/null
+++ b/ydb/core/blob_depot/agent/storage_range.cpp
@@ -0,0 +1,24 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ template<>
+ TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvRange>(std::unique_ptr<IEventHandle> ev) {
+ class TRangeQuery : public TQuery {
+ public:
+ using TQuery::TQuery;
+
+ void Initiate() override {
+ EndWithError(NKikimrProto::ERROR, "not implemented");
+ }
+
+ void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse response) override {
+ (void)response;
+ Y_FAIL();
+ }
+ };
+
+ return new TRangeQuery(*this, std::move(ev));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/storage_status.cpp b/ydb/core/blob_depot/agent/storage_status.cpp
new file mode 100644
index 00000000000..8354f096982
--- /dev/null
+++ b/ydb/core/blob_depot/agent/storage_status.cpp
@@ -0,0 +1,25 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ template<>
+ TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvStatus>(std::unique_ptr<IEventHandle> ev) {
+ class TStatusQuery : public TQuery {
+ public:
+ using TQuery::TQuery;
+
+ void Initiate() override {
+ EndWithSuccess(std::make_unique<TEvBlobStorage::TEvStatusResult>(NKikimrProto::OK,
+ Agent.GetStorageStatusFlags()));
+ }
+
+ void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse response) override {
+ (void)response;
+ Y_FAIL();
+ }
+ };
+
+ return new TStatusQuery(*this, std::move(ev));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/blob_depot_agent.cpp b/ydb/core/blob_depot/blob_depot_agent.cpp
index a9d7aeabf6a..6c2b769d338 100644
--- a/ydb/core/blob_depot/blob_depot_agent.cpp
+++ b/ydb/core/blob_depot/blob_depot_agent.cpp
@@ -43,11 +43,11 @@ namespace NKikimr::NBlobDepot {
agent.ExpirationTimestamp = TInstant::Max();
OnAgentConnect(agent);
- auto response = std::make_unique<TEvBlobDepot::TEvRegisterAgentResult>();
- auto& record = response->Record;
- record.SetGeneration(Executor()->Generation());
+ auto [response, record] = TEvBlobDepot::MakeResponseFor(ev, SelfId());
+
+ record->SetGeneration(Executor()->Generation());
for (const auto& [k, v] : ChannelKinds) {
- auto *proto = record.AddChannelKinds();
+ auto *proto = record->AddChannelKinds();
proto->SetChannelKind(k);
for (const ui32 channel : v.IndexToChannel) {
auto *cg = proto->AddChannelGroups();
@@ -56,7 +56,7 @@ namespace NKikimr::NBlobDepot {
}
}
- SendResponseToAgent(*ev, std::move(response));
+ TActivationContext::Send(response.release());
}
void TBlobDepot::OnAgentConnect(TAgentInfo& agent) {
@@ -66,16 +66,18 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BD04, "TEvAllocateIds", (TabletId, TabletID()), (Msg, ev->Get()->Record),
(PipeServerId, ev->Recipient));
- auto response = std::make_unique<TEvBlobDepot::TEvAllocateIdsResult>(ev->Get()->Record.GetChannelKind(),
+
+ auto [response, record] = TEvBlobDepot::MakeResponseFor(ev, SelfId(), ev->Get()->Record.GetChannelKind(),
Executor()->Generation());
- auto& record = response->Record;
- if (const auto it = ChannelKinds.find(record.GetChannelKind()); it != ChannelKinds.end()) {
+
+ if (const auto it = ChannelKinds.find(record->GetChannelKind()); it != ChannelKinds.end()) {
auto& nextBlobSeqId = it->second.NextBlobSeqId;
- record.SetRangeBegin(nextBlobSeqId);
+ record->SetRangeBegin(nextBlobSeqId);
nextBlobSeqId += PreallocatedIdCount;
- record.SetRangeEnd(nextBlobSeqId);
+ record->SetRangeEnd(nextBlobSeqId);
}
- SendResponseToAgent(*ev, std::move(response));
+
+ TActivationContext::Send(response.release());
}
TBlobDepot::TAgentInfo& TBlobDepot::GetAgent(const TActorId& pipeServerId) {
@@ -88,14 +90,6 @@ namespace NKikimr::NBlobDepot {
return agentIt->second;
}
- void TBlobDepot::SendResponseToAgent(IEventHandle& request, std::unique_ptr<IEventBase> response) {
- auto handle = std::make_unique<IEventHandle>(request.Sender, SelfId(), response.release(), 0, request.Cookie);
- if (request.InterconnectSession) {
- handle->Rewrite(TEvInterconnect::EvForward, request.InterconnectSession);
- }
- TActivationContext::Send(handle.release());
- }
-
void TBlobDepot::InitChannelKinds() {
ui32 channel = 0;
for (const auto& profile : Config.GetChannelProfiles()) {
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index 9e2bdb0a736..d7b14fed2ae 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -92,8 +92,6 @@ namespace NKikimr::NBlobDepot {
PassAway();
}
- void SendResponseToAgent(IEventHandle& request, std::unique_ptr<IEventBase> response);
-
void InitChannelKinds();
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp
index 1550255eb14..2e36788569e 100644
--- a/ydb/core/blob_depot/blocks.cpp
+++ b/ydb/core/blob_depot/blocks.cpp
@@ -75,22 +75,17 @@ namespace NKikimr::NBlobDepot {
}
void Handle(TEvBlobDepot::TEvBlock::TPtr ev) {
- auto event = std::make_unique<TEvBlobDepot::TEvBlockResult>(NKikimrProto::OK, std::nullopt);
- auto& responseRecord = event->Record;
- auto response = std::make_unique<IEventHandle>(ev->Sender, Self->SelfId(), event.release(), 0, ev->Cookie);
- if (ev->InterconnectSession) {
- response->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession);
- }
-
const auto& record = ev->Get()->Record;
+ auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(ev, Self->SelfId(), NKikimrProto::OK, std::nullopt);
+
if (!record.HasTabletId() || !record.HasBlockedGeneration()) {
- responseRecord.SetStatus(NKikimrProto::ERROR);
- responseRecord.SetErrorReason("incorrect protobuf");
+ responseRecord->SetStatus(NKikimrProto::ERROR);
+ responseRecord->SetErrorReason("incorrect protobuf");
} else {
const ui64 tabletId = record.GetTabletId();
const ui32 blockedGeneration = record.GetBlockedGeneration();
if (const auto it = Blocks.find(tabletId); it != Blocks.end() && blockedGeneration <= it->second) {
- responseRecord.SetStatus(NKikimrProto::ERROR);
+ responseRecord->SetStatus(NKikimrProto::ERROR);
} else {
TAgentInfo& agent = Self->GetAgent(ev->Recipient);
Self->Execute(std::make_unique<TTxUpdateBlock>(Self, tabletId, blockedGeneration,
@@ -102,7 +97,16 @@ namespace NKikimr::NBlobDepot {
}
void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) {
- (void)ev;
+ const auto& record = ev->Get()->Record;
+ auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(ev, Self->SelfId());
+ responseRecord->SetTimeToLiveMs(15000); // FIXME
+
+ for (const ui64 tabletId : record.GetTabletIds()) {
+ const auto it = Blocks.find(tabletId);
+ responseRecord->AddBlockedGenerations(it != Blocks.end() ? it->second : 0);
+ }
+
+ TActivationContext::Send(response.release());
}
};
diff --git a/ydb/core/blob_depot/defs.h b/ydb/core/blob_depot/defs.h
index 9d531864287..bdb31cf8618 100644
--- a/ydb/core/blob_depot/defs.h
+++ b/ydb/core/blob_depot/defs.h
@@ -5,6 +5,7 @@
#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
+#include <ydb/core/protos/blob_depot.pb.h>
#include <ydb/core/util/stlog.h>
#include <util/generic/va_args.h>
diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h
index 57b8e21c729..b64f893c6cb 100644
--- a/ydb/core/blob_depot/events.h
+++ b/ydb/core/blob_depot/events.h
@@ -63,6 +63,28 @@ namespace NKikimr {
BLOBDEPOT_EVENT_PB_NO_ARGS(EvCommitBlobSeqResult);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvResolve);
BLOBDEPOT_EVENT_PB(EvResolveResult, Status, ErrorReason);
+
+ template<typename TEvent>
+ struct TResponseFor {};
+
+ template<> struct TResponseFor<TEvApplyConfig> { using Type = TEvApplyConfigResult; };
+ template<> struct TResponseFor<TEvRegisterAgent> { using Type = TEvRegisterAgentResult; };
+ template<> struct TResponseFor<TEvAllocateIds> { using Type = TEvAllocateIdsResult; };
+ template<> struct TResponseFor<TEvBlock> { using Type = TEvBlockResult; };
+ template<> struct TResponseFor<TEvQueryBlocks> { using Type = TEvQueryBlocksResult; };
+ template<> struct TResponseFor<TEvCommitBlobSeq> { using Type = TEvCommitBlobSeqResult; };
+ template<> struct TResponseFor<TEvResolve> { using Type = TEvResolveResult; };
+
+ template<typename TRequestEvent, typename... TArgs>
+ static auto MakeResponseFor(TAutoPtr<TEventHandle<TRequestEvent>>& ev, TActorId selfId, TArgs&&... args) {
+ auto event = std::make_unique<typename TResponseFor<TRequestEvent>::Type>(std::forward<TArgs>(args)...);
+ auto *record = &event->Record;
+ auto handle = std::make_unique<IEventHandle>(ev->Sender, selfId, event.release(), 0, ev->Cookie);
+ if (ev->InterconnectSession) {
+ handle->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession);
+ }
+ return std::make_pair(std::move(handle), record);
+ }
};
} // NKikimr
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index caf347a6cea..cb475e6a4ff 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -11,6 +11,21 @@ namespace NKikimr::NBlobDepot {
std::vector<ui8> IndexToChannel;
};
+#pragma pack(push, 1)
+ struct TVirtualGroupBlobFooter {
+ TLogoBlobID StoredBlobId;
+ };
+
+#pragma pack(pop)
+
+ static constexpr ui32 MaxBlobSize = 10 << 20; // 10 MB BlobStorage hard limit
+
+ enum class EBlobType : ui32 {
+ VG_COMPOSITE_BLOB = 0, // data + footer
+ VG_DATA_BLOB = 1, // just data, footer aside
+ VG_FOOTER_BLOB = 2, // footer only
+ };
+
struct TCGSI {
static constexpr ui32 IndexBits = 20;
static constexpr ui32 MaxIndex = (1 << IndexBits) - 1;
@@ -37,6 +52,33 @@ namespace NKikimr::NBlobDepot {
.Index = static_cast<ui32>(res.quot) & MaxIndex
};
}
+
+ static TCGSI FromProto(const NKikimrBlobDepot::TBlobSeqId& proto) {
+ return TCGSI{
+ proto.GetChannel(),
+ proto.GetGeneration(),
+ proto.GetStep(),
+ proto.GetIndex()
+ };
+ }
+
+ TLogoBlobID MakeBlobId(ui64 tabletId, EBlobType type, ui32 part, ui32 size) const {
+ return TLogoBlobID(tabletId, Generation, Step, Channel, size, MakeCookie(type, part));
+ }
+
+ ui32 MakeCookie(EBlobType type, ui32 part) const {
+ switch (type) {
+ case EBlobType::VG_COMPOSITE_BLOB:
+ case EBlobType::VG_DATA_BLOB:
+ case EBlobType::VG_FOOTER_BLOB:
+ static constexpr ui32 typeBits = 24 - IndexBits;
+ Y_VERIFY(static_cast<ui32>(type) < (1 << typeBits));
+ Y_VERIFY(!part);
+ return Index << typeBits | static_cast<ui32>(type);
+ }
+
+ Y_FAIL();
+ }
};
enum class EKeepState : ui8 {
diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto
index 81a56285d2a..e6f75da82db 100644
--- a/ydb/core/protos/blob_depot.proto
+++ b/ydb/core/protos/blob_depot.proto
@@ -15,6 +15,7 @@ message TBlobLocator {
optional TBlobSeqId BlobSeqId = 2;
optional uint32 Checksum = 3;
optional uint64 TotalDataLen = 4;
+ optional uint32 FooterLen = 5;
}
message TValueChain {