diff options
author | Alexander Rutkovsky <[email protected]> | 2022-06-23 12:52:25 +0300 |
---|---|---|
committer | Alexander Rutkovsky <[email protected]> | 2022-06-23 12:52:25 +0300 |
commit | aecdb821de0afc13c27eee8efaa50521021294c3 (patch) | |
tree | 530771e8183b9345e1e1e0483d18be2496fce9a7 | |
parent | 426b0be91fe907e8ebb94ad7fb397f620184d43e (diff) |
BlobDepot work in progress KIKIMR-14867
ref:0290e81dc459353c3d0ce554ade5fcda36af845c
32 files changed, 1124 insertions, 535 deletions
diff --git a/ydb/core/blob_depot/agent/CMakeLists.txt b/ydb/core/blob_depot/agent/CMakeLists.txt index 76f8566f83f..29aca02aa67 100644 --- a/ydb/core/blob_depot/agent/CMakeLists.txt +++ b/ydb/core/blob_depot/agent/CMakeLists.txt @@ -16,14 +16,18 @@ target_link_libraries(core-blob_depot-agent PUBLIC ) target_sources(core-blob_depot-agent PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_comm.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_query.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_put.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_get.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_block.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_discover.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_range.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_status.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_patch.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/blocks.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/comm.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/query.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/read.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/request.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/status.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_put.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_get.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_block.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_discover.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_range.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_collect_garbage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_status.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_patch.cpp ) diff --git a/ydb/core/blob_depot/agent/agent_comm.cpp b/ydb/core/blob_depot/agent/agent_comm.cpp deleted file mode 100644 index cf884280e45..00000000000 --- a/ydb/core/blob_depot/agent/agent_comm.cpp +++ /dev/null @@ -1,194 +0,0 @@ -#include "agent_impl.h" - -namespace NKikimr::NBlobDepot { - - void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC01, "TEvClientConnected", (VirtualGroupId, VirtualGroupId), - (Msg, ev->Get()->ToString())); - } - - void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC02, "TEvClientDestroyed", (VirtualGroupId, VirtualGroupId), - (Msg, ev->Get()->ToString())); - PipeId = {}; - OnDisconnect(); - ConnectToBlobDepot(); - } - - void TBlobDepotAgent::ConnectToBlobDepot() { - PipeId = Register(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy::WithRetries())); - const ui64 id = NextRequestId++; - NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId), id); - RegisterRequest(id, nullptr, [this](IEventBase *ev) { - if (ev) { - auto& msg = *static_cast<TEvBlobDepot::TEvRegisterAgentResult*>(ev); - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId), - (Msg, msg.Record)); - Registered = true; - BlobDepotGeneration = msg.Record.GetGeneration(); - for (const auto& kind : msg.Record.GetChannelKinds()) { - auto& v = ChannelKinds[kind.GetChannelKind()]; - v.ChannelGroups.clear(); - v.IndexToChannel.clear(); - for (const auto& channelGroup : kind.GetChannelGroups()) { - const ui8 channel = channelGroup.GetChannel(); - const ui32 groupId = channelGroup.GetGroupId(); - v.ChannelGroups.emplace_back(channel, groupId); - v.ChannelToIndex[channel] = v.IndexToChannel.size(); - v.IndexToChannel.push_back(channel); - } - } - } - return true; - }); - IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::Data); - IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::Log); - } - - void TBlobDepotAgent::IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::E channelKind) { - auto& kind = ChannelKinds[channelKind]; - - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC09, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId), - (ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(channelKind)), - (IdAllocInFlight, kind.IdAllocInFlight), (IdQ.size, kind.IdQ.size()), - (PreallocatedIdCount, kind.PreallocatedIdCount), (PipeId, PipeId)); - if (!kind.IdAllocInFlight && kind.IdQ.size() < kind.PreallocatedIdCount && PipeId) { - const ui64 id = NextRequestId++; - NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds(channelKind), id); - kind.IdAllocInFlight = true; - - RegisterRequest(id, nullptr, [this, channelKind](IEventBase *ev) { - auto& kind = ChannelKinds[channelKind]; - - Y_VERIFY(kind.IdAllocInFlight); - kind.IdAllocInFlight = false; - - if (ev) { - auto& msg = *static_cast<TEvBlobDepot::TEvAllocateIdsResult*>(ev); - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC07, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId), - (Msg, msg.Record)); - Y_VERIFY(msg.Record.GetChannelKind() == channelKind); - Y_VERIFY(msg.Record.GetGeneration() == BlobDepotGeneration); - - if (msg.Record.HasRangeBegin() && msg.Record.HasRangeEnd()) { - kind.IdQ.push_back({BlobDepotGeneration, msg.Record.GetRangeBegin(), msg.Record.GetRangeEnd()}); - - // FIXME notify waiting requests about new ids - - // ask for more ids if needed - IssueAllocateIdsIfNeeded(channelKind); - } else { - // no such channel allocated - } - } - - return true; // request complete, remove from queue - }); - } - } - - void TBlobDepotAgent::OnDisconnect() { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC04, "OnDisconnect", (VirtualGroupId, VirtualGroupId)); - - for (auto& [id, item] : std::exchange(RequestInFlight, {})) { - if (item.Sender) { - item.Sender->OnRequestComplete(id); - } - const bool done = item.Callback(nullptr); - Y_VERIFY(done); - } - - Registered = false; - } - - void TBlobDepotAgent::RegisterRequest(ui64 id, TRequestSender *sender, TRequestCompleteCallback callback) { - const auto [_, inserted] = RequestInFlight.emplace(id, TRequestInFlight{sender, std::move(callback)}); - Y_VERIFY(inserted); - if (sender) { - sender->RegisterRequest(id); - } - } - - template<typename TEvent> - void TBlobDepotAgent::HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC05, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId), - (Id, ev->Cookie), (Type, TypeName<TEvent>())); - if (const auto it = RequestInFlight.find(ev->Cookie); it != RequestInFlight.end()) { - auto& [id, item] = *it; - if (item.Sender) { - item.Sender->OnRequestComplete(id); - } - if (item.Callback(ev->Get())) { - RequestInFlight.erase(it); - } - } else { - Y_FAIL(); // don't know how this can happen without logic error - } - } - - template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvRegisterAgentResult::TPtr ev); - template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvAllocateIdsResult::TPtr ev); - template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvBlockResult::TPtr ev); - template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvQueryBlocksResult::TPtr ev); - template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev); - template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvResolveResult::TPtr ev); - - void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestCompleteCallback callback) { - auto ev = std::make_unique<TEvBlobDepot::TEvBlock>(); - msg.Swap(&ev->Record); - Issue(std::move(ev), sender, std::move(callback)); - } - - void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestCompleteCallback callback) { - auto ev = std::make_unique<TEvBlobDepot::TEvResolve>(); - msg.Swap(&ev->Record); - Issue(std::move(ev), sender, std::move(callback)); - } - - void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestCompleteCallback callback) { - auto ev = std::make_unique<TEvBlobDepot::TEvQueryBlocks>(); - msg.Swap(&ev->Record); - Issue(std::move(ev), sender, std::move(callback)); - } - - void TBlobDepotAgent::Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestCompleteCallback callback) { - const ui64 id = NextRequestId++; - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC03, "Issue", (VirtualGroupId, VirtualGroupId), (Id, id), (Msg, ev->ToString())); - NTabletPipe::SendData(SelfId(), PipeId, ev.release(), id); - RegisterRequest(id, sender, std::move(callback)); - } - - NKikimrProto::EReplyStatus TBlobDepotAgent::CheckBlockForTablet(ui64 tabletId, ui32 generation, TExecutingQuery *query) { - auto& block = Blocks[tabletId]; - const TMonotonic issueTime = TActivationContext::Monotonic(); - if (generation <= block.BlockedGeneration) { - return NKikimrProto::RACE; - } else if (issueTime< block.ExpirationTimestamp) { - return NKikimrProto::OK; - } else if (!block.RefreshInFlight) { - NKikimrBlobDepot::TEvQueryBlocks queryBlocks; - queryBlocks.AddTabletIds(tabletId); - Issue(std::move(queryBlocks), nullptr, [=](IEventBase *ev) { - if (ev) { - auto& msg = *static_cast<TEvBlobDepot::TEvQueryBlocksResult*>(ev); - const ui64 tabletId_ = tabletId; - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC08, "TEvQueryBlocksResult", (VirtualGroupId, VirtualGroupId), - (Msg, msg.Record), (TabletId, tabletId_)); - Y_VERIFY(msg.Record.BlockedGenerationsSize() == 1); - auto& block = Blocks[tabletId]; - Y_VERIFY(block.BlockedGeneration < generation); - block.BlockedGeneration = msg.Record.GetBlockedGenerations(1); - block.ExpirationTimestamp = issueTime + TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs()); - for (auto& query : block.PendingBlockChecks) { - query.OnUpdateBlock(); - } - } - return true; - }); - block.RefreshInFlight = true; - block.PendingBlockChecks.PushBack(query); - } - return NKikimrProto::NOTREADY; - } - -} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index 1a27fbd921f..ff0f9848e55 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -15,7 +15,60 @@ namespace NKikimr::NBlobDepot { XX(EvPatch) \ // END - class TBlobDepotAgent : public TActor<TBlobDepotAgent> { + class TBlobDepotAgent; + + struct TRequestContext { + virtual ~TRequestContext() = default; + + template<typename T> + T& Obtain() { + T *sp = static_cast<T*>(this); + Y_VERIFY_DEBUG(sp == dynamic_cast<T*>(this)); + return *sp; + } + + using TPtr = std::shared_ptr<TRequestContext>; + }; + + struct TTabletDisconnected {}; + + class TRequestSender { + THashMap<ui64, TRequestContext::TPtr> RequestsInFlight; + + protected: + TBlobDepotAgent& Agent; + + public: + using TResponse = std::variant< + // internal events + TTabletDisconnected, + + // tablet responses + TEvBlobDepot::TEvRegisterAgentResult*, + TEvBlobDepot::TEvAllocateIdsResult*, + TEvBlobDepot::TEvBlockResult*, + TEvBlobDepot::TEvQueryBlocksResult*, + TEvBlobDepot::TEvCommitBlobSeqResult*, + TEvBlobDepot::TEvResolveResult*, + + // underlying DS proxy responses + TEvBlobStorage::TEvGetResult* + >; + + public: + TRequestSender(TBlobDepotAgent& agent); + virtual ~TRequestSender(); + void RegisterRequest(ui64 id, TRequestContext::TPtr context); + void OnRequestComplete(ui64 id, TResponse response); + + protected: + virtual void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) = 0; + }; + + class TBlobDepotAgent + : public TActor<TBlobDepotAgent> + , public TRequestSender + { const ui32 VirtualGroupId; ui64 TabletId = Max<ui64>(); TActorId PipeId; @@ -23,7 +76,9 @@ namespace NKikimr::NBlobDepot { public: TBlobDepotAgent(ui32 virtualGroupId) : TActor(&TThis::StateFunc) + , TRequestSender(*this) , VirtualGroupId(virtualGroupId) + , BlocksManager(CreateBlocksManager()) { Y_VERIFY(TGroupID(VirtualGroupId).ConfigurationType() == EGroupConfigurationType::Virtual); } @@ -43,6 +98,8 @@ namespace NKikimr::NBlobDepot { hFunc(TEvBlobDepot::TEvCommitBlobSeqResult, HandleTabletResponse); hFunc(TEvBlobDepot::TEvResolveResult, HandleTabletResponse); + hFunc(TEvBlobStorage::TEvGetResult, Handle); + ENUMERATE_INCOMING_EVENTS(FORWARD_STORAGE_PROXY) ); #undef FORWARD_STORAGE_PROXY @@ -67,54 +124,32 @@ namespace NKikimr::NBlobDepot { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // BlobDepot communications + // BlobDepot and other actor communications - using TRequestCompleteCallback = std::function<bool(IEventBase*)>; + ui64 NextRequestId = 1; + THashMap<ui64, TRequestSender*> TabletRequestInFlight; + THashMap<ui64, TRequestSender*> OtherRequestInFlight; - class TRequestSender { - THashSet<ui64> IdsInFlight; + void RegisterRequest(ui64 id, TRequestSender *sender, TRequestContext::TPtr context, bool toBlobDepotTablet); - protected: - TBlobDepotAgent& Agent; + template<typename TEvent> + void HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev); - public: - TRequestSender(TBlobDepotAgent& agent) - : Agent(agent) - {} + template<typename TEvent> + void HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev); - ~TRequestSender() { - for (const ui64 id : IdsInFlight) { - const size_t num = Agent.RequestInFlight.erase(id); - Y_VERIFY(num); - } - } + void HandleResponse(TAutoPtr<IEventHandle> ev, TRequestSender::TResponse response, THashMap<ui64, TRequestSender*>& map); - void RegisterRequest(ui64 id) { - const auto [_, inserted] = IdsInFlight.insert(id); - Y_VERIFY(inserted); - } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - void OnRequestComplete(ui64 id) { - const size_t num = IdsInFlight.erase(id); - Y_VERIFY(num); - } - }; + struct TAllocateIdsContext : TRequestContext { + NKikimrBlobDepot::TChannelKind::E ChannelKind; - struct TRequestInFlight { - TRequestSender *Sender; - TRequestCompleteCallback Callback; + TAllocateIdsContext(NKikimrBlobDepot::TChannelKind::E channelKind) + : ChannelKind(channelKind) + {} }; - ui64 NextRequestId = 1; - THashMap<ui64, TRequestInFlight> RequestInFlight; - - void RegisterRequest(ui64 id, TRequestSender *sender, TRequestCompleteCallback callback); - - template<typename TEvent> - void HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev); - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - bool Registered = false; ui32 BlobDepotGeneration = 0; @@ -123,11 +158,15 @@ namespace NKikimr::NBlobDepot { void ConnectToBlobDepot(); void OnDisconnect(); - void Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestCompleteCallback callback); - void Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestCompleteCallback callback); - void Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestCompleteCallback callback); + void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) override; + void HandleRegisterAgentResult(TRequestContext::TPtr context, TEvBlobDepot::TEvRegisterAgentResult& msg); + void HandleAllocateIdsResult(TRequestContext::TPtr context, TEvBlobDepot::TEvAllocateIdsResult& msg); - void Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestCompleteCallback callback); + void Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestContext::TPtr context); + void Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestContext::TPtr context); + void Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestContext::TPtr context); + + void Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestContext::TPtr context); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -146,7 +185,7 @@ namespace NKikimr::NBlobDepot { std::deque<TAllocatedId> IdQ; static constexpr size_t PreallocatedIdCount = 2; - std::pair<TLogoBlobID, ui32> Allocate(TBlobDepotAgent& agent, ui32 size, ui32 type) { + std::pair<TLogoBlobID, ui32> Allocate(TBlobDepotAgent& agent, EBlobType type, ui32 part, ui32 size) { if (IdQ.empty()) { return {}; } @@ -156,10 +195,7 @@ namespace NKikimr::NBlobDepot { if (item.Begin == item.End) { IdQ.pop_front(); } - static constexpr ui32 typeBits = 24 - TCGSI::IndexBits; - Y_VERIFY(type < (1 << typeBits)); - const ui32 cookie = cgsi.Index << typeBits | type; - const TLogoBlobID id(agent.TabletId, cgsi.Generation, cgsi.Step, cgsi.Channel, size, cookie); + auto id = cgsi.MakeBlobId(agent.TabletId, type, part, size); const auto [channel, groupId] = ChannelGroups[ChannelToIndex[cgsi.Channel]]; Y_VERIFY_DEBUG(channel == cgsi.Channel); return {id, groupId}; @@ -175,23 +211,41 @@ namespace NKikimr::NBlobDepot { struct TExecutingQueries {}; struct TPendingBlockChecks {}; - class TExecutingQuery - : public TIntrusiveListItem<TExecutingQuery, TExecutingQueries> - , public TIntrusiveListItem<TExecutingQuery, TPendingBlockChecks> + class TQuery + : public TIntrusiveListItem<TQuery, TExecutingQueries> + , public TIntrusiveListItem<TQuery, TPendingBlockChecks> , public TRequestSender { + friend class TBlobDepotAgent; + + struct TReadContext { + TQuery *Query; + ui64 Tag; + TString Buffer; + ui64 Size; + NKikimrProto::EReplyStatus Status = NKikimrProto::OK; + THashMap<ui64, ui64> ReadsToOffset; + }; + std::list<TReadContext> Reads; + protected: std::unique_ptr<IEventHandle> Event; // original query event const ui64 QueryId; public: - TExecutingQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event) + TQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event) : TRequestSender(agent) , Event(std::move(event)) , QueryId(RandomNumber<ui64>()) {} - virtual ~TExecutingQuery() = default; + virtual ~TQuery() { + for (const auto& read : Reads) { + for (const auto& [id, _] : read.ReadsToOffset) { + Agent.ReadsInFlight.erase(id); + } + } + } void EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason); void EndWithSuccess(std::unique_ptr<IEventBase> response); @@ -199,34 +253,55 @@ namespace NKikimr::NBlobDepot { ui64 GetQueryId() const { return QueryId; } virtual void Initiate() = 0; - virtual void OnUpdateBlock() {} + virtual void OnUpdateBlock(bool /*success*/) {} + virtual void OnRead(ui64 /*tag*/, NKikimrProto::EReplyStatus /*status*/, TString /*dataOrErrorReason*/) {} public: struct TDeleter { - static void Destroy(TExecutingQuery *query) { delete query; } + static void Destroy(TQuery *query) { delete query; } }; }; std::deque<std::unique_ptr<IEventHandle>> PendingEventQ; - TIntrusiveListWithAutoDelete<TExecutingQuery, TExecutingQuery::TDeleter, TExecutingQueries> ExecutingQueries; + TIntrusiveListWithAutoDelete<TQuery, TQuery::TDeleter, TExecutingQueries> ExecutingQueries; void HandleStorageProxy(TAutoPtr<IEventHandle> ev); - TExecutingQuery *CreateExecutingQuery(TAutoPtr<IEventHandle> ev); - template<ui32 EventType> TExecutingQuery *CreateExecutingQuery(std::unique_ptr<IEventHandle> ev); + TQuery *CreateQuery(TAutoPtr<IEventHandle> ev); + template<ui32 EventType> TQuery *CreateQuery(std::unique_ptr<IEventHandle> ev); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Blocks - struct TBlockInfo { - ui32 BlockedGeneration; - TMonotonic ExpirationTimestamp; // not valid after - bool RefreshInFlight = false; - TIntrusiveList<TExecutingQuery, TPendingBlockChecks> PendingBlockChecks; - }; + class TBlocksManager; + struct TBlocksManagerDeleter { void operator ()(TBlocksManager*) const; }; + using TBlocksManagerPtr = std::unique_ptr<TBlocksManager, TBlocksManagerDeleter>; + TBlocksManagerPtr BlocksManager; + + TBlocksManagerPtr CreateBlocksManager(); + + NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, ui32 generation, TQuery *query, + ui32 *blockedGeneration = nullptr); + + ui32 GetBlockForTablet(ui64 tabletId); + + void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic expirationTimestamp); - std::unordered_map<ui64, TBlockInfo> Blocks; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Reading + + ui64 NextReadId = 1; + THashMap<ui64, std::list<TQuery::TReadContext>::iterator> ReadsInFlight; + + bool IssueRead(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>& values, ui64 offset, ui64 size, + NKikimrBlobStorage::EGetHandleClass getHandleClass, bool mustRestoreFirst, TQuery *query, ui64 tag, + bool vg, TString *error); + + void Handle(TEvBlobStorage::TEvGetResult::TPtr ev); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Status flags - NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, ui32 generation, TExecutingQuery *query); + TStorageStatusFlags GetStorageStatusFlags() const; }; } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_block.cpp b/ydb/core/blob_depot/agent/agent_storage_block.cpp deleted file mode 100644 index 1d90be19463..00000000000 --- a/ydb/core/blob_depot/agent/agent_storage_block.cpp +++ /dev/null @@ -1,55 +0,0 @@ -#include "agent_impl.h" - -namespace NKikimr::NBlobDepot { - - template<> - TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvBlock>(std::unique_ptr<IEventHandle> ev) { - class TBlockExecutingQuery : public TExecutingQuery { - public: - using TExecutingQuery::TExecutingQuery; - - void Initiate() override { - auto& msg = *Event->Get<TEvBlobStorage::TEvBlock>(); - - // lookup existing blocks to try fail-fast - if (const auto it = Agent.Blocks.find(msg.TabletId); it != Agent.Blocks.end()) { - const TBlockInfo& block = it->second; - if (msg.Generation <= block.BlockedGeneration) { - // we don't consider ExpirationTimestamp here because blocked generation may only increase - return EndWithError(NKikimrProto::RACE, "block race detected"); - } - } - - // issue request to the tablet - NKikimrBlobDepot::TEvBlock block; - block.SetTabletId(msg.TabletId); - block.SetBlockedGeneration(msg.Generation); - Agent.Issue(std::move(block), this, std::bind(&TBlockExecutingQuery::HandleBlockResult, this, - std::placeholders::_1, TActivationContext::Monotonic())); - } - - bool HandleBlockResult(IEventBase *event, TMonotonic issueTimestamp) { - if (!event) { - EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); - } else if (const auto& msg = static_cast<TEvBlobDepot::TEvBlockResult&>(*event); !msg.Record.HasStatus()) { - EndWithError(NKikimrProto::ERROR, "incorrect TEvBlockResult response"); - } else if (const auto status = msg.Record.GetStatus(); status != NKikimrProto::OK) { - EndWithError(status, msg.Record.GetErrorReason()); - } else { - // update blocks cache - auto& query = *Event->Get<TEvBlobStorage::TEvBlock>(); - auto& block = Agent.Blocks[query.TabletId]; - Y_VERIFY(block.BlockedGeneration <= query.Generation); // TODO: QueryBlocks can race with Block? - block.BlockedGeneration = query.Generation; - block.ExpirationTimestamp = issueTimestamp + TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs()); - - EndWithSuccess(std::make_unique<TEvBlobStorage::TEvBlockResult>(NKikimrProto::OK)); - } - return true; - } - }; - - return new TBlockExecutingQuery(*this, std::move(ev)); - } - -} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp b/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp deleted file mode 100644 index 8a411246a52..00000000000 --- a/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include "agent_impl.h" - -namespace NKikimr::NBlobDepot { - - template<> - TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvCollectGarbage>(std::unique_ptr<IEventHandle> ev) { - return (void)ev, nullptr; - } - -} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_discover.cpp b/ydb/core/blob_depot/agent/agent_storage_discover.cpp deleted file mode 100644 index 6cce45058a6..00000000000 --- a/ydb/core/blob_depot/agent/agent_storage_discover.cpp +++ /dev/null @@ -1,76 +0,0 @@ -#include "agent_impl.h" - -namespace NKikimr::NBlobDepot { - - template<> - TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvDiscover>(std::unique_ptr<IEventHandle> ev) { - class TDiscoverExecutingQuery : public TExecutingQuery { - public: - using TExecutingQuery::TExecutingQuery; - - void Initiate() override { - auto& msg = *Event->Get<TEvBlobStorage::TEvDiscover>(); - - const TLogoBlobID from(msg.TabletId, Max<ui32>(), Max<ui32>(), 0, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie); - const TLogoBlobID to(msg.TabletId, 0, 0, 0, 0, 0); - - NKikimrBlobDepot::TEvResolve resolve; - auto *item = resolve.AddItems(); - item->SetBeginningKey(from.GetRaw(), 3 * sizeof(ui64)); - item->SetIncludeBeginning(true); - item->SetEndingKey(to.GetRaw(), 3 * sizeof(ui64)); - item->SetIncludeEnding(true); - item->SetMaxKeys(1); - item->SetReverse(true); - - Agent.Issue(std::move(resolve), this, std::bind(&TDiscoverExecutingQuery::HandleResolveResult, - this, std::placeholders::_1)); - - if (msg.DiscoverBlockedGeneration) { - NKikimrBlobDepot::TEvQueryBlocks queryBlocks; - queryBlocks.AddTabletIds(msg.TabletId); - Agent.Issue(std::move(queryBlocks), this, std::bind( - &TDiscoverExecutingQuery::HandleQueryBlocksResult, this, - std::placeholders::_1, TActivationContext::Now())); - } - } - - bool HandleResolveResult(IEventBase *result) { - if (!result) { // server has disconnected before request got processed - EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); - return true; - } - - auto& msg = static_cast<TEvBlobDepot::TEvResolveResult&>(*result); - - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDASD01, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId), - (QueryId, QueryId), (Msg, msg.Record)); - - const NKikimrProto::EReplyStatus status = msg.Record.GetStatus(); - if (status != NKikimrProto::OK && status != NKikimrProto::OVERRUN) { - EndWithError(status, msg.Record.GetErrorReason()); - return true; - } - - return status != NKikimrProto::OVERRUN; - } - - bool HandleQueryBlocksResult(IEventBase *result, TInstant /*issueTimestamp*/) { - if (!result) { - EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); - return true; - } - - auto& msg = static_cast<TEvBlobDepot::TEvQueryBlocksResult&>(*result); - - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDASD02, "HandleQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId), - (QueryId, QueryId), (Msg, msg.Record)); - - return true; - } - }; - - return new TDiscoverExecutingQuery(*this, std::move(ev)); - } - -} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_get.cpp b/ydb/core/blob_depot/agent/agent_storage_get.cpp deleted file mode 100644 index 55d4e166904..00000000000 --- a/ydb/core/blob_depot/agent/agent_storage_get.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include "agent_impl.h" - -namespace NKikimr::NBlobDepot { - - template<> - TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvGet>(std::unique_ptr<IEventHandle> ev) { - return (void)ev, nullptr; - } - -} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_patch.cpp b/ydb/core/blob_depot/agent/agent_storage_patch.cpp deleted file mode 100644 index 92bc96c9d01..00000000000 --- a/ydb/core/blob_depot/agent/agent_storage_patch.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include "agent_impl.h" - -namespace NKikimr::NBlobDepot { - - template<> - TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvPatch>(std::unique_ptr<IEventHandle> ev) { - return (void)ev, nullptr; - } - -} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_put.cpp b/ydb/core/blob_depot/agent/agent_storage_put.cpp deleted file mode 100644 index 9628abdf27b..00000000000 --- a/ydb/core/blob_depot/agent/agent_storage_put.cpp +++ /dev/null @@ -1,46 +0,0 @@ -#include "agent_impl.h" - -namespace NKikimr::NBlobDepot { - - template<> - TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvPut>(std::unique_ptr<IEventHandle> ev) { - class TPutExecutingQuery : public TExecutingQuery { - ui32 BlockChecksRemain = 3; - - public: - using TExecutingQuery::TExecutingQuery; - - void Initiate() override { - auto& msg = *Event->Get<TEvBlobStorage::TEvPut>(); - - // first step -- check blocks - switch (Agent.CheckBlockForTablet(msg.Id.TabletID(), msg.Id.Generation(), this)) { - case NKikimrProto::OK: - return IssuePuts(); - - case NKikimrProto::RACE: - return EndWithError(NKikimrProto::RACE, "block race detected"); - - case NKikimrProto::NOTREADY: - if (!--BlockChecksRemain) { - EndWithError(NKikimrProto::ERROR, "failed to acquire blocks"); - } - return; - - default: - Y_FAIL(); - } - } - - void IssuePuts() { - } - - void OnUpdateBlock() override { - Initiate(); // just restart request - } - }; - - return new TPutExecutingQuery(*this, std::move(ev)); - } - -} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_range.cpp b/ydb/core/blob_depot/agent/agent_storage_range.cpp deleted file mode 100644 index d4dec509ee0..00000000000 --- a/ydb/core/blob_depot/agent/agent_storage_range.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include "agent_impl.h" - -namespace NKikimr::NBlobDepot { - - template<> - TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvRange>(std::unique_ptr<IEventHandle> ev) { - return (void)ev, nullptr; - } - -} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_status.cpp b/ydb/core/blob_depot/agent/agent_storage_status.cpp deleted file mode 100644 index 36dcdf209bf..00000000000 --- a/ydb/core/blob_depot/agent/agent_storage_status.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include "agent_impl.h" - -namespace NKikimr::NBlobDepot { - - template<> - TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvStatus>(std::unique_ptr<IEventHandle> ev) { - return (void)ev, nullptr; - } - -} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp new file mode 100644 index 00000000000..86b49bc342f --- /dev/null +++ b/ydb/core/blob_depot/agent/blocks.cpp @@ -0,0 +1,125 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + class TBlobDepotAgent::TBlocksManager + : public TRequestSender + { + struct TBlockInfo { + ui32 BlockedGeneration; + TMonotonic ExpirationTimestamp; // not valid after + bool RefreshInFlight = false; + TIntrusiveList<TQuery, TPendingBlockChecks> PendingBlockChecks; + }; + + THashMap<ui64, TBlockInfo> Blocks; + + public: + TBlocksManager(TBlobDepotAgent& agent) + : TRequestSender(agent) + {} + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + struct TQueryBlockContext : TRequestContext { + TMonotonic Timestamp; + ui64 TabletId; + + TQueryBlockContext(TMonotonic timestamp, ui64 tabletId) + : Timestamp(timestamp) + , TabletId(tabletId) + {} + }; + + NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, ui32 generation, TQuery *query, + ui32 *blockedGeneration) { + auto& block = Blocks[tabletId]; + const TMonotonic issueTime = TActivationContext::Monotonic(); + if (generation <= block.BlockedGeneration) { + return NKikimrProto::RACE; + } else if (issueTime < block.ExpirationTimestamp) { + if (blockedGeneration) { + *blockedGeneration = block.BlockedGeneration; + } + return NKikimrProto::OK; + } else if (!block.RefreshInFlight) { + NKikimrBlobDepot::TEvQueryBlocks queryBlocks; + queryBlocks.AddTabletIds(tabletId); + Agent.Issue(std::move(queryBlocks), this, std::make_shared<TQueryBlockContext>( + TActivationContext::Monotonic(), tabletId)); + block.RefreshInFlight = true; + block.PendingBlockChecks.PushBack(query); + } + return NKikimrProto::UNKNOWN; + } + + void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override { + auto& queryBlockContext = context->Obtain<TQueryBlockContext>(); + auto& block = Blocks[queryBlockContext.TabletId]; + + if (auto *p = std::get_if<TEvBlobDepot::TEvQueryBlocksResult*>(&response)) { + auto& msg = **p; + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC08, "TEvQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId), + (Msg, msg.Record), (TabletId, queryBlockContext.TabletId)); + Y_VERIFY(msg.Record.BlockedGenerationsSize() == 1); + const ui32 newBlockedGeneration = msg.Record.GetBlockedGenerations(0); + Y_VERIFY(block.BlockedGeneration <= newBlockedGeneration); + block.BlockedGeneration = newBlockedGeneration; + block.ExpirationTimestamp = queryBlockContext.Timestamp + TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs()); + } else { + Y_VERIFY(std::holds_alternative<TTabletDisconnected>(response)); + } + + TIntrusiveList<TQuery, TPendingBlockChecks> temp; + temp.Swap(block.PendingBlockChecks); + for (auto it = temp.begin(); it != temp.end(); ) { + const auto current = it++; + current->OnUpdateBlock(!std::holds_alternative<TTabletDisconnected>(response)); + } + } + + ui32 GetBlockForTablet(ui64 tabletId) { + const auto it = Blocks.find(tabletId); + return it != Blocks.end() ? it->second.BlockedGeneration : 0; + } + + void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic expirationTimestamp) { + auto& block = Blocks[tabletId]; + Y_VERIFY(block.BlockedGeneration <= blockedGeneration); + if (block.BlockedGeneration < blockedGeneration) { + block.BlockedGeneration = blockedGeneration; + block.ExpirationTimestamp = expirationTimestamp; + } else if (block.ExpirationTimestamp < expirationTimestamp) { + block.ExpirationTimestamp = expirationTimestamp; + } + } + }; + + void TBlobDepotAgent::TBlocksManagerDeleter::operator ()(TBlocksManager *object) const { + delete object; + } + + TBlobDepotAgent::TBlocksManagerPtr TBlobDepotAgent::CreateBlocksManager() { + return TBlocksManagerPtr(new TBlocksManager(*this)); + } + + void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestContext::TPtr context) { + auto ev = std::make_unique<TEvBlobDepot::TEvQueryBlocks>(); + msg.Swap(&ev->Record); + Issue(std::move(ev), sender, std::move(context)); + } + + NKikimrProto::EReplyStatus TBlobDepotAgent::CheckBlockForTablet(ui64 tabletId, ui32 generation, TQuery *query, + ui32 *blockedGeneration) { + return BlocksManager->CheckBlockForTablet(tabletId, generation, query, blockedGeneration); + } + + ui32 TBlobDepotAgent::GetBlockForTablet(ui64 tabletId) { + return BlocksManager->GetBlockForTablet(tabletId); + } + + void TBlobDepotAgent::SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic expirationTimestamp) { + BlocksManager->SetBlockForTablet(tabletId, blockedGeneration, expirationTimestamp); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp new file mode 100644 index 00000000000..480504db430 --- /dev/null +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -0,0 +1,131 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC01, "TEvClientConnected", (VirtualGroupId, VirtualGroupId), + (Msg, ev->Get()->ToString())); + } + + void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) { + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC02, "TEvClientDestroyed", (VirtualGroupId, VirtualGroupId), + (Msg, ev->Get()->ToString())); + PipeId = {}; + OnDisconnect(); + ConnectToBlobDepot(); + } + + void TBlobDepotAgent::ConnectToBlobDepot() { + PipeId = Register(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy::WithRetries())); + const ui64 id = NextRequestId++; + NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId), id); + RegisterRequest(id, this, nullptr, true); + IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::Data); + IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::Log); + } + + void TBlobDepotAgent::HandleRegisterAgentResult(TRequestContext::TPtr /*context*/, TEvBlobDepot::TEvRegisterAgentResult& msg) { + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId), + (Msg, msg.Record)); + Registered = true; + BlobDepotGeneration = msg.Record.GetGeneration(); + for (const auto& kind : msg.Record.GetChannelKinds()) { + auto& v = ChannelKinds[kind.GetChannelKind()]; + v.ChannelGroups.clear(); + v.IndexToChannel.clear(); + for (const auto& channelGroup : kind.GetChannelGroups()) { + const ui8 channel = channelGroup.GetChannel(); + const ui32 groupId = channelGroup.GetGroupId(); + v.ChannelGroups.emplace_back(channel, groupId); + v.ChannelToIndex[channel] = v.IndexToChannel.size(); + v.IndexToChannel.push_back(channel); + } + } + } + + void TBlobDepotAgent::IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::E channelKind) { + auto& kind = ChannelKinds[channelKind]; + + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC09, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId), + (ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(channelKind)), + (IdAllocInFlight, kind.IdAllocInFlight), (IdQ.size, kind.IdQ.size()), + (PreallocatedIdCount, kind.PreallocatedIdCount), (PipeId, PipeId)); + if (!kind.IdAllocInFlight && kind.IdQ.size() < kind.PreallocatedIdCount && PipeId) { + const ui64 id = NextRequestId++; + NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds(channelKind), id); + RegisterRequest(id, this, std::make_shared<TAllocateIdsContext>(channelKind), true); + kind.IdAllocInFlight = true; + } + } + + void TBlobDepotAgent::HandleAllocateIdsResult(TRequestContext::TPtr context, TEvBlobDepot::TEvAllocateIdsResult& msg) { + auto& allocateIdsContext = context->Obtain<TAllocateIdsContext>(); + auto& kind = ChannelKinds[allocateIdsContext.ChannelKind]; + + Y_VERIFY(kind.IdAllocInFlight); + kind.IdAllocInFlight = false; + + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC07, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId), + (Msg, msg.Record)); + Y_VERIFY(msg.Record.GetChannelKind() == allocateIdsContext.ChannelKind); + Y_VERIFY(msg.Record.GetGeneration() == BlobDepotGeneration); + + if (msg.Record.HasRangeBegin() && msg.Record.HasRangeEnd()) { + kind.IdQ.push_back({BlobDepotGeneration, msg.Record.GetRangeBegin(), msg.Record.GetRangeEnd()}); + + // FIXME notify waiting requests about new ids + + // ask for more ids if needed + IssueAllocateIdsIfNeeded(allocateIdsContext.ChannelKind); + } else { + // no such channel allocated + } + } + + void TBlobDepotAgent::OnDisconnect() { + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC04, "OnDisconnect", (VirtualGroupId, VirtualGroupId)); + + for (auto& [id, sender] : std::exchange(TabletRequestInFlight, {})) { + sender->OnRequestComplete(id, TTabletDisconnected{}); + } + + for (auto& [_, kind] : ChannelKinds) { + kind.IdAllocInFlight = false; + } + + Registered = false; + } + + void TBlobDepotAgent::ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) { + std::visit([&](auto&& item) { + using T = std::decay_t<decltype(item)>; + if constexpr (std::is_same_v<T, TEvBlobDepot::TEvRegisterAgentResult*>) { + HandleRegisterAgentResult(std::move(context), *item); + } else if constexpr (std::is_same_v<T, TEvBlobDepot::TEvAllocateIdsResult*>) { + HandleAllocateIdsResult(std::move(context), *item); + } else if constexpr (!std::is_same_v<T, TTabletDisconnected>) { + Y_FAIL(); + } + }, response); + } + + void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestContext::TPtr context) { + auto ev = std::make_unique<TEvBlobDepot::TEvBlock>(); + msg.Swap(&ev->Record); + Issue(std::move(ev), sender, std::move(context)); + } + + void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestContext::TPtr context) { + auto ev = std::make_unique<TEvBlobDepot::TEvResolve>(); + msg.Swap(&ev->Record); + Issue(std::move(ev), sender, std::move(context)); + } + + void TBlobDepotAgent::Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestContext::TPtr context) { + const ui64 id = NextRequestId++; + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC03, "Issue", (VirtualGroupId, VirtualGroupId), (Id, id), (Msg, ev->ToString())); + NTabletPipe::SendData(SelfId(), PipeId, ev.release(), id); + RegisterRequest(id, sender, std::move(context), true); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_query.cpp b/ydb/core/blob_depot/agent/query.cpp index a0dc670d536..533a793077e 100644 --- a/ydb/core/blob_depot/agent/agent_storage_query.cpp +++ b/ydb/core/blob_depot/agent/query.cpp @@ -7,7 +7,7 @@ namespace NKikimr::NBlobDepot { // TODO: memory usage control PendingEventQ.emplace_back(ev.Release()); } else { - auto *query = CreateExecutingQuery(ev); + auto *query = CreateQuery(ev); STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "new query", (VirtualGroupId, VirtualGroupId), (QueryId, query->GetQueryId()), (Name, query->GetName())); if (!TabletId) { @@ -18,10 +18,10 @@ namespace NKikimr::NBlobDepot { } } - TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery(TAutoPtr<IEventHandle> ev) { + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(TAutoPtr<IEventHandle> ev) { switch (ev->GetTypeRewrite()) { #define XX(TYPE) \ - case TEvBlobStorage::TYPE: return CreateExecutingQuery<TEvBlobStorage::TYPE>(std::unique_ptr<IEventHandle>(ev.Release())); + case TEvBlobStorage::TYPE: return CreateQuery<TEvBlobStorage::TYPE>(std::unique_ptr<IEventHandle>(ev.Release())); ENUMERATE_INCOMING_EVENTS(XX) #undef XX @@ -29,7 +29,7 @@ namespace NKikimr::NBlobDepot { Y_FAIL(); } - void TBlobDepotAgent::TExecutingQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) { + void TBlobDepotAgent::TQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) { STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "query ends with error", (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Status, status), (ErrorReason, errorReason)); @@ -48,14 +48,14 @@ namespace NKikimr::NBlobDepot { delete this; } - void TBlobDepotAgent::TExecutingQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) { + void TBlobDepotAgent::TQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) { STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Response, response->ToString())); Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie); delete this; } - TString TBlobDepotAgent::TExecutingQuery::GetName() const { + TString TBlobDepotAgent::TQuery::GetName() const { switch (Event->GetTypeRewrite()) { #define XX(TYPE) case TEvBlobStorage::TYPE: return #TYPE; ENUMERATE_INCOMING_EVENTS(XX) diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp new file mode 100644 index 00000000000..53e7f0cef24 --- /dev/null +++ b/ydb/core/blob_depot/agent/read.cpp @@ -0,0 +1,128 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + bool TBlobDepotAgent::IssueRead(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>& values, ui64 offset, + ui64 size, NKikimrBlobStorage::EGetHandleClass getHandleClass, bool mustRestoreFirst, TQuery *query, + ui64 tag, bool vg, TString *error) { + ui64 outputOffset = 0; + + struct TReadItem { + ui32 GroupId; + TLogoBlobID Id; + ui32 Offset; + ui32 Size; + ui64 OutputOffset; + }; + std::vector<TReadItem> items; + + for (const auto& value : values) { + if (!value.HasLocator()) { + *error = "TValueChain.Locator is missing"; + return false; + } + const auto& locator = value.GetLocator(); + const ui64 totalDataLen = locator.GetTotalDataLen(); + if (!totalDataLen) { + *error = "TBlobLocator.TotalDataLen is missing or zero"; + return false; + } + const ui64 begin = value.GetSubrangeBegin(); + const ui64 end = value.HasSubrangeEnd() ? value.GetSubrangeEnd() : totalDataLen; + if (end <= begin || totalDataLen < end) { + *error = "incorrect SubrangeBegin/SubrangeEnd pair"; + return false; + } + + const ui64 partLen = end - begin; + if (offset >= partLen) { + // just skip this part + offset -= partLen; + continue; + } + + const ui64 partSize = Min(size ? size : Max<ui64>(), partLen - offset); + + auto cgsi = TCGSI::FromProto(locator.GetBlobSeqId()); + + if (vg) { + const bool composite = totalDataLen + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize; + const EBlobType type = composite ? EBlobType::VG_COMPOSITE_BLOB : EBlobType::VG_DATA_BLOB; + const ui32 blobSize = totalDataLen + (composite ? sizeof(TVirtualGroupBlobFooter) : 0); + const auto id = cgsi.MakeBlobId(TabletId, type, 0, blobSize); + items.push_back(TReadItem{locator.GetGroupId(), id, static_cast<ui32>(offset + begin), + static_cast<ui32>(partSize), outputOffset}); + } else { + Y_FAIL(); + } + + if (size) { + size -= partSize; + if (!size) { + break; + } + } + offset = 0; + outputOffset += partSize; + } + + if (size) { + *error = "incorrect offset/size provided"; + return false; + } + + auto& reads = query->Reads; + auto iter = reads.insert(reads.end(), TQuery::TReadContext{ + .Query = query, + .Tag = tag, + .Size = outputOffset, + }); + + for (const TReadItem& item : items) { + const ui64 id = NextReadId++; + SendToBSProxy(SelfId(), item.GroupId, new TEvBlobStorage::TEvGet(item.Id, item.Offset, item.Size, + TInstant::Max(), getHandleClass, mustRestoreFirst), id); + ReadsInFlight.emplace(id, iter); + iter->ReadsToOffset.emplace(id, item.OutputOffset); + } + + return true; + } + + void TBlobDepotAgent::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) { + if (const auto it = ReadsInFlight.find(ev->Cookie); it != ReadsInFlight.end()) { + auto ctx = it->second; + ReadsInFlight.erase(it); + + const auto offsetIt = ctx->ReadsToOffset.find(ev->Cookie); + Y_VERIFY(offsetIt != ctx->ReadsToOffset.end()); + const ui64 offset = offsetIt->second; + ctx->ReadsToOffset.erase(offsetIt); + + auto& msg = *ev->Get(); + if (msg.Status != NKikimrProto::OK) { + ctx->Status = msg.Status; + ctx->Buffer = std::move(msg.ErrorReason); + } else if (ctx->Status == NKikimrProto::OK) { + Y_VERIFY(msg.ResponseSz == 1); + auto& blob = msg.Responses[1]; + Y_VERIFY(offset + blob.Buffer.size() <= ctx->Size); + if (!ctx->Buffer && !offset) { + ctx->Buffer = std::move(blob.Buffer); + ctx->Buffer.resize(ctx->Size); + } else { + if (!ctx->Buffer) { + ctx->Buffer = TString::Uninitialized(ctx->Size); + } + memcpy(ctx->Buffer.Detach() + offset, blob.Buffer.data(), blob.Buffer.size()); + } + } + + if (ctx->ReadsToOffset.empty()) { + ctx->Query->OnRead(ctx->Tag, ctx->Status, std::move(ctx->Buffer)); + ctx->Query->Reads.erase(ctx); + } + } + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp new file mode 100644 index 00000000000..46639c9b379 --- /dev/null +++ b/ydb/core/blob_depot/agent/request.cpp @@ -0,0 +1,75 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // TRequestSender class + + TRequestSender::TRequestSender(TBlobDepotAgent& agent) + : Agent(agent) + {} + + TRequestSender::~TRequestSender() { + if (this != &Agent) { + for (const auto& [id, context] : RequestsInFlight) { + const size_t numErased = Agent.TabletRequestInFlight.erase(id) + Agent.OtherRequestInFlight.erase(id); + Y_VERIFY(numErased == 1); + } + } + } + + void TRequestSender::RegisterRequest(ui64 id, TRequestContext::TPtr context) { + const auto [_, inserted] = RequestsInFlight.emplace(id, std::move(context)); + Y_VERIFY(inserted); + } + + void TRequestSender::OnRequestComplete(ui64 id, TResponse response) { + const auto it = RequestsInFlight.find(id); + Y_VERIFY(it != RequestsInFlight.end()); + TRequestContext::TPtr context = std::move(it->second); + RequestsInFlight.erase(it); + ProcessResponse(id, std::move(context), std::move(response)); + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // TBlobDepotAgent machinery + + void TBlobDepotAgent::RegisterRequest(ui64 id, TRequestSender *sender, TRequestContext::TPtr context, bool toBlobDepotTablet) { + auto& map = toBlobDepotTablet ? TabletRequestInFlight : OtherRequestInFlight; + const auto [_, inserted] = map.emplace(id, sender); + Y_VERIFY(inserted); + sender->RegisterRequest(id, std::move(context)); + } + + template<typename TEvent> + void TBlobDepotAgent::HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId), + (Id, ev->Cookie), (Type, TypeName<TEvent>())); + auto *event = ev->Get(); + HandleResponse(reinterpret_cast<TAutoPtr<IEventHandle>&>(ev), event, TabletRequestInFlight); + } + + template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvRegisterAgentResult::TPtr ev); + template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvAllocateIdsResult::TPtr ev); + template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvBlockResult::TPtr ev); + template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvQueryBlocksResult::TPtr ev); + template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev); + template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvResolveResult::TPtr ev); + + template<typename TEvent> + void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA02, "HandleOtherResponse", (VirtualGroupId, VirtualGroupId), + (Id, ev->Cookie), (Type, TypeName<TEvent>())); + auto *event = ev->Get(); + HandleResponse(ev, event, OtherRequestInFlight); + } + + void TBlobDepotAgent::HandleResponse(TAutoPtr<IEventHandle> ev, TResponse response, THashMap<ui64, TRequestSender*>& map) { + const auto it = map.find(ev->Cookie); + Y_VERIFY(it != map.end()); + const auto [id, sender] = *it; + map.erase(it); + sender->OnRequestComplete(id, std::move(response)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/status.cpp b/ydb/core/blob_depot/agent/status.cpp new file mode 100644 index 00000000000..b1e57338bd6 --- /dev/null +++ b/ydb/core/blob_depot/agent/status.cpp @@ -0,0 +1,9 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + TStorageStatusFlags TBlobDepotAgent::GetStorageStatusFlags() const { + return NKikimrBlobStorage::StatusIsValid; // FIXME: implement + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_block.cpp b/ydb/core/blob_depot/agent/storage_block.cpp new file mode 100644 index 00000000000..9503615f912 --- /dev/null +++ b/ydb/core/blob_depot/agent/storage_block.cpp @@ -0,0 +1,65 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvBlock>(std::unique_ptr<IEventHandle> ev) { + class TBlockQuery : public TQuery { + struct TBlockContext : TRequestContext { + TMonotonic Timestamp; + + TBlockContext(TMonotonic timestamp) + : Timestamp(timestamp) + {} + }; + + public: + using TQuery::TQuery; + + void Initiate() override { + auto& msg = *Event->Get<TEvBlobStorage::TEvBlock>(); + + // lookup existing blocks to try fail-fast + const ui32 blockedGeneration = Agent.GetBlockForTablet(msg.TabletId); + if (msg.Generation <= blockedGeneration) { + // we don't consider ExpirationTimestamp here because blocked generation may only increase + return EndWithError(NKikimrProto::RACE, "block race detected"); + } + + // issue request to the tablet + NKikimrBlobDepot::TEvBlock block; + block.SetTabletId(msg.TabletId); + block.SetBlockedGeneration(msg.Generation); + Agent.Issue(std::move(block), this, std::make_shared<TBlockContext>(TActivationContext::Monotonic())); + } + + void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override { + if (std::holds_alternative<TTabletDisconnected>(response)) { + EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); + } else if (auto *p = std::get_if<TEvBlobDepot::TEvBlockResult*>(&response)) { + return HandleBlockResult(std::move(context), **p); + } else { + Y_FAIL(); + } + } + + void HandleBlockResult(TRequestContext::TPtr context, TEvBlobDepot::TEvBlockResult& msg) { + if (!msg.Record.HasStatus()) { + EndWithError(NKikimrProto::ERROR, "incorrect TEvBlockResult response"); + } else if (const auto status = msg.Record.GetStatus(); status != NKikimrProto::OK) { + EndWithError(status, msg.Record.GetErrorReason()); + } else { + // update blocks cache + auto& blockContext = context->Obtain<TBlockContext>(); + auto& query = *Event->Get<TEvBlobStorage::TEvBlock>(); + Agent.SetBlockForTablet(query.TabletId, query.Generation, blockContext.Timestamp + + TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs())); + EndWithSuccess(std::make_unique<TEvBlobStorage::TEvBlockResult>(NKikimrProto::OK)); + } + } + }; + + return new TBlockQuery(*this, std::move(ev)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp new file mode 100644 index 00000000000..eeaa7de4368 --- /dev/null +++ b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp @@ -0,0 +1,38 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvCollectGarbage>(std::unique_ptr<IEventHandle> ev) { + class TCollectGarbageQuery : public TQuery { + ui32 BlockChecksRemain = 3; + + public: + using TQuery::TQuery; + + void Initiate() override { + auto& msg = *Event->Get<TEvBlobStorage::TEvCollectGarbage>(); + + const auto status = Agent.CheckBlockForTablet(msg.TabletId, msg.RecordGeneration, this); + if (status == NKikimrProto::OK) { + IssueCollectGarbage(); + } else if (status != NKikimrProto::UNKNOWN) { + EndWithError(status, "block race detected"); + } else if (!--BlockChecksRemain) { + EndWithError(NKikimrProto::ERROR, "failed to acquire blocks"); + } + } + + void IssueCollectGarbage() { + } + + void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse response) override { + (void)response; + Y_FAIL(); + } + }; + + return new TCollectGarbageQuery(*this, std::move(ev)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp new file mode 100644 index 00000000000..61e22a3c578 --- /dev/null +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -0,0 +1,154 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvDiscover>(std::unique_ptr<IEventHandle> ev) { + class TDiscoverQuery : public TQuery { + ui64 TabletId = 0; + bool ReadBody; + ui32 MinGeneration = 0; + + bool DoneWithBlockedGeneration = false; + bool DoneWithData = false; + + TLogoBlobID Id; + TString Buffer; + ui32 BlockedGeneration = 0; + + public: + using TQuery::TQuery; + + void Initiate() override { + auto& msg = *Event->Get<TEvBlobStorage::TEvDiscover>(); + + TabletId = msg.TabletId; + ReadBody = msg.ReadBody; + MinGeneration = msg.MinGeneration; + + IssueResolve(); + + if (msg.DiscoverBlockedGeneration) { + const auto status = Agent.CheckBlockForTablet(TabletId, Max<ui32>(), this, &BlockedGeneration); + if (status == NKikimrProto::OK) { + DoneWithBlockedGeneration = true; + } else if (status != NKikimrProto::UNKNOWN) { + EndWithError(status, "tablet was deleted"); + } + } else { + DoneWithBlockedGeneration = true; + } + } + + void IssueResolve() { + const ui8 channel = 0; + const TLogoBlobID from(TabletId, MinGeneration, 0, channel, 0, 0); + const TLogoBlobID to(TabletId, Max<ui32>(), Max<ui32>(), channel, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie); + + NKikimrBlobDepot::TEvResolve resolve; + auto *item = resolve.AddItems(); + item->SetBeginningKey(from.GetRaw(), 3 * sizeof(ui64)); + item->SetIncludeBeginning(true); + item->SetEndingKey(to.GetRaw(), 3 * sizeof(ui64)); + item->SetIncludeEnding(true); + item->SetMaxKeys(1); + item->SetReverse(true); + + Agent.Issue(std::move(resolve), this, nullptr); + } + + void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) override { + if (std::holds_alternative<TTabletDisconnected>(response)) { + return EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); + } else if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) { + HandleResolveResult(id, std::move(context), **p); + } else { + Y_FAIL(); + } + } + + void OnUpdateBlock(bool success) override { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDASD02, "OnUpdateBlock", (VirtualGroupId, Agent.VirtualGroupId), + (QueryId, QueryId), (Success, success)); + + if (!success) { + return EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); + } + + const auto status = Agent.CheckBlockForTablet(TabletId, Max<ui32>(), this, &BlockedGeneration); + if (status == NKikimrProto::OK) { + DoneWithBlockedGeneration = true; + CheckIfDone(); + } else if (status != NKikimrProto::UNKNOWN) { + EndWithError(status, "tablet was deleted"); + } else { + Y_FAIL(); + } + } + + void HandleResolveResult(ui64 id, TRequestContext::TPtr context, TEvBlobDepot::TEvResolveResult& msg) { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDASD01, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId), + (QueryId, QueryId), (Msg, msg.Record)); + + const NKikimrProto::EReplyStatus status = msg.Record.GetStatus(); + if (status != NKikimrProto::OK && status != NKikimrProto::OVERRUN) { + return EndWithError(status, msg.Record.GetErrorReason()); + } + + if (status == NKikimrProto::OK) { + for (const auto& item : msg.Record.GetResolvedKeys()) { + const TString& id = item.GetKey(); + Y_VERIFY(id.size() == 3 * sizeof(ui64)); + Y_VERIFY(!Id); + Id = TLogoBlobID(reinterpret_cast<const ui64*>(id.data())); + Y_VERIFY(item.ValueChainSize() == 1); + if (ReadBody) { + TString error; + if (!Agent.IssueRead(item.GetValueChain(), 0, 0, NKikimrBlobStorage::Discover, true, this, 0, + true, &error)) { + return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to read discovered blob: " + << error); + } + } + } + + if (!ReadBody) { + DoneWithData = true; + CheckIfDone(); + } + } else { + Y_FAIL(); // do not expect to return single key in few messages + } + + if (status == NKikimrProto::OVERRUN) { // there will be extra message with data + Agent.RegisterRequest(id, this, std::move(context), true); + } + } + + void OnRead(ui64 /*tag*/, NKikimrProto::EReplyStatus status, TString dataOrErrorReason) override { + if (status == NKikimrProto::OK) { + Buffer = std::move(dataOrErrorReason); + DoneWithData = true; + CheckIfDone(); + } else if (status == NKikimrProto::NODATA) { + // this may indicate a data race between locator and key value, we have to restart our resolution query + IssueResolve(); + // FIXME: infinite cycle? + } else { + EndWithError(status, dataOrErrorReason); + } + } + + void CheckIfDone() { + if (DoneWithBlockedGeneration && DoneWithData) { + EndWithSuccess(Id + ? std::make_unique<TEvBlobStorage::TEvDiscoverResult>(Id, MinGeneration, Buffer, BlockedGeneration) + : std::make_unique<TEvBlobStorage::TEvDiscoverResult>(NKikimrProto::NODATA, MinGeneration, BlockedGeneration)); + } + } + }; + + return new TDiscoverQuery(*this, std::move(ev)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp new file mode 100644 index 00000000000..2857441f0a9 --- /dev/null +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -0,0 +1,24 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvGet>(std::unique_ptr<IEventHandle> ev) { + class TGetQuery : public TQuery { + public: + using TQuery::TQuery; + + void Initiate() override { + EndWithError(NKikimrProto::ERROR, "not implemented"); + } + + void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse response) override { + (void)response; + Y_FAIL(); + } + }; + + return new TGetQuery(*this, std::move(ev)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_patch.cpp b/ydb/core/blob_depot/agent/storage_patch.cpp new file mode 100644 index 00000000000..bc6cdfcf911 --- /dev/null +++ b/ydb/core/blob_depot/agent/storage_patch.cpp @@ -0,0 +1,24 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvPatch>(std::unique_ptr<IEventHandle> ev) { + class TPatchQuery : public TQuery { + public: + using TQuery::TQuery; + + void Initiate() override { + EndWithError(NKikimrProto::ERROR, "not implemented"); + } + + void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse response) override { + (void)response; + Y_FAIL(); + } + }; + + return new TPatchQuery(*this, std::move(ev)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp new file mode 100644 index 00000000000..8c7d4045300 --- /dev/null +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -0,0 +1,47 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvPut>(std::unique_ptr<IEventHandle> ev) { + class TPutQuery : public TQuery { + ui32 BlockChecksRemain = 3; + + public: + using TQuery::TQuery; + + void Initiate() override { + auto& msg = *Event->Get<TEvBlobStorage::TEvPut>(); + + // first step -- check blocks + const auto status = Agent.CheckBlockForTablet(msg.Id.TabletID(), msg.Id.Generation(), this); + if (status == NKikimrProto::OK) { + IssuePuts(); + } else if (status != NKikimrProto::UNKNOWN) { + EndWithError(status, "block race detected"); + } else if (!--BlockChecksRemain) { + EndWithError(NKikimrProto::ERROR, "failed to acquire blocks"); + } + } + + void IssuePuts() { + } + + void OnUpdateBlock(bool success) override { + if (success) { + Initiate(); // just restart request + } else { + EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); + } + } + + void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse response) override { + (void)response; + Y_FAIL(); + } + }; + + return new TPutQuery(*this, std::move(ev)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp new file mode 100644 index 00000000000..95049a2401b --- /dev/null +++ b/ydb/core/blob_depot/agent/storage_range.cpp @@ -0,0 +1,24 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvRange>(std::unique_ptr<IEventHandle> ev) { + class TRangeQuery : public TQuery { + public: + using TQuery::TQuery; + + void Initiate() override { + EndWithError(NKikimrProto::ERROR, "not implemented"); + } + + void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse response) override { + (void)response; + Y_FAIL(); + } + }; + + return new TRangeQuery(*this, std::move(ev)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_status.cpp b/ydb/core/blob_depot/agent/storage_status.cpp new file mode 100644 index 00000000000..8354f096982 --- /dev/null +++ b/ydb/core/blob_depot/agent/storage_status.cpp @@ -0,0 +1,25 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvStatus>(std::unique_ptr<IEventHandle> ev) { + class TStatusQuery : public TQuery { + public: + using TQuery::TQuery; + + void Initiate() override { + EndWithSuccess(std::make_unique<TEvBlobStorage::TEvStatusResult>(NKikimrProto::OK, + Agent.GetStorageStatusFlags())); + } + + void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse response) override { + (void)response; + Y_FAIL(); + } + }; + + return new TStatusQuery(*this, std::move(ev)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/blob_depot_agent.cpp b/ydb/core/blob_depot/blob_depot_agent.cpp index a9d7aeabf6a..6c2b769d338 100644 --- a/ydb/core/blob_depot/blob_depot_agent.cpp +++ b/ydb/core/blob_depot/blob_depot_agent.cpp @@ -43,11 +43,11 @@ namespace NKikimr::NBlobDepot { agent.ExpirationTimestamp = TInstant::Max(); OnAgentConnect(agent); - auto response = std::make_unique<TEvBlobDepot::TEvRegisterAgentResult>(); - auto& record = response->Record; - record.SetGeneration(Executor()->Generation()); + auto [response, record] = TEvBlobDepot::MakeResponseFor(ev, SelfId()); + + record->SetGeneration(Executor()->Generation()); for (const auto& [k, v] : ChannelKinds) { - auto *proto = record.AddChannelKinds(); + auto *proto = record->AddChannelKinds(); proto->SetChannelKind(k); for (const ui32 channel : v.IndexToChannel) { auto *cg = proto->AddChannelGroups(); @@ -56,7 +56,7 @@ namespace NKikimr::NBlobDepot { } } - SendResponseToAgent(*ev, std::move(response)); + TActivationContext::Send(response.release()); } void TBlobDepot::OnAgentConnect(TAgentInfo& agent) { @@ -66,16 +66,18 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev) { STLOG(PRI_DEBUG, BLOB_DEPOT, BD04, "TEvAllocateIds", (TabletId, TabletID()), (Msg, ev->Get()->Record), (PipeServerId, ev->Recipient)); - auto response = std::make_unique<TEvBlobDepot::TEvAllocateIdsResult>(ev->Get()->Record.GetChannelKind(), + + auto [response, record] = TEvBlobDepot::MakeResponseFor(ev, SelfId(), ev->Get()->Record.GetChannelKind(), Executor()->Generation()); - auto& record = response->Record; - if (const auto it = ChannelKinds.find(record.GetChannelKind()); it != ChannelKinds.end()) { + + if (const auto it = ChannelKinds.find(record->GetChannelKind()); it != ChannelKinds.end()) { auto& nextBlobSeqId = it->second.NextBlobSeqId; - record.SetRangeBegin(nextBlobSeqId); + record->SetRangeBegin(nextBlobSeqId); nextBlobSeqId += PreallocatedIdCount; - record.SetRangeEnd(nextBlobSeqId); + record->SetRangeEnd(nextBlobSeqId); } - SendResponseToAgent(*ev, std::move(response)); + + TActivationContext::Send(response.release()); } TBlobDepot::TAgentInfo& TBlobDepot::GetAgent(const TActorId& pipeServerId) { @@ -88,14 +90,6 @@ namespace NKikimr::NBlobDepot { return agentIt->second; } - void TBlobDepot::SendResponseToAgent(IEventHandle& request, std::unique_ptr<IEventBase> response) { - auto handle = std::make_unique<IEventHandle>(request.Sender, SelfId(), response.release(), 0, request.Cookie); - if (request.InterconnectSession) { - handle->Rewrite(TEvInterconnect::EvForward, request.InterconnectSession); - } - TActivationContext::Send(handle.release()); - } - void TBlobDepot::InitChannelKinds() { ui32 channel = 0; for (const auto& profile : Config.GetChannelProfiles()) { diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index 9e2bdb0a736..d7b14fed2ae 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -92,8 +92,6 @@ namespace NKikimr::NBlobDepot { PassAway(); } - void SendResponseToAgent(IEventHandle& request, std::unique_ptr<IEventBase> response); - void InitChannelKinds(); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp index 1550255eb14..2e36788569e 100644 --- a/ydb/core/blob_depot/blocks.cpp +++ b/ydb/core/blob_depot/blocks.cpp @@ -75,22 +75,17 @@ namespace NKikimr::NBlobDepot { } void Handle(TEvBlobDepot::TEvBlock::TPtr ev) { - auto event = std::make_unique<TEvBlobDepot::TEvBlockResult>(NKikimrProto::OK, std::nullopt); - auto& responseRecord = event->Record; - auto response = std::make_unique<IEventHandle>(ev->Sender, Self->SelfId(), event.release(), 0, ev->Cookie); - if (ev->InterconnectSession) { - response->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession); - } - const auto& record = ev->Get()->Record; + auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(ev, Self->SelfId(), NKikimrProto::OK, std::nullopt); + if (!record.HasTabletId() || !record.HasBlockedGeneration()) { - responseRecord.SetStatus(NKikimrProto::ERROR); - responseRecord.SetErrorReason("incorrect protobuf"); + responseRecord->SetStatus(NKikimrProto::ERROR); + responseRecord->SetErrorReason("incorrect protobuf"); } else { const ui64 tabletId = record.GetTabletId(); const ui32 blockedGeneration = record.GetBlockedGeneration(); if (const auto it = Blocks.find(tabletId); it != Blocks.end() && blockedGeneration <= it->second) { - responseRecord.SetStatus(NKikimrProto::ERROR); + responseRecord->SetStatus(NKikimrProto::ERROR); } else { TAgentInfo& agent = Self->GetAgent(ev->Recipient); Self->Execute(std::make_unique<TTxUpdateBlock>(Self, tabletId, blockedGeneration, @@ -102,7 +97,16 @@ namespace NKikimr::NBlobDepot { } void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) { - (void)ev; + const auto& record = ev->Get()->Record; + auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(ev, Self->SelfId()); + responseRecord->SetTimeToLiveMs(15000); // FIXME + + for (const ui64 tabletId : record.GetTabletIds()) { + const auto it = Blocks.find(tabletId); + responseRecord->AddBlockedGenerations(it != Blocks.end() ? it->second : 0); + } + + TActivationContext::Send(response.release()); } }; diff --git a/ydb/core/blob_depot/defs.h b/ydb/core/blob_depot/defs.h index 9d531864287..bdb31cf8618 100644 --- a/ydb/core/blob_depot/defs.h +++ b/ydb/core/blob_depot/defs.h @@ -5,6 +5,7 @@ #include <ydb/core/engine/minikql/flat_local_tx_factory.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> +#include <ydb/core/protos/blob_depot.pb.h> #include <ydb/core/util/stlog.h> #include <util/generic/va_args.h> diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h index 57b8e21c729..b64f893c6cb 100644 --- a/ydb/core/blob_depot/events.h +++ b/ydb/core/blob_depot/events.h @@ -63,6 +63,28 @@ namespace NKikimr { BLOBDEPOT_EVENT_PB_NO_ARGS(EvCommitBlobSeqResult); BLOBDEPOT_EVENT_PB_NO_ARGS(EvResolve); BLOBDEPOT_EVENT_PB(EvResolveResult, Status, ErrorReason); + + template<typename TEvent> + struct TResponseFor {}; + + template<> struct TResponseFor<TEvApplyConfig> { using Type = TEvApplyConfigResult; }; + template<> struct TResponseFor<TEvRegisterAgent> { using Type = TEvRegisterAgentResult; }; + template<> struct TResponseFor<TEvAllocateIds> { using Type = TEvAllocateIdsResult; }; + template<> struct TResponseFor<TEvBlock> { using Type = TEvBlockResult; }; + template<> struct TResponseFor<TEvQueryBlocks> { using Type = TEvQueryBlocksResult; }; + template<> struct TResponseFor<TEvCommitBlobSeq> { using Type = TEvCommitBlobSeqResult; }; + template<> struct TResponseFor<TEvResolve> { using Type = TEvResolveResult; }; + + template<typename TRequestEvent, typename... TArgs> + static auto MakeResponseFor(TAutoPtr<TEventHandle<TRequestEvent>>& ev, TActorId selfId, TArgs&&... args) { + auto event = std::make_unique<typename TResponseFor<TRequestEvent>::Type>(std::forward<TArgs>(args)...); + auto *record = &event->Record; + auto handle = std::make_unique<IEventHandle>(ev->Sender, selfId, event.release(), 0, ev->Cookie); + if (ev->InterconnectSession) { + handle->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession); + } + return std::make_pair(std::move(handle), record); + } }; } // NKikimr diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index caf347a6cea..cb475e6a4ff 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -11,6 +11,21 @@ namespace NKikimr::NBlobDepot { std::vector<ui8> IndexToChannel; }; +#pragma pack(push, 1) + struct TVirtualGroupBlobFooter { + TLogoBlobID StoredBlobId; + }; + +#pragma pack(pop) + + static constexpr ui32 MaxBlobSize = 10 << 20; // 10 MB BlobStorage hard limit + + enum class EBlobType : ui32 { + VG_COMPOSITE_BLOB = 0, // data + footer + VG_DATA_BLOB = 1, // just data, footer aside + VG_FOOTER_BLOB = 2, // footer only + }; + struct TCGSI { static constexpr ui32 IndexBits = 20; static constexpr ui32 MaxIndex = (1 << IndexBits) - 1; @@ -37,6 +52,33 @@ namespace NKikimr::NBlobDepot { .Index = static_cast<ui32>(res.quot) & MaxIndex }; } + + static TCGSI FromProto(const NKikimrBlobDepot::TBlobSeqId& proto) { + return TCGSI{ + proto.GetChannel(), + proto.GetGeneration(), + proto.GetStep(), + proto.GetIndex() + }; + } + + TLogoBlobID MakeBlobId(ui64 tabletId, EBlobType type, ui32 part, ui32 size) const { + return TLogoBlobID(tabletId, Generation, Step, Channel, size, MakeCookie(type, part)); + } + + ui32 MakeCookie(EBlobType type, ui32 part) const { + switch (type) { + case EBlobType::VG_COMPOSITE_BLOB: + case EBlobType::VG_DATA_BLOB: + case EBlobType::VG_FOOTER_BLOB: + static constexpr ui32 typeBits = 24 - IndexBits; + Y_VERIFY(static_cast<ui32>(type) < (1 << typeBits)); + Y_VERIFY(!part); + return Index << typeBits | static_cast<ui32>(type); + } + + Y_FAIL(); + } }; enum class EKeepState : ui8 { diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index 81a56285d2a..e6f75da82db 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -15,6 +15,7 @@ message TBlobLocator { optional TBlobSeqId BlobSeqId = 2; optional uint32 Checksum = 3; optional uint64 TotalDataLen = 4; + optional uint32 FooterLen = 5; } message TValueChain { |