diff options
author | hor911 <hor911@yandex-team.ru> | 2022-03-19 02:21:07 +0300 |
---|---|---|
committer | hor911 <hor911@yandex-team.ru> | 2022-03-19 02:21:07 +0300 |
commit | 91c4444d50a2eb5df5072ef181a6f2bd5471b77b (patch) | |
tree | f96420d885f5d5ba558a5ab2d7e82cb2f65aabb4 | |
parent | f65d25f7eead90c91e7f37ef07fbbf2a274e3d2b (diff) | |
download | ydb-91c4444d50a2eb5df5072ef181a6f2bd5471b77b.tar.gz |
Local Discovery Service
ref:a48c37a6b5935e8713b1568472b0dff75deb5ddd
-rw-r--r-- | CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/run.cpp | 13 | ||||
-rw-r--r-- | ydb/services/local_discovery/CMakeLists.txt | 22 | ||||
-rw-r--r-- | ydb/services/local_discovery/grpc_func_call.h | 58 | ||||
-rw-r--r-- | ydb/services/local_discovery/grpc_service.cpp | 130 | ||||
-rw-r--r-- | ydb/services/local_discovery/grpc_service.h | 45 | ||||
-rw-r--r-- | ydb/tests/library/harness/kikimr_port_allocator.py | 11 |
9 files changed, 281 insertions, 1 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 730d1f5090..19fb08dad6 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -813,6 +813,7 @@ add_subdirectory(ydb/services/ydb) add_subdirectory(ydb/core/client) add_subdirectory(ydb/services/discovery) add_subdirectory(ydb/services/kesus) +add_subdirectory(ydb/services/local_discovery) add_subdirectory(ydb/services/monitoring) add_subdirectory(ydb/services/persqueue_cluster_discovery) add_subdirectory(ydb/services/persqueue_cluster_discovery/cluster_ordering) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 43e757b33c..c428d841d2 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -893,6 +893,7 @@ add_subdirectory(ydb/services/ydb) add_subdirectory(ydb/core/client) add_subdirectory(ydb/services/discovery) add_subdirectory(ydb/services/kesus) +add_subdirectory(ydb/services/local_discovery) add_subdirectory(ydb/services/monitoring) add_subdirectory(ydb/services/persqueue_cluster_discovery) add_subdirectory(ydb/services/persqueue_cluster_discovery/cluster_ordering) diff --git a/ydb/core/driver_lib/run/CMakeLists.txt b/ydb/core/driver_lib/run/CMakeLists.txt index 237264020c..34bcb6b887 100644 --- a/ydb/core/driver_lib/run/CMakeLists.txt +++ b/ydb/core/driver_lib/run/CMakeLists.txt @@ -114,6 +114,7 @@ target_link_libraries(run PUBLIC ydb-services-datastreams ydb-services-discovery ydb-services-kesus + ydb-services-local_discovery ydb-services-monitoring ydb-services-persqueue_cluster_discovery ydb-services-persqueue_v1 diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index dc98c2b81c..00e2fd2a64 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -99,6 +99,7 @@ #include <ydb/services/persqueue_v1/persqueue.h> #include <ydb/services/rate_limiter/grpc_service.h> #include <ydb/services/discovery/grpc_service.h> +#include <ydb/services/local_discovery/grpc_service.h> #include <ydb/services/yq/grpc_service.h> #include <ydb/core/yq/libs/init/init.h> @@ -503,6 +504,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { names["monitoring"] = &hasMonitoring; bool hasDiscovery = services.empty(); names["discovery"] = &hasDiscovery; + bool hasLocalDiscovery = false; + names["local_discovery"] = &hasLocalDiscovery; bool hasTableService = services.empty(); names["table_service"] = &hasTableService; bool hasSchemeService = false; @@ -537,6 +540,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { names["datastreams"] = &hasDataStreams; bool hasYandexQuery = false; names["yq"] = &hasYandexQuery; + bool hasYandexQueryPrivate = false; + names["yq_private"] = &hasYandexQueryPrivate; bool hasLogStore = false; names["logstore"] = &hasLogStore; bool hasAuth = services.empty(); @@ -699,6 +704,10 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { server.AddService(new NGRpcService::TGRpcDiscoveryService(ActorSystem.Get(), Counters, grpcRequestProxyId)); } + if (hasLocalDiscovery) { + server.AddService(new NGRpcService::TGRpcLocalDiscoveryService(grpcConfig, ActorSystem.Get(), Counters, grpcRequestProxyId)); + } + if (hasRateLimiter) { server.AddService(new NQuoter::TRateLimiterGRpcService(ActorSystem.Get(), Counters, grpcRequestProxyId)); } @@ -717,6 +726,9 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { if (hasYandexQuery) { server.AddService(new NGRpcService::TGRpcYandexQueryService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + // TODO: REMOVE next line after migration to "yq_private" + server.AddService(new NGRpcService::TGRpcYqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + } /* REMOVE */ else /* THIS else as well and separate ifs */ if (hasYandexQueryPrivate) { server.AddService(new NGRpcService::TGRpcYqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId)); } @@ -793,7 +805,6 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { if (ex.GetPort()) { NGrpc::TServerOptions xopts = opts; xopts.SetPort(ex.GetPort()); - if (ex.GetHost()) xopts.SetHost(ex.GetHost()); diff --git a/ydb/services/local_discovery/CMakeLists.txt b/ydb/services/local_discovery/CMakeLists.txt new file mode 100644 index 0000000000..439926c1d3 --- /dev/null +++ b/ydb/services/local_discovery/CMakeLists.txt @@ -0,0 +1,22 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ydb-services-local_discovery) +target_link_libraries(ydb-services-local_discovery PUBLIC + contrib-libs-cxxsupp + yutil + cpp-grpc-server + ydb-core-grpc_services + ydb-core-mind + api-grpc + public-lib-operation_id +) +target_sources(ydb-services-local_discovery PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/local_discovery/grpc_service.cpp +) diff --git a/ydb/services/local_discovery/grpc_func_call.h b/ydb/services/local_discovery/grpc_func_call.h new file mode 100644 index 0000000000..905e76c053 --- /dev/null +++ b/ydb/services/local_discovery/grpc_func_call.h @@ -0,0 +1,58 @@ +#include <ydb/core/grpc_services/base/base.h> + +#include <util/system/hostname.h> + +namespace NKikimr { +namespace NGRpcService { + +using TFuncCallback = std::function<void(std::unique_ptr<IRequestOpCtx>, const IFacilityProvider&)>; + +template <typename TReq, typename TResp> +class TGrpcRequestFunctionCall + : public std::conditional_t<TProtoHasValidate<TReq>::Value, + TGRpcRequestValidationWrapperImpl< + TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, true, TGrpcRequestFunctionCall<TReq, TResp>>, + TGRpcRequestWrapperImpl< + TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, true, TGrpcRequestFunctionCall<TReq, TResp>>> + { +public: + static constexpr bool IsOp = true; + static IActor* CreateRpcActor(IRequestOpCtx* msg); + using TBase = std::conditional_t<TProtoHasValidate<TReq>::Value, + TGRpcRequestValidationWrapperImpl< + TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, true, TGrpcRequestFunctionCall<TReq, TResp>>, + TGRpcRequestWrapperImpl< + TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, true, TGrpcRequestFunctionCall<TReq, TResp>>>; + + TGrpcRequestFunctionCall(NGrpc::IRequestContextBase* ctx, + TFuncCallback cb, TRequestAuxSettings auxSettings = {}) + : TBase(ctx) + , PassMethod(cb) + , AuxSettings(std::move(auxSettings)) + { } + + void Pass(const IFacilityProvider& facility) override { + PassMethod(std::move(std::unique_ptr<IRequestOpCtx>(this)), facility); + } + + TRateLimiterMode GetRlMode() const override { + return AuxSettings.RlMode; + } + + bool TryCustomAttributeProcess(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData, + ICheckerIface* iface) override + { + if (!AuxSettings.CustomAttributeProcessor) { + return false; + } else { + AuxSettings.CustomAttributeProcessor(schemeData, iface); + return true; + } + } +private: + TFuncCallback PassMethod; + const TRequestAuxSettings AuxSettings; +}; + +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/services/local_discovery/grpc_service.cpp b/ydb/services/local_discovery/grpc_service.cpp new file mode 100644 index 0000000000..9bb1257547 --- /dev/null +++ b/ydb/services/local_discovery/grpc_service.cpp @@ -0,0 +1,130 @@ +#include "grpc_func_call.h" +#include "grpc_service.h" + +#include <ydb/core/grpc_services/service_discovery.h> +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/grpc_services/grpc_helper.h> +#include <ydb/core/grpc_services/grpc_request_proxy.h> +#include <ydb/core/grpc_services/rpc_calls.h> + +#include <util/system/hostname.h> + +namespace { + +void FillEnpointInfo(const TString& host, ui32 port, const TString& publicHost, ui32 publicPort, bool ssl, Ydb::Discovery::EndpointInfo& info) { + auto effectivePublicHost = publicHost ? publicHost : host; + auto effectivePublicPort = publicPort ? publicPort : port; + info.set_address(effectivePublicHost); + info.set_port(effectivePublicPort); + info.set_ssl(ssl); +} + +TString InferPublicHostFromServerHost(const TString& serverHost) { + return serverHost && serverHost != "[::]" ? serverHost : FQDNHostName(); +} + +void AddEndpointsForGrpcConfig(const NKikimrConfig::TGRpcConfig& grpcConfig, Ydb::Discovery::ListEndpointsResult& result) { + const TString& address = InferPublicHostFromServerHost(grpcConfig.GetHost()); + if (const ui32 port = grpcConfig.GetPort()) { + FillEnpointInfo(address, port, grpcConfig.GetPublicHost(), grpcConfig.GetPublicPort(), false, *result.add_endpoints()); + } + + if (const ui32 sslPort = grpcConfig.GetSslPort()) { + FillEnpointInfo(address, sslPort, grpcConfig.GetPublicHost(), grpcConfig.GetPublicSslPort(), true, *result.add_endpoints()); + } +} + +} + +namespace NKikimr { +namespace NGRpcService { + +static TString GetSdkBuildInfo(NGrpc::IRequestContextBase* reqCtx) { + const auto& res = reqCtx->GetPeerMetaValues(NYdb::YDB_SDK_BUILD_INFO_HEADER); + if (res.empty()) { + return {}; + } + return TString{res[0]}; +} + +TGRpcLocalDiscoveryService::TGRpcLocalDiscoveryService(const NKikimrConfig::TGRpcConfig& grpcConfig, + NActors::TActorSystem *system, + TIntrusivePtr<NMonitoring::TDynamicCounters> counters, + NActors::TActorId id) + : GrpcConfig(grpcConfig) + , ActorSystem_(system) + , Counters_(counters) + , GRpcRequestProxyId_(id) +{ +} + +void TGRpcLocalDiscoveryService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { + CQ_ = cq; + SetupIncomingRequests(std::move(logger)); +} + +void TGRpcLocalDiscoveryService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter *limiter) { + Limiter_ = limiter; +} + +bool TGRpcLocalDiscoveryService::IncRequest() { + return Limiter_->Inc(); +} + +void TGRpcLocalDiscoveryService::DecRequest() { + Limiter_->Dec(); + Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); +} + +void TGRpcLocalDiscoveryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { + auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + using namespace Ydb; +#ifdef ADD_REQUEST +#error macro already defined +#endif + +#define ADD_REQUEST(NAME, CB) \ + MakeIntrusive<TGRpcRequest<Discovery::NAME##Request, Discovery::NAME##Response, TGRpcLocalDiscoveryService>> \ + (this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer(), GetSdkBuildInfo(ctx)); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcRequestOperationCall<Discovery::NAME##Request, Discovery::NAME##Response> \ + (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); \ + }, &Ydb::Discovery::V1::DiscoveryService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("discovery", #NAME))->Run(); + + ADD_REQUEST(WhoAmI, DoWhoAmIRequest) +#undef ADD_REQUEST + +using namespace std::placeholders; + +#ifdef ADD_METHOD +#error macro already defined +#endif + +#define ADD_METHOD(NAME, METHOD) \ + MakeIntrusive<TGRpcRequest<Discovery::NAME##Request, Discovery::NAME##Response, TGRpcLocalDiscoveryService>> \ + (this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer(), GetSdkBuildInfo(ctx)); \ + TFuncCallback cb = std::bind(&TGRpcLocalDiscoveryService::METHOD, this, _1, _2); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcRequestFunctionCall<Discovery::NAME##Request, Discovery::NAME##Response> \ + (ctx, cb, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); \ + }, &Ydb::Discovery::V1::DiscoveryService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("discovery", #NAME))->Run(); + + ADD_METHOD(ListEndpoints, DoListEndpointsRequest) +#undef ADD_METHOD + +} + +void TGRpcLocalDiscoveryService::DoListEndpointsRequest(std::unique_ptr<IRequestOpCtx> request, const IFacilityProvider&) { + auto *response = TEvListEndpointsRequest::AllocateResult<Ydb::Discovery::ListEndpointsResult>(request); + AddEndpointsForGrpcConfig(GrpcConfig, *response); + request->SendResult(*response, Ydb::StatusIds::SUCCESS); +} + +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/services/local_discovery/grpc_service.h b/ydb/services/local_discovery/grpc_service.h new file mode 100644 index 0000000000..7dd7add0bc --- /dev/null +++ b/ydb/services/local_discovery/grpc_service.h @@ -0,0 +1,45 @@ +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> + +#include <ydb/core/protos/config.pb.h> +#include <ydb/public/api/grpc/ydb_discovery_v1.grpc.pb.h> + +#include <library/cpp/grpc/server/grpc_server.h> + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +class TGRpcLocalDiscoveryService + : public NGrpc::TGrpcServiceBase<Ydb::Discovery::V1::DiscoveryService> +{ +public: + TGRpcLocalDiscoveryService(const NKikimrConfig::TGRpcConfig& grpcConfig, + NActors::TActorSystem* system, + TIntrusivePtr<NMonitoring::TDynamicCounters> counters, + NActors::TActorId id); + + void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; + void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; + + bool IncRequest(); + void DecRequest(); + +private: + void SetupIncomingRequests(NGrpc::TLoggerPtr logger); + void DoListEndpointsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& provider); + + const NKikimrConfig::TGRpcConfig& GrpcConfig; + NActors::TActorSystem* ActorSystem_; + grpc::ServerCompletionQueue* CQ_ = nullptr; + + TIntrusivePtr<NMonitoring::TDynamicCounters> Counters_; + NActors::TActorId GRpcRequestProxyId_; + NGrpc::TGlobalLimiter* Limiter_ = nullptr; +}; + +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/tests/library/harness/kikimr_port_allocator.py b/ydb/tests/library/harness/kikimr_port_allocator.py index a32e3708a8..d92b63d19a 100644 --- a/ydb/tests/library/harness/kikimr_port_allocator.py +++ b/ydb/tests/library/harness/kikimr_port_allocator.py @@ -34,6 +34,10 @@ class KikimrNodePortAllocatorInterface(object): def sqs_port(self): pass + @abc.abstractproperty + def ext_port(self): + pass + class KikimrPortAllocatorInterface(object): __metaclass__ = abc.ABCMeta @@ -74,6 +78,7 @@ class KikimrPortManagerNodePortAllocator(KikimrNodePortAllocatorInterface): self.__ic_port = None self.__sqs_port = None self.__grpc_ssl_port = None + self.__ext_port = None @property def mon_port(self): @@ -111,6 +116,12 @@ class KikimrPortManagerNodePortAllocator(KikimrNodePortAllocatorInterface): self.__sqs_port = self.__port_manager.get_port() return self.__sqs_port + @property + def ext_port(self): + if self.__ext_port is None: + self.__ext_port = self.__port_manager.get_port() + return self.__ext_port + class KikimrPortManagerPortAllocator(KikimrPortAllocatorInterface): def __init__(self, port_manager=None): |