aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@yandex-team.ru>2022-03-19 02:21:07 +0300
committerhor911 <hor911@yandex-team.ru>2022-03-19 02:21:07 +0300
commit91c4444d50a2eb5df5072ef181a6f2bd5471b77b (patch)
treef96420d885f5d5ba558a5ab2d7e82cb2f65aabb4
parentf65d25f7eead90c91e7f37ef07fbbf2a274e3d2b (diff)
downloadydb-91c4444d50a2eb5df5072ef181a6f2bd5471b77b.tar.gz
Local Discovery Service
ref:a48c37a6b5935e8713b1568472b0dff75deb5ddd
-rw-r--r--CMakeLists.darwin.txt1
-rw-r--r--CMakeLists.linux.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.txt1
-rw-r--r--ydb/core/driver_lib/run/run.cpp13
-rw-r--r--ydb/services/local_discovery/CMakeLists.txt22
-rw-r--r--ydb/services/local_discovery/grpc_func_call.h58
-rw-r--r--ydb/services/local_discovery/grpc_service.cpp130
-rw-r--r--ydb/services/local_discovery/grpc_service.h45
-rw-r--r--ydb/tests/library/harness/kikimr_port_allocator.py11
9 files changed, 281 insertions, 1 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 730d1f5090..19fb08dad6 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -813,6 +813,7 @@ add_subdirectory(ydb/services/ydb)
add_subdirectory(ydb/core/client)
add_subdirectory(ydb/services/discovery)
add_subdirectory(ydb/services/kesus)
+add_subdirectory(ydb/services/local_discovery)
add_subdirectory(ydb/services/monitoring)
add_subdirectory(ydb/services/persqueue_cluster_discovery)
add_subdirectory(ydb/services/persqueue_cluster_discovery/cluster_ordering)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 43e757b33c..c428d841d2 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -893,6 +893,7 @@ add_subdirectory(ydb/services/ydb)
add_subdirectory(ydb/core/client)
add_subdirectory(ydb/services/discovery)
add_subdirectory(ydb/services/kesus)
+add_subdirectory(ydb/services/local_discovery)
add_subdirectory(ydb/services/monitoring)
add_subdirectory(ydb/services/persqueue_cluster_discovery)
add_subdirectory(ydb/services/persqueue_cluster_discovery/cluster_ordering)
diff --git a/ydb/core/driver_lib/run/CMakeLists.txt b/ydb/core/driver_lib/run/CMakeLists.txt
index 237264020c..34bcb6b887 100644
--- a/ydb/core/driver_lib/run/CMakeLists.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.txt
@@ -114,6 +114,7 @@ target_link_libraries(run PUBLIC
ydb-services-datastreams
ydb-services-discovery
ydb-services-kesus
+ ydb-services-local_discovery
ydb-services-monitoring
ydb-services-persqueue_cluster_discovery
ydb-services-persqueue_v1
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index dc98c2b81c..00e2fd2a64 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -99,6 +99,7 @@
#include <ydb/services/persqueue_v1/persqueue.h>
#include <ydb/services/rate_limiter/grpc_service.h>
#include <ydb/services/discovery/grpc_service.h>
+#include <ydb/services/local_discovery/grpc_service.h>
#include <ydb/services/yq/grpc_service.h>
#include <ydb/core/yq/libs/init/init.h>
@@ -503,6 +504,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
names["monitoring"] = &hasMonitoring;
bool hasDiscovery = services.empty();
names["discovery"] = &hasDiscovery;
+ bool hasLocalDiscovery = false;
+ names["local_discovery"] = &hasLocalDiscovery;
bool hasTableService = services.empty();
names["table_service"] = &hasTableService;
bool hasSchemeService = false;
@@ -537,6 +540,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
names["datastreams"] = &hasDataStreams;
bool hasYandexQuery = false;
names["yq"] = &hasYandexQuery;
+ bool hasYandexQueryPrivate = false;
+ names["yq_private"] = &hasYandexQueryPrivate;
bool hasLogStore = false;
names["logstore"] = &hasLogStore;
bool hasAuth = services.empty();
@@ -699,6 +704,10 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
server.AddService(new NGRpcService::TGRpcDiscoveryService(ActorSystem.Get(), Counters, grpcRequestProxyId));
}
+ if (hasLocalDiscovery) {
+ server.AddService(new NGRpcService::TGRpcLocalDiscoveryService(grpcConfig, ActorSystem.Get(), Counters, grpcRequestProxyId));
+ }
+
if (hasRateLimiter) {
server.AddService(new NQuoter::TRateLimiterGRpcService(ActorSystem.Get(), Counters, grpcRequestProxyId));
}
@@ -717,6 +726,9 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
if (hasYandexQuery) {
server.AddService(new NGRpcService::TGRpcYandexQueryService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ // TODO: REMOVE next line after migration to "yq_private"
+ server.AddService(new NGRpcService::TGRpcYqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ } /* REMOVE */ else /* THIS else as well and separate ifs */ if (hasYandexQueryPrivate) {
server.AddService(new NGRpcService::TGRpcYqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId));
}
@@ -793,7 +805,6 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
if (ex.GetPort()) {
NGrpc::TServerOptions xopts = opts;
xopts.SetPort(ex.GetPort());
-
if (ex.GetHost())
xopts.SetHost(ex.GetHost());
diff --git a/ydb/services/local_discovery/CMakeLists.txt b/ydb/services/local_discovery/CMakeLists.txt
new file mode 100644
index 0000000000..439926c1d3
--- /dev/null
+++ b/ydb/services/local_discovery/CMakeLists.txt
@@ -0,0 +1,22 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ydb-services-local_discovery)
+target_link_libraries(ydb-services-local_discovery PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-grpc-server
+ ydb-core-grpc_services
+ ydb-core-mind
+ api-grpc
+ public-lib-operation_id
+)
+target_sources(ydb-services-local_discovery PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/local_discovery/grpc_service.cpp
+)
diff --git a/ydb/services/local_discovery/grpc_func_call.h b/ydb/services/local_discovery/grpc_func_call.h
new file mode 100644
index 0000000000..905e76c053
--- /dev/null
+++ b/ydb/services/local_discovery/grpc_func_call.h
@@ -0,0 +1,58 @@
+#include <ydb/core/grpc_services/base/base.h>
+
+#include <util/system/hostname.h>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+using TFuncCallback = std::function<void(std::unique_ptr<IRequestOpCtx>, const IFacilityProvider&)>;
+
+template <typename TReq, typename TResp>
+class TGrpcRequestFunctionCall
+ : public std::conditional_t<TProtoHasValidate<TReq>::Value,
+ TGRpcRequestValidationWrapperImpl<
+ TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, true, TGrpcRequestFunctionCall<TReq, TResp>>,
+ TGRpcRequestWrapperImpl<
+ TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, true, TGrpcRequestFunctionCall<TReq, TResp>>>
+ {
+public:
+ static constexpr bool IsOp = true;
+ static IActor* CreateRpcActor(IRequestOpCtx* msg);
+ using TBase = std::conditional_t<TProtoHasValidate<TReq>::Value,
+ TGRpcRequestValidationWrapperImpl<
+ TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, true, TGrpcRequestFunctionCall<TReq, TResp>>,
+ TGRpcRequestWrapperImpl<
+ TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, true, TGrpcRequestFunctionCall<TReq, TResp>>>;
+
+ TGrpcRequestFunctionCall(NGrpc::IRequestContextBase* ctx,
+ TFuncCallback cb, TRequestAuxSettings auxSettings = {})
+ : TBase(ctx)
+ , PassMethod(cb)
+ , AuxSettings(std::move(auxSettings))
+ { }
+
+ void Pass(const IFacilityProvider& facility) override {
+ PassMethod(std::move(std::unique_ptr<IRequestOpCtx>(this)), facility);
+ }
+
+ TRateLimiterMode GetRlMode() const override {
+ return AuxSettings.RlMode;
+ }
+
+ bool TryCustomAttributeProcess(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData,
+ ICheckerIface* iface) override
+ {
+ if (!AuxSettings.CustomAttributeProcessor) {
+ return false;
+ } else {
+ AuxSettings.CustomAttributeProcessor(schemeData, iface);
+ return true;
+ }
+ }
+private:
+ TFuncCallback PassMethod;
+ const TRequestAuxSettings AuxSettings;
+};
+
+} // namespace NGRpcService
+} // namespace NKikimr
diff --git a/ydb/services/local_discovery/grpc_service.cpp b/ydb/services/local_discovery/grpc_service.cpp
new file mode 100644
index 0000000000..9bb1257547
--- /dev/null
+++ b/ydb/services/local_discovery/grpc_service.cpp
@@ -0,0 +1,130 @@
+#include "grpc_func_call.h"
+#include "grpc_service.h"
+
+#include <ydb/core/grpc_services/service_discovery.h>
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/grpc_helper.h>
+#include <ydb/core/grpc_services/grpc_request_proxy.h>
+#include <ydb/core/grpc_services/rpc_calls.h>
+
+#include <util/system/hostname.h>
+
+namespace {
+
+void FillEnpointInfo(const TString& host, ui32 port, const TString& publicHost, ui32 publicPort, bool ssl, Ydb::Discovery::EndpointInfo& info) {
+ auto effectivePublicHost = publicHost ? publicHost : host;
+ auto effectivePublicPort = publicPort ? publicPort : port;
+ info.set_address(effectivePublicHost);
+ info.set_port(effectivePublicPort);
+ info.set_ssl(ssl);
+}
+
+TString InferPublicHostFromServerHost(const TString& serverHost) {
+ return serverHost && serverHost != "[::]" ? serverHost : FQDNHostName();
+}
+
+void AddEndpointsForGrpcConfig(const NKikimrConfig::TGRpcConfig& grpcConfig, Ydb::Discovery::ListEndpointsResult& result) {
+ const TString& address = InferPublicHostFromServerHost(grpcConfig.GetHost());
+ if (const ui32 port = grpcConfig.GetPort()) {
+ FillEnpointInfo(address, port, grpcConfig.GetPublicHost(), grpcConfig.GetPublicPort(), false, *result.add_endpoints());
+ }
+
+ if (const ui32 sslPort = grpcConfig.GetSslPort()) {
+ FillEnpointInfo(address, sslPort, grpcConfig.GetPublicHost(), grpcConfig.GetPublicSslPort(), true, *result.add_endpoints());
+ }
+}
+
+}
+
+namespace NKikimr {
+namespace NGRpcService {
+
+static TString GetSdkBuildInfo(NGrpc::IRequestContextBase* reqCtx) {
+ const auto& res = reqCtx->GetPeerMetaValues(NYdb::YDB_SDK_BUILD_INFO_HEADER);
+ if (res.empty()) {
+ return {};
+ }
+ return TString{res[0]};
+}
+
+TGRpcLocalDiscoveryService::TGRpcLocalDiscoveryService(const NKikimrConfig::TGRpcConfig& grpcConfig,
+ NActors::TActorSystem *system,
+ TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
+ NActors::TActorId id)
+ : GrpcConfig(grpcConfig)
+ , ActorSystem_(system)
+ , Counters_(counters)
+ , GRpcRequestProxyId_(id)
+{
+}
+
+void TGRpcLocalDiscoveryService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
+ CQ_ = cq;
+ SetupIncomingRequests(std::move(logger));
+}
+
+void TGRpcLocalDiscoveryService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter *limiter) {
+ Limiter_ = limiter;
+}
+
+bool TGRpcLocalDiscoveryService::IncRequest() {
+ return Limiter_->Inc();
+}
+
+void TGRpcLocalDiscoveryService::DecRequest() {
+ Limiter_->Dec();
+ Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
+}
+
+void TGRpcLocalDiscoveryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
+ auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
+ using namespace Ydb;
+#ifdef ADD_REQUEST
+#error macro already defined
+#endif
+
+#define ADD_REQUEST(NAME, CB) \
+ MakeIntrusive<TGRpcRequest<Discovery::NAME##Request, Discovery::NAME##Response, TGRpcLocalDiscoveryService>> \
+ (this, &Service_, CQ_, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer(), GetSdkBuildInfo(ctx)); \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
+ new TGrpcRequestOperationCall<Discovery::NAME##Request, Discovery::NAME##Response> \
+ (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); \
+ }, &Ydb::Discovery::V1::DiscoveryService::AsyncService::Request ## NAME, \
+ #NAME, logger, getCounterBlock("discovery", #NAME))->Run();
+
+ ADD_REQUEST(WhoAmI, DoWhoAmIRequest)
+#undef ADD_REQUEST
+
+using namespace std::placeholders;
+
+#ifdef ADD_METHOD
+#error macro already defined
+#endif
+
+#define ADD_METHOD(NAME, METHOD) \
+ MakeIntrusive<TGRpcRequest<Discovery::NAME##Request, Discovery::NAME##Response, TGRpcLocalDiscoveryService>> \
+ (this, &Service_, CQ_, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer(), GetSdkBuildInfo(ctx)); \
+ TFuncCallback cb = std::bind(&TGRpcLocalDiscoveryService::METHOD, this, _1, _2); \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
+ new TGrpcRequestFunctionCall<Discovery::NAME##Request, Discovery::NAME##Response> \
+ (ctx, cb, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); \
+ }, &Ydb::Discovery::V1::DiscoveryService::AsyncService::Request ## NAME, \
+ #NAME, logger, getCounterBlock("discovery", #NAME))->Run();
+
+ ADD_METHOD(ListEndpoints, DoListEndpointsRequest)
+#undef ADD_METHOD
+
+}
+
+void TGRpcLocalDiscoveryService::DoListEndpointsRequest(std::unique_ptr<IRequestOpCtx> request, const IFacilityProvider&) {
+ auto *response = TEvListEndpointsRequest::AllocateResult<Ydb::Discovery::ListEndpointsResult>(request);
+ AddEndpointsForGrpcConfig(GrpcConfig, *response);
+ request->SendResult(*response, Ydb::StatusIds::SUCCESS);
+}
+
+} // namespace NGRpcService
+} // namespace NKikimr
diff --git a/ydb/services/local_discovery/grpc_service.h b/ydb/services/local_discovery/grpc_service.h
new file mode 100644
index 0000000000..7dd7add0bc
--- /dev/null
+++ b/ydb/services/local_discovery/grpc_service.h
@@ -0,0 +1,45 @@
+#pragma once
+
+#include <library/cpp/actors/core/actorsystem.h>
+
+#include <ydb/core/protos/config.pb.h>
+#include <ydb/public/api/grpc/ydb_discovery_v1.grpc.pb.h>
+
+#include <library/cpp/grpc/server/grpc_server.h>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class IRequestOpCtx;
+class IFacilityProvider;
+
+class TGRpcLocalDiscoveryService
+ : public NGrpc::TGrpcServiceBase<Ydb::Discovery::V1::DiscoveryService>
+{
+public:
+ TGRpcLocalDiscoveryService(const NKikimrConfig::TGRpcConfig& grpcConfig,
+ NActors::TActorSystem* system,
+ TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
+ NActors::TActorId id);
+
+ void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
+ void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
+
+ bool IncRequest();
+ void DecRequest();
+
+private:
+ void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
+ void DoListEndpointsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& provider);
+
+ const NKikimrConfig::TGRpcConfig& GrpcConfig;
+ NActors::TActorSystem* ActorSystem_;
+ grpc::ServerCompletionQueue* CQ_ = nullptr;
+
+ TIntrusivePtr<NMonitoring::TDynamicCounters> Counters_;
+ NActors::TActorId GRpcRequestProxyId_;
+ NGrpc::TGlobalLimiter* Limiter_ = nullptr;
+};
+
+} // namespace NGRpcService
+} // namespace NKikimr
diff --git a/ydb/tests/library/harness/kikimr_port_allocator.py b/ydb/tests/library/harness/kikimr_port_allocator.py
index a32e3708a8..d92b63d19a 100644
--- a/ydb/tests/library/harness/kikimr_port_allocator.py
+++ b/ydb/tests/library/harness/kikimr_port_allocator.py
@@ -34,6 +34,10 @@ class KikimrNodePortAllocatorInterface(object):
def sqs_port(self):
pass
+ @abc.abstractproperty
+ def ext_port(self):
+ pass
+
class KikimrPortAllocatorInterface(object):
__metaclass__ = abc.ABCMeta
@@ -74,6 +78,7 @@ class KikimrPortManagerNodePortAllocator(KikimrNodePortAllocatorInterface):
self.__ic_port = None
self.__sqs_port = None
self.__grpc_ssl_port = None
+ self.__ext_port = None
@property
def mon_port(self):
@@ -111,6 +116,12 @@ class KikimrPortManagerNodePortAllocator(KikimrNodePortAllocatorInterface):
self.__sqs_port = self.__port_manager.get_port()
return self.__sqs_port
+ @property
+ def ext_port(self):
+ if self.__ext_port is None:
+ self.__ext_port = self.__port_manager.get_port()
+ return self.__ext_port
+
class KikimrPortManagerPortAllocator(KikimrPortAllocatorInterface):
def __init__(self, port_manager=None):