diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-04-27 19:47:45 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-04-27 19:47:45 +0300 |
commit | 500dfab0737be786ff38f6cd96a0572f0f7ebf23 (patch) | |
tree | ea4e8b8b1f69145dc2dd2a130730084e471a6d3c | |
parent | bf8606832efaa409859a241235f10c6100a20bb9 (diff) | |
download | ydb-500dfab0737be786ff38f6cd96a0572f0f7ebf23.tar.gz |
add cached message for list endpoints
-rw-r--r-- | ydb/core/discovery/discovery.cpp | 243 | ||||
-rw-r--r-- | ydb/core/discovery/discovery.h | 35 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_discovery.cpp | 127 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/controller.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/controller_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/nodes_manager.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tx/replication/controller/nodes_manager.h | 2 |
7 files changed, 286 insertions, 143 deletions
diff --git a/ydb/core/discovery/discovery.cpp b/ydb/core/discovery/discovery.cpp index d513f2a3b38..70436d48536 100644 --- a/ydb/core/discovery/discovery.cpp +++ b/ydb/core/discovery/discovery.cpp @@ -4,11 +4,15 @@ #include <ydb/core/base/path.h> #include <ydb/core/base/statestorage.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/public/api/protos/ydb_discovery.pb.h> #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> +#include <util/generic/xrange.h> +#include <util/random/shuffle.h> + #define LOG_T(service, stream) LOG_TRACE_S(*TlsActivationContext, service, stream) #define LOG_D(service, stream) LOG_DEBUG_S(*TlsActivationContext, service, stream) @@ -20,6 +24,151 @@ namespace NKikimr { +namespace NDiscovery { + using TEndpointKey = std::pair<TString, ui32>; + struct TEndpointState { + int Index = -1; + int Count = 0; + float LoadFactor = 0; + THashSet<TString> Locations; + THashSet<TString> Services; + }; + + bool CheckServices(const TSet<TString> &req, const NKikimrStateStorage::TEndpointBoardEntry &entry) { + if (req.empty()) + return true; + + for (const auto &x : entry.GetServices()) + if (req.count(x)) + return true; + + return false; + } + + bool IsSafeLocationMarker(TStringBuf location) { + const ui8* isrc = reinterpret_cast<const ui8*>(location.data()); + for (auto idx : xrange(location.size())) { + if (isrc[idx] >= 0x80) + return false; + } + return true; + } + + void AddEndpoint( + Ydb::Discovery::ListEndpointsResult& result, + THashMap<TEndpointKey, TEndpointState>& states, + const NKikimrStateStorage::TEndpointBoardEntry& entry) { + Ydb::Discovery::EndpointInfo *xres; + + auto& state = states[TEndpointKey(entry.GetAddress(), entry.GetPort())]; + if (state.Index >= 0) { + xres = result.mutable_endpoints(state.Index); + ++state.Count; + // FIXME: do we want a mean or a sum here? + // xres->set_load_factor(xres->load_factor() + (entry.GetLoad() - xres->load_factor()) / state.Count); + xres->set_load_factor(xres->load_factor() + entry.GetLoad()); + } else { + state.Index = result.endpoints_size(); + state.Count = 1; + xres = result.add_endpoints(); + xres->set_address(entry.GetAddress()); + xres->set_port(entry.GetPort()); + if (entry.GetSsl()) + xres->set_ssl(true); + xres->set_load_factor(entry.GetLoad()); + xres->set_node_id(entry.GetNodeId()); + if (entry.AddressesV4Size()) { + xres->mutable_ip_v4()->Reserve(entry.AddressesV4Size()); + for (const auto& addr : entry.GetAddressesV4()) { + xres->add_ip_v4(addr); + } + } + if (entry.AddressesV6Size()) { + xres->mutable_ip_v6()->Reserve(entry.AddressesV6Size()); + for (const auto& addr : entry.GetAddressesV6()) { + xres->add_ip_v6(addr); + } + } + xres->set_ssl_target_name_override(entry.GetTargetNameOverride()); + } + + if (IsSafeLocationMarker(entry.GetDataCenter())) { + if (state.Locations.insert(entry.GetDataCenter()).second) { + if (xres->location().empty()) { + xres->set_location(entry.GetDataCenter()); + } else { + xres->set_location(xres->location() + "/" + entry.GetDataCenter()); + } + } + } + + for (auto &service : entry.GetServices()) { + if (state.Services.insert(service).second) { + xres->add_service(service); + } + } + } + + TString SerializeResult(const Ydb::Discovery::ListEndpointsResult& result) { + Ydb::Discovery::ListEndpointsResponse response; + TString out; + auto deferred = response.mutable_operation(); + deferred->set_ready(true); + deferred->set_status(Ydb::StatusIds::SUCCESS); + + auto data = deferred->mutable_result(); + data->PackFrom(result); + + Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); + return out; + } + + std::pair<TString, TString> CreateSerializedMessage( + const THolder<TEvStateStorage::TEvBoardInfo>& info, + TSet<TString> services, + const THolder<TEvInterconnect::TEvNodeInfo>& nameserviceResponse) { + TStackVec<const TString*> entries; + entries.reserve(info->InfoEntries.size()); + for (auto &xpair : info->InfoEntries) + entries.emplace_back(&xpair.second.Payload); + Shuffle(entries.begin(), entries.end()); + + Ydb::Discovery::ListEndpointsResult result; + result.mutable_endpoints()->Reserve(info->InfoEntries.size()); + + Ydb::Discovery::ListEndpointsResult resultSsl; + resultSsl.mutable_endpoints()->Reserve(info->InfoEntries.size()); + + THashMap<TEndpointKey, TEndpointState> states; + THashMap<TEndpointKey, TEndpointState> statesSsl; + + NKikimrStateStorage::TEndpointBoardEntry entry; + for (const TString *xpayload : entries) { + Y_PROTOBUF_SUPPRESS_NODISCARD entry.ParseFromString(*xpayload); + if (!CheckServices(services, entry)) { + continue; + } + + if (entry.GetSsl()) { + AddEndpoint(resultSsl, statesSsl, entry); + } else { + AddEndpoint(result, states, entry); + } + } + + const auto &nodeInfo = nameserviceResponse->Node; + if (nodeInfo && nodeInfo->Location.GetDataCenterId()) { + const auto &location = nodeInfo->Location.GetDataCenterId(); + if (IsSafeLocationMarker(location)) { + result.set_self_location(location); + resultSsl.set_self_location(location); + } + } + + return {SerializeResult(result), SerializeResult(resultSsl)}; + } +} + namespace NDiscoveryPrivate { struct TEvPrivate { enum EEv { @@ -39,9 +188,10 @@ namespace NDiscoveryPrivate { }; }; - class TDiscoveryCache: public TActor<TDiscoveryCache> { - THashMap<TString, THolder<TEvStateStorage::TEvBoardInfo>> OldInfo; - THashMap<TString, THolder<TEvStateStorage::TEvBoardInfo>> NewInfo; + class TDiscoveryCache: public TActorBootstrapped<TDiscoveryCache> { + THashMap<TString, std::shared_ptr<NDiscovery::TCachedMessageData>> NewCachedMessages; + THashMap<TString, std::shared_ptr<NDiscovery::TCachedMessageData>> OldCachedMessages; + THolder<TEvInterconnect::TEvNodeInfo> NameserviceResponse; struct TWaiter { TActorId ActorId; @@ -67,23 +217,38 @@ namespace NDiscoveryPrivate { it->second.push_back(waiter); } + void Handle(TEvInterconnect::TEvNodeInfo::TPtr &ev) { + NameserviceResponse.Reset(ev->Release().Release()); + } + void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) { CLOG_T("Handle " << ev->Get()->ToString()); THolder<TEvStateStorage::TEvBoardInfo> msg = ev->Release(); const auto& path = msg->Path; + auto newCachedData = std::make_shared<NDiscovery::TCachedMessageData>(); + + if (NameserviceResponse) { + auto result = NDiscovery::CreateSerializedMessage(msg, {}, NameserviceResponse); + + newCachedData->CachedMessage = result.first; + newCachedData->CachedMessageSsl = result.second; + } + + newCachedData->Info = std::move(msg); + + OldCachedMessages.erase(path); + NewCachedMessages.emplace(path, newCachedData); + if (auto it = Requested.find(path); it != Requested.end()) { for (const auto& waiter : it->second) { - Send(waiter.ActorId, new TEvStateStorage::TEvBoardInfo(*msg), 0, waiter.Cookie); + Send(waiter.ActorId, + new TEvDiscovery::TEvDiscoveryData(newCachedData), 0, waiter.Cookie); } - Requested.erase(it); } - OldInfo.erase(path); - NewInfo.emplace(path, std::move(msg)); - if (!Scheduled) { Scheduled = true; Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup()); @@ -91,10 +256,10 @@ namespace NDiscoveryPrivate { } void Wakeup() { - OldInfo.swap(NewInfo); - NewInfo.clear(); + OldCachedMessages.swap(NewCachedMessages); + NewCachedMessages.clear(); - if (!OldInfo.empty()) { + if (!OldCachedMessages.empty()) { Scheduled = true; Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup()); } else { @@ -107,18 +272,17 @@ namespace NDiscoveryPrivate { const auto* msg = ev->Get(); - if (const auto* x = NewInfo.FindPtr(msg->Database)) { - Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie); - return; - } - - if (const auto* x = OldInfo.FindPtr(msg->Database)) { + const auto* cachedData = NewCachedMessages.FindPtr(msg->Database); + if (cachedData == nullptr) { + cachedData = OldCachedMessages.FindPtr(msg->Database); + if (cachedData == nullptr) { + Request(msg->Database, msg->StateStorageId, {ev->Sender, ev->Cookie}); + return; + } Request(msg->Database, msg->StateStorageId); - Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie); - return; } - Request(msg->Database, msg->StateStorageId, {ev->Sender, ev->Cookie}); + Send(ev->Sender, new TEvDiscovery::TEvDiscoveryData(*cachedData), 0, ev->Cookie); } public: @@ -126,15 +290,17 @@ namespace NDiscoveryPrivate { return NKikimrServices::TActivity::DISCOVERY_CACHE_ACTOR; } - TDiscoveryCache() - : TActor(&TThis::StateWork) - { + void Bootstrap() { + Send(GetNameserviceActorId(), new TEvInterconnect::TEvGetNode(SelfId().NodeId())); + + Become(&TThis::StateWork); } STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { hFunc(TEvPrivate::TEvRequest, Handle); hFunc(TEvStateStorage::TEvBoardInfo, Handle); + hFunc(TEvInterconnect::TEvNodeInfo, Handle); sFunc(TEvents::TEvWakeup, Wakeup); sFunc(TEvents::TEvPoison, PassAway); } @@ -148,7 +314,7 @@ class TDiscoverer: public TActorBootstrapped<TDiscoverer> { const TActorId ReplyTo; const TActorId CacheId; - THolder<TEvStateStorage::TEvBoardInfo> LookupResponse; + THolder<TEvDiscovery::TEvDiscoveryData> LookupResponse; THolder<TEvTxProxySchemeCache::TEvNavigateKeySetResult> SchemeCacheResponse; bool ResolveResources = false; @@ -164,7 +330,9 @@ public: return NKikimrServices::TActivity::DISCOVERY_ACTOR; } - explicit TDiscoverer(TLookupPathFunc f, const TString& database, const TActorId& replyTo, const TActorId& cacheId) + explicit TDiscoverer( + TLookupPathFunc f, const TString& database, + const TActorId& replyTo, const TActorId& cacheId) : MakeLookupPath(f) , Database(database) , ReplyTo(replyTo) @@ -181,15 +349,19 @@ public: STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { - hFunc(TEvStateStorage::TEvBoardInfo, Handle); + hFunc(TEvDiscovery::TEvDiscoveryData, Handle); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); sFunc(TEvents::TEvPoison, PassAway); } } - void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) { - DLOG_T("Handle " << LookupResponse->ToString() - << ": cookie# " << ev->Cookie); + void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev) { + Y_VERIFY(ev->Get()->CachedMessageData); + + if (ev->Get()->CachedMessageData->Info) { + DLOG_T("Handle " << ev->Get()->CachedMessageData->Info->ToString() + << ": cookie# " << ev->Cookie); + } if (ev->Cookie != LookupCookie) { DLOG_D("Stale lookup response" @@ -270,14 +442,15 @@ public: } } - if (LookupResponse->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok) { + if (LookupResponse->CachedMessageData->Info && + LookupResponse->CachedMessageData->Info->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok) { DLOG_D("Lookup error" - << ": status# " << ui64(LookupResponse->Status)); + << ": status# " << ui64(LookupResponse->CachedMessageData->Info->Status)); return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::RESOLVE_ERROR, "Database nodes resolve failed with no certain result")); } - Reply(new TEvStateStorage::TEvBoardInfo(*LookupResponse)); + Reply(LookupResponse.Release()); } void Lookup(const TString& db) { @@ -336,7 +509,11 @@ public: } }; -IActor* CreateDiscoverer(TLookupPathFunc f, const TString& database, const TActorId& replyTo, const TActorId& cacheId) { +IActor* CreateDiscoverer( + TLookupPathFunc f, + const TString& database, + const TActorId& replyTo, + const TActorId& cacheId) { return new TDiscoverer(f, database, replyTo, cacheId); } diff --git a/ydb/core/discovery/discovery.h b/ydb/core/discovery/discovery.h index 5174fa4e07c..01e5d5d0926 100644 --- a/ydb/core/discovery/discovery.h +++ b/ydb/core/discovery/discovery.h @@ -1,14 +1,27 @@ #pragma once #include <ydb/core/base/events.h> +#include <ydb/core/base/statestorage.h> #include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/interconnect.h> +#include <library/cpp/actors/interconnect/interconnect.h> namespace NKikimr { +namespace NDiscovery { + struct TCachedMessageData { + TString CachedMessage; + TString CachedMessageSsl; + THolder<TEvStateStorage::TEvBoardInfo> Info; + }; +} + struct TEvDiscovery { + enum EEv { EvError = EventSpaceBegin(TKikimrEvents::ES_DISCOVERY), + EvDiscoveryData, EvEnd }; @@ -29,14 +42,34 @@ struct TEvDiscovery { { } }; + + struct TEvDiscoveryData : public TEventLocal<TEvDiscoveryData, EvDiscoveryData> { + std::shared_ptr<const NDiscovery::TCachedMessageData> CachedMessageData; + + TEvDiscoveryData( + std::shared_ptr<const NDiscovery::TCachedMessageData> cachedMessageData) + : CachedMessageData(std::move(cachedMessageData)) + {} + }; }; +namespace NDiscovery { + std::pair<TString, TString> CreateSerializedMessage( + const THolder<TEvStateStorage::TEvBoardInfo>&, + TSet<TString>, + const THolder<TEvInterconnect::TEvNodeInfo>&); +} + using TLookupPathFunc = std::function<TString(const TString&)>; // Reply with: // - in case of success: TEvStateStorage::TEvBoardInfo // - otherwise: TEvDiscovery::TEvError -IActor* CreateDiscoverer(TLookupPathFunc f, const TString& database, const TActorId& replyTo, const TActorId& cacheId); +IActor* CreateDiscoverer( + TLookupPathFunc f, + const TString& database, + const TActorId& replyTo, + const TActorId& cacheId); // Used to reduce number of requests to Board IActor* CreateDiscoveryCache(); diff --git a/ydb/core/grpc_services/rpc_discovery.cpp b/ydb/core/grpc_services/rpc_discovery.cpp index ee1285e4ab0..820866e5c03 100644 --- a/ydb/core/grpc_services/rpc_discovery.cpp +++ b/ydb/core/grpc_services/rpc_discovery.cpp @@ -26,7 +26,7 @@ class TListEndpointsRPC : public TActorBootstrapped<TListEndpointsRPC> { const TActorId CacheId; TActorId Discoverer; - THolder<TEvStateStorage::TEvBoardInfo> LookupResponse; + THolder<TEvDiscovery::TEvDiscoveryData> LookupResponse; THolder<TEvInterconnect::TEvNodeInfo> NameserviceResponse; public: @@ -60,13 +60,14 @@ public: STATEFN(StateWait) { switch (ev->GetTypeRewrite()) { - hFunc(TEvStateStorage::TEvBoardInfo, Handle); + hFunc(TEvDiscovery::TEvDiscoveryData, Handle); hFunc(TEvInterconnect::TEvNodeInfo, Handle); hFunc(TEvDiscovery::TEvError, Handle); } } - void Handle(TEvStateStorage::TEvBoardInfo::TPtr &ev) { + void Handle(TEvDiscovery::TEvDiscoveryData::TPtr &ev) { + Y_VERIFY(ev->Get()->CachedMessageData); Discoverer = {}; LookupResponse.Reset(ev->Release().Release()); @@ -122,98 +123,33 @@ public: if (!NameserviceResponse || !LookupResponse) return; - Y_VERIFY(LookupResponse->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok); - - TStackVec<const TString*> entries; - entries.reserve(LookupResponse->InfoEntries.size()); - for (auto &xpair : LookupResponse->InfoEntries) - entries.emplace_back(&xpair.second.Payload); - Shuffle(entries.begin(), entries.end()); - - auto *result = TEvListEndpointsRequest::AllocateResult<Ydb::Discovery::ListEndpointsResult>(Request); - result->mutable_endpoints()->Reserve(LookupResponse->InfoEntries.size()); - - const TSet<TString> services(Request->GetProtoRequest()->Getservice().begin(), Request->GetProtoRequest()->Getservice().end()); - const bool sslServer = Request->SslServer(); - - using TEndpointKey = std::pair<TString, ui32>; - struct TEndpointState { - int Index = -1; - int Count = 0; - float LoadFactor = 0; - THashSet<TString> Locations; - THashSet<TString> Services; - }; - THashMap<TEndpointKey, TEndpointState> states; - - NKikimrStateStorage::TEndpointBoardEntry entry; - for (const TString *xpayload : entries) { - Y_PROTOBUF_SUPPRESS_NODISCARD entry.ParseFromString(*xpayload); - if (!CheckServices(services, entry)) - continue; - - if (entry.GetSsl() != sslServer) - continue; - - Ydb::Discovery::EndpointInfo *xres; - - auto& state = states[TEndpointKey(entry.GetAddress(), entry.GetPort())]; - if (state.Index >= 0) { - xres = result->mutable_endpoints(state.Index); - ++state.Count; - // FIXME: do we want a mean or a sum here? - // xres->set_load_factor(xres->load_factor() + (entry.GetLoad() - xres->load_factor()) / state.Count); - xres->set_load_factor(xres->load_factor() + entry.GetLoad()); - } else { - state.Index = result->endpoints_size(); - state.Count = 1; - xres = result->add_endpoints(); - xres->set_address(entry.GetAddress()); - xres->set_port(entry.GetPort()); - if (entry.GetSsl()) - xres->set_ssl(true); - xres->set_load_factor(entry.GetLoad()); - xres->set_node_id(entry.GetNodeId()); - if (entry.AddressesV4Size()) { - xres->mutable_ip_v4()->Reserve(entry.AddressesV4Size()); - for (const auto& addr : entry.GetAddressesV4()) { - xres->add_ip_v4(addr); - } - } - if (entry.AddressesV6Size()) { - xres->mutable_ip_v6()->Reserve(entry.AddressesV6Size()); - for (const auto& addr : entry.GetAddressesV6()) { - xres->add_ip_v6(addr); - } - } - xres->set_ssl_target_name_override(entry.GetTargetNameOverride()); - } - - if (IsSafeLocationMarker(entry.GetDataCenter())) { - if (state.Locations.insert(entry.GetDataCenter()).second) { - if (xres->location().empty()) { - xres->set_location(entry.GetDataCenter()); - } else { - xres->set_location(xres->location() + "/" + entry.GetDataCenter()); - } - } - } - - for (auto &service : entry.GetServices()) { - if (state.Services.insert(service).second) { - xres->add_service(service); - } - } + Y_VERIFY(LookupResponse->CachedMessageData->Info && + LookupResponse->CachedMessageData->Info->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok); + + const TSet<TString> services( + Request->GetProtoRequest()->Getservice().begin(), Request->GetProtoRequest()->Getservice().end()); + + TString cachedMessage, cachedMessageSsl; + + if (services.empty() && !LookupResponse->CachedMessageData->CachedMessage.empty() && + !LookupResponse->CachedMessageData->CachedMessageSsl.empty()) { + cachedMessage = LookupResponse->CachedMessageData->CachedMessage; + cachedMessageSsl = LookupResponse->CachedMessageData->CachedMessageSsl; + } else { + std::tie(cachedMessage, cachedMessageSsl) = NDiscovery::CreateSerializedMessage( + LookupResponse->CachedMessageData->Info, std::move(services), NameserviceResponse); } - auto &nodeInfo = NameserviceResponse->Node; - if (nodeInfo && nodeInfo->Location.GetDataCenterId()) { - const auto &location = nodeInfo->Location.GetDataCenterId(); - if (IsSafeLocationMarker(location)) - result->set_self_location(location); + if (Request->SslServer()) { + ReplySerialized(std::move(cachedMessageSsl), Ydb::StatusIds::SUCCESS); + } else { + ReplySerialized(std::move(cachedMessage), Ydb::StatusIds::SUCCESS); } + } - Reply(*result, Ydb::StatusIds::SUCCESS); + void ReplySerialized(TString message, Ydb::StatusIds::StatusCode status) { + Request->SendSerializedResult(std::move(message), status); + PassAway(); } template <typename... Args> @@ -221,15 +157,6 @@ public: Request->SendResult(std::forward<Args>(args)...); PassAway(); } - - bool IsSafeLocationMarker(TStringBuf location) { - const ui8* isrc = reinterpret_cast<const ui8*>(location.data()); - for (auto idx : xrange(location.size())) { - if (isrc[idx] >= 0x80) - return false; - } - return true; - } }; void TGRpcRequestProxy::Handle(TEvListEndpointsRequest::TPtr& ev, const TActorContext& ctx) { diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index ad412f9d248..dcaf754e8b7 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -59,7 +59,7 @@ STFUNC(TController::StateWork) { HFunc(TEvPrivate::TEvDropDstResult, Handle); HFunc(TEvPrivate::TEvResolveTenantResult, Handle); HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle); - HFunc(TEvStateStorage::TEvBoardInfo, Handle); + HFunc(TEvDiscovery::TEvDiscoveryData, Handle); HFunc(TEvDiscovery::TEvError, Handle); HFunc(TEvents::TEvPoison, Handle); } @@ -175,8 +175,11 @@ void TController::Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActo } } -void TController::Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx) { - CLOG_T(ctx, "Handle " << ev->Get()->ToString()); +void TController::Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) { + Y_VERIFY(ev->Get()->CachedMessageData && ev->Get()->CachedMessageData->Info); + + CLOG_T(ctx, "Handle " << ev->Get()->CachedMessageData->Info->ToString()); + NodesManager.ProcessResponse(ev, ctx); } diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h index 1e521293e6e..784b06d17ed 100644 --- a/ydb/core/tx/replication/controller/controller_impl.h +++ b/ydb/core/tx/replication/controller/controller_impl.h @@ -72,7 +72,7 @@ private: void Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvResolveTenantResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx); - void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx); + void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx); void Handle(TEvents::TEvPoison::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/tx/replication/controller/nodes_manager.cpp b/ydb/core/tx/replication/controller/nodes_manager.cpp index 40877c1815e..0a7863dfa3f 100644 --- a/ydb/core/tx/replication/controller/nodes_manager.cpp +++ b/ydb/core/tx/replication/controller/nodes_manager.cpp @@ -16,11 +16,14 @@ const THashSet<ui32>& TNodesManager::GetNodes(const TString& tenant) const { void TNodesManager::DiscoverNodes(const TString& tenant, const TActorId& cache, const TActorContext& ctx) { TenantNodes.emplace(tenant, THashSet<ui32>()); - NodeDiscoverers.emplace(ctx.Register(CreateDiscoverer(&NService::MakeDiscoveryPath, tenant, ctx.SelfID, cache)), tenant); + NodeDiscoverers.emplace( + ctx.Register(CreateDiscoverer(&NService::MakeDiscoveryPath, tenant, ctx.SelfID, cache)), tenant + ); } -void TNodesManager::ProcessResponse(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx) { - Y_VERIFY(ev->Get()->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok); +void TNodesManager::ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) { + Y_VERIFY(ev->Get()->CachedMessageData && ev->Get()->CachedMessageData->Info); + Y_VERIFY(ev->Get()->CachedMessageData->Info->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok); auto it = NodeDiscoverers.find(ev->Sender); if (it == NodeDiscoverers.end()) { @@ -30,7 +33,7 @@ void TNodesManager::ProcessResponse(TEvStateStorage::TEvBoardInfo::TPtr& ev, con auto& nodes = TenantNodes[it->second]; nodes.clear(); - for (const auto& [actorId, _] : ev->Get()->InfoEntries) { + for (const auto& [actorId, _] : ev->Get()->CachedMessageData->Info->InfoEntries) { nodes.insert(actorId.NodeId()); } diff --git a/ydb/core/tx/replication/controller/nodes_manager.h b/ydb/core/tx/replication/controller/nodes_manager.h index d07c0de826d..36a4598300c 100644 --- a/ydb/core/tx/replication/controller/nodes_manager.h +++ b/ydb/core/tx/replication/controller/nodes_manager.h @@ -17,7 +17,7 @@ public: const THashSet<ui32>& GetNodes(const TString& tenant) const; void DiscoverNodes(const TString& tenant, const TActorId& cache, const TActorContext& ctx); - void ProcessResponse(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx); + void ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx); void ProcessResponse(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx); void Shutdown(const TActorContext& ctx); |