aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshumkovnd <shumkovnd@yandex-team.com>2023-04-27 19:47:45 +0300
committershumkovnd <shumkovnd@yandex-team.com>2023-04-27 19:47:45 +0300
commit500dfab0737be786ff38f6cd96a0572f0f7ebf23 (patch)
treeea4e8b8b1f69145dc2dd2a130730084e471a6d3c
parentbf8606832efaa409859a241235f10c6100a20bb9 (diff)
downloadydb-500dfab0737be786ff38f6cd96a0572f0f7ebf23.tar.gz
add cached message for list endpoints
-rw-r--r--ydb/core/discovery/discovery.cpp243
-rw-r--r--ydb/core/discovery/discovery.h35
-rw-r--r--ydb/core/grpc_services/rpc_discovery.cpp127
-rw-r--r--ydb/core/tx/replication/controller/controller.cpp9
-rw-r--r--ydb/core/tx/replication/controller/controller_impl.h2
-rw-r--r--ydb/core/tx/replication/controller/nodes_manager.cpp11
-rw-r--r--ydb/core/tx/replication/controller/nodes_manager.h2
7 files changed, 286 insertions, 143 deletions
diff --git a/ydb/core/discovery/discovery.cpp b/ydb/core/discovery/discovery.cpp
index d513f2a3b38..70436d48536 100644
--- a/ydb/core/discovery/discovery.cpp
+++ b/ydb/core/discovery/discovery.cpp
@@ -4,11 +4,15 @@
#include <ydb/core/base/path.h>
#include <ydb/core/base/statestorage.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/public/api/protos/ydb_discovery.pb.h>
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
+#include <util/generic/xrange.h>
+#include <util/random/shuffle.h>
+
#define LOG_T(service, stream) LOG_TRACE_S(*TlsActivationContext, service, stream)
#define LOG_D(service, stream) LOG_DEBUG_S(*TlsActivationContext, service, stream)
@@ -20,6 +24,151 @@
namespace NKikimr {
+namespace NDiscovery {
+ using TEndpointKey = std::pair<TString, ui32>;
+ struct TEndpointState {
+ int Index = -1;
+ int Count = 0;
+ float LoadFactor = 0;
+ THashSet<TString> Locations;
+ THashSet<TString> Services;
+ };
+
+ bool CheckServices(const TSet<TString> &req, const NKikimrStateStorage::TEndpointBoardEntry &entry) {
+ if (req.empty())
+ return true;
+
+ for (const auto &x : entry.GetServices())
+ if (req.count(x))
+ return true;
+
+ return false;
+ }
+
+ bool IsSafeLocationMarker(TStringBuf location) {
+ const ui8* isrc = reinterpret_cast<const ui8*>(location.data());
+ for (auto idx : xrange(location.size())) {
+ if (isrc[idx] >= 0x80)
+ return false;
+ }
+ return true;
+ }
+
+ void AddEndpoint(
+ Ydb::Discovery::ListEndpointsResult& result,
+ THashMap<TEndpointKey, TEndpointState>& states,
+ const NKikimrStateStorage::TEndpointBoardEntry& entry) {
+ Ydb::Discovery::EndpointInfo *xres;
+
+ auto& state = states[TEndpointKey(entry.GetAddress(), entry.GetPort())];
+ if (state.Index >= 0) {
+ xres = result.mutable_endpoints(state.Index);
+ ++state.Count;
+ // FIXME: do we want a mean or a sum here?
+ // xres->set_load_factor(xres->load_factor() + (entry.GetLoad() - xres->load_factor()) / state.Count);
+ xres->set_load_factor(xres->load_factor() + entry.GetLoad());
+ } else {
+ state.Index = result.endpoints_size();
+ state.Count = 1;
+ xres = result.add_endpoints();
+ xres->set_address(entry.GetAddress());
+ xres->set_port(entry.GetPort());
+ if (entry.GetSsl())
+ xres->set_ssl(true);
+ xres->set_load_factor(entry.GetLoad());
+ xres->set_node_id(entry.GetNodeId());
+ if (entry.AddressesV4Size()) {
+ xres->mutable_ip_v4()->Reserve(entry.AddressesV4Size());
+ for (const auto& addr : entry.GetAddressesV4()) {
+ xres->add_ip_v4(addr);
+ }
+ }
+ if (entry.AddressesV6Size()) {
+ xres->mutable_ip_v6()->Reserve(entry.AddressesV6Size());
+ for (const auto& addr : entry.GetAddressesV6()) {
+ xres->add_ip_v6(addr);
+ }
+ }
+ xres->set_ssl_target_name_override(entry.GetTargetNameOverride());
+ }
+
+ if (IsSafeLocationMarker(entry.GetDataCenter())) {
+ if (state.Locations.insert(entry.GetDataCenter()).second) {
+ if (xres->location().empty()) {
+ xres->set_location(entry.GetDataCenter());
+ } else {
+ xres->set_location(xres->location() + "/" + entry.GetDataCenter());
+ }
+ }
+ }
+
+ for (auto &service : entry.GetServices()) {
+ if (state.Services.insert(service).second) {
+ xres->add_service(service);
+ }
+ }
+ }
+
+ TString SerializeResult(const Ydb::Discovery::ListEndpointsResult& result) {
+ Ydb::Discovery::ListEndpointsResponse response;
+ TString out;
+ auto deferred = response.mutable_operation();
+ deferred->set_ready(true);
+ deferred->set_status(Ydb::StatusIds::SUCCESS);
+
+ auto data = deferred->mutable_result();
+ data->PackFrom(result);
+
+ Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
+ return out;
+ }
+
+ std::pair<TString, TString> CreateSerializedMessage(
+ const THolder<TEvStateStorage::TEvBoardInfo>& info,
+ TSet<TString> services,
+ const THolder<TEvInterconnect::TEvNodeInfo>& nameserviceResponse) {
+ TStackVec<const TString*> entries;
+ entries.reserve(info->InfoEntries.size());
+ for (auto &xpair : info->InfoEntries)
+ entries.emplace_back(&xpair.second.Payload);
+ Shuffle(entries.begin(), entries.end());
+
+ Ydb::Discovery::ListEndpointsResult result;
+ result.mutable_endpoints()->Reserve(info->InfoEntries.size());
+
+ Ydb::Discovery::ListEndpointsResult resultSsl;
+ resultSsl.mutable_endpoints()->Reserve(info->InfoEntries.size());
+
+ THashMap<TEndpointKey, TEndpointState> states;
+ THashMap<TEndpointKey, TEndpointState> statesSsl;
+
+ NKikimrStateStorage::TEndpointBoardEntry entry;
+ for (const TString *xpayload : entries) {
+ Y_PROTOBUF_SUPPRESS_NODISCARD entry.ParseFromString(*xpayload);
+ if (!CheckServices(services, entry)) {
+ continue;
+ }
+
+ if (entry.GetSsl()) {
+ AddEndpoint(resultSsl, statesSsl, entry);
+ } else {
+ AddEndpoint(result, states, entry);
+ }
+ }
+
+ const auto &nodeInfo = nameserviceResponse->Node;
+ if (nodeInfo && nodeInfo->Location.GetDataCenterId()) {
+ const auto &location = nodeInfo->Location.GetDataCenterId();
+ if (IsSafeLocationMarker(location)) {
+ result.set_self_location(location);
+ resultSsl.set_self_location(location);
+ }
+ }
+
+ return {SerializeResult(result), SerializeResult(resultSsl)};
+ }
+}
+
namespace NDiscoveryPrivate {
struct TEvPrivate {
enum EEv {
@@ -39,9 +188,10 @@ namespace NDiscoveryPrivate {
};
};
- class TDiscoveryCache: public TActor<TDiscoveryCache> {
- THashMap<TString, THolder<TEvStateStorage::TEvBoardInfo>> OldInfo;
- THashMap<TString, THolder<TEvStateStorage::TEvBoardInfo>> NewInfo;
+ class TDiscoveryCache: public TActorBootstrapped<TDiscoveryCache> {
+ THashMap<TString, std::shared_ptr<NDiscovery::TCachedMessageData>> NewCachedMessages;
+ THashMap<TString, std::shared_ptr<NDiscovery::TCachedMessageData>> OldCachedMessages;
+ THolder<TEvInterconnect::TEvNodeInfo> NameserviceResponse;
struct TWaiter {
TActorId ActorId;
@@ -67,23 +217,38 @@ namespace NDiscoveryPrivate {
it->second.push_back(waiter);
}
+ void Handle(TEvInterconnect::TEvNodeInfo::TPtr &ev) {
+ NameserviceResponse.Reset(ev->Release().Release());
+ }
+
void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
CLOG_T("Handle " << ev->Get()->ToString());
THolder<TEvStateStorage::TEvBoardInfo> msg = ev->Release();
const auto& path = msg->Path;
+ auto newCachedData = std::make_shared<NDiscovery::TCachedMessageData>();
+
+ if (NameserviceResponse) {
+ auto result = NDiscovery::CreateSerializedMessage(msg, {}, NameserviceResponse);
+
+ newCachedData->CachedMessage = result.first;
+ newCachedData->CachedMessageSsl = result.second;
+ }
+
+ newCachedData->Info = std::move(msg);
+
+ OldCachedMessages.erase(path);
+ NewCachedMessages.emplace(path, newCachedData);
+
if (auto it = Requested.find(path); it != Requested.end()) {
for (const auto& waiter : it->second) {
- Send(waiter.ActorId, new TEvStateStorage::TEvBoardInfo(*msg), 0, waiter.Cookie);
+ Send(waiter.ActorId,
+ new TEvDiscovery::TEvDiscoveryData(newCachedData), 0, waiter.Cookie);
}
-
Requested.erase(it);
}
- OldInfo.erase(path);
- NewInfo.emplace(path, std::move(msg));
-
if (!Scheduled) {
Scheduled = true;
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());
@@ -91,10 +256,10 @@ namespace NDiscoveryPrivate {
}
void Wakeup() {
- OldInfo.swap(NewInfo);
- NewInfo.clear();
+ OldCachedMessages.swap(NewCachedMessages);
+ NewCachedMessages.clear();
- if (!OldInfo.empty()) {
+ if (!OldCachedMessages.empty()) {
Scheduled = true;
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());
} else {
@@ -107,18 +272,17 @@ namespace NDiscoveryPrivate {
const auto* msg = ev->Get();
- if (const auto* x = NewInfo.FindPtr(msg->Database)) {
- Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie);
- return;
- }
-
- if (const auto* x = OldInfo.FindPtr(msg->Database)) {
+ const auto* cachedData = NewCachedMessages.FindPtr(msg->Database);
+ if (cachedData == nullptr) {
+ cachedData = OldCachedMessages.FindPtr(msg->Database);
+ if (cachedData == nullptr) {
+ Request(msg->Database, msg->StateStorageId, {ev->Sender, ev->Cookie});
+ return;
+ }
Request(msg->Database, msg->StateStorageId);
- Send(ev->Sender, new TEvStateStorage::TEvBoardInfo(**x), 0, ev->Cookie);
- return;
}
- Request(msg->Database, msg->StateStorageId, {ev->Sender, ev->Cookie});
+ Send(ev->Sender, new TEvDiscovery::TEvDiscoveryData(*cachedData), 0, ev->Cookie);
}
public:
@@ -126,15 +290,17 @@ namespace NDiscoveryPrivate {
return NKikimrServices::TActivity::DISCOVERY_CACHE_ACTOR;
}
- TDiscoveryCache()
- : TActor(&TThis::StateWork)
- {
+ void Bootstrap() {
+ Send(GetNameserviceActorId(), new TEvInterconnect::TEvGetNode(SelfId().NodeId()));
+
+ Become(&TThis::StateWork);
}
STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvPrivate::TEvRequest, Handle);
hFunc(TEvStateStorage::TEvBoardInfo, Handle);
+ hFunc(TEvInterconnect::TEvNodeInfo, Handle);
sFunc(TEvents::TEvWakeup, Wakeup);
sFunc(TEvents::TEvPoison, PassAway);
}
@@ -148,7 +314,7 @@ class TDiscoverer: public TActorBootstrapped<TDiscoverer> {
const TActorId ReplyTo;
const TActorId CacheId;
- THolder<TEvStateStorage::TEvBoardInfo> LookupResponse;
+ THolder<TEvDiscovery::TEvDiscoveryData> LookupResponse;
THolder<TEvTxProxySchemeCache::TEvNavigateKeySetResult> SchemeCacheResponse;
bool ResolveResources = false;
@@ -164,7 +330,9 @@ public:
return NKikimrServices::TActivity::DISCOVERY_ACTOR;
}
- explicit TDiscoverer(TLookupPathFunc f, const TString& database, const TActorId& replyTo, const TActorId& cacheId)
+ explicit TDiscoverer(
+ TLookupPathFunc f, const TString& database,
+ const TActorId& replyTo, const TActorId& cacheId)
: MakeLookupPath(f)
, Database(database)
, ReplyTo(replyTo)
@@ -181,15 +349,19 @@ public:
STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvStateStorage::TEvBoardInfo, Handle);
+ hFunc(TEvDiscovery::TEvDiscoveryData, Handle);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}
- void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
- DLOG_T("Handle " << LookupResponse->ToString()
- << ": cookie# " << ev->Cookie);
+ void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev) {
+ Y_VERIFY(ev->Get()->CachedMessageData);
+
+ if (ev->Get()->CachedMessageData->Info) {
+ DLOG_T("Handle " << ev->Get()->CachedMessageData->Info->ToString()
+ << ": cookie# " << ev->Cookie);
+ }
if (ev->Cookie != LookupCookie) {
DLOG_D("Stale lookup response"
@@ -270,14 +442,15 @@ public:
}
}
- if (LookupResponse->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
+ if (LookupResponse->CachedMessageData->Info &&
+ LookupResponse->CachedMessageData->Info->Status != TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
DLOG_D("Lookup error"
- << ": status# " << ui64(LookupResponse->Status));
+ << ": status# " << ui64(LookupResponse->CachedMessageData->Info->Status));
return Reply(new TEvDiscovery::TEvError(TEvDiscovery::TEvError::RESOLVE_ERROR,
"Database nodes resolve failed with no certain result"));
}
- Reply(new TEvStateStorage::TEvBoardInfo(*LookupResponse));
+ Reply(LookupResponse.Release());
}
void Lookup(const TString& db) {
@@ -336,7 +509,11 @@ public:
}
};
-IActor* CreateDiscoverer(TLookupPathFunc f, const TString& database, const TActorId& replyTo, const TActorId& cacheId) {
+IActor* CreateDiscoverer(
+ TLookupPathFunc f,
+ const TString& database,
+ const TActorId& replyTo,
+ const TActorId& cacheId) {
return new TDiscoverer(f, database, replyTo, cacheId);
}
diff --git a/ydb/core/discovery/discovery.h b/ydb/core/discovery/discovery.h
index 5174fa4e07c..01e5d5d0926 100644
--- a/ydb/core/discovery/discovery.h
+++ b/ydb/core/discovery/discovery.h
@@ -1,14 +1,27 @@
#pragma once
#include <ydb/core/base/events.h>
+#include <ydb/core/base/statestorage.h>
#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/interconnect/interconnect.h>
namespace NKikimr {
+namespace NDiscovery {
+ struct TCachedMessageData {
+ TString CachedMessage;
+ TString CachedMessageSsl;
+ THolder<TEvStateStorage::TEvBoardInfo> Info;
+ };
+}
+
struct TEvDiscovery {
+
enum EEv {
EvError = EventSpaceBegin(TKikimrEvents::ES_DISCOVERY),
+ EvDiscoveryData,
EvEnd
};
@@ -29,14 +42,34 @@ struct TEvDiscovery {
{
}
};
+
+ struct TEvDiscoveryData : public TEventLocal<TEvDiscoveryData, EvDiscoveryData> {
+ std::shared_ptr<const NDiscovery::TCachedMessageData> CachedMessageData;
+
+ TEvDiscoveryData(
+ std::shared_ptr<const NDiscovery::TCachedMessageData> cachedMessageData)
+ : CachedMessageData(std::move(cachedMessageData))
+ {}
+ };
};
+namespace NDiscovery {
+ std::pair<TString, TString> CreateSerializedMessage(
+ const THolder<TEvStateStorage::TEvBoardInfo>&,
+ TSet<TString>,
+ const THolder<TEvInterconnect::TEvNodeInfo>&);
+}
+
using TLookupPathFunc = std::function<TString(const TString&)>;
// Reply with:
// - in case of success: TEvStateStorage::TEvBoardInfo
// - otherwise: TEvDiscovery::TEvError
-IActor* CreateDiscoverer(TLookupPathFunc f, const TString& database, const TActorId& replyTo, const TActorId& cacheId);
+IActor* CreateDiscoverer(
+ TLookupPathFunc f,
+ const TString& database,
+ const TActorId& replyTo,
+ const TActorId& cacheId);
// Used to reduce number of requests to Board
IActor* CreateDiscoveryCache();
diff --git a/ydb/core/grpc_services/rpc_discovery.cpp b/ydb/core/grpc_services/rpc_discovery.cpp
index ee1285e4ab0..820866e5c03 100644
--- a/ydb/core/grpc_services/rpc_discovery.cpp
+++ b/ydb/core/grpc_services/rpc_discovery.cpp
@@ -26,7 +26,7 @@ class TListEndpointsRPC : public TActorBootstrapped<TListEndpointsRPC> {
const TActorId CacheId;
TActorId Discoverer;
- THolder<TEvStateStorage::TEvBoardInfo> LookupResponse;
+ THolder<TEvDiscovery::TEvDiscoveryData> LookupResponse;
THolder<TEvInterconnect::TEvNodeInfo> NameserviceResponse;
public:
@@ -60,13 +60,14 @@ public:
STATEFN(StateWait) {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvStateStorage::TEvBoardInfo, Handle);
+ hFunc(TEvDiscovery::TEvDiscoveryData, Handle);
hFunc(TEvInterconnect::TEvNodeInfo, Handle);
hFunc(TEvDiscovery::TEvError, Handle);
}
}
- void Handle(TEvStateStorage::TEvBoardInfo::TPtr &ev) {
+ void Handle(TEvDiscovery::TEvDiscoveryData::TPtr &ev) {
+ Y_VERIFY(ev->Get()->CachedMessageData);
Discoverer = {};
LookupResponse.Reset(ev->Release().Release());
@@ -122,98 +123,33 @@ public:
if (!NameserviceResponse || !LookupResponse)
return;
- Y_VERIFY(LookupResponse->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok);
-
- TStackVec<const TString*> entries;
- entries.reserve(LookupResponse->InfoEntries.size());
- for (auto &xpair : LookupResponse->InfoEntries)
- entries.emplace_back(&xpair.second.Payload);
- Shuffle(entries.begin(), entries.end());
-
- auto *result = TEvListEndpointsRequest::AllocateResult<Ydb::Discovery::ListEndpointsResult>(Request);
- result->mutable_endpoints()->Reserve(LookupResponse->InfoEntries.size());
-
- const TSet<TString> services(Request->GetProtoRequest()->Getservice().begin(), Request->GetProtoRequest()->Getservice().end());
- const bool sslServer = Request->SslServer();
-
- using TEndpointKey = std::pair<TString, ui32>;
- struct TEndpointState {
- int Index = -1;
- int Count = 0;
- float LoadFactor = 0;
- THashSet<TString> Locations;
- THashSet<TString> Services;
- };
- THashMap<TEndpointKey, TEndpointState> states;
-
- NKikimrStateStorage::TEndpointBoardEntry entry;
- for (const TString *xpayload : entries) {
- Y_PROTOBUF_SUPPRESS_NODISCARD entry.ParseFromString(*xpayload);
- if (!CheckServices(services, entry))
- continue;
-
- if (entry.GetSsl() != sslServer)
- continue;
-
- Ydb::Discovery::EndpointInfo *xres;
-
- auto& state = states[TEndpointKey(entry.GetAddress(), entry.GetPort())];
- if (state.Index >= 0) {
- xres = result->mutable_endpoints(state.Index);
- ++state.Count;
- // FIXME: do we want a mean or a sum here?
- // xres->set_load_factor(xres->load_factor() + (entry.GetLoad() - xres->load_factor()) / state.Count);
- xres->set_load_factor(xres->load_factor() + entry.GetLoad());
- } else {
- state.Index = result->endpoints_size();
- state.Count = 1;
- xres = result->add_endpoints();
- xres->set_address(entry.GetAddress());
- xres->set_port(entry.GetPort());
- if (entry.GetSsl())
- xres->set_ssl(true);
- xres->set_load_factor(entry.GetLoad());
- xres->set_node_id(entry.GetNodeId());
- if (entry.AddressesV4Size()) {
- xres->mutable_ip_v4()->Reserve(entry.AddressesV4Size());
- for (const auto& addr : entry.GetAddressesV4()) {
- xres->add_ip_v4(addr);
- }
- }
- if (entry.AddressesV6Size()) {
- xres->mutable_ip_v6()->Reserve(entry.AddressesV6Size());
- for (const auto& addr : entry.GetAddressesV6()) {
- xres->add_ip_v6(addr);
- }
- }
- xres->set_ssl_target_name_override(entry.GetTargetNameOverride());
- }
-
- if (IsSafeLocationMarker(entry.GetDataCenter())) {
- if (state.Locations.insert(entry.GetDataCenter()).second) {
- if (xres->location().empty()) {
- xres->set_location(entry.GetDataCenter());
- } else {
- xres->set_location(xres->location() + "/" + entry.GetDataCenter());
- }
- }
- }
-
- for (auto &service : entry.GetServices()) {
- if (state.Services.insert(service).second) {
- xres->add_service(service);
- }
- }
+ Y_VERIFY(LookupResponse->CachedMessageData->Info &&
+ LookupResponse->CachedMessageData->Info->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+
+ const TSet<TString> services(
+ Request->GetProtoRequest()->Getservice().begin(), Request->GetProtoRequest()->Getservice().end());
+
+ TString cachedMessage, cachedMessageSsl;
+
+ if (services.empty() && !LookupResponse->CachedMessageData->CachedMessage.empty() &&
+ !LookupResponse->CachedMessageData->CachedMessageSsl.empty()) {
+ cachedMessage = LookupResponse->CachedMessageData->CachedMessage;
+ cachedMessageSsl = LookupResponse->CachedMessageData->CachedMessageSsl;
+ } else {
+ std::tie(cachedMessage, cachedMessageSsl) = NDiscovery::CreateSerializedMessage(
+ LookupResponse->CachedMessageData->Info, std::move(services), NameserviceResponse);
}
- auto &nodeInfo = NameserviceResponse->Node;
- if (nodeInfo && nodeInfo->Location.GetDataCenterId()) {
- const auto &location = nodeInfo->Location.GetDataCenterId();
- if (IsSafeLocationMarker(location))
- result->set_self_location(location);
+ if (Request->SslServer()) {
+ ReplySerialized(std::move(cachedMessageSsl), Ydb::StatusIds::SUCCESS);
+ } else {
+ ReplySerialized(std::move(cachedMessage), Ydb::StatusIds::SUCCESS);
}
+ }
- Reply(*result, Ydb::StatusIds::SUCCESS);
+ void ReplySerialized(TString message, Ydb::StatusIds::StatusCode status) {
+ Request->SendSerializedResult(std::move(message), status);
+ PassAway();
}
template <typename... Args>
@@ -221,15 +157,6 @@ public:
Request->SendResult(std::forward<Args>(args)...);
PassAway();
}
-
- bool IsSafeLocationMarker(TStringBuf location) {
- const ui8* isrc = reinterpret_cast<const ui8*>(location.data());
- for (auto idx : xrange(location.size())) {
- if (isrc[idx] >= 0x80)
- return false;
- }
- return true;
- }
};
void TGRpcRequestProxy::Handle(TEvListEndpointsRequest::TPtr& ev, const TActorContext& ctx) {
diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp
index ad412f9d248..dcaf754e8b7 100644
--- a/ydb/core/tx/replication/controller/controller.cpp
+++ b/ydb/core/tx/replication/controller/controller.cpp
@@ -59,7 +59,7 @@ STFUNC(TController::StateWork) {
HFunc(TEvPrivate::TEvDropDstResult, Handle);
HFunc(TEvPrivate::TEvResolveTenantResult, Handle);
HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle);
- HFunc(TEvStateStorage::TEvBoardInfo, Handle);
+ HFunc(TEvDiscovery::TEvDiscoveryData, Handle);
HFunc(TEvDiscovery::TEvError, Handle);
HFunc(TEvents::TEvPoison, Handle);
}
@@ -175,8 +175,11 @@ void TController::Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActo
}
}
-void TController::Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx) {
- CLOG_T(ctx, "Handle " << ev->Get()->ToString());
+void TController::Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) {
+ Y_VERIFY(ev->Get()->CachedMessageData && ev->Get()->CachedMessageData->Info);
+
+ CLOG_T(ctx, "Handle " << ev->Get()->CachedMessageData->Info->ToString());
+
NodesManager.ProcessResponse(ev, ctx);
}
diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h
index 1e521293e6e..784b06d17ed 100644
--- a/ydb/core/tx/replication/controller/controller_impl.h
+++ b/ydb/core/tx/replication/controller/controller_impl.h
@@ -72,7 +72,7 @@ private:
void Handle(TEvPrivate::TEvDropDstResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvResolveTenantResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx);
void Handle(TEvents::TEvPoison::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/tx/replication/controller/nodes_manager.cpp b/ydb/core/tx/replication/controller/nodes_manager.cpp
index 40877c1815e..0a7863dfa3f 100644
--- a/ydb/core/tx/replication/controller/nodes_manager.cpp
+++ b/ydb/core/tx/replication/controller/nodes_manager.cpp
@@ -16,11 +16,14 @@ const THashSet<ui32>& TNodesManager::GetNodes(const TString& tenant) const {
void TNodesManager::DiscoverNodes(const TString& tenant, const TActorId& cache, const TActorContext& ctx) {
TenantNodes.emplace(tenant, THashSet<ui32>());
- NodeDiscoverers.emplace(ctx.Register(CreateDiscoverer(&NService::MakeDiscoveryPath, tenant, ctx.SelfID, cache)), tenant);
+ NodeDiscoverers.emplace(
+ ctx.Register(CreateDiscoverer(&NService::MakeDiscoveryPath, tenant, ctx.SelfID, cache)), tenant
+ );
}
-void TNodesManager::ProcessResponse(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx) {
- Y_VERIFY(ev->Get()->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+void TNodesManager::ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx) {
+ Y_VERIFY(ev->Get()->CachedMessageData && ev->Get()->CachedMessageData->Info);
+ Y_VERIFY(ev->Get()->CachedMessageData->Info->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok);
auto it = NodeDiscoverers.find(ev->Sender);
if (it == NodeDiscoverers.end()) {
@@ -30,7 +33,7 @@ void TNodesManager::ProcessResponse(TEvStateStorage::TEvBoardInfo::TPtr& ev, con
auto& nodes = TenantNodes[it->second];
nodes.clear();
- for (const auto& [actorId, _] : ev->Get()->InfoEntries) {
+ for (const auto& [actorId, _] : ev->Get()->CachedMessageData->Info->InfoEntries) {
nodes.insert(actorId.NodeId());
}
diff --git a/ydb/core/tx/replication/controller/nodes_manager.h b/ydb/core/tx/replication/controller/nodes_manager.h
index d07c0de826d..36a4598300c 100644
--- a/ydb/core/tx/replication/controller/nodes_manager.h
+++ b/ydb/core/tx/replication/controller/nodes_manager.h
@@ -17,7 +17,7 @@ public:
const THashSet<ui32>& GetNodes(const TString& tenant) const;
void DiscoverNodes(const TString& tenant, const TActorId& cache, const TActorContext& ctx);
- void ProcessResponse(TEvStateStorage::TEvBoardInfo::TPtr& ev, const TActorContext& ctx);
+ void ProcessResponse(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx);
void ProcessResponse(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx);
void Shutdown(const TActorContext& ctx);