aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgrigminakov <grigminakov@yandex-team.com>2023-11-24 11:08:54 +0300
committergrigminakov <grigminakov@yandex-team.com>2023-11-24 11:32:05 +0300
commitefe5b5f8d2da503eda4d172f6f2e85aac64ba6a6 (patch)
treec5b23bbb7b8d2e11ad572d8ab3dfeda2cac70cf7
parent72bdc12af15fe8bf46563e1158ba786b0b5defad (diff)
downloadydb-efe5b5f8d2da503eda4d172f6f2e85aac64ba6a6.tar.gz
YT-20546: Peer discovery encapsulation
-rw-r--r--yt/yt/client/api/rpc_proxy/connection_impl.cpp3
-rw-r--r--yt/yt/core/CMakeLists.darwin-arm64.txt1
-rw-r--r--yt/yt/core/CMakeLists.darwin-x86_64.txt1
-rw-r--r--yt/yt/core/CMakeLists.linux-aarch64.txt1
-rw-r--r--yt/yt/core/CMakeLists.linux-x86_64.txt1
-rw-r--r--yt/yt/core/CMakeLists.windows-x86_64.txt1
-rw-r--r--yt/yt/core/rpc/balancing_channel.cpp23
-rw-r--r--yt/yt/core/rpc/balancing_channel.h5
-rw-r--r--yt/yt/core/rpc/dynamic_channel_pool.cpp58
-rw-r--r--yt/yt/core/rpc/dynamic_channel_pool.h3
-rw-r--r--yt/yt/core/rpc/peer_discovery.cpp58
-rw-r--r--yt/yt/core/rpc/peer_discovery.h33
-rw-r--r--yt/yt/core/rpc/public.h1
-rw-r--r--yt/yt/core/ya.make1
14 files changed, 135 insertions, 55 deletions
diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.cpp b/yt/yt/client/api/rpc_proxy/connection_impl.cpp
index 11a7347bae..fea7273ff6 100644
--- a/yt/yt/client/api/rpc_proxy/connection_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/connection_impl.cpp
@@ -28,6 +28,7 @@
#include <yt/yt/core/rpc/caching_channel_factory.h>
#include <yt/yt/core/rpc/dynamic_channel_pool.h>
#include <yt/yt/core/rpc/dispatcher.h>
+#include <yt/yt/core/rpc/peer_discovery.h>
#include <yt/yt/core/ytree/fluent.h>
@@ -243,7 +244,7 @@ TConnection::TConnection(TConnectionConfigPtr config, TConnectionOptions options
MakeEndpointDescription(Config_, ConnectionId_),
MakeEndpointAttributes(Config_, ConnectionId_),
TApiServiceProxy::GetDescriptor().ServiceName,
- TDiscoverRequestHook()))
+ CreateDefaultPeerDiscovery(TDiscoverRequestHook())))
{
if (options.ConnectionInvoker) {
ConnectionInvoker_ = options.ConnectionInvoker;
diff --git a/yt/yt/core/CMakeLists.darwin-arm64.txt b/yt/yt/core/CMakeLists.darwin-arm64.txt
index 39189652b2..21ed07c2d3 100644
--- a/yt/yt/core/CMakeLists.darwin-arm64.txt
+++ b/yt/yt/core/CMakeLists.darwin-arm64.txt
@@ -222,6 +222,7 @@ target_sources(yt-yt-core PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message_format.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/null_channel.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/peer_discovery.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/per_user_request_queue_provider.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/protocol_version.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/public.cpp
diff --git a/yt/yt/core/CMakeLists.darwin-x86_64.txt b/yt/yt/core/CMakeLists.darwin-x86_64.txt
index c683eefee9..7a6160415a 100644
--- a/yt/yt/core/CMakeLists.darwin-x86_64.txt
+++ b/yt/yt/core/CMakeLists.darwin-x86_64.txt
@@ -223,6 +223,7 @@ target_sources(yt-yt-core PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message_format.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/null_channel.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/peer_discovery.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/per_user_request_queue_provider.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/protocol_version.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/public.cpp
diff --git a/yt/yt/core/CMakeLists.linux-aarch64.txt b/yt/yt/core/CMakeLists.linux-aarch64.txt
index fc9f31ca0a..fb60bd26a5 100644
--- a/yt/yt/core/CMakeLists.linux-aarch64.txt
+++ b/yt/yt/core/CMakeLists.linux-aarch64.txt
@@ -223,6 +223,7 @@ target_sources(yt-yt-core PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message_format.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/null_channel.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/peer_discovery.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/per_user_request_queue_provider.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/protocol_version.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/public.cpp
diff --git a/yt/yt/core/CMakeLists.linux-x86_64.txt b/yt/yt/core/CMakeLists.linux-x86_64.txt
index 104037bd3c..02cb5341e7 100644
--- a/yt/yt/core/CMakeLists.linux-x86_64.txt
+++ b/yt/yt/core/CMakeLists.linux-x86_64.txt
@@ -224,6 +224,7 @@ target_sources(yt-yt-core PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message_format.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/null_channel.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/peer_discovery.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/per_user_request_queue_provider.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/protocol_version.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/public.cpp
diff --git a/yt/yt/core/CMakeLists.windows-x86_64.txt b/yt/yt/core/CMakeLists.windows-x86_64.txt
index 2e3be60648..dba46ac9ef 100644
--- a/yt/yt/core/CMakeLists.windows-x86_64.txt
+++ b/yt/yt/core/CMakeLists.windows-x86_64.txt
@@ -222,6 +222,7 @@ target_sources(yt-yt-core PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message_format.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/null_channel.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/peer_discovery.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/per_user_request_queue_provider.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/protocol_version.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/public.cpp
diff --git a/yt/yt/core/rpc/balancing_channel.cpp b/yt/yt/core/rpc/balancing_channel.cpp
index fcc9911377..2cc9595e89 100644
--- a/yt/yt/core/rpc/balancing_channel.cpp
+++ b/yt/yt/core/rpc/balancing_channel.cpp
@@ -45,7 +45,7 @@ public:
TString endpointDescription,
IAttributeDictionaryPtr endpointAttributes,
std::string serviceName,
- TDiscoverRequestHook discoverRequestHook)
+ IPeerDiscoveryPtr peerDiscovery)
: Config_(std::move(config))
, EndpointDescription_(endpointDescription)
, EndpointAttributes_(ConvertToAttributes(BuildYsonStringFluently()
@@ -54,14 +54,13 @@ public:
.Item("service").Value(serviceName)
.EndMap()))
, ServiceName_(std::move(serviceName))
- , DiscoverRequestHook_(std::move(discoverRequestHook))
, Pool_(New<TDynamicChannelPool>(
Config_,
std::move(channelFactory),
EndpointDescription_,
EndpointAttributes_,
ServiceName_,
- DiscoverRequestHook_))
+ peerDiscovery))
{
if (Config_->Addresses) {
ConfigureFromAddresses();
@@ -100,7 +99,6 @@ private:
const TString EndpointDescription_;
const IAttributeDictionaryPtr EndpointAttributes_;
const TString ServiceName_;
- const TDiscoverRequestHook DiscoverRequestHook_;
const TDynamicChannelPoolPtr Pool_;
@@ -180,10 +178,9 @@ public:
IChannelFactoryPtr channelFactory,
TString endpointDescription,
IAttributeDictionaryPtr endpointAttributes,
- TDiscoverRequestHook discoverRequestHook)
+ IPeerDiscoveryPtr peerDiscovery)
: Config_(std::move(config))
, ChannelFactory_(std::move(channelFactory))
- , DiscoverRequestHook_(std::move(discoverRequestHook))
, EndpointDescription_(Format("%v%v",
endpointDescription,
Config_->Addresses))
@@ -192,6 +189,7 @@ public:
.Item("addresses").Value(Config_->Addresses)
.Items(*endpointAttributes)
.EndMap()))
+ , PeerDiscovery_(peerDiscovery)
{ }
const TString& GetEndpointDescription() const override
@@ -244,10 +242,10 @@ public:
private:
const TBalancingChannelConfigPtr Config_;
const IChannelFactoryPtr ChannelFactory_;
- const TDiscoverRequestHook DiscoverRequestHook_;
const TString EndpointDescription_;
const IAttributeDictionaryPtr EndpointAttributes_;
+ const IPeerDiscoveryPtr PeerDiscovery_;
YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, SpinLock_);
THashMap<std::string, TBalancingChannelSubproviderPtr> SubproviderMap_;
@@ -276,7 +274,7 @@ private:
EndpointDescription_,
EndpointAttributes_,
serviceName,
- DiscoverRequestHook_);
+ PeerDiscovery_);
EmplaceOrCrash(SubproviderMap_, serviceName, subprovider);
return subprovider;
}
@@ -292,14 +290,15 @@ IChannelPtr CreateBalancingChannel(
IChannelFactoryPtr channelFactory,
TString endpointDescription,
IAttributeDictionaryPtr endpointAttributes,
- TDiscoverRequestHook discoverRequestHook)
+ IPeerDiscoveryPtr peerDiscovery)
{
auto channelProvider = CreateBalancingChannelProvider(
std::move(config),
std::move(channelFactory),
std::move(endpointDescription),
std::move(endpointAttributes),
- std::move(discoverRequestHook));
+ std::move(peerDiscovery));
+
return CreateRoamingChannel(channelProvider);
}
@@ -308,7 +307,7 @@ IRoamingChannelProviderPtr CreateBalancingChannelProvider(
IChannelFactoryPtr channelFactory,
TString endpointDescription,
IAttributeDictionaryPtr endpointAttributes,
- TDiscoverRequestHook discoverRequestHook)
+ IPeerDiscoveryPtr peerDiscovery)
{
YT_VERIFY(config);
YT_VERIFY(channelFactory);
@@ -318,7 +317,7 @@ IRoamingChannelProviderPtr CreateBalancingChannelProvider(
std::move(channelFactory),
std::move(endpointDescription),
std::move(endpointAttributes),
- std::move(discoverRequestHook));
+ std::move(peerDiscovery));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/balancing_channel.h b/yt/yt/core/rpc/balancing_channel.h
index 2241bcffb1..d8d709abfd 100644
--- a/yt/yt/core/rpc/balancing_channel.h
+++ b/yt/yt/core/rpc/balancing_channel.h
@@ -1,6 +1,7 @@
#pragma once
#include "public.h"
+#include "peer_discovery.h"
#include <yt/yt/core/actions/callback.h>
@@ -17,14 +18,14 @@ IChannelPtr CreateBalancingChannel(
IChannelFactoryPtr channelFactory,
TString endpointDescription,
NYTree::IAttributeDictionaryPtr endpointAttributes,
- TDiscoverRequestHook discoverRequestHook = {});
+ IPeerDiscoveryPtr peerDiscovery = CreateDefaultPeerDiscovery(TDiscoverRequestHook{}));
IRoamingChannelProviderPtr CreateBalancingChannelProvider(
TBalancingChannelConfigPtr config,
IChannelFactoryPtr channelFactory,
TString endpointDescription,
NYTree::IAttributeDictionaryPtr endpointAttributes,
- TDiscoverRequestHook discoverRequestHook = {});
+ IPeerDiscoveryPtr peerDiscovery = CreateDefaultPeerDiscovery(TDiscoverRequestHook{}));
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/dynamic_channel_pool.cpp b/yt/yt/core/rpc/dynamic_channel_pool.cpp
index 0afc245c7e..9c41efd163 100644
--- a/yt/yt/core/rpc/dynamic_channel_pool.cpp
+++ b/yt/yt/core/rpc/dynamic_channel_pool.cpp
@@ -46,7 +46,7 @@ public:
TString endpointDescription,
IAttributeDictionaryPtr endpointAttributes,
std::string serviceName,
- TDiscoverRequestHook discoverRequestHook)
+ IPeerDiscoveryPtr peerDiscovery)
: Config_(std::move(config))
, ChannelFactory_(std::move(channelFactory))
, EndpointDescription_(std::move(endpointDescription))
@@ -56,7 +56,7 @@ public:
.Item("service").Value(serviceName)
.EndMap()))
, ServiceName_(std::move(serviceName))
- , DiscoverRequestHook_(std::move(discoverRequestHook))
+ , PeerDiscovery_(std::move(peerDiscovery))
, Logger(RpcClientLogger.WithTag(
"ChannelId: %v, Endpoint: %v, Service: %v",
TGuid::Create(),
@@ -191,7 +191,7 @@ private:
const TString EndpointDescription_;
const IAttributeDictionaryPtr EndpointAttributes_;
const std::string ServiceName_;
- const TDiscoverRequestHook DiscoverRequestHook_;
+ IPeerDiscoveryPtr PeerDiscovery_;
const NLogging::TLogger Logger;
@@ -306,17 +306,15 @@ private:
YT_LOG_DEBUG("Querying peer (Address: %v)", address);
auto channel = owner->ChannelFactory_->CreateChannel(address);
- auto proxy = owner->CreateGenericProxy(channel);
- proxy.SetDefaultTimeout(owner->Config_->DiscoverTimeout);
-
- auto req = proxy.Discover();
- if (owner->DiscoverRequestHook_) {
- owner->DiscoverRequestHook_.Run(req.Get());
- }
+ auto request = owner->PeerDiscovery_->Discover(
+ channel,
+ owner->Config_->DiscoverTimeout,
+ /*replyDelay*/ TDuration::Zero(),
+ owner->ServiceName_);
// NB: Via prevents stack overflow due to QueryPeer -> OnResponse -> DoRun loop in
// case when Invoke() is immediately set.
- req->Invoke().Subscribe(BIND(
+ request.Subscribe(BIND(
&TDiscoverySession::OnResponse,
MakeStrong(this),
address)
@@ -325,7 +323,7 @@ private:
void OnResponse(
const TString& address,
- const TGenericProxy::TErrorOrRspDiscoverPtr& rspOrError)
+ const TErrorOr<TPeerDiscoveryResponse> rspOrError)
{
auto owner = Owner_.Lock();
if (!owner) {
@@ -337,11 +335,9 @@ private:
YT_LOG_DEBUG_IF(authError, "Peer has reported authentication error on discovery (Address: %v)",
address);
if (rspOrError.IsOK() || authError) {
- // const auto& rsp = rspOrError.Value();
- // bool up = rsp->up();
- // auto suggestedAddresses = FromProto<std::vector<TString>>(rsp->suggested_addresses());
- bool up = authError ? true : rspOrError.Value()->up();
- auto suggestedAddresses = authError ? std::vector<TString>() : FromProto<std::vector<TString>>(rspOrError.Value()->suggested_addresses());
+ NProto::TRspDiscover resp;
+ auto suggestedAddresses = FromProto<std::vector<TString>>(resp.suggested_addresses());
+ bool up = authError ? true : rspOrError.Value().IsUp;
if (!suggestedAddresses.empty()) {
YT_LOG_DEBUG("Peers suggested (SuggestorAddress: %v, SuggestedAddresses: %v)",
@@ -537,31 +533,22 @@ private:
auto peerPollingPeriod = owner->Config_->PeerPollingPeriod + RandomDuration(owner->Config_->PeerPollingPeriodSplay);
auto channel = owner->ChannelFactory_->CreateChannel(PeerAddress_);
- auto proxy = owner->CreateGenericProxy(channel);
-
auto requestTimeout = peerPollingPeriod + owner->Config_->PeerPollingRequestTimeout;
- auto req = proxy.Discover();
- req->set_reply_delay(peerPollingPeriod.GetValue());
- req->SetTimeout(requestTimeout);
- if (owner->DiscoverRequestHook_) {
- owner->DiscoverRequestHook_.Run(req.Get());
- }
-
+ auto req = owner->PeerDiscovery_->Discover(channel, requestTimeout, /*replyDelay*/ peerPollingPeriod, owner->ServiceName_);
YT_LOG_DEBUG("Polling peer (PollingPeriod: %v, RequestTimeout: %v)",
peerPollingPeriod,
requestTimeout);
owner.Reset();
- req->Invoke()
- .Subscribe(BIND([=, this, this_ = MakeStrong(this)] (const TGenericProxy::TErrorOrRspDiscoverPtr& rspOrError) {
+ req.Subscribe(BIND([=, this, this_ = MakeStrong(this)] (const TErrorOr<TPeerDiscoveryResponse>& rspOrError) {
auto owner = Owner_.Lock();
if (!owner) {
return;
}
if (rspOrError.IsOK()) {
- auto isUp = rspOrError.Value()->up();
+ auto isUp = rspOrError.Value().IsUp;
if (isUp) {
YT_LOG_DEBUG("Peer is up");
owner->UnbanPeer(PeerAddress_);
@@ -857,13 +844,6 @@ private:
evicted);
}
- TGenericProxy CreateGenericProxy(IChannelPtr peerChannel)
- {
- auto serviceDescriptor = TServiceDescriptor(ServiceName_)
- .SetProtocolVersion(GenericProtocolVersion);
- return TGenericProxy(std::move(peerChannel), serviceDescriptor);
- }
-
IChannelPtr CreateChannel(const TString& address)
{
return CreateFailureDetectingChannel(
@@ -879,16 +859,16 @@ TDynamicChannelPool::TDynamicChannelPool(
TDynamicChannelPoolConfigPtr config,
IChannelFactoryPtr channelFactory,
TString endpointDescription,
- IAttributeDictionaryPtr endpointAttributes,
+ NYTree::IAttributeDictionaryPtr endpointAttributes,
std::string serviceName,
- TDiscoverRequestHook discoverRequestHook)
+ IPeerDiscoveryPtr peerDiscovery)
: Impl_(New<TImpl>(
std::move(config),
std::move(channelFactory),
std::move(endpointDescription),
std::move(endpointAttributes),
std::move(serviceName),
- std::move(discoverRequestHook)))
+ std::move(peerDiscovery)))
{ }
TDynamicChannelPool::~TDynamicChannelPool() = default;
diff --git a/yt/yt/core/rpc/dynamic_channel_pool.h b/yt/yt/core/rpc/dynamic_channel_pool.h
index 94c67fa5d8..76013e753a 100644
--- a/yt/yt/core/rpc/dynamic_channel_pool.h
+++ b/yt/yt/core/rpc/dynamic_channel_pool.h
@@ -1,6 +1,7 @@
#pragma once
#include "public.h"
+#include "peer_discovery.h"
#include "hedging_channel.h"
#include <yt/yt/core/actions/callback.h>
@@ -30,7 +31,7 @@ public:
TString endpointDescription,
NYTree::IAttributeDictionaryPtr endpointAttributes,
std::string serviceName,
- TDiscoverRequestHook discoverRequestHook = {});
+ IPeerDiscoveryPtr peerDiscovery);
~TDynamicChannelPool();
TFuture<IChannelPtr> GetRandomChannel();
diff --git a/yt/yt/core/rpc/peer_discovery.cpp b/yt/yt/core/rpc/peer_discovery.cpp
new file mode 100644
index 0000000000..fbc218e96d
--- /dev/null
+++ b/yt/yt/core/rpc/peer_discovery.cpp
@@ -0,0 +1,58 @@
+#include "peer_discovery.h"
+
+#include "client.h"
+
+#include <yt/yt/core/misc/protobuf_helpers.h>
+
+namespace NYT::NRpc {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TDefaultPeerDiscovery
+ : public IPeerDiscovery
+{
+public:
+ TDefaultPeerDiscovery(TDiscoverRequestHook hook)
+ : Hook_(std::move(hook))
+ { }
+
+ TFuture<TPeerDiscoveryResponse> Discover(
+ IChannelPtr channel,
+ TDuration timeout,
+ TDuration replyDelay,
+ const std::string& serviceName) override
+ {
+ auto serviceDescriptor = TServiceDescriptor(serviceName)
+ .SetProtocolVersion(GenericProtocolVersion);
+ TGenericProxy proxy(std::move(channel), serviceDescriptor);
+ auto req = proxy.Discover();
+ if (Hook_) {
+ Hook_(req.Get());
+ }
+ req->SetTimeout(timeout);
+ req->set_reply_delay(replyDelay.GetValue());
+ return req->Invoke().Apply(BIND(&TDefaultPeerDiscovery::ConvertResponse));
+ }
+
+private:
+ TDiscoverRequestHook Hook_;
+
+ static TPeerDiscoveryResponse ConvertResponse(const TIntrusivePtr<TTypedClientResponse<NProto::TRspDiscover>>& rsp)
+ {
+ TPeerDiscoveryResponse response;
+ response.Addresses = NYT::FromProto<std::vector<TString>>(rsp->suggested_addresses());
+ response.IsUp = rsp->up();
+ return response;
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IPeerDiscoveryPtr CreateDefaultPeerDiscovery(TDiscoverRequestHook hook)
+{
+ return New<TDefaultPeerDiscovery>(std::move(hook));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NRpc
diff --git a/yt/yt/core/rpc/peer_discovery.h b/yt/yt/core/rpc/peer_discovery.h
new file mode 100644
index 0000000000..e1b0f20101
--- /dev/null
+++ b/yt/yt/core/rpc/peer_discovery.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include "public.h"
+
+namespace NYT::NRpc {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TPeerDiscoveryResponse
+{
+ bool IsUp;
+ std::vector<TString> Addresses;
+};
+
+struct IPeerDiscovery
+ : public TRefCounted
+{
+ virtual TFuture<TPeerDiscoveryResponse> Discover(
+ IChannelPtr channel,
+ TDuration timeout,
+ TDuration replyDelay,
+ const std::string& serviceName) = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IPeerDiscovery)
+
+////////////////////////////////////////////////////////////////////////////////
+
+IPeerDiscoveryPtr CreateDefaultPeerDiscovery(TDiscoverRequestHook hook);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NRpc
diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h
index e5fca23b2d..af88870ad9 100644
--- a/yt/yt/core/rpc/public.h
+++ b/yt/yt/core/rpc/public.h
@@ -78,6 +78,7 @@ DECLARE_REFCOUNTED_CLASS(TAttachmentsInputStream)
DECLARE_REFCOUNTED_CLASS(TAttachmentsOutputStream)
DECLARE_REFCOUNTED_STRUCT(IViablePeerRegistry)
+DECLARE_REFCOUNTED_STRUCT(IPeerDiscovery)
DECLARE_REFCOUNTED_CLASS(TDynamicChannelPool)
template <
diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make
index d114db5cf8..0dc4666a01 100644
--- a/yt/yt/core/ya.make
+++ b/yt/yt/core/ya.make
@@ -188,6 +188,7 @@ SRCS(
rpc/message.cpp
rpc/message_format.cpp
rpc/null_channel.cpp
+ rpc/peer_discovery.cpp
rpc/per_user_request_queue_provider.cpp
rpc/protocol_version.cpp
rpc/public.cpp