author | uzhas <uzhas@ydb.tech> | 2022-08-10 18:58:14 +0300 |
---|---|---|
committer | uzhas <uzhas@ydb.tech> | 2022-08-10 18:58:14 +0300 |
commit | d76ca4f851f50282332ef0b1b47921203651515f (patch) | |
tree | 5151c32c51d1055228f3b7ad4be84e3c7715da85 | |
parent | 20c80c1024743302d91facc70fdecc4162a5b0f8 (diff) | |
download | ydb-d76ca4f851f50282332ef0b1b47921203651515f.tar.gz |
share grpc connection to token accessor with all tasks in single process
9 files changed, 130 insertions, 55 deletions
diff --git a/contrib/restricted/boost/callable_traits/README.md b/contrib/restricted/boost/callable_traits/README.md
index 935520d2f3..2e1f378425 100644
--- a/contrib/restricted/boost/callable_traits/README.md
+++ b/contrib/restricted/boost/callable_traits/README.md
@@ -1,19 +1,18 @@ <!-- -Copyright Barrett Adair 2016-2017 +Copyright Barrett Adair 2016-2021 Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE.md or copy at http://boost.org/LICENSE_1_0.txt) --> -# Boost.CallableTraits <a target="_blank" href="https://travis-ci.org/boostorg/callable_traits">![Travis status][badge.Travis]</a> <a target="_blank" href="https://ci.appveyor.com/project/boostorg/callable-traits">![Appveyor status][badge.Appveyor]</a> +# Boost.CallableTraits <a target="_blank" href="https://github.com/boostorg/callable_traits/actions/workflows/ci.yml">![CI][badge.CI]</a> -CallableTraits is a C++11 header-only library for the inspection, synthesis, and decomposition of callable types. +CallableTraits is a standalone C++11 header-only library for the inspection, synthesis, and decomposition of callable types. Language features added in later C++ standards are also supported. -The latest documentation is available [here](http://www.boost.org/doc/libs/develop/libs/callable_traits/doc/html/index.html). +The latest documentation is available [here](http://www.boost.org/doc/libs/master/libs/callable_traits/doc/html/index.html). -CallableTraits was [formally reviewed](http://www.boost.org/community/reviews.html) and [accepted](https://lists.boost.org/Archives/boost/2017/04/234513.php) into the [Boost C++ Libraries](http://www.boost.org/). CallableTraits is available in Boost 1.66 (December 2017) and later. You can also download CallableTraits as a standalone library [here](https://github.com/boostorg/callable_traits/releases/latest). +CallableTraits is released as part of the [Boost C++ Libraries](http://www.boost.org/). Since it only depends on the standard library headers, you can also download it as a standalone library [here](https://github.com/boostorg/callable_traits/releases/latest). Licensed under the [Boost Software License, Version 1.0](LICENSE.md). 
<!-- Links --> -[badge.Appveyor]: https://ci.appveyor.com/api/projects/status/uf0l91v7l4wc4kw6/branch/master?svg=true -[badge.Travis]: https://travis-ci.org/boostorg/callable_traits.svg?branch=master +[badge.CI]: https://github.com/boostorg/callable_traits/actions/workflows/ci.yml/badge.svg diff --git a/ydb/core/yq/libs/config/protos/token_accessor.proto b/ydb/core/yq/libs/config/protos/token_accessor.proto index 9321f0abbe..5226d77d96 100644 --- a/ydb/core/yq/libs/config/protos/token_accessor.proto +++ b/ydb/core/yq/libs/config/protos/token_accessor.proto @@ -12,4 +12,5 @@ message TTokenAccessorConfig { bool UseSsl = 3; // Whether to use SSL string HmacSecretFile = 4; string SslCaCert = 5; + uint64 ConnectionPoolSize = 6; } diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index a23982a630..051f38a775 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -150,7 +150,7 @@ void Init( caContent = TUnbufferedFileInput(path).ReadAll(); } - credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(tokenAccessorConfig.GetEndpoint(), tokenAccessorConfig.GetUseSsl(), caContent); + credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(tokenAccessorConfig.GetEndpoint(), tokenAccessorConfig.GetUseSsl(), caContent, tokenAccessorConfig.GetConnectionPoolSize()); } if (protoConfig.GetPrivateApi().GetEnabled()) { diff --git a/ydb/library/yql/providers/common/token_accessor/client/factory.cpp b/ydb/library/yql/providers/common/token_accessor/client/factory.cpp index b077c56d33..0a7e9b6af4 100644 --- a/ydb/library/yql/providers/common/token_accessor/client/factory.cpp +++ b/ydb/library/yql/providers/common/token_accessor/client/factory.cpp 
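Review bot: `ConnectionPoolSize` is added to `TTokenAccessorConfig` without a comment, and the meaning of zero can only be found in factory.cpp, where an empty pool makes `Create()` open a new connection on every call. Every consumer visible on this page takes `ui32`, so a 32-bit field would also match the code that reads it. I would write `uint32 ConnectionPoolSize = 6; // gRPC connections to token accessor shared by all tasks; 0 = no pool, new connection per Create() call`.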
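Review bot: The value of `tokenAccessorConfig.GetConnectionPoolSize()` reaches the factory unchecked, and the factory constructor in factory.cpp reserves and opens that many gRPC connections up front, so a mistyped config value becomes startup-time resource exhaustion. The argument is also a `uint64` passed to a `ui32 connectionPoolSize` parameter, so it narrows silently: 2^32 becomes 0 and larger values wrap. I would bound it before the call, e.g. `Y_ENSURE(tokenAccessorConfig.GetConnectionPoolSize() <= MAX_POOL_SIZE);` where `MAX_POOL_SIZE` is a placeholder for a limit the project picks, and then pass `static_cast<ui32>(...)` explicitly. `Init`'s body starts and ends outside the hunk.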
@@ -9,35 +9,52 @@ namespace NYql { namespace { +using TTokenAccessorConnectionPool = std::vector<std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>>>; + class TSecuredServiceAccountCredentialsFactoryImpl : public ISecuredServiceAccountCredentialsFactory { public: TSecuredServiceAccountCredentialsFactoryImpl( const TString& tokenAccessorEndpoint, bool useSsl, const TString& sslCaCert, + ui32 connectionPoolSize, const TDuration& refreshPeriod, const TDuration& requestTimeout ) - : TokenAccessorEndpoint(tokenAccessorEndpoint) - , UseSsl(useSsl) - , SslCaCert(sslCaCert) - , RefreshPeriod(refreshPeriod) - , RequestTimeout(requestTimeout) { + : RefreshPeriod(refreshPeriod) + , RequestTimeout(requestTimeout) + , Client(std::make_shared<NGrpc::TGRpcClientLow>()) + { + GrpcClientConfig.Locator = tokenAccessorEndpoint; + GrpcClientConfig.EnableSsl = useSsl; + GrpcClientConfig.SslCaCert = sslCaCert; + Connections.reserve(connectionPoolSize); + for (ui32 i = 0; i < connectionPoolSize; ++i) { + Connections.push_back(Client->CreateGRpcServiceConnection<TokenAccessorService>(GrpcClientConfig)); + } } std::shared_ptr<NYdb::ICredentialsProviderFactory> Create(const TString& serviceAccountId, const TString& serviceAccountIdSignature) override { Y_ENSURE(serviceAccountId); Y_ENSURE(serviceAccountIdSignature); - return CreateTokenAccessorCredentialsProviderFactory(TokenAccessorEndpoint, UseSsl, SslCaCert, serviceAccountId, serviceAccountIdSignature, RefreshPeriod, RequestTimeout); + std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection; + if (Connections.empty()) { + connection = Client->CreateGRpcServiceConnection<TokenAccessorService>(GrpcClientConfig); + } else { + connection = Connections[NextConnectionIndex++ % Connections.size()]; + } + + return CreateTokenAccessorCredentialsProviderFactory(Client, std::move(connection), serviceAccountId, serviceAccountIdSignature, RefreshPeriod, RequestTimeout); } private: - const TString 
TokenAccessorEndpoint; - const bool UseSsl; - const TString SslCaCert; + NGrpc::TGRpcClientConfig GrpcClientConfig; const TDuration RefreshPeriod; const TDuration RequestTimeout; + const std::shared_ptr<NGrpc::TGRpcClientLow> Client; + TTokenAccessorConnectionPool Connections; + mutable std::atomic<ui32> NextConnectionIndex = 0; }; std::shared_ptr<NYdb::ICredentialsProviderFactory> WrapWithBearerIfNeeded(std::shared_ptr<NYdb::ICredentialsProviderFactory> delegatee, bool addBearerToToken) { @@ -52,9 +69,10 @@ ISecuredServiceAccountCredentialsFactory::TPtr CreateSecuredServiceAccountCreden const TString& tokenAccessorEndpoint, bool useSsl, const TString& sslCaCert, + ui32 connectionPoolSize, const TDuration& refreshPeriod, const TDuration& requestTimeout) { - return std::make_shared<TSecuredServiceAccountCredentialsFactoryImpl>(tokenAccessorEndpoint, useSsl, sslCaCert, refreshPeriod, requestTimeout); + return std::make_shared<TSecuredServiceAccountCredentialsFactoryImpl>(tokenAccessorEndpoint, useSsl, sslCaCert, connectionPoolSize, refreshPeriod, requestTimeout); } std::shared_ptr<NYdb::ICredentialsProviderFactory> CreateCredentialsProviderFactoryForStructuredToken(ISecuredServiceAccountCredentialsFactory::TPtr factory, const TString& structuredTokenJson, bool addBearerToToken) { diff --git a/ydb/library/yql/providers/common/token_accessor/client/factory.h b/ydb/library/yql/providers/common/token_accessor/client/factory.h index f798949e29..c98989417a 100644 --- a/ydb/library/yql/providers/common/token_accessor/client/factory.h +++ b/ydb/library/yql/providers/common/token_accessor/client/factory.h @@ -18,6 +18,7 @@ ISecuredServiceAccountCredentialsFactory::TPtr CreateSecuredServiceAccountCreden const TString& tokenAccessorEndpoint, bool useSsl, const TString& sslCaCert, + ui32 connectionPoolSize = 0, const TDuration& refreshPeriod = TDuration::Hours(1), const TDuration& requestTimeout = TDuration::Seconds(10) ); 
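Review bot: In `TSecuredServiceAccountCredentialsFactoryImpl`, `NextConnectionIndex` is declared `mutable`, but the only code that touches it is `Create()`, a non-const override, so the qualifier has no effect; `std::atomic<ui32> NextConnectionIndex{0};` is enough. The round-robin needs a counter rather than ordering, so `Connections[NextConnectionIndex.fetch_add(1, std::memory_order_relaxed) % Connections.size()]` states the intent more precisely than the sequentially consistent `++`. The hunk introduces `std::vector` and `std::atomic` but the include block is above the hunk, so confirm `<vector>` and `<atomic>` are included directly rather than transitively.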
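Review bot: In factory.h, `ui32 connectionPoolSize = 0` is inserted in front of the two existing defaulted parameters, so any caller not shown here that passes `refreshPeriod` positionally now binds it to the pool-size slot. That most likely fails to compile rather than misbehaving (assuming `TDuration` has no implicit conversion to an integer), but it is an avoidable source break; appending the parameter after `requestTimeout` keeps existing call sites valid. The declaration should also say what the default means, e.g. `// connectionPoolSize: shared connections to token accessor; 0 = new connection per Create()`.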
diff --git a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.cpp b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.cpp index 220f624e3f..b11c1d0f47 100644 --- a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.cpp +++ b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.cpp @@ -20,27 +20,22 @@ class TTokenAccessorCredentialsProvider : public NYdb::ICredentialsProvider { private: class TImpl : public std::enable_shared_from_this<TImpl> { public: - TImpl(const TString& tokenAccessorEndpoint, - bool useSsl, - const TString& sslCaCert, + TImpl( + std::shared_ptr<NGrpc::TGRpcClientLow> client, + std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection, const TString& serviceAccountId, const TString& serviceAccountIdSignature, const TDuration& refreshPeriod, const TDuration& requestTimeout) - : Client(std::make_unique<NGrpc::TGRpcClientLow>()) + : Client(std::move(client)) + , Connection(std::move(connection)) , NextTicketUpdate(TInstant::Zero()) - , TokenAccessorEndpoint(tokenAccessorEndpoint) , ServiceAccountId(serviceAccountId) , ServiceAccountIdSignature(serviceAccountIdSignature) , RefreshPeriod(refreshPeriod) , RequestTimeout(requestTimeout) , Infly(0) { - NGrpc::TGRpcClientConfig grpcConf; - grpcConf.Locator = tokenAccessorEndpoint; - grpcConf.EnableSsl = useSsl; - grpcConf.SslCaCert = sslCaCert; - Connection = Client->CreateGRpcServiceConnection<TokenAccessorService>(grpcConf); } void UpdateTicket(bool sync = false) const { 
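Review bot: `TImpl` stores the caller-supplied `client` and `connection` without checking them, whereas before this change it built both itself and they could not be empty. The new public overloads of `CreateTokenAccessorCredentialsProvider` and `CreateTokenAccessorCredentialsProviderFactory` (later hunks) let any caller pass an empty `shared_ptr`, which would presumably be dereferenced on the first `UpdateTicket(true)`; the request code is outside the hunk. The constructor body is now empty and is a natural place for `Y_ENSURE(Client);` and `Y_ENSURE(Connection);`, in line with the `Y_ENSURE` checks `Create()` in factory.cpp applies to its arguments.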
@@ -51,9 +46,11 @@ private: RequestInflight = true; auto resultPromise = NThreading::NewPromise(); - std::shared_ptr<const TImpl> self = shared_from_this(); - auto cb = [self, resultPromise, sync](NGrpc::TGrpcStatus&& status, GetTokenResponse&& result) mutable { - self->ProcessResponse(std::move(status), std::move(result), sync); + std::weak_ptr<const TImpl> weakSelf = shared_from_this(); + auto cb = [weakSelf, resultPromise, sync](NGrpc::TGrpcStatus&& status, GetTokenResponse&& result) mutable { + if (auto self = weakSelf.lock()) { + self->ProcessResponse(std::move(status), std::move(result), sync); + } resultPromise.SetValue(); }; @@ -95,8 +92,6 @@ private: void Stop() { NeedStop = true; - - Client.reset(); // Will trigger destroy } private: @@ -106,7 +101,7 @@ private: --Infly; LastRequestError = TStringBuilder() << "Last request error was at " << TInstant::Now() << ". GrpcStatusCode: " << status.GRpcStatusCode << " Message: \"" << status.Msg - << "\" internal: " << status.InternalError << " token accessor endpoint: \"" << TokenAccessorEndpoint << "\""; + << "\" internal: " << status.InternalError; } RequestInflight = false; Sleep(std::min(BackoffTimeout, BACKOFF_MAX)); @@ -125,11 +120,10 @@ private: } private: - std::unique_ptr<NGrpc::TGRpcClientLow> Client; - std::unique_ptr<NGrpc::TServiceConnection<TokenAccessorService>> Connection; + const std::shared_ptr<NGrpc::TGRpcClientLow> Client; + const std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> Connection; mutable TString Ticket; mutable TInstant NextTicketUpdate; - const TString TokenAccessorEndpoint; const TString ServiceAccountId; const TString ServiceAccountIdSignature; const TDuration RefreshPeriod; 
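Review bot: The error path touched by the `@@ -106,7 +101,7 @@` hunk still calls `Sleep(std::min(BackoffTimeout, BACKOFF_MAX))`, and after this commit the `TGRpcClientLow` delivering the callback is shared by every task in the process. If the response handler runs on that client's worker thread (likely, but the dispatch code is not in the hunk), one failing service account stalls token refresh for all the others for the backoff duration; scheduling the retry instead of sleeping inside the callback avoids that. Separately, `LastRequestError` loses the endpoint with nothing in its place, so consider logging the endpoint once where the connection is created to keep failures attributable. The enclosing function starts and ends outside the hunk.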
@@ -144,15 +138,14 @@ private: public: TTokenAccessorCredentialsProvider( - const TString& tokenAccessorEndpoint, - bool useSsl, - const TString& sslCaCert, + std::shared_ptr<NGrpc::TGRpcClientLow> client, + std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection, const TString& serviceAccountId, const TString& serviceAccountIdSignature, const TDuration& refreshPeriod, const TDuration& requestTimeout ) - : Impl(std::make_shared<TImpl>(tokenAccessorEndpoint, useSsl, sslCaCert, serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout)) + : Impl(std::make_shared<TImpl>(std::move(client), std::move(connection), serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout)) { Impl->UpdateTicket(true); } 
@@ -183,7 +176,28 @@ std::shared_ptr<NYdb::ICredentialsProvider> CreateTokenAccessorCredentialsProvid const TString& serviceAccountIdSignature, const TDuration& refreshPeriod, const TDuration& requestTimeout -) { - return std::make_shared<TTokenAccessorCredentialsProvider>(tokenAccessorEndpoint, useSsl, sslCaCert, serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout); +) +{ + auto client = std::make_unique<NGrpc::TGRpcClientLow>(); + NGrpc::TGRpcClientConfig grpcConf; + grpcConf.Locator = tokenAccessorEndpoint; + grpcConf.EnableSsl = useSsl; + grpcConf.SslCaCert = sslCaCert; + std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection = client->CreateGRpcServiceConnection<TokenAccessorService>(grpcConf); + + return CreateTokenAccessorCredentialsProvider(std::move(client), std::move(connection), serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout); +} + +std::shared_ptr<NYdb::ICredentialsProvider> CreateTokenAccessorCredentialsProvider( + std::shared_ptr<NGrpc::TGRpcClientLow> client, + std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection, + const TString& serviceAccountId, + const TString& serviceAccountIdSignature, + const TDuration& refreshPeriod, + const TDuration& requestTimeout +) +{ + return std::make_shared<TTokenAccessorCredentialsProvider>(std::move(client), std::move(connection), serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout); } + } diff --git a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.h b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.h index 768a12f5e7..141ce36f83 100644 --- a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.h +++ b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.h 
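Review bot: The connection setup is duplicated: the endpoint overload fills `grpcConf.Locator`, `EnableSsl` and `SslCaCert` by hand, the same lines are added again in token_accessor_client_factory.cpp (the `@@ -51,8 +48,29 @@` hunk), and factory.cpp sets the same three fields in its constructor. One file-local helper taking (endpoint, useSsl, sslCaCert) and returning the client/connection pair would keep the TLS settings in a single place, so a later change to them cannot be applied on one path and missed on another. Also `auto client = std::make_unique<NGrpc::TGRpcClientLow>();` is converted to `std::shared_ptr` on the next call; `std::make_shared<NGrpc::TGRpcClientLow>()` says that directly, as factory.cpp already does.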
@@ -1,5 +1,8 @@ #pragma once +#include <ydb/library/yql/providers/common/token_accessor/grpc/token_accessor_pb.grpc.pb.h> +#include <library/cpp/grpc/client/grpc_client_low.h> + #include <ydb/public/sdk/cpp/client/ydb_types/credentials/credentials.h> #include <util/datetime/base.h> @@ -15,4 +18,13 @@ std::shared_ptr<NYdb::ICredentialsProvider> CreateTokenAccessorCredentialsProvid const TDuration& requestTimeout = TDuration::Seconds(10) ); +std::shared_ptr<NYdb::ICredentialsProvider> CreateTokenAccessorCredentialsProvider( + std::shared_ptr<NGrpc::TGRpcClientLow> client, + std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection, + const TString& serviceAccountId, + const TString& serviceAccountIdSignature, + const TDuration& refreshPeriod = TDuration::Hours(1), + const TDuration& requestTimeout = TDuration::Seconds(10) +); + } diff --git a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.cpp b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.cpp index 65ed25856f..da77d08ff6 100644 --- a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.cpp +++ b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.cpp 
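Review bot: The new `CreateTokenAccessorCredentialsProvider` overload in token_accessor_client.h is declared without any statement of its contract. From the callers on this page, `connection` is always created from the same `client`, both are expected to be non-empty, and one pair may be handed to many providers at once. I would put that above the declaration, e.g. `// client/connection are shared with other providers; connection must be created from client, both must be non-null`. The matching overload added to token_accessor_client_factory.h needs the same comment.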
@@ -10,17 +10,15 @@ namespace { class TTokenAccessorCredentialsProviderFactory : public NYdb::ICredentialsProviderFactory { public: TTokenAccessorCredentialsProviderFactory( - const TString& tokenAccessorEndpoint, - bool useSsl, - const TString& sslCaCert, + std::shared_ptr<NGrpc::TGRpcClientLow> client, + std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection, const TString& serviceAccountId, const TString& serviceAccountIdSignature, const TDuration& refreshPeriod, const TDuration& requestTimeout ) - : TokenAccessorEndpoint(tokenAccessorEndpoint) - , UseSsl(useSsl) - , SslCaCert(sslCaCert) + : Client(std::move(client)) + , Connection(std::move(connection)) , ServiceAccountId(serviceAccountId) , ServiceAccountIdSignature(serviceAccountIdSignature) , RefreshPeriod(refreshPeriod) @@ -29,13 +27,12 @@ public: } std::shared_ptr<NYdb::ICredentialsProvider> CreateProvider() const override { - return CreateTokenAccessorCredentialsProvider(TokenAccessorEndpoint, UseSsl, SslCaCert, ServiceAccountId, ServiceAccountIdSignature, RefreshPeriod, RequestTimeout); + return CreateTokenAccessorCredentialsProvider(Client, Connection, ServiceAccountId, ServiceAccountIdSignature, RefreshPeriod, RequestTimeout); } private: - const TString TokenAccessorEndpoint; - const bool UseSsl; - const TString SslCaCert; + const std::shared_ptr<NGrpc::TGRpcClientLow> Client; + const std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> Connection; const TString ServiceAccountId; const TString ServiceAccountIdSignature; const TDuration RefreshPeriod; 
@@ -51,8 +48,29 @@ std::shared_ptr<NYdb::ICredentialsProviderFactory> CreateTokenAccessorCredential const TString& serviceAccountId, const TString& serviceAccountIdSignature, const TDuration& refreshPeriod, - const TDuration& requestTimeout) + const TDuration& requestTimeout +) { - return std::make_shared<TTokenAccessorCredentialsProviderFactory>(tokenAccessorEndpoint, useSsl, sslCaCert, serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout); + auto client = std::make_unique<NGrpc::TGRpcClientLow>(); + NGrpc::TGRpcClientConfig grpcConf; + grpcConf.Locator = tokenAccessorEndpoint; + grpcConf.EnableSsl = useSsl; + grpcConf.SslCaCert = sslCaCert; + std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection = client->CreateGRpcServiceConnection<TokenAccessorService>(grpcConf); + + return CreateTokenAccessorCredentialsProviderFactory(std::move(client), std::move(connection), serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout); } + +std::shared_ptr<NYdb::ICredentialsProviderFactory> CreateTokenAccessorCredentialsProviderFactory( + std::shared_ptr<NGrpc::TGRpcClientLow> client, + std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection, + const TString& serviceAccountId, + const TString& serviceAccountIdSignature, + const TDuration& refreshPeriod, + const TDuration& requestTimeout +) +{ + return std::make_shared<TTokenAccessorCredentialsProviderFactory>(std::move(client), std::move(connection), serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout); +} + } diff --git a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.h b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.h index b3b82acdb2..2005db5819 100644 --- a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.h +++ b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.h 
@@ -1,5 +1,8 @@ #pragma once +#include <ydb/library/yql/providers/common/token_accessor/grpc/token_accessor_pb.grpc.pb.h> +#include <library/cpp/grpc/client/grpc_client_low.h> + #include <ydb/public/sdk/cpp/client/ydb_types/credentials/credentials.h> #include <util/datetime/base.h> @@ -15,5 +18,14 @@ std::shared_ptr<NYdb::ICredentialsProviderFactory> CreateTokenAccessorCredential const TDuration& requestTimeout = TDuration::Seconds(10) ); +std::shared_ptr<NYdb::ICredentialsProviderFactory> CreateTokenAccessorCredentialsProviderFactory( + std::shared_ptr<NGrpc::TGRpcClientLow> client, + std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection, + const TString& serviceAccountId, + const TString& serviceAccountIdSignature, + const TDuration& refreshPeriod = TDuration::Hours(1), + const TDuration& requestTimeout = TDuration::Seconds(10) +); + } |