diff options
author | alexvru <alexvru@ydb.tech> | 2022-08-03 17:48:58 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-08-03 17:48:58 +0300 |
commit | 591ac6799945ac99f1e5a377d2221bd3b4d516e8 (patch) | |
tree | a2c9df301811f5b76ad6c212cc2df207e601429f | |
parent | 1ebdd3a7ff6912ae7909a98c7d472a793983b71e (diff) | |
download | ydb-591ac6799945ac99f1e5a377d2221bd3b4d516e8.tar.gz |
BlobDepot work in progress
-rw-r--r-- | CMakeLists.darwin.txt | 36 | ||||
-rw-r--r-- | CMakeLists.linux.txt | 36 | ||||
-rw-r--r-- | ydb/core/blob_depot/CMakeLists.txt | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/assimilator.cpp | 168 | ||||
-rw-r--r-- | ydb/core/blob_depot/assimilator_fetch_machine.cpp | 286 | ||||
-rw-r--r-- | ydb/core/blob_depot/assimilator_fetch_machine.h | 132 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot.cpp | 11 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot_tablet.h | 136 | ||||
-rw-r--r-- | ydb/core/blob_depot/defs.h | 3 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_apply_config.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_load.cpp | 2 | ||||
-rw-r--r-- | ydb/core/mind/bscontroller/config.cpp | 2 | ||||
-rw-r--r-- | ydb/core/mind/bscontroller/get_group.cpp | 15 | ||||
-rw-r--r-- | ydb/core/mind/bscontroller/load_everything.cpp | 4 | ||||
-rw-r--r-- | ydb/core/mind/bscontroller/register_node.cpp | 2 | ||||
-rw-r--r-- | ydb/core/mind/bscontroller/virtual_group.cpp | 2 |
16 files changed, 788 insertions, 51 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index ad538189a1..65ba8074c9 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -428,8 +428,21 @@ add_subdirectory(library/cpp/ipmath) add_subdirectory(library/cpp/ipv6_address) add_subdirectory(library/cpp/int128) add_subdirectory(ydb/core/blob_depot) -add_subdirectory(ydb/core/tablet_flat) +add_subdirectory(ydb/core/blobstorage/vdisk/common) +add_subdirectory(ydb/core/blobstorage/vdisk/hulldb/base) +add_subdirectory(ydb/core/blobstorage/pdisk) +add_subdirectory(ydb/core/blobstorage/lwtrace_probes) add_subdirectory(ydb/core/control) +add_subdirectory(ydb/library/schlab) +add_subdirectory(ydb/library/schlab/schine) +add_subdirectory(ydb/library/schlab/probes) +add_subdirectory(ydb/library/schlab/mon) +add_subdirectory(ydb/library/schlab/schemu) +add_subdirectory(ydb/library/schlab/schoot) +add_subdirectory(library/cpp/protobuf/json) +add_subdirectory(ydb/library/schlab/protos) +add_subdirectory(ydb/core/blobstorage/vdisk/protos) +add_subdirectory(ydb/core/tablet_flat) add_subdirectory(ydb/core/tablet) add_subdirectory(ydb/core/mon_alloc) add_subdirectory(library/cpp/lfalloc/alloc_profiler) @@ -446,19 +459,6 @@ add_subdirectory(ydb/library/persqueue/topic_parser_public) add_subdirectory(ydb/core/tablet_flat/protos) add_subdirectory(ydb/core/blobstorage) add_subdirectory(ydb/core/blobstorage/backpressure) -add_subdirectory(ydb/core/blobstorage/lwtrace_probes) -add_subdirectory(ydb/core/blobstorage/vdisk/common) -add_subdirectory(ydb/core/blobstorage/vdisk/hulldb/base) -add_subdirectory(ydb/core/blobstorage/pdisk) -add_subdirectory(ydb/library/schlab) -add_subdirectory(ydb/library/schlab/schine) -add_subdirectory(ydb/library/schlab/probes) -add_subdirectory(ydb/library/schlab/mon) -add_subdirectory(ydb/library/schlab/schemu) -add_subdirectory(ydb/library/schlab/schoot) -add_subdirectory(library/cpp/protobuf/json) -add_subdirectory(ydb/library/schlab/protos) -add_subdirectory(ydb/core/blobstorage/vdisk/protos) add_subdirectory(ydb/core/blobstorage/dsproxy) add_subdirectory(ydb/core/blobstorage/storagepoolmon) add_subdirectory(ydb/core/blobstorage/incrhuge) @@ -1146,6 +1146,10 @@ add_subdirectory(ydb/library/yql/minikql/perf/param) add_subdirectory(ydb/library/yql/minikql/perf/presort) add_subdirectory(library/cpp/presort) add_subdirectory(ydb/library/yql/sql/v1/perf) +add_subdirectory(ydb/library/schlab/mon/test) +add_subdirectory(ydb/library/schlab/mon/static) +add_subdirectory(ydb/library/schlab/mon/static/css) +add_subdirectory(ydb/library/schlab/mon/static/js) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_blob_depot) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_donor) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_group_reconfiguration) @@ -1165,10 +1169,6 @@ add_subdirectory(ydb/core/blobstorage/nodewarden/ut) add_subdirectory(ydb/core/blobstorage/nodewarden/ut_sequence) add_subdirectory(ydb/core/blobstorage/pdisk/ut) add_subdirectory(ydb/core/blobstorage/storagepoolmon/ut) -add_subdirectory(ydb/library/schlab/mon/test) -add_subdirectory(ydb/library/schlab/mon/static) -add_subdirectory(ydb/library/schlab/mon/static/css) -add_subdirectory(ydb/library/schlab/mon/static/js) add_subdirectory(ydb/core/tx/balance_coverage/ut) add_subdirectory(ydb/core/tx/columnshard/ut) add_subdirectory(ydb/core/tx/coordinator/ut) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 8427acedd0..790425d950 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -431,8 +431,21 @@ add_subdirectory(library/cpp/ipmath) add_subdirectory(library/cpp/ipv6_address) add_subdirectory(library/cpp/int128) add_subdirectory(ydb/core/blob_depot) -add_subdirectory(ydb/core/tablet_flat) +add_subdirectory(ydb/core/blobstorage/vdisk/common) +add_subdirectory(ydb/core/blobstorage/vdisk/hulldb/base) +add_subdirectory(ydb/core/blobstorage/pdisk) +add_subdirectory(ydb/core/blobstorage/lwtrace_probes) add_subdirectory(ydb/core/control) +add_subdirectory(ydb/library/schlab) +add_subdirectory(ydb/library/schlab/schine) +add_subdirectory(ydb/library/schlab/probes) +add_subdirectory(ydb/library/schlab/mon) +add_subdirectory(ydb/library/schlab/schemu) +add_subdirectory(ydb/library/schlab/schoot) +add_subdirectory(library/cpp/protobuf/json) +add_subdirectory(ydb/library/schlab/protos) +add_subdirectory(ydb/core/blobstorage/vdisk/protos) +add_subdirectory(ydb/core/tablet_flat) add_subdirectory(ydb/core/tablet) add_subdirectory(ydb/core/mon_alloc) add_subdirectory(library/cpp/lfalloc/alloc_profiler) @@ -449,19 +462,6 @@ add_subdirectory(ydb/library/persqueue/topic_parser_public) add_subdirectory(ydb/core/tablet_flat/protos) add_subdirectory(ydb/core/blobstorage) add_subdirectory(ydb/core/blobstorage/backpressure) -add_subdirectory(ydb/core/blobstorage/lwtrace_probes) -add_subdirectory(ydb/core/blobstorage/vdisk/common) -add_subdirectory(ydb/core/blobstorage/vdisk/hulldb/base) -add_subdirectory(ydb/core/blobstorage/pdisk) -add_subdirectory(ydb/library/schlab) -add_subdirectory(ydb/library/schlab/schine) -add_subdirectory(ydb/library/schlab/probes) -add_subdirectory(ydb/library/schlab/mon) -add_subdirectory(ydb/library/schlab/schemu) -add_subdirectory(ydb/library/schlab/schoot) -add_subdirectory(library/cpp/protobuf/json) -add_subdirectory(ydb/library/schlab/protos) -add_subdirectory(ydb/core/blobstorage/vdisk/protos) add_subdirectory(ydb/core/blobstorage/dsproxy) add_subdirectory(ydb/core/blobstorage/storagepoolmon) add_subdirectory(ydb/core/blobstorage/incrhuge) @@ -1168,6 +1168,10 @@ add_subdirectory(ydb/library/yql/minikql/perf/param) add_subdirectory(ydb/library/yql/minikql/perf/presort) add_subdirectory(library/cpp/presort) add_subdirectory(ydb/library/yql/sql/v1/perf) +add_subdirectory(ydb/library/schlab/mon/test) +add_subdirectory(ydb/library/schlab/mon/static) +add_subdirectory(ydb/library/schlab/mon/static/css) +add_subdirectory(ydb/library/schlab/mon/static/js) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_blob_depot) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_donor) add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_group_reconfiguration) @@ -1189,10 +1193,6 @@ add_subdirectory(ydb/core/blobstorage/nodewarden/ut) add_subdirectory(ydb/core/blobstorage/nodewarden/ut_sequence) add_subdirectory(ydb/core/blobstorage/pdisk/ut) add_subdirectory(ydb/core/blobstorage/storagepoolmon/ut) -add_subdirectory(ydb/library/schlab/mon/test) -add_subdirectory(ydb/library/schlab/mon/static) -add_subdirectory(ydb/library/schlab/mon/static/css) -add_subdirectory(ydb/library/schlab/mon/static/js) add_subdirectory(ydb/core/tx/balance_coverage/ut) add_subdirectory(ydb/core/tx/columnshard/ut) add_subdirectory(ydb/core/tx/coordinator/ut) diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt index 1890d87354..b3f2690218 100644 --- a/ydb/core/blob_depot/CMakeLists.txt +++ b/ydb/core/blob_depot/CMakeLists.txt @@ -11,6 +11,7 @@ add_library(ydb-core-blob_depot) target_link_libraries(ydb-core-blob_depot PUBLIC contrib-libs-cxxsupp yutil + blobstorage-vdisk-common ydb-core-tablet_flat ydb-core-protos ) @@ -18,6 +19,7 @@ target_sources(ydb-core-blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blob_depot.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator_fetch_machine.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp index d6528981c4..5b2c9790e9 100644 --- a/ydb/core/blob_depot/assimilator.cpp +++ b/ydb/core/blob_depot/assimilator.cpp @@ -1,20 +1,176 @@ #include "blob_depot_tablet.h" +#include "assimilator_fetch_machine.h" namespace NKikimr::NBlobDepot { class TBlobDepot::TGroupAssimilator : public TActorBootstrapped<TGroupAssimilator> { + enum { + EvReconnectToController = EventSpaceBegin(TEvents::ES_PRIVATE), + }; + const ui32 GroupId; - const TActorId BlobDepotId; + const NKikimrBlobDepot::TBlobDepotConfig Config; + TActorId BlobDepotId; + TIntrusivePtr<TBlobStorageGroupInfo> Info; public: - TGroupAssimilator(ui32 groupId, TActorId blobDepotId) + TGroupAssimilator(ui32 groupId, const NKikimrBlobDepot::TBlobDepotConfig& config) : GroupId(groupId) - , BlobDepotId(blobDepotId) - {} + , Config(config) + { + Y_VERIFY(Config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup); + } + + void Bootstrap(TActorId parentId) { + BlobDepotId = parentId; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT31, "TGroupAssimilator::Bootstrap", (GroupId, GroupId)); + QueryGroupConfiguration(); + } + + void PassAway() override { + FetchMachine->OnPassAway(); + TActorBootstrapped::PassAway(); + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // BSC interaction + + TActorId ControllerPipeId; + + void QueryGroupConfiguration() { + TGroupID groupId(GroupId); + if (groupId.ConfigurationType() != EGroupConfigurationType::Dynamic) { + AbortWithError("group configuration type is not dynamic"); + } + + const ui64 controllerId = MakeBSControllerID(groupId.AvailabilityDomainID()); + ControllerPipeId = Register(NTabletPipe::CreateClient(SelfId(), controllerId)); + Become(&TThis::StateQueryController); + } + + STRICT_STFUNC(StateQueryController, + cFunc(TEvents::TSystem::Poison, PassAway); + + cFunc(EvReconnectToController, QueryGroupConfiguration); + hFunc(TEvTabletPipe::TEvClientConnected, Handle); + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + hFunc(TEvBlobStorage::TEvControllerNodeServiceSetUpdate, Handle); + ); + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT00, "TGroupAssimilator::TEvClientConnected", (GroupId, GroupId)); + Y_VERIFY(ev->Get()->ClientId == ControllerPipeId); + if (ev->Get()->Status == NKikimrProto::OK) { + NTabletPipe::SendData(SelfId(), ControllerPipeId, new TEvBlobStorage::TEvControllerGetGroup(0, GroupId)); + } else { + Reconnect(); + } + } + + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT32, "TGroupAssimilator::TEvClientDestroyed", (GroupId, GroupId)); + Y_VERIFY(ev->Get()->ClientId == ControllerPipeId); + Reconnect(); + } + + void Handle(TEvBlobStorage::TEvControllerNodeServiceSetUpdate::TPtr ev) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT33, "TGroupAssimilator::TEvControllerNodeServiceSetUpdate", (GroupId, GroupId), + (Msg, ev->Get()->ToString())); + NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeId); + + auto& record = ev->Get()->Record; + if (record.HasStatus() && record.GetStatus() == NKikimrProto::OK && record.HasServiceSet()) { + const auto& ss = record.GetServiceSet(); + for (const auto& group : ss.GetGroups()) { + if (group.GetGroupID() == GroupId) { + if (group.GetEntityStatus() == NKikimrBlobStorage::EEntityStatus::DESTROY) { + return AbortWithError("the group being decommitted was destroyed"); + } else if (!group.HasAssimilatorGroupId()) { + return AbortWithError("the group being decommitted is not in assimilation mode"); + } else if (group.HasBlobDepotId()) { + const TString msg = "the group being decommitted is a virtual one"; + Y_VERIFY_DEBUG(false, "%s", msg.data()); + STLOG(PRI_CRIT, BLOB_DEPOT, BDT35, msg, (GroupId, GroupId)); + return AbortWithError(msg); + } else { + Info = TBlobStorageGroupInfo::Parse(group, nullptr, nullptr); + StartAssimilation(); + return; + } + } + } + } + + // retry operation in some time + Reconnect(); + } - void Bootstrap() { - (void)GroupId; + void Reconnect() { + NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeId); + TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(EvReconnectToController, 0, + SelfId(), {}, nullptr, 0)); + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + std::unique_ptr<TGroupAssimilatorFetchMachine> FetchMachine; + + void StartAssimilation() { + Become(&TThis::StateAssimilate); + FetchMachine = std::make_unique<TGroupAssimilatorFetchMachine>(SelfId(), Info, BlobDepotId); + } + + void StateAssimilate(STFUNC_SIG) { + Y_UNUSED(ctx); + + switch (ev->GetTypeRewrite()) { + cFunc(TEvents::TSystem::Poison, PassAway); + IgnoreFunc(TEvTabletPipe::TEvClientDestroyed); + + default: + return FetchMachine->Handle(ev); + } + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + void AbortWithError(TString error) { + STLOG(PRI_ERROR, BLOB_DEPOT, BDT34, "failed to assimilate group", (GroupId, GroupId), (Error, error)); + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Gone, 0, BlobDepotId, SelfId(), nullptr, GroupId)); + PassAway(); } }; + void TBlobDepot::StartGroupAssimilators() { + for (const ui32 groupId : Config.GetDecommittingGroups()) { + if (Config.GetOperationMode() != NKikimrBlobDepot::EOperationMode::VirtualGroup) { + STLOG(PRI_CRIT, BLOB_DEPOT, BDT36, "incorrect operating mode of BlobDepot", (TabletId, TabletID())); + Y_VERIFY_DEBUG(false, "incorrect operating mode of BlobDepot"); + return; + } + + StartGroupAssimilator(groupId); + } + } + + void TBlobDepot::StartGroupAssimilator(ui32 groupId) { + if (RunningGroupAssimilators.contains(groupId)) { + return; + } + + RunningGroupAssimilators[groupId] = Register(new TGroupAssimilator(groupId, Config)); + } + + void TBlobDepot::HandleGone(TAutoPtr<IEventHandle> ev) { + if (const auto it = RunningGroupAssimilators.find(ev->Cookie); it != RunningGroupAssimilators.end()) { + Y_VERIFY(it->second == ev->Sender); + RunningGroupAssimilators.erase(it); + } else { + Y_FAIL("unexpected event"); + } + } + + void TBlobDepot::Handle(TEvAssimilatedData::TPtr /*ev*/) { + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/assimilator_fetch_machine.cpp b/ydb/core/blob_depot/assimilator_fetch_machine.cpp new file mode 100644 index 0000000000..a9910d2584 --- /dev/null +++ b/ydb/core/blob_depot/assimilator_fetch_machine.cpp @@ -0,0 +1,286 @@ +#include "assimilator_fetch_machine.h" + +namespace NKikimr::NBlobDepot { + + using TFetchMachine = TBlobDepot::TGroupAssimilatorFetchMachine; + + TFetchMachine::TGroupAssimilatorFetchMachine(TActorIdentity self, TIntrusivePtr<TBlobStorageGroupInfo> info, + TActorId blobDepotId) + : Self(self) + , Info(std::move(info)) + , BlobDepotId(blobDepotId) + { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT38, "TGroupAssimilatorFetchMachine start", (GroupId, Info->GroupID)); + PerDiskState.resize(Info->GetTotalVDisksNum()); + VDisksInHeap.emplace(&Info->GetTopology()); + for (ui32 i = 0; i < PerDiskState.size(); ++i) { + const TActorId actorId = Info->GetActorId(i); + const ui32 nodeId = actorId.NodeId(); + TNodeInfo& node = Nodes[nodeId]; + node.OrderNumbers.push_back(i); + } + for (const auto& [nodeId, node] : Nodes) { + if (nodeId != Self.NodeId()) { + TActivationContext::Send(new IEventHandle(TEvInterconnect::EvConnectNode, 0, + TActivationContext::InterconnectProxy(nodeId), Self, nullptr, 0)); + } else { + for (const ui32 orderNumber : node.OrderNumbers) { + IssueAssimilateCmdToVDisk(orderNumber); + } + } + } + } + + void TFetchMachine::Handle(TAutoPtr<IEventHandle>& ev) { + switch (const ui32 type = ev->GetTypeRewrite()) { + hFunc(TEvBlobStorage::TEvVAssimilateResult, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + hFunc(TEvInterconnect::TEvNodeConnected, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + + default: + Y_VERIFY_DEBUG(false, "unexpected event Type# %08" PRIx32, type); + } + } + + void TFetchMachine::OnPassAway() { + for (const auto& [nodeId, node] : Nodes) { + if (nodeId != Self.NodeId()) { + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, + TActivationContext::InterconnectProxy(nodeId), Self, nullptr, 0)); + } + } + } + + void TFetchMachine::IssueAssimilateCmdToVDisk(ui32 orderNumber) { + const TActorId actorId = Info->GetActorId(orderNumber); + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT39, "IssueAssimilateCmdToVDisk", (GroupId, Info->GroupID), + (OrderNumber, orderNumber), (ActorId, actorId)); + + TPerDiskState& state = PerDiskState[orderNumber]; + Y_VERIFY(!state.Finished); + + auto ev = std::make_unique<TEvBlobStorage::TEvVAssimilate>(Info->GetVDiskId(orderNumber)); + auto& record = ev->Record; + if (state.LastBlock) { + record.SetSkipBlocksUpTo(*state.LastBlock); + } + if (state.LastBarrier) { + auto *x = record.MutableSkipBarriersUpTo(); + x->SetTabletId(std::get<0>(*state.LastBarrier)); + x->SetChannel(std::get<1>(*state.LastBarrier)); + } + if (state.LastBlob) { + LogoBlobIDFromLogoBlobID(*state.LastBlob, record.MutableSkipBlobsUpTo()); + } + + const ui64 id = ++LastRequestId; + Self.Send(actorId, ev.release(), IEventHandle::FlagTrackDelivery, id); + + const auto [it, inserted] = RequestsInFlight.emplace(id, TRequestInFlight{orderNumber}); + Y_VERIFY(inserted); + + const ui32 nodeId = actorId.NodeId(); + Nodes[nodeId].RequestsInFlight.insert(&*it); + } + + void TFetchMachine::Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) { + const ui32 nodeId = ev->Get()->NodeId; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT40, "NodeConnected", (GroupId, Info->GroupID), (NodeId, nodeId)); + TNodeInfo& node = Nodes[nodeId]; + for (const ui32 orderNumber : node.OrderNumbers) { + auto& state = PerDiskState[orderNumber]; + if (!state.Finished) { + IssueAssimilateCmdToVDisk(orderNumber); + } + } + } + + void TFetchMachine::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) { + const ui32 nodeId = ev->Get()->NodeId; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT41, "NodeDisconnected", (GroupId, Info->GroupID), (NodeId, nodeId)); + TNodeInfo& node = Nodes[nodeId]; + for (const auto *kv : std::exchange(node.RequestsInFlight, {})) { + const size_t num = RequestsInFlight.erase(kv->first); + Y_VERIFY(num == 1); + } + } + + void TFetchMachine::Handle(TEvents::TEvUndelivered::TPtr ev) { + if (ev->Get()->SourceType == TEvBlobStorage::EvVAssimilate) { + // TODO: undelivery may be caused by moving VDisk actor out, handle it + EndRequest(ev->Cookie); + } + } + + ui32 TFetchMachine::EndRequest(ui64 id) { + const auto it = RequestsInFlight.find(id); + Y_VERIFY(it != RequestsInFlight.end()); + const ui32 orderNumber = it->second.OrderNumber; + const TActorId actorId = Info->GetActorId(orderNumber); + const ui32 nodeId = actorId.NodeId(); + TNodeInfo& node = Nodes[nodeId]; + const size_t num = node.RequestsInFlight.erase(&*it); + Y_VERIFY(num == 1); + RequestsInFlight.erase(it); + return orderNumber; + } + + void TFetchMachine::Handle(TEvBlobStorage::TEvVAssimilateResult::TPtr ev) { + const ui32 orderNumber = EndRequest(ev->Cookie); + const auto& record = ev->Get()->Record; + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT42, "EvVAssimilate", (GroupId, Info->GroupID), (Id, ev->Cookie), + (OrderNumber, orderNumber), (Status, record.GetStatus()), (Blocks.size, record.BlocksSize()), + (Barriers.size, record.BarriersSize()), (Blobs.size, record.BlobsSize())); + + TPerDiskState& state = PerDiskState[orderNumber]; + + const bool wasExhausted = state.Exhausted(); + + for (const auto& item : record.GetBlocks()) { + const ui64 tabletId = item.GetTabletId(); + state.LastBlock.emplace(tabletId); + if (CurrentBlock <= *state.LastBlock) { + state.Blocks.emplace_back(item); + } + } + for (const auto& item : record.GetBarriers()) { + const ui64 tabletId = item.GetTabletId(); + const ui8 channel = item.GetChannel(); + state.LastBarrier.emplace(tabletId, channel); + if (CurrentBarrier <= *state.LastBarrier) { + state.Barriers.emplace_back(item); + } + } + ui64 raw[3] = {0, 0, 0}; + for (const auto& item : record.GetBlobs()) { + if (item.HasRawX1()) { + raw[0] = item.GetRawX1(); + } else if (item.HasDiffX1()) { + raw[0] += item.GetDiffX1(); + } + if (item.HasRawX2()) { + raw[1] = item.GetRawX2(); + } else if (item.HasDiffX2()) { + raw[1] += item.GetDiffX2(); + } + if (item.HasRawX3()) { + raw[2] = item.GetRawX3(); + } else if (item.HasDiffX3()) { + raw[2] += item.GetDiffX3(); + } + const TLogoBlobID id(raw); + state.LastBlob.emplace(id); + if (CurrentBlob <= *state.LastBlob) { + state.Blobs.emplace_back(item, id); + } + } + + if (wasExhausted && !state.Exhausted()) { + Heap.push_back(&state); + std::push_heap(Heap.begin(), Heap.end(), TPerDiskState::THeapCompare()); + *VDisksInHeap |= {&Info->GetTopology(), Info->GetVDiskId(orderNumber)}; + } + + if (record.BlocksSize() + record.BarriersSize() + record.BlobsSize() == 0 && record.GetStatus() == NKikimrProto::OK) { + state.Finished = true; + } else if (state.Exhausted()) { // still no records; for example, when all were skipped + return IssueAssimilateCmdToVDisk(orderNumber); + } + + Merge(); + } + + void TFetchMachine::Merge() { + auto ev = std::make_unique<TEvAssimilatedData>(Info->GroupID); + + bool quorumCorrect = Info->GetQuorumChecker().CheckQuorumForGroup(*VDisksInHeap); + while (quorumCorrect) { + if (Heap.empty()) { + CurrentBlob = Max<TLogoBlobID>(); + break; + } + + std::optional<TBlock> block; + std::optional<TBarrier> barrier; + std::optional<TBlob> blob; + + auto callback = [&](auto&& value, ui32 orderNumber) { + using T = std::decay_t<decltype(value)>; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "AssimilatedItem", (GroupId, Info->GroupID), (OrderNumber, orderNumber), + (Value, value)); + if constexpr (std::is_same_v<T, TBlock>) { + Y_VERIFY(CurrentBlock <= value.TabletId); + CurrentBlock = value.TabletId; + if (block) { + block->Merge(value); + } else { + block.emplace(std::move(value)); + } + } else if constexpr (std::is_same_v<T, TBarrier>) { + CurrentBlock = Max<ui64>(); + Y_VERIFY(CurrentBarrier <= std::make_tuple(value.TabletId, value.Channel)); + CurrentBarrier = {value.TabletId, value.Channel}; + if (barrier) { + barrier->Merge(value); + } else { + barrier.emplace(std::move(value)); + } + } else if constexpr (std::is_same_v<T, TBlob>) { + CurrentBarrier = {Max<ui64>(), Max<ui8>()}; + Y_VERIFY(CurrentBlob <= value.Id); + CurrentBlob = value.Id; + if (blob) { + blob->Merge(value); + } else { + blob.emplace(std::move(value)); + } + // special handling for part layout here -- we can't just merge the blobs + } else { + static_assert(TDependentFalse<T>, "incorrect case"); + } + }; + + TPerDiskState& head = *Heap.front(); + auto key = head.FirstKey(); + while (!Heap.empty() && Heap.front()->FirstKey() == key) { + std::pop_heap(Heap.begin(), Heap.end(), TPerDiskState::THeapCompare()); + const ui32 orderNumber = Heap.back() - PerDiskState.data(); + TPerDiskState& item = PerDiskState[orderNumber]; + item.PopFirstItem(std::bind(callback, std::placeholders::_1, orderNumber)); + if (item.Exhausted()) { + if (!item.Finished) { + // data not yet received -- ask for it + IssueAssimilateCmdToVDisk(orderNumber); + // mark disk temporarily unavailable + *VDisksInHeap -= {&Info->GetTopology(), Info->GetVDiskId(orderNumber)}; + quorumCorrect = Info->GetQuorumChecker().CheckQuorumForGroup(*VDisksInHeap); + } + // remove item from the heap -- it has no valid data to process + Heap.pop_back(); + } else { + // more items to do + std::push_heap(Heap.begin(), Heap.end(), TPerDiskState::THeapCompare()); + } + } + + if (block) { + ev->Blocks.push_back(std::move(*block)); + } else if (barrier) { + ev->Barriers.push_back(std::move(*barrier)); + } else if (blob) { + ev->Blobs.push_back(std::move(*blob)); + } else { + Y_FAIL(); + } + } + + ev->BlobsFinished = CurrentBlob == Max<TLogoBlobID>(); + ev->BarriersFinished = ev->BlobsFinished || CurrentBarrier == std::make_tuple(Max<ui64>(), Max<ui8>()); + ev->BlocksFinished = ev->BarriersFinished || CurrentBlock == Max<ui64>(); + Self.Send(BlobDepotId, ev.release()); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/assimilator_fetch_machine.h b/ydb/core/blob_depot/assimilator_fetch_machine.h new file mode 100644 index 0000000000..27469f027a --- /dev/null +++ b/ydb/core/blob_depot/assimilator_fetch_machine.h @@ -0,0 +1,132 @@ +#pragma once + +#include "defs.h" +#include "blob_depot_tablet.h" + +namespace NKikimr::NBlobDepot { + + class TBlobDepot::TGroupAssimilatorFetchMachine { + TActorIdentity Self; + TIntrusivePtr<TBlobStorageGroupInfo> Info; + TActorId BlobDepotId; + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Data processing + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + struct TPerDiskState { + std::deque<TBlock> Blocks; + std::optional<ui64> LastBlock; + std::deque<TBarrier> Barriers; + std::optional<std::tuple<ui64, ui8>> LastBarrier; + std::deque<TBlob> Blobs; + std::optional<TLogoBlobID> LastBlob; + bool Finished = false; + + ui64 FirstBlock() const { + Y_VERIFY_DEBUG(!Blocks.empty()); + return Blocks.front().TabletId; + } + + std::tuple<ui64, ui8> FirstBarrier() const { + Y_VERIFY_DEBUG(!Barriers.empty()); + const auto& barrier = Barriers.front(); + return {barrier.TabletId, barrier.Channel}; + } + + TLogoBlobID FirstBlob() const { + Y_VERIFY_DEBUG(!Blobs.empty()); + return Blobs.front().Id; + } + + std::variant<ui64, std::tuple<ui64, ui8>, TLogoBlobID> FirstKey() const { + if (!Blocks.empty()) { + return FirstBlock(); + } else if (!Barriers.empty()) { + return FirstBarrier(); + } else if (!Blobs.empty()) { + return FirstBlob(); + } else { + Y_FAIL(); + } + } + + template<typename T> + void PopFirstItem(T&& callback) { + if (!Blocks.empty()) { + callback(Blocks.front()); + Blocks.pop_front(); + } else if (!Barriers.empty()) { + callback(Barriers.front()); + Barriers.pop_front(); + } else if (!Blobs.empty()) { + callback(Blobs.front()); + Blobs.pop_front(); + } else { + Y_FAIL(); + } + } + + struct THeapCompare { + bool operator ()(const TPerDiskState *x, const TPerDiskState *y) const { + if (!x->Blocks.empty() && !y->Blocks.empty()) { + return x->FirstBlock() > y->FirstBlock(); + } else if (x->Blocks.empty() != y->Blocks.empty()) { + return x->Blocks.empty() > y->Blocks.empty(); + } else if (!x->Barriers.empty() && !y->Barriers.empty()) { + return x->FirstBarrier() > y->FirstBarrier(); + } else if (x->Barriers.empty() != y->Barriers.empty()) { + return x->Barriers.empty() > y->Barriers.empty(); + } else if (!x->Blobs.empty() && !y->Blobs.empty()) { + return x->FirstBlob() > y->FirstBlob(); + } else { + return x->Barriers.empty() > y->Barriers.empty(); + } + } + }; + + bool Exhausted() const { + return Blocks.empty() && Barriers.empty() && Blobs.empty(); + } + }; + + std::vector<TPerDiskState> PerDiskState; + std::vector<TPerDiskState*> Heap; + ui64 CurrentBlock = 0; + std::tuple<ui64, ui8> CurrentBarrier{0, 0}; + TLogoBlobID CurrentBlob; + std::optional<TBlobStorageGroupInfo::TGroupVDisks> VDisksInHeap; + + struct TRequestInFlight { + ui32 OrderNumber; + }; + + using TRequestsInFlight = THashMap<ui64, TRequestInFlight>; + + struct TNodeInfo { + std::vector<ui32> OrderNumbers; + THashSet<TRequestsInFlight::value_type*> RequestsInFlight; + }; + + ui32 LastRequestId = 0; + TRequestsInFlight RequestsInFlight; + THashMap<ui32, TNodeInfo> Nodes; + + public: + TGroupAssimilatorFetchMachine(TActorIdentity self, TIntrusivePtr<TBlobStorageGroupInfo> info, + TActorId blobDepotId); + void Handle(TAutoPtr<IEventHandle>& ev); + void OnPassAway(); + + private: + void IssueAssimilateCmdToVDisk(ui32 orderNumber); + void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev); + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev); + void Handle(TEvents::TEvUndelivered::TPtr ev); + ui32 EndRequest(ui64 id); + void Handle(TEvBlobStorage::TEvVAssimilateResult::TPtr ev); + void Merge(); + void MergeDone(); + }; + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp index bba9917e1a..a93015f652 100644 --- a/ydb/core/blob_depot/blob_depot.cpp +++ b/ydb/core/blob_depot/blob_depot.cpp @@ -40,6 +40,9 @@ namespace NKikimr::NBlobDepot { hFunc(TEvTabletPipe::TEvServerConnected, Handle); hFunc(TEvTabletPipe::TEvServerDisconnected, Handle); + fFunc(TEvents::TSystem::Gone, HandleGone); + hFunc(TEvAssimilatedData, Handle); + default: if (!HandleDefaultEvents(ev, ctx)) { Y_FAIL("unexpected event Type# 0x%08" PRIx32, type); @@ -51,6 +54,14 @@ namespace NKikimr::NBlobDepot { } } + void TBlobDepot::PassAway() { + for (const auto& [_, actorId] : RunningGroupAssimilators) { + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, SelfId(), nullptr, 0)); + } + + TActor::PassAway(); + } + IActor *CreateBlobDepot(const TActorId& tablet, TTabletStorageInfo *info) { return new TBlobDepot(tablet, info); } diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index e9e58aa960..55bd284f0c 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -15,9 +15,130 @@ namespace NKikimr::NBlobDepot { struct TEvPrivate { enum { EvCheckExpiredAgents = EventSpaceBegin(TEvents::ES_PRIVATE), + EvAssimilatedData, }; }; + struct TBlock { + ui64 TabletId; + ui32 BlockedGeneration; + + TBlock() = default; + + TBlock(const NKikimrBlobStorage::TEvVAssimilateResult::TBlock& item) + : TabletId(item.GetTabletId()) + , BlockedGeneration(item.GetBlockedGeneration()) + {} + + void Merge(TBlock& other) { + Y_VERIFY_DEBUG(other.TabletId == TabletId); + BlockedGeneration = Max(BlockedGeneration, other.BlockedGeneration); + } + + TString ToString() const { + return TStringBuilder() << "{" << TabletId << ":" << BlockedGeneration << "}"; + } + }; + + struct TBarrier { + struct TValue { + ui32 RecordGeneration; + ui32 PerGenerationCounter; + ui32 CollectGeneration; + ui32 CollectStep; + + TValue(const NKikimrBlobStorage::TEvVAssimilateResult::TBarrier::TValue& value) + : RecordGeneration(value.GetRecordGeneration()) + , PerGenerationCounter(value.GetPerGenerationCounter()) + , CollectGeneration(value.GetCollectGeneration()) + , CollectStep(value.GetCollectStep()) + {} + + void Merge(TValue& other) { + if (KeyAsTuple() < other.KeyAsTuple()) { + *this = other; + } + } + + TString ToString() const { + return TStringBuilder() << "{" << RecordGeneration << ":" << PerGenerationCounter + << "=>" << CollectGeneration << ":" << CollectStep << "}"; + } + + std::tuple<ui32, ui32> KeyAsTuple() const { + return {RecordGeneration, PerGenerationCounter}; + } + }; + + ui64 TabletId; + ui8 Channel; + std::optional<TValue> Hard; + std::optional<TValue> Soft; + + TBarrier() = default; + + TBarrier(const NKikimrBlobStorage::TEvVAssimilateResult::TBarrier& item) + : TabletId(item.GetTabletId()) + , Channel(item.GetChannel()) + , Hard(item.HasHard() ? std::make_optional(TValue(item.GetHard())) : std::nullopt) + , Soft(item.HasSoft() ? std::make_optional(TValue(item.GetSoft())) : std::nullopt) + {} + + void Merge(TBarrier& other) { + Y_VERIFY_DEBUG(TabletId == other.TabletId && Channel == other.Channel); + if (Hard && other.Hard) { + Hard->Merge(*other.Hard); + } else if (other.Hard) { + Hard = std::move(other.Hard); + } + if (Soft && other.Soft) { + Soft->Merge(*other.Soft); + } else if (other.Soft) { + Soft = std::move(other.Soft); + } + } + + TString ToString() const { + return TStringBuilder() << "{" << TabletId << ":" << int(Channel) << "@" << (Hard ? Hard->ToString() : "") + << "/" << (Soft ? Soft->ToString() : "") << "}"; + } + }; + + struct TBlob { + TLogoBlobID Id; + ui64 Ingress; + + TBlob() = default; + + TBlob(const NKikimrBlobStorage::TEvVAssimilateResult::TBlob& item, const TLogoBlobID& id) + : Id(id) + , Ingress(item.GetIngress()) + {} + + void Merge(TBlob& other) { + Y_VERIFY_DEBUG(Id == other.Id); + Ingress |= other.Ingress; + } + + TString ToString() const { + return TStringBuilder() << "{" << Id.ToString() << "/" << Ingress << "}"; + } + }; + + struct TEvAssimilatedData : TEventLocal<TEvAssimilatedData, TEvPrivate::EvAssimilatedData> { + const ui32 GroupId; + std::deque<TBlock> Blocks; + bool BlocksFinished = false; + std::deque<TBarrier> Barriers; + bool BarriersFinished = false; + std::deque<TBlob> Blobs; + bool BlobsFinished = false; + + TEvAssimilatedData(ui32 groupId) + : GroupId(groupId) + {} + }; + public: TBlobDepot(TActorId tablet, TTabletStorageInfo *info); ~TBlobDepot(); @@ -96,6 +217,11 @@ namespace NKikimr::NBlobDepot { SignalTabletActive(TActivationContext::AsActorContext()); } + void StartOperation() { + InitChannelKinds(); + StartGroupAssimilators(); + } + void OnDetach(const TActorContext&) override { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT26, "OnDetach", (TabletId, TabletID())); @@ -108,6 +234,8 @@ namespace NKikimr::NBlobDepot { PassAway(); } + void PassAway() override; + void InitChannelKinds(); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -180,7 +308,15 @@ namespace NKikimr::NBlobDepot { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Group assimilation + THashMap<ui32, TActorId> RunningGroupAssimilators; + class TGroupAssimilator; + class TGroupAssimilatorFetchMachine; + + void StartGroupAssimilators(); + void StartGroupAssimilator(ui32 groupId); + void HandleGone(TAutoPtr<IEventHandle> ev); + void Handle(TEvAssimilatedData::TPtr ev); }; } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/defs.h b/ydb/core/blob_depot/defs.h index 8fceb52d73..b0b79bd6da 100644 --- a/ydb/core/blob_depot/defs.h +++ b/ydb/core/blob_depot/defs.h @@ -1,7 +1,10 @@ #pragma once #include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/blobstorage/base/blobstorage_events.h> #include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h> +#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_sets.h> +#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> #include <ydb/core/engine/minikql/flat_local_tx_factory.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> diff --git a/ydb/core/blob_depot/op_apply_config.cpp b/ydb/core/blob_depot/op_apply_config.cpp index aaed6ed727..7c6f1353ac 100644 --- a/ydb/core/blob_depot/op_apply_config.cpp +++ b/ydb/core/blob_depot/op_apply_config.cpp @@ -50,7 +50,7 @@ namespace NKikimr::NBlobDepot { (WasConfigured, WasConfigured)); if (!WasConfigured) { - Self->InitChannelKinds(); + Self->StartOperation(); } TActivationContext::Send(Response.release()); } diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp index e166186398..d3947b5ab9 100644 --- a/ydb/core/blob_depot/op_load.cpp +++ b/ydb/core/blob_depot/op_load.cpp @@ -90,7 +90,7 @@ namespace NKikimr::NBlobDepot { (Configured, Configured)); if (Configured) { - Self->InitChannelKinds(); + Self->StartOperation(); } Self->OnLoadFinished(); diff --git a/ydb/core/mind/bscontroller/config.cpp b/ydb/core/mind/bscontroller/config.cpp index 5a7d62ee8a..0b2f820ac8 100644 --- a/ydb/core/mind/bscontroller/config.cpp +++ b/ydb/core/mind/bscontroller/config.cpp @@ -636,6 +636,8 @@ namespace NKikimr::NBsController { Y_VERIFY(vslot->GroupId == groupId); Y_VERIFY(vslot->GroupGeneration == group.Generation); } + Y_VERIFY_DEBUG((group.DecommitStatus == NKikimrBlobStorage::EGroupDecommitStatus::NONE) == + group.AssimilatorGroupId.Empty()); }); #endif } diff --git a/ydb/core/mind/bscontroller/get_group.cpp b/ydb/core/mind/bscontroller/get_group.cpp index 25c41f3b58..78b5d28bca 100644 --- a/ydb/core/mind/bscontroller/get_group.cpp +++ b/ydb/core/mind/bscontroller/get_group.cpp @@ -18,17 +18,18 @@ public: Self->TabletCounters->Cumulative()[NBlobStorageController::COUNTER_GET_GROUP_COUNT].Increment(1); TRequestCounter counter(Self->TabletCounters, NBlobStorageController::COUNTER_GET_GROUP_USEC); - auto request = std::move(Request); - STLOG(PRI_DEBUG, BS_CONTROLLER, BSCTXGG01, "Handle TEvControllerGetGroup", (Request, request->Get()->Record)); + STLOG(PRI_DEBUG, BS_CONTROLLER, BSCTXGG01, "Handle TEvControllerGetGroup", (Request, Request->Get()->Record)); - const auto& v = request->Get()->Record.GetGroupIDs(); + const auto& v = Request->Get()->Record.GetGroupIDs(); TSet<ui32> groupIDsToRead(v.begin(), v.end()); - const TNodeId nodeId = request->Get()->Record.GetNodeID(); + const TNodeId nodeId = Request->Get()->Record.GetNodeID(); auto res = std::make_unique<TEvBlobStorage::TEvControllerNodeServiceSetUpdate>(NKikimrProto::OK, nodeId); Self->ReadGroups(groupIDsToRead, true, res.get(), nodeId); - Response = std::make_unique<IEventHandle>(MakeBlobStorageNodeWardenID(nodeId), Self->SelfId(), res.release()); + Response = std::make_unique<IEventHandle>(nodeId ? MakeBlobStorageNodeWardenID(nodeId) : Request->Sender, + Self->SelfId(), res.release()); + return true; } @@ -38,6 +39,10 @@ public: }; void TBlobStorageController::Handle(TEvBlobStorage::TEvControllerGetGroup::TPtr& ev) { + STLOG(PRI_DEBUG, BS_CONTROLLER, BSCTXGG02, "TEvControllerGetGroup", (Sender, ev->Sender), (Cookie, ev->Cookie), + (Recipient, ev->Recipient), (RecipientRewrite, ev->GetRecipientRewrite()), (Request, ev->Get()->Record), + (StopGivingGroups, StopGivingGroups)); + if (!StopGivingGroups) { Execute(new TTxGetGroup(ev, this)); } diff --git a/ydb/core/mind/bscontroller/load_everything.cpp b/ydb/core/mind/bscontroller/load_everything.cpp index e01099456a..3b7a9a2f95 100644 --- a/ydb/core/mind/bscontroller/load_everything.cpp +++ b/ydb/core/mind/bscontroller/load_everything.cpp @@ -200,6 +200,10 @@ public: isVirtualGroup ? 0 : std::get<2>(geomIt->second)); group.DecommitStatus = groups.GetValueOrDefault<T::DecommitStatus>(); + if (groups.HaveValue<T::AssimilatorGroupId>()) { + group.AssimilatorGroupId = groups.GetValue<T::AssimilatorGroupId>(); + } + #define OPTIONAL(NAME) \ if (groups.HaveValue<T::NAME>()) { \ group.NAME = groups.GetValue<T::NAME>(); \ diff --git a/ydb/core/mind/bscontroller/register_node.cpp b/ydb/core/mind/bscontroller/register_node.cpp index f1a88a8546..d4dac9cf48 100644 --- a/ydb/core/mind/bscontroller/register_node.cpp +++ b/ydb/core/mind/bscontroller/register_node.cpp @@ -352,7 +352,7 @@ void TBlobStorageController::ReadGroups(TSet<ui32>& groupIDsToRead, bool discard } SerializeGroupInfo(groupProto, *group, info.Name, scopeId); - } else { + } else if (nodeId) { // group is not listable, so we have to postpone the request from NW group->WaitingNodes.insert(nodeId); GetNode(nodeId).WaitingForGroups.insert(group->ID); diff --git a/ydb/core/mind/bscontroller/virtual_group.cpp b/ydb/core/mind/bscontroller/virtual_group.cpp index 1a21ea6a11..768bc8f68c 100644 --- a/ydb/core/mind/bscontroller/virtual_group.cpp +++ b/ydb/core/mind/bscontroller/virtual_group.cpp @@ -117,7 +117,7 @@ namespace NKikimr::NBsController { } group->DecommitStatus = NKikimrBlobStorage::EGroupDecommitStatus::PENDING; group->AssimilatorGroupId = virtualGroup->ID; - ++group->Generation; // advance group generation to push configs forcibly to all concerned nodes + group->ContentChanged = true; // advance group generation to push configs forcibly to all concerned nodes NKikimrBlobDepot::TBlobDepotConfig blobDepotConfig; if (!blobDepotConfig.ParseFromString(*virtualGroup->BlobDepotConfig)) { |