diff options
author | grigminakov <grigminakov@yandex-team.com> | 2023-11-24 11:08:54 +0300 |
---|---|---|
committer | grigminakov <grigminakov@yandex-team.com> | 2023-11-24 11:32:05 +0300 |
commit | efe5b5f8d2da503eda4d172f6f2e85aac64ba6a6 (patch) | |
tree | c5b23bbb7b8d2e11ad572d8ab3dfeda2cac70cf7 | |
parent | 72bdc12af15fe8bf46563e1158ba786b0b5defad (diff) | |
download | ydb-efe5b5f8d2da503eda4d172f6f2e85aac64ba6a6.tar.gz |
YT-20546: Peer discovery encapsulation
-rw-r--r-- | yt/yt/client/api/rpc_proxy/connection_impl.cpp | 3 | ||||
-rw-r--r-- | yt/yt/core/CMakeLists.darwin-arm64.txt | 1 | ||||
-rw-r--r-- | yt/yt/core/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | yt/yt/core/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | yt/yt/core/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | yt/yt/core/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | yt/yt/core/rpc/balancing_channel.cpp | 23 | ||||
-rw-r--r-- | yt/yt/core/rpc/balancing_channel.h | 5 | ||||
-rw-r--r-- | yt/yt/core/rpc/dynamic_channel_pool.cpp | 58 | ||||
-rw-r--r-- | yt/yt/core/rpc/dynamic_channel_pool.h | 3 | ||||
-rw-r--r-- | yt/yt/core/rpc/peer_discovery.cpp | 58 | ||||
-rw-r--r-- | yt/yt/core/rpc/peer_discovery.h | 33 | ||||
-rw-r--r-- | yt/yt/core/rpc/public.h | 1 | ||||
-rw-r--r-- | yt/yt/core/ya.make | 1 |
14 files changed, 135 insertions, 55 deletions
diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.cpp b/yt/yt/client/api/rpc_proxy/connection_impl.cpp index 11a7347bae..fea7273ff6 100644 --- a/yt/yt/client/api/rpc_proxy/connection_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/connection_impl.cpp @@ -28,6 +28,7 @@ #include <yt/yt/core/rpc/caching_channel_factory.h> #include <yt/yt/core/rpc/dynamic_channel_pool.h> #include <yt/yt/core/rpc/dispatcher.h> +#include <yt/yt/core/rpc/peer_discovery.h> #include <yt/yt/core/ytree/fluent.h> @@ -243,7 +244,7 @@ TConnection::TConnection(TConnectionConfigPtr config, TConnectionOptions options MakeEndpointDescription(Config_, ConnectionId_), MakeEndpointAttributes(Config_, ConnectionId_), TApiServiceProxy::GetDescriptor().ServiceName, - TDiscoverRequestHook())) + CreateDefaultPeerDiscovery(TDiscoverRequestHook()))) { if (options.ConnectionInvoker) { ConnectionInvoker_ = options.ConnectionInvoker; diff --git a/yt/yt/core/CMakeLists.darwin-arm64.txt b/yt/yt/core/CMakeLists.darwin-arm64.txt index 39189652b2..21ed07c2d3 100644 --- a/yt/yt/core/CMakeLists.darwin-arm64.txt +++ b/yt/yt/core/CMakeLists.darwin-arm64.txt @@ -222,6 +222,7 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message_format.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/null_channel.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/peer_discovery.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/per_user_request_queue_provider.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/protocol_version.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/public.cpp diff --git a/yt/yt/core/CMakeLists.darwin-x86_64.txt b/yt/yt/core/CMakeLists.darwin-x86_64.txt index c683eefee9..7a6160415a 100644 --- a/yt/yt/core/CMakeLists.darwin-x86_64.txt +++ b/yt/yt/core/CMakeLists.darwin-x86_64.txt @@ -223,6 +223,7 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message_format.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/null_channel.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/peer_discovery.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/per_user_request_queue_provider.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/protocol_version.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/public.cpp diff --git a/yt/yt/core/CMakeLists.linux-aarch64.txt b/yt/yt/core/CMakeLists.linux-aarch64.txt index fc9f31ca0a..fb60bd26a5 100644 --- a/yt/yt/core/CMakeLists.linux-aarch64.txt +++ b/yt/yt/core/CMakeLists.linux-aarch64.txt @@ -223,6 +223,7 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message_format.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/null_channel.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/peer_discovery.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/per_user_request_queue_provider.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/protocol_version.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/public.cpp diff --git a/yt/yt/core/CMakeLists.linux-x86_64.txt b/yt/yt/core/CMakeLists.linux-x86_64.txt index 104037bd3c..02cb5341e7 100644 --- a/yt/yt/core/CMakeLists.linux-x86_64.txt +++ b/yt/yt/core/CMakeLists.linux-x86_64.txt @@ -224,6 +224,7 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message_format.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/null_channel.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/peer_discovery.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/per_user_request_queue_provider.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/protocol_version.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/public.cpp diff --git a/yt/yt/core/CMakeLists.windows-x86_64.txt b/yt/yt/core/CMakeLists.windows-x86_64.txt index 2e3be60648..dba46ac9ef 100644 --- a/yt/yt/core/CMakeLists.windows-x86_64.txt +++ b/yt/yt/core/CMakeLists.windows-x86_64.txt @@ -222,6 +222,7 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/message_format.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/null_channel.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/peer_discovery.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/per_user_request_queue_provider.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/protocol_version.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/rpc/public.cpp diff --git a/yt/yt/core/rpc/balancing_channel.cpp b/yt/yt/core/rpc/balancing_channel.cpp index fcc9911377..2cc9595e89 100644 --- a/yt/yt/core/rpc/balancing_channel.cpp +++ b/yt/yt/core/rpc/balancing_channel.cpp @@ -45,7 +45,7 @@ public: TString endpointDescription, IAttributeDictionaryPtr endpointAttributes, std::string serviceName, - TDiscoverRequestHook discoverRequestHook) + IPeerDiscoveryPtr peerDiscovery) : Config_(std::move(config)) , EndpointDescription_(endpointDescription) , EndpointAttributes_(ConvertToAttributes(BuildYsonStringFluently() @@ -54,14 +54,13 @@ public: .Item("service").Value(serviceName) .EndMap())) , ServiceName_(std::move(serviceName)) - , DiscoverRequestHook_(std::move(discoverRequestHook)) , Pool_(New<TDynamicChannelPool>( Config_, std::move(channelFactory), EndpointDescription_, EndpointAttributes_, ServiceName_, - DiscoverRequestHook_)) + peerDiscovery)) { if (Config_->Addresses) { ConfigureFromAddresses(); @@ -100,7 +99,6 @@ private: const TString EndpointDescription_; const IAttributeDictionaryPtr EndpointAttributes_; const TString ServiceName_; - const TDiscoverRequestHook DiscoverRequestHook_; const TDynamicChannelPoolPtr Pool_; @@ -180,10 +178,9 @@ public: IChannelFactoryPtr channelFactory, TString endpointDescription, IAttributeDictionaryPtr endpointAttributes, - TDiscoverRequestHook discoverRequestHook) + IPeerDiscoveryPtr peerDiscovery) : Config_(std::move(config)) , ChannelFactory_(std::move(channelFactory)) - , DiscoverRequestHook_(std::move(discoverRequestHook)) , EndpointDescription_(Format("%v%v", endpointDescription, Config_->Addresses)) @@ -192,6 +189,7 @@ public: .Item("addresses").Value(Config_->Addresses) .Items(*endpointAttributes) .EndMap())) + , PeerDiscovery_(peerDiscovery) { } const TString& GetEndpointDescription() const override @@ -244,10 +242,10 @@ public: private: const TBalancingChannelConfigPtr Config_; const IChannelFactoryPtr ChannelFactory_; - const TDiscoverRequestHook DiscoverRequestHook_; const TString EndpointDescription_; const IAttributeDictionaryPtr EndpointAttributes_; + const IPeerDiscoveryPtr PeerDiscovery_; YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, SpinLock_); THashMap<std::string, TBalancingChannelSubproviderPtr> SubproviderMap_; @@ -276,7 +274,7 @@ private: EndpointDescription_, EndpointAttributes_, serviceName, - DiscoverRequestHook_); + PeerDiscovery_); EmplaceOrCrash(SubproviderMap_, serviceName, subprovider); return subprovider; } @@ -292,14 +290,15 @@ IChannelPtr CreateBalancingChannel( IChannelFactoryPtr channelFactory, TString endpointDescription, IAttributeDictionaryPtr endpointAttributes, - TDiscoverRequestHook discoverRequestHook) + IPeerDiscoveryPtr peerDiscovery) { auto channelProvider = CreateBalancingChannelProvider( std::move(config), std::move(channelFactory), std::move(endpointDescription), std::move(endpointAttributes), - std::move(discoverRequestHook)); + std::move(peerDiscovery)); + return CreateRoamingChannel(channelProvider); } @@ -308,7 +307,7 @@ IRoamingChannelProviderPtr CreateBalancingChannelProvider( IChannelFactoryPtr channelFactory, TString endpointDescription, IAttributeDictionaryPtr endpointAttributes, - TDiscoverRequestHook discoverRequestHook) + IPeerDiscoveryPtr peerDiscovery) { YT_VERIFY(config); YT_VERIFY(channelFactory); @@ -318,7 +317,7 @@ IRoamingChannelProviderPtr CreateBalancingChannelProvider( std::move(channelFactory), std::move(endpointDescription), std::move(endpointAttributes), - std::move(discoverRequestHook)); + std::move(peerDiscovery)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/balancing_channel.h b/yt/yt/core/rpc/balancing_channel.h index 2241bcffb1..d8d709abfd 100644 --- a/yt/yt/core/rpc/balancing_channel.h +++ b/yt/yt/core/rpc/balancing_channel.h @@ -1,6 +1,7 @@ #pragma once #include "public.h" +#include "peer_discovery.h" #include <yt/yt/core/actions/callback.h> @@ -17,14 +18,14 @@ IChannelPtr CreateBalancingChannel( IChannelFactoryPtr channelFactory, TString endpointDescription, NYTree::IAttributeDictionaryPtr endpointAttributes, - TDiscoverRequestHook discoverRequestHook = {}); + IPeerDiscoveryPtr peerDiscovery = CreateDefaultPeerDiscovery(TDiscoverRequestHook{})); IRoamingChannelProviderPtr CreateBalancingChannelProvider( TBalancingChannelConfigPtr config, IChannelFactoryPtr channelFactory, TString endpointDescription, NYTree::IAttributeDictionaryPtr endpointAttributes, - TDiscoverRequestHook discoverRequestHook = {}); + IPeerDiscoveryPtr peerDiscovery = CreateDefaultPeerDiscovery(TDiscoverRequestHook{})); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/dynamic_channel_pool.cpp b/yt/yt/core/rpc/dynamic_channel_pool.cpp index 0afc245c7e..9c41efd163 100644 --- a/yt/yt/core/rpc/dynamic_channel_pool.cpp +++ b/yt/yt/core/rpc/dynamic_channel_pool.cpp @@ -46,7 +46,7 @@ public: TString endpointDescription, IAttributeDictionaryPtr endpointAttributes, std::string serviceName, - TDiscoverRequestHook discoverRequestHook) + IPeerDiscoveryPtr peerDiscovery) : Config_(std::move(config)) , ChannelFactory_(std::move(channelFactory)) , EndpointDescription_(std::move(endpointDescription)) @@ -56,7 +56,7 @@ public: .Item("service").Value(serviceName) .EndMap())) , ServiceName_(std::move(serviceName)) - , DiscoverRequestHook_(std::move(discoverRequestHook)) + , PeerDiscovery_(std::move(peerDiscovery)) , Logger(RpcClientLogger.WithTag( "ChannelId: %v, Endpoint: %v, Service: %v", TGuid::Create(), @@ -191,7 +191,7 @@ private: const TString EndpointDescription_; const IAttributeDictionaryPtr EndpointAttributes_; const std::string ServiceName_; - const TDiscoverRequestHook DiscoverRequestHook_; + IPeerDiscoveryPtr PeerDiscovery_; const NLogging::TLogger Logger; @@ -306,17 +306,15 @@ private: YT_LOG_DEBUG("Querying peer (Address: %v)", address); auto channel = owner->ChannelFactory_->CreateChannel(address); - auto proxy = owner->CreateGenericProxy(channel); - proxy.SetDefaultTimeout(owner->Config_->DiscoverTimeout); - - auto req = proxy.Discover(); - if (owner->DiscoverRequestHook_) { - owner->DiscoverRequestHook_.Run(req.Get()); - } + auto request = owner->PeerDiscovery_->Discover( + channel, + owner->Config_->DiscoverTimeout, + /*replyDelay*/ TDuration::Zero(), + owner->ServiceName_); // NB: Via prevents stack overflow due to QueryPeer -> OnResponse -> DoRun loop in // case when Invoke() is immediately set. - req->Invoke().Subscribe(BIND( + request.Subscribe(BIND( &TDiscoverySession::OnResponse, MakeStrong(this), address) @@ -325,7 +323,7 @@ private: void OnResponse( const TString& address, - const TGenericProxy::TErrorOrRspDiscoverPtr& rspOrError) + const TErrorOr<TPeerDiscoveryResponse> rspOrError) { auto owner = Owner_.Lock(); if (!owner) { @@ -337,11 +335,9 @@ private: YT_LOG_DEBUG_IF(authError, "Peer has reported authentication error on discovery (Address: %v)", address); if (rspOrError.IsOK() || authError) { - // const auto& rsp = rspOrError.Value(); - // bool up = rsp->up(); - // auto suggestedAddresses = FromProto<std::vector<TString>>(rsp->suggested_addresses()); - bool up = authError ? true : rspOrError.Value()->up(); - auto suggestedAddresses = authError ? std::vector<TString>() : FromProto<std::vector<TString>>(rspOrError.Value()->suggested_addresses()); + NProto::TRspDiscover resp; + auto suggestedAddresses = FromProto<std::vector<TString>>(resp.suggested_addresses()); + bool up = authError ? true : rspOrError.Value().IsUp; if (!suggestedAddresses.empty()) { YT_LOG_DEBUG("Peers suggested (SuggestorAddress: %v, SuggestedAddresses: %v)", @@ -537,31 +533,22 @@ private: auto peerPollingPeriod = owner->Config_->PeerPollingPeriod + RandomDuration(owner->Config_->PeerPollingPeriodSplay); auto channel = owner->ChannelFactory_->CreateChannel(PeerAddress_); - auto proxy = owner->CreateGenericProxy(channel); - auto requestTimeout = peerPollingPeriod + owner->Config_->PeerPollingRequestTimeout; - auto req = proxy.Discover(); - req->set_reply_delay(peerPollingPeriod.GetValue()); - req->SetTimeout(requestTimeout); - if (owner->DiscoverRequestHook_) { - owner->DiscoverRequestHook_.Run(req.Get()); - } - + auto req = owner->PeerDiscovery_->Discover(channel, requestTimeout, /*replyDelay*/ peerPollingPeriod, owner->ServiceName_); YT_LOG_DEBUG("Polling peer (PollingPeriod: %v, RequestTimeout: %v)", peerPollingPeriod, requestTimeout); owner.Reset(); - req->Invoke() - .Subscribe(BIND([=, this, this_ = MakeStrong(this)] (const TGenericProxy::TErrorOrRspDiscoverPtr& rspOrError) { + req.Subscribe(BIND([=, this, this_ = MakeStrong(this)] (const TErrorOr<TPeerDiscoveryResponse>& rspOrError) { auto owner = Owner_.Lock(); if (!owner) { return; } if (rspOrError.IsOK()) { - auto isUp = rspOrError.Value()->up(); + auto isUp = rspOrError.Value().IsUp; if (isUp) { YT_LOG_DEBUG("Peer is up"); owner->UnbanPeer(PeerAddress_); @@ -857,13 +844,6 @@ private: evicted); } - TGenericProxy CreateGenericProxy(IChannelPtr peerChannel) - { - auto serviceDescriptor = TServiceDescriptor(ServiceName_) - .SetProtocolVersion(GenericProtocolVersion); - return TGenericProxy(std::move(peerChannel), serviceDescriptor); - } - IChannelPtr CreateChannel(const TString& address) { return CreateFailureDetectingChannel( @@ -879,16 +859,16 @@ TDynamicChannelPool::TDynamicChannelPool( TDynamicChannelPoolConfigPtr config, IChannelFactoryPtr channelFactory, TString endpointDescription, - IAttributeDictionaryPtr endpointAttributes, + NYTree::IAttributeDictionaryPtr endpointAttributes, std::string serviceName, - TDiscoverRequestHook discoverRequestHook) + IPeerDiscoveryPtr peerDiscovery) : Impl_(New<TImpl>( std::move(config), std::move(channelFactory), std::move(endpointDescription), std::move(endpointAttributes), std::move(serviceName), - std::move(discoverRequestHook))) + std::move(peerDiscovery))) { } TDynamicChannelPool::~TDynamicChannelPool() = default; diff --git a/yt/yt/core/rpc/dynamic_channel_pool.h b/yt/yt/core/rpc/dynamic_channel_pool.h index 94c67fa5d8..76013e753a 100644 --- a/yt/yt/core/rpc/dynamic_channel_pool.h +++ b/yt/yt/core/rpc/dynamic_channel_pool.h @@ -1,6 +1,7 @@ #pragma once #include "public.h" +#include "peer_discovery.h" #include "hedging_channel.h" #include <yt/yt/core/actions/callback.h> @@ -30,7 +31,7 @@ public: TString endpointDescription, NYTree::IAttributeDictionaryPtr endpointAttributes, std::string serviceName, - TDiscoverRequestHook discoverRequestHook = {}); + IPeerDiscoveryPtr peerDiscovery); ~TDynamicChannelPool(); TFuture<IChannelPtr> GetRandomChannel(); diff --git a/yt/yt/core/rpc/peer_discovery.cpp b/yt/yt/core/rpc/peer_discovery.cpp new file mode 100644 index 0000000000..fbc218e96d --- /dev/null +++ b/yt/yt/core/rpc/peer_discovery.cpp @@ -0,0 +1,58 @@ +#include "peer_discovery.h" + +#include "client.h" + +#include <yt/yt/core/misc/protobuf_helpers.h> + +namespace NYT::NRpc { + +//////////////////////////////////////////////////////////////////////////////// + +class TDefaultPeerDiscovery + : public IPeerDiscovery +{ +public: + TDefaultPeerDiscovery(TDiscoverRequestHook hook) + : Hook_(std::move(hook)) + { } + + TFuture<TPeerDiscoveryResponse> Discover( + IChannelPtr channel, + TDuration timeout, + TDuration replyDelay, + const std::string& serviceName) override + { + auto serviceDescriptor = TServiceDescriptor(serviceName) + .SetProtocolVersion(GenericProtocolVersion); + TGenericProxy proxy(std::move(channel), serviceDescriptor); + auto req = proxy.Discover(); + if (Hook_) { + Hook_(req.Get()); + } + req->SetTimeout(timeout); + req->set_reply_delay(replyDelay.GetValue()); + return req->Invoke().Apply(BIND(&TDefaultPeerDiscovery::ConvertResponse)); + } + +private: + TDiscoverRequestHook Hook_; + + static TPeerDiscoveryResponse ConvertResponse(const TIntrusivePtr<TTypedClientResponse<NProto::TRspDiscover>>& rsp) + { + TPeerDiscoveryResponse response; + response.Addresses = NYT::FromProto<std::vector<TString>>(rsp->suggested_addresses()); + response.IsUp = rsp->up(); + return response; + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +IPeerDiscoveryPtr CreateDefaultPeerDiscovery(TDiscoverRequestHook hook) +{ + return New<TDefaultPeerDiscovery>(std::move(hook)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/peer_discovery.h b/yt/yt/core/rpc/peer_discovery.h new file mode 100644 index 0000000000..e1b0f20101 --- /dev/null +++ b/yt/yt/core/rpc/peer_discovery.h @@ -0,0 +1,33 @@ +#pragma once + +#include "public.h" + +namespace NYT::NRpc { + +//////////////////////////////////////////////////////////////////////////////// + +struct TPeerDiscoveryResponse +{ + bool IsUp; + std::vector<TString> Addresses; +}; + +struct IPeerDiscovery + : public TRefCounted +{ + virtual TFuture<TPeerDiscoveryResponse> Discover( + IChannelPtr channel, + TDuration timeout, + TDuration replyDelay, + const std::string& serviceName) = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IPeerDiscovery) + +//////////////////////////////////////////////////////////////////////////////// + +IPeerDiscoveryPtr CreateDefaultPeerDiscovery(TDiscoverRequestHook hook); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h index e5fca23b2d..af88870ad9 100644 --- a/yt/yt/core/rpc/public.h +++ b/yt/yt/core/rpc/public.h @@ -78,6 +78,7 @@ DECLARE_REFCOUNTED_CLASS(TAttachmentsInputStream) DECLARE_REFCOUNTED_CLASS(TAttachmentsOutputStream) DECLARE_REFCOUNTED_STRUCT(IViablePeerRegistry) +DECLARE_REFCOUNTED_STRUCT(IPeerDiscovery) DECLARE_REFCOUNTED_CLASS(TDynamicChannelPool) template < diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make index d114db5cf8..0dc4666a01 100644 --- a/yt/yt/core/ya.make +++ b/yt/yt/core/ya.make @@ -188,6 +188,7 @@ SRCS( rpc/message.cpp rpc/message_format.cpp rpc/null_channel.cpp + rpc/peer_discovery.cpp rpc/per_user_request_queue_provider.cpp rpc/protocol_version.cpp rpc/public.cpp |