diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-06-01 19:47:36 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-06-01 19:47:36 +0300 |
commit | 1f257fb40788f27cb1952afec8343924db6a3d53 (patch) | |
tree | fcc0f149acc2e3042def70c0f5c7492e2fd56880 | |
parent | 85915cf42494dc5b4fecde3c4dbde42d8811c642 (diff) | |
download | ydb-1f257fb40788f27cb1952afec8343924db6a3d53.tar.gz |
Add subscribers to discovery cache
-rw-r--r-- | ydb/core/base/board_lookup.cpp | 21 | ||||
-rw-r--r-- | ydb/core/base/statestorage.h | 27 | ||||
-rw-r--r-- | ydb/core/discovery/discovery.cpp | 154 | ||||
-rw-r--r-- | ydb/core/discovery/discovery.h | 12 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_discovery.cpp | 11 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/controller.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/nodes_manager.cpp | 7 |
8 files changed, 159 insertions, 78 deletions
diff --git a/ydb/core/base/board_lookup.cpp b/ydb/core/base/board_lookup.cpp index d991cbc8101..94c1a59faef 100644 --- a/ydb/core/base/board_lookup.cpp +++ b/ydb/core/base/board_lookup.cpp @@ -64,7 +64,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { TVector<TReplica> Replicas; - TMap<TActorId, TEvStateStorage::TEvBoardInfo::TInfoEntry> Info; + TMap<TActorId, TEvStateStorage::TBoardInfoEntry> Info; THashMap<TActorId, THashSet<ui32>> InfoReplicas; ui32 WaitForReplicasToSuccess; @@ -103,11 +103,12 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { void NotAvailable() { if (CurrentStateFunc() != &TThis::StateSubscribe) { - Send(Owner, new TEvStateStorage::TEvBoardInfo(TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path)); + Send(Owner, new TEvStateStorage::TEvBoardInfo( + TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path, StateStorageGroupId)); } else { Send(Owner, new TEvStateStorage::TEvBoardInfoUpdate( - TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path + TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path, StateStorageGroupId ) ); } @@ -119,7 +120,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { if ((!Subscriber && Stats.HasInfo == WaitForReplicasToSuccess) || (Subscriber && Stats.HasInfo + Stats.NoInfo == WaitForReplicasToSuccess)) { auto reply = MakeHolder<TEvStateStorage::TEvBoardInfo>( - TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path); + TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path, StateStorageGroupId); reply->InfoEntries = std::move(Info); Send(Owner, std::move(reply)); if (Subscriber) { @@ -213,7 +214,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { } if (CurrentStateFunc() == &TThis::StateSubscribe) { - std::optional<TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate> update; + std::optional<TEvStateStorage::TBoardInfoEntry> update; if (info.GetDropped()) { if (!replicas.empty()) { return; @@ -230,7 +231,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { } if (update.has_value()) { auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>( - TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path); + TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path, StateStorageGroupId); reply->Updates = { { oid, std::move(update.value()) } }; Send(Owner, std::move(reply)); } @@ -279,7 +280,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { ++Stats.HasInfo; bool isStateSubscribe = (CurrentStateFunc() == &TThis::StateSubscribe); - TMap<TActorId, TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate> updates; + TMap<TActorId, TEvStateStorage::TBoardInfoEntry> updates; for (const auto &x : record.GetInfo()) { const TActorId oid = ActorIdFromProto(x.GetOwner()); @@ -298,7 +299,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { if (isStateSubscribe && !updates.empty()) { auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>( - TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path); + TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path, StateStorageGroupId); reply->Updates = std::move(updates); Send(Owner, std::move(reply)); } @@ -446,7 +447,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { void ClearInfosByReplica(ui32 replicaIdx) { bool isStateSubscribe = (CurrentStateFunc() == &TThis::StateSubscribe); - TMap<TActorId, TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate> updates; + TMap<TActorId, TEvStateStorage::TBoardInfoEntry> updates; const auto& replica = Replicas[replicaIdx]; for (auto infoId : replica.Infos) { @@ -466,7 +467,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { } if (isStateSubscribe && !updates.empty()) { auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>( - TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path); + TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path, StateStorageGroupId); reply->Updates = std::move(updates); Send(Owner, std::move(reply)); } diff --git a/ydb/core/base/statestorage.h b/ydb/core/base/statestorage.h index b7ac8c124a4..5375da33e5a 100644 --- a/ydb/core/base/statestorage.h +++ b/ydb/core/base/statestorage.h @@ -429,6 +429,11 @@ struct TEvStateStorage { } }; + struct TBoardInfoEntry { + TString Payload; + bool Dropped = false; + }; + struct TEvBoardInfo : public TEventLocal<TEvBoardInfo, EvBoardInfo> { enum class EStatus { Unknown, @@ -436,40 +441,36 @@ struct TEvStateStorage { NotAvailable, }; - struct TInfoEntry { - TString Payload; - }; - const EStatus Status; const TString Path; - TMap<TActorId, TInfoEntry> InfoEntries; + const ui32 StateStorageGroupId; + TMap<TActorId, TBoardInfoEntry> InfoEntries; - TEvBoardInfo(EStatus status, const TString &path) + TEvBoardInfo(EStatus status, const TString &path, ui32 stateStorageGroupId) : Status(status) , Path(path) + , StateStorageGroupId(stateStorageGroupId) {} TEvBoardInfo(const TEvBoardInfo &x) : Status(x.Status) , Path(x.Path) + , StateStorageGroupId(x.StateStorageGroupId) , InfoEntries(x.InfoEntries) {} }; struct TEvBoardInfoUpdate : public TEventLocal<TEvBoardInfoUpdate, EvBoardInfoUpdate> { - struct TInfoEntryUpdate { - TString Payload; - bool Dropped = false; - }; - const TEvBoardInfo::EStatus Status; const TString Path; - TMap<TActorId,TInfoEntryUpdate> Updates; + const ui32 StateStorageGroupId; + TMap<TActorId, TBoardInfoEntry> Updates; - TEvBoardInfoUpdate(TEvBoardInfo::EStatus status, const TString &path) + TEvBoardInfoUpdate(TEvBoardInfo::EStatus status, const TString &path, ui32 stateStorageGroupId) : Status(status) , Path(path) + , StateStorageGroupId(stateStorageGroupId) {} }; }; diff --git a/ydb/core/discovery/discovery.cpp b/ydb/core/discovery/discovery.cpp index a124bcecda7..93dc41a003a 100644 --- a/ydb/core/discovery/discovery.cpp +++ b/ydb/core/discovery/discovery.cpp @@ -123,21 +123,41 @@ namespace NDiscovery { return out; } - std::pair<TString, TString> CreateSerializedMessage( - const THolder<TEvStateStorage::TEvBoardInfo>& info, + NDiscovery::TCachedMessageData CreateCachedMessage( + const TMap<TActorId, TEvStateStorage::TBoardInfoEntry>& prevInfoEntries, + TMap<TActorId, TEvStateStorage::TBoardInfoEntry> newInfoEntries, TSet<TString> services, const THolder<TEvInterconnect::TEvNodeInfo>& nameserviceResponse) { + TMap<TActorId, TEvStateStorage::TBoardInfoEntry> infoEntries; + if (prevInfoEntries.empty()) { + infoEntries = std::move(newInfoEntries); + } else { + infoEntries = prevInfoEntries; + for (auto& [actorId, info] : newInfoEntries) { + if (info.Dropped) { + infoEntries.erase(actorId); + } else { + infoEntries[actorId].Payload = std::move(info.Payload); + } + } + } + + if (!nameserviceResponse) { + return {"", "", std::move(infoEntries)}; + } + TStackVec<const TString*> entries; - entries.reserve(info->InfoEntries.size()); - for (auto &xpair : info->InfoEntries) + entries.reserve(infoEntries.size()); + for (auto& xpair : infoEntries) { entries.emplace_back(&xpair.second.Payload); + } Shuffle(entries.begin(), entries.end()); - Ydb::Discovery::ListEndpointsResult result; - result.mutable_endpoints()->Reserve(info->InfoEntries.size()); + Ydb::Discovery::ListEndpointsResult cachedMessage; + cachedMessage.mutable_endpoints()->Reserve(infoEntries.size()); - Ydb::Discovery::ListEndpointsResult resultSsl; - resultSsl.mutable_endpoints()->Reserve(info->InfoEntries.size()); + Ydb::Discovery::ListEndpointsResult cachedMessageSsl; + cachedMessageSsl.mutable_endpoints()->Reserve(infoEntries.size()); THashMap<TEndpointKey, TEndpointState> states; THashMap<TEndpointKey, TEndpointState> statesSsl; @@ -150,9 +170,9 @@ namespace NDiscovery { } if (entry.GetSsl()) { - AddEndpoint(resultSsl, statesSsl, entry); + AddEndpoint(cachedMessageSsl, statesSsl, entry); } else { - AddEndpoint(result, states, entry); + AddEndpoint(cachedMessage, states, entry); } } @@ -160,12 +180,12 @@ namespace NDiscovery { if (nodeInfo && nodeInfo->Location.GetDataCenterId()) { const auto &location = nodeInfo->Location.GetDataCenterId(); if (IsSafeLocationMarker(location)) { - result.set_self_location(location); - resultSsl.set_self_location(location); + cachedMessage.set_self_location(location); + cachedMessageSsl.set_self_location(location); } } - return {SerializeResult(result), SerializeResult(resultSsl)}; + return {SerializeResult(cachedMessage), SerializeResult(cachedMessageSsl), std::move(infoEntries)}; } } @@ -189,8 +209,9 @@ namespace NDiscoveryPrivate { }; class TDiscoveryCache: public TActorBootstrapped<TDiscoveryCache> { - THashMap<TString, std::shared_ptr<NDiscovery::TCachedMessageData>> NewCachedMessages; - THashMap<TString, std::shared_ptr<NDiscovery::TCachedMessageData>> OldCachedMessages; + THashMap<TString, std::shared_ptr<NDiscovery::TCachedMessageData>> CurrentCachedMessages; + THashMap<TString, std::shared_ptr<NDiscovery::TCachedMessageData>> OldCachedMessages; // when subscriptions are enabled + THashMap<TString, std::shared_ptr<NDiscovery::TCachedMessageData>> CachedNotAvailable; // for subscriptions THolder<TEvInterconnect::TEvNodeInfo> NameserviceResponse; struct TWaiter { @@ -204,9 +225,13 @@ namespace NDiscoveryPrivate { auto Request(const TString& database, ui32 groupId) { auto result = Requested.emplace(database, TVector<TWaiter>()); if (result.second) { + auto mode = EBoardLookupMode::Second; + if (AppData()->FeatureFlags.GetEnableSubscriptionsInDiscovery()) { + mode = EBoardLookupMode::Subscription; + } CLOG_D("Lookup" << ": path# " << database); - Register(CreateBoardLookupActor(database, SelfId(), groupId, EBoardLookupMode::Second)); + Register(CreateBoardLookupActor(database, SelfId(), groupId, mode)); } return result.first; @@ -221,26 +246,56 @@ namespace NDiscoveryPrivate { NameserviceResponse.Reset(ev->Release().Release()); } + void Handle(TEvStateStorage::TEvBoardInfoUpdate::TPtr& ev) { + CLOG_T("Handle " << ev->Get()->ToString()); + if (!AppData()->FeatureFlags.GetEnableSubscriptionsInDiscovery()) { + return; + } + THolder<TEvStateStorage::TEvBoardInfoUpdate> msg = ev->Release(); + const auto& path = msg->Path; + + if (msg->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok) { + CurrentCachedMessages.erase(path); + return; + } + + auto& currentCachedMessage = CurrentCachedMessages[path]; + + Y_VERIFY(currentCachedMessage); + + currentCachedMessage = std::make_shared<NDiscovery::TCachedMessageData>( + NDiscovery::CreateCachedMessage( + currentCachedMessage->InfoEntries, std::move(msg->Updates), {}, NameserviceResponse) + ); + + auto it = Requested.find(path); + Y_VERIFY(it == Requested.end()); + } + void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) { CLOG_T("Handle " << ev->Get()->ToString()); THolder<TEvStateStorage::TEvBoardInfo> msg = ev->Release(); const auto& path = msg->Path; - auto newCachedData = std::make_shared<NDiscovery::TCachedMessageData>(); + auto newCachedData = std::make_shared<NDiscovery::TCachedMessageData>( + NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries), {}, NameserviceResponse) + ); + newCachedData->Status = msg->Status; - if (NameserviceResponse) { - auto result = NDiscovery::CreateSerializedMessage(msg, {}, NameserviceResponse); - newCachedData->CachedMessage = result.first; - newCachedData->CachedMessageSsl = result.second; + if (AppData()->FeatureFlags.GetEnableSubscriptionsInDiscovery()) { + if (msg->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok) { + CurrentCachedMessages.erase(path); + CachedNotAvailable[path] = newCachedData; + } else { + CurrentCachedMessages[path] = newCachedData; + } + } else { + CurrentCachedMessages[path] = newCachedData; + OldCachedMessages.erase(path); } - newCachedData->Info = std::move(msg); - - OldCachedMessages.erase(path); - NewCachedMessages.emplace(path, newCachedData); - if (auto it = Requested.find(path); it != Requested.end()) { for (const auto& waiter : it->second) { Send(waiter.ActorId, @@ -249,6 +304,7 @@ namespace NDiscoveryPrivate { Requested.erase(it); } + if (!Scheduled) { Scheduled = true; Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup()); @@ -256,8 +312,14 @@ namespace NDiscoveryPrivate { } void Wakeup() { - OldCachedMessages.swap(NewCachedMessages); - NewCachedMessages.clear(); + auto enableSubscriptions = AppData()->FeatureFlags.GetEnableSubscriptionsInDiscovery(); + + if (enableSubscriptions) { + CachedNotAvailable.clear(); + } else { + OldCachedMessages.swap(CurrentCachedMessages); + CurrentCachedMessages.clear(); + } if (!OldCachedMessages.empty()) { Scheduled = true; @@ -272,14 +334,24 @@ namespace NDiscoveryPrivate { const auto* msg = ev->Get(); - const auto* cachedData = NewCachedMessages.FindPtr(msg->Database); + auto enableSubscriptions = AppData()->FeatureFlags.GetEnableSubscriptionsInDiscovery(); + + const auto* cachedData = CurrentCachedMessages.FindPtr(msg->Database); if (cachedData == nullptr) { - cachedData = OldCachedMessages.FindPtr(msg->Database); - if (cachedData == nullptr) { - Request(msg->Database, msg->StateStorageId, {ev->Sender, ev->Cookie}); - return; + if (enableSubscriptions) { + cachedData = CachedNotAvailable.FindPtr(msg->Database); + if (cachedData == nullptr) { + Request(msg->Database, msg->StateStorageId, {ev->Sender, ev->Cookie}); + return; + } + } else { + cachedData = OldCachedMessages.FindPtr(msg->Database); + if (cachedData == nullptr) { + Request(msg->Database, msg->StateStorageId, {ev->Sender, ev->Cookie}); + return; + } + Request(msg->Database, msg->StateStorageId); } - Request(msg->Database, msg->StateStorageId); } Send(ev->Sender, new TEvDiscovery::TEvDiscoveryData(*cachedData), 0, ev->Cookie); @@ -300,6 +372,7 @@ namespace NDiscoveryPrivate { switch (ev->GetTypeRewrite()) { hFunc(TEvPrivate::TEvRequest, Handle); hFunc(TEvStateStorage::TEvBoardInfo, Handle); + hFunc(TEvStateStorage::TEvBoardInfoUpdate, Handle); hFunc(TEvInterconnect::TEvNodeInfo, Handle); sFunc(TEvents::TEvWakeup, Wakeup); sFunc(TEvents::TEvPoison, PassAway); @@ -358,10 +431,8 @@ public: void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev) { Y_VERIFY(ev->Get()->CachedMessageData); - if (ev->Get()->CachedMessageData->Info) { - DLOG_T("Handle " << ev->Get()->CachedMessageData->Info->ToString() - << ": cookie# " << ev->Cookie); - } + DLOG_T("Handle " << ev->ToString() + << ": cookie# " << ev->Cookie); if (ev->Cookie != LookupCookie) { DLOG_D("Stale lookup response" @@ -442,10 +513,11 @@ public: } } - if (LookupResponse->CachedMessageData->Info && - LookupResponse->CachedMessageData->Info->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok) { + if (LookupResponse->CachedMessageData && + (LookupResponse->CachedMessageData->InfoEntries.empty() || + LookupResponse->CachedMessageData->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok)) { DLOG_D("Lookup error" - << ": status# " << ui64(LookupResponse->CachedMessageData->Info->Status)); + << ": status# " << ui64(LookupResponse->CachedMessageData->Status)); return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::RESOLVE_ERROR, "Database nodes resolve failed with no certain result")); } diff --git a/ydb/core/discovery/discovery.h b/ydb/core/discovery/discovery.h index 01e5d5d0926..da4d046dfbc 100644 --- a/ydb/core/discovery/discovery.h +++ b/ydb/core/discovery/discovery.h @@ -13,7 +13,8 @@ namespace NDiscovery { struct TCachedMessageData { TString CachedMessage; TString CachedMessageSsl; - THolder<TEvStateStorage::TEvBoardInfo> Info; + TMap<TActorId, TEvStateStorage::TBoardInfoEntry> InfoEntries; // OwnerId -> Payload + TEvStateStorage::TEvBoardInfo::EStatus Status = TEvStateStorage::TEvBoardInfo::EStatus::Ok; }; } @@ -54,10 +55,11 @@ struct TEvDiscovery { }; namespace NDiscovery { - std::pair<TString, TString> CreateSerializedMessage( - const THolder<TEvStateStorage::TEvBoardInfo>&, - TSet<TString>, - const THolder<TEvInterconnect::TEvNodeInfo>&); + NDiscovery::TCachedMessageData CreateCachedMessage( + const TMap<TActorId, TEvStateStorage::TBoardInfoEntry>&, + TMap<TActorId, TEvStateStorage::TBoardInfoEntry>, + TSet<TString>, + const THolder<TEvInterconnect::TEvNodeInfo>&); } using TLookupPathFunc = std::function<TString(const TString&)>; diff --git a/ydb/core/grpc_services/rpc_discovery.cpp b/ydb/core/grpc_services/rpc_discovery.cpp index 820866e5c03..bec37d1ccfa 100644 --- a/ydb/core/grpc_services/rpc_discovery.cpp +++ b/ydb/core/grpc_services/rpc_discovery.cpp @@ -123,8 +123,8 @@ public: if (!NameserviceResponse || !LookupResponse) return; - Y_VERIFY(LookupResponse->CachedMessageData->Info && - LookupResponse->CachedMessageData->Info->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok); + Y_VERIFY(LookupResponse->CachedMessageData && !LookupResponse->CachedMessageData->InfoEntries.empty() && + LookupResponse->CachedMessageData->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok); const TSet<TString> services( Request->GetProtoRequest()->Getservice().begin(), Request->GetProtoRequest()->Getservice().end()); @@ -136,8 +136,11 @@ public: cachedMessage = LookupResponse->CachedMessageData->CachedMessage; cachedMessageSsl = LookupResponse->CachedMessageData->CachedMessageSsl; } else { - std::tie(cachedMessage, cachedMessageSsl) = NDiscovery::CreateSerializedMessage( - LookupResponse->CachedMessageData->Info, std::move(services), NameserviceResponse); + auto cachedMessageData = NDiscovery::CreateCachedMessage( + {}, std::move(LookupResponse->CachedMessageData->InfoEntries), + std::move(services), NameserviceResponse); + cachedMessage = std::move(cachedMessageData.CachedMessage); + cachedMessageSsl = std::move(cachedMessageData.CachedMessageSsl); } if (Request->SslServer()) { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index d37ae97b329..f23b4750c80 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -809,6 +809,7 @@ message TFeatureFlags { optional bool EnableTopicSplitMerge = 95 [default = false]; optional bool EnableChangefeedDynamoDBStreamsFormat = 96 [default = false]; optional bool ForceColumnTablesCompositeMarks = 97 [default = false]; + optional bool EnableSubscriptionsInDiscovery = 98 [default = false]; } message THttpProxyConfig { diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index dcaf754e8b7..8d600042ab9 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -176,9 +176,9 @@ void TController::Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActo } void TController::Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) { - Y_VERIFY(ev->Get()->CachedMessageData && ev->Get()->CachedMessageData->Info); + Y_VERIFY(ev->Get()->CachedMessageData); - CLOG_T(ctx, "Handle " << ev->Get()->CachedMessageData->Info->ToString()); + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); NodesManager.ProcessResponse(ev, ctx); } diff --git a/ydb/core/tx/replication/controller/nodes_manager.cpp b/ydb/core/tx/replication/controller/nodes_manager.cpp index 0a7863dfa3f..e04cbccf7a6 100644 --- a/ydb/core/tx/replication/controller/nodes_manager.cpp +++ b/ydb/core/tx/replication/controller/nodes_manager.cpp @@ -22,8 +22,9 @@ void TNodesManager::DiscoverNodes(const TString& tenant, const TActorId& cache, } void TNodesManager::ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) { - Y_VERIFY(ev->Get()->CachedMessageData && ev->Get()->CachedMessageData->Info); - Y_VERIFY(ev->Get()->CachedMessageData->Info->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok); + Y_VERIFY(ev->Get()->CachedMessageData); + Y_VERIFY(!ev->Get()->CachedMessageData->InfoEntries.empty()); + Y_VERIFY(ev->Get()->CachedMessageData->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok); auto it = NodeDiscoverers.find(ev->Sender); if (it == NodeDiscoverers.end()) { @@ -33,7 +34,7 @@ void TNodesManager::ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, co auto& nodes = TenantNodes[it->second]; nodes.clear(); - for (const auto& [actorId, _] : ev->Get()->CachedMessageData->Info->InfoEntries) { + for (const auto& [actorId, _] : ev->Get()->CachedMessageData->InfoEntries) { nodes.insert(actorId.NodeId()); } |