aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-08-03 17:48:58 +0300
committeralexvru <alexvru@ydb.tech>2022-08-03 17:48:58 +0300
commit591ac6799945ac99f1e5a377d2221bd3b4d516e8 (patch)
treea2c9df301811f5b76ad6c212cc2df207e601429f
parent1ebdd3a7ff6912ae7909a98c7d472a793983b71e (diff)
downloadydb-591ac6799945ac99f1e5a377d2221bd3b4d516e8.tar.gz
BlobDepot work in progress
-rw-r--r--CMakeLists.darwin.txt36
-rw-r--r--CMakeLists.linux.txt36
-rw-r--r--ydb/core/blob_depot/CMakeLists.txt2
-rw-r--r--ydb/core/blob_depot/assimilator.cpp168
-rw-r--r--ydb/core/blob_depot/assimilator_fetch_machine.cpp286
-rw-r--r--ydb/core/blob_depot/assimilator_fetch_machine.h132
-rw-r--r--ydb/core/blob_depot/blob_depot.cpp11
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h136
-rw-r--r--ydb/core/blob_depot/defs.h3
-rw-r--r--ydb/core/blob_depot/op_apply_config.cpp2
-rw-r--r--ydb/core/blob_depot/op_load.cpp2
-rw-r--r--ydb/core/mind/bscontroller/config.cpp2
-rw-r--r--ydb/core/mind/bscontroller/get_group.cpp15
-rw-r--r--ydb/core/mind/bscontroller/load_everything.cpp4
-rw-r--r--ydb/core/mind/bscontroller/register_node.cpp2
-rw-r--r--ydb/core/mind/bscontroller/virtual_group.cpp2
16 files changed, 788 insertions, 51 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index ad538189a1..65ba8074c9 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -428,8 +428,21 @@ add_subdirectory(library/cpp/ipmath)
add_subdirectory(library/cpp/ipv6_address)
add_subdirectory(library/cpp/int128)
add_subdirectory(ydb/core/blob_depot)
-add_subdirectory(ydb/core/tablet_flat)
+add_subdirectory(ydb/core/blobstorage/vdisk/common)
+add_subdirectory(ydb/core/blobstorage/vdisk/hulldb/base)
+add_subdirectory(ydb/core/blobstorage/pdisk)
+add_subdirectory(ydb/core/blobstorage/lwtrace_probes)
add_subdirectory(ydb/core/control)
+add_subdirectory(ydb/library/schlab)
+add_subdirectory(ydb/library/schlab/schine)
+add_subdirectory(ydb/library/schlab/probes)
+add_subdirectory(ydb/library/schlab/mon)
+add_subdirectory(ydb/library/schlab/schemu)
+add_subdirectory(ydb/library/schlab/schoot)
+add_subdirectory(library/cpp/protobuf/json)
+add_subdirectory(ydb/library/schlab/protos)
+add_subdirectory(ydb/core/blobstorage/vdisk/protos)
+add_subdirectory(ydb/core/tablet_flat)
add_subdirectory(ydb/core/tablet)
add_subdirectory(ydb/core/mon_alloc)
add_subdirectory(library/cpp/lfalloc/alloc_profiler)
@@ -446,19 +459,6 @@ add_subdirectory(ydb/library/persqueue/topic_parser_public)
add_subdirectory(ydb/core/tablet_flat/protos)
add_subdirectory(ydb/core/blobstorage)
add_subdirectory(ydb/core/blobstorage/backpressure)
-add_subdirectory(ydb/core/blobstorage/lwtrace_probes)
-add_subdirectory(ydb/core/blobstorage/vdisk/common)
-add_subdirectory(ydb/core/blobstorage/vdisk/hulldb/base)
-add_subdirectory(ydb/core/blobstorage/pdisk)
-add_subdirectory(ydb/library/schlab)
-add_subdirectory(ydb/library/schlab/schine)
-add_subdirectory(ydb/library/schlab/probes)
-add_subdirectory(ydb/library/schlab/mon)
-add_subdirectory(ydb/library/schlab/schemu)
-add_subdirectory(ydb/library/schlab/schoot)
-add_subdirectory(library/cpp/protobuf/json)
-add_subdirectory(ydb/library/schlab/protos)
-add_subdirectory(ydb/core/blobstorage/vdisk/protos)
add_subdirectory(ydb/core/blobstorage/dsproxy)
add_subdirectory(ydb/core/blobstorage/storagepoolmon)
add_subdirectory(ydb/core/blobstorage/incrhuge)
@@ -1146,6 +1146,10 @@ add_subdirectory(ydb/library/yql/minikql/perf/param)
add_subdirectory(ydb/library/yql/minikql/perf/presort)
add_subdirectory(library/cpp/presort)
add_subdirectory(ydb/library/yql/sql/v1/perf)
+add_subdirectory(ydb/library/schlab/mon/test)
+add_subdirectory(ydb/library/schlab/mon/static)
+add_subdirectory(ydb/library/schlab/mon/static/css)
+add_subdirectory(ydb/library/schlab/mon/static/js)
add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_blob_depot)
add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_donor)
add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_group_reconfiguration)
@@ -1165,10 +1169,6 @@ add_subdirectory(ydb/core/blobstorage/nodewarden/ut)
add_subdirectory(ydb/core/blobstorage/nodewarden/ut_sequence)
add_subdirectory(ydb/core/blobstorage/pdisk/ut)
add_subdirectory(ydb/core/blobstorage/storagepoolmon/ut)
-add_subdirectory(ydb/library/schlab/mon/test)
-add_subdirectory(ydb/library/schlab/mon/static)
-add_subdirectory(ydb/library/schlab/mon/static/css)
-add_subdirectory(ydb/library/schlab/mon/static/js)
add_subdirectory(ydb/core/tx/balance_coverage/ut)
add_subdirectory(ydb/core/tx/columnshard/ut)
add_subdirectory(ydb/core/tx/coordinator/ut)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 8427acedd0..790425d950 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -431,8 +431,21 @@ add_subdirectory(library/cpp/ipmath)
add_subdirectory(library/cpp/ipv6_address)
add_subdirectory(library/cpp/int128)
add_subdirectory(ydb/core/blob_depot)
-add_subdirectory(ydb/core/tablet_flat)
+add_subdirectory(ydb/core/blobstorage/vdisk/common)
+add_subdirectory(ydb/core/blobstorage/vdisk/hulldb/base)
+add_subdirectory(ydb/core/blobstorage/pdisk)
+add_subdirectory(ydb/core/blobstorage/lwtrace_probes)
add_subdirectory(ydb/core/control)
+add_subdirectory(ydb/library/schlab)
+add_subdirectory(ydb/library/schlab/schine)
+add_subdirectory(ydb/library/schlab/probes)
+add_subdirectory(ydb/library/schlab/mon)
+add_subdirectory(ydb/library/schlab/schemu)
+add_subdirectory(ydb/library/schlab/schoot)
+add_subdirectory(library/cpp/protobuf/json)
+add_subdirectory(ydb/library/schlab/protos)
+add_subdirectory(ydb/core/blobstorage/vdisk/protos)
+add_subdirectory(ydb/core/tablet_flat)
add_subdirectory(ydb/core/tablet)
add_subdirectory(ydb/core/mon_alloc)
add_subdirectory(library/cpp/lfalloc/alloc_profiler)
@@ -449,19 +462,6 @@ add_subdirectory(ydb/library/persqueue/topic_parser_public)
add_subdirectory(ydb/core/tablet_flat/protos)
add_subdirectory(ydb/core/blobstorage)
add_subdirectory(ydb/core/blobstorage/backpressure)
-add_subdirectory(ydb/core/blobstorage/lwtrace_probes)
-add_subdirectory(ydb/core/blobstorage/vdisk/common)
-add_subdirectory(ydb/core/blobstorage/vdisk/hulldb/base)
-add_subdirectory(ydb/core/blobstorage/pdisk)
-add_subdirectory(ydb/library/schlab)
-add_subdirectory(ydb/library/schlab/schine)
-add_subdirectory(ydb/library/schlab/probes)
-add_subdirectory(ydb/library/schlab/mon)
-add_subdirectory(ydb/library/schlab/schemu)
-add_subdirectory(ydb/library/schlab/schoot)
-add_subdirectory(library/cpp/protobuf/json)
-add_subdirectory(ydb/library/schlab/protos)
-add_subdirectory(ydb/core/blobstorage/vdisk/protos)
add_subdirectory(ydb/core/blobstorage/dsproxy)
add_subdirectory(ydb/core/blobstorage/storagepoolmon)
add_subdirectory(ydb/core/blobstorage/incrhuge)
@@ -1168,6 +1168,10 @@ add_subdirectory(ydb/library/yql/minikql/perf/param)
add_subdirectory(ydb/library/yql/minikql/perf/presort)
add_subdirectory(library/cpp/presort)
add_subdirectory(ydb/library/yql/sql/v1/perf)
+add_subdirectory(ydb/library/schlab/mon/test)
+add_subdirectory(ydb/library/schlab/mon/static)
+add_subdirectory(ydb/library/schlab/mon/static/css)
+add_subdirectory(ydb/library/schlab/mon/static/js)
add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_blob_depot)
add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_donor)
add_subdirectory(ydb/core/blobstorage/ut_blobstorage/ut_group_reconfiguration)
@@ -1189,10 +1193,6 @@ add_subdirectory(ydb/core/blobstorage/nodewarden/ut)
add_subdirectory(ydb/core/blobstorage/nodewarden/ut_sequence)
add_subdirectory(ydb/core/blobstorage/pdisk/ut)
add_subdirectory(ydb/core/blobstorage/storagepoolmon/ut)
-add_subdirectory(ydb/library/schlab/mon/test)
-add_subdirectory(ydb/library/schlab/mon/static)
-add_subdirectory(ydb/library/schlab/mon/static/css)
-add_subdirectory(ydb/library/schlab/mon/static/js)
add_subdirectory(ydb/core/tx/balance_coverage/ut)
add_subdirectory(ydb/core/tx/columnshard/ut)
add_subdirectory(ydb/core/tx/coordinator/ut)
diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt
index 1890d87354..b3f2690218 100644
--- a/ydb/core/blob_depot/CMakeLists.txt
+++ b/ydb/core/blob_depot/CMakeLists.txt
@@ -11,6 +11,7 @@ add_library(ydb-core-blob_depot)
target_link_libraries(ydb-core-blob_depot PUBLIC
contrib-libs-cxxsupp
yutil
+ blobstorage-vdisk-common
ydb-core-tablet_flat
ydb-core-protos
)
@@ -18,6 +19,7 @@ target_sources(ydb-core-blob_depot PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blob_depot.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator_fetch_machine.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp
diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp
index d6528981c4..5b2c9790e9 100644
--- a/ydb/core/blob_depot/assimilator.cpp
+++ b/ydb/core/blob_depot/assimilator.cpp
@@ -1,20 +1,176 @@
#include "blob_depot_tablet.h"
+#include "assimilator_fetch_machine.h"
namespace NKikimr::NBlobDepot {
class TBlobDepot::TGroupAssimilator : public TActorBootstrapped<TGroupAssimilator> {
+ enum {
+ EvReconnectToController = EventSpaceBegin(TEvents::ES_PRIVATE),
+ };
+
const ui32 GroupId;
- const TActorId BlobDepotId;
+ const NKikimrBlobDepot::TBlobDepotConfig Config;
+ TActorId BlobDepotId;
+ TIntrusivePtr<TBlobStorageGroupInfo> Info;
public:
- TGroupAssimilator(ui32 groupId, TActorId blobDepotId)
+ TGroupAssimilator(ui32 groupId, const NKikimrBlobDepot::TBlobDepotConfig& config)
: GroupId(groupId)
- , BlobDepotId(blobDepotId)
- {}
+ , Config(config)
+ {
+ Y_VERIFY(Config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup);
+ }
+
+ void Bootstrap(TActorId parentId) {
+ BlobDepotId = parentId;
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT31, "TGroupAssimilator::Bootstrap", (GroupId, GroupId));
+ QueryGroupConfiguration();
+ }
+
+ void PassAway() override {
+ FetchMachine->OnPassAway();
+ TActorBootstrapped::PassAway();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // BSC interaction
+
+ TActorId ControllerPipeId;
+
+ void QueryGroupConfiguration() {
+ TGroupID groupId(GroupId);
+ if (groupId.ConfigurationType() != EGroupConfigurationType::Dynamic) {
+ AbortWithError("group configuration type is not dynamic");
+ }
+
+ const ui64 controllerId = MakeBSControllerID(groupId.AvailabilityDomainID());
+ ControllerPipeId = Register(NTabletPipe::CreateClient(SelfId(), controllerId));
+ Become(&TThis::StateQueryController);
+ }
+
+ STRICT_STFUNC(StateQueryController,
+ cFunc(TEvents::TSystem::Poison, PassAway);
+
+ cFunc(EvReconnectToController, QueryGroupConfiguration);
+ hFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ hFunc(TEvBlobStorage::TEvControllerNodeServiceSetUpdate, Handle);
+ );
+
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT00, "TGroupAssimilator::TEvClientConnected", (GroupId, GroupId));
+ Y_VERIFY(ev->Get()->ClientId == ControllerPipeId);
+ if (ev->Get()->Status == NKikimrProto::OK) {
+ NTabletPipe::SendData(SelfId(), ControllerPipeId, new TEvBlobStorage::TEvControllerGetGroup(0, GroupId));
+ } else {
+ Reconnect();
+ }
+ }
+
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT32, "TGroupAssimilator::TEvClientDestroyed", (GroupId, GroupId));
+ Y_VERIFY(ev->Get()->ClientId == ControllerPipeId);
+ Reconnect();
+ }
+
+ void Handle(TEvBlobStorage::TEvControllerNodeServiceSetUpdate::TPtr ev) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT33, "TGroupAssimilator::TEvControllerNodeServiceSetUpdate", (GroupId, GroupId),
+ (Msg, ev->Get()->ToString()));
+ NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeId);
+
+ auto& record = ev->Get()->Record;
+ if (record.HasStatus() && record.GetStatus() == NKikimrProto::OK && record.HasServiceSet()) {
+ const auto& ss = record.GetServiceSet();
+ for (const auto& group : ss.GetGroups()) {
+ if (group.GetGroupID() == GroupId) {
+ if (group.GetEntityStatus() == NKikimrBlobStorage::EEntityStatus::DESTROY) {
+ return AbortWithError("the group being decommitted was destroyed");
+ } else if (!group.HasAssimilatorGroupId()) {
+ return AbortWithError("the group being decommitted is not in assimilation mode");
+ } else if (group.HasBlobDepotId()) {
+ const TString msg = "the group being decommitted is a virtual one";
+ Y_VERIFY_DEBUG(false, "%s", msg.data());
+ STLOG(PRI_CRIT, BLOB_DEPOT, BDT35, msg, (GroupId, GroupId));
+ return AbortWithError(msg);
+ } else {
+ Info = TBlobStorageGroupInfo::Parse(group, nullptr, nullptr);
+ StartAssimilation();
+ return;
+ }
+ }
+ }
+ }
+
+ // retry operation in some time
+ Reconnect();
+ }
- void Bootstrap() {
- (void)GroupId;
+ void Reconnect() {
+ NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeId);
+ TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(EvReconnectToController, 0,
+ SelfId(), {}, nullptr, 0));
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ std::unique_ptr<TGroupAssimilatorFetchMachine> FetchMachine;
+
+ void StartAssimilation() {
+ Become(&TThis::StateAssimilate);
+ FetchMachine = std::make_unique<TGroupAssimilatorFetchMachine>(SelfId(), Info, BlobDepotId);
+ }
+
+ void StateAssimilate(STFUNC_SIG) {
+ Y_UNUSED(ctx);
+
+ switch (ev->GetTypeRewrite()) {
+ cFunc(TEvents::TSystem::Poison, PassAway);
+ IgnoreFunc(TEvTabletPipe::TEvClientDestroyed);
+
+ default:
+ return FetchMachine->Handle(ev);
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ void AbortWithError(TString error) {
+ STLOG(PRI_ERROR, BLOB_DEPOT, BDT34, "failed to assimilate group", (GroupId, GroupId), (Error, error));
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Gone, 0, BlobDepotId, SelfId(), nullptr, GroupId));
+ PassAway();
}
};
+ void TBlobDepot::StartGroupAssimilators() {
+ for (const ui32 groupId : Config.GetDecommittingGroups()) {
+ if (Config.GetOperationMode() != NKikimrBlobDepot::EOperationMode::VirtualGroup) {
+ STLOG(PRI_CRIT, BLOB_DEPOT, BDT36, "incorrect operating mode of BlobDepot", (TabletId, TabletID()));
+ Y_VERIFY_DEBUG(false, "incorrect operating mode of BlobDepot");
+ return;
+ }
+
+ StartGroupAssimilator(groupId);
+ }
+ }
+
+ void TBlobDepot::StartGroupAssimilator(ui32 groupId) {
+ if (RunningGroupAssimilators.contains(groupId)) {
+ return;
+ }
+
+ RunningGroupAssimilators[groupId] = Register(new TGroupAssimilator(groupId, Config));
+ }
+
+ void TBlobDepot::HandleGone(TAutoPtr<IEventHandle> ev) {
+ if (const auto it = RunningGroupAssimilators.find(ev->Cookie); it != RunningGroupAssimilators.end()) {
+ Y_VERIFY(it->second == ev->Sender);
+ RunningGroupAssimilators.erase(it);
+ } else {
+ Y_FAIL("unexpected event");
+ }
+ }
+
+ void TBlobDepot::Handle(TEvAssimilatedData::TPtr /*ev*/) {
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/assimilator_fetch_machine.cpp b/ydb/core/blob_depot/assimilator_fetch_machine.cpp
new file mode 100644
index 0000000000..a9910d2584
--- /dev/null
+++ b/ydb/core/blob_depot/assimilator_fetch_machine.cpp
@@ -0,0 +1,286 @@
+#include "assimilator_fetch_machine.h"
+
+namespace NKikimr::NBlobDepot {
+
+ using TFetchMachine = TBlobDepot::TGroupAssimilatorFetchMachine;
+
+ TFetchMachine::TGroupAssimilatorFetchMachine(TActorIdentity self, TIntrusivePtr<TBlobStorageGroupInfo> info,
+ TActorId blobDepotId)
+ : Self(self)
+ , Info(std::move(info))
+ , BlobDepotId(blobDepotId)
+ {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT38, "TGroupAssimilatorFetchMachine start", (GroupId, Info->GroupID));
+ PerDiskState.resize(Info->GetTotalVDisksNum());
+ VDisksInHeap.emplace(&Info->GetTopology());
+ for (ui32 i = 0; i < PerDiskState.size(); ++i) {
+ const TActorId actorId = Info->GetActorId(i);
+ const ui32 nodeId = actorId.NodeId();
+ TNodeInfo& node = Nodes[nodeId];
+ node.OrderNumbers.push_back(i);
+ }
+ for (const auto& [nodeId, node] : Nodes) {
+ if (nodeId != Self.NodeId()) {
+ TActivationContext::Send(new IEventHandle(TEvInterconnect::EvConnectNode, 0,
+ TActivationContext::InterconnectProxy(nodeId), Self, nullptr, 0));
+ } else {
+ for (const ui32 orderNumber : node.OrderNumbers) {
+ IssueAssimilateCmdToVDisk(orderNumber);
+ }
+ }
+ }
+ }
+
+ void TFetchMachine::Handle(TAutoPtr<IEventHandle>& ev) {
+ switch (const ui32 type = ev->GetTypeRewrite()) {
+ hFunc(TEvBlobStorage::TEvVAssimilateResult, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ hFunc(TEvInterconnect::TEvNodeConnected, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+
+ default:
+ Y_VERIFY_DEBUG(false, "unexpected event Type# %08" PRIx32, type);
+ }
+ }
+
+ void TFetchMachine::OnPassAway() {
+ for (const auto& [nodeId, node] : Nodes) {
+ if (nodeId != Self.NodeId()) {
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0,
+ TActivationContext::InterconnectProxy(nodeId), Self, nullptr, 0));
+ }
+ }
+ }
+
+ void TFetchMachine::IssueAssimilateCmdToVDisk(ui32 orderNumber) {
+ const TActorId actorId = Info->GetActorId(orderNumber);
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT39, "IssueAssimilateCmdToVDisk", (GroupId, Info->GroupID),
+ (OrderNumber, orderNumber), (ActorId, actorId));
+
+ TPerDiskState& state = PerDiskState[orderNumber];
+ Y_VERIFY(!state.Finished);
+
+ auto ev = std::make_unique<TEvBlobStorage::TEvVAssimilate>(Info->GetVDiskId(orderNumber));
+ auto& record = ev->Record;
+ if (state.LastBlock) {
+ record.SetSkipBlocksUpTo(*state.LastBlock);
+ }
+ if (state.LastBarrier) {
+ auto *x = record.MutableSkipBarriersUpTo();
+ x->SetTabletId(std::get<0>(*state.LastBarrier));
+ x->SetChannel(std::get<1>(*state.LastBarrier));
+ }
+ if (state.LastBlob) {
+ LogoBlobIDFromLogoBlobID(*state.LastBlob, record.MutableSkipBlobsUpTo());
+ }
+
+ const ui64 id = ++LastRequestId;
+ Self.Send(actorId, ev.release(), IEventHandle::FlagTrackDelivery, id);
+
+ const auto [it, inserted] = RequestsInFlight.emplace(id, TRequestInFlight{orderNumber});
+ Y_VERIFY(inserted);
+
+ const ui32 nodeId = actorId.NodeId();
+ Nodes[nodeId].RequestsInFlight.insert(&*it);
+ }
+
+ void TFetchMachine::Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) {
+ const ui32 nodeId = ev->Get()->NodeId;
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT40, "NodeConnected", (GroupId, Info->GroupID), (NodeId, nodeId));
+ TNodeInfo& node = Nodes[nodeId];
+ for (const ui32 orderNumber : node.OrderNumbers) {
+ auto& state = PerDiskState[orderNumber];
+ if (!state.Finished) {
+ IssueAssimilateCmdToVDisk(orderNumber);
+ }
+ }
+ }
+
+ void TFetchMachine::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) {
+ const ui32 nodeId = ev->Get()->NodeId;
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT41, "NodeDisconnected", (GroupId, Info->GroupID), (NodeId, nodeId));
+ TNodeInfo& node = Nodes[nodeId];
+ for (const auto *kv : std::exchange(node.RequestsInFlight, {})) {
+ const size_t num = RequestsInFlight.erase(kv->first);
+ Y_VERIFY(num == 1);
+ }
+ }
+
+ void TFetchMachine::Handle(TEvents::TEvUndelivered::TPtr ev) {
+ if (ev->Get()->SourceType == TEvBlobStorage::EvVAssimilate) {
+ // TODO: undelivery may be caused by moving VDisk actor out, handle it
+ EndRequest(ev->Cookie);
+ }
+ }
+
+ ui32 TFetchMachine::EndRequest(ui64 id) {
+ const auto it = RequestsInFlight.find(id);
+ Y_VERIFY(it != RequestsInFlight.end());
+ const ui32 orderNumber = it->second.OrderNumber;
+ const TActorId actorId = Info->GetActorId(orderNumber);
+ const ui32 nodeId = actorId.NodeId();
+ TNodeInfo& node = Nodes[nodeId];
+ const size_t num = node.RequestsInFlight.erase(&*it);
+ Y_VERIFY(num == 1);
+ RequestsInFlight.erase(it);
+ return orderNumber;
+ }
+
+ void TFetchMachine::Handle(TEvBlobStorage::TEvVAssimilateResult::TPtr ev) {
+ const ui32 orderNumber = EndRequest(ev->Cookie);
+ const auto& record = ev->Get()->Record;
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT42, "EvVAssimilate", (GroupId, Info->GroupID), (Id, ev->Cookie),
+ (OrderNumber, orderNumber), (Status, record.GetStatus()), (Blocks.size, record.BlocksSize()),
+ (Barriers.size, record.BarriersSize()), (Blobs.size, record.BlobsSize()));
+
+ TPerDiskState& state = PerDiskState[orderNumber];
+
+ const bool wasExhausted = state.Exhausted();
+
+ for (const auto& item : record.GetBlocks()) {
+ const ui64 tabletId = item.GetTabletId();
+ state.LastBlock.emplace(tabletId);
+ if (CurrentBlock <= *state.LastBlock) {
+ state.Blocks.emplace_back(item);
+ }
+ }
+ for (const auto& item : record.GetBarriers()) {
+ const ui64 tabletId = item.GetTabletId();
+ const ui8 channel = item.GetChannel();
+ state.LastBarrier.emplace(tabletId, channel);
+ if (CurrentBarrier <= *state.LastBarrier) {
+ state.Barriers.emplace_back(item);
+ }
+ }
+ ui64 raw[3] = {0, 0, 0};
+ for (const auto& item : record.GetBlobs()) {
+ if (item.HasRawX1()) {
+ raw[0] = item.GetRawX1();
+ } else if (item.HasDiffX1()) {
+ raw[0] += item.GetDiffX1();
+ }
+ if (item.HasRawX2()) {
+ raw[1] = item.GetRawX2();
+ } else if (item.HasDiffX2()) {
+ raw[1] += item.GetDiffX2();
+ }
+ if (item.HasRawX3()) {
+ raw[2] = item.GetRawX3();
+ } else if (item.HasDiffX3()) {
+ raw[2] += item.GetDiffX3();
+ }
+ const TLogoBlobID id(raw);
+ state.LastBlob.emplace(id);
+ if (CurrentBlob <= *state.LastBlob) {
+ state.Blobs.emplace_back(item, id);
+ }
+ }
+
+ if (wasExhausted && !state.Exhausted()) {
+ Heap.push_back(&state);
+ std::push_heap(Heap.begin(), Heap.end(), TPerDiskState::THeapCompare());
+ *VDisksInHeap |= {&Info->GetTopology(), Info->GetVDiskId(orderNumber)};
+ }
+
+ if (record.BlocksSize() + record.BarriersSize() + record.BlobsSize() == 0 && record.GetStatus() == NKikimrProto::OK) {
+ state.Finished = true;
+ } else if (state.Exhausted()) { // still no records; for example, when all were skipped
+ return IssueAssimilateCmdToVDisk(orderNumber);
+ }
+
+ Merge();
+ }
+
+ void TFetchMachine::Merge() {
+ auto ev = std::make_unique<TEvAssimilatedData>(Info->GroupID);
+
+ bool quorumCorrect = Info->GetQuorumChecker().CheckQuorumForGroup(*VDisksInHeap);
+ while (quorumCorrect) {
+ if (Heap.empty()) {
+ CurrentBlob = Max<TLogoBlobID>();
+ break;
+ }
+
+ std::optional<TBlock> block;
+ std::optional<TBarrier> barrier;
+ std::optional<TBlob> blob;
+
+ auto callback = [&](auto&& value, ui32 orderNumber) {
+ using T = std::decay_t<decltype(value)>;
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "AssimilatedItem", (GroupId, Info->GroupID), (OrderNumber, orderNumber),
+ (Value, value));
+ if constexpr (std::is_same_v<T, TBlock>) {
+ Y_VERIFY(CurrentBlock <= value.TabletId);
+ CurrentBlock = value.TabletId;
+ if (block) {
+ block->Merge(value);
+ } else {
+ block.emplace(std::move(value));
+ }
+ } else if constexpr (std::is_same_v<T, TBarrier>) {
+ CurrentBlock = Max<ui64>();
+ Y_VERIFY(CurrentBarrier <= std::make_tuple(value.TabletId, value.Channel));
+ CurrentBarrier = {value.TabletId, value.Channel};
+ if (barrier) {
+ barrier->Merge(value);
+ } else {
+ barrier.emplace(std::move(value));
+ }
+ } else if constexpr (std::is_same_v<T, TBlob>) {
+ CurrentBarrier = {Max<ui64>(), Max<ui8>()};
+ Y_VERIFY(CurrentBlob <= value.Id);
+ CurrentBlob = value.Id;
+ if (blob) {
+ blob->Merge(value);
+ } else {
+ blob.emplace(std::move(value));
+ }
+ // special handling for part layout here -- we can't just merge the blobs
+ } else {
+ static_assert(TDependentFalse<T>, "incorrect case");
+ }
+ };
+
+ TPerDiskState& head = *Heap.front();
+ auto key = head.FirstKey();
+ while (!Heap.empty() && Heap.front()->FirstKey() == key) {
+ std::pop_heap(Heap.begin(), Heap.end(), TPerDiskState::THeapCompare());
+ const ui32 orderNumber = Heap.back() - PerDiskState.data();
+ TPerDiskState& item = PerDiskState[orderNumber];
+ item.PopFirstItem(std::bind(callback, std::placeholders::_1, orderNumber));
+ if (item.Exhausted()) {
+ if (!item.Finished) {
+ // data not yet received -- ask for it
+ IssueAssimilateCmdToVDisk(orderNumber);
+ // mark disk temporarily unavailable
+ *VDisksInHeap -= {&Info->GetTopology(), Info->GetVDiskId(orderNumber)};
+ quorumCorrect = Info->GetQuorumChecker().CheckQuorumForGroup(*VDisksInHeap);
+ }
+ // remove item from the heap -- it has no valid data to process
+ Heap.pop_back();
+ } else {
+ // more items to do
+ std::push_heap(Heap.begin(), Heap.end(), TPerDiskState::THeapCompare());
+ }
+ }
+
+ if (block) {
+ ev->Blocks.push_back(std::move(*block));
+ } else if (barrier) {
+ ev->Barriers.push_back(std::move(*barrier));
+ } else if (blob) {
+ ev->Blobs.push_back(std::move(*blob));
+ } else {
+ Y_FAIL();
+ }
+ }
+
+ ev->BlobsFinished = CurrentBlob == Max<TLogoBlobID>();
+ ev->BarriersFinished = ev->BlobsFinished || CurrentBarrier == std::make_tuple(Max<ui64>(), Max<ui8>());
+ ev->BlocksFinished = ev->BarriersFinished || CurrentBlock == Max<ui64>();
+ Self.Send(BlobDepotId, ev.release());
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/assimilator_fetch_machine.h b/ydb/core/blob_depot/assimilator_fetch_machine.h
new file mode 100644
index 0000000000..27469f027a
--- /dev/null
+++ b/ydb/core/blob_depot/assimilator_fetch_machine.h
@@ -0,0 +1,132 @@
+#pragma once
+
+#include "defs.h"
+#include "blob_depot_tablet.h"
+
+namespace NKikimr::NBlobDepot {
+
+ class TBlobDepot::TGroupAssimilatorFetchMachine {
+ TActorIdentity Self;
+ TIntrusivePtr<TBlobStorageGroupInfo> Info;
+ TActorId BlobDepotId;
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Data processing
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ struct TPerDiskState {
+ std::deque<TBlock> Blocks;
+ std::optional<ui64> LastBlock;
+ std::deque<TBarrier> Barriers;
+ std::optional<std::tuple<ui64, ui8>> LastBarrier;
+ std::deque<TBlob> Blobs;
+ std::optional<TLogoBlobID> LastBlob;
+ bool Finished = false;
+
+ ui64 FirstBlock() const {
+ Y_VERIFY_DEBUG(!Blocks.empty());
+ return Blocks.front().TabletId;
+ }
+
+ std::tuple<ui64, ui8> FirstBarrier() const {
+ Y_VERIFY_DEBUG(!Barriers.empty());
+ const auto& barrier = Barriers.front();
+ return {barrier.TabletId, barrier.Channel};
+ }
+
+ TLogoBlobID FirstBlob() const {
+ Y_VERIFY_DEBUG(!Blobs.empty());
+ return Blobs.front().Id;
+ }
+
+ std::variant<ui64, std::tuple<ui64, ui8>, TLogoBlobID> FirstKey() const {
+ if (!Blocks.empty()) {
+ return FirstBlock();
+ } else if (!Barriers.empty()) {
+ return FirstBarrier();
+ } else if (!Blobs.empty()) {
+ return FirstBlob();
+ } else {
+ Y_FAIL();
+ }
+ }
+
+ template<typename T>
+ void PopFirstItem(T&& callback) {
+ if (!Blocks.empty()) {
+ callback(Blocks.front());
+ Blocks.pop_front();
+ } else if (!Barriers.empty()) {
+ callback(Barriers.front());
+ Barriers.pop_front();
+ } else if (!Blobs.empty()) {
+ callback(Blobs.front());
+ Blobs.pop_front();
+ } else {
+ Y_FAIL();
+ }
+ }
+
+ struct THeapCompare {
+ bool operator ()(const TPerDiskState *x, const TPerDiskState *y) const {
+ if (!x->Blocks.empty() && !y->Blocks.empty()) {
+ return x->FirstBlock() > y->FirstBlock();
+ } else if (x->Blocks.empty() != y->Blocks.empty()) {
+ return x->Blocks.empty() > y->Blocks.empty();
+ } else if (!x->Barriers.empty() && !y->Barriers.empty()) {
+ return x->FirstBarrier() > y->FirstBarrier();
+ } else if (x->Barriers.empty() != y->Barriers.empty()) {
+ return x->Barriers.empty() > y->Barriers.empty();
+ } else if (!x->Blobs.empty() && !y->Blobs.empty()) {
+ return x->FirstBlob() > y->FirstBlob();
+ } else {
+ return x->Barriers.empty() > y->Barriers.empty();
+ }
+ }
+ };
+
+ bool Exhausted() const {
+ return Blocks.empty() && Barriers.empty() && Blobs.empty();
+ }
+ };
+
+ std::vector<TPerDiskState> PerDiskState;
+ std::vector<TPerDiskState*> Heap;
+ ui64 CurrentBlock = 0;
+ std::tuple<ui64, ui8> CurrentBarrier{0, 0};
+ TLogoBlobID CurrentBlob;
+ std::optional<TBlobStorageGroupInfo::TGroupVDisks> VDisksInHeap;
+
+ struct TRequestInFlight {
+ ui32 OrderNumber;
+ };
+
+ using TRequestsInFlight = THashMap<ui64, TRequestInFlight>;
+
+ struct TNodeInfo {
+ std::vector<ui32> OrderNumbers;
+ THashSet<TRequestsInFlight::value_type*> RequestsInFlight;
+ };
+
+ ui32 LastRequestId = 0;
+ TRequestsInFlight RequestsInFlight;
+ THashMap<ui32, TNodeInfo> Nodes;
+
+ public:
+ TGroupAssimilatorFetchMachine(TActorIdentity self, TIntrusivePtr<TBlobStorageGroupInfo> info,
+ TActorId blobDepotId);
+ void Handle(TAutoPtr<IEventHandle>& ev);
+ void OnPassAway();
+
+ private:
+ void IssueAssimilateCmdToVDisk(ui32 orderNumber);
+ void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev);
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev);
+ void Handle(TEvents::TEvUndelivered::TPtr ev);
+ ui32 EndRequest(ui64 id);
+ void Handle(TEvBlobStorage::TEvVAssimilateResult::TPtr ev);
+ void Merge();
+ void MergeDone();
+ };
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp
index bba9917e1a..a93015f652 100644
--- a/ydb/core/blob_depot/blob_depot.cpp
+++ b/ydb/core/blob_depot/blob_depot.cpp
@@ -40,6 +40,9 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvTabletPipe::TEvServerConnected, Handle);
hFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
+ fFunc(TEvents::TSystem::Gone, HandleGone);
+ hFunc(TEvAssimilatedData, Handle);
+
default:
if (!HandleDefaultEvents(ev, ctx)) {
Y_FAIL("unexpected event Type# 0x%08" PRIx32, type);
@@ -51,6 +54,14 @@ namespace NKikimr::NBlobDepot {
}
}
+ void TBlobDepot::PassAway() {
+ for (const auto& [_, actorId] : RunningGroupAssimilators) {
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, SelfId(), nullptr, 0));
+ }
+
+ TActor::PassAway();
+ }
+
IActor *CreateBlobDepot(const TActorId& tablet, TTabletStorageInfo *info) {
return new TBlobDepot(tablet, info);
}
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index e9e58aa960..55bd284f0c 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -15,9 +15,130 @@ namespace NKikimr::NBlobDepot {
struct TEvPrivate {
enum {
EvCheckExpiredAgents = EventSpaceBegin(TEvents::ES_PRIVATE),
+ EvAssimilatedData,
};
};
+ struct TBlock {
+ ui64 TabletId;
+ ui32 BlockedGeneration;
+
+ TBlock() = default;
+
+ TBlock(const NKikimrBlobStorage::TEvVAssimilateResult::TBlock& item)
+ : TabletId(item.GetTabletId())
+ , BlockedGeneration(item.GetBlockedGeneration())
+ {}
+
+ void Merge(TBlock& other) {
+ Y_VERIFY_DEBUG(other.TabletId == TabletId);
+ BlockedGeneration = Max(BlockedGeneration, other.BlockedGeneration);
+ }
+
+ TString ToString() const {
+ return TStringBuilder() << "{" << TabletId << ":" << BlockedGeneration << "}";
+ }
+ };
+
+ struct TBarrier {
+ struct TValue {
+ ui32 RecordGeneration;
+ ui32 PerGenerationCounter;
+ ui32 CollectGeneration;
+ ui32 CollectStep;
+
+ TValue(const NKikimrBlobStorage::TEvVAssimilateResult::TBarrier::TValue& value)
+ : RecordGeneration(value.GetRecordGeneration())
+ , PerGenerationCounter(value.GetPerGenerationCounter())
+ , CollectGeneration(value.GetCollectGeneration())
+ , CollectStep(value.GetCollectStep())
+ {}
+
+ void Merge(TValue& other) {
+ if (KeyAsTuple() < other.KeyAsTuple()) {
+ *this = other;
+ }
+ }
+
+ TString ToString() const {
+ return TStringBuilder() << "{" << RecordGeneration << ":" << PerGenerationCounter
+ << "=>" << CollectGeneration << ":" << CollectStep << "}";
+ }
+
+ std::tuple<ui32, ui32> KeyAsTuple() const {
+ return {RecordGeneration, PerGenerationCounter};
+ }
+ };
+
+ ui64 TabletId;
+ ui8 Channel;
+ std::optional<TValue> Hard;
+ std::optional<TValue> Soft;
+
+ TBarrier() = default;
+
+ TBarrier(const NKikimrBlobStorage::TEvVAssimilateResult::TBarrier& item)
+ : TabletId(item.GetTabletId())
+ , Channel(item.GetChannel())
+ , Hard(item.HasHard() ? std::make_optional(TValue(item.GetHard())) : std::nullopt)
+ , Soft(item.HasSoft() ? std::make_optional(TValue(item.GetSoft())) : std::nullopt)
+ {}
+
+ void Merge(TBarrier& other) {
+ Y_VERIFY_DEBUG(TabletId == other.TabletId && Channel == other.Channel);
+ if (Hard && other.Hard) {
+ Hard->Merge(*other.Hard);
+ } else if (other.Hard) {
+ Hard = std::move(other.Hard);
+ }
+ if (Soft && other.Soft) {
+ Soft->Merge(*other.Soft);
+ } else if (other.Soft) {
+ Soft = std::move(other.Soft);
+ }
+ }
+
+ TString ToString() const {
+ return TStringBuilder() << "{" << TabletId << ":" << int(Channel) << "@" << (Hard ? Hard->ToString() : "")
+ << "/" << (Soft ? Soft->ToString() : "") << "}";
+ }
+ };
+
+ struct TBlob {
+ TLogoBlobID Id;
+ ui64 Ingress;
+
+ TBlob() = default;
+
+ TBlob(const NKikimrBlobStorage::TEvVAssimilateResult::TBlob& item, const TLogoBlobID& id)
+ : Id(id)
+ , Ingress(item.GetIngress())
+ {}
+
+ void Merge(TBlob& other) {
+ Y_VERIFY_DEBUG(Id == other.Id);
+ Ingress |= other.Ingress;
+ }
+
+ TString ToString() const {
+ return TStringBuilder() << "{" << Id.ToString() << "/" << Ingress << "}";
+ }
+ };
+
+ struct TEvAssimilatedData : TEventLocal<TEvAssimilatedData, TEvPrivate::EvAssimilatedData> {
+ const ui32 GroupId;
+ std::deque<TBlock> Blocks;
+ bool BlocksFinished = false;
+ std::deque<TBarrier> Barriers;
+ bool BarriersFinished = false;
+ std::deque<TBlob> Blobs;
+ bool BlobsFinished = false;
+
+ TEvAssimilatedData(ui32 groupId)
+ : GroupId(groupId)
+ {}
+ };
+
public:
TBlobDepot(TActorId tablet, TTabletStorageInfo *info);
~TBlobDepot();
@@ -96,6 +217,11 @@ namespace NKikimr::NBlobDepot {
SignalTabletActive(TActivationContext::AsActorContext());
}
+ void StartOperation() {
+ InitChannelKinds();
+ StartGroupAssimilators();
+ }
+
void OnDetach(const TActorContext&) override {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT26, "OnDetach", (TabletId, TabletID()));
@@ -108,6 +234,8 @@ namespace NKikimr::NBlobDepot {
PassAway();
}
+ void PassAway() override;
+
void InitChannelKinds();
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -180,7 +308,15 @@ namespace NKikimr::NBlobDepot {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Group assimilation
+ THashMap<ui32, TActorId> RunningGroupAssimilators;
+
class TGroupAssimilator;
+ class TGroupAssimilatorFetchMachine;
+
+ void StartGroupAssimilators();
+ void StartGroupAssimilator(ui32 groupId);
+ void HandleGone(TAutoPtr<IEventHandle> ev);
+ void Handle(TEvAssimilatedData::TPtr ev);
};
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/defs.h b/ydb/core/blob_depot/defs.h
index 8fceb52d73..b0b79bd6da 100644
--- a/ydb/core/blob_depot/defs.h
+++ b/ydb/core/blob_depot/defs.h
@@ -1,7 +1,10 @@
#pragma once
#include <ydb/core/base/tablet_pipe.h>
+#include <ydb/core/blobstorage/base/blobstorage_events.h>
#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
+#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_sets.h>
+#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
diff --git a/ydb/core/blob_depot/op_apply_config.cpp b/ydb/core/blob_depot/op_apply_config.cpp
index aaed6ed727..7c6f1353ac 100644
--- a/ydb/core/blob_depot/op_apply_config.cpp
+++ b/ydb/core/blob_depot/op_apply_config.cpp
@@ -50,7 +50,7 @@ namespace NKikimr::NBlobDepot {
(WasConfigured, WasConfigured));
if (!WasConfigured) {
- Self->InitChannelKinds();
+ Self->StartOperation();
}
TActivationContext::Send(Response.release());
}
diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp
index e166186398..d3947b5ab9 100644
--- a/ydb/core/blob_depot/op_load.cpp
+++ b/ydb/core/blob_depot/op_load.cpp
@@ -90,7 +90,7 @@ namespace NKikimr::NBlobDepot {
(Configured, Configured));
if (Configured) {
- Self->InitChannelKinds();
+ Self->StartOperation();
}
Self->OnLoadFinished();
diff --git a/ydb/core/mind/bscontroller/config.cpp b/ydb/core/mind/bscontroller/config.cpp
index 5a7d62ee8a..0b2f820ac8 100644
--- a/ydb/core/mind/bscontroller/config.cpp
+++ b/ydb/core/mind/bscontroller/config.cpp
@@ -636,6 +636,8 @@ namespace NKikimr::NBsController {
Y_VERIFY(vslot->GroupId == groupId);
Y_VERIFY(vslot->GroupGeneration == group.Generation);
}
+ Y_VERIFY_DEBUG((group.DecommitStatus == NKikimrBlobStorage::EGroupDecommitStatus::NONE) ==
+ group.AssimilatorGroupId.Empty());
});
#endif
}
diff --git a/ydb/core/mind/bscontroller/get_group.cpp b/ydb/core/mind/bscontroller/get_group.cpp
index 25c41f3b58..78b5d28bca 100644
--- a/ydb/core/mind/bscontroller/get_group.cpp
+++ b/ydb/core/mind/bscontroller/get_group.cpp
@@ -18,17 +18,18 @@ public:
Self->TabletCounters->Cumulative()[NBlobStorageController::COUNTER_GET_GROUP_COUNT].Increment(1);
TRequestCounter counter(Self->TabletCounters, NBlobStorageController::COUNTER_GET_GROUP_USEC);
- auto request = std::move(Request);
- STLOG(PRI_DEBUG, BS_CONTROLLER, BSCTXGG01, "Handle TEvControllerGetGroup", (Request, request->Get()->Record));
+ STLOG(PRI_DEBUG, BS_CONTROLLER, BSCTXGG01, "Handle TEvControllerGetGroup", (Request, Request->Get()->Record));
- const auto& v = request->Get()->Record.GetGroupIDs();
+ const auto& v = Request->Get()->Record.GetGroupIDs();
TSet<ui32> groupIDsToRead(v.begin(), v.end());
- const TNodeId nodeId = request->Get()->Record.GetNodeID();
+ const TNodeId nodeId = Request->Get()->Record.GetNodeID();
auto res = std::make_unique<TEvBlobStorage::TEvControllerNodeServiceSetUpdate>(NKikimrProto::OK, nodeId);
Self->ReadGroups(groupIDsToRead, true, res.get(), nodeId);
- Response = std::make_unique<IEventHandle>(MakeBlobStorageNodeWardenID(nodeId), Self->SelfId(), res.release());
+ Response = std::make_unique<IEventHandle>(nodeId ? MakeBlobStorageNodeWardenID(nodeId) : Request->Sender,
+ Self->SelfId(), res.release());
+
return true;
}
@@ -38,6 +39,10 @@ public:
};
void TBlobStorageController::Handle(TEvBlobStorage::TEvControllerGetGroup::TPtr& ev) {
+ STLOG(PRI_DEBUG, BS_CONTROLLER, BSCTXGG02, "TEvControllerGetGroup", (Sender, ev->Sender), (Cookie, ev->Cookie),
+ (Recipient, ev->Recipient), (RecipientRewrite, ev->GetRecipientRewrite()), (Request, ev->Get()->Record),
+ (StopGivingGroups, StopGivingGroups));
+
if (!StopGivingGroups) {
Execute(new TTxGetGroup(ev, this));
}
diff --git a/ydb/core/mind/bscontroller/load_everything.cpp b/ydb/core/mind/bscontroller/load_everything.cpp
index e01099456a..3b7a9a2f95 100644
--- a/ydb/core/mind/bscontroller/load_everything.cpp
+++ b/ydb/core/mind/bscontroller/load_everything.cpp
@@ -200,6 +200,10 @@ public:
isVirtualGroup ? 0 : std::get<2>(geomIt->second));
group.DecommitStatus = groups.GetValueOrDefault<T::DecommitStatus>();
+ if (groups.HaveValue<T::AssimilatorGroupId>()) {
+ group.AssimilatorGroupId = groups.GetValue<T::AssimilatorGroupId>();
+ }
+
#define OPTIONAL(NAME) \
if (groups.HaveValue<T::NAME>()) { \
group.NAME = groups.GetValue<T::NAME>(); \
diff --git a/ydb/core/mind/bscontroller/register_node.cpp b/ydb/core/mind/bscontroller/register_node.cpp
index f1a88a8546..d4dac9cf48 100644
--- a/ydb/core/mind/bscontroller/register_node.cpp
+++ b/ydb/core/mind/bscontroller/register_node.cpp
@@ -352,7 +352,7 @@ void TBlobStorageController::ReadGroups(TSet<ui32>& groupIDsToRead, bool discard
}
SerializeGroupInfo(groupProto, *group, info.Name, scopeId);
- } else {
+ } else if (nodeId) {
// group is not listable, so we have to postpone the request from NW
group->WaitingNodes.insert(nodeId);
GetNode(nodeId).WaitingForGroups.insert(group->ID);
diff --git a/ydb/core/mind/bscontroller/virtual_group.cpp b/ydb/core/mind/bscontroller/virtual_group.cpp
index 1a21ea6a11..768bc8f68c 100644
--- a/ydb/core/mind/bscontroller/virtual_group.cpp
+++ b/ydb/core/mind/bscontroller/virtual_group.cpp
@@ -117,7 +117,7 @@ namespace NKikimr::NBsController {
}
group->DecommitStatus = NKikimrBlobStorage::EGroupDecommitStatus::PENDING;
group->AssimilatorGroupId = virtualGroup->ID;
- ++group->Generation; // advance group generation to push configs forcibly to all concerned nodes
+ group->ContentChanged = true; // advance group generation to push configs forcibly to all concerned nodes
NKikimrBlobDepot::TBlobDepotConfig blobDepotConfig;
if (!blobDepotConfig.ParseFromString(*virtualGroup->BlobDepotConfig)) {