diff options
author | Alexander Rutkovsky <[email protected]> | 2022-06-10 05:06:12 +0300 |
---|---|---|
committer | Alexander Rutkovsky <[email protected]> | 2022-06-10 05:06:12 +0300 |
commit | 90319fb7ae588774acc4249c0ebf210a801e880f (patch) | |
tree | 8fad057213849d6a7be34f54308faefd9397f5c2 | |
parent | b62983df1309f8f2418ce4dfd289fe28cc3f3103 (diff) |
Support BlobDepot agent config propagation KIKIMR-14867
ref:f247b4ec35abb72fe96a94b5ec8d8730c316dc2c
54 files changed, 1914 insertions, 86 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index a803091f3d9..06ce997224a 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -438,6 +438,7 @@ add_subdirectory(ydb/core/blobstorage/dsproxy) add_subdirectory(ydb/core/blobstorage/storagepoolmon) add_subdirectory(ydb/core/blobstorage/incrhuge) add_subdirectory(ydb/core/blobstorage/nodewarden) +add_subdirectory(ydb/core/blob_depot/agent) add_subdirectory(ydb/core/blobstorage/pdisk) add_subdirectory(ydb/library/schlab) add_subdirectory(ydb/library/schlab/schine) @@ -938,7 +939,6 @@ add_subdirectory(ydb/core/blobstorage/ut_mirror3of4) add_subdirectory(ydb/core/blobstorage/ut_vdisk) add_subdirectory(ydb/core/blobstorage/ut_vdisk/lib) add_subdirectory(ydb/core/blobstorage/ut_vdisk2) -add_subdirectory(ydb/core/blob_depot/agent) add_subdirectory(ydb/core/client/ut) add_subdirectory(ydb/core/tablet_flat/test/libs/rows) add_subdirectory(ydb/core/client/minikql_result_lib) @@ -1014,6 +1014,7 @@ add_subdirectory(ydb/services/ydb/sdk_credprovider_ut) add_subdirectory(ydb/services/ydb/ut) add_subdirectory(ydb/public/sdk/cpp/client/ydb_extension) add_subdirectory(ydb/services/yq/ut_integration) +add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_blob_depot) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_group_reconfiguration) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_osiris) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_replication) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 6c5ab225d99..ca1e352eea0 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -518,6 +518,7 @@ add_subdirectory(ydb/core/blobstorage/dsproxy) add_subdirectory(ydb/core/blobstorage/storagepoolmon) add_subdirectory(ydb/core/blobstorage/incrhuge) add_subdirectory(ydb/core/blobstorage/nodewarden) +add_subdirectory(ydb/core/blob_depot/agent) add_subdirectory(ydb/core/blobstorage/pdisk) add_subdirectory(ydb/library/schlab) add_subdirectory(ydb/library/schlab/schine) @@ -1034,7 +1035,6 @@ add_subdirectory(ydb/core/blobstorage/ut_mirror3of4) add_subdirectory(ydb/core/blobstorage/ut_vdisk) add_subdirectory(ydb/core/blobstorage/ut_vdisk/lib) add_subdirectory(ydb/core/blobstorage/ut_vdisk2) -add_subdirectory(ydb/core/blob_depot/agent) add_subdirectory(ydb/core/client/ut) add_subdirectory(ydb/core/tablet_flat/test/libs/rows) add_subdirectory(ydb/core/client/minikql_result_lib) @@ -1109,6 +1109,7 @@ add_subdirectory(ydb/services/ydb/sdk_credprovider_ut) add_subdirectory(ydb/services/ydb/ut) add_subdirectory(ydb/public/sdk/cpp/client/ydb_extension) add_subdirectory(ydb/services/yq/ut_integration) +add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_blob_depot) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_group_reconfiguration) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_osiris) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_replication) diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt index 46eed711fc2..e0edc850baf 100644 --- a/ydb/core/blob_depot/CMakeLists.txt +++ b/ydb/core/blob_depot/CMakeLists.txt @@ -16,4 +16,11 @@ target_link_libraries(ydb-core-blob_depot PUBLIC ) target_sources(ydb-core-blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blob_depot.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blob_depot_agent.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_apply_config.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_init_schema.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_load.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_resolve.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_commit_blob_seq.cpp ) diff --git a/ydb/core/blob_depot/agent/CMakeLists.txt b/ydb/core/blob_depot/agent/CMakeLists.txt index 41a1f8fc18f..76f8566f83f 100644 --- a/ydb/core/blob_depot/agent/CMakeLists.txt +++ b/ydb/core/blob_depot/agent/CMakeLists.txt @@ -16,4 +16,14 @@ target_link_libraries(core-blob_depot-agent PUBLIC ) target_sources(core-blob_depot-agent PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_comm.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_query.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_put.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_get.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_block.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_discover.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_range.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_status.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_patch.cpp ) diff --git a/ydb/core/blob_depot/agent/agent.cpp b/ydb/core/blob_depot/agent/agent.cpp index 73c9848652c..0fc3ea0fa4b 100644 --- a/ydb/core/blob_depot/agent/agent.cpp +++ b/ydb/core/blob_depot/agent/agent.cpp @@ -3,8 +3,8 @@ namespace NKikimr::NBlobDepot { - IActor *CreateBlobDepotAgent(ui32 virtualGroupId, ui64 tabletId) { - return new TBlobDepotAgent(virtualGroupId, tabletId); + IActor *CreateBlobDepotAgent(ui32 virtualGroupId) { + return new TBlobDepotAgent(virtualGroupId); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent.h b/ydb/core/blob_depot/agent/agent.h index e848f1969c1..4d66f9967b4 100644 --- a/ydb/core/blob_depot/agent/agent.h +++ b/ydb/core/blob_depot/agent/agent.h @@ -4,6 +4,6 @@ namespace NKikimr::NBlobDepot { - IActor *CreateBlobDepotAgent(ui32 virtualGroupId, ui64 tabletId); + IActor *CreateBlobDepotAgent(ui32 virtualGroupId); } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_comm.cpp b/ydb/core/blob_depot/agent/agent_comm.cpp new file mode 100644 index 00000000000..87cd4f95cb4 --- /dev/null +++ b/ydb/core/blob_depot/agent/agent_comm.cpp @@ -0,0 +1,172 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC01, "TEvClientConnected", (VirtualGroupId, VirtualGroupId), + (Msg, ev->Get()->ToString())); + } + + void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) { + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC02, "TEvClientDestroyed", (VirtualGroupId, VirtualGroupId), + (Msg, ev->Get()->ToString())); + PipeId = {}; + OnDisconnect(); + ConnectToBlobDepot(); + } + + void TBlobDepotAgent::ConnectToBlobDepot() { + PipeId = Register(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy::WithRetries())); + const ui64 id = NextRequestId++; + NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId), id); + RegisterRequest(id, nullptr, [this](IEventBase *ev) { + if (ev) { + auto& msg = *static_cast<TEvBlobDepot::TEvRegisterAgentResult*>(ev); + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId), + (Msg, msg.Record)); + Registered = true; + BlobDepotGeneration = msg.Record.GetGeneration(); + auto& channelGroups = msg.Record.GetChannelGroups(); + BlobDepotChannelGroups = {channelGroups.begin(), channelGroups.end()}; + } + return true; + }); + IssueAllocateIdsIfNeeded(); + } + + void TBlobDepotAgent::IssueAllocateIdsIfNeeded() { + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC09, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId), + (IdAllocInFlight, IdAllocInFlight), (IdQ.size, IdQ.size()), (PreallocatedIdCount, PreallocatedIdCount), + (PipeId, PipeId)); + if (!IdAllocInFlight && IdQ.size() < PreallocatedIdCount && PipeId) { + const ui64 id = NextRequestId++; + NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds, id); + IdAllocInFlight = true; + + RegisterRequest(id, nullptr, [this](IEventBase *ev) { + Y_VERIFY(IdAllocInFlight); + IdAllocInFlight = false; + + if (ev) { + auto& msg = *static_cast<TEvBlobDepot::TEvAllocateIdsResult*>(ev); + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC07, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId), + (Msg, msg.Record)); + Y_VERIFY(msg.Record.GetGeneration() == BlobDepotGeneration); + IdQ.push_back(TAllocatedId{BlobDepotGeneration, msg.Record.GetRangeBegin(), msg.Record.GetRangeEnd()}); + + // FIXME notify waiting requests about new ids + + // ask for more ids if needed + IssueAllocateIdsIfNeeded(); + } + + return true; // request complete, remove from queue + }); + } + } + + void TBlobDepotAgent::OnDisconnect() { + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC04, "OnDisconnect", (VirtualGroupId, VirtualGroupId)); + + for (auto& [id, item] : std::exchange(RequestInFlight, {})) { + if (item.Sender) { + item.Sender->OnRequestComplete(id); + } + const bool done = item.Callback(nullptr); + Y_VERIFY(done); + } + + Registered = false; + } + + void TBlobDepotAgent::RegisterRequest(ui64 id, TRequestSender *sender, TRequestCompleteCallback callback) { + const auto [_, inserted] = RequestInFlight.emplace(id, TRequestInFlight{sender, std::move(callback)}); + Y_VERIFY(inserted); + if (sender) { + sender->RegisterRequest(id); + } + } + + template<typename TEvent> + void TBlobDepotAgent::HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC05, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId), + (Id, ev->Cookie), (Type, TypeName<TEvent>())); + if (const auto it = RequestInFlight.find(ev->Cookie); it != RequestInFlight.end()) { + auto& [id, item] = *it; + if (item.Sender) { + item.Sender->OnRequestComplete(id); + } + if (item.Callback(ev->Get())) { + RequestInFlight.erase(it); + } + } else { + Y_FAIL(); // don't know how this can happen without logic error + } + } + + template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvRegisterAgentResult::TPtr ev); + template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvAllocateIdsResult::TPtr ev); + template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvBlockResult::TPtr ev); + template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvQueryBlocksResult::TPtr ev); + template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev); + template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvResolveResult::TPtr ev); + + void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestCompleteCallback callback) { + auto ev = std::make_unique<TEvBlobDepot::TEvBlock>(); + msg.Swap(&ev->Record); + Issue(std::move(ev), sender, std::move(callback)); + } + + void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestCompleteCallback callback) { + auto ev = std::make_unique<TEvBlobDepot::TEvResolve>(); + msg.Swap(&ev->Record); + Issue(std::move(ev), sender, std::move(callback)); + } + + void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestCompleteCallback callback) { + auto ev = std::make_unique<TEvBlobDepot::TEvQueryBlocks>(); + msg.Swap(&ev->Record); + Issue(std::move(ev), sender, std::move(callback)); + } + + void TBlobDepotAgent::Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestCompleteCallback callback) { + const ui64 id = NextRequestId++; + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC03, "Issue", (VirtualGroupId, VirtualGroupId), (Id, id), (Msg, ev->ToString())); + NTabletPipe::SendData(SelfId(), PipeId, ev.release(), id); + RegisterRequest(id, sender, std::move(callback)); + } + + NKikimrProto::EReplyStatus TBlobDepotAgent::CheckBlockForTablet(ui64 tabletId, ui32 generation, TExecutingQuery *query) { + auto& block = Blocks[tabletId]; + const TMonotonic issueTime = TActivationContext::Monotonic(); + if (generation <= block.BlockedGeneration) { + return NKikimrProto::RACE; + } else if (issueTime< block.ExpirationTimestamp) { + return NKikimrProto::OK; + } else if (!block.RefreshInFlight) { + NKikimrBlobDepot::TEvQueryBlocks queryBlocks; + queryBlocks.AddTabletIds(tabletId); + Issue(std::move(queryBlocks), nullptr, [=](IEventBase *ev) { + if (ev) { + auto& msg = *static_cast<TEvBlobDepot::TEvQueryBlocksResult*>(ev); + const ui64 tabletId_ = tabletId; + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC08, "TEvQueryBlocksResult", (VirtualGroupId, VirtualGroupId), + (Msg, msg.Record), (TabletId, tabletId_)); + Y_VERIFY(msg.Record.BlockedGenerationsSize() == 1); + auto& block = Blocks[tabletId]; + Y_VERIFY(block.BlockedGeneration < generation); + block.BlockedGeneration = msg.Record.GetBlockedGenerations(1); + block.ExpirationTimestamp = issueTime + TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs()); + for (auto& query : block.PendingBlockChecks) { + query.OnUpdateBlock(); + } + } + return true; + }); + block.RefreshInFlight = true; + block.PendingBlockChecks.PushBack(query); + } + return NKikimrProto::NOTREADY; + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index 8d9d05328ca..e4caa77894b 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -4,14 +4,219 @@ namespace NKikimr::NBlobDepot { - class TBlobDepotAgent : public TActorBootstrapped<TBlobDepotAgent> { +#define ENUMERATE_INCOMING_EVENTS(XX) \ + XX(EvPut) \ + XX(EvGet) \ + XX(EvBlock) \ + XX(EvDiscover) \ + XX(EvRange) \ + XX(EvCollectGarbage) \ + XX(EvStatus) \ + XX(EvPatch) \ + // END + + class TBlobDepotAgent : public TActor<TBlobDepotAgent> { + const ui32 VirtualGroupId; + ui64 TabletId = Max<ui64>(); + TActorId PipeId; + public: - TBlobDepotAgent(ui32 virtualGroupId, ui64 tabletId); - void Bootstrap(); + TBlobDepotAgent(ui32 virtualGroupId) + : TActor(&TThis::StateFunc) + , VirtualGroupId(virtualGroupId) + { + Y_VERIFY(TGroupID(VirtualGroupId).ConfigurationType() == EGroupConfigurationType::Virtual); + } +#define FORWARD_STORAGE_PROXY(TYPE) fFunc(TEvBlobStorage::TYPE, HandleStorageProxy); STRICT_STFUNC(StateFunc, cFunc(TEvents::TSystem::Poison, PassAway); + hFunc(TEvBlobStorage::TEvConfigureProxy, Handle); + + hFunc(TEvTabletPipe::TEvClientConnected, Handle); + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + + hFunc(TEvBlobDepot::TEvRegisterAgentResult, HandleTabletResponse); + hFunc(TEvBlobDepot::TEvAllocateIdsResult, HandleTabletResponse); + hFunc(TEvBlobDepot::TEvBlockResult, HandleTabletResponse); + hFunc(TEvBlobDepot::TEvQueryBlocksResult, HandleTabletResponse); + hFunc(TEvBlobDepot::TEvCommitBlobSeqResult, HandleTabletResponse); + hFunc(TEvBlobDepot::TEvResolveResult, HandleTabletResponse); + + ENUMERATE_INCOMING_EVENTS(FORWARD_STORAGE_PROXY) ); +#undef FORWARD_STORAGE_PROXY + + void PassAway() override { + NTabletPipe::CloseAndForgetClient(SelfId(), PipeId); + TActor::PassAway(); + } + + void Handle(TEvBlobStorage::TEvConfigureProxy::TPtr ev) { + const auto& info = ev->Get()->Info; + Y_VERIFY(info); + Y_VERIFY(info->BlobDepotId); + TabletId = *info->BlobDepotId; + ConnectToBlobDepot(); + + for (auto& ev : std::exchange(PendingEventQ, {})) { + TActivationContext::Send(ev.release()); + } + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // BlobDepot communications + + using TRequestCompleteCallback = std::function<bool(IEventBase*)>; + + class TRequestSender { + THashSet<ui64> IdsInFlight; + + protected: + TBlobDepotAgent& Agent; + + public: + TRequestSender(TBlobDepotAgent& agent) + : Agent(agent) + {} + + ~TRequestSender() { + for (const ui64 id : IdsInFlight) { + const size_t num = Agent.RequestInFlight.erase(id); + Y_VERIFY(num); + } + } + + void RegisterRequest(ui64 id) { + const auto [_, inserted] = IdsInFlight.insert(id); + Y_VERIFY(inserted); + } + + void OnRequestComplete(ui64 id) { + const size_t num = IdsInFlight.erase(id); + Y_VERIFY(num); + } + }; + + struct TRequestInFlight { + TRequestSender *Sender; + TRequestCompleteCallback Callback; + }; + + ui64 NextRequestId = 1; + THashMap<ui64, TRequestInFlight> RequestInFlight; + + void RegisterRequest(ui64 id, TRequestSender *sender, TRequestCompleteCallback callback); + + template<typename TEvent> + void HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + bool Registered = false; + ui32 BlobDepotGeneration = 0; + std::vector<ui32> BlobDepotChannelGroups; + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev); + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev); + void ConnectToBlobDepot(); + void IssueAllocateIdsIfNeeded(); + void OnDisconnect(); + + void Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestCompleteCallback callback); + void Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestCompleteCallback callback); + void Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestCompleteCallback callback); + + void Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestCompleteCallback callback); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + struct TAllocatedId { + ui32 Generation; + ui64 Begin; + ui64 End; + }; + + bool IdAllocInFlight = false; + std::deque<TAllocatedId> IdQ; + static constexpr size_t PreallocatedIdCount = 2; + + std::pair<TLogoBlobID, ui32> AllocateDataBlobId(ui32 size, ui32 type) { + if (IdQ.empty()) { + return {}; + } + + auto& item = IdQ.front(); + auto cgsi = TCGSI::FromBinary(BlobDepotGeneration, BlobDepotChannelGroups.size(), item.Begin++); + if (item.Begin == item.End) { + IdQ.pop_front(); + IssueAllocateIdsIfNeeded(); + } + static constexpr ui32 typeBits = 24 - TCGSI::IndexBits; + Y_VERIFY(type < (1 << typeBits)); + const ui32 cookie = cgsi.Index << typeBits | type; + const TLogoBlobID id(TabletId, cgsi.Generation, cgsi.Step, cgsi.Channel, size, cookie); + const ui32 groupId = BlobDepotChannelGroups[cgsi.Channel]; + return {id, groupId}; + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + struct TExecutingQueries {}; + struct TPendingBlockChecks {}; + + class TExecutingQuery + : public TIntrusiveListItem<TExecutingQuery, TExecutingQueries> + , public TIntrusiveListItem<TExecutingQuery, TPendingBlockChecks> + , public TRequestSender + { + protected: + std::unique_ptr<IEventHandle> Event; // original query event + const ui64 QueryId; + + public: + TExecutingQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event) + : TRequestSender(agent) + , Event(std::move(event)) + , QueryId(RandomNumber<ui64>()) + {} + + virtual ~TExecutingQuery() = default; + + void EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason); + void EndWithSuccess(std::unique_ptr<IEventBase> response); + TString GetName() const; + ui64 GetQueryId() const { return QueryId; } + virtual void Initiate() = 0; + + virtual void OnUpdateBlock() {} + + public: + struct TDeleter { + static void Destroy(TExecutingQuery *query) { delete query; } + }; + }; + + std::deque<std::unique_ptr<IEventHandle>> PendingEventQ; + TIntrusiveListWithAutoDelete<TExecutingQuery, TExecutingQuery::TDeleter, TExecutingQueries> ExecutingQueries; + + void HandleStorageProxy(TAutoPtr<IEventHandle> ev); + TExecutingQuery *CreateExecutingQuery(TAutoPtr<IEventHandle> ev); + template<ui32 EventType> TExecutingQuery *CreateExecutingQuery(std::unique_ptr<IEventHandle> ev); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Blocks + + struct TBlockInfo { + ui32 BlockedGeneration; + TMonotonic ExpirationTimestamp; // not valid after + bool RefreshInFlight = false; + TIntrusiveList<TExecutingQuery, TPendingBlockChecks> PendingBlockChecks; + }; + + std::unordered_map<ui64, TBlockInfo> Blocks; + + NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, ui32 generation, TExecutingQuery *query); }; } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_block.cpp b/ydb/core/blob_depot/agent/agent_storage_block.cpp new file mode 100644 index 00000000000..1d90be19463 --- /dev/null +++ b/ydb/core/blob_depot/agent/agent_storage_block.cpp @@ -0,0 +1,55 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvBlock>(std::unique_ptr<IEventHandle> ev) { + class TBlockExecutingQuery : public TExecutingQuery { + public: + using TExecutingQuery::TExecutingQuery; + + void Initiate() override { + auto& msg = *Event->Get<TEvBlobStorage::TEvBlock>(); + + // lookup existing blocks to try fail-fast + if (const auto it = Agent.Blocks.find(msg.TabletId); it != Agent.Blocks.end()) { + const TBlockInfo& block = it->second; + if (msg.Generation <= block.BlockedGeneration) { + // we don't consider ExpirationTimestamp here because blocked generation may only increase + return EndWithError(NKikimrProto::RACE, "block race detected"); + } + } + + // issue request to the tablet + NKikimrBlobDepot::TEvBlock block; + block.SetTabletId(msg.TabletId); + block.SetBlockedGeneration(msg.Generation); + Agent.Issue(std::move(block), this, std::bind(&TBlockExecutingQuery::HandleBlockResult, this, + std::placeholders::_1, TActivationContext::Monotonic())); + } + + bool HandleBlockResult(IEventBase *event, TMonotonic issueTimestamp) { + if (!event) { + EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); + } else if (const auto& msg = static_cast<TEvBlobDepot::TEvBlockResult&>(*event); !msg.Record.HasStatus()) { + EndWithError(NKikimrProto::ERROR, "incorrect TEvBlockResult response"); + } else if (const auto status = msg.Record.GetStatus(); status != NKikimrProto::OK) { + EndWithError(status, msg.Record.GetErrorReason()); + } else { + // update blocks cache + auto& query = *Event->Get<TEvBlobStorage::TEvBlock>(); + auto& block = Agent.Blocks[query.TabletId]; + Y_VERIFY(block.BlockedGeneration <= query.Generation); // TODO: QueryBlocks can race with Block? + block.BlockedGeneration = query.Generation; + block.ExpirationTimestamp = issueTimestamp + TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs()); + + EndWithSuccess(std::make_unique<TEvBlobStorage::TEvBlockResult>(NKikimrProto::OK)); + } + return true; + } + }; + + return new TBlockExecutingQuery(*this, std::move(ev)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp b/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp new file mode 100644 index 00000000000..8a411246a52 --- /dev/null +++ b/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp @@ -0,0 +1,10 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvCollectGarbage>(std::unique_ptr<IEventHandle> ev) { + return (void)ev, nullptr; + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_discover.cpp b/ydb/core/blob_depot/agent/agent_storage_discover.cpp new file mode 100644 index 00000000000..6cce45058a6 --- /dev/null +++ b/ydb/core/blob_depot/agent/agent_storage_discover.cpp @@ -0,0 +1,76 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvDiscover>(std::unique_ptr<IEventHandle> ev) { + class TDiscoverExecutingQuery : public TExecutingQuery { + public: + using TExecutingQuery::TExecutingQuery; + + void Initiate() override { + auto& msg = *Event->Get<TEvBlobStorage::TEvDiscover>(); + + const TLogoBlobID from(msg.TabletId, Max<ui32>(), Max<ui32>(), 0, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie); + const TLogoBlobID to(msg.TabletId, 0, 0, 0, 0, 0); + + NKikimrBlobDepot::TEvResolve resolve; + auto *item = resolve.AddItems(); + item->SetBeginningKey(from.GetRaw(), 3 * sizeof(ui64)); + item->SetIncludeBeginning(true); + item->SetEndingKey(to.GetRaw(), 3 * sizeof(ui64)); + item->SetIncludeEnding(true); + item->SetMaxKeys(1); + item->SetReverse(true); + + Agent.Issue(std::move(resolve), this, std::bind(&TDiscoverExecutingQuery::HandleResolveResult, + this, std::placeholders::_1)); + + if (msg.DiscoverBlockedGeneration) { + NKikimrBlobDepot::TEvQueryBlocks queryBlocks; + queryBlocks.AddTabletIds(msg.TabletId); + Agent.Issue(std::move(queryBlocks), this, std::bind( + &TDiscoverExecutingQuery::HandleQueryBlocksResult, this, + std::placeholders::_1, TActivationContext::Now())); + } + } + + bool HandleResolveResult(IEventBase *result) { + if (!result) { // server has disconnected before request got processed + EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); + return true; + } + + auto& msg = static_cast<TEvBlobDepot::TEvResolveResult&>(*result); + + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDASD01, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId), + (QueryId, QueryId), (Msg, msg.Record)); + + const NKikimrProto::EReplyStatus status = msg.Record.GetStatus(); + if (status != NKikimrProto::OK && status != NKikimrProto::OVERRUN) { + EndWithError(status, msg.Record.GetErrorReason()); + return true; + } + + return status != NKikimrProto::OVERRUN; + } + + bool HandleQueryBlocksResult(IEventBase *result, TInstant /*issueTimestamp*/) { + if (!result) { + EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); + return true; + } + + auto& msg = static_cast<TEvBlobDepot::TEvQueryBlocksResult&>(*result); + + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDASD02, "HandleQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId), + (QueryId, QueryId), (Msg, msg.Record)); + + return true; + } + }; + + return new TDiscoverExecutingQuery(*this, std::move(ev)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_get.cpp b/ydb/core/blob_depot/agent/agent_storage_get.cpp new file mode 100644 index 00000000000..55d4e166904 --- /dev/null +++ b/ydb/core/blob_depot/agent/agent_storage_get.cpp @@ -0,0 +1,10 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvGet>(std::unique_ptr<IEventHandle> ev) { + return (void)ev, nullptr; + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_patch.cpp b/ydb/core/blob_depot/agent/agent_storage_patch.cpp new file mode 100644 index 00000000000..92bc96c9d01 --- /dev/null +++ b/ydb/core/blob_depot/agent/agent_storage_patch.cpp @@ -0,0 +1,10 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvPatch>(std::unique_ptr<IEventHandle> ev) { + return (void)ev, nullptr; + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_put.cpp b/ydb/core/blob_depot/agent/agent_storage_put.cpp new file mode 100644 index 00000000000..9628abdf27b --- /dev/null +++ b/ydb/core/blob_depot/agent/agent_storage_put.cpp @@ -0,0 +1,46 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvPut>(std::unique_ptr<IEventHandle> ev) { + class TPutExecutingQuery : public TExecutingQuery { + ui32 BlockChecksRemain = 3; + + public: + using TExecutingQuery::TExecutingQuery; + + void Initiate() override { + auto& msg = *Event->Get<TEvBlobStorage::TEvPut>(); + + // first step -- check blocks + switch (Agent.CheckBlockForTablet(msg.Id.TabletID(), msg.Id.Generation(), this)) { + case NKikimrProto::OK: + return IssuePuts(); + + case NKikimrProto::RACE: + return EndWithError(NKikimrProto::RACE, "block race detected"); + + case NKikimrProto::NOTREADY: + if (!--BlockChecksRemain) { + EndWithError(NKikimrProto::ERROR, "failed to acquire blocks"); + } + return; + + default: + Y_FAIL(); + } + } + + void IssuePuts() { + } + + void OnUpdateBlock() override { + Initiate(); // just restart request + } + }; + + return new TPutExecutingQuery(*this, std::move(ev)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_query.cpp b/ydb/core/blob_depot/agent/agent_storage_query.cpp new file mode 100644 index 00000000000..a0dc670d536 --- /dev/null +++ b/ydb/core/blob_depot/agent/agent_storage_query.cpp @@ -0,0 +1,67 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + void TBlobDepotAgent::HandleStorageProxy(TAutoPtr<IEventHandle> ev) { + if (TabletId == Max<ui64>()) { + // TODO: memory usage control + PendingEventQ.emplace_back(ev.Release()); + } else { + auto *query = CreateExecutingQuery(ev); + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "new query", (VirtualGroupId, VirtualGroupId), + (QueryId, query->GetQueryId()), (Name, query->GetName())); + if (!TabletId) { + query->EndWithError(NKikimrProto::ERROR, "group is in error state"); + } else { + query->Initiate(); + } + } + } + + TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery(TAutoPtr<IEventHandle> ev) { + switch (ev->GetTypeRewrite()) { +#define XX(TYPE) \ + case TEvBlobStorage::TYPE: return CreateExecutingQuery<TEvBlobStorage::TYPE>(std::unique_ptr<IEventHandle>(ev.Release())); + + ENUMERATE_INCOMING_EVENTS(XX) +#undef XX + } + Y_FAIL(); + } + + void TBlobDepotAgent::TExecutingQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) { + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "query ends with error", (VirtualGroupId, Agent.VirtualGroupId), + (QueryId, QueryId), (Status, status), (ErrorReason, errorReason)); + + std::unique_ptr<IEventBase> response; + switch (Event->GetTypeRewrite()) { +#define XX(TYPE) \ + case TEvBlobStorage::TYPE: \ + response = Event->Get<TEvBlobStorage::T##TYPE>()->MakeErrorResponse(status, errorReason, Agent.VirtualGroupId); \ + break; + + ENUMERATE_INCOMING_EVENTS(XX) +#undef XX + } + Y_VERIFY(response); + Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie); + delete this; + } + + void TBlobDepotAgent::TExecutingQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) { + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId), + (QueryId, QueryId), (Response, response->ToString())); + Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie); + delete this; + } + + TString TBlobDepotAgent::TExecutingQuery::GetName() const { + switch (Event->GetTypeRewrite()) { +#define XX(TYPE) case TEvBlobStorage::TYPE: return #TYPE; + ENUMERATE_INCOMING_EVENTS(XX) +#undef XX + } + Y_FAIL(); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_range.cpp b/ydb/core/blob_depot/agent/agent_storage_range.cpp new file mode 100644 index 00000000000..d4dec509ee0 --- /dev/null +++ b/ydb/core/blob_depot/agent/agent_storage_range.cpp @@ -0,0 +1,10 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvRange>(std::unique_ptr<IEventHandle> ev) { + return (void)ev, nullptr; + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_storage_status.cpp b/ydb/core/blob_depot/agent/agent_storage_status.cpp new file mode 100644 index 00000000000..36dcdf209bf --- /dev/null +++ b/ydb/core/blob_depot/agent/agent_storage_status.cpp @@ -0,0 +1,10 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvStatus>(std::unique_ptr<IEventHandle> ev) { + return (void)ev, nullptr; + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/defs.h b/ydb/core/blob_depot/agent/defs.h index c5e120f3006..15052a2ae6c 100644 --- a/ydb/core/blob_depot/agent/defs.h +++ b/ydb/core/blob_depot/agent/defs.h @@ -1,3 +1,9 @@ #pragma once #include <ydb/core/blob_depot/defs.h> + +#include <ydb/core/blob_depot/events.h> +#include <ydb/core/blob_depot/types.h> + +#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> +#include <ydb/core/util/stlog.h> diff --git a/ydb/core/blob_depot/blob_depot_agent.cpp b/ydb/core/blob_depot/blob_depot_agent.cpp new file mode 100644 index 00000000000..7b4e0c45ec5 --- /dev/null +++ b/ydb/core/blob_depot/blob_depot_agent.cpp @@ -0,0 +1,92 @@ +#include "blob_depot_tablet.h" + +namespace NKikimr::NBlobDepot { + + void TBlobDepot::Handle(TEvTabletPipe::TEvServerConnected::TPtr ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BD01, "TEvServerConnected", (TabletId, TabletID()), (Msg, ev->Get()->ToString())); + const auto [it, inserted] = PipeServerToNode.emplace(ev->Get()->ServerId, std::nullopt); + Y_VERIFY(inserted); + } + + void TBlobDepot::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BD02, "TEvServerDisconnected", (TabletId, TabletID()), (Msg, ev->Get()->ToString())); + const auto it = PipeServerToNode.find(ev->Get()->ServerId); + Y_VERIFY(it != PipeServerToNode.end()); + if (const auto& nodeId = it->second) { + if (const auto agentIt = Agents.find(*nodeId); agentIt != Agents.end()) { + if (TAgentInfo& agent = agentIt->second; agent.ConnectedAgent == it->first) { + OnAgentDisconnect(agent); + agent.ConnectedAgent.reset(); + agent.ConnectedNodeId = 0; + agent.ExpirationTimestamp = TActivationContext::Now() + ExpirationTimeout; + } + } + } + PipeServerToNode.erase(it); + } + + void TBlobDepot::OnAgentDisconnect(TAgentInfo& agent) { + BlocksManager.OnAgentDisconnect(agent); + } + + void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) { + const auto it = PipeServerToNode.find(ev->Recipient); + Y_VERIFY(it != PipeServerToNode.end()); + const ui32 nodeId = ev->Sender.NodeId(); + Y_VERIFY(!it->second || *it->second == nodeId); + it->second = nodeId; + auto& agent = Agents[nodeId]; + STLOG(PRI_DEBUG, BLOB_DEPOT, BD03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, ev->Get()->Record), + (NodeId, nodeId), (PipeServerId, it->first)); + agent.ConnectedAgent = it->first; + agent.ConnectedNodeId = nodeId; + agent.ExpirationTimestamp = TInstant::Max(); + OnAgentConnect(agent); + + auto response = std::make_unique<TEvBlobDepot::TEvRegisterAgentResult>(); + auto& record = response->Record; + record.SetGeneration(Executor()->Generation()); + for (const auto& channel : Info()->Channels) { + Y_VERIFY(channel.Channel == record.ChannelGroupsSize()); + record.AddChannelGroups(channel.History ? channel.History.back().GroupID : 0); + } + + SendResponseToAgent(*ev, std::move(response)); + } + + void TBlobDepot::OnAgentConnect(TAgentInfo& agent) { + BlocksManager.OnAgentConnect(agent); + } + + void TBlobDepot::Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BD04, "TEvAllocateIds", (TabletId, TabletID()), (Msg, ev->Get()->Record), + (PipeServerId, ev->Recipient)); + auto response = std::make_unique<TEvBlobDepot::TEvAllocateIdsResult>(); + auto& record = response->Record; + const ui32 generation = Executor()->Generation(); + record.SetGeneration(generation); + record.SetRangeBegin(NextBlobSeqId); + NextBlobSeqId += PreallocatedIdCount; + record.SetRangeEnd(NextBlobSeqId); + SendResponseToAgent(*ev, std::move(response)); + } + + TBlobDepot::TAgentInfo& TBlobDepot::GetAgent(const TActorId& pipeServerId) { + const auto it = PipeServerToNode.find(pipeServerId); + Y_VERIFY(it != PipeServerToNode.end()); + Y_VERIFY(it->second); + const auto agentIt = Agents.find(*it->second); + Y_VERIFY(agentIt != Agents.end()); + Y_VERIFY(agentIt->second.ConnectedAgent == pipeServerId); + return agentIt->second; + } + + void TBlobDepot::SendResponseToAgent(IEventHandle& request, std::unique_ptr<IEventBase> response) { + auto handle = std::make_unique<IEventHandle>(request.Sender, SelfId(), response.release(), 0, request.Cookie); + if (request.InterconnectSession) { + handle->Rewrite(TEvInterconnect::EvForward, request.InterconnectSession); + } + TActivationContext::Send(handle.release()); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index f2b50523849..014ad61d42f 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -2,6 +2,7 @@ #include "defs.h" #include "events.h" +#include "types.h" namespace NKikimr::NBlobDepot { @@ -11,32 +12,51 @@ namespace NKikimr::NBlobDepot { : public TActor<TBlobDepot> , public TTabletExecutedFlat { + struct TEvPrivate { + enum { + EvCheckExpiredAgents = EventSpaceBegin(TEvents::ES_PRIVATE), + }; + }; + public: TBlobDepot(TActorId tablet, TTabletStorageInfo *info) : TActor(&TThis::StateInit) , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory) + , BlocksManager(this) {} - void Handle(TEvBlobDepot::TEvApplyConfig::TPtr ev) { - auto response = std::make_unique<TEvBlobDepot::TEvApplyConfigResult>(TabletID(), ev->Get()->Record.GetTxId()); - Send(ev->Sender, response.release(), 0, ev->Cookie); - } - - void Handle(TEvTabletPipe::TEvServerConnected::TPtr ev) { - Y_UNUSED(ev); - } - - void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev) { - Y_UNUSED(ev); - } - void HandlePoison() { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT02, "HandlePoison", (TabletId, TabletID())); Become(&TThis::StateZombie); Send(Tablet(), new TEvents::TEvPoison); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + static constexpr TDuration ExpirationTimeout = TDuration::Minutes(1); + static constexpr ui32 PreallocatedIdCount = 100; + + struct TAgentInfo { + std::optional<TActorId> ConnectedAgent; + ui32 ConnectedNodeId; + TInstant ExpirationTimestamp; + }; + + THashMap<TActorId, std::optional<ui32>> PipeServerToNode; + THashMap<ui32, TAgentInfo> Agents; // NodeId -> Agent + + ui64 NextBlobSeqId = 0; + + void Handle(TEvTabletPipe::TEvServerConnected::TPtr ev); + void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev); + void OnAgentDisconnect(TAgentInfo& agent); + void Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev); + void OnAgentConnect(TAgentInfo& agent); + void Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev); + TAgentInfo& GetAgent(const TActorId& pipeServerId); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + std::deque<std::unique_ptr<IEventHandle>> InitialEventsQ; void Enqueue(TAutoPtr<IEventHandle>& ev, const TActorContext&) override { @@ -44,21 +64,32 @@ namespace NKikimr::NBlobDepot { } void OnActivateExecutor(const TActorContext&) override { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "OnActivateExecutor", (TabletId, TabletID())); + + ExecuteTxInitSchema(); + Become(&TThis::StateWork); for (auto&& ev : std::exchange(InitialEventsQ, {})) { TActivationContext::Send(ev.release()); } + + NextBlobSeqId = TCGSI{BaseDataChannel, Executor()->Generation(), 1, 0}.ToBinary(Info()->Channels.size()); } void OnDetach(const TActorContext&) override { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT04, "OnDetach", (TabletId, TabletID())); + // TODO: what does this callback mean PassAway(); } void OnTabletDead(TEvTablet::TEvTabletDead::TPtr& /*ev*/, const TActorContext&) override { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT05, "OnTabletDead", (TabletId, TabletID())); PassAway(); } + void SendResponseToAgent(IEventHandle& request, std::unique_ptr<IEventBase> response); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// STFUNC(StateInit) { @@ -78,6 +109,13 @@ namespace NKikimr::NBlobDepot { cFunc(TEvents::TSystem::Poison, HandlePoison); hFunc(TEvBlobDepot::TEvApplyConfig, Handle); + hFunc(TEvBlobDepot::TEvRegisterAgent, Handle); + hFunc(TEvBlobDepot::TEvAllocateIds, Handle); + hFunc(TEvBlobDepot::TEvCommitBlobSeq, Handle); + hFunc(TEvBlobDepot::TEvResolve, Handle); + + hFunc(TEvBlobDepot::TEvBlock, BlocksManager.Handle); + hFunc(TEvBlobDepot::TEvQueryBlocks, BlocksManager.Handle); hFunc(TEvTabletPipe::TEvServerConnected, Handle); hFunc(TEvTabletPipe::TEvServerDisconnected, Handle); @@ -95,6 +133,54 @@ namespace NKikimr::NBlobDepot { bool ReassignChannelsEnabled() const override { return true; } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + void Execute(std::unique_ptr<NTabletFlatExecutor::TTransactionBase<TBlobDepot>> tx) { + Executor()->Execute(tx.release(), TActivationContext::AsActorContext()); + } + + void ExecuteTxInitSchema(); + void ExecuteTxLoad(); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Configuration + + NKikimrBlobDepot::TBlobDepotConfig Config; + + void Handle(TEvBlobDepot::TEvApplyConfig::TPtr ev); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Blocks + + class TBlocksManager { + class TImpl; + std::unique_ptr<TImpl> Impl; + + public: + TBlocksManager(TBlobDepot *self); + ~TBlocksManager(); + void AddBlockOnLoad(ui64 tabletId, ui32 blockedGeneration); + void OnAgentConnect(TAgentInfo& agent); + void OnAgentDisconnect(TAgentInfo& agent); + + void Handle(TEvBlobDepot::TEvBlock::TPtr ev); + void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev); + }; + + TBlocksManager BlocksManager; + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Key operation + + struct TKeyValue { + }; + + std::map<TString, TKeyValue> Data; + + void Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev); + + void Handle(TEvBlobDepot::TEvResolve::TPtr ev); }; } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp new file mode 100644 index 00000000000..1550255eb14 --- /dev/null +++ b/ydb/core/blob_depot/blocks.cpp @@ -0,0 +1,139 @@ +#include "blob_depot_tablet.h" +#include "schema.h" + +namespace NKikimr::NBlobDepot { + + class TBlobDepot::TBlocksManager::TImpl { + TBlobDepot *Self; + THashMap<ui64, ui32> Blocks; + + private: + class TTxUpdateBlock : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + const ui64 TabletId; + const ui32 BlockedGeneration; + const ui32 NodeId; + const TInstant Timestamp; + std::unique_ptr<IEventHandle> Response; + bool RaceDetected; + + public: + TTxUpdateBlock(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, TInstant timestamp, + std::unique_ptr<IEventHandle> response) + : TTransactionBase(self) + , TabletId(tabletId) + , BlockedGeneration(blockedGeneration) + , NodeId(nodeId) + , Timestamp(timestamp) + , Response(std::move(response)) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + TImpl& impl = *Self->BlocksManager.Impl; + const auto [it, inserted] = impl.Blocks.emplace(TabletId, BlockedGeneration); + RaceDetected = !inserted && BlockedGeneration <= it->second; + if (RaceDetected) { + Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.SetStatus(NKikimrProto::RACE); + } else { + NIceDb::TNiceDb db(txc.DB); + db.Table<Schema::Blocks>().Key(TabletId).Update( + NIceDb::TUpdate<Schema::Blocks::BlockedGeneration>(BlockedGeneration), + NIceDb::TUpdate<Schema::Blocks::IssuedByNode>(NodeId), + NIceDb::TUpdate<Schema::Blocks::IssueTimestamp>(Timestamp) + ); + } + return true; + } + + void Complete(const TActorContext&) override { + if (RaceDetected) { + TActivationContext::Send(Response.release()); + } else { + Self->BlocksManager.Impl->OnBlockCommitted(TabletId, BlockedGeneration, std::move(Response)); + } + } + }; + + public: + TImpl(TBlobDepot *self) + : Self(self) + {} + + void AddBlockOnLoad(ui64 tabletId, ui32 generation) { + Blocks.emplace(tabletId, generation); + } + + void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, std::unique_ptr<IEventHandle> response) { + (void)tabletId, (void)blockedGeneration, (void)response; + } + + void OnAgentConnect(TAgentInfo& agent) { + (void)agent; + } + + void OnAgentDisconnect(TAgentInfo& agent) { + (void)agent; + } + + void Handle(TEvBlobDepot::TEvBlock::TPtr ev) { + auto event = std::make_unique<TEvBlobDepot::TEvBlockResult>(NKikimrProto::OK, std::nullopt); + auto& responseRecord = event->Record; + auto response = std::make_unique<IEventHandle>(ev->Sender, Self->SelfId(), event.release(), 0, ev->Cookie); + if (ev->InterconnectSession) { + response->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession); + } + + const auto& record = ev->Get()->Record; + if (!record.HasTabletId() || !record.HasBlockedGeneration()) { + responseRecord.SetStatus(NKikimrProto::ERROR); + responseRecord.SetErrorReason("incorrect protobuf"); + } else { + const ui64 tabletId = record.GetTabletId(); + const ui32 blockedGeneration = record.GetBlockedGeneration(); + if (const auto it = Blocks.find(tabletId); it != Blocks.end() && blockedGeneration <= it->second) { + responseRecord.SetStatus(NKikimrProto::ERROR); + } else { + TAgentInfo& agent = Self->GetAgent(ev->Recipient); + Self->Execute(std::make_unique<TTxUpdateBlock>(Self, tabletId, blockedGeneration, + agent.ConnectedNodeId, TActivationContext::Now(), std::move(response))); + } + } + + TActivationContext::Send(response.release()); // not sent if the request got processed and response now is nullptr + } + + void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) { + (void)ev; + } + }; + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // TBlocksManager wrapper + + TBlobDepot::TBlocksManager::TBlocksManager(TBlobDepot *self) + : Impl(std::make_unique<TImpl>(self)) + {} + + TBlobDepot::TBlocksManager::~TBlocksManager() + {} + + void TBlobDepot::TBlocksManager::AddBlockOnLoad(ui64 tabletId, ui32 generation) { + Impl->AddBlockOnLoad(tabletId, generation); + } + + void TBlobDepot::TBlocksManager::OnAgentConnect(TAgentInfo& agent) { + Impl->OnAgentConnect(agent); + } + + void TBlobDepot::TBlocksManager::OnAgentDisconnect(TAgentInfo& agent) { + Impl->OnAgentDisconnect(agent); + } + + void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvBlock::TPtr ev) { + return Impl->Handle(ev); + } + + void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) { + return Impl->Handle(ev); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/defs.h b/ydb/core/blob_depot/defs.h index 51a5a82fad6..9d531864287 100644 --- a/ydb/core/blob_depot/defs.h +++ b/ydb/core/blob_depot/defs.h @@ -1,5 +1,10 @@ #pragma once #include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h> #include <ydb/core/engine/minikql/flat_local_tx_factory.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> +#include <ydb/core/tablet_flat/flat_cxx_database.h> +#include <ydb/core/util/stlog.h> + +#include <util/generic/va_args.h> diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h index 312e2fc5d54..7c01329c502 100644 --- a/ydb/core/blob_depot/events.h +++ b/ydb/core/blob_depot/events.h @@ -10,25 +10,59 @@ namespace NKikimr { enum { EvApplyConfig = EventSpaceBegin(TKikimrEvents::ES_BLOB_DEPOT), EvApplyConfigResult, + EvRegisterAgent, + EvRegisterAgentResult, + EvAllocateIds, + EvAllocateIdsResult, + EvBlock, + EvBlockResult, + EvPushNotify, + EvQueryBlocks, + EvQueryBlocksResult, + EvCommitBlobSeq, + EvCommitBlobSeqResult, + EvResolve, + EvResolveResult, }; - struct TEvApplyConfig : TEventPB<TEvApplyConfig, NKikimrBlobDepot::TEvApplyConfig, TEvBlobDepot::EvApplyConfig> { - TEvApplyConfig() = default; +#define BLOBDEPOT_PARAM_ARG(ARG) std::optional<std::decay_t<decltype(Record.Get##ARG())>> param##ARG, - TEvApplyConfig(ui64 txId) { - Record.SetTxId(txId); - } - }; +#define BLOBDEPOT_SETTER(ARG) \ + if (param##ARG) { \ + Record.Set##ARG(*param##ARG); \ + } - struct TEvApplyConfigResult : TEventPB<TEvApplyConfigResult, NKikimrBlobDepot::TEvApplyConfigResult, TEvBlobDepot::EvApplyConfigResult> { - TEvApplyConfigResult() = default; +#define BLOBDEPOT_EVENT_PB_NO_ARGS(NAME) \ + struct T##NAME : TEventPB<T##NAME, NKikimrBlobDepot::T##NAME, NAME> { \ + T##NAME() = default; \ + } - TEvApplyConfigResult(ui64 tabletId, ui64 txId) { - Record.SetTabletId(tabletId); - Record.SetTxId(txId); - } - }; +#define BLOBDEPOT_EVENT_PB(NAME, ...) \ + struct T##NAME : TEventPB<T##NAME, NKikimrBlobDepot::T##NAME, NAME> { \ + T##NAME() = default; \ + \ + struct TArgListTerminator {}; \ + \ + T##NAME(Y_MAP_ARGS(BLOBDEPOT_PARAM_ARG, __VA_ARGS__) TArgListTerminator = {}) { \ + Y_MAP_ARGS(BLOBDEPOT_SETTER, __VA_ARGS__) \ + } \ + } + BLOBDEPOT_EVENT_PB(EvApplyConfig, TxId); + BLOBDEPOT_EVENT_PB(EvApplyConfigResult, TabletId, TxId); + BLOBDEPOT_EVENT_PB(EvRegisterAgent, VirtualGroupId); + BLOBDEPOT_EVENT_PB_NO_ARGS(EvRegisterAgentResult); + BLOBDEPOT_EVENT_PB_NO_ARGS(EvAllocateIds); + BLOBDEPOT_EVENT_PB_NO_ARGS(EvAllocateIdsResult); + BLOBDEPOT_EVENT_PB(EvBlock, TabletId, BlockedGeneration); + BLOBDEPOT_EVENT_PB(EvBlockResult, Status, ErrorReason); + BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotify); + BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocks); + BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocksResult); + BLOBDEPOT_EVENT_PB_NO_ARGS(EvCommitBlobSeq); + BLOBDEPOT_EVENT_PB_NO_ARGS(EvCommitBlobSeqResult); + BLOBDEPOT_EVENT_PB_NO_ARGS(EvResolve); + BLOBDEPOT_EVENT_PB(EvResolveResult, Status, ErrorReason); }; } // NKikimr diff --git a/ydb/core/blob_depot/op_apply_config.cpp b/ydb/core/blob_depot/op_apply_config.cpp new file mode 100644 index 00000000000..57b77e0183d --- /dev/null +++ b/ydb/core/blob_depot/op_apply_config.cpp @@ -0,0 +1,48 @@ +#include "blob_depot_tablet.h" +#include "schema.h" + +namespace NKikimr::NBlobDepot { + + void TBlobDepot::Handle(TEvBlobDepot::TEvApplyConfig::TPtr ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvApplyConfig", (TabletId, TabletID()), (Msg, ev->Get()->Record)); + + class TTxApplyConfig : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + std::unique_ptr<IEventHandle> Response; + const TActorId InterconnectSession; + TString ConfigProtobuf; + + public: + TTxApplyConfig(TBlobDepot *self, TEvBlobDepot::TEvApplyConfig& ev, std::unique_ptr<IEventHandle> response, + TActorId interconnectSession) + : TTransactionBase(self) + , Response(std::move(response)) + , InterconnectSession(interconnectSession) + { + const bool success = ev.Record.SerializeToString(&ConfigProtobuf); + Y_VERIFY(success); + } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + NIceDb::TNiceDb db(txc.DB); + db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update( + NIceDb::TUpdate<Schema::Config::ConfigProtobuf>(ConfigProtobuf) + ); + return true; + } + + void Complete(const TActorContext&) override { + const bool success = Self->Config.ParseFromString(ConfigProtobuf); + Y_VERIFY(success); + if (InterconnectSession) { + Response->Rewrite(TEvInterconnect::EvForward, InterconnectSession); + } + TActivationContext::Send(Response.release()); + } + }; + + auto responseEvent = std::make_unique<TEvBlobDepot::TEvApplyConfigResult>(TabletID(), ev->Get()->Record.GetTxId()); + auto response = std::make_unique<IEventHandle>(ev->Sender, SelfId(), responseEvent.release(), 0, ev->Cookie); + Execute(std::make_unique<TTxApplyConfig>(this, *ev->Get(), std::move(response), ev->InterconnectSession)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp new file mode 100644 index 00000000000..4f5d628ac74 --- /dev/null +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -0,0 +1,9 @@ +#include "blob_depot_tablet.h" + +namespace NKikimr::NBlobDepot { + + void TBlobDepot::Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev) { + (void)ev; + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/op_init_schema.cpp b/ydb/core/blob_depot/op_init_schema.cpp new file mode 100644 index 00000000000..6e2ba92d639 --- /dev/null +++ b/ydb/core/blob_depot/op_init_schema.cpp @@ -0,0 +1,27 @@ +#include "blob_depot_tablet.h" +#include "schema.h" + +namespace NKikimr::NBlobDepot { + + void TBlobDepot::ExecuteTxInitSchema() { + class TTxInitSchema : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + public: + TTxInitSchema(TBlobDepot *self) + : TTransactionBase(self) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + NIceDb::TNiceDb db(txc.DB); + db.Materialize<Schema>(); + return true; + } + + void Complete(const TActorContext&) override { + Self->ExecuteTxLoad(); + } + }; + + Execute(std::make_unique<TTxInitSchema>(this)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp new file mode 100644 index 00000000000..240bc5ad6d2 --- /dev/null +++ b/ydb/core/blob_depot/op_load.cpp @@ -0,0 +1,66 @@ +#include "blob_depot_tablet.h" +#include "schema.h" + +namespace NKikimr::NBlobDepot { + + void TBlobDepot::ExecuteTxLoad() { + class TTxLoad : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + public: + TTxLoad(TBlobDepot *self) + : TTransactionBase(self) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + NIceDb::TNiceDb db(txc.DB); + + if (!Precharge(db)) { + return false; + } + + // Config + { + auto table = db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Select(); + if (!table.IsReady()) { + return false; + } else if (table.IsValid()) { + if (table.HaveValue<Schema::Config::ConfigProtobuf>()) { + const bool success = Self->Config.ParseFromString(table.GetValue<Schema::Config::ConfigProtobuf>()); + Y_VERIFY(success); + } + } + } + + // Blocks + { + auto table = db.Table<Schema::Blocks>().Select(); + if (!table.IsReady()) { + return false; + } + while (table.IsValid()) { + Self->BlocksManager.AddBlockOnLoad( + table.GetValue<Schema::Blocks::TabletId>(), + table.GetValue<Schema::Blocks::BlockedGeneration>() + ); + if (!table.Next()) { + return false; + } + } + } + + return true; + } + + bool Precharge(NIceDb::TNiceDb& db) { + auto config = db.Table<Schema::Config>().Select(); + auto blocks = db.Table<Schema::Blocks>().Select(); + return config.IsReady() && blocks.IsReady(); + } + + void Complete(const TActorContext&) override { + } + }; + + Execute(std::make_unique<TTxLoad>(this)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/op_resolve.cpp b/ydb/core/blob_depot/op_resolve.cpp new file mode 100644 index 00000000000..39b36197480 --- /dev/null +++ b/ydb/core/blob_depot/op_resolve.cpp @@ -0,0 +1,73 @@ +#include "blob_depot_tablet.h" + +namespace NKikimr::NBlobDepot { + + void TBlobDepot::Handle(TEvBlobDepot::TEvResolve::TPtr ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDR01, "TEvResolve", (TabletId, TabletID()), (Msg, ev->Get()->ToString()), + (Sender, ev->Sender), (Recipient, ev->Recipient), (Cookie, ev->Cookie)); + + // collect records if needed + + auto response = std::make_unique<TEvBlobDepot::TEvResolveResult>(NKikimrProto::OK, std::nullopt); + ui32 messageSize = response->CalculateSerializedSize(); + auto sendMessage = [&](bool more) { + if (more) { + response->Record.SetStatus(NKikimrProto::OVERRUN); + } + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDR02, "sending TEvResolveResult", (TabletId, TabletID()), (Msg, response->Record)); + + auto handle = std::make_unique<IEventHandle>(ev->Sender, SelfId(), response.release(), 0, ev->Cookie); + if (ev->InterconnectSession) { + handle->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession); + } + TActivationContext::Send(handle.release()); + + if (more) { + response = std::make_unique<TEvBlobDepot::TEvResolveResult>(NKikimrProto::OK, std::nullopt); + messageSize = response->CalculateSerializedSize(); + } + }; + + ui32 itemIndex = 0; + for (const auto& item : ev->Get()->Record.GetItems()) { + auto beginIt = !item.HasBeginningKey() ? Data.begin() + : item.GetIncludeBeginning() ? Data.lower_bound(item.GetBeginningKey()) + : Data.upper_bound(item.GetBeginningKey()); + + auto endIt = !item.HasEndingKey() ? Data.end() + : item.GetIncludeEnding() ? Data.upper_bound(item.GetEndingKey()) + : Data.lower_bound(item.GetEndingKey()); + + ui32 numItems = 0; + auto addKey = [&](auto it) { + NKikimrBlobDepot::TEvResolveResult::TResolvedKey resolvedKey; + resolvedKey.SetItemIndex(itemIndex); + resolvedKey.SetKey(it->first); + + const ui32 keySize = resolvedKey.ByteSizeLong(); + if (messageSize + keySize > EventMaxByteSize) { + sendMessage(true); + } + + // put resolved key into the result + resolvedKey.Swap(response->Record.AddResolvedKeys()); + messageSize += keySize; + ++numItems; + + return !item.HasMaxKeys() || numItems != item.GetMaxKeys(); + }; + + if (item.GetReverse()) { + while (beginIt != endIt && addKey(--endIt)) {} + } else { + while (beginIt != endIt && addKey(beginIt++)) {} + } + + ++itemIndex; + } + + sendMessage(false); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h new file mode 100644 index 00000000000..ae96c5422fa --- /dev/null +++ b/ydb/core/blob_depot/schema.h @@ -0,0 +1,92 @@ +#pragma once + +#include "defs.h" +#include "types.h" + +namespace NKikimr::NBlobDepot { + + struct Schema : NIceDb::Schema { + struct Config : Table<1> { + struct Key : Column<1, NScheme::NTypeIds::Uint32> { static constexpr Type Value = 0; }; + struct ConfigProtobuf : Column<2, NScheme::NTypeIds::String> {}; + + using TKey = TableKey<Key>; + using TColumns = TableColumns< + Key, + ConfigProtobuf + >; + }; + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // BlobStorage-related parts + + struct Blocks : Table<2> { + struct TabletId : Column<1, NScheme::NTypeIds::Uint64> {}; + struct BlockedGeneration : Column<2, NScheme::NTypeIds::Uint32> {}; + struct IssueTimestamp : Column<3, NScheme::NTypeIds::Uint64> { using Type = TInstant; }; + struct IssuedByNode : Column<4, NScheme::NTypeIds::Uint32> {}; + + using TKey = TableKey<TabletId>; + using TColumns = TableColumns< + TabletId, + BlockedGeneration, + IssueTimestamp, + IssuedByNode + >; + }; + + struct Barriers : Table<3> { + struct TabletId : Column<1, NScheme::NTypeIds::Uint64> {}; + struct Channel : Column<2, NScheme::NTypeIds::Uint8> {}; + struct SoftGenStep : Column<3, NScheme::NTypeIds::Uint64> {}; + struct HardGenStep : Column<4, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey<TabletId, Channel>; + using TColumns = TableColumns< + TabletId, + Channel, + SoftGenStep, + HardGenStep + >; + }; + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Common parts + + struct Data : Table<4> { + struct Key : Column<1, NScheme::NTypeIds::String> {}; + struct Meta : Column<2, NScheme::NTypeIds::String> {}; + struct Id : Column<3, NScheme::NTypeIds::Uint64> {}; + struct CGSI : Column<4, NScheme::NTypeIds::String> {}; + struct Checksum : Column<5, NScheme::NTypeIds::Uint32> {}; + struct TotalDataLen : Column<6, NScheme::NTypeIds::Uint64> {}; + struct KeepState : Column<7, NScheme::NTypeIds::Uint8> { using Type = EKeepState; }; + struct Public : Column<8, NScheme::NTypeIds::Bool> {}; + + using TKey = TableKey<Key>; + using TColumns = TableColumns< + Key, + Meta, + Id, + CGSI, + Checksum, + TotalDataLen, + KeepState, + Public + >; + }; + + using TTables = SchemaTables< + Config, + Blocks, + Barriers, + Data + >; + + using TSettings = SchemaSettings< + ExecutorLogBatching<true>, + ExecutorLogFlushPeriod<TDuration::MicroSeconds(512).GetValue()> + >; + }; + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index 9dd722b6bda..141696bd4f8 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -4,11 +4,40 @@ namespace NKikimr::NBlobDepot { + static constexpr ui32 BaseDataChannel = 2; + struct TCGSI { + static constexpr ui32 IndexBits = 20; + static constexpr ui32 MaxIndex = (1 << IndexBits) - 1; + ui32 Channel; ui32 Generation; ui32 Step; ui32 Index; + + ui64 ToBinary(ui32 numChannels) const { + Y_VERIFY_DEBUG(numChannels > BaseDataChannel); + Y_VERIFY_DEBUG(Index <= MaxIndex); + return (static_cast<ui64>(Step) << IndexBits | Index) * (numChannels - BaseDataChannel) + (Channel - BaseDataChannel); + } + + static TCGSI FromBinary(ui32 generation, ui32 numChannels, ui64 value) { + static_assert(sizeof(long long) >= sizeof(ui64)); + auto res = std::lldiv(value, numChannels - BaseDataChannel); + + return TCGSI{ + .Channel = static_cast<ui32>(res.rem + BaseDataChannel), + .Generation = generation, + .Step = static_cast<ui32>(res.quot >> IndexBits), + .Index = static_cast<ui32>(res.quot) & MaxIndex + }; + } + }; + + enum class EKeepState : ui8 { + Default, + Keep, + DoNotKeep }; } // NKikimr::NBlobDepot diff --git a/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.cpp b/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.cpp index f0d696e82dd..0b5d0dee043 100644 --- a/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.cpp +++ b/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.cpp @@ -657,6 +657,9 @@ TIntrusivePtr<TBlobStorageGroupInfo> TBlobStorageGroupInfo::Parse(const NKikimrB } auto res = MakeIntrusive<TBlobStorageGroupInfo>(std::move(topology), std::move(dyn), group.GetStoragePoolName(), acceptedScope, commonDeviceType); + if (group.HasBlobDepotId()) { + res->BlobDepotId = group.GetBlobDepotId(); + } // process encryption parameters res->EncryptionMode = static_cast<EEncryptionMode>(group.GetEncryptionMode()); diff --git a/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h b/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h index a1390f0495b..acbb52778e7 100644 --- a/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h +++ b/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h @@ -413,6 +413,8 @@ public: const ui32 GroupGeneration; // erasure primarily const TBlobStorageGroupType Type; + // virtual group BlobDepot tablet id + std::optional<ui64> BlobDepotId; // origin of the group info content std::optional<NKikimrBlobStorage::TGroupInfo> Group; diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.txt index fe97c15e817..33c640008da 100644 --- a/ydb/core/blobstorage/nodewarden/CMakeLists.txt +++ b/ydb/core/blobstorage/nodewarden/CMakeLists.txt @@ -13,6 +13,7 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC yutil library-cpp-json ydb-core-base + core-blob_depot-agent core-blobstorage-groupinfo core-blobstorage-pdisk ydb-core-control diff --git a/ydb/core/blobstorage/nodewarden/defs.h b/ydb/core/blobstorage/nodewarden/defs.h index a3a658d27f5..4977537493e 100644 --- a/ydb/core/blobstorage/nodewarden/defs.h +++ b/ydb/core/blobstorage/nodewarden/defs.h @@ -15,6 +15,7 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/base/tablet_resolver.h> +#include <ydb/core/blob_depot/agent/agent.h> #include <ydb/core/node_whiteboard/node_whiteboard.h> #include <ydb/core/blobstorage/base/blobstorage_events.h> #include <ydb/core/blobstorage/vdisk/vdisk_actor.h> diff --git a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp index 215cf321d91..32ce91af38f 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp @@ -207,7 +207,7 @@ namespace NKikimr::NStorage { } if (const auto& info = group.Info) { - Send(WhiteboardId, new NNodeWhiteboard::TEvWhiteboard::TEvBSGroupStateUpdate(info, info->GetStoragePoolName())); + Send(WhiteboardId, new NNodeWhiteboard::TEvWhiteboard::TEvBSGroupStateUpdate(info)); for (auto& vdisk : group.VDisksOfGroup) { UpdateGroupInfoForDisk(vdisk, info); } diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index be67e5ef670..9b72c8dfdc1 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -112,6 +112,9 @@ namespace NKikimr::NStorage { TReplQuoter::TPtr ReplNodeResponseQuoter; public: + struct TGroupRecord; + + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::NODE_WARDEN; } @@ -152,6 +155,7 @@ namespace NKikimr::NStorage { void StartInvalidGroupProxy(); void StopInvalidGroupProxy(); void StartLocalProxy(ui32 groupId); + void StartVirtualGroupAgent(ui32 groupId); void StartStaticProxies(); TVector<NPDisk::TDriveData> ListLocalDrives(); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp b/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp index 6000a0cb52a..faea46d7334 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp @@ -9,9 +9,6 @@ TActorId TNodeWarden::StartEjectedProxy(ui32 groupId) { } void TNodeWarden::StartLocalProxy(ui32 groupId) { - auto& group = Groups[groupId]; - Y_VERIFY(!group.ProxyRunning); - group.ProxyRunning = true; STLOG(PRI_DEBUG, BS_NODE, NW12, "StartLocalProxy", (GroupId, groupId)); std::unique_ptr<IActor> proxy; @@ -34,6 +31,19 @@ void TNodeWarden::StartLocalProxy(ui32 groupId) { AppData()->SystemPoolId)); } +void TNodeWarden::StartVirtualGroupAgent(ui32 groupId) { + STLOG(PRI_DEBUG, BS_NODE, NW40, "StartVirtualGroupProxy", (GroupId, groupId)); + + TActorSystem *as = TActivationContext::ActorSystem(); + const TActorId actorId = as->Register(NBlobDepot::CreateBlobDepotAgent(groupId), TMailboxType::ReadAsFilled, + AppData()->SystemPoolId); + if (auto info = NeedGroupInfo(groupId)) { + auto counters = DsProxyPerPoolCounters->GetPoolCounters(info->GetStoragePoolName(), info->GetDeviceType()); + Send(actorId, new TEvBlobStorage::TEvConfigureProxy(std::move(info), std::move(counters))); + } + as->RegisterLocalService(MakeBlobStorageProxyID(groupId), actorId); +} + void TNodeWarden::StartStaticProxies() { for (const auto& group : Cfg->ServiceSet.GetGroups()) { StartLocalProxy(group.GetGroupID()); @@ -55,7 +65,12 @@ void TNodeWarden::HandleForwarded(TAutoPtr<::NActors::IEventHandle> &ev) { TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, errorProxy, {}, nullptr, 0)); return; } else if (TGroupRecord& group = Groups[id]; !group.ProxyRunning) { - StartLocalProxy(id); + group.ProxyRunning = true; + if (TGroupID(id).ConfigurationType() == EGroupConfigurationType::Virtual) { + StartVirtualGroupAgent(id); + } else { + StartLocalProxy(id); + } } TActivationContext::Send(ev->Forward(ev->GetForwardOnNondeliveryRecipient())); diff --git a/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp b/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp new file mode 100644 index 00000000000..c0e03555c96 --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp @@ -0,0 +1,37 @@ +#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h> + +Y_UNIT_TEST_SUITE(BlobDepot) { + + Y_UNIT_TEST(Basic) { + TEnvironmentSetup env{{ + .SetupTablets = true + }}; + + env.CreateBoxAndPool(1, 1); + env.Sim(TDuration::Seconds(20)); + + NKikimrBlobStorage::TConfigRequest request; + auto *cmd = request.AddCommand()->MutableAllocateVirtualGroup(); + cmd->SetVirtualGroupPool("vg"); + cmd->SetStoragePoolName(env.StoragePoolName); + cmd->SetParentDir("/Root"); + auto *prof = cmd->AddChannelProfiles(); + prof->SetStoragePoolKind(env.StoragePoolName); + prof->SetCount(3); + + auto response = env.Invoke(request); + UNIT_ASSERT_C(response.GetSuccess(), response.GetErrorDescription()); + + ui32 groupId = response.GetStatus(0).GetGroupId(0); + Cerr << "groupId# " << groupId << Endl; + const TActorId& proxy = MakeBlobStorageProxyID(groupId); + + auto sender = env.Runtime->AllocateEdgeActor(1); + TString data = "hello!"; + TLogoBlobID id(1, 1, 1, 0, data.size(), 0); + env.Runtime->Send(new IEventHandle(proxy, sender, new TEvBlobStorage::TEvPut(id, data, TInstant::Max())), 1); + auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender); + Cerr << res->Get()->ToString() << Endl; + } + +} diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/CMakeLists.txt b/ydb/core/blobstorage/ut_blobstorage/lib/CMakeLists.txt index e137266212c..8d4b52db8ea 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/CMakeLists.txt +++ b/ydb/core/blobstorage/ut_blobstorage/lib/CMakeLists.txt @@ -14,13 +14,19 @@ target_link_libraries(blobstorage-ut_blobstorage-lib PUBLIC cpp-digest-md5 cpp-testing-unittest ydb-core-base + ydb-core-blob_depot core-blobstorage-backpressure blobstorage-dsproxy-mock core-blobstorage-nodewarden blobstorage-pdisk-mock blobstorage-vdisk-common + ydb-core-mind core-mind-bscontroller + core-mind-hive core-tx-scheme_board + core-tx-tx_allocator + core-tx-mediator + core-tx-coordinator ydb-core-util udf-service-stub yql-sql-pg_dummy diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/defs.h b/ydb/core/blobstorage/ut_blobstorage/lib/defs.h index 1385b8fca36..c6835f67504 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/defs.h +++ b/ydb/core/blobstorage/ut_blobstorage/lib/defs.h @@ -1,5 +1,7 @@ #pragma once +#include <ydb/core/base/hive.h> +#include <ydb/core/blob_depot/blob_depot.h> #include <ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.h> #include <ydb/core/blobstorage/dsproxy/mock/model.h> #include <ydb/core/blobstorage/pdisk/mock/pdisk_mock.h> @@ -10,6 +12,11 @@ #include <ydb/core/mind/bscontroller/bsc.h> #include <ydb/core/mind/bscontroller/types.h> #include <ydb/core/mind/dynamic_nameserver.h> +#include <ydb/core/mind/local.h> +#include <ydb/core/tx/coordinator/coordinator.h> +#include <ydb/core/tx/tx_allocator/txallocator.h> +#include <ydb/core/tx/mediator/mediator.h> +#include <ydb/core/tx/scheme_board/cache.h> #include <ydb/core/util/testactorsys.h> #include <library/cpp/testing/unittest/registar.h> #include <util/system/rusage.h> diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/env.h b/ydb/core/blobstorage/ut_blobstorage/lib/env.h index 3aef0e6fcd3..a68b4757db3 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/env.h +++ b/ydb/core/blobstorage/ut_blobstorage/lib/env.h @@ -31,6 +31,7 @@ struct TEnvironmentSetup { const bool Cache = false; const ui32 NumDataCenters = 0; const std::function<TNodeLocation(ui32)> LocationGenerator; + const bool SetupTablets = false; }; const TSettings Settings; @@ -108,7 +109,11 @@ struct TEnvironmentSetup { SetupLogging(); Runtime->Start(); auto *appData = Runtime->GetAppData(); - appData->DomainsInfo->AddDomain(TDomainsInfo::TDomain::ConstructEmptyDomain("dom", DomainId).Release()); + if (Settings.SetupTablets) { + SetupDomainForTablets(); + } else { + appData->DomainsInfo->AddDomain(TDomainsInfo::TDomain::ConstructEmptyDomain("dom", DomainId).Release()); + } if (Settings.LocationGenerator) { Runtime->SetupTabletRuntime(Settings.LocationGenerator, Settings.ControllerNodeId); } else { @@ -117,6 +122,9 @@ struct TEnvironmentSetup { SetupStaticStorage(); SetupTablet(); SetupStorage(); + if (Settings.SetupTablets) { + InitRoot(); + } } void StopNode(ui32 nodeId) { @@ -225,6 +233,8 @@ struct TEnvironmentSetup { // NKikimrServices::BS_PROXY_INDEXRESTOREGET, // NKikimrServices::BS_PROXY_STATUS, NActorsServices::TEST, + NKikimrServices::BLOB_DEPOT, + NKikimrServices::BLOB_DEPOT_AGENT, // NActorsServices::INTERCONNECT, // NActorsServices::INTERCONNECT_SESSION, }; @@ -274,14 +284,103 @@ struct TEnvironmentSetup { } } + void SetupDomainForTablets() { + TAppData *appData = Runtime->GetAppData(); + + const ui32 hiveUid = 1; + + appData->DomainsInfo->AddDomain(TDomainsInfo::TDomain::ConstructDomain<std::vector<ui32>, std::vector<ui64>>( + "Root", + DomainId, + 72075186232723360, + 1, + 1, + {1}, + hiveUid, + {hiveUid}, + 100, + {1, 2, 3}, + {1, 2, 3}, + {1, 2, 3} + ).Release()); + + appData->DomainsInfo->AddHive(hiveUid, MakeDefaultHiveID(1)); + } + void SetupTablet() { - Runtime->CreateTestBootstrapper( - TTestActorSystem::CreateTestTabletInfo(TabletId, TTabletTypes::BSController, Settings.Erasure.GetErasure(), GroupId), - &CreateFlatBsController, - Settings.ControllerNodeId); + struct TTabletInfo { + ui64 TabletId; + TTabletTypes::EType Type; + IActor* (*Create)(const TActorId&, TTabletStorageInfo*); + }; + std::vector<TTabletInfo> tablets{ + {MakeBSControllerID(DomainId), TTabletTypes::BSController, &CreateFlatBsController}, + }; + + auto *appData = Runtime->GetAppData(); + + for (const auto& [uid, tabletId] : appData->DomainsInfo->HivesByHiveUid) { + tablets.push_back(TTabletInfo{tabletId, TTabletTypes::Hive, &CreateDefaultHive}); + } + + const TDomainsInfo::TDomain& domain = appData->DomainsInfo->GetDomain(DomainId); + if (domain.SchemeRoot) { + tablets.push_back(TTabletInfo{domain.SchemeRoot, TTabletTypes::SchemeShard, &CreateFlatTxSchemeShard}); + } + for (const ui64 tabletId : domain.Coordinators) { + tablets.push_back(TTabletInfo{tabletId, TTabletTypes::Coordinator, &CreateFlatTxCoordinator}); + } + for (const ui64 tabletId : domain.Mediators) { + tablets.push_back(TTabletInfo{tabletId, TTabletTypes::Mediator, &CreateTxMediator}); + } + TVector<ui64> allocators; + for (const ui64 tabletId : domain.TxAllocators) { + tablets.push_back(TTabletInfo{tabletId, TTabletTypes::TxAllocator, &CreateTxAllocator}); + allocators.push_back(tabletId); + } + + for (const TTabletInfo& tablet : tablets) { + Runtime->CreateTestBootstrapper( + TTestActorSystem::CreateTestTabletInfo(tablet.TabletId, tablet.Type, Settings.Erasure.GetErasure(), GroupId), + tablet.Create, Settings.ControllerNodeId); + + bool working = true; + Runtime->Sim([&] { return working; }, [&](IEventHandle& event) { working = event.GetTypeRewrite() != TEvTablet::EvBoot; }); + } + + auto localConfig = MakeIntrusive<TLocalConfig>(); + localConfig->TabletClassInfo[TTabletTypes::BlobDepot] = TLocalConfig::TTabletClassInfo(new TTabletSetupInfo( + &NBlobDepot::CreateBlobDepot, TMailboxType::ReadAsFilled, appData->SystemPoolId, TMailboxType::ReadAsFilled, + appData->SystemPoolId)); - bool working = true; - Runtime->Sim([&] { return working; }, [&](IEventHandle& event) { working = event.GetTypeRewrite() != TEvTablet::EvBoot; }); + if (Settings.SetupTablets) { + for (ui32 nodeId : Runtime->GetNodes()) { + Runtime->RegisterService(MakeTxProxyID(), Runtime->Register(CreateTxProxy(allocators), nodeId)); + Runtime->RegisterService(MakeLocalID(nodeId), Runtime->Register(CreateLocal(localConfig.Get()), nodeId)); + + auto config = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(); + config->Roots.emplace_back(DomainId, domain.SchemeRoot, domain.Name); + config->Counters = MakeIntrusive<NMonitoring::TDynamicCounters>(); + Runtime->RegisterService(MakeSchemeCacheID(), Runtime->Register(CreateSchemeBoardSchemeCache(config.Get()), nodeId)); + } + } + } + + void InitRoot() { + auto *appData = Runtime->GetAppData(); + auto& domain = appData->DomainsInfo->GetDomain(DomainId); + if (domain.SchemeRoot) { + auto edge = Runtime->AllocateEdgeActor(1); + auto pipeId = Runtime->Register(NTabletPipe::CreateClient(edge, domain.SchemeRoot), 1); + Runtime->WrapInActorContext(edge, [&] { + const TActorContext& ctx = TActivationContext::AsActorContext(); + NTabletPipe::SendData(ctx, pipeId, new NSchemeShard::TEvSchemeShard::TEvInitRootShard(edge, domain.DomainRootTag(), domain.Name)); + NTabletPipe::CloseClient(ctx, pipeId); + }); + auto res = WaitForEdgeActorEvent<NSchemeShard::TEvSchemeShard::TEvInitRootShardResult>(edge); + Cerr << res->Get()->ToString() << Endl; + ::exit(1); + } } void CreateBoxAndPool(ui32 numDrivesPerNode = 0, ui32 numGroups = 0, ui32 numStorageNodes = 0) { @@ -312,6 +411,7 @@ struct TEnvironmentSetup { cmd2->SetBoxId(1); cmd2->SetStoragePoolId(1); cmd2->SetName(StoragePoolName); + cmd2->SetKind(StoragePoolName); cmd2->SetErasureSpecies(TBlobStorageGroupType::ErasureSpeciesName(Settings.Erasure.GetErasure())); cmd2->SetVDiskKind("Default"); cmd2->SetNumGroups(numGroups ? numGroups : NumGroups); diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.darwin.txt b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.darwin.txt new file mode 100644 index 00000000000..2f4720a4100 --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.darwin.txt @@ -0,0 +1,43 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot) +target_include_directories(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage +) +target_link_libraries(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + blobstorage-ut_blobstorage-lib +) +target_link_options(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp +) +add_test( + NAME + ydb-core-blobstorage-ut_blobstorage-ut_blob_depot + COMMAND + ydb-core-blobstorage-ut_blobstorage-ut_blob_depot + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot) diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.linux.txt b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.linux.txt new file mode 100644 index 00000000000..618dea3234f --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.linux.txt @@ -0,0 +1,46 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot) +target_include_directories(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage +) +target_link_libraries(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + library-cpp-cpuid_check + cpp-testing-unittest_main + blobstorage-ut_blobstorage-lib +) +target_link_options(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp +) +add_test( + NAME + ydb-core-blobstorage-ut_blobstorage-ut_blob_depot + COMMAND + ydb-core-blobstorage-ut_blobstorage-ut_blob_depot + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot) diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.txt b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.txt new file mode 100644 index 00000000000..a681d385f3e --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.txt @@ -0,0 +1,13 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (APPLE) + include(CMakeLists.darwin.txt) +elseif (UNIX) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/core/mind/bscontroller/config.cpp b/ydb/core/mind/bscontroller/config.cpp index 2f1978ae380..7c63032a6e2 100644 --- a/ydb/core/mind/bscontroller/config.cpp +++ b/ydb/core/mind/bscontroller/config.cpp @@ -894,9 +894,7 @@ namespace NKikimr::NBsController { const TString& storagePoolName, const TMaybe<TKikimrScopeId>& scopeId) { group->SetGroupID(groupInfo.ID); group->SetGroupGeneration(groupInfo.Generation); - group->SetErasureSpecies(groupInfo.ErasureSpecies); group->SetStoragePoolName(storagePoolName); - group->SetDeviceType(PDiskTypeToPDiskType(groupInfo.GetCommonDeviceType())); group->SetEncryptionMode(groupInfo.EncryptionMode.GetOrElse(0)); group->SetLifeCyclePhase(groupInfo.LifeCyclePhase.GetOrElse(0)); @@ -912,28 +910,38 @@ namespace NKikimr::NBsController { pb->SetX2(x.second); } - std::vector<std::pair<TVDiskID, const TVSlotInfo*>> vdisks; - for (const auto& vslot : groupInfo.VDisksInGroup) { - vdisks.emplace_back(vslot->GetVDiskId(), vslot); - } - auto comp = [](const auto& x, const auto& y) { return x.first < y.first; }; - std::sort(vdisks.begin(), vdisks.end(), comp); + if (!groupInfo.VirtualGroupState) { + group->SetErasureSpecies(groupInfo.ErasureSpecies); + group->SetDeviceType(PDiskTypeToPDiskType(groupInfo.GetCommonDeviceType())); - TVDiskID prevVDiskId; - NKikimrBlobStorage::TGroupInfo::TFailRealm *realm = nullptr; - NKikimrBlobStorage::TGroupInfo::TFailRealm::TFailDomain *domain = nullptr; - for (const auto& [vdiskId, vslot] : vdisks) { - if (!realm || prevVDiskId.FailRealm != vdiskId.FailRealm) { - realm = group->AddRings(); - domain = nullptr; - } - if (!domain || prevVDiskId.FailDomain != vdiskId.FailDomain) { - Y_VERIFY(realm); - domain = realm->AddFailDomains(); + std::vector<std::pair<TVDiskID, const TVSlotInfo*>> vdisks; + for (const auto& vslot : groupInfo.VDisksInGroup) { + vdisks.emplace_back(vslot->GetVDiskId(), vslot); } - prevVDiskId = vdiskId; + auto comp = [](const auto& x, const auto& y) { return x.first < y.first; }; + std::sort(vdisks.begin(), vdisks.end(), comp); + + TVDiskID prevVDiskId; + NKikimrBlobStorage::TGroupInfo::TFailRealm *realm = nullptr; + NKikimrBlobStorage::TGroupInfo::TFailRealm::TFailDomain *domain = nullptr; + for (const auto& [vdiskId, vslot] : vdisks) { + if (!realm || prevVDiskId.FailRealm != vdiskId.FailRealm) { + realm = group->AddRings(); + domain = nullptr; + } + if (!domain || prevVDiskId.FailDomain != vdiskId.FailDomain) { + Y_VERIFY(realm); + domain = realm->AddFailDomains(); + } + prevVDiskId = vdiskId; - Serialize(domain->AddVDiskLocations(), *vslot); + Serialize(domain->AddVDiskLocations(), *vslot); + } + } else if (*groupInfo.VirtualGroupState == NKikimrBlobStorage::EVirtualGroupState::WORKING) { + Y_VERIFY(groupInfo.BlobDepotId); + group->SetBlobDepotId(*groupInfo.BlobDepotId); + } else if (*groupInfo.VirtualGroupState == NKikimrBlobStorage::EVirtualGroupState::CREATE_FAILED) { + group->SetBlobDepotId(0); } } diff --git a/ydb/core/mind/bscontroller/get_group.cpp b/ydb/core/mind/bscontroller/get_group.cpp index 235c3ee8d23..25c41f3b585 100644 --- a/ydb/core/mind/bscontroller/get_group.cpp +++ b/ydb/core/mind/bscontroller/get_group.cpp @@ -26,7 +26,7 @@ public: const TNodeId nodeId = request->Get()->Record.GetNodeID(); auto res = std::make_unique<TEvBlobStorage::TEvControllerNodeServiceSetUpdate>(NKikimrProto::OK, nodeId); - Self->ReadGroups(groupIDsToRead, true, res.get()); + Self->ReadGroups(groupIDsToRead, true, res.get(), nodeId); Response = std::make_unique<IEventHandle>(MakeBlobStorageNodeWardenID(nodeId), Self->SelfId(), res.release()); return true; diff --git a/ydb/core/mind/bscontroller/group_reconfigure_wipe.cpp b/ydb/core/mind/bscontroller/group_reconfigure_wipe.cpp index 7c274224baf..9a7aff6c7fd 100644 --- a/ydb/core/mind/bscontroller/group_reconfigure_wipe.cpp +++ b/ydb/core/mind/bscontroller/group_reconfigure_wipe.cpp @@ -52,7 +52,7 @@ public: Self->ReadVSlot(*info, msg.Get()); TSet<ui32> groupIDsToRead; groupIDsToRead.insert(info->GroupId); - Self->ReadGroups(groupIDsToRead, false, msg.Get()); + Self->ReadGroups(groupIDsToRead, false, msg.Get(), id.NodeId); for (const TGroupId groupId : groupIDsToRead) { STLOG(PRI_ERROR, BS_CONTROLLER, BSCTXGRW05, "No configuration for group", (GroupId, groupId)); } diff --git a/ydb/core/mind/bscontroller/impl.h b/ydb/core/mind/bscontroller/impl.h index e8d55783750..0e9bcac4982 100644 --- a/ydb/core/mind/bscontroller/impl.h +++ b/ydb/core/mind/bscontroller/impl.h @@ -516,6 +516,9 @@ public: TActorId VirtualGroupSetupMachineId; + // nodes waiting for this group to become listable + THashSet<TNodeId> WaitingNodes; + // group's geometry; it doesn't ever change since the group is created const ui32 NumFailRealms = 0; const ui32 NumFailDomainsPerFailRealm = 0; @@ -641,6 +644,12 @@ public: Y_VERIFY(VDisksInGroup.size() == Topology->GetTotalVDisksNum()); } + bool Listable() const { + return !VirtualGroupState + || *VirtualGroupState == NKikimrBlobStorage::EVirtualGroupState::WORKING + || *VirtualGroupState == NKikimrBlobStorage::EVirtualGroupState::CREATE_FAILED; + } + void ClearVDisksInGroup() { std::fill(VDisksInGroup.begin(), VDisksInGroup.end(), nullptr); } @@ -798,6 +807,7 @@ public: TInstant LastDisconnectTimestamp; // in-mem only std::map<TString, NPDisk::TDriveData> KnownDrives; + THashSet<TGroupId> WaitingForGroups; template<typename T> static void Apply(TBlobStorageController* /*controller*/, T&& callback) { @@ -1589,7 +1599,8 @@ private: TDeque<TAutoPtr<IEventHandle>> InitQueue; THashMap<Schema::Group::Owner::Type, Schema::Group::ID::Type> OwnerIdIdxToGroup; - void ReadGroups(TSet<ui32>& groupIDsToRead, bool discard, TEvBlobStorage::TEvControllerNodeServiceSetUpdate *result); + void ReadGroups(TSet<ui32>& groupIDsToRead, bool discard, TEvBlobStorage::TEvControllerNodeServiceSetUpdate *result, + TNodeId nodeId); void ReadPDisk(const TPDiskId& pdiskId, const TPDiskInfo& pdisk, TEvBlobStorage::TEvControllerNodeServiceSetUpdate *result, diff --git a/ydb/core/mind/bscontroller/register_node.cpp b/ydb/core/mind/bscontroller/register_node.cpp index 658e6d92b2e..f1a88a8546e 100644 --- a/ydb/core/mind/bscontroller/register_node.cpp +++ b/ydb/core/mind/bscontroller/register_node.cpp @@ -280,10 +280,10 @@ public: } } - Self->ReadGroups(groupIDsToRead, false, res.get()); + Self->ReadGroups(groupIDsToRead, false, res.get(), nodeId); Y_VERIFY(groupIDsToRead.empty()); - Self->ReadGroups(groupsToDiscard, true, res.get()); + Self->ReadGroups(groupsToDiscard, true, res.get(), nodeId); for (auto it = Self->PDisks.lower_bound(minPDiskId); it != Self->PDisks.end() && it->first.NodeId == nodeId; ++it) { Self->ReadPDisk(it->first, *it->second, res.get(), NKikimrBlobStorage::INITIAL); @@ -331,16 +331,17 @@ public: }; void TBlobStorageController::ReadGroups(TSet<ui32>& groupIDsToRead, bool discard, - TEvBlobStorage::TEvControllerNodeServiceSetUpdate *result) { + TEvBlobStorage::TEvControllerNodeServiceSetUpdate *result, TNodeId nodeId) { for (auto it = groupIDsToRead.begin(); it != groupIDsToRead.end(); ) { const TGroupId groupId = *it; - if (TGroupInfo *group = FindGroup(groupId); group || discard) { + TGroupInfo *group = FindGroup(groupId); + if (group || discard) { NKikimrBlobStorage::TNodeWardenServiceSet *serviceSetProto = result->Record.MutableServiceSet(); NKikimrBlobStorage::TGroupInfo *groupProto = serviceSetProto->AddGroups(); if (!group) { groupProto->SetGroupID(groupId); groupProto->SetEntityStatus(NKikimrBlobStorage::DESTROY); - } else { + } else if (group->Listable()) { const TStoragePoolInfo& info = StoragePools.at(group->StoragePoolId); TMaybe<TKikimrScopeId> scopeId; @@ -351,6 +352,10 @@ void TBlobStorageController::ReadGroups(TSet<ui32>& groupIDsToRead, bool discard } SerializeGroupInfo(groupProto, *group, info.Name, scopeId); + } else { + // group is not listable, so we have to postpone the request from NW + group->WaitingNodes.insert(nodeId); + GetNode(nodeId).WaitingForGroups.insert(group->ID); } // this group is processed, remove it from the set @@ -467,6 +472,12 @@ void TBlobStorageController::OnWardenDisconnected(TNodeId nodeId) { return; // there are still some connections from this NW } + for (const TGroupId groupId : std::exchange(node.WaitingForGroups, {})) { + if (TGroupInfo *group = FindGroup(groupId)) { + group->WaitingNodes.erase(nodeId); + } + } + const TInstant now = TActivationContext::Now(); std::vector<std::pair<TVSlotId, TInstant>> lastSeenReadyQ; for (auto it = PDisks.lower_bound(TPDiskId::MinForNode(nodeId)); it != PDisks.end() && it->first.NodeId == nodeId; ++it) { diff --git a/ydb/core/mind/bscontroller/select_groups.h b/ydb/core/mind/bscontroller/select_groups.h index 6a2988949c8..3533a972179 100644 --- a/ydb/core/mind/bscontroller/select_groups.h +++ b/ydb/core/mind/bscontroller/select_groups.h @@ -23,7 +23,7 @@ namespace NKikimr { for (TGroupId groupId : iter->second) { const TGroupInfo *group = controller.FindGroup(groupId); Y_VERIFY_DEBUG(group); - if (group) { + if (group && group->Listable()) { groups.push_back(group); } } @@ -50,7 +50,7 @@ namespace NKikimr { for (auto it = storagePoolGroups.lower_bound(id); it != storagePoolGroups.end() && it->first == id; ++it) { const TGroupInfo *groupInfo = findGroupCallback(it->second); Y_VERIFY_DEBUG(groupInfo); - if (groupInfo) { + if (groupInfo && groupInfo->Listable()) { groups.push_back(groupInfo); } } diff --git a/ydb/core/mind/bscontroller/virtual_group.cpp b/ydb/core/mind/bscontroller/virtual_group.cpp index 7937fe57933..24e041ea9eb 100644 --- a/ydb/core/mind/bscontroller/virtual_group.cpp +++ b/ydb/core/mind/bscontroller/virtual_group.cpp @@ -168,7 +168,8 @@ namespace NKikimr::NBsController { case NKikimrBlobStorage::EVirtualGroupState::CREATE_FAILED: case NKikimrBlobStorage::EVirtualGroupState::WORKING: - GetGroup()->VirtualGroupSetupMachineId = {}; + IssueNodeNotifications(group); + group->VirtualGroupSetupMachineId = {}; PassAway(); break; @@ -291,6 +292,20 @@ namespace NKikimr::NBsController { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + void IssueNodeNotifications(TGroupInfo *group) { + for (const TNodeId nodeId : std::exchange(group->WaitingNodes, {})) { + TNodeInfo& node = Self->GetNode(nodeId); + node.WaitingForGroups.erase(group->ID); + auto ev = std::make_unique<TEvBlobStorage::TEvControllerNodeServiceSetUpdate>(NKikimrProto::OK, nodeId); + TSet<ui32> groups; + groups.insert(group->ID); + Self->ReadGroups(groups, false, ev.get(), nodeId); + Send(MakeBlobStorageNodeWardenID(nodeId), ev.release()); + } + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + void PassAway() override { NTabletPipe::CloseAndForgetClient(SelfId(), SchemeshardPipeId); TActorBootstrapped::PassAway(); diff --git a/ydb/core/node_whiteboard/node_whiteboard.h b/ydb/core/node_whiteboard/node_whiteboard.h index 9c7a1cabb02..8769068f371 100644 --- a/ydb/core/node_whiteboard/node_whiteboard.h +++ b/ydb/core/node_whiteboard/node_whiteboard.h @@ -236,18 +236,14 @@ struct TEvWhiteboard{ struct TEvBSGroupStateUpdate : TEventPB<TEvBSGroupStateUpdate, NKikimrWhiteboard::TBSGroupStateInfo, EvBSGroupStateUpdate> { TEvBSGroupStateUpdate() = default; - TEvBSGroupStateUpdate(const TIntrusivePtr<TBlobStorageGroupInfo>& groupInfo, const TMaybe<TString>& storagePoolName) { + TEvBSGroupStateUpdate(const TIntrusivePtr<TBlobStorageGroupInfo>& groupInfo) { Record.SetGroupID(groupInfo->GroupID); Record.SetGroupGeneration(groupInfo->GroupGeneration); Record.SetErasureSpecies(groupInfo->Type.ErasureSpeciesName(groupInfo->Type.GetErasure())); - for (auto it = groupInfo->VDisksBegin(), end = groupInfo->VDisksEnd(); it != end; ++it) { - auto vd = groupInfo->GetVDiskId(it->OrderNumber); - NKikimrBlobStorage::TVDiskID* addedVDisk = Record.AddVDiskIds(); - VDiskIDFromVDiskID(vd, addedVDisk); - } - if (storagePoolName) { - Record.SetStoragePoolName(*storagePoolName); + for (ui32 i = 0; i < groupInfo->GetTotalVDisksNum(); ++i) { + VDiskIDFromVDiskID(groupInfo->GetVDiskId(i), Record.AddVDiskIds()); } + Record.SetStoragePoolName(groupInfo->GetStoragePoolName()); } }; diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index d1707a43f1f..a1cca5e67b0 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -1,7 +1,28 @@ import "ydb/core/protos/blob_depot_config.proto"; +import "ydb/core/protos/base.proto"; package NKikimrBlobDepot; +message TBlobSeqId { + optional uint32 Channel = 1; + optional uint32 Generation = 2; + optional uint32 Step = 3; + optional uint32 Index = 4; +} + +message TBlobLocator { + optional uint32 GroupId = 1; + optional TBlobSeqId BlobSeqId = 2; + optional uint32 Checksum = 3; + optional uint64 TotalDataLen = 4; +} + +message TValueChain { + optional TBlobLocator Locator = 1; + optional uint64 SubrangeBegin = 2; + optional uint64 SubrangeEnd = 3; +} + message TEvApplyConfig { optional uint64 TxId = 1; optional NKikimrBlobDepot.TBlobDepotConfig Config = 2; @@ -11,3 +32,94 @@ message TEvApplyConfigResult { optional uint64 TabletId = 1; optional uint64 TxId = 2; } + +message TEvRegisterAgent { + optional uint32 VirtualGroupId = 1; // for validation purposes +} + +message TEvRegisterAgentResult { + repeated uint32 ChannelGroups = 1; + optional uint32 Generation = 2; +} + +message TEvAllocateIds { +} + +message TEvAllocateIdsResult { + optional uint32 Generation = 1; // executor generation, for validation purposes + optional uint64 RangeBegin = 2; // <Generation> <Step> <Channel> + optional uint64 RangeEnd = 3; +} + +message TEvBlock { + optional fixed64 TabletId = 1; + optional uint32 BlockedGeneration = 2; +} + +message TEvBlockResult { + optional NKikimrProto.EReplyStatus Status = 1; + optional string ErrorReason = 2; + optional uint32 TimeToLiveMs = 3; +} + +message TEvPushNotify { // BlobDepot -> Agent push notification (to take some action) + repeated fixed64 UpdateBlocksForTabletIds = 1; // notify about some changed blocks +} + +message TEvQueryBlocks { + repeated fixed64 TabletIds = 1; +} + +message TEvQueryBlocksResult { + repeated uint32 BlockedGenerations = 1; + optional uint32 TimeToLiveMs = 2; // TTL starting since sending TEvQueryBlocks at agent +} + +message TEvCommitBlobSeq { + message TItem { + optional TBlobLocator BlobLocator = 1; // GroupId and Generation are for validation purposes + } + + repeated TItem Items = 1; +} + +message TEvCommitBlobSeqResult { + message TItem { + optional NKikimrProto.EReplyStatus Status = 1; + optional string ErrorReason = 2; + } + + repeated TItem Items = 1; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TEvResolve -- lookup key in the Data table (and apply noncommitted changes if necessary) and return value chain for +// each of the found entries. + +message TEvResolve { + message TItem { + optional bytes BeginningKey = 1; // start from the very first key (if not set) + optional bool IncludeBeginning = 2 [default = true]; + optional bytes EndingKey = 3; // end with the key beyond the last one (if not set) + optional bool IncludeEnding = 4 [default = false]; + optional uint32 MaxKeys = 5 [default = 0]; + optional bool ReturnMeta = 6 [default = false]; + optional bool ReturnOwners = 7 [default = false]; + optional bool Reverse = 8 [default = false]; // reverse output + } + + repeated TItem Items = 1; +} + +message TEvResolveResult { + message TResolvedKey { + optional uint32 ItemIndex = 1; + optional bytes Key = 2; + repeated TValueChain ValueChain = 3; + optional bytes Meta = 4; + repeated uint64 Owners = 5; + } + optional NKikimrProto.EReplyStatus Status = 1; // OVERRUN means there are more messages on the way + optional string ErrorReason = 2; + repeated TResolvedKey ResolvedKeys = 3; +} diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index ab2765b94ab..f59945ce01d 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -316,6 +316,7 @@ enum EServiceKikimr { // Blob depot BLOB_DEPOT = 1300; + BLOB_DEPOT_AGENT = 1301; }; message TActivity { |