aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshumkovnd <shumkovnd@yandex-team.com>2023-06-01 19:47:36 +0300
committershumkovnd <shumkovnd@yandex-team.com>2023-06-01 19:47:36 +0300
commit1f257fb40788f27cb1952afec8343924db6a3d53 (patch)
treefcc0f149acc2e3042def70c0f5c7492e2fd56880
parent85915cf42494dc5b4fecde3c4dbde42d8811c642 (diff)
downloadydb-1f257fb40788f27cb1952afec8343924db6a3d53.tar.gz
Add subscribers to discovery cache
-rw-r--r--ydb/core/base/board_lookup.cpp21
-rw-r--r--ydb/core/base/statestorage.h27
-rw-r--r--ydb/core/discovery/discovery.cpp154
-rw-r--r--ydb/core/discovery/discovery.h12
-rw-r--r--ydb/core/grpc_services/rpc_discovery.cpp11
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/tx/replication/controller/controller.cpp4
-rw-r--r--ydb/core/tx/replication/controller/nodes_manager.cpp7
8 files changed, 159 insertions, 78 deletions
diff --git a/ydb/core/base/board_lookup.cpp b/ydb/core/base/board_lookup.cpp
index d991cbc8101..94c1a59faef 100644
--- a/ydb/core/base/board_lookup.cpp
+++ b/ydb/core/base/board_lookup.cpp
@@ -64,7 +64,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
TVector<TReplica> Replicas;
- TMap<TActorId, TEvStateStorage::TEvBoardInfo::TInfoEntry> Info;
+ TMap<TActorId, TEvStateStorage::TBoardInfoEntry> Info;
THashMap<TActorId, THashSet<ui32>> InfoReplicas;
ui32 WaitForReplicasToSuccess;
@@ -103,11 +103,12 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
void NotAvailable() {
if (CurrentStateFunc() != &TThis::StateSubscribe) {
- Send(Owner, new TEvStateStorage::TEvBoardInfo(TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path));
+ Send(Owner, new TEvStateStorage::TEvBoardInfo(
+ TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path, StateStorageGroupId));
} else {
Send(Owner,
new TEvStateStorage::TEvBoardInfoUpdate(
- TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path
+ TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path, StateStorageGroupId
)
);
}
@@ -119,7 +120,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
if ((!Subscriber && Stats.HasInfo == WaitForReplicasToSuccess) ||
(Subscriber && Stats.HasInfo + Stats.NoInfo == WaitForReplicasToSuccess)) {
auto reply = MakeHolder<TEvStateStorage::TEvBoardInfo>(
- TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
+ TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path, StateStorageGroupId);
reply->InfoEntries = std::move(Info);
Send(Owner, std::move(reply));
if (Subscriber) {
@@ -213,7 +214,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
}
if (CurrentStateFunc() == &TThis::StateSubscribe) {
- std::optional<TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate> update;
+ std::optional<TEvStateStorage::TBoardInfoEntry> update;
if (info.GetDropped()) {
if (!replicas.empty()) {
return;
@@ -230,7 +231,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
}
if (update.has_value()) {
auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>(
- TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
+ TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path, StateStorageGroupId);
reply->Updates = { { oid, std::move(update.value()) } };
Send(Owner, std::move(reply));
}
@@ -279,7 +280,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
++Stats.HasInfo;
bool isStateSubscribe = (CurrentStateFunc() == &TThis::StateSubscribe);
- TMap<TActorId, TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate> updates;
+ TMap<TActorId, TEvStateStorage::TBoardInfoEntry> updates;
for (const auto &x : record.GetInfo()) {
const TActorId oid = ActorIdFromProto(x.GetOwner());
@@ -298,7 +299,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
if (isStateSubscribe && !updates.empty()) {
auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>(
- TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
+ TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path, StateStorageGroupId);
reply->Updates = std::move(updates);
Send(Owner, std::move(reply));
}
@@ -446,7 +447,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
void ClearInfosByReplica(ui32 replicaIdx) {
bool isStateSubscribe = (CurrentStateFunc() == &TThis::StateSubscribe);
- TMap<TActorId, TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate> updates;
+ TMap<TActorId, TEvStateStorage::TBoardInfoEntry> updates;
const auto& replica = Replicas[replicaIdx];
for (auto infoId : replica.Infos) {
@@ -466,7 +467,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
}
if (isStateSubscribe && !updates.empty()) {
auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>(
- TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
+ TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path, StateStorageGroupId);
reply->Updates = std::move(updates);
Send(Owner, std::move(reply));
}
diff --git a/ydb/core/base/statestorage.h b/ydb/core/base/statestorage.h
index b7ac8c124a4..5375da33e5a 100644
--- a/ydb/core/base/statestorage.h
+++ b/ydb/core/base/statestorage.h
@@ -429,6 +429,11 @@ struct TEvStateStorage {
}
};
+ struct TBoardInfoEntry {
+ TString Payload;
+ bool Dropped = false;
+ };
+
struct TEvBoardInfo : public TEventLocal<TEvBoardInfo, EvBoardInfo> {
enum class EStatus {
Unknown,
@@ -436,40 +441,36 @@ struct TEvStateStorage {
NotAvailable,
};
- struct TInfoEntry {
- TString Payload;
- };
-
const EStatus Status;
const TString Path;
- TMap<TActorId, TInfoEntry> InfoEntries;
+ const ui32 StateStorageGroupId;
+ TMap<TActorId, TBoardInfoEntry> InfoEntries;
- TEvBoardInfo(EStatus status, const TString &path)
+ TEvBoardInfo(EStatus status, const TString &path, ui32 stateStorageGroupId)
: Status(status)
, Path(path)
+ , StateStorageGroupId(stateStorageGroupId)
{}
TEvBoardInfo(const TEvBoardInfo &x)
: Status(x.Status)
, Path(x.Path)
+ , StateStorageGroupId(x.StateStorageGroupId)
, InfoEntries(x.InfoEntries)
{}
};
struct TEvBoardInfoUpdate : public TEventLocal<TEvBoardInfoUpdate, EvBoardInfoUpdate> {
- struct TInfoEntryUpdate {
- TString Payload;
- bool Dropped = false;
- };
-
const TEvBoardInfo::EStatus Status;
const TString Path;
- TMap<TActorId,TInfoEntryUpdate> Updates;
+ const ui32 StateStorageGroupId;
+ TMap<TActorId, TBoardInfoEntry> Updates;
- TEvBoardInfoUpdate(TEvBoardInfo::EStatus status, const TString &path)
+ TEvBoardInfoUpdate(TEvBoardInfo::EStatus status, const TString &path, ui32 stateStorageGroupId)
: Status(status)
, Path(path)
+ , StateStorageGroupId(stateStorageGroupId)
{}
};
};
diff --git a/ydb/core/discovery/discovery.cpp b/ydb/core/discovery/discovery.cpp
index a124bcecda7..93dc41a003a 100644
--- a/ydb/core/discovery/discovery.cpp
+++ b/ydb/core/discovery/discovery.cpp
@@ -123,21 +123,41 @@ namespace NDiscovery {
return out;
}
- std::pair<TString, TString> CreateSerializedMessage(
- const THolder<TEvStateStorage::TEvBoardInfo>& info,
+ NDiscovery::TCachedMessageData CreateCachedMessage(
+ const TMap<TActorId, TEvStateStorage::TBoardInfoEntry>& prevInfoEntries,
+ TMap<TActorId, TEvStateStorage::TBoardInfoEntry> newInfoEntries,
TSet<TString> services,
const THolder<TEvInterconnect::TEvNodeInfo>& nameserviceResponse) {
+ TMap<TActorId, TEvStateStorage::TBoardInfoEntry> infoEntries;
+ if (prevInfoEntries.empty()) {
+ infoEntries = std::move(newInfoEntries);
+ } else {
+ infoEntries = prevInfoEntries;
+ for (auto& [actorId, info] : newInfoEntries) {
+ if (info.Dropped) {
+ infoEntries.erase(actorId);
+ } else {
+ infoEntries[actorId].Payload = std::move(info.Payload);
+ }
+ }
+ }
+
+ if (!nameserviceResponse) {
+ return {"", "", std::move(infoEntries)};
+ }
+
TStackVec<const TString*> entries;
- entries.reserve(info->InfoEntries.size());
- for (auto &xpair : info->InfoEntries)
+ entries.reserve(infoEntries.size());
+ for (auto& xpair : infoEntries) {
entries.emplace_back(&xpair.second.Payload);
+ }
Shuffle(entries.begin(), entries.end());
- Ydb::Discovery::ListEndpointsResult result;
- result.mutable_endpoints()->Reserve(info->InfoEntries.size());
+ Ydb::Discovery::ListEndpointsResult cachedMessage;
+ cachedMessage.mutable_endpoints()->Reserve(infoEntries.size());
- Ydb::Discovery::ListEndpointsResult resultSsl;
- resultSsl.mutable_endpoints()->Reserve(info->InfoEntries.size());
+ Ydb::Discovery::ListEndpointsResult cachedMessageSsl;
+ cachedMessageSsl.mutable_endpoints()->Reserve(infoEntries.size());
THashMap<TEndpointKey, TEndpointState> states;
THashMap<TEndpointKey, TEndpointState> statesSsl;
@@ -150,9 +170,9 @@ namespace NDiscovery {
}
if (entry.GetSsl()) {
- AddEndpoint(resultSsl, statesSsl, entry);
+ AddEndpoint(cachedMessageSsl, statesSsl, entry);
} else {
- AddEndpoint(result, states, entry);
+ AddEndpoint(cachedMessage, states, entry);
}
}
@@ -160,12 +180,12 @@ namespace NDiscovery {
if (nodeInfo && nodeInfo->Location.GetDataCenterId()) {
const auto &location = nodeInfo->Location.GetDataCenterId();
if (IsSafeLocationMarker(location)) {
- result.set_self_location(location);
- resultSsl.set_self_location(location);
+ cachedMessage.set_self_location(location);
+ cachedMessageSsl.set_self_location(location);
}
}
- return {SerializeResult(result), SerializeResult(resultSsl)};
+ return {SerializeResult(cachedMessage), SerializeResult(cachedMessageSsl), std::move(infoEntries)};
}
}
@@ -189,8 +209,9 @@ namespace NDiscoveryPrivate {
};
class TDiscoveryCache: public TActorBootstrapped<TDiscoveryCache> {
- THashMap<TString, std::shared_ptr<NDiscovery::TCachedMessageData>> NewCachedMessages;
- THashMap<TString, std::shared_ptr<NDiscovery::TCachedMessageData>> OldCachedMessages;
+ THashMap<TString, std::shared_ptr<NDiscovery::TCachedMessageData>> CurrentCachedMessages;
+ THashMap<TString, std::shared_ptr<NDiscovery::TCachedMessageData>> OldCachedMessages; // when subscriptions are enabled
+ THashMap<TString, std::shared_ptr<NDiscovery::TCachedMessageData>> CachedNotAvailable; // for subscriptions
THolder<TEvInterconnect::TEvNodeInfo> NameserviceResponse;
struct TWaiter {
@@ -204,9 +225,13 @@ namespace NDiscoveryPrivate {
auto Request(const TString& database, ui32 groupId) {
auto result = Requested.emplace(database, TVector<TWaiter>());
if (result.second) {
+ auto mode = EBoardLookupMode::Second;
+ if (AppData()->FeatureFlags.GetEnableSubscriptionsInDiscovery()) {
+ mode = EBoardLookupMode::Subscription;
+ }
CLOG_D("Lookup"
<< ": path# " << database);
- Register(CreateBoardLookupActor(database, SelfId(), groupId, EBoardLookupMode::Second));
+ Register(CreateBoardLookupActor(database, SelfId(), groupId, mode));
}
return result.first;
@@ -221,26 +246,56 @@ namespace NDiscoveryPrivate {
NameserviceResponse.Reset(ev->Release().Release());
}
+ void Handle(TEvStateStorage::TEvBoardInfoUpdate::TPtr& ev) {
+ CLOG_T("Handle " << ev->Get()->ToString());
+ if (!AppData()->FeatureFlags.GetEnableSubscriptionsInDiscovery()) {
+ return;
+ }
+ THolder<TEvStateStorage::TEvBoardInfoUpdate> msg = ev->Release();
+ const auto& path = msg->Path;
+
+ if (msg->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
+ CurrentCachedMessages.erase(path);
+ return;
+ }
+
+ auto& currentCachedMessage = CurrentCachedMessages[path];
+
+ Y_VERIFY(currentCachedMessage);
+
+ currentCachedMessage = std::make_shared<NDiscovery::TCachedMessageData>(
+ NDiscovery::CreateCachedMessage(
+ currentCachedMessage->InfoEntries, std::move(msg->Updates), {}, NameserviceResponse)
+ );
+
+ auto it = Requested.find(path);
+ Y_VERIFY(it == Requested.end());
+ }
+
void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
CLOG_T("Handle " << ev->Get()->ToString());
THolder<TEvStateStorage::TEvBoardInfo> msg = ev->Release();
const auto& path = msg->Path;
- auto newCachedData = std::make_shared<NDiscovery::TCachedMessageData>();
+ auto newCachedData = std::make_shared<NDiscovery::TCachedMessageData>(
+ NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries), {}, NameserviceResponse)
+ );
+ newCachedData->Status = msg->Status;
- if (NameserviceResponse) {
- auto result = NDiscovery::CreateSerializedMessage(msg, {}, NameserviceResponse);
- newCachedData->CachedMessage = result.first;
- newCachedData->CachedMessageSsl = result.second;
+ if (AppData()->FeatureFlags.GetEnableSubscriptionsInDiscovery()) {
+ if (msg->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
+ CurrentCachedMessages.erase(path);
+ CachedNotAvailable[path] = newCachedData;
+ } else {
+ CurrentCachedMessages[path] = newCachedData;
+ }
+ } else {
+ CurrentCachedMessages[path] = newCachedData;
+ OldCachedMessages.erase(path);
}
- newCachedData->Info = std::move(msg);
-
- OldCachedMessages.erase(path);
- NewCachedMessages.emplace(path, newCachedData);
-
if (auto it = Requested.find(path); it != Requested.end()) {
for (const auto& waiter : it->second) {
Send(waiter.ActorId,
@@ -249,6 +304,7 @@ namespace NDiscoveryPrivate {
Requested.erase(it);
}
+
if (!Scheduled) {
Scheduled = true;
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());
@@ -256,8 +312,14 @@ namespace NDiscoveryPrivate {
}
void Wakeup() {
- OldCachedMessages.swap(NewCachedMessages);
- NewCachedMessages.clear();
+ auto enableSubscriptions = AppData()->FeatureFlags.GetEnableSubscriptionsInDiscovery();
+
+ if (enableSubscriptions) {
+ CachedNotAvailable.clear();
+ } else {
+ OldCachedMessages.swap(CurrentCachedMessages);
+ CurrentCachedMessages.clear();
+ }
if (!OldCachedMessages.empty()) {
Scheduled = true;
@@ -272,14 +334,24 @@ namespace NDiscoveryPrivate {
const auto* msg = ev->Get();
- const auto* cachedData = NewCachedMessages.FindPtr(msg->Database);
+ auto enableSubscriptions = AppData()->FeatureFlags.GetEnableSubscriptionsInDiscovery();
+
+ const auto* cachedData = CurrentCachedMessages.FindPtr(msg->Database);
if (cachedData == nullptr) {
- cachedData = OldCachedMessages.FindPtr(msg->Database);
- if (cachedData == nullptr) {
- Request(msg->Database, msg->StateStorageId, {ev->Sender, ev->Cookie});
- return;
+ if (enableSubscriptions) {
+ cachedData = CachedNotAvailable.FindPtr(msg->Database);
+ if (cachedData == nullptr) {
+ Request(msg->Database, msg->StateStorageId, {ev->Sender, ev->Cookie});
+ return;
+ }
+ } else {
+ cachedData = OldCachedMessages.FindPtr(msg->Database);
+ if (cachedData == nullptr) {
+ Request(msg->Database, msg->StateStorageId, {ev->Sender, ev->Cookie});
+ return;
+ }
+ Request(msg->Database, msg->StateStorageId);
}
- Request(msg->Database, msg->StateStorageId);
}
Send(ev->Sender, new TEvDiscovery::TEvDiscoveryData(*cachedData), 0, ev->Cookie);
@@ -300,6 +372,7 @@ namespace NDiscoveryPrivate {
switch (ev->GetTypeRewrite()) {
hFunc(TEvPrivate::TEvRequest, Handle);
hFunc(TEvStateStorage::TEvBoardInfo, Handle);
+ hFunc(TEvStateStorage::TEvBoardInfoUpdate, Handle);
hFunc(TEvInterconnect::TEvNodeInfo, Handle);
sFunc(TEvents::TEvWakeup, Wakeup);
sFunc(TEvents::TEvPoison, PassAway);
@@ -358,10 +431,8 @@ public:
void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev) {
Y_VERIFY(ev->Get()->CachedMessageData);
- if (ev->Get()->CachedMessageData->Info) {
- DLOG_T("Handle " << ev->Get()->CachedMessageData->Info->ToString()
- << ": cookie# " << ev->Cookie);
- }
+ DLOG_T("Handle " << ev->ToString()
+ << ": cookie# " << ev->Cookie);
if (ev->Cookie != LookupCookie) {
DLOG_D("Stale lookup response"
@@ -442,10 +513,11 @@ public:
}
}
- if (LookupResponse->CachedMessageData->Info &&
- LookupResponse->CachedMessageData->Info->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
+ if (LookupResponse->CachedMessageData &&
+ (LookupResponse->CachedMessageData->InfoEntries.empty() ||
+ LookupResponse->CachedMessageData->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok)) {
DLOG_D("Lookup error"
- << ": status# " << ui64(LookupResponse->CachedMessageData->Info->Status));
+ << ": status# " << ui64(LookupResponse->CachedMessageData->Status));
return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::RESOLVE_ERROR,
"Database nodes resolve failed with no certain result"));
}
diff --git a/ydb/core/discovery/discovery.h b/ydb/core/discovery/discovery.h
index 01e5d5d0926..da4d046dfbc 100644
--- a/ydb/core/discovery/discovery.h
+++ b/ydb/core/discovery/discovery.h
@@ -13,7 +13,8 @@ namespace NDiscovery {
struct TCachedMessageData {
TString CachedMessage;
TString CachedMessageSsl;
- THolder<TEvStateStorage::TEvBoardInfo> Info;
+ TMap<TActorId, TEvStateStorage::TBoardInfoEntry> InfoEntries; // OwnerId -> Payload
+ TEvStateStorage::TEvBoardInfo::EStatus Status = TEvStateStorage::TEvBoardInfo::EStatus::Ok;
};
}
@@ -54,10 +55,11 @@ struct TEvDiscovery {
};
namespace NDiscovery {
- std::pair<TString, TString> CreateSerializedMessage(
- const THolder<TEvStateStorage::TEvBoardInfo>&,
- TSet<TString>,
- const THolder<TEvInterconnect::TEvNodeInfo>&);
+ NDiscovery::TCachedMessageData CreateCachedMessage(
+ const TMap<TActorId, TEvStateStorage::TBoardInfoEntry>&,
+ TMap<TActorId, TEvStateStorage::TBoardInfoEntry>,
+ TSet<TString>,
+ const THolder<TEvInterconnect::TEvNodeInfo>&);
}
using TLookupPathFunc = std::function<TString(const TString&)>;
diff --git a/ydb/core/grpc_services/rpc_discovery.cpp b/ydb/core/grpc_services/rpc_discovery.cpp
index 820866e5c03..bec37d1ccfa 100644
--- a/ydb/core/grpc_services/rpc_discovery.cpp
+++ b/ydb/core/grpc_services/rpc_discovery.cpp
@@ -123,8 +123,8 @@ public:
if (!NameserviceResponse || !LookupResponse)
return;
- Y_VERIFY(LookupResponse->CachedMessageData->Info &&
- LookupResponse->CachedMessageData->Info->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+ Y_VERIFY(LookupResponse->CachedMessageData && !LookupResponse->CachedMessageData->InfoEntries.empty() &&
+ LookupResponse->CachedMessageData->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok);
const TSet<TString> services(
Request->GetProtoRequest()->Getservice().begin(), Request->GetProtoRequest()->Getservice().end());
@@ -136,8 +136,11 @@ public:
cachedMessage = LookupResponse->CachedMessageData->CachedMessage;
cachedMessageSsl = LookupResponse->CachedMessageData->CachedMessageSsl;
} else {
- std::tie(cachedMessage, cachedMessageSsl) = NDiscovery::CreateSerializedMessage(
- LookupResponse->CachedMessageData->Info, std::move(services), NameserviceResponse);
+ auto cachedMessageData = NDiscovery::CreateCachedMessage(
+ {}, std::move(LookupResponse->CachedMessageData->InfoEntries),
+ std::move(services), NameserviceResponse);
+ cachedMessage = std::move(cachedMessageData.CachedMessage);
+ cachedMessageSsl = std::move(cachedMessageData.CachedMessageSsl);
}
if (Request->SslServer()) {
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index d37ae97b329..f23b4750c80 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -809,6 +809,7 @@ message TFeatureFlags {
optional bool EnableTopicSplitMerge = 95 [default = false];
optional bool EnableChangefeedDynamoDBStreamsFormat = 96 [default = false];
optional bool ForceColumnTablesCompositeMarks = 97 [default = false];
+ optional bool EnableSubscriptionsInDiscovery = 98 [default = false];
}
message THttpProxyConfig {
diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp
index dcaf754e8b7..8d600042ab9 100644
--- a/ydb/core/tx/replication/controller/controller.cpp
+++ b/ydb/core/tx/replication/controller/controller.cpp
@@ -176,9 +176,9 @@ void TController::Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActo
}
void TController::Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) {
- Y_VERIFY(ev->Get()->CachedMessageData && ev->Get()->CachedMessageData->Info);
+ Y_VERIFY(ev->Get()->CachedMessageData);
- CLOG_T(ctx, "Handle " << ev->Get()->CachedMessageData->Info->ToString());
+ CLOG_T(ctx, "Handle " << ev->Get()->ToString());
NodesManager.ProcessResponse(ev, ctx);
}
diff --git a/ydb/core/tx/replication/controller/nodes_manager.cpp b/ydb/core/tx/replication/controller/nodes_manager.cpp
index 0a7863dfa3f..e04cbccf7a6 100644
--- a/ydb/core/tx/replication/controller/nodes_manager.cpp
+++ b/ydb/core/tx/replication/controller/nodes_manager.cpp
@@ -22,8 +22,9 @@ void TNodesManager::DiscoverNodes(const TString& tenant, const TActorId& cache,
}
void TNodesManager::ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) {
- Y_VERIFY(ev->Get()->CachedMessageData && ev->Get()->CachedMessageData->Info);
- Y_VERIFY(ev->Get()->CachedMessageData->Info->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+ Y_VERIFY(ev->Get()->CachedMessageData);
+ Y_VERIFY(!ev->Get()->CachedMessageData->InfoEntries.empty());
+ Y_VERIFY(ev->Get()->CachedMessageData->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok);
auto it = NodeDiscoverers.find(ev->Sender);
if (it == NodeDiscoverers.end()) {
@@ -33,7 +34,7 @@ void TNodesManager::ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, co
auto& nodes = TenantNodes[it->second];
nodes.clear();
- for (const auto& [actorId, _] : ev->Get()->CachedMessageData->Info->InfoEntries) {
+ for (const auto& [actorId, _] : ev->Get()->CachedMessageData->InfoEntries) {
nodes.insert(actorId.NodeId());
}