aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoruzhas <uzhas@ydb.tech>2022-08-10 18:58:14 +0300
committeruzhas <uzhas@ydb.tech>2022-08-10 18:58:14 +0300
commitd76ca4f851f50282332ef0b1b47921203651515f (patch)
tree5151c32c51d1055228f3b7ad4be84e3c7715da85
parent20c80c1024743302d91facc70fdecc4162a5b0f8 (diff)
downloadydb-d76ca4f851f50282332ef0b1b47921203651515f.tar.gz
share grpc connection to token accessor with all tasks in single process
-rw-r--r--contrib/restricted/boost/callable_traits/README.md13
-rw-r--r--ydb/core/yq/libs/config/protos/token_accessor.proto1
-rw-r--r--ydb/core/yq/libs/init/init.cpp2
-rw-r--r--ydb/library/yql/providers/common/token_accessor/client/factory.cpp38
-rw-r--r--ydb/library/yql/providers/common/token_accessor/client/factory.h1
-rw-r--r--ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.cpp64
-rw-r--r--ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.h12
-rw-r--r--ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.cpp42
-rw-r--r--ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.h12
9 files changed, 130 insertions, 55 deletions
diff --git a/contrib/restricted/boost/callable_traits/README.md b/contrib/restricted/boost/callable_traits/README.md
index 935520d2f3..2e1f378425 100644
--- a/contrib/restricted/boost/callable_traits/README.md
+++ b/contrib/restricted/boost/callable_traits/README.md
@@ -1,19 +1,18 @@
<!--
-Copyright Barrett Adair 2016-2017
+Copyright Barrett Adair 2016-2021
Distributed under the Boost Software License, Version 1.0.
(See accompanying file LICENSE.md or copy at http://boost.org/LICENSE_1_0.txt)
-->
-# Boost.CallableTraits <a target="_blank" href="https://travis-ci.org/boostorg/callable_traits">![Travis status][badge.Travis]</a> <a target="_blank" href="https://ci.appveyor.com/project/boostorg/callable-traits">![Appveyor status][badge.Appveyor]</a>
+# Boost.CallableTraits <a target="_blank" href="https://github.com/boostorg/callable_traits/actions/workflows/ci.yml">![CI][badge.CI]</a>
-CallableTraits is a C++11 header-only library for the inspection, synthesis, and decomposition of callable types.
+CallableTraits is a standalone C++11 header-only library for the inspection, synthesis, and decomposition of callable types. Language features added in later C++ standards are also supported.
-The latest documentation is available [here](http://www.boost.org/doc/libs/develop/libs/callable_traits/doc/html/index.html).
+The latest documentation is available [here](http://www.boost.org/doc/libs/master/libs/callable_traits/doc/html/index.html).
-CallableTraits was [formally reviewed](http://www.boost.org/community/reviews.html) and [accepted](https://lists.boost.org/Archives/boost/2017/04/234513.php) into the [Boost C++ Libraries](http://www.boost.org/). CallableTraits is available in Boost 1.66 (December 2017) and later. You can also download CallableTraits as a standalone library [here](https://github.com/boostorg/callable_traits/releases/latest).
+CallableTraits is released as part of the [Boost C++ Libraries](http://www.boost.org/). Since it only depends on the standard library headers, you can also download it as a standalone library [here](https://github.com/boostorg/callable_traits/releases/latest).
Licensed under the [Boost Software License, Version 1.0](LICENSE.md).
<!-- Links -->
-[badge.Appveyor]: https://ci.appveyor.com/api/projects/status/uf0l91v7l4wc4kw6/branch/master?svg=true
-[badge.Travis]: https://travis-ci.org/boostorg/callable_traits.svg?branch=master
+[badge.CI]: https://github.com/boostorg/callable_traits/actions/workflows/ci.yml/badge.svg
diff --git a/ydb/core/yq/libs/config/protos/token_accessor.proto b/ydb/core/yq/libs/config/protos/token_accessor.proto
index 9321f0abbe..5226d77d96 100644
--- a/ydb/core/yq/libs/config/protos/token_accessor.proto
+++ b/ydb/core/yq/libs/config/protos/token_accessor.proto
@@ -12,4 +12,5 @@ message TTokenAccessorConfig {
bool UseSsl = 3; // Whether to use SSL
string HmacSecretFile = 4;
string SslCaCert = 5;
+ uint64 ConnectionPoolSize = 6;
}
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index a23982a630..051f38a775 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -150,7 +150,7 @@ void Init(
caContent = TUnbufferedFileInput(path).ReadAll();
}
- credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(tokenAccessorConfig.GetEndpoint(), tokenAccessorConfig.GetUseSsl(), caContent);
+ credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(tokenAccessorConfig.GetEndpoint(), tokenAccessorConfig.GetUseSsl(), caContent, tokenAccessorConfig.GetConnectionPoolSize());
}
if (protoConfig.GetPrivateApi().GetEnabled()) {
diff --git a/ydb/library/yql/providers/common/token_accessor/client/factory.cpp b/ydb/library/yql/providers/common/token_accessor/client/factory.cpp
index b077c56d33..0a7e9b6af4 100644
--- a/ydb/library/yql/providers/common/token_accessor/client/factory.cpp
+++ b/ydb/library/yql/providers/common/token_accessor/client/factory.cpp
@@ -9,35 +9,52 @@ namespace NYql {
namespace {
+using TTokenAccessorConnectionPool = std::vector<std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>>>;
+
class TSecuredServiceAccountCredentialsFactoryImpl : public ISecuredServiceAccountCredentialsFactory {
public:
TSecuredServiceAccountCredentialsFactoryImpl(
const TString& tokenAccessorEndpoint,
bool useSsl,
const TString& sslCaCert,
+ ui32 connectionPoolSize,
const TDuration& refreshPeriod,
const TDuration& requestTimeout
)
- : TokenAccessorEndpoint(tokenAccessorEndpoint)
- , UseSsl(useSsl)
- , SslCaCert(sslCaCert)
- , RefreshPeriod(refreshPeriod)
- , RequestTimeout(requestTimeout) {
+ : RefreshPeriod(refreshPeriod)
+ , RequestTimeout(requestTimeout)
+ , Client(std::make_shared<NGrpc::TGRpcClientLow>())
+ {
+ GrpcClientConfig.Locator = tokenAccessorEndpoint;
+ GrpcClientConfig.EnableSsl = useSsl;
+ GrpcClientConfig.SslCaCert = sslCaCert;
+ Connections.reserve(connectionPoolSize);
+ for (ui32 i = 0; i < connectionPoolSize; ++i) {
+ Connections.push_back(Client->CreateGRpcServiceConnection<TokenAccessorService>(GrpcClientConfig));
+ }
}
std::shared_ptr<NYdb::ICredentialsProviderFactory> Create(const TString& serviceAccountId, const TString& serviceAccountIdSignature) override {
Y_ENSURE(serviceAccountId);
Y_ENSURE(serviceAccountIdSignature);
- return CreateTokenAccessorCredentialsProviderFactory(TokenAccessorEndpoint, UseSsl, SslCaCert, serviceAccountId, serviceAccountIdSignature, RefreshPeriod, RequestTimeout);
+ std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection;
+ if (Connections.empty()) {
+ connection = Client->CreateGRpcServiceConnection<TokenAccessorService>(GrpcClientConfig);
+ } else {
+ connection = Connections[NextConnectionIndex++ % Connections.size()];
+ }
+
+ return CreateTokenAccessorCredentialsProviderFactory(Client, std::move(connection), serviceAccountId, serviceAccountIdSignature, RefreshPeriod, RequestTimeout);
}
private:
- const TString TokenAccessorEndpoint;
- const bool UseSsl;
- const TString SslCaCert;
+ NGrpc::TGRpcClientConfig GrpcClientConfig;
const TDuration RefreshPeriod;
const TDuration RequestTimeout;
+ const std::shared_ptr<NGrpc::TGRpcClientLow> Client;
+ TTokenAccessorConnectionPool Connections;
+ mutable std::atomic<ui32> NextConnectionIndex = 0;
};
std::shared_ptr<NYdb::ICredentialsProviderFactory> WrapWithBearerIfNeeded(std::shared_ptr<NYdb::ICredentialsProviderFactory> delegatee, bool addBearerToToken) {
@@ -52,9 +69,10 @@ ISecuredServiceAccountCredentialsFactory::TPtr CreateSecuredServiceAccountCreden
const TString& tokenAccessorEndpoint,
bool useSsl,
const TString& sslCaCert,
+ ui32 connectionPoolSize,
const TDuration& refreshPeriod,
const TDuration& requestTimeout) {
- return std::make_shared<TSecuredServiceAccountCredentialsFactoryImpl>(tokenAccessorEndpoint, useSsl, sslCaCert, refreshPeriod, requestTimeout);
+ return std::make_shared<TSecuredServiceAccountCredentialsFactoryImpl>(tokenAccessorEndpoint, useSsl, sslCaCert, connectionPoolSize, refreshPeriod, requestTimeout);
}
std::shared_ptr<NYdb::ICredentialsProviderFactory> CreateCredentialsProviderFactoryForStructuredToken(ISecuredServiceAccountCredentialsFactory::TPtr factory, const TString& structuredTokenJson, bool addBearerToToken) {
diff --git a/ydb/library/yql/providers/common/token_accessor/client/factory.h b/ydb/library/yql/providers/common/token_accessor/client/factory.h
index f798949e29..c98989417a 100644
--- a/ydb/library/yql/providers/common/token_accessor/client/factory.h
+++ b/ydb/library/yql/providers/common/token_accessor/client/factory.h
@@ -18,6 +18,7 @@ ISecuredServiceAccountCredentialsFactory::TPtr CreateSecuredServiceAccountCreden
const TString& tokenAccessorEndpoint,
bool useSsl,
const TString& sslCaCert,
+ ui32 connectionPoolSize = 0,
const TDuration& refreshPeriod = TDuration::Hours(1),
const TDuration& requestTimeout = TDuration::Seconds(10)
);
diff --git a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.cpp b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.cpp
index 220f624e3f..b11c1d0f47 100644
--- a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.cpp
+++ b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.cpp
@@ -20,27 +20,22 @@ class TTokenAccessorCredentialsProvider : public NYdb::ICredentialsProvider {
private:
class TImpl : public std::enable_shared_from_this<TImpl> {
public:
- TImpl(const TString& tokenAccessorEndpoint,
- bool useSsl,
- const TString& sslCaCert,
+ TImpl(
+ std::shared_ptr<NGrpc::TGRpcClientLow> client,
+ std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection,
const TString& serviceAccountId,
const TString& serviceAccountIdSignature,
const TDuration& refreshPeriod,
const TDuration& requestTimeout)
- : Client(std::make_unique<NGrpc::TGRpcClientLow>())
+ : Client(std::move(client))
+ , Connection(std::move(connection))
, NextTicketUpdate(TInstant::Zero())
- , TokenAccessorEndpoint(tokenAccessorEndpoint)
, ServiceAccountId(serviceAccountId)
, ServiceAccountIdSignature(serviceAccountIdSignature)
, RefreshPeriod(refreshPeriod)
, RequestTimeout(requestTimeout)
, Infly(0)
{
- NGrpc::TGRpcClientConfig grpcConf;
- grpcConf.Locator = tokenAccessorEndpoint;
- grpcConf.EnableSsl = useSsl;
- grpcConf.SslCaCert = sslCaCert;
- Connection = Client->CreateGRpcServiceConnection<TokenAccessorService>(grpcConf);
}
void UpdateTicket(bool sync = false) const {
@@ -51,9 +46,11 @@ private:
RequestInflight = true;
auto resultPromise = NThreading::NewPromise();
- std::shared_ptr<const TImpl> self = shared_from_this();
- auto cb = [self, resultPromise, sync](NGrpc::TGrpcStatus&& status, GetTokenResponse&& result) mutable {
- self->ProcessResponse(std::move(status), std::move(result), sync);
+ std::weak_ptr<const TImpl> weakSelf = shared_from_this();
+ auto cb = [weakSelf, resultPromise, sync](NGrpc::TGrpcStatus&& status, GetTokenResponse&& result) mutable {
+ if (auto self = weakSelf.lock()) {
+ self->ProcessResponse(std::move(status), std::move(result), sync);
+ }
resultPromise.SetValue();
};
@@ -95,8 +92,6 @@ private:
void Stop() {
NeedStop = true;
-
- Client.reset(); // Will trigger destroy
}
private:
@@ -106,7 +101,7 @@ private:
--Infly;
LastRequestError = TStringBuilder() << "Last request error was at " << TInstant::Now()
<< ". GrpcStatusCode: " << status.GRpcStatusCode << " Message: \"" << status.Msg
- << "\" internal: " << status.InternalError << " token accessor endpoint: \"" << TokenAccessorEndpoint << "\"";
+ << "\" internal: " << status.InternalError;
}
RequestInflight = false;
Sleep(std::min(BackoffTimeout, BACKOFF_MAX));
@@ -125,11 +120,10 @@ private:
}
private:
- std::unique_ptr<NGrpc::TGRpcClientLow> Client;
- std::unique_ptr<NGrpc::TServiceConnection<TokenAccessorService>> Connection;
+ const std::shared_ptr<NGrpc::TGRpcClientLow> Client;
+ const std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> Connection;
mutable TString Ticket;
mutable TInstant NextTicketUpdate;
- const TString TokenAccessorEndpoint;
const TString ServiceAccountId;
const TString ServiceAccountIdSignature;
const TDuration RefreshPeriod;
@@ -144,15 +138,14 @@ private:
public:
TTokenAccessorCredentialsProvider(
- const TString& tokenAccessorEndpoint,
- bool useSsl,
- const TString& sslCaCert,
+ std::shared_ptr<NGrpc::TGRpcClientLow> client,
+ std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection,
const TString& serviceAccountId,
const TString& serviceAccountIdSignature,
const TDuration& refreshPeriod,
const TDuration& requestTimeout
)
- : Impl(std::make_shared<TImpl>(tokenAccessorEndpoint, useSsl, sslCaCert, serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout))
+ : Impl(std::make_shared<TImpl>(std::move(client), std::move(connection), serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout))
{
Impl->UpdateTicket(true);
}
@@ -183,7 +176,28 @@ std::shared_ptr<NYdb::ICredentialsProvider> CreateTokenAccessorCredentialsProvid
const TString& serviceAccountIdSignature,
const TDuration& refreshPeriod,
const TDuration& requestTimeout
-) {
- return std::make_shared<TTokenAccessorCredentialsProvider>(tokenAccessorEndpoint, useSsl, sslCaCert, serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout);
+)
+{
+ auto client = std::make_unique<NGrpc::TGRpcClientLow>();
+ NGrpc::TGRpcClientConfig grpcConf;
+ grpcConf.Locator = tokenAccessorEndpoint;
+ grpcConf.EnableSsl = useSsl;
+ grpcConf.SslCaCert = sslCaCert;
+ std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection = client->CreateGRpcServiceConnection<TokenAccessorService>(grpcConf);
+
+ return CreateTokenAccessorCredentialsProvider(std::move(client), std::move(connection), serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout);
+}
+
+std::shared_ptr<NYdb::ICredentialsProvider> CreateTokenAccessorCredentialsProvider(
+ std::shared_ptr<NGrpc::TGRpcClientLow> client,
+ std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection,
+ const TString& serviceAccountId,
+ const TString& serviceAccountIdSignature,
+ const TDuration& refreshPeriod,
+ const TDuration& requestTimeout
+)
+{
+ return std::make_shared<TTokenAccessorCredentialsProvider>(std::move(client), std::move(connection), serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout);
}
+
}
diff --git a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.h b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.h
index 768a12f5e7..141ce36f83 100644
--- a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.h
+++ b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client.h
@@ -1,5 +1,8 @@
#pragma once
+#include <ydb/library/yql/providers/common/token_accessor/grpc/token_accessor_pb.grpc.pb.h>
+#include <library/cpp/grpc/client/grpc_client_low.h>
+
#include <ydb/public/sdk/cpp/client/ydb_types/credentials/credentials.h>
#include <util/datetime/base.h>
@@ -15,4 +18,13 @@ std::shared_ptr<NYdb::ICredentialsProvider> CreateTokenAccessorCredentialsProvid
const TDuration& requestTimeout = TDuration::Seconds(10)
);
+std::shared_ptr<NYdb::ICredentialsProvider> CreateTokenAccessorCredentialsProvider(
+ std::shared_ptr<NGrpc::TGRpcClientLow> client,
+ std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection,
+ const TString& serviceAccountId,
+ const TString& serviceAccountIdSignature,
+ const TDuration& refreshPeriod = TDuration::Hours(1),
+ const TDuration& requestTimeout = TDuration::Seconds(10)
+);
+
}
diff --git a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.cpp b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.cpp
index 65ed25856f..da77d08ff6 100644
--- a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.cpp
+++ b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.cpp
@@ -10,17 +10,15 @@ namespace {
class TTokenAccessorCredentialsProviderFactory : public NYdb::ICredentialsProviderFactory {
public:
TTokenAccessorCredentialsProviderFactory(
- const TString& tokenAccessorEndpoint,
- bool useSsl,
- const TString& sslCaCert,
+ std::shared_ptr<NGrpc::TGRpcClientLow> client,
+ std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection,
const TString& serviceAccountId,
const TString& serviceAccountIdSignature,
const TDuration& refreshPeriod,
const TDuration& requestTimeout
)
- : TokenAccessorEndpoint(tokenAccessorEndpoint)
- , UseSsl(useSsl)
- , SslCaCert(sslCaCert)
+ : Client(std::move(client))
+ , Connection(std::move(connection))
, ServiceAccountId(serviceAccountId)
, ServiceAccountIdSignature(serviceAccountIdSignature)
, RefreshPeriod(refreshPeriod)
@@ -29,13 +27,12 @@ public:
}
std::shared_ptr<NYdb::ICredentialsProvider> CreateProvider() const override {
- return CreateTokenAccessorCredentialsProvider(TokenAccessorEndpoint, UseSsl, SslCaCert, ServiceAccountId, ServiceAccountIdSignature, RefreshPeriod, RequestTimeout);
+ return CreateTokenAccessorCredentialsProvider(Client, Connection, ServiceAccountId, ServiceAccountIdSignature, RefreshPeriod, RequestTimeout);
}
private:
- const TString TokenAccessorEndpoint;
- const bool UseSsl;
- const TString SslCaCert;
+ const std::shared_ptr<NGrpc::TGRpcClientLow> Client;
+ const std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> Connection;
const TString ServiceAccountId;
const TString ServiceAccountIdSignature;
const TDuration RefreshPeriod;
@@ -51,8 +48,29 @@ std::shared_ptr<NYdb::ICredentialsProviderFactory> CreateTokenAccessorCredential
const TString& serviceAccountId,
const TString& serviceAccountIdSignature,
const TDuration& refreshPeriod,
- const TDuration& requestTimeout)
+ const TDuration& requestTimeout
+)
{
- return std::make_shared<TTokenAccessorCredentialsProviderFactory>(tokenAccessorEndpoint, useSsl, sslCaCert, serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout);
+ auto client = std::make_unique<NGrpc::TGRpcClientLow>();
+ NGrpc::TGRpcClientConfig grpcConf;
+ grpcConf.Locator = tokenAccessorEndpoint;
+ grpcConf.EnableSsl = useSsl;
+ grpcConf.SslCaCert = sslCaCert;
+ std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection = client->CreateGRpcServiceConnection<TokenAccessorService>(grpcConf);
+
+ return CreateTokenAccessorCredentialsProviderFactory(std::move(client), std::move(connection), serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout);
}
+
+std::shared_ptr<NYdb::ICredentialsProviderFactory> CreateTokenAccessorCredentialsProviderFactory(
+ std::shared_ptr<NGrpc::TGRpcClientLow> client,
+ std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection,
+ const TString& serviceAccountId,
+ const TString& serviceAccountIdSignature,
+ const TDuration& refreshPeriod,
+ const TDuration& requestTimeout
+)
+{
+ return std::make_shared<TTokenAccessorCredentialsProviderFactory>(std::move(client), std::move(connection), serviceAccountId, serviceAccountIdSignature, refreshPeriod, requestTimeout);
+}
+
}
diff --git a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.h b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.h
index b3b82acdb2..2005db5819 100644
--- a/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.h
+++ b/ydb/library/yql/providers/common/token_accessor/client/token_accessor_client_factory.h
@@ -1,5 +1,8 @@
#pragma once
+#include <ydb/library/yql/providers/common/token_accessor/grpc/token_accessor_pb.grpc.pb.h>
+#include <library/cpp/grpc/client/grpc_client_low.h>
+
#include <ydb/public/sdk/cpp/client/ydb_types/credentials/credentials.h>
#include <util/datetime/base.h>
@@ -15,5 +18,14 @@ std::shared_ptr<NYdb::ICredentialsProviderFactory> CreateTokenAccessorCredential
const TDuration& requestTimeout = TDuration::Seconds(10)
);
+std::shared_ptr<NYdb::ICredentialsProviderFactory> CreateTokenAccessorCredentialsProviderFactory(
+ std::shared_ptr<NGrpc::TGRpcClientLow> client,
+ std::shared_ptr<NGrpc::TServiceConnection<TokenAccessorService>> connection,
+ const TString& serviceAccountId,
+ const TString& serviceAccountIdSignature,
+ const TDuration& refreshPeriod = TDuration::Hours(1),
+ const TDuration& requestTimeout = TDuration::Seconds(10)
+);
+
}