summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Alexander Rutkovsky <[email protected]> 2022-06-10 05:06:12 +0300
committer: Alexander Rutkovsky <[email protected]> 2022-06-10 05:06:12 +0300
commit: 90319fb7ae588774acc4249c0ebf210a801e880f (patch)
tree: 8fad057213849d6a7be34f54308faefd9397f5c2
parent: b62983df1309f8f2418ce4dfd289fe28cc3f3103 (diff)
Support BlobDepot agent config propagation KIKIMR-14867
ref:f247b4ec35abb72fe96a94b5ec8d8730c316dc2c
-rw-r--r--CMakeLists.darwin.txt3
-rw-r--r--CMakeLists.linux.txt3
-rw-r--r--ydb/core/blob_depot/CMakeLists.txt7
-rw-r--r--ydb/core/blob_depot/agent/CMakeLists.txt10
-rw-r--r--ydb/core/blob_depot/agent/agent.cpp4
-rw-r--r--ydb/core/blob_depot/agent/agent.h2
-rw-r--r--ydb/core/blob_depot/agent/agent_comm.cpp172
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h211
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_block.cpp55
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp10
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_discover.cpp76
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_get.cpp10
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_patch.cpp10
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_put.cpp46
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_query.cpp67
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_range.cpp10
-rw-r--r--ydb/core/blob_depot/agent/agent_storage_status.cpp10
-rw-r--r--ydb/core/blob_depot/agent/defs.h6
-rw-r--r--ydb/core/blob_depot/blob_depot_agent.cpp92
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h112
-rw-r--r--ydb/core/blob_depot/blocks.cpp139
-rw-r--r--ydb/core/blob_depot/defs.h5
-rw-r--r--ydb/core/blob_depot/events.h60
-rw-r--r--ydb/core/blob_depot/op_apply_config.cpp48
-rw-r--r--ydb/core/blob_depot/op_commit_blob_seq.cpp9
-rw-r--r--ydb/core/blob_depot/op_init_schema.cpp27
-rw-r--r--ydb/core/blob_depot/op_load.cpp66
-rw-r--r--ydb/core/blob_depot/op_resolve.cpp73
-rw-r--r--ydb/core/blob_depot/schema.h92
-rw-r--r--ydb/core/blob_depot/types.h29
-rw-r--r--ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.cpp3
-rw-r--r--ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h2
-rw-r--r--ydb/core/blobstorage/nodewarden/CMakeLists.txt1
-rw-r--r--ydb/core/blobstorage/nodewarden/defs.h1
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_group.cpp2
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_impl.h4
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp23
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp37
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/lib/CMakeLists.txt6
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/lib/defs.h7
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/lib/env.h114
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.darwin.txt43
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.linux.txt46
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.txt13
-rw-r--r--ydb/core/mind/bscontroller/config.cpp50
-rw-r--r--ydb/core/mind/bscontroller/get_group.cpp2
-rw-r--r--ydb/core/mind/bscontroller/group_reconfigure_wipe.cpp2
-rw-r--r--ydb/core/mind/bscontroller/impl.h13
-rw-r--r--ydb/core/mind/bscontroller/register_node.cpp21
-rw-r--r--ydb/core/mind/bscontroller/select_groups.h4
-rw-r--r--ydb/core/mind/bscontroller/virtual_group.cpp17
-rw-r--r--ydb/core/node_whiteboard/node_whiteboard.h12
-rw-r--r--ydb/core/protos/blob_depot.proto112
-rw-r--r--ydb/core/protos/services.proto1
54 files changed, 1914 insertions, 86 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index a803091f3d9..06ce997224a 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -438,6 +438,7 @@ add_subdirectory(ydb/core/blobstorage/dsproxy)
add_subdirectory(ydb/core/blobstorage/storagepoolmon)
add_subdirectory(ydb/core/blobstorage/incrhuge)
add_subdirectory(ydb/core/blobstorage/nodewarden)
+add_subdirectory(ydb/core/blob_depot/agent)
add_subdirectory(ydb/core/blobstorage/pdisk)
add_subdirectory(ydb/library/schlab)
add_subdirectory(ydb/library/schlab/schine)
@@ -938,7 +939,6 @@ add_subdirectory(ydb/core/blobstorage/ut_mirror3of4)
add_subdirectory(ydb/core/blobstorage/ut_vdisk)
add_subdirectory(ydb/core/blobstorage/ut_vdisk/lib)
add_subdirectory(ydb/core/blobstorage/ut_vdisk2)
-add_subdirectory(ydb/core/blob_depot/agent)
add_subdirectory(ydb/core/client/ut)
add_subdirectory(ydb/core/tablet_flat/test/libs/rows)
add_subdirectory(ydb/core/client/minikql_result_lib)
@@ -1014,6 +1014,7 @@ add_subdirectory(ydb/services/ydb/sdk_credprovider_ut)
add_subdirectory(ydb/services/ydb/ut)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_extension)
add_subdirectory(ydb/services/yq/ut_integration)
+add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_blob_depot)
add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_group_reconfiguration)
add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_osiris)
add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_replication)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 6c5ab225d99..ca1e352eea0 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -518,6 +518,7 @@ add_subdirectory(ydb/core/blobstorage/dsproxy)
add_subdirectory(ydb/core/blobstorage/storagepoolmon)
add_subdirectory(ydb/core/blobstorage/incrhuge)
add_subdirectory(ydb/core/blobstorage/nodewarden)
+add_subdirectory(ydb/core/blob_depot/agent)
add_subdirectory(ydb/core/blobstorage/pdisk)
add_subdirectory(ydb/library/schlab)
add_subdirectory(ydb/library/schlab/schine)
@@ -1034,7 +1035,6 @@ add_subdirectory(ydb/core/blobstorage/ut_mirror3of4)
add_subdirectory(ydb/core/blobstorage/ut_vdisk)
add_subdirectory(ydb/core/blobstorage/ut_vdisk/lib)
add_subdirectory(ydb/core/blobstorage/ut_vdisk2)
-add_subdirectory(ydb/core/blob_depot/agent)
add_subdirectory(ydb/core/client/ut)
add_subdirectory(ydb/core/tablet_flat/test/libs/rows)
add_subdirectory(ydb/core/client/minikql_result_lib)
@@ -1109,6 +1109,7 @@ add_subdirectory(ydb/services/ydb/sdk_credprovider_ut)
add_subdirectory(ydb/services/ydb/ut)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_extension)
add_subdirectory(ydb/services/yq/ut_integration)
+add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_blob_depot)
add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_group_reconfiguration)
add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_osiris)
add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_replication)
diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt
index 46eed711fc2..e0edc850baf 100644
--- a/ydb/core/blob_depot/CMakeLists.txt
+++ b/ydb/core/blob_depot/CMakeLists.txt
@@ -16,4 +16,11 @@ target_link_libraries(ydb-core-blob_depot PUBLIC
)
target_sources(ydb-core-blob_depot PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blob_depot.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blob_depot_agent.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_apply_config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_init_schema.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_load.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_resolve.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_commit_blob_seq.cpp
)
diff --git a/ydb/core/blob_depot/agent/CMakeLists.txt b/ydb/core/blob_depot/agent/CMakeLists.txt
index 41a1f8fc18f..76f8566f83f 100644
--- a/ydb/core/blob_depot/agent/CMakeLists.txt
+++ b/ydb/core/blob_depot/agent/CMakeLists.txt
@@ -16,4 +16,14 @@ target_link_libraries(core-blob_depot-agent PUBLIC
)
target_sources(core-blob_depot-agent PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_comm.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_put.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_get.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_block.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_discover.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_range.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_status.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent_storage_patch.cpp
)
diff --git a/ydb/core/blob_depot/agent/agent.cpp b/ydb/core/blob_depot/agent/agent.cpp
index 73c9848652c..0fc3ea0fa4b 100644
--- a/ydb/core/blob_depot/agent/agent.cpp
+++ b/ydb/core/blob_depot/agent/agent.cpp
@@ -3,8 +3,8 @@
namespace NKikimr::NBlobDepot {
- IActor *CreateBlobDepotAgent(ui32 virtualGroupId, ui64 tabletId) {
- return new TBlobDepotAgent(virtualGroupId, tabletId);
+ // Creates the BlobDepot agent actor for the given virtual group. The tablet id is no longer a parameter:
+ // the agent takes it from the group info carried by TEvBlobStorage::TEvConfigureProxy.
+ IActor *CreateBlobDepotAgent(ui32 virtualGroupId) {
+ return new TBlobDepotAgent(virtualGroupId);
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent.h b/ydb/core/blob_depot/agent/agent.h
index e848f1969c1..4d66f9967b4 100644
--- a/ydb/core/blob_depot/agent/agent.h
+++ b/ydb/core/blob_depot/agent/agent.h
@@ -4,6 +4,6 @@
namespace NKikimr::NBlobDepot {
- IActor *CreateBlobDepotAgent(ui32 virtualGroupId, ui64 tabletId);
+ IActor *CreateBlobDepotAgent(ui32 virtualGroupId);
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_comm.cpp b/ydb/core/blob_depot/agent/agent_comm.cpp
new file mode 100644
index 00000000000..87cd4f95cb4
--- /dev/null
+++ b/ydb/core/blob_depot/agent/agent_comm.cpp
@@ -0,0 +1,172 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ // Pipe client notification about a connection attempt; the event is only logged.
+ // NOTE(review): the event is not inspected for a failure status -- confirm that a failed connect is always
+ // followed by TEvClientDestroyed, otherwise nothing would trigger a reconnect.
+ void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC01, "TEvClientConnected", (VirtualGroupId, VirtualGroupId),
+ (Msg, ev->Get()->ToString()));
+ }
+
+ // The pipe to the BlobDepot tablet is gone: forget the pipe id, fail all requests in flight and reconnect.
+ void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) {
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC02, "TEvClientDestroyed", (VirtualGroupId, VirtualGroupId),
+ (Msg, ev->Get()->ToString()));
+ // reset first so that nothing invoked from OnDisconnect sees the dead pipe as usable
+ PipeId = {};
+ OnDisconnect();
+ ConnectToBlobDepot();
+ }
+
+ // Opens a new pipe to the BlobDepot tablet, registers this agent there and starts id preallocation.
+ void TBlobDepotAgent::ConnectToBlobDepot() {
+ PipeId = Register(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy::WithRetries()));
+ // the request id is passed along with the message; HandleTabletResponse looks the callback up by ev->Cookie
+ const ui64 id = NextRequestId++;
+ NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId), id);
+ RegisterRequest(id, nullptr, [this](IEventBase *ev) {
+ // ev is nullptr when the request is failed by OnDisconnect; nothing to record in that case
+ if (ev) {
+ auto& msg = *static_cast<TEvBlobDepot::TEvRegisterAgentResult*>(ev);
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId),
+ (Msg, msg.Record));
+ Registered = true;
+ // remember the tablet generation and its channel groups; AllocateDataBlobId reads both
+ BlobDepotGeneration = msg.Record.GetGeneration();
+ auto& channelGroups = msg.Record.GetChannelGroups();
+ BlobDepotChannelGroups = {channelGroups.begin(), channelGroups.end()};
+ }
+ return true;
+ });
+ IssueAllocateIdsIfNeeded();
+ }
+
+    // Asks the BlobDepot tablet for another id range when the local queue holds fewer than
+    // PreallocatedIdCount ranges. Only one TEvAllocateIds is kept in flight, and nothing is sent without a pipe.
+    void TBlobDepotAgent::IssueAllocateIdsIfNeeded() {
+        STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC09, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId),
+            (IdAllocInFlight, IdAllocInFlight), (IdQ.size, IdQ.size()), (PreallocatedIdCount, PreallocatedIdCount),
+            (PipeId, PipeId));
+
+        if (IdAllocInFlight || IdQ.size() >= PreallocatedIdCount || !PipeId) {
+            return; // already asking, enough ranges queued, or no pipe to ask through
+        }
+
+        const ui64 requestId = NextRequestId++;
+        NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds, requestId);
+        IdAllocInFlight = true;
+
+        RegisterRequest(requestId, nullptr, [this](IEventBase *ev) {
+            Y_VERIFY(IdAllocInFlight);
+            IdAllocInFlight = false;
+
+            if (!ev) {
+                return true; // failed by disconnect; request complete, remove from queue
+            }
+
+            auto& result = *static_cast<TEvBlobDepot::TEvAllocateIdsResult*>(ev);
+            STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC07, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId),
+                (Msg, result.Record));
+            Y_VERIFY(result.Record.GetGeneration() == BlobDepotGeneration);
+            IdQ.push_back(TAllocatedId{BlobDepotGeneration, result.Record.GetRangeBegin(), result.Record.GetRangeEnd()});
+
+            // FIXME notify waiting requests about new ids
+
+            // keep asking until enough ranges are queued
+            IssueAllocateIdsIfNeeded();
+
+            return true; // request complete, remove from queue
+        });
+    }
+
+    // Fails every request in flight after the pipe to the BlobDepot tablet has been lost; each callback is
+    // invoked with nullptr and must report completion.
+    //
+    // Entries are detached from RequestInFlight one at a time rather than by swapping the whole map out.
+    // A callback may destroy its sender (EndWithError does `delete this`), and ~TRequestSender then erases the
+    // sender's remaining ids from RequestInFlight and verifies that each one was present. With the map emptied
+    // up front that check failed for any sender with more than one request in flight, and the loop would also
+    // have gone on to use the already destroyed sender for its other entries.
+    //
+    // Requests registered by a callback while this loop runs are failed by the same loop.
+    void TBlobDepotAgent::OnDisconnect() {
+        STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC04, "OnDisconnect", (VirtualGroupId, VirtualGroupId));
+
+        while (!RequestInFlight.empty()) {
+            const auto it = RequestInFlight.begin();
+            const ui64 id = it->first;
+            TRequestInFlight item = std::move(it->second);
+            RequestInFlight.erase(it);
+
+            if (item.Sender) {
+                // drop the id on the sender side first so that its destructor does not look for this entry
+                item.Sender->OnRequestComplete(id);
+            }
+            const bool done = item.Callback(nullptr);
+            Y_VERIFY(done);
+        }
+
+        Registered = false;
+    }
+
+    // Stores the completion callback under the request id (which must be unused) and, when the request has an
+    // owner, records the id on the owner as well.
+    void TBlobDepotAgent::RegisterRequest(ui64 id, TRequestSender *sender, TRequestCompleteCallback callback) {
+        const bool inserted = RequestInFlight.emplace(id, TRequestInFlight{sender, std::move(callback)}).second;
+        Y_VERIFY(inserted);
+        if (!sender) {
+            return;
+        }
+        sender->RegisterRequest(id);
+    }
+
+ // Routes a response from the BlobDepot tablet to the callback registered under the event cookie.
+ // The owning sender (if any) is told the request completed before the callback runs; the entry is erased
+ // only when the callback returns true.
+ // NOTE(review): when the callback returns false the entry stays registered although the sender has already
+ // dropped the id; another response for the same id would then fail the check in OnRequestComplete -- confirm.
+ template<typename TEvent>
+ void TBlobDepotAgent::HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC05, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId),
+ (Id, ev->Cookie), (Type, TypeName<TEvent>()));
+ if (const auto it = RequestInFlight.find(ev->Cookie); it != RequestInFlight.end()) {
+ auto& [id, item] = *it;
+ if (item.Sender) {
+ item.Sender->OnRequestComplete(id);
+ }
+ if (item.Callback(ev->Get())) {
+ RequestInFlight.erase(it);
+ }
+ } else {
+ Y_FAIL(); // don't know how this can happen without logic error
+ }
+ }
+
+ // Explicit instantiations: the template body lives in this .cpp, so every response type routed to
+ // HandleTabletResponse by the state function has to be listed here.
+ template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvRegisterAgentResult::TPtr ev);
+ template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvAllocateIdsResult::TPtr ev);
+ template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvBlockResult::TPtr ev);
+ template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvQueryBlocksResult::TPtr ev);
+ template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev);
+ template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvResolveResult::TPtr ev);
+
+    // Wraps a TEvBlock record into an event (the record is swapped in, not copied) and hands it to the
+    // generic Issue().
+    void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestCompleteCallback callback) {
+        auto event = std::make_unique<TEvBlobDepot::TEvBlock>();
+        event->Record.Swap(&msg);
+        Issue(std::move(event), sender, std::move(callback));
+    }
+
+    // Wraps a TEvResolve record into an event (the record is swapped in, not copied) and hands it to the
+    // generic Issue().
+    void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestCompleteCallback callback) {
+        auto event = std::make_unique<TEvBlobDepot::TEvResolve>();
+        event->Record.Swap(&msg);
+        Issue(std::move(event), sender, std::move(callback));
+    }
+
+    // Wraps a TEvQueryBlocks record into an event (the record is swapped in, not copied) and hands it to the
+    // generic Issue().
+    void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestCompleteCallback callback) {
+        auto event = std::make_unique<TEvBlobDepot::TEvQueryBlocks>();
+        event->Record.Swap(&msg);
+        Issue(std::move(event), sender, std::move(callback));
+    }
+
+    // Sends an event to the BlobDepot tablet over the current pipe under a fresh request id and registers the
+    // completion callback under that id; HandleTabletResponse finds the callback again by the response cookie.
+    void TBlobDepotAgent::Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestCompleteCallback callback) {
+        const ui64 requestId = NextRequestId;
+        ++NextRequestId;
+        STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC03, "Issue", (VirtualGroupId, VirtualGroupId), (Id, requestId), (Msg, ev->ToString()));
+        // ownership of the event passes to the pipe client from here on
+        IEventBase *payload = ev.release();
+        NTabletPipe::SendData(SelfId(), PipeId, payload, requestId);
+        RegisterRequest(requestId, sender, std::move(callback));
+    }
+
+    // Checks the cached block state of a tablet before a write of the given generation.
+    //
+    // Returns RACE when the generation is already blocked, OK while the cached entry has not expired, and
+    // NOTREADY when the entry has to be refreshed from the BlobDepot tablet first. The query that triggers a
+    // refresh is queued and receives OnUpdateBlock() once the answer arrives.
+    //
+    // Fixes over the previous version:
+    //  * the blocked generation is read from index 0 (index 1 was out of range for the verified size of 1);
+    //  * RefreshInFlight is cleared when the refresh finishes, otherwise no later refresh could be issued;
+    //  * waiters are detached before being notified, so a waiter that deletes or re-queues itself from
+    //    OnUpdateBlock() neither breaks the iteration nor gets notified again by an unrelated later refresh.
+    NKikimrProto::EReplyStatus TBlobDepotAgent::CheckBlockForTablet(ui64 tabletId, ui32 generation, TExecutingQuery *query) {
+        auto& block = Blocks[tabletId];
+        const TMonotonic issueTime = TActivationContext::Monotonic();
+        if (generation <= block.BlockedGeneration) {
+            return NKikimrProto::RACE;
+        } else if (issueTime < block.ExpirationTimestamp) {
+            return NKikimrProto::OK;
+        } else if (!block.RefreshInFlight) {
+            NKikimrBlobDepot::TEvQueryBlocks queryBlocks;
+            queryBlocks.AddTabletIds(tabletId);
+            Issue(std::move(queryBlocks), nullptr, [=](IEventBase *ev) {
+                auto& block = Blocks[tabletId];
+                // the refresh is over whether it succeeded or was failed by a disconnect (ev == nullptr)
+                block.RefreshInFlight = false;
+                if (ev) {
+                    auto& msg = *static_cast<TEvBlobDepot::TEvQueryBlocksResult*>(ev);
+                    const ui64 tabletId_ = tabletId;
+                    STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC08, "TEvQueryBlocksResult", (VirtualGroupId, VirtualGroupId),
+                        (Msg, msg.Record), (TabletId, tabletId_));
+                    Y_VERIFY(msg.Record.BlockedGenerationsSize() == 1);
+                    // NOTE(review): the cached value can also be raised by a TEvBlock completing while this
+                    // refresh is in flight -- confirm this check cannot fire spuriously.
+                    Y_VERIFY(block.BlockedGeneration < generation);
+                    // exactly one tablet was queried, so its answer is the only element
+                    block.BlockedGeneration = msg.Record.GetBlockedGenerations(0);
+                    block.ExpirationTimestamp = issueTime + TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs());
+
+                    // detach the waiters first: OnUpdateBlock() may delete the query or queue it again
+                    TIntrusiveList<TExecutingQuery, TPendingBlockChecks> pending;
+                    pending.Swap(block.PendingBlockChecks);
+                    while (!pending.Empty()) {
+                        pending.PopFront()->OnUpdateBlock();
+                    }
+                }
+                return true;
+            });
+            block.RefreshInFlight = true;
+            block.PendingBlockChecks.PushBack(query);
+        }
+        return NKikimrProto::NOTREADY;
+    }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index 8d9d05328ca..e4caa77894b 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -4,14 +4,219 @@
namespace NKikimr::NBlobDepot {
- class TBlobDepotAgent : public TActorBootstrapped<TBlobDepotAgent> {
+// X-macro listing the TEvBlobStorage event ids the agent accepts. It is expanded to build the state function
+// forwarding entries, the CreateExecutingQuery dispatch switch and the error response switch in EndWithError.
+#define ENUMERATE_INCOMING_EVENTS(XX) \
+ XX(EvPut) \
+ XX(EvGet) \
+ XX(EvBlock) \
+ XX(EvDiscover) \
+ XX(EvRange) \
+ XX(EvCollectGarbage) \
+ XX(EvStatus) \
+ XX(EvPatch) \
+ // END
+
+ class TBlobDepotAgent : public TActor<TBlobDepotAgent> {
+ const ui32 VirtualGroupId;
+ ui64 TabletId = Max<ui64>();
+ TActorId PipeId;
+
public:
- TBlobDepotAgent(ui32 virtualGroupId, ui64 tabletId);
- void Bootstrap();
+ TBlobDepotAgent(ui32 virtualGroupId)
+ : TActor(&TThis::StateFunc)
+ , VirtualGroupId(virtualGroupId)
+ {
+ Y_VERIFY(TGroupID(VirtualGroupId).ConfigurationType() == EGroupConfigurationType::Virtual);
+ }
+// The single state function of the agent: proxy configuration, pipe notifications, responses from the
+// BlobDepot tablet (all routed to HandleTabletResponse) and every incoming storage request from
+// ENUMERATE_INCOMING_EVENTS, which goes to HandleStorageProxy.
+#define FORWARD_STORAGE_PROXY(TYPE) fFunc(TEvBlobStorage::TYPE, HandleStorageProxy);
STRICT_STFUNC(StateFunc,
cFunc(TEvents::TSystem::Poison, PassAway);
+ hFunc(TEvBlobStorage::TEvConfigureProxy, Handle);
+
+ hFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+
+ hFunc(TEvBlobDepot::TEvRegisterAgentResult, HandleTabletResponse);
+ hFunc(TEvBlobDepot::TEvAllocateIdsResult, HandleTabletResponse);
+ hFunc(TEvBlobDepot::TEvBlockResult, HandleTabletResponse);
+ hFunc(TEvBlobDepot::TEvQueryBlocksResult, HandleTabletResponse);
+ hFunc(TEvBlobDepot::TEvCommitBlobSeqResult, HandleTabletResponse);
+ hFunc(TEvBlobDepot::TEvResolveResult, HandleTabletResponse);
+
+ ENUMERATE_INCOMING_EVENTS(FORWARD_STORAGE_PROXY)
);
+#undef FORWARD_STORAGE_PROXY
+
+ // Closes the pipe to the BlobDepot tablet before the actor terminates.
+ void PassAway() override {
+ NTabletPipe::CloseAndForgetClient(SelfId(), PipeId);
+ TActor::PassAway();
+ }
+
+        // Takes the BlobDepot tablet id from the group info, connects to the tablet and re-sends the storage
+        // requests that HandleStorageProxy postponed while the tablet id was unknown.
+        // NOTE(review): a repeated TEvConfigureProxy opens a new pipe without closing the previous one -- confirm
+        // whether reconfiguration is expected here.
+        void Handle(TEvBlobStorage::TEvConfigureProxy::TPtr ev) {
+            const auto& groupInfo = ev->Get()->Info;
+            Y_VERIFY(groupInfo);
+            Y_VERIFY(groupInfo->BlobDepotId);
+            TabletId = *groupInfo->BlobDepotId;
+            ConnectToBlobDepot();
+
+            // detach the queue before replaying it
+            auto postponed = std::exchange(PendingEventQ, {});
+            while (!postponed.empty()) {
+                TActivationContext::Send(postponed.front().release());
+                postponed.pop_front();
+            }
+        }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // BlobDepot communications
+
+ using TRequestCompleteCallback = std::function<bool(IEventBase*)>;
+
+ // Base for objects that own requests to the BlobDepot tablet. It tracks the ids of its requests that are
+ // still in flight and removes them from the agent's RequestInFlight map when it is destroyed, so no
+ // callback is left registered for a dead owner.
+ class TRequestSender {
+ THashSet<ui64> IdsInFlight;
+
+ protected:
+ TBlobDepotAgent& Agent;
+
+ public:
+ TRequestSender(TBlobDepotAgent& agent)
+ : Agent(agent)
+ {}
+
+ // Drops every still-registered request of this sender; each id is expected to be present in the agent's map.
+ ~TRequestSender() {
+ for (const ui64 id : IdsInFlight) {
+ const size_t num = Agent.RequestInFlight.erase(id);
+ Y_VERIFY(num);
+ }
+ }
+
+ // Called by the agent when a request owned by this sender is registered; the id must be new.
+ void RegisterRequest(ui64 id) {
+ const auto [_, inserted] = IdsInFlight.insert(id);
+ Y_VERIFY(inserted);
+ }
+
+ // Called by the agent right before the request's callback is invoked; the id must be known.
+ void OnRequestComplete(ui64 id) {
+ const size_t num = IdsInFlight.erase(id);
+ Y_VERIFY(num);
+ }
+ };
+
+ struct TRequestInFlight {
+ TRequestSender *Sender;
+ TRequestCompleteCallback Callback;
+ };
+
+ ui64 NextRequestId = 1;
+ THashMap<ui64, TRequestInFlight> RequestInFlight;
+
+ void RegisterRequest(ui64 id, TRequestSender *sender, TRequestCompleteCallback callback);
+
+ template<typename TEvent>
+ void HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ bool Registered = false;
+ ui32 BlobDepotGeneration = 0;
+ std::vector<ui32> BlobDepotChannelGroups;
+
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev);
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev);
+ void ConnectToBlobDepot();
+ void IssueAllocateIdsIfNeeded();
+ void OnDisconnect();
+
+ void Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestCompleteCallback callback);
+ void Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestCompleteCallback callback);
+ void Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestCompleteCallback callback);
+
+ void Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestCompleteCallback callback);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ struct TAllocatedId {
+ ui32 Generation;
+ ui64 Begin;
+ ui64 End;
+ };
+
+ bool IdAllocInFlight = false;
+ std::deque<TAllocatedId> IdQ;
+ static constexpr size_t PreallocatedIdCount = 2;
+
+ // Takes the next id from the preallocated ranges and builds a blob id together with the group to write to.
+ // Returns a default-constructed pair when no preallocated ids are available.
+ // NOTE(review): TAllocatedId::Generation is stored for every range but BlobDepotGeneration is used here --
+ // confirm the two can never differ.
+ std::pair<TLogoBlobID, ui32> AllocateDataBlobId(ui32 size, ui32 type) {
+ if (IdQ.empty()) {
+ return {};
+ }
+
+ auto& item = IdQ.front();
+ auto cgsi = TCGSI::FromBinary(BlobDepotGeneration, BlobDepotChannelGroups.size(), item.Begin++);
+ // range exhausted: drop it and ask the tablet for more if the queue got short
+ if (item.Begin == item.End) {
+ IdQ.pop_front();
+ IssueAllocateIdsIfNeeded();
+ }
+ // the cookie carries cgsi.Index in its upper bits and the type in the lower typeBits bits
+ static constexpr ui32 typeBits = 24 - TCGSI::IndexBits;
+ Y_VERIFY(type < (1 << typeBits));
+ const ui32 cookie = cgsi.Index << typeBits | type;
+ const TLogoBlobID id(TabletId, cgsi.Generation, cgsi.Step, cgsi.Channel, size, cookie);
+ const ui32 groupId = BlobDepotChannelGroups[cgsi.Channel];
+ return {id, groupId};
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ struct TExecutingQueries {};
+ struct TPendingBlockChecks {};
+
+ // Base class of a storage request being executed by the agent. It keeps the original request event, can be
+ // linked into two intrusive lists (executing queries and pending block checks) and owns its requests to the
+ // BlobDepot tablet through TRequestSender.
+ class TExecutingQuery
+ : public TIntrusiveListItem<TExecutingQuery, TExecutingQueries>
+ , public TIntrusiveListItem<TExecutingQuery, TPendingBlockChecks>
+ , public TRequestSender
+ {
+ protected:
+ std::unique_ptr<IEventHandle> Event; // original query event
+ const ui64 QueryId; // random id, used in log records
+
+ public:
+ TExecutingQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event)
+ : TRequestSender(agent)
+ , Event(std::move(event))
+ , QueryId(RandomNumber<ui64>())
+ {}
+
+ virtual ~TExecutingQuery() = default;
+
+ // Replies to the requester with an error response and destroys the query.
+ void EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason);
+ void EndWithSuccess(std::unique_ptr<IEventBase> response);
+ TString GetName() const;
+ ui64 GetQueryId() const { return QueryId; }
+ // Starts (or restarts) processing of the request.
+ virtual void Initiate() = 0;
+
+ // Invoked by CheckBlockForTablet's refresh callback for queries waiting on block information.
+ virtual void OnUpdateBlock() {}
+
+ public:
+ // Deleter used by the auto-deleting intrusive list of executing queries.
+ struct TDeleter {
+ static void Destroy(TExecutingQuery *query) { delete query; }
+ };
+ };
+
+ std::deque<std::unique_ptr<IEventHandle>> PendingEventQ;
+ TIntrusiveListWithAutoDelete<TExecutingQuery, TExecutingQuery::TDeleter, TExecutingQueries> ExecutingQueries;
+
+ void HandleStorageProxy(TAutoPtr<IEventHandle> ev);
+ TExecutingQuery *CreateExecutingQuery(TAutoPtr<IEventHandle> ev);
+ template<ui32 EventType> TExecutingQuery *CreateExecutingQuery(std::unique_ptr<IEventHandle> ev);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Blocks
+
+        // Cached block state of one tablet as last learned from the BlobDepot tablet.
+        struct TBlockInfo {
+            // explicit zero so that the comparisons in CheckBlockForTablet and the Block query never depend on
+            // how an instance happened to be created
+            ui32 BlockedGeneration = 0;
+            TMonotonic ExpirationTimestamp; // not valid after
+            bool RefreshInFlight = false; // a TEvQueryBlocks for this tablet has been issued and not answered yet
+            TIntrusiveList<TExecutingQuery, TPendingBlockChecks> PendingBlockChecks; // queries waiting for a refresh
+        };
+
+ std::unordered_map<ui64, TBlockInfo> Blocks;
+
+ NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, ui32 generation, TExecutingQuery *query);
};
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_block.cpp b/ydb/core/blob_depot/agent/agent_storage_block.cpp
new file mode 100644
index 00000000000..1d90be19463
--- /dev/null
+++ b/ydb/core/blob_depot/agent/agent_storage_block.cpp
@@ -0,0 +1,55 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ // Builds the query object that executes TEvBlobStorage::TEvBlock through the BlobDepot tablet.
+ template<>
+ TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvBlock>(std::unique_ptr<IEventHandle> ev) {
+ class TBlockExecutingQuery : public TExecutingQuery {
+ public:
+ using TExecutingQuery::TExecutingQuery;
+
+ // Fails fast on a known race, otherwise forwards the block request to the tablet.
+ void Initiate() override {
+ auto& msg = *Event->Get<TEvBlobStorage::TEvBlock>();
+
+ // lookup existing blocks to try fail-fast
+ if (const auto it = Agent.Blocks.find(msg.TabletId); it != Agent.Blocks.end()) {
+ const TBlockInfo& block = it->second;
+ if (msg.Generation <= block.BlockedGeneration) {
+ // we don't consider ExpirationTimestamp here because blocked generation may only increase
+ return EndWithError(NKikimrProto::RACE, "block race detected");
+ }
+ }
+
+ // issue request to the tablet
+ NKikimrBlobDepot::TEvBlock block;
+ block.SetTabletId(msg.TabletId);
+ block.SetBlockedGeneration(msg.Generation);
+ // the issue time is bound into the callback; the cache entry's expiration is counted from it
+ Agent.Issue(std::move(block), this, std::bind(&TBlockExecutingQuery::HandleBlockResult, this,
+ std::placeholders::_1, TActivationContext::Monotonic()));
+ }
+
+ // Completion callback for the tablet request; event is nullptr when the tablet disconnected.
+ // Every branch ends the query, so the object must not be used after this returns.
+ bool HandleBlockResult(IEventBase *event, TMonotonic issueTimestamp) {
+ if (!event) {
+ EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
+ } else if (const auto& msg = static_cast<TEvBlobDepot::TEvBlockResult&>(*event); !msg.Record.HasStatus()) {
+ EndWithError(NKikimrProto::ERROR, "incorrect TEvBlockResult response");
+ } else if (const auto status = msg.Record.GetStatus(); status != NKikimrProto::OK) {
+ EndWithError(status, msg.Record.GetErrorReason());
+ } else {
+ // update blocks cache
+ auto& query = *Event->Get<TEvBlobStorage::TEvBlock>();
+ auto& block = Agent.Blocks[query.TabletId];
+ Y_VERIFY(block.BlockedGeneration <= query.Generation); // TODO: QueryBlocks can race with Block?
+ block.BlockedGeneration = query.Generation;
+ block.ExpirationTimestamp = issueTimestamp + TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs());
+
+ EndWithSuccess(std::make_unique<TEvBlobStorage::TEvBlockResult>(NKikimrProto::OK));
+ }
+ return true;
+ }
+ };
+
+ return new TBlockExecutingQuery(*this, std::move(ev));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp b/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp
new file mode 100644
index 00000000000..8a411246a52
--- /dev/null
+++ b/ydb/core/blob_depot/agent/agent_storage_collect_garbage.cpp
@@ -0,0 +1,10 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+    // Placeholder for TEvCollectGarbage: the event is dropped and no query object is produced.
+    // NOTE(review): HandleStorageProxy dereferences the returned pointer, so this type cannot be served yet.
+    template<>
+    TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvCollectGarbage>(std::unique_ptr<IEventHandle> ev) {
+        (void)ev;
+        return nullptr;
+    }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_discover.cpp b/ydb/core/blob_depot/agent/agent_storage_discover.cpp
new file mode 100644
index 00000000000..6cce45058a6
--- /dev/null
+++ b/ydb/core/blob_depot/agent/agent_storage_discover.cpp
@@ -0,0 +1,76 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ // Builds the query object that executes TEvBlobStorage::TEvDiscover through the BlobDepot tablet.
+ // NOTE(review): no path visible here replies to the requester on success -- presumably unfinished.
+ template<>
+ TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvDiscover>(std::unique_ptr<IEventHandle> ev) {
+ class TDiscoverExecutingQuery : public TExecutingQuery {
+ public:
+ using TExecutingQuery::TExecutingQuery;
+
+ // Issues a resolve over the tablet's whole key range and, when requested, a block query.
+ void Initiate() override {
+ auto& msg = *Event->Get<TEvBlobStorage::TEvDiscover>();
+
+ // bounds of the key range for this tablet: largest possible id first, smallest last
+ const TLogoBlobID from(msg.TabletId, Max<ui32>(), Max<ui32>(), 0, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie);
+ const TLogoBlobID to(msg.TabletId, 0, 0, 0, 0, 0);
+
+ // ask for a single key, both bounds inclusive, in reverse order
+ NKikimrBlobDepot::TEvResolve resolve;
+ auto *item = resolve.AddItems();
+ item->SetBeginningKey(from.GetRaw(), 3 * sizeof(ui64));
+ item->SetIncludeBeginning(true);
+ item->SetEndingKey(to.GetRaw(), 3 * sizeof(ui64));
+ item->SetIncludeEnding(true);
+ item->SetMaxKeys(1);
+ item->SetReverse(true);
+
+ Agent.Issue(std::move(resolve), this, std::bind(&TDiscoverExecutingQuery::HandleResolveResult,
+ this, std::placeholders::_1));
+
+ if (msg.DiscoverBlockedGeneration) {
+ NKikimrBlobDepot::TEvQueryBlocks queryBlocks;
+ queryBlocks.AddTabletIds(msg.TabletId);
+ Agent.Issue(std::move(queryBlocks), this, std::bind(
+ &TDiscoverExecutingQuery::HandleQueryBlocksResult, this,
+ std::placeholders::_1, TActivationContext::Now()));
+ }
+ }
+
+ // Completion callback for the resolve request. Returns false on OVERRUN so the request stays registered.
+ bool HandleResolveResult(IEventBase *result) {
+ if (!result) { // server has disconnected before request got processed
+ EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
+ return true;
+ }
+
+ auto& msg = static_cast<TEvBlobDepot::TEvResolveResult&>(*result);
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDASD01, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId),
+ (QueryId, QueryId), (Msg, msg.Record));
+
+ const NKikimrProto::EReplyStatus status = msg.Record.GetStatus();
+ if (status != NKikimrProto::OK && status != NKikimrProto::OVERRUN) {
+ EndWithError(status, msg.Record.GetErrorReason());
+ return true;
+ }
+
+ return status != NKikimrProto::OVERRUN;
+ }
+
+ // Completion callback for the block query; the answer is only logged for now.
+ bool HandleQueryBlocksResult(IEventBase *result, TInstant /*issueTimestamp*/) {
+ if (!result) {
+ EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
+ return true;
+ }
+
+ auto& msg = static_cast<TEvBlobDepot::TEvQueryBlocksResult&>(*result);
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDASD02, "HandleQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId),
+ (QueryId, QueryId), (Msg, msg.Record));
+
+ return true;
+ }
+ };
+
+ return new TDiscoverExecutingQuery(*this, std::move(ev));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_get.cpp b/ydb/core/blob_depot/agent/agent_storage_get.cpp
new file mode 100644
index 00000000000..55d4e166904
--- /dev/null
+++ b/ydb/core/blob_depot/agent/agent_storage_get.cpp
@@ -0,0 +1,10 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+    // Placeholder for TEvGet: the event is dropped and no query object is produced.
+    // NOTE(review): HandleStorageProxy dereferences the returned pointer, so this type cannot be served yet.
+    template<>
+    TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvGet>(std::unique_ptr<IEventHandle> ev) {
+        (void)ev;
+        return nullptr;
+    }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_patch.cpp b/ydb/core/blob_depot/agent/agent_storage_patch.cpp
new file mode 100644
index 00000000000..92bc96c9d01
--- /dev/null
+++ b/ydb/core/blob_depot/agent/agent_storage_patch.cpp
@@ -0,0 +1,10 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+    // Placeholder for TEvPatch: the event is dropped and no query object is produced.
+    // NOTE(review): HandleStorageProxy dereferences the returned pointer, so this type cannot be served yet.
+    template<>
+    TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvPatch>(std::unique_ptr<IEventHandle> ev) {
+        (void)ev;
+        return nullptr;
+    }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_put.cpp b/ydb/core/blob_depot/agent/agent_storage_put.cpp
new file mode 100644
index 00000000000..9628abdf27b
--- /dev/null
+++ b/ydb/core/blob_depot/agent/agent_storage_put.cpp
@@ -0,0 +1,46 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ // Builds the query object that executes TEvBlobStorage::TEvPut. Only the block check is implemented so far.
+ template<>
+ TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvPut>(std::unique_ptr<IEventHandle> ev) {
+ class TPutExecutingQuery : public TExecutingQuery {
+ // number of NOTREADY answers tolerated before the put is failed
+ ui32 BlockChecksRemain = 3;
+
+ public:
+ using TExecutingQuery::TExecutingQuery;
+
+ // Runs the block check; called again from OnUpdateBlock after block information was refreshed.
+ void Initiate() override {
+ auto& msg = *Event->Get<TEvBlobStorage::TEvPut>();
+
+ // first step -- check blocks
+ switch (Agent.CheckBlockForTablet(msg.Id.TabletID(), msg.Id.Generation(), this)) {
+ case NKikimrProto::OK:
+ return IssuePuts();
+
+ case NKikimrProto::RACE:
+ return EndWithError(NKikimrProto::RACE, "block race detected");
+
+ case NKikimrProto::NOTREADY:
+ // EndWithError destroys the query, so nothing may touch it afterwards
+ if (!--BlockChecksRemain) {
+ EndWithError(NKikimrProto::ERROR, "failed to acquire blocks");
+ }
+ return;
+
+ default:
+ Y_FAIL();
+ }
+ }
+
+ // Not implemented yet.
+ void IssuePuts() {
+ }
+
+ void OnUpdateBlock() override {
+ Initiate(); // just restart request
+ }
+ };
+
+ return new TPutExecutingQuery(*this, std::move(ev));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_query.cpp b/ydb/core/blob_depot/agent/agent_storage_query.cpp
new file mode 100644
index 00000000000..a0dc670d536
--- /dev/null
+++ b/ydb/core/blob_depot/agent/agent_storage_query.cpp
@@ -0,0 +1,67 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+    // Entry point for requests arriving at the agent.
+    //  * TabletId == Max<ui64>(): the event is parked in PendingEventQ untouched;
+    //  * TabletId == 0: a query is created only to answer the request with an error;
+    //  * otherwise the query is created and started.
+    void TBlobDepotAgent::HandleStorageProxy(TAutoPtr<IEventHandle> ev) {
+        if (TabletId == Max<ui64>()) {
+            // TODO: memory usage control
+            PendingEventQ.emplace_back(ev.Release());
+            return;
+        }
+
+        auto *query = CreateExecutingQuery(ev);
+        // Some CreateExecutingQuery<> specializations are still stubs returning nullptr (and they have already
+        // consumed the event by then); fail with a clear diagnostic instead of dereferencing a null pointer below.
+        Y_VERIFY(query, "no executing query is implemented for this request type");
+
+        STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "new query", (VirtualGroupId, VirtualGroupId),
+            (QueryId, query->GetQueryId()), (Name, query->GetName()));
+        if (!TabletId) {
+            query->EndWithError(NKikimrProto::ERROR, "group is in error state");
+        } else {
+            query->Initiate();
+        }
+    }
+
+    // Runtime dispatcher: picks the CreateExecutingQuery<> specialization matching the event type and passes event
+    // ownership to it. Event types not listed in ENUMERATE_INCOMING_EVENTS abort the process.
+    TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery(TAutoPtr<IEventHandle> ev) {
+        const auto type = ev->GetTypeRewrite();
+        std::unique_ptr<IEventHandle> handle(ev.Release());
+        switch (type) {
+#define DISPATCH_QUERY(TYPE) \
+            case TEvBlobStorage::TYPE: \
+                return CreateExecutingQuery<TEvBlobStorage::TYPE>(std::move(handle));
+
+            ENUMERATE_INCOMING_EVENTS(DISPATCH_QUERY)
+#undef DISPATCH_QUERY
+        }
+        Y_FAIL();
+    }
+
+ void TBlobDepotAgent::TExecutingQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) {
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "query ends with error", (VirtualGroupId, Agent.VirtualGroupId),
+ (QueryId, QueryId), (Status, status), (ErrorReason, errorReason));
+
+ std::unique_ptr<IEventBase> response;
+ switch (Event->GetTypeRewrite()) {
+#define XX(TYPE) \
+ case TEvBlobStorage::TYPE: \
+ response = Event->Get<TEvBlobStorage::T##TYPE>()->MakeErrorResponse(status, errorReason, Agent.VirtualGroupId); \
+ break;
+
+ ENUMERATE_INCOMING_EVENTS(XX)
+#undef XX
+ }
+ Y_VERIFY(response);
+ Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie);
+ delete this;
+ }
+
+    // Finishes the query successfully: sends the prepared response to the original sender with the original cookie
+    // and destroys the query object.
+    // NOTE(review): the log marker BDA02 is also used by EndWithError -- confirm whether a distinct marker was meant.
+    void TBlobDepotAgent::TExecutingQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) {
+        STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId),
+            (QueryId, QueryId), (Response, response->ToString()));
+
+        const auto& request = *Event;
+        Agent.SelfId().Send(request.Sender, response.release(), 0, request.Cookie);
+        delete this; // nothing may touch the object past this point
+    }
+
+    // Returns the enumerator name of the request being served (the stringized TEvBlobStorage event id); aborts for
+    // event types not listed in ENUMERATE_INCOMING_EVENTS.
+    TString TBlobDepotAgent::TExecutingQuery::GetName() const {
+        const auto requestType = Event->GetTypeRewrite();
+        switch (requestType) {
+#define EVENT_NAME(TYPE) \
+            case TEvBlobStorage::TYPE: \
+                return #TYPE;
+
+            ENUMERATE_INCOMING_EVENTS(EVENT_NAME)
+#undef EVENT_NAME
+        }
+        Y_FAIL();
+    }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_range.cpp b/ydb/core/blob_depot/agent/agent_storage_range.cpp
new file mode 100644
index 00000000000..d4dec509ee0
--- /dev/null
+++ b/ydb/core/blob_depot/agent/agent_storage_range.cpp
@@ -0,0 +1,10 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+    // TEvRange specialization. Range requests are not implemented: the event handle is taken by value and destroyed
+    // on return, and no query object is created, so the caller gets nullptr.
+    template<>
+    TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvRange>(std::unique_ptr<IEventHandle> ev) {
+        static_cast<void>(ev); // placeholder: the parameter is intentionally unused
+        return nullptr;
+    }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_storage_status.cpp b/ydb/core/blob_depot/agent/agent_storage_status.cpp
new file mode 100644
index 00000000000..36dcdf209bf
--- /dev/null
+++ b/ydb/core/blob_depot/agent/agent_storage_status.cpp
@@ -0,0 +1,10 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+    // TEvStatus specialization. Status requests are not implemented: the event handle is taken by value and
+    // destroyed on return, and no query object is created, so the caller gets nullptr.
+    template<>
+    TBlobDepotAgent::TExecutingQuery *TBlobDepotAgent::CreateExecutingQuery<TEvBlobStorage::EvStatus>(std::unique_ptr<IEventHandle> ev) {
+        static_cast<void>(ev); // placeholder: the parameter is intentionally unused
+        return nullptr;
+    }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/defs.h b/ydb/core/blob_depot/agent/defs.h
index c5e120f3006..15052a2ae6c 100644
--- a/ydb/core/blob_depot/agent/defs.h
+++ b/ydb/core/blob_depot/agent/defs.h
@@ -1,3 +1,9 @@
#pragma once
#include <ydb/core/blob_depot/defs.h>
+
+#include <ydb/core/blob_depot/events.h>
+#include <ydb/core/blob_depot/types.h>
+
+#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
+#include <ydb/core/util/stlog.h>
diff --git a/ydb/core/blob_depot/blob_depot_agent.cpp b/ydb/core/blob_depot/blob_depot_agent.cpp
new file mode 100644
index 00000000000..7b4e0c45ec5
--- /dev/null
+++ b/ydb/core/blob_depot/blob_depot_agent.cpp
@@ -0,0 +1,92 @@
+#include "blob_depot_tablet.h"
+
+namespace NKikimr::NBlobDepot {
+
+    // A new pipe server has connected: remember it with no node bound yet. The same server id must not be
+    // registered twice.
+    void TBlobDepot::Handle(TEvTabletPipe::TEvServerConnected::TPtr ev) {
+        STLOG(PRI_DEBUG, BLOB_DEPOT, BD01, "TEvServerConnected", (TabletId, TabletID()), (Msg, ev->Get()->ToString()));
+        const bool inserted = PipeServerToNode.emplace(ev->Get()->ServerId, std::nullopt).second;
+        Y_VERIFY(inserted);
+    }
+
+    // A pipe server has gone away. If an agent was registered through exactly this pipe, it is marked disconnected
+    // and given an expiration deadline; the pipe record itself is always dropped.
+    void TBlobDepot::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev) {
+        STLOG(PRI_DEBUG, BLOB_DEPOT, BD02, "TEvServerDisconnected", (TabletId, TabletID()), (Msg, ev->Get()->ToString()));
+
+        const auto pipeIt = PipeServerToNode.find(ev->Get()->ServerId);
+        Y_VERIFY(pipeIt != PipeServerToNode.end());
+
+        if (const std::optional<ui32>& nodeId = pipeIt->second; nodeId.has_value()) {
+            const auto agentIt = Agents.find(*nodeId);
+            // the agent may have re-registered through another pipe already; leave it alone in that case
+            if (agentIt != Agents.end() && agentIt->second.ConnectedAgent == pipeIt->first) {
+                TAgentInfo& agent = agentIt->second;
+                OnAgentDisconnect(agent);
+                agent.ConnectedAgent.reset();
+                agent.ConnectedNodeId = 0;
+                agent.ExpirationTimestamp = TActivationContext::Now() + ExpirationTimeout;
+            }
+        }
+
+        PipeServerToNode.erase(pipeIt);
+    }
+
+ // Hook invoked when a registered agent loses its pipe; currently it only forwards the notification to the
+ // blocks manager.
+ void TBlobDepot::OnAgentDisconnect(TAgentInfo& agent) {
+ BlocksManager.OnAgentDisconnect(agent);
+ }
+
+    // Registers the agent of the sender's node on the pipe the request arrived through and answers with the tablet
+    // generation and the latest group id of every channel.
+    void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) {
+        const ui32 nodeId = ev->Sender.NodeId();
+
+        // the pipe must be known and may only ever be bound to a single node
+        const auto pipeIt = PipeServerToNode.find(ev->Recipient);
+        Y_VERIFY(pipeIt != PipeServerToNode.end());
+        Y_VERIFY(!pipeIt->second || *pipeIt->second == nodeId);
+        pipeIt->second = nodeId;
+
+        TAgentInfo& agent = Agents[nodeId];
+        STLOG(PRI_DEBUG, BLOB_DEPOT, BD03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, ev->Get()->Record),
+            (NodeId, nodeId), (PipeServerId, pipeIt->first));
+        agent.ConnectedAgent = pipeIt->first;
+        agent.ConnectedNodeId = nodeId;
+        agent.ExpirationTimestamp = TInstant::Max(); // a connected agent does not expire
+        OnAgentConnect(agent);
+
+        auto response = std::make_unique<TEvBlobDepot::TEvRegisterAgentResult>();
+        auto& record = response->Record;
+        record.SetGeneration(Executor()->Generation());
+        for (const auto& channel : Info()->Channels) {
+            // channels are expected to be listed densely and in order, so the position equals the channel number
+            Y_VERIFY(channel.Channel == record.ChannelGroupsSize());
+            const auto groupId = channel.History ? channel.History.back().GroupID : 0;
+            record.AddChannelGroups(groupId);
+        }
+
+        SendResponseToAgent(*ev, std::move(response));
+    }
+
+ // Hook invoked when an agent has registered; currently it only forwards the notification to the blocks manager.
+ void TBlobDepot::OnAgentConnect(TAgentInfo& agent) {
+ BlocksManager.OnAgentConnect(agent);
+ }
+
+    // Hands out the next PreallocatedIdCount blob sequence ids as the range [RangeBegin, RangeEnd) together with
+    // the current tablet generation, and advances the allocation cursor.
+    void TBlobDepot::Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev) {
+        STLOG(PRI_DEBUG, BLOB_DEPOT, BD04, "TEvAllocateIds", (TabletId, TabletID()), (Msg, ev->Get()->Record),
+            (PipeServerId, ev->Recipient));
+
+        const ui64 rangeBegin = NextBlobSeqId;
+        const ui64 rangeEnd = rangeBegin + PreallocatedIdCount;
+        NextBlobSeqId = rangeEnd;
+
+        auto response = std::make_unique<TEvBlobDepot::TEvAllocateIdsResult>();
+        auto& record = response->Record;
+        record.SetGeneration(Executor()->Generation());
+        record.SetRangeBegin(rangeBegin);
+        record.SetRangeEnd(rangeEnd);
+
+        SendResponseToAgent(*ev, std::move(response));
+    }
+
+    // Resolves a pipe server id into the agent registered through it. Every step is a hard invariant: the pipe
+    // must be known, bound to a node, and that node's agent must be connected through this very pipe.
+    TBlobDepot::TAgentInfo& TBlobDepot::GetAgent(const TActorId& pipeServerId) {
+        const auto pipeIt = PipeServerToNode.find(pipeServerId);
+        Y_VERIFY(pipeIt != PipeServerToNode.end());
+
+        const std::optional<ui32>& nodeId = pipeIt->second;
+        Y_VERIFY(nodeId.has_value());
+
+        const auto agentIt = Agents.find(*nodeId);
+        Y_VERIFY(agentIt != Agents.end());
+
+        TAgentInfo& agent = agentIt->second;
+        Y_VERIFY(agent.ConnectedAgent == pipeServerId);
+        return agent;
+    }
+
+    // Replies to the sender of `request` with the same cookie. When the request carries an interconnect session,
+    // the reply handle is rewritten to be forwarded through that session.
+    void TBlobDepot::SendResponseToAgent(IEventHandle& request, std::unique_ptr<IEventBase> response) {
+        auto reply = std::make_unique<IEventHandle>(request.Sender, SelfId(), response.release(), 0, request.Cookie);
+        if (const auto& session = request.InterconnectSession) {
+            reply->Rewrite(TEvInterconnect::EvForward, session);
+        }
+        TActivationContext::Send(reply.release());
+    }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index f2b50523849..014ad61d42f 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -2,6 +2,7 @@
#include "defs.h"
#include "events.h"
+#include "types.h"
namespace NKikimr::NBlobDepot {
@@ -11,32 +12,51 @@ namespace NKikimr::NBlobDepot {
: public TActor<TBlobDepot>
, public TTabletExecutedFlat
{
+ // Event ids private to the tablet actor, allocated from the ES_PRIVATE event space.
+ struct TEvPrivate {
+ enum {
+ // NOTE(review): presumably drives periodic expiration of disconnected agents; no handler is visible here
+ EvCheckExpiredAgents = EventSpaceBegin(TEvents::ES_PRIVATE),
+ };
+ };
+
public:
TBlobDepot(TActorId tablet, TTabletStorageInfo *info)
: TActor(&TThis::StateInit)
, TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory)
+ , BlocksManager(this)
{}
- void Handle(TEvBlobDepot::TEvApplyConfig::TPtr ev) {
- auto response = std::make_unique<TEvBlobDepot::TEvApplyConfigResult>(TabletID(), ev->Get()->Record.GetTxId());
- Send(ev->Sender, response.release(), 0, ev->Cookie);
- }
-
- void Handle(TEvTabletPipe::TEvServerConnected::TPtr ev) {
- Y_UNUSED(ev);
- }
-
- void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev) {
- Y_UNUSED(ev);
- }
-
void HandlePoison() {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT02, "HandlePoison", (TabletId, TabletID()));
Become(&TThis::StateZombie);
Send(Tablet(), new TEvents::TEvPoison);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ static constexpr TDuration ExpirationTimeout = TDuration::Minutes(1);
+ static constexpr ui32 PreallocatedIdCount = 100;
+
+ // Per-node agent state kept by the tablet.
+ struct TAgentInfo {
+ // pipe server id the agent is registered through; empty while the agent is disconnected
+ std::optional<TActorId> ConnectedAgent;
+ // node id of the connected agent; reset to 0 on disconnect
+ ui32 ConnectedNodeId;
+ // TInstant::Max() while connected; set to (disconnect time + ExpirationTimeout) on disconnect
+ TInstant ExpirationTimestamp;
+ };
+
+ THashMap<TActorId, std::optional<ui32>> PipeServerToNode;
+ THashMap<ui32, TAgentInfo> Agents; // NodeId -> Agent
+
+ ui64 NextBlobSeqId = 0;
+
+ void Handle(TEvTabletPipe::TEvServerConnected::TPtr ev);
+ void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev);
+ void OnAgentDisconnect(TAgentInfo& agent);
+ void Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev);
+ void OnAgentConnect(TAgentInfo& agent);
+ void Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev);
+ TAgentInfo& GetAgent(const TActorId& pipeServerId);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
std::deque<std::unique_ptr<IEventHandle>> InitialEventsQ;
void Enqueue(TAutoPtr<IEventHandle>& ev, const TActorContext&) override {
@@ -44,21 +64,32 @@ namespace NKikimr::NBlobDepot {
}
void OnActivateExecutor(const TActorContext&) override {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "OnActivateExecutor", (TabletId, TabletID()));
+
+ ExecuteTxInitSchema();
+
Become(&TThis::StateWork);
for (auto&& ev : std::exchange(InitialEventsQ, {})) {
TActivationContext::Send(ev.release());
}
+
+ NextBlobSeqId = TCGSI{BaseDataChannel, Executor()->Generation(), 1, 0}.ToBinary(Info()->Channels.size());
}
void OnDetach(const TActorContext&) override {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT04, "OnDetach", (TabletId, TabletID()));
+
// TODO: what does this callback mean
PassAway();
}
void OnTabletDead(TEvTablet::TEvTabletDead::TPtr& /*ev*/, const TActorContext&) override {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT05, "OnTabletDead", (TabletId, TabletID()));
PassAway();
}
+ void SendResponseToAgent(IEventHandle& request, std::unique_ptr<IEventBase> response);
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
STFUNC(StateInit) {
@@ -78,6 +109,13 @@ namespace NKikimr::NBlobDepot {
cFunc(TEvents::TSystem::Poison, HandlePoison);
hFunc(TEvBlobDepot::TEvApplyConfig, Handle);
+ hFunc(TEvBlobDepot::TEvRegisterAgent, Handle);
+ hFunc(TEvBlobDepot::TEvAllocateIds, Handle);
+ hFunc(TEvBlobDepot::TEvCommitBlobSeq, Handle);
+ hFunc(TEvBlobDepot::TEvResolve, Handle);
+
+ hFunc(TEvBlobDepot::TEvBlock, BlocksManager.Handle);
+ hFunc(TEvBlobDepot::TEvQueryBlocks, BlocksManager.Handle);
hFunc(TEvTabletPipe::TEvServerConnected, Handle);
hFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
@@ -95,6 +133,54 @@ namespace NKikimr::NBlobDepot {
bool ReassignChannelsEnabled() const override {
return true;
}
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ // Convenience wrapper: releases the transaction object to the tablet executor and schedules it using the
+ // current activation context.
+ void Execute(std::unique_ptr<NTabletFlatExecutor::TTransactionBase<TBlobDepot>> tx) {
+ Executor()->Execute(tx.release(), TActivationContext::AsActorContext());
+ }
+
+ void ExecuteTxInitSchema();
+ void ExecuteTxLoad();
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Configuration
+
+ NKikimrBlobDepot::TBlobDepotConfig Config;
+
+ void Handle(TEvBlobDepot::TEvApplyConfig::TPtr ev);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Blocks
+
+ // Facade over the block-tracking logic. The state and the handlers live in TImpl, which is only forward-declared
+ // here and held through a unique_ptr.
+ class TBlocksManager {
+ class TImpl;
+ std::unique_ptr<TImpl> Impl;
+
+ public:
+ TBlocksManager(TBlobDepot *self);
+ ~TBlocksManager();
+ // registers a (tablet, blocked generation) pair read from the local database during load
+ void AddBlockOnLoad(ui64 tabletId, ui32 blockedGeneration);
+ // agent lifecycle notifications forwarded by the tablet
+ void OnAgentConnect(TAgentInfo& agent);
+ void OnAgentDisconnect(TAgentInfo& agent);
+
+ // event handlers wired directly into the tablet's state function
+ void Handle(TEvBlobDepot::TEvBlock::TPtr ev);
+ void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev);
+ };
+
+ TBlocksManager BlocksManager;
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Key operation
+
+ struct TKeyValue {
+ };
+
+ std::map<TString, TKeyValue> Data;
+
+ void Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev);
+
+ void Handle(TEvBlobDepot::TEvResolve::TPtr ev);
};
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp
new file mode 100644
index 00000000000..1550255eb14
--- /dev/null
+++ b/ydb/core/blob_depot/blocks.cpp
@@ -0,0 +1,139 @@
+#include "blob_depot_tablet.h"
+#include "schema.h"
+
+namespace NKikimr::NBlobDepot {
+
+    // Implementation behind TBlocksManager: keeps the blocked generation per tablet in memory and persists
+    // updates to the Blocks table through TTxUpdateBlock.
+    class TBlobDepot::TBlocksManager::TImpl {
+        TBlobDepot *Self;
+        THashMap<ui64, ui32> Blocks; // TabletId -> blocked generation
+
+    private:
+        // Transaction applying a single block request: re-validates it against the in-memory state, updates that
+        // state and writes the row; the prepared response handle is sent (RACE) or passed on (success) in Complete().
+        class TTxUpdateBlock : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+            const ui64 TabletId;
+            const ui32 BlockedGeneration;
+            const ui32 NodeId;
+            const TInstant Timestamp;
+            std::unique_ptr<IEventHandle> Response;
+            bool RaceDetected = false;
+
+        public:
+            TTxUpdateBlock(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, TInstant timestamp,
+                    std::unique_ptr<IEventHandle> response)
+                : TTransactionBase(self)
+                , TabletId(tabletId)
+                , BlockedGeneration(blockedGeneration)
+                , NodeId(nodeId)
+                , Timestamp(timestamp)
+                , Response(std::move(response))
+            {}
+
+            bool Execute(TTransactionContext& txc, const TActorContext&) override {
+                TImpl& impl = *Self->BlocksManager.Impl;
+                const auto [it, inserted] = impl.Blocks.emplace(TabletId, BlockedGeneration);
+                // another request may have raised the block between Handle() and this transaction
+                RaceDetected = !inserted && BlockedGeneration <= it->second;
+                if (RaceDetected) {
+                    Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.SetStatus(NKikimrProto::RACE);
+                } else {
+                    // emplace() leaves an already existing entry untouched, so the in-memory generation has to be
+                    // raised explicitly; otherwise later requests would be checked against a stale value
+                    it->second = BlockedGeneration;
+
+                    NIceDb::TNiceDb db(txc.DB);
+                    db.Table<Schema::Blocks>().Key(TabletId).Update(
+                        NIceDb::TUpdate<Schema::Blocks::BlockedGeneration>(BlockedGeneration),
+                        NIceDb::TUpdate<Schema::Blocks::IssuedByNode>(NodeId),
+                        NIceDb::TUpdate<Schema::Blocks::IssueTimestamp>(Timestamp)
+                    );
+                }
+                return true;
+            }
+
+            void Complete(const TActorContext&) override {
+                if (RaceDetected) {
+                    TActivationContext::Send(Response.release());
+                } else {
+                    Self->BlocksManager.Impl->OnBlockCommitted(TabletId, BlockedGeneration, std::move(Response));
+                }
+            }
+        };
+
+    public:
+        TImpl(TBlobDepot *self)
+            : Self(self)
+        {}
+
+        // Registers a block read from the local database during load.
+        void AddBlockOnLoad(ui64 tabletId, ui32 generation) {
+            Blocks.emplace(tabletId, generation);
+        }
+
+        // Called once the block has been committed to the local database.
+        void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, std::unique_ptr<IEventHandle> response) {
+            // TODO: notifying other agents about the new block is not implemented
+            (void)tabletId, (void)blockedGeneration;
+            // deliver the prepared reply (status OK); dropping the handle would leave the requester without an answer
+            TActivationContext::Send(response.release());
+        }
+
+        // Placeholder: nothing is tracked per agent yet.
+        void OnAgentConnect(TAgentInfo& agent) {
+            (void)agent;
+        }
+
+        // Placeholder: nothing is tracked per agent yet.
+        void OnAgentDisconnect(TAgentInfo& agent) {
+            (void)agent;
+        }
+
+        // Validates a block request and either answers right away (malformed or outdated request) or starts a
+        // transaction that takes over the prepared response handle.
+        void Handle(TEvBlobDepot::TEvBlock::TPtr ev) {
+            auto event = std::make_unique<TEvBlobDepot::TEvBlockResult>(NKikimrProto::OK, std::nullopt);
+            auto& responseRecord = event->Record;
+            auto response = std::make_unique<IEventHandle>(ev->Sender, Self->SelfId(), event.release(), 0, ev->Cookie);
+            if (ev->InterconnectSession) {
+                response->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession);
+            }
+
+            const auto& record = ev->Get()->Record;
+            if (!record.HasTabletId() || !record.HasBlockedGeneration()) {
+                responseRecord.SetStatus(NKikimrProto::ERROR);
+                responseRecord.SetErrorReason("incorrect protobuf");
+            } else {
+                const ui64 tabletId = record.GetTabletId();
+                const ui32 blockedGeneration = record.GetBlockedGeneration();
+                if (const auto it = Blocks.find(tabletId); it != Blocks.end() && blockedGeneration <= it->second) {
+                    // NOTE(review): the same condition detected inside the transaction is reported as RACE
+                    responseRecord.SetStatus(NKikimrProto::ERROR);
+                } else {
+                    TAgentInfo& agent = Self->GetAgent(ev->Recipient);
+                    Self->Execute(std::make_unique<TTxUpdateBlock>(Self, tabletId, blockedGeneration,
+                        agent.ConnectedNodeId, TActivationContext::Now(), std::move(response)));
+                }
+            }
+
+            // the handle is empty when the transaction took ownership of it; reply here only otherwise
+            if (response) {
+                TActivationContext::Send(response.release());
+            }
+        }
+
+        // Placeholder: block queries are not implemented yet.
+        void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) {
+            (void)ev;
+        }
+    };
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // TBlocksManager wrapper
+
+ // Creates the implementation object bound to the owning tablet.
+ TBlobDepot::TBlocksManager::TBlocksManager(TBlobDepot *self)
+ : Impl(std::make_unique<TImpl>(self))
+ {}
+
+ // Defined out of line, in the translation unit where TImpl is a complete type, which the unique_ptr member
+ // needs in order to destroy it.
+ TBlobDepot::TBlocksManager::~TBlocksManager()
+ {}
+
+ // The methods below forward to TImpl unchanged.
+ void TBlobDepot::TBlocksManager::AddBlockOnLoad(ui64 tabletId, ui32 generation) {
+ Impl->AddBlockOnLoad(tabletId, generation);
+ }
+
+ void TBlobDepot::TBlocksManager::OnAgentConnect(TAgentInfo& agent) {
+ Impl->OnAgentConnect(agent);
+ }
+
+ void TBlobDepot::TBlocksManager::OnAgentDisconnect(TAgentInfo& agent) {
+ Impl->OnAgentDisconnect(agent);
+ }
+
+ void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvBlock::TPtr ev) {
+ return Impl->Handle(ev);
+ }
+
+ void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) {
+ return Impl->Handle(ev);
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/defs.h b/ydb/core/blob_depot/defs.h
index 51a5a82fad6..9d531864287 100644
--- a/ydb/core/blob_depot/defs.h
+++ b/ydb/core/blob_depot/defs.h
@@ -1,5 +1,10 @@
#pragma once
#include <ydb/core/base/tablet_pipe.h>
+#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
+#include <ydb/core/tablet_flat/flat_cxx_database.h>
+#include <ydb/core/util/stlog.h>
+
+#include <util/generic/va_args.h>
diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h
index 312e2fc5d54..7c01329c502 100644
--- a/ydb/core/blob_depot/events.h
+++ b/ydb/core/blob_depot/events.h
@@ -10,25 +10,59 @@ namespace NKikimr {
enum {
EvApplyConfig = EventSpaceBegin(TKikimrEvents::ES_BLOB_DEPOT),
EvApplyConfigResult,
+ EvRegisterAgent,
+ EvRegisterAgentResult,
+ EvAllocateIds,
+ EvAllocateIdsResult,
+ EvBlock,
+ EvBlockResult,
+ EvPushNotify,
+ EvQueryBlocks,
+ EvQueryBlocksResult,
+ EvCommitBlobSeq,
+ EvCommitBlobSeqResult,
+ EvResolve,
+ EvResolveResult,
};
- struct TEvApplyConfig : TEventPB<TEvApplyConfig, NKikimrBlobDepot::TEvApplyConfig, TEvBlobDepot::EvApplyConfig> {
- TEvApplyConfig() = default;
+#define BLOBDEPOT_PARAM_ARG(ARG) std::optional<std::decay_t<decltype(Record.Get##ARG())>> param##ARG,
- TEvApplyConfig(ui64 txId) {
- Record.SetTxId(txId);
- }
- };
+#define BLOBDEPOT_SETTER(ARG) \
+ if (param##ARG) { \
+ Record.Set##ARG(*param##ARG); \
+ }
- struct TEvApplyConfigResult : TEventPB<TEvApplyConfigResult, NKikimrBlobDepot::TEvApplyConfigResult, TEvBlobDepot::EvApplyConfigResult> {
- TEvApplyConfigResult() = default;
+// Declares event struct T<NAME> wrapping protobuf NKikimrBlobDepot::T<NAME> with event id <NAME>; only the default
+// constructor is provided.
+#define BLOBDEPOT_EVENT_PB_NO_ARGS(NAME) \
+ struct T##NAME : TEventPB<T##NAME, NKikimrBlobDepot::T##NAME, NAME> { \
+ T##NAME() = default; \
+ }
- TEvApplyConfigResult(ui64 tabletId, ui64 txId) {
- Record.SetTabletId(tabletId);
- Record.SetTxId(txId);
- }
- };
+// Declares event struct T<NAME> wrapping protobuf NKikimrBlobDepot::T<NAME> with event id <NAME>. Besides the
+// default constructor it gets a constructor taking one std::optional per listed record field (typed after the
+// field's getter); every argument that holds a value is stored through the matching setter. The trailing
+// TArgListTerminator parameter only absorbs the comma emitted after the last generated parameter.
+#define BLOBDEPOT_EVENT_PB(NAME, ...) \
+ struct T##NAME : TEventPB<T##NAME, NKikimrBlobDepot::T##NAME, NAME> { \
+ T##NAME() = default; \
+ \
+ struct TArgListTerminator {}; \
+ \
+ T##NAME(Y_MAP_ARGS(BLOBDEPOT_PARAM_ARG, __VA_ARGS__) TArgListTerminator = {}) { \
+ Y_MAP_ARGS(BLOBDEPOT_SETTER, __VA_ARGS__) \
+ } \
+ }
+ BLOBDEPOT_EVENT_PB(EvApplyConfig, TxId);
+ BLOBDEPOT_EVENT_PB(EvApplyConfigResult, TabletId, TxId);
+ BLOBDEPOT_EVENT_PB(EvRegisterAgent, VirtualGroupId);
+ BLOBDEPOT_EVENT_PB_NO_ARGS(EvRegisterAgentResult);
+ BLOBDEPOT_EVENT_PB_NO_ARGS(EvAllocateIds);
+ BLOBDEPOT_EVENT_PB_NO_ARGS(EvAllocateIdsResult);
+ BLOBDEPOT_EVENT_PB(EvBlock, TabletId, BlockedGeneration);
+ BLOBDEPOT_EVENT_PB(EvBlockResult, Status, ErrorReason);
+ BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotify);
+ BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocks);
+ BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocksResult);
+ BLOBDEPOT_EVENT_PB_NO_ARGS(EvCommitBlobSeq);
+ BLOBDEPOT_EVENT_PB_NO_ARGS(EvCommitBlobSeqResult);
+ BLOBDEPOT_EVENT_PB_NO_ARGS(EvResolve);
+ BLOBDEPOT_EVENT_PB(EvResolveResult, Status, ErrorReason);
};
} // NKikimr
diff --git a/ydb/core/blob_depot/op_apply_config.cpp b/ydb/core/blob_depot/op_apply_config.cpp
new file mode 100644
index 00000000000..57b77e0183d
--- /dev/null
+++ b/ydb/core/blob_depot/op_apply_config.cpp
@@ -0,0 +1,48 @@
+#include "blob_depot_tablet.h"
+#include "schema.h"
+
+namespace NKikimr::NBlobDepot {
+
+    // Handles TEvApplyConfig: persists the serialized request in the Config table and, once the transaction is
+    // complete, refreshes the in-memory Config from the same bytes and sends TEvApplyConfigResult to the requester.
+    // NOTE(review): what gets serialized is the whole TEvApplyConfig record, while it is later parsed into Config
+    // (an NKikimrBlobDepot::TBlobDepotConfig) -- confirm this is intended rather than a nested config message.
+    void TBlobDepot::Handle(TEvBlobDepot::TEvApplyConfig::TPtr ev) {
+        STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvApplyConfig", (TabletId, TabletID()), (Msg, ev->Get()->Record));
+
+        class TTxApplyConfig : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+            std::unique_ptr<IEventHandle> Response; // prepared reply, sent from Complete()
+            const TActorId InterconnectSession;     // session of the request, if it came through interconnect
+            TString ConfigProtobuf;                 // bytes written to the table and parsed back in Complete()
+
+        public:
+            TTxApplyConfig(TBlobDepot *self, TEvBlobDepot::TEvApplyConfig& ev, std::unique_ptr<IEventHandle> response,
+                    TActorId interconnectSession)
+                : TTransactionBase(self)
+                , Response(std::move(response))
+                , InterconnectSession(interconnectSession)
+            {
+                const bool serialized = ev.Record.SerializeToString(&ConfigProtobuf);
+                Y_VERIFY(serialized);
+            }
+
+            bool Execute(TTransactionContext& txc, const TActorContext&) override {
+                using TConfigTable = Schema::Config;
+
+                NIceDb::TNiceDb db(txc.DB);
+                auto row = db.Table<TConfigTable>().Key(TConfigTable::Key::Value);
+                row.Update(NIceDb::TUpdate<TConfigTable::ConfigProtobuf>(ConfigProtobuf));
+                return true;
+            }
+
+            void Complete(const TActorContext&) override {
+                const bool parsed = Self->Config.ParseFromString(ConfigProtobuf);
+                Y_VERIFY(parsed);
+
+                if (InterconnectSession) {
+                    Response->Rewrite(TEvInterconnect::EvForward, InterconnectSession);
+                }
+                TActivationContext::Send(Response.release());
+            }
+        };
+
+        const auto txId = ev->Get()->Record.GetTxId();
+        auto result = std::make_unique<TEvBlobDepot::TEvApplyConfigResult>(TabletID(), txId);
+        auto reply = std::make_unique<IEventHandle>(ev->Sender, SelfId(), result.release(), 0, ev->Cookie);
+        Execute(std::make_unique<TTxApplyConfig>(this, *ev->Get(), std::move(reply), ev->InterconnectSession));
+    }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
new file mode 100644
index 00000000000..4f5d628ac74
--- /dev/null
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -0,0 +1,9 @@
+#include "blob_depot_tablet.h"
+
+namespace NKikimr::NBlobDepot {
+
+    // Placeholder: TEvCommitBlobSeq is accepted and ignored; no response is sent.
+    void TBlobDepot::Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev) {
+        static_cast<void>(ev);
+    }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/op_init_schema.cpp b/ydb/core/blob_depot/op_init_schema.cpp
new file mode 100644
index 00000000000..6e2ba92d639
--- /dev/null
+++ b/ydb/core/blob_depot/op_init_schema.cpp
@@ -0,0 +1,27 @@
+#include "blob_depot_tablet.h"
+#include "schema.h"
+
+namespace NKikimr::NBlobDepot {
+
+    // Runs the transaction that materializes the tablet schema in the local database; loading of the persisted
+    // state is chained from its completion.
+    void TBlobDepot::ExecuteTxInitSchema() {
+        class TTxInitSchema : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+        public:
+            TTxInitSchema(TBlobDepot *self)
+                : TTransactionBase(self)
+            {}
+
+            bool Execute(TTransactionContext& txc, const TActorContext&) override {
+                NIceDb::TNiceDb localDb(txc.DB);
+                localDb.Materialize<Schema>();
+                return true;
+            }
+
+            void Complete(const TActorContext&) override {
+                // the schema is in place -- proceed with reading the stored data
+                Self->ExecuteTxLoad();
+            }
+        };
+
+        auto tx = std::make_unique<TTxInitSchema>(this);
+        Execute(std::move(tx));
+    }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp
new file mode 100644
index 00000000000..240bc5ad6d2
--- /dev/null
+++ b/ydb/core/blob_depot/op_load.cpp
@@ -0,0 +1,66 @@
+#include "blob_depot_tablet.h"
+#include "schema.h"
+
+namespace NKikimr::NBlobDepot {
+
+    // Runs the transaction that restores the in-memory state (config and blocks) from the local database.
+    // Execute() returns false whenever some data is not ready yet, in which case it is expected to be run again.
+    void TBlobDepot::ExecuteTxLoad() {
+        class TTxLoad : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+        public:
+            TTxLoad(TBlobDepot *self)
+                : TTransactionBase(self)
+            {}
+
+            bool Execute(TTransactionContext& txc, const TActorContext&) override {
+                NIceDb::TNiceDb db(txc.DB);
+                return Precharge(db) && LoadConfig(db) && LoadBlocks(db);
+            }
+
+            // Touches every table read below and reports whether all of them are ready.
+            bool Precharge(NIceDb::TNiceDb& db) {
+                auto config = db.Table<Schema::Config>().Select();
+                auto blocks = db.Table<Schema::Blocks>().Select();
+                return config.IsReady() && blocks.IsReady();
+            }
+
+            void Complete(const TActorContext&) override {
+            }
+
+        private:
+            // Config: a single row; a missing row or a missing value leaves the in-memory config untouched.
+            bool LoadConfig(NIceDb::TNiceDb& db) {
+                auto row = db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Select();
+                if (!row.IsReady()) {
+                    return false;
+                }
+                if (row.IsValid() && row.HaveValue<Schema::Config::ConfigProtobuf>()) {
+                    const bool parsed = Self->Config.ParseFromString(row.GetValue<Schema::Config::ConfigProtobuf>());
+                    Y_VERIFY(parsed);
+                }
+                return true;
+            }
+
+            // Blocks: every row is handed over to the blocks manager.
+            bool LoadBlocks(NIceDb::TNiceDb& db) {
+                auto rows = db.Table<Schema::Blocks>().Select();
+                if (!rows.IsReady()) {
+                    return false;
+                }
+                for (; rows.IsValid(); ) {
+                    Self->BlocksManager.AddBlockOnLoad(
+                        rows.GetValue<Schema::Blocks::TabletId>(),
+                        rows.GetValue<Schema::Blocks::BlockedGeneration>()
+                    );
+                    if (!rows.Next()) {
+                        return false;
+                    }
+                }
+                return true;
+            }
+        };
+
+        Execute(std::make_unique<TTxLoad>(this));
+    }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/op_resolve.cpp b/ydb/core/blob_depot/op_resolve.cpp
new file mode 100644
index 00000000000..39b36197480
--- /dev/null
+++ b/ydb/core/blob_depot/op_resolve.cpp
@@ -0,0 +1,73 @@
+#include "blob_depot_tablet.h"
+
+namespace NKikimr::NBlobDepot {
+
+    // Handles TEvResolve: for every requested item enumerates the keys of Data falling into the item's range
+    // (optionally in reverse order and limited by MaxKeys) and streams them back in TEvResolveResult messages.
+    // A message that would grow past EventMaxByteSize is sent with status OVERRUN and a fresh one is started;
+    // the final message carries status OK.
+    void TBlobDepot::Handle(TEvBlobDepot::TEvResolve::TPtr ev) {
+        STLOG(PRI_DEBUG, BLOB_DEPOT, BDR01, "TEvResolve", (TabletId, TabletID()), (Msg, ev->Get()->ToString()),
+            (Sender, ev->Sender), (Recipient, ev->Recipient), (Cookie, ev->Cookie));
+
+        // collect records if needed
+
+        auto response = std::make_unique<TEvBlobDepot::TEvResolveResult>(NKikimrProto::OK, std::nullopt);
+        ui32 messageSize = response->CalculateSerializedSize();
+
+        // sends the accumulated message; when more data follows, marks it OVERRUN and starts a new one
+        auto sendMessage = [&](bool more) {
+            if (more) {
+                response->Record.SetStatus(NKikimrProto::OVERRUN);
+            }
+
+            STLOG(PRI_DEBUG, BLOB_DEPOT, BDR02, "sending TEvResolveResult", (TabletId, TabletID()), (Msg, response->Record));
+
+            auto handle = std::make_unique<IEventHandle>(ev->Sender, SelfId(), response.release(), 0, ev->Cookie);
+            if (ev->InterconnectSession) {
+                handle->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession);
+            }
+            TActivationContext::Send(handle.release());
+
+            if (more) {
+                response = std::make_unique<TEvBlobDepot::TEvResolveResult>(NKikimrProto::OK, std::nullopt);
+                messageSize = response->CalculateSerializedSize();
+            }
+        };
+
+        ui32 itemIndex = 0;
+        for (const auto& item : ev->Get()->Record.GetItems()) {
+            auto beginIt = !item.HasBeginningKey() ? Data.begin()
+                : item.GetIncludeBeginning() ? Data.lower_bound(item.GetBeginningKey())
+                : Data.upper_bound(item.GetBeginningKey());
+
+            auto endIt = !item.HasEndingKey() ? Data.end()
+                : item.GetIncludeEnding() ? Data.upper_bound(item.GetEndingKey())
+                : Data.lower_bound(item.GetEndingKey());
+
+            // An inverted request (beginning key after ending key, or equal keys with both ends excluded) yields
+            // beginIt positioned past endIt; walking such a pair with '!=' would run off the container, so treat
+            // it as an empty range.
+            if (endIt != Data.end() && (beginIt == Data.end() || endIt->first < beginIt->first)) {
+                endIt = beginIt;
+            }
+
+            ui32 numItems = 0;
+            // appends one key to the response; returns false once the item's MaxKeys limit has been reached
+            auto addKey = [&](auto it) {
+                NKikimrBlobDepot::TEvResolveResult::TResolvedKey resolvedKey;
+                resolvedKey.SetItemIndex(itemIndex);
+                resolvedKey.SetKey(it->first);
+
+                const ui32 keySize = resolvedKey.ByteSizeLong();
+                if (messageSize + keySize > EventMaxByteSize) {
+                    sendMessage(true);
+                }
+
+                // put resolved key into the result
+                resolvedKey.Swap(response->Record.AddResolvedKeys());
+                messageSize += keySize;
+                ++numItems;
+
+                return !item.HasMaxKeys() || numItems != item.GetMaxKeys();
+            };
+
+            if (item.GetReverse()) {
+                while (beginIt != endIt && addKey(--endIt)) {}
+            } else {
+                while (beginIt != endIt && addKey(beginIt++)) {}
+            }
+
+            ++itemIndex;
+        }
+
+        sendMessage(false);
+    }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h
new file mode 100644
index 00000000000..ae96c5422fa
--- /dev/null
+++ b/ydb/core/blob_depot/schema.h
@@ -0,0 +1,92 @@
+#pragma once
+
+#include "defs.h"
+#include "types.h"
+
+namespace NKikimr::NBlobDepot {
+
+ // Local database schema of the BlobDepot tablet. Table and column ids below define the persistent layout.
+ struct Schema : NIceDb::Schema {
+ // Table 1: tablet configuration; a single row addressed by the fixed key Key::Value (0).
+ struct Config : Table<1> {
+ struct Key : Column<1, NScheme::NTypeIds::Uint32> { static constexpr Type Value = 0; };
+ // serialized protobuf blob
+ struct ConfigProtobuf : Column<2, NScheme::NTypeIds::String> {};
+
+ using TKey = TableKey<Key>;
+ using TColumns = TableColumns<
+ Key,
+ ConfigProtobuf
+ >;
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // BlobStorage-related parts
+
+ // Table 2: one row per tablet holding its blocked generation, along with when and by which node it was issued.
+ struct Blocks : Table<2> {
+ struct TabletId : Column<1, NScheme::NTypeIds::Uint64> {};
+ struct BlockedGeneration : Column<2, NScheme::NTypeIds::Uint32> {};
+ // stored as Uint64, exposed to the code as TInstant
+ struct IssueTimestamp : Column<3, NScheme::NTypeIds::Uint64> { using Type = TInstant; };
+ struct IssuedByNode : Column<4, NScheme::NTypeIds::Uint32> {};
+
+ using TKey = TableKey<TabletId>;
+ using TColumns = TableColumns<
+ TabletId,
+ BlockedGeneration,
+ IssueTimestamp,
+ IssuedByNode
+ >;
+ };
+
+ // Table 3: keyed by (TabletId, Channel); holds the soft and hard GenStep values.
+ struct Barriers : Table<3> {
+ struct TabletId : Column<1, NScheme::NTypeIds::Uint64> {};
+ struct Channel : Column<2, NScheme::NTypeIds::Uint8> {};
+ struct SoftGenStep : Column<3, NScheme::NTypeIds::Uint64> {};
+ struct HardGenStep : Column<4, NScheme::NTypeIds::Uint64> {};
+
+ using TKey = TableKey<TabletId, Channel>;
+ using TColumns = TableColumns<
+ TabletId,
+ Channel,
+ SoftGenStep,
+ HardGenStep
+ >;
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Common parts
+
+ // Table 4: stored entries keyed by a string key.
+ struct Data : Table<4> {
+ struct Key : Column<1, NScheme::NTypeIds::String> {};
+ struct Meta : Column<2, NScheme::NTypeIds::String> {};
+ struct Id : Column<3, NScheme::NTypeIds::Uint64> {};
+ struct CGSI : Column<4, NScheme::NTypeIds::String> {};
+ struct Checksum : Column<5, NScheme::NTypeIds::Uint32> {};
+ struct TotalDataLen : Column<6, NScheme::NTypeIds::Uint64> {};
+ // stored as Uint8, exposed to the code as EKeepState
+ struct KeepState : Column<7, NScheme::NTypeIds::Uint8> { using Type = EKeepState; };
+ struct Public : Column<8, NScheme::NTypeIds::Bool> {};
+
+ using TKey = TableKey<Key>;
+ using TColumns = TableColumns<
+ Key,
+ Meta,
+ Id,
+ CGSI,
+ Checksum,
+ TotalDataLen,
+ KeepState,
+ Public
+ >;
+ };
+
+ // every table that Materialize<Schema>() has to create
+ using TTables = SchemaTables<
+ Config,
+ Blocks,
+ Barriers,
+ Data
+ >;
+
+ // executor settings: log batching enabled, flush period of 512 microseconds
+ using TSettings = SchemaSettings<
+ ExecutorLogBatching<true>,
+ ExecutorLogFlushPeriod<TDuration::MicroSeconds(512).GetValue()>
+ >;
+ };
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index 9dd722b6bda..141696bd4f8 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -4,11 +4,40 @@
namespace NKikimr::NBlobDepot {
+ static constexpr ui32 BaseDataChannel = 2;
+
struct TCGSI {
+ static constexpr ui32 IndexBits = 20;
+ static constexpr ui32 MaxIndex = (1 << IndexBits) - 1;
+
ui32 Channel;
ui32 Generation;
ui32 Step;
ui32 Index;
+
+ // Packs Channel, Step and Index into a single 64-bit number (Generation is NOT encoded):
+ //   ((Step << IndexBits) | Index) * (numChannels - BaseDataChannel) + (Channel - BaseDataChannel)
+ // FromBinary() reverses this and therefore needs the same numChannels and the generation
+ // supplied separately.
+ // Debug builds verify that numChannels > BaseDataChannel and that Index fits into IndexBits bits.
+ // NOTE(review): Channel is not range-checked; a Channel below BaseDataChannel would wrap around
+ // in the unsigned subtraction -- confirm callers only pass data channels.
+ ui64 ToBinary(ui32 numChannels) const {
+ Y_VERIFY_DEBUG(numChannels > BaseDataChannel);
+ Y_VERIFY_DEBUG(Index <= MaxIndex);
+ return (static_cast<ui64>(Step) << IndexBits | Index) * (numChannels - BaseDataChannel) + (Channel - BaseDataChannel);
+ }
+
+ // Inverse of ToBinary(): splits a packed 64-bit value back into Channel, Step and Index.
+ //   generation  -- stored verbatim into the result (it is not part of the packed value)
+ //   numChannels -- must be the same channel count that was passed to ToBinary()
+ //   value       -- packed number produced by ToBinary()
+ static TCGSI FromBinary(ui32 generation, ui32 numChannels, ui64 value) {
+     // same precondition as in ToBinary(); without it the divisor below would be zero or wrapped
+     Y_VERIFY_DEBUG(numChannels > BaseDataChannel);
+
+     // Plain unsigned division: std::lldiv would first convert value to a signed long long and
+     // yield a negative quotient/remainder for inputs with the top bit set.
+     const ui64 numDataChannels = numChannels - BaseDataChannel;
+     const ui64 quot = value / numDataChannels; // (Step << IndexBits) | Index
+     const ui64 rem = value % numDataChannels;  // Channel - BaseDataChannel
+
+     return TCGSI{
+         .Channel = static_cast<ui32>(rem + BaseDataChannel),
+         .Generation = generation,
+         .Step = static_cast<ui32>(quot >> IndexBits),
+         .Index = static_cast<ui32>(quot) & MaxIndex
+     };
+ }
+ };
+
+ // Keep state of a stored item. The underlying type is ui8; the schema's Data::KeepState column
+ // is declared as Uint8 with this enum as its Type.
+ // NOTE(review): enumerator values are presumably persisted as-is, so reordering or renumbering
+ // them would change the meaning of existing rows -- confirm before touching.
+ enum class EKeepState : ui8 {
+ Default,
+ Keep,
+ DoNotKeep
};
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.cpp b/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.cpp
index f0d696e82dd..0b5d0dee043 100644
--- a/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.cpp
+++ b/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.cpp
@@ -657,6 +657,9 @@ TIntrusivePtr<TBlobStorageGroupInfo> TBlobStorageGroupInfo::Parse(const NKikimrB
}
auto res = MakeIntrusive<TBlobStorageGroupInfo>(std::move(topology), std::move(dyn), group.GetStoragePoolName(),
acceptedScope, commonDeviceType);
+ if (group.HasBlobDepotId()) {
+ res->BlobDepotId = group.GetBlobDepotId();
+ }
// process encryption parameters
res->EncryptionMode = static_cast<EEncryptionMode>(group.GetEncryptionMode());
diff --git a/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h b/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h
index a1390f0495b..acbb52778e7 100644
--- a/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h
+++ b/ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h
@@ -413,6 +413,8 @@ public:
const ui32 GroupGeneration;
// erasure primarily
const TBlobStorageGroupType Type;
+ // virtual group BlobDepot tablet id
+ std::optional<ui64> BlobDepotId;
// origin of the group info content
std::optional<NKikimrBlobStorage::TGroupInfo> Group;
diff --git a/ydb/core/blobstorage/nodewarden/CMakeLists.txt b/ydb/core/blobstorage/nodewarden/CMakeLists.txt
index fe97c15e817..33c640008da 100644
--- a/ydb/core/blobstorage/nodewarden/CMakeLists.txt
+++ b/ydb/core/blobstorage/nodewarden/CMakeLists.txt
@@ -13,6 +13,7 @@ target_link_libraries(core-blobstorage-nodewarden PUBLIC
yutil
library-cpp-json
ydb-core-base
+ core-blob_depot-agent
core-blobstorage-groupinfo
core-blobstorage-pdisk
ydb-core-control
diff --git a/ydb/core/blobstorage/nodewarden/defs.h b/ydb/core/blobstorage/nodewarden/defs.h
index a3a658d27f5..4977537493e 100644
--- a/ydb/core/blobstorage/nodewarden/defs.h
+++ b/ydb/core/blobstorage/nodewarden/defs.h
@@ -15,6 +15,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/base/tablet_resolver.h>
+#include <ydb/core/blob_depot/agent/agent.h>
#include <ydb/core/node_whiteboard/node_whiteboard.h>
#include <ydb/core/blobstorage/base/blobstorage_events.h>
#include <ydb/core/blobstorage/vdisk/vdisk_actor.h>
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
index 215cf321d91..32ce91af38f 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
@@ -207,7 +207,7 @@ namespace NKikimr::NStorage {
}
if (const auto& info = group.Info) {
- Send(WhiteboardId, new NNodeWhiteboard::TEvWhiteboard::TEvBSGroupStateUpdate(info, info->GetStoragePoolName()));
+ Send(WhiteboardId, new NNodeWhiteboard::TEvWhiteboard::TEvBSGroupStateUpdate(info));
for (auto& vdisk : group.VDisksOfGroup) {
UpdateGroupInfoForDisk(vdisk, info);
}
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
index be67e5ef670..9b72c8dfdc1 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
@@ -112,6 +112,9 @@ namespace NKikimr::NStorage {
TReplQuoter::TPtr ReplNodeResponseQuoter;
public:
+ struct TGroupRecord;
+
+ public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::NODE_WARDEN;
}
@@ -152,6 +155,7 @@ namespace NKikimr::NStorage {
void StartInvalidGroupProxy();
void StopInvalidGroupProxy();
void StartLocalProxy(ui32 groupId);
+ void StartVirtualGroupAgent(ui32 groupId);
void StartStaticProxies();
TVector<NPDisk::TDriveData> ListLocalDrives();
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp b/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp
index 6000a0cb52a..faea46d7334 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp
@@ -9,9 +9,6 @@ TActorId TNodeWarden::StartEjectedProxy(ui32 groupId) {
}
void TNodeWarden::StartLocalProxy(ui32 groupId) {
- auto& group = Groups[groupId];
- Y_VERIFY(!group.ProxyRunning);
- group.ProxyRunning = true;
STLOG(PRI_DEBUG, BS_NODE, NW12, "StartLocalProxy", (GroupId, groupId));
std::unique_ptr<IActor> proxy;
@@ -34,6 +31,19 @@ void TNodeWarden::StartLocalProxy(ui32 groupId) {
AppData()->SystemPoolId));
}
+// Starts a BlobDepot agent actor for the given virtual group and registers it under the group's
+// regular proxy service id, so requests addressed to the group proxy reach the agent.
+// If group info is already available, it is pushed to the agent right away via TEvConfigureProxy.
+void TNodeWarden::StartVirtualGroupAgent(ui32 groupId) {
+    // log text now matches the function name (it used to say "StartVirtualGroupProxy")
+    STLOG(PRI_DEBUG, BS_NODE, NW40, "StartVirtualGroupAgent", (GroupId, groupId));
+
+    TActorSystem *as = TActivationContext::ActorSystem();
+    const TActorId actorId = as->Register(NBlobDepot::CreateBlobDepotAgent(groupId), TMailboxType::ReadAsFilled,
+        AppData()->SystemPoolId);
+    if (auto info = NeedGroupInfo(groupId)) {
+        // counters are looked up before info is moved into the event
+        auto counters = DsProxyPerPoolCounters->GetPoolCounters(info->GetStoragePoolName(), info->GetDeviceType());
+        Send(actorId, new TEvBlobStorage::TEvConfigureProxy(std::move(info), std::move(counters)));
+    }
+    as->RegisterLocalService(MakeBlobStorageProxyID(groupId), actorId);
+}
+
void TNodeWarden::StartStaticProxies() {
for (const auto& group : Cfg->ServiceSet.GetGroups()) {
StartLocalProxy(group.GetGroupID());
@@ -55,7 +65,12 @@ void TNodeWarden::HandleForwarded(TAutoPtr<::NActors::IEventHandle> &ev) {
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, errorProxy, {}, nullptr, 0));
return;
} else if (TGroupRecord& group = Groups[id]; !group.ProxyRunning) {
- StartLocalProxy(id);
+ group.ProxyRunning = true;
+ if (TGroupID(id).ConfigurationType() == EGroupConfigurationType::Virtual) {
+ StartVirtualGroupAgent(id);
+ } else {
+ StartLocalProxy(id);
+ }
}
TActivationContext::Send(ev->Forward(ev->GetForwardOnNondeliveryRecipient()));
diff --git a/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp b/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp
new file mode 100644
index 00000000000..c0e03555c96
--- /dev/null
+++ b/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp
@@ -0,0 +1,37 @@
+#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
+
+Y_UNIT_TEST_SUITE(BlobDepot) {
+
+ // Smoke test: allocates a virtual group through a BSC config request and sends one TEvPut to the
+ // proxy service id of that group.
+ // NOTE(review): the TEvPutResult is only printed, its status is never asserted.
+ Y_UNIT_TEST(Basic) {
+ TEnvironmentSetup env{{
+ .SetupTablets = true
+ }};
+
+ env.CreateBoxAndPool(1, 1);
+ env.Sim(TDuration::Seconds(20));
+
+ // build the AllocateVirtualGroup command with a single channel profile of 3 channels
+ NKikimrBlobStorage::TConfigRequest request;
+ auto *cmd = request.AddCommand()->MutableAllocateVirtualGroup();
+ cmd->SetVirtualGroupPool("vg");
+ cmd->SetStoragePoolName(env.StoragePoolName);
+ cmd->SetParentDir("/Root");
+ auto *prof = cmd->AddChannelProfiles();
+ prof->SetStoragePoolKind(env.StoragePoolName);
+ prof->SetCount(3);
+
+ auto response = env.Invoke(request);
+ UNIT_ASSERT_C(response.GetSuccess(), response.GetErrorDescription());
+
+ // the first status entry carries the id of the newly allocated group
+ ui32 groupId = response.GetStatus(0).GetGroupId(0);
+ Cerr << "groupId# " << groupId << Endl;
+ const TActorId& proxy = MakeBlobStorageProxyID(groupId);
+
+ // write a small blob through the group's proxy id from an edge actor on node 1
+ auto sender = env.Runtime->AllocateEdgeActor(1);
+ TString data = "hello!";
+ TLogoBlobID id(1, 1, 1, 0, data.size(), 0);
+ env.Runtime->Send(new IEventHandle(proxy, sender, new TEvBlobStorage::TEvPut(id, data, TInstant::Max())), 1);
+ auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender);
+ Cerr << res->Get()->ToString() << Endl;
+ }
+
+}
diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/CMakeLists.txt b/ydb/core/blobstorage/ut_blobstorage/lib/CMakeLists.txt
index e137266212c..8d4b52db8ea 100644
--- a/ydb/core/blobstorage/ut_blobstorage/lib/CMakeLists.txt
+++ b/ydb/core/blobstorage/ut_blobstorage/lib/CMakeLists.txt
@@ -14,13 +14,19 @@ target_link_libraries(blobstorage-ut_blobstorage-lib PUBLIC
cpp-digest-md5
cpp-testing-unittest
ydb-core-base
+ ydb-core-blob_depot
core-blobstorage-backpressure
blobstorage-dsproxy-mock
core-blobstorage-nodewarden
blobstorage-pdisk-mock
blobstorage-vdisk-common
+ ydb-core-mind
core-mind-bscontroller
+ core-mind-hive
core-tx-scheme_board
+ core-tx-tx_allocator
+ core-tx-mediator
+ core-tx-coordinator
ydb-core-util
udf-service-stub
yql-sql-pg_dummy
diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/defs.h b/ydb/core/blobstorage/ut_blobstorage/lib/defs.h
index 1385b8fca36..c6835f67504 100644
--- a/ydb/core/blobstorage/ut_blobstorage/lib/defs.h
+++ b/ydb/core/blobstorage/ut_blobstorage/lib/defs.h
@@ -1,5 +1,7 @@
#pragma once
+#include <ydb/core/base/hive.h>
+#include <ydb/core/blob_depot/blob_depot.h>
#include <ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.h>
#include <ydb/core/blobstorage/dsproxy/mock/model.h>
#include <ydb/core/blobstorage/pdisk/mock/pdisk_mock.h>
@@ -10,6 +12,11 @@
#include <ydb/core/mind/bscontroller/bsc.h>
#include <ydb/core/mind/bscontroller/types.h>
#include <ydb/core/mind/dynamic_nameserver.h>
+#include <ydb/core/mind/local.h>
+#include <ydb/core/tx/coordinator/coordinator.h>
+#include <ydb/core/tx/tx_allocator/txallocator.h>
+#include <ydb/core/tx/mediator/mediator.h>
+#include <ydb/core/tx/scheme_board/cache.h>
#include <ydb/core/util/testactorsys.h>
#include <library/cpp/testing/unittest/registar.h>
#include <util/system/rusage.h>
diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/env.h b/ydb/core/blobstorage/ut_blobstorage/lib/env.h
index 3aef0e6fcd3..a68b4757db3 100644
--- a/ydb/core/blobstorage/ut_blobstorage/lib/env.h
+++ b/ydb/core/blobstorage/ut_blobstorage/lib/env.h
@@ -31,6 +31,7 @@ struct TEnvironmentSetup {
const bool Cache = false;
const ui32 NumDataCenters = 0;
const std::function<TNodeLocation(ui32)> LocationGenerator;
+ const bool SetupTablets = false;
};
const TSettings Settings;
@@ -108,7 +109,11 @@ struct TEnvironmentSetup {
SetupLogging();
Runtime->Start();
auto *appData = Runtime->GetAppData();
- appData->DomainsInfo->AddDomain(TDomainsInfo::TDomain::ConstructEmptyDomain("dom", DomainId).Release());
+ if (Settings.SetupTablets) {
+ SetupDomainForTablets();
+ } else {
+ appData->DomainsInfo->AddDomain(TDomainsInfo::TDomain::ConstructEmptyDomain("dom", DomainId).Release());
+ }
if (Settings.LocationGenerator) {
Runtime->SetupTabletRuntime(Settings.LocationGenerator, Settings.ControllerNodeId);
} else {
@@ -117,6 +122,9 @@ struct TEnvironmentSetup {
SetupStaticStorage();
SetupTablet();
SetupStorage();
+ if (Settings.SetupTablets) {
+ InitRoot();
+ }
}
void StopNode(ui32 nodeId) {
@@ -225,6 +233,8 @@ struct TEnvironmentSetup {
// NKikimrServices::BS_PROXY_INDEXRESTOREGET,
// NKikimrServices::BS_PROXY_STATUS,
NActorsServices::TEST,
+ NKikimrServices::BLOB_DEPOT,
+ NKikimrServices::BLOB_DEPOT_AGENT,
// NActorsServices::INTERCONNECT,
// NActorsServices::INTERCONNECT_SESSION,
};
@@ -274,14 +284,103 @@ struct TEnvironmentSetup {
}
}
+ // Registers a fully constructed "Root" domain (instead of an empty one) together with a single
+ // hive, for environments created with Settings.SetupTablets.
+ // NOTE(review): the meaning of the positional ConstructDomain arguments (scheme root id, uids,
+ // tablet id lists) is defined by TDomainsInfo::TDomain::ConstructDomain and is not visible
+ // here -- consult its declaration before changing any of the literals.
+ void SetupDomainForTablets() {
+ TAppData *appData = Runtime->GetAppData();
+
+ const ui32 hiveUid = 1;
+
+ appData->DomainsInfo->AddDomain(TDomainsInfo::TDomain::ConstructDomain<std::vector<ui32>, std::vector<ui64>>(
+ "Root",
+ DomainId,
+ 72075186232723360,
+ 1,
+ 1,
+ {1},
+ hiveUid,
+ {hiveUid},
+ 100,
+ {1, 2, 3},
+ {1, 2, 3},
+ {1, 2, 3}
+ ).Release());
+
+ // the hive registered here is the one referenced by hiveUid in the domain above
+ appData->DomainsInfo->AddHive(hiveUid, MakeDefaultHiveID(1));
+ }
+
void SetupTablet() {
- Runtime->CreateTestBootstrapper(
- TTestActorSystem::CreateTestTabletInfo(TabletId, TTabletTypes::BSController, Settings.Erasure.GetErasure(), GroupId),
- &CreateFlatBsController,
- Settings.ControllerNodeId);
+ struct TTabletInfo {
+ ui64 TabletId;
+ TTabletTypes::EType Type;
+ IActor* (*Create)(const TActorId&, TTabletStorageInfo*);
+ };
+ std::vector<TTabletInfo> tablets{
+ {MakeBSControllerID(DomainId), TTabletTypes::BSController, &CreateFlatBsController},
+ };
+
+ auto *appData = Runtime->GetAppData();
+
+ for (const auto& [uid, tabletId] : appData->DomainsInfo->HivesByHiveUid) {
+ tablets.push_back(TTabletInfo{tabletId, TTabletTypes::Hive, &CreateDefaultHive});
+ }
+
+ const TDomainsInfo::TDomain& domain = appData->DomainsInfo->GetDomain(DomainId);
+ if (domain.SchemeRoot) {
+ tablets.push_back(TTabletInfo{domain.SchemeRoot, TTabletTypes::SchemeShard, &CreateFlatTxSchemeShard});
+ }
+ for (const ui64 tabletId : domain.Coordinators) {
+ tablets.push_back(TTabletInfo{tabletId, TTabletTypes::Coordinator, &CreateFlatTxCoordinator});
+ }
+ for (const ui64 tabletId : domain.Mediators) {
+ tablets.push_back(TTabletInfo{tabletId, TTabletTypes::Mediator, &CreateTxMediator});
+ }
+ TVector<ui64> allocators;
+ for (const ui64 tabletId : domain.TxAllocators) {
+ tablets.push_back(TTabletInfo{tabletId, TTabletTypes::TxAllocator, &CreateTxAllocator});
+ allocators.push_back(tabletId);
+ }
+
+ for (const TTabletInfo& tablet : tablets) {
+ Runtime->CreateTestBootstrapper(
+ TTestActorSystem::CreateTestTabletInfo(tablet.TabletId, tablet.Type, Settings.Erasure.GetErasure(), GroupId),
+ tablet.Create, Settings.ControllerNodeId);
+
+ bool working = true;
+ Runtime->Sim([&] { return working; }, [&](IEventHandle& event) { working = event.GetTypeRewrite() != TEvTablet::EvBoot; });
+ }
+
+ auto localConfig = MakeIntrusive<TLocalConfig>();
+ localConfig->TabletClassInfo[TTabletTypes::BlobDepot] = TLocalConfig::TTabletClassInfo(new TTabletSetupInfo(
+ &NBlobDepot::CreateBlobDepot, TMailboxType::ReadAsFilled, appData->SystemPoolId, TMailboxType::ReadAsFilled,
+ appData->SystemPoolId));
- bool working = true;
- Runtime->Sim([&] { return working; }, [&](IEventHandle& event) { working = event.GetTypeRewrite() != TEvTablet::EvBoot; });
+ if (Settings.SetupTablets) {
+ for (ui32 nodeId : Runtime->GetNodes()) {
+ Runtime->RegisterService(MakeTxProxyID(), Runtime->Register(CreateTxProxy(allocators), nodeId));
+ Runtime->RegisterService(MakeLocalID(nodeId), Runtime->Register(CreateLocal(localConfig.Get()), nodeId));
+
+ auto config = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>();
+ config->Roots.emplace_back(DomainId, domain.SchemeRoot, domain.Name);
+ config->Counters = MakeIntrusive<NMonitoring::TDynamicCounters>();
+ Runtime->RegisterService(MakeSchemeCacheID(), Runtime->Register(CreateSchemeBoardSchemeCache(config.Get()), nodeId));
+ }
+ }
+ }
+
+ // Sends TEvInitRootShard to the domain's scheme root tablet (if the domain has one) and waits
+ // for TEvInitRootShardResult; the result is printed to Cerr.
+ // The former unconditional ::exit(1) after the wait was a debugging leftover: it terminated the
+ // whole test process with a failure code before any test body could run, so it is removed.
+ void InitRoot() {
+     auto *appData = Runtime->GetAppData();
+     auto& domain = appData->DomainsInfo->GetDomain(DomainId);
+     if (domain.SchemeRoot) {
+         auto edge = Runtime->AllocateEdgeActor(1);
+         auto pipeId = Runtime->Register(NTabletPipe::CreateClient(edge, domain.SchemeRoot), 1);
+         // pipe operations need an actor context, so run them on behalf of the edge actor
+         Runtime->WrapInActorContext(edge, [&] {
+             const TActorContext& ctx = TActivationContext::AsActorContext();
+             NTabletPipe::SendData(ctx, pipeId, new NSchemeShard::TEvSchemeShard::TEvInitRootShard(edge, domain.DomainRootTag(), domain.Name));
+             NTabletPipe::CloseClient(ctx, pipeId);
+         });
+         auto res = WaitForEdgeActorEvent<NSchemeShard::TEvSchemeShard::TEvInitRootShardResult>(edge);
+         Cerr << res->Get()->ToString() << Endl;
+     }
+ }
void CreateBoxAndPool(ui32 numDrivesPerNode = 0, ui32 numGroups = 0, ui32 numStorageNodes = 0) {
@@ -312,6 +411,7 @@ struct TEnvironmentSetup {
cmd2->SetBoxId(1);
cmd2->SetStoragePoolId(1);
cmd2->SetName(StoragePoolName);
+ cmd2->SetKind(StoragePoolName);
cmd2->SetErasureSpecies(TBlobStorageGroupType::ErasureSpeciesName(Settings.Erasure.GetErasure()));
cmd2->SetVDiskKind("Default");
cmd2->SetNumGroups(numGroups ? numGroups : NumGroups);
diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.darwin.txt b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..2f4720a4100
--- /dev/null
+++ b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.darwin.txt
@@ -0,0 +1,43 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot)
+target_include_directories(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage
+)
+target_link_libraries(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ blobstorage-ut_blobstorage-lib
+)
+target_link_options(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE
+ -Wl,-no_deduplicate
+ -Wl,-sdk_version,10.15
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp
+)
+add_test(
+ NAME
+ ydb-core-blobstorage-ut_blobstorage-ut_blob_depot
+ COMMAND
+ ydb-core-blobstorage-ut_blobstorage-ut_blob_depot
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot)
diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.linux.txt b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.linux.txt
new file mode 100644
index 00000000000..618dea3234f
--- /dev/null
+++ b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.linux.txt
@@ -0,0 +1,46 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot)
+target_include_directories(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage
+)
+target_link_libraries(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-lfalloc
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ blobstorage-ut_blobstorage-lib
+)
+target_link_options(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp
+)
+add_test(
+ NAME
+ ydb-core-blobstorage-ut_blobstorage-ut_blob_depot
+ COMMAND
+ ydb-core-blobstorage-ut_blobstorage-ut_blob_depot
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-core-blobstorage-ut_blobstorage-ut_blob_depot)
diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.txt b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.txt
new file mode 100644
index 00000000000..a681d385f3e
--- /dev/null
+++ b/ydb/core/blobstorage/ut_blobstorage/ut_blob_depot/CMakeLists.txt
@@ -0,0 +1,13 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (UNIX)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/core/mind/bscontroller/config.cpp b/ydb/core/mind/bscontroller/config.cpp
index 2f1978ae380..7c63032a6e2 100644
--- a/ydb/core/mind/bscontroller/config.cpp
+++ b/ydb/core/mind/bscontroller/config.cpp
@@ -894,9 +894,7 @@ namespace NKikimr::NBsController {
const TString& storagePoolName, const TMaybe<TKikimrScopeId>& scopeId) {
group->SetGroupID(groupInfo.ID);
group->SetGroupGeneration(groupInfo.Generation);
- group->SetErasureSpecies(groupInfo.ErasureSpecies);
group->SetStoragePoolName(storagePoolName);
- group->SetDeviceType(PDiskTypeToPDiskType(groupInfo.GetCommonDeviceType()));
group->SetEncryptionMode(groupInfo.EncryptionMode.GetOrElse(0));
group->SetLifeCyclePhase(groupInfo.LifeCyclePhase.GetOrElse(0));
@@ -912,28 +910,38 @@ namespace NKikimr::NBsController {
pb->SetX2(x.second);
}
- std::vector<std::pair<TVDiskID, const TVSlotInfo*>> vdisks;
- for (const auto& vslot : groupInfo.VDisksInGroup) {
- vdisks.emplace_back(vslot->GetVDiskId(), vslot);
- }
- auto comp = [](const auto& x, const auto& y) { return x.first < y.first; };
- std::sort(vdisks.begin(), vdisks.end(), comp);
+ if (!groupInfo.VirtualGroupState) {
+ group->SetErasureSpecies(groupInfo.ErasureSpecies);
+ group->SetDeviceType(PDiskTypeToPDiskType(groupInfo.GetCommonDeviceType()));
- TVDiskID prevVDiskId;
- NKikimrBlobStorage::TGroupInfo::TFailRealm *realm = nullptr;
- NKikimrBlobStorage::TGroupInfo::TFailRealm::TFailDomain *domain = nullptr;
- for (const auto& [vdiskId, vslot] : vdisks) {
- if (!realm || prevVDiskId.FailRealm != vdiskId.FailRealm) {
- realm = group->AddRings();
- domain = nullptr;
- }
- if (!domain || prevVDiskId.FailDomain != vdiskId.FailDomain) {
- Y_VERIFY(realm);
- domain = realm->AddFailDomains();
+ std::vector<std::pair<TVDiskID, const TVSlotInfo*>> vdisks;
+ for (const auto& vslot : groupInfo.VDisksInGroup) {
+ vdisks.emplace_back(vslot->GetVDiskId(), vslot);
}
- prevVDiskId = vdiskId;
+ auto comp = [](const auto& x, const auto& y) { return x.first < y.first; };
+ std::sort(vdisks.begin(), vdisks.end(), comp);
+
+ TVDiskID prevVDiskId;
+ NKikimrBlobStorage::TGroupInfo::TFailRealm *realm = nullptr;
+ NKikimrBlobStorage::TGroupInfo::TFailRealm::TFailDomain *domain = nullptr;
+ for (const auto& [vdiskId, vslot] : vdisks) {
+ if (!realm || prevVDiskId.FailRealm != vdiskId.FailRealm) {
+ realm = group->AddRings();
+ domain = nullptr;
+ }
+ if (!domain || prevVDiskId.FailDomain != vdiskId.FailDomain) {
+ Y_VERIFY(realm);
+ domain = realm->AddFailDomains();
+ }
+ prevVDiskId = vdiskId;
- Serialize(domain->AddVDiskLocations(), *vslot);
+ Serialize(domain->AddVDiskLocations(), *vslot);
+ }
+ } else if (*groupInfo.VirtualGroupState == NKikimrBlobStorage::EVirtualGroupState::WORKING) {
+ Y_VERIFY(groupInfo.BlobDepotId);
+ group->SetBlobDepotId(*groupInfo.BlobDepotId);
+ } else if (*groupInfo.VirtualGroupState == NKikimrBlobStorage::EVirtualGroupState::CREATE_FAILED) {
+ group->SetBlobDepotId(0);
}
}
diff --git a/ydb/core/mind/bscontroller/get_group.cpp b/ydb/core/mind/bscontroller/get_group.cpp
index 235c3ee8d23..25c41f3b585 100644
--- a/ydb/core/mind/bscontroller/get_group.cpp
+++ b/ydb/core/mind/bscontroller/get_group.cpp
@@ -26,7 +26,7 @@ public:
const TNodeId nodeId = request->Get()->Record.GetNodeID();
auto res = std::make_unique<TEvBlobStorage::TEvControllerNodeServiceSetUpdate>(NKikimrProto::OK, nodeId);
- Self->ReadGroups(groupIDsToRead, true, res.get());
+ Self->ReadGroups(groupIDsToRead, true, res.get(), nodeId);
Response = std::make_unique<IEventHandle>(MakeBlobStorageNodeWardenID(nodeId), Self->SelfId(), res.release());
return true;
diff --git a/ydb/core/mind/bscontroller/group_reconfigure_wipe.cpp b/ydb/core/mind/bscontroller/group_reconfigure_wipe.cpp
index 7c274224baf..9a7aff6c7fd 100644
--- a/ydb/core/mind/bscontroller/group_reconfigure_wipe.cpp
+++ b/ydb/core/mind/bscontroller/group_reconfigure_wipe.cpp
@@ -52,7 +52,7 @@ public:
Self->ReadVSlot(*info, msg.Get());
TSet<ui32> groupIDsToRead;
groupIDsToRead.insert(info->GroupId);
- Self->ReadGroups(groupIDsToRead, false, msg.Get());
+ Self->ReadGroups(groupIDsToRead, false, msg.Get(), id.NodeId);
for (const TGroupId groupId : groupIDsToRead) {
STLOG(PRI_ERROR, BS_CONTROLLER, BSCTXGRW05, "No configuration for group", (GroupId, groupId));
}
diff --git a/ydb/core/mind/bscontroller/impl.h b/ydb/core/mind/bscontroller/impl.h
index e8d55783750..0e9bcac4982 100644
--- a/ydb/core/mind/bscontroller/impl.h
+++ b/ydb/core/mind/bscontroller/impl.h
@@ -516,6 +516,9 @@ public:
TActorId VirtualGroupSetupMachineId;
+ // nodes waiting for this group to become listable
+ THashSet<TNodeId> WaitingNodes;
+
// group's geometry; it doesn't ever change since the group is created
const ui32 NumFailRealms = 0;
const ui32 NumFailDomainsPerFailRealm = 0;
@@ -641,6 +644,12 @@ public:
Y_VERIFY(VDisksInGroup.size() == Topology->GetTotalVDisksNum());
}
+ // Tells whether this group may be reported to nodes / selected: always true for a group without
+ // a virtual group state, otherwise only once that state is WORKING or CREATE_FAILED.
+ bool Listable() const {
+     if (!VirtualGroupState) {
+         return true;
+     }
+     switch (*VirtualGroupState) {
+         case NKikimrBlobStorage::EVirtualGroupState::WORKING:
+         case NKikimrBlobStorage::EVirtualGroupState::CREATE_FAILED:
+             return true;
+         default:
+             return false;
+     }
+ }
+
void ClearVDisksInGroup() {
std::fill(VDisksInGroup.begin(), VDisksInGroup.end(), nullptr);
}
@@ -798,6 +807,7 @@ public:
TInstant LastDisconnectTimestamp;
// in-mem only
std::map<TString, NPDisk::TDriveData> KnownDrives;
+ THashSet<TGroupId> WaitingForGroups;
template<typename T>
static void Apply(TBlobStorageController* /*controller*/, T&& callback) {
@@ -1589,7 +1599,8 @@ private:
TDeque<TAutoPtr<IEventHandle>> InitQueue;
THashMap<Schema::Group::Owner::Type, Schema::Group::ID::Type> OwnerIdIdxToGroup;
- void ReadGroups(TSet<ui32>& groupIDsToRead, bool discard, TEvBlobStorage::TEvControllerNodeServiceSetUpdate *result);
+ void ReadGroups(TSet<ui32>& groupIDsToRead, bool discard, TEvBlobStorage::TEvControllerNodeServiceSetUpdate *result,
+ TNodeId nodeId);
void ReadPDisk(const TPDiskId& pdiskId, const TPDiskInfo& pdisk,
TEvBlobStorage::TEvControllerNodeServiceSetUpdate *result,
diff --git a/ydb/core/mind/bscontroller/register_node.cpp b/ydb/core/mind/bscontroller/register_node.cpp
index 658e6d92b2e..f1a88a8546e 100644
--- a/ydb/core/mind/bscontroller/register_node.cpp
+++ b/ydb/core/mind/bscontroller/register_node.cpp
@@ -280,10 +280,10 @@ public:
}
}
- Self->ReadGroups(groupIDsToRead, false, res.get());
+ Self->ReadGroups(groupIDsToRead, false, res.get(), nodeId);
Y_VERIFY(groupIDsToRead.empty());
- Self->ReadGroups(groupsToDiscard, true, res.get());
+ Self->ReadGroups(groupsToDiscard, true, res.get(), nodeId);
for (auto it = Self->PDisks.lower_bound(minPDiskId); it != Self->PDisks.end() && it->first.NodeId == nodeId; ++it) {
Self->ReadPDisk(it->first, *it->second, res.get(), NKikimrBlobStorage::INITIAL);
@@ -331,16 +331,17 @@ public:
};
void TBlobStorageController::ReadGroups(TSet<ui32>& groupIDsToRead, bool discard,
- TEvBlobStorage::TEvControllerNodeServiceSetUpdate *result) {
+ TEvBlobStorage::TEvControllerNodeServiceSetUpdate *result, TNodeId nodeId) {
for (auto it = groupIDsToRead.begin(); it != groupIDsToRead.end(); ) {
const TGroupId groupId = *it;
- if (TGroupInfo *group = FindGroup(groupId); group || discard) {
+ TGroupInfo *group = FindGroup(groupId);
+ if (group || discard) {
NKikimrBlobStorage::TNodeWardenServiceSet *serviceSetProto = result->Record.MutableServiceSet();
NKikimrBlobStorage::TGroupInfo *groupProto = serviceSetProto->AddGroups();
if (!group) {
groupProto->SetGroupID(groupId);
groupProto->SetEntityStatus(NKikimrBlobStorage::DESTROY);
- } else {
+ } else if (group->Listable()) {
const TStoragePoolInfo& info = StoragePools.at(group->StoragePoolId);
TMaybe<TKikimrScopeId> scopeId;
@@ -351,6 +352,10 @@ void TBlobStorageController::ReadGroups(TSet<ui32>& groupIDsToRead, bool discard
}
SerializeGroupInfo(groupProto, *group, info.Name, scopeId);
+ } else {
+ // group is not listable, so we have to postpone the request from NW
+ group->WaitingNodes.insert(nodeId);
+ GetNode(nodeId).WaitingForGroups.insert(group->ID);
}
// this group is processed, remove it from the set
@@ -467,6 +472,12 @@ void TBlobStorageController::OnWardenDisconnected(TNodeId nodeId) {
return; // there are still some connections from this NW
}
+ for (const TGroupId groupId : std::exchange(node.WaitingForGroups, {})) {
+ if (TGroupInfo *group = FindGroup(groupId)) {
+ group->WaitingNodes.erase(nodeId);
+ }
+ }
+
const TInstant now = TActivationContext::Now();
std::vector<std::pair<TVSlotId, TInstant>> lastSeenReadyQ;
for (auto it = PDisks.lower_bound(TPDiskId::MinForNode(nodeId)); it != PDisks.end() && it->first.NodeId == nodeId; ++it) {
diff --git a/ydb/core/mind/bscontroller/select_groups.h b/ydb/core/mind/bscontroller/select_groups.h
index 6a2988949c8..3533a972179 100644
--- a/ydb/core/mind/bscontroller/select_groups.h
+++ b/ydb/core/mind/bscontroller/select_groups.h
@@ -23,7 +23,7 @@ namespace NKikimr {
for (TGroupId groupId : iter->second) {
const TGroupInfo *group = controller.FindGroup(groupId);
Y_VERIFY_DEBUG(group);
- if (group) {
+ if (group && group->Listable()) {
groups.push_back(group);
}
}
@@ -50,7 +50,7 @@ namespace NKikimr {
for (auto it = storagePoolGroups.lower_bound(id); it != storagePoolGroups.end() && it->first == id; ++it) {
const TGroupInfo *groupInfo = findGroupCallback(it->second);
Y_VERIFY_DEBUG(groupInfo);
- if (groupInfo) {
+ if (groupInfo && groupInfo->Listable()) {
groups.push_back(groupInfo);
}
}
diff --git a/ydb/core/mind/bscontroller/virtual_group.cpp b/ydb/core/mind/bscontroller/virtual_group.cpp
index 7937fe57933..24e041ea9eb 100644
--- a/ydb/core/mind/bscontroller/virtual_group.cpp
+++ b/ydb/core/mind/bscontroller/virtual_group.cpp
@@ -168,7 +168,8 @@ namespace NKikimr::NBsController {
case NKikimrBlobStorage::EVirtualGroupState::CREATE_FAILED:
case NKikimrBlobStorage::EVirtualGroupState::WORKING:
- GetGroup()->VirtualGroupSetupMachineId = {};
+ IssueNodeNotifications(group);
+ group->VirtualGroupSetupMachineId = {};
PassAway();
break;
@@ -291,6 +292,20 @@ namespace NKikimr::NBsController {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Sends the group's configuration to every node that was parked in group->WaitingNodes.
+ // For each such node the back-reference in node.WaitingForGroups is dropped and a fresh
+ // TEvControllerNodeServiceSetUpdate, filled by ReadGroups for this single group, is sent to the
+ // node's warden.
+ void IssueNodeNotifications(TGroupInfo *group) {
+ // std::exchange empties WaitingNodes up front and iterates over the moved-out copy
+ for (const TNodeId nodeId : std::exchange(group->WaitingNodes, {})) {
+ TNodeInfo& node = Self->GetNode(nodeId);
+ node.WaitingForGroups.erase(group->ID);
+ auto ev = std::make_unique<TEvBlobStorage::TEvControllerNodeServiceSetUpdate>(NKikimrProto::OK, nodeId);
+ TSet<ui32> groups;
+ groups.insert(group->ID);
+ Self->ReadGroups(groups, false, ev.get(), nodeId);
+ Send(MakeBlobStorageNodeWardenID(nodeId), ev.release());
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
void PassAway() override {
NTabletPipe::CloseAndForgetClient(SelfId(), SchemeshardPipeId);
TActorBootstrapped::PassAway();
diff --git a/ydb/core/node_whiteboard/node_whiteboard.h b/ydb/core/node_whiteboard/node_whiteboard.h
index 9c7a1cabb02..8769068f371 100644
--- a/ydb/core/node_whiteboard/node_whiteboard.h
+++ b/ydb/core/node_whiteboard/node_whiteboard.h
@@ -236,18 +236,14 @@ struct TEvWhiteboard{
struct TEvBSGroupStateUpdate : TEventPB<TEvBSGroupStateUpdate, NKikimrWhiteboard::TBSGroupStateInfo, EvBSGroupStateUpdate> {
TEvBSGroupStateUpdate() = default;
- TEvBSGroupStateUpdate(const TIntrusivePtr<TBlobStorageGroupInfo>& groupInfo, const TMaybe<TString>& storagePoolName) {
+ TEvBSGroupStateUpdate(const TIntrusivePtr<TBlobStorageGroupInfo>& groupInfo) { // fills Record from groupInfo only; the separate storagePoolName argument is dropped
Record.SetGroupID(groupInfo->GroupID);
Record.SetGroupGeneration(groupInfo->GroupGeneration);
Record.SetErasureSpecies(groupInfo->Type.ErasureSpeciesName(groupInfo->Type.GetErasure()));
- for (auto it = groupInfo->VDisksBegin(), end = groupInfo->VDisksEnd(); it != end; ++it) {
- auto vd = groupInfo->GetVDiskId(it->OrderNumber);
- NKikimrBlobStorage::TVDiskID* addedVDisk = Record.AddVDiskIds();
- VDiskIDFromVDiskID(vd, addedVDisk);
- }
- if (storagePoolName) {
- Record.SetStoragePoolName(*storagePoolName);
+ for (ui32 i = 0; i < groupInfo->GetTotalVDisksNum(); ++i) { // one VDiskIds entry per index in [0, GetTotalVDisksNum())
+ VDiskIDFromVDiskID(groupInfo->GetVDiskId(i), Record.AddVDiskIds()); // writes the i-th VDisk id into a freshly added Record entry
}
+ Record.SetStoragePoolName(groupInfo->GetStoragePoolName()); // set unconditionally, taken from groupInfo itself
}
};
diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto
index d1707a43f1f..a1cca5e67b0 100644
--- a/ydb/core/protos/blob_depot.proto
+++ b/ydb/core/protos/blob_depot.proto
@@ -1,7 +1,28 @@
import "ydb/core/protos/blob_depot_config.proto";
+import "ydb/core/protos/base.proto";
package NKikimrBlobDepot;
+message TBlobSeqId { // identifier made of four numeric components: channel, generation, step and index
+ optional uint32 Channel = 1; // channel component
+ optional uint32 Generation = 2; // generation component
+ optional uint32 Step = 3; // step component
+ optional uint32 Index = 4; // index component; NOTE(review): presumably distinguishes ids within one step -- confirm
+}
+
+message TBlobLocator { // group id and blob sequence id, accompanied by a checksum and a total data length
+ optional uint32 GroupId = 1; // NOTE(review): presumably the storage group holding the blob -- confirm
+ optional TBlobSeqId BlobSeqId = 2; // sequence id of the blob (see TBlobSeqId)
+ optional uint32 Checksum = 3; // NOTE(review): checksum algorithm and covered bytes are not visible here -- confirm
+ optional uint64 TotalDataLen = 4; // NOTE(review): presumably full length of the blob data in bytes -- confirm
+}
+
+message TValueChain { // a blob locator plus an optional subrange; used as a repeated element in TEvResolveResult.TResolvedKey
+ optional TBlobLocator Locator = 1; // blob this chain element refers to
+ optional uint64 SubrangeBegin = 2; // start of the subrange; NOTE(review): units (presumably bytes) and behavior when unset are not visible here -- confirm
+ optional uint64 SubrangeEnd = 3; // end of the subrange; NOTE(review): inclusive vs exclusive is not visible here -- confirm
+}
+
message TEvApplyConfig {
optional uint64 TxId = 1;
optional NKikimrBlobDepot.TBlobDepotConfig Config = 2;
@@ -11,3 +32,94 @@ message TEvApplyConfigResult {
optional uint64 TabletId = 1;
optional uint64 TxId = 2;
}
+
+message TEvRegisterAgent { // agent registration request; answered with TEvRegisterAgentResult (by naming)
+ optional uint32 VirtualGroupId = 1; // for validation purposes
+}
+
+message TEvRegisterAgentResult { // reply to TEvRegisterAgent (by naming)
+ repeated uint32 ChannelGroups = 1; // NOTE(review): presumably one group id per channel, indexed by channel number -- confirm
+ optional uint32 Generation = 2; // NOTE(review): presumably the same executor generation as in TEvAllocateIdsResult -- confirm
+}
+
+message TEvAllocateIds { // id allocation request; carries no fields
+}
+
+message TEvAllocateIdsResult { // reply to TEvAllocateIds (by naming): a range of allocated ids
+ optional uint32 Generation = 1; // executor generation, for validation purposes
+ optional uint64 RangeBegin = 2; // <Generation> <Step> <Channel>
+ optional uint64 RangeEnd = 3; // NOTE(review): presumably encoded like RangeBegin; inclusive vs exclusive is not visible here -- confirm
+}
+
+message TEvBlock { // block request for a tablet; answered with TEvBlockResult (by naming)
+ optional fixed64 TabletId = 1; // tablet the block applies to
+ optional uint32 BlockedGeneration = 2; // NOTE(review): presumably generations up to and including this one become blocked -- confirm
+}
+
+message TEvBlockResult { // reply to TEvBlock (by naming)
+ optional NKikimrProto.EReplyStatus Status = 1; // outcome of the request
+ optional string ErrorReason = 2; // textual explanation; NOTE(review): presumably set only when Status is not OK -- confirm
+ optional uint32 TimeToLiveMs = 3; // time to live in milliseconds; NOTE(review): reference point is not stated here (compare TEvQueryBlocksResult.TimeToLiveMs) -- confirm
+}
+
+message TEvPushNotify { // BlobDepot -> Agent push notification (to take some action)
+ repeated fixed64 UpdateBlocksForTabletIds = 1; // ids of tablets whose blocks have changed; the agent is notified about them
+}
+
+message TEvQueryBlocks { // query of block state for a set of tablets; sent by the agent (see TimeToLiveMs note in TEvQueryBlocksResult)
+ repeated fixed64 TabletIds = 1; // tablets to query
+}
+
+message TEvQueryBlocksResult { // reply to TEvQueryBlocks (by naming)
+ repeated uint32 BlockedGenerations = 1; // NOTE(review): presumably one entry per TEvQueryBlocks.TabletIds element, in the same order -- confirm
+ optional uint32 TimeToLiveMs = 2; // TTL starting since sending TEvQueryBlocks at agent
+}
+
+message TEvCommitBlobSeq { // batch commit request; each item carries one blob locator
+ message TItem { // a single entry of the batch
+ optional TBlobLocator BlobLocator = 1; // GroupId and Generation are for validation purposes
+ }
+
+ repeated TItem Items = 1; // entries of this batch
+}
+
+message TEvCommitBlobSeqResult { // reply to TEvCommitBlobSeq (by naming); carries a per-item status
+ message TItem { // outcome for a single entry
+ optional NKikimrProto.EReplyStatus Status = 1; // outcome for this item
+ optional string ErrorReason = 2; // textual explanation; NOTE(review): presumably set only when Status is not OK -- confirm
+ }
+
+ repeated TItem Items = 1; // NOTE(review): presumably positionally matches TEvCommitBlobSeq.Items -- confirm
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// TEvResolve -- lookup key in the Data table (and apply noncommitted changes if necessary) and return value chain for
+// each of the found entries.
+
+message TEvResolve { // batch of key-range lookups; each TItem describes one range
+ message TItem { // one key range together with its output options
+ optional bytes BeginningKey = 1; // start from the very first key (if not set)
+ optional bool IncludeBeginning = 2 [default = true]; // whether BeginningKey itself belongs to the range; included by default
+ optional bytes EndingKey = 3; // end with the key beyond the last one (if not set)
+ optional bool IncludeEnding = 4 [default = false]; // whether EndingKey itself belongs to the range; excluded by default
+ optional uint32 MaxKeys = 5 [default = 0]; // NOTE(review): presumably caps the number of returned keys, with 0 meaning no limit -- confirm
+ optional bool ReturnMeta = 6 [default = false]; // NOTE(review): presumably requests TEvResolveResult.TResolvedKey.Meta to be filled -- confirm
+ optional bool ReturnOwners = 7 [default = false]; // NOTE(review): presumably requests TEvResolveResult.TResolvedKey.Owners to be filled -- confirm
+ optional bool Reverse = 8 [default = false]; // reverse output
+ }
+
+ repeated TItem Items = 1; // ranges to resolve
+}
+
+message TEvResolveResult { // reply to TEvResolve (by naming); may be one of several messages, see Status
+ message TResolvedKey { // one found key with its value chain
+ optional uint32 ItemIndex = 1; // NOTE(review): presumably index into TEvResolve.Items of the range that produced this key -- confirm
+ optional bytes Key = 2; // the found key
+ repeated TValueChain ValueChain = 3; // value chain elements for this key
+ optional bytes Meta = 4; // NOTE(review): presumably filled only when TEvResolve.TItem.ReturnMeta is set -- confirm
+ repeated uint64 Owners = 5; // NOTE(review): presumably filled only when TEvResolve.TItem.ReturnOwners is set -- confirm
+ }
+ optional NKikimrProto.EReplyStatus Status = 1; // OVERRUN means there are more messages on the way
+ optional string ErrorReason = 2; // textual explanation; NOTE(review): presumably set only on failure -- confirm
+ repeated TResolvedKey ResolvedKeys = 3; // keys carried by this message
+}
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index ab2765b94ab..f59945ce01d 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -316,6 +316,7 @@ enum EServiceKikimr {
// Blob depot
BLOB_DEPOT = 1300;
+ BLOB_DEPOT_AGENT = 1301;
};
message TActivity {