1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
#include "yql_generic_cluster_config.h"
#include "yql_generic_settings.h"
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/utils/log/log.h>
namespace NYql {
// Out-of-class definition of the static TGenericSettings::TDefault::DateTimeFormat
// member, initialized to "string".
// NOTE(review): presumably the fallback value of the DateTimeFormat setting -
// confirm against the declaration in yql_generic_settings.h.
const TString TGenericSettings::TDefault::DateTimeFormat = "string";
// Default constructor: registers the provider settings UsePredicatePushdown and
// DateTimeFormat on this configuration object through the REGISTER_SETTING macro.
TGenericConfiguration::TGenericConfiguration() {
REGISTER_SETTING(*this, UsePredicatePushdown);
REGISTER_SETTING(*this, DateTimeFormat);
}
// Initializes the configuration from the gateway config.
//
// The default settings from the gateway config are dispatched first, then every
// entry of the cluster mapping is registered through AddCluster (which may throw
// on an invalid cluster config), and finally the defaults are frozen.
//
// gatewayConfig    - source of default settings and of the cluster mapping
// databaseResolver - forwarded to AddCluster; may be null
// databaseAuth     - output map that AddCluster fills for clusters having a database id
// credentials      - credential storage forwarded to AddCluster
void TGenericConfiguration::Init(const NYql::TGenericGatewayConfig& gatewayConfig,
                                 const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver,
                                 NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth,
                                 const TCredentials::TPtr& credentials)
{
    const auto& defaultSettings = gatewayConfig.GetDefaultSettings();
    this->Dispatch(defaultSettings);

    const auto& clusterMapping = gatewayConfig.GetClusterMapping();
    for (const auto& clusterConfig : clusterMapping) {
        this->AddCluster(clusterConfig, databaseResolver, databaseAuth, credentials);
    }

    // TODO: check if it's necessary
    this->FreezeDefaults();
}
void TGenericConfiguration::AddCluster(const TGenericClusterConfig& clusterConfig,
const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver,
NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth,
const TCredentials::TPtr& credentials) {
ValidateGenericClusterConfig(clusterConfig, "TGenericConfiguration::AddCluster");
YQL_CLOG(INFO, ProviderGeneric) << "generic provider add cluster: " << DumpGenericClusterConfig(clusterConfig);
const auto& clusterName = clusterConfig.GetName();
const auto& databaseId = clusterConfig.GetDatabaseId();
if (databaseId) {
if (!databaseResolver) {
ythrow yexception() << "You're trying to access managed database, but database resolver is not configured.";
}
const auto token = MakeStructuredToken(clusterConfig, credentials);
databaseAuth[std::make_pair(databaseId, DatabaseTypeFromDataSourceKind(clusterConfig.GetKind()))] =
NYql::TDatabaseAuth{
.StructuredToken = token,
.AddBearerToToken = true,
.UseTls = clusterConfig.GetUseSsl(),
.Protocol = clusterConfig.GetProtocol()};
DatabaseIdsToClusterNames[databaseId].emplace_back(clusterName);
YQL_CLOG(DEBUG, ProviderGeneric) << "database id '" << databaseId << "' added to mapping";
}
// NOTE: Tokens map is filled just because it's required by DQ/KQP.
// The only reason for provider to store these tokens is
// to keep compatibility with these engines.
// Real credentials are stored in TGenericClusterConfig.
Tokens[clusterConfig.GetName()] =
TStructuredTokenBuilder()
.SetBasicAuth(
clusterConfig.GetCredentials().basic().username(),
clusterConfig.GetCredentials().basic().password())
.ToJson();
// preserve cluster config entirely for the further use
ClusterNamesToClusterConfigs[clusterName] = clusterConfig;
// Add cluster to the list of valid clusters
this->ValidClusters.insert(clusterConfig.GetName());
}
// Builds the structured token used to access the MDB API.
//
// The token is made from an IAM token when one can be found via the credential
// storage (looked up as "default_<cluster name>" in the "default_generic"
// category, with the token from the cluster config passed as the last argument).
// Otherwise it is made from the service account id and its signature, provided
// both are present in the cluster config. If neither source is available, an
// exception is thrown.
TString TGenericConfiguration::MakeStructuredToken(const TGenericClusterConfig& cluster, const TCredentials::TPtr& credentials) const {
    TStructuredTokenBuilder builder;

    const auto iamToken = credentials->FindCredentialContent(
        "default_" + cluster.name(),
        "default_generic",
        cluster.GetToken());

    if (!iamToken) {
        const bool hasServiceAccount = cluster.HasServiceAccountId() && cluster.HasServiceAccountIdSignature();
        if (!hasServiceAccount) {
            ythrow yexception() << "you should either provide IAM Token via credential system or cluster config, "
                                   "or set (ServiceAccountId && ServiceAccountIdSignature) in cluster config";
        }

        return builder
            .SetServiceAccountIdAuth(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature())
            .ToJson();
    }

    return builder.SetIAMToken(iamToken).ToJson();
}
// Renders a single-line, human-readable description of a cluster config
// (used in the "add cluster" log message).
//
// The output lists the name, data source kind, database name, database id,
// endpoint, TLS flag and protocol, followed by every data source option as
// ", key = value". The Credentials and Token fields are not written here.
TString TGenericConfiguration::DumpGenericClusterConfig(const TGenericClusterConfig& clusterConfig) const {
    TStringBuilder sb;

    sb << "name = " << clusterConfig.GetName()
       << ", kind = " << NConnector::NApi::EDataSourceKind_Name(clusterConfig.GetKind())
       << ", database name = " << clusterConfig.GetDatabaseName()
       // Print the actual database id; the cluster name is already reported in the first field.
       << ", database id = " << clusterConfig.GetDatabaseId()
       << ", endpoint = " << clusterConfig.GetEndpoint()
       << ", use tls = " << clusterConfig.GetUseSsl()
       << ", protocol = " << NConnector::NApi::EProtocol_Name(clusterConfig.GetProtocol());

    for (const auto& [key, value] : clusterConfig.GetDataSourceOptions()) {
        sb << ", " << key << " = " << value;
    }

    return sb;
}
// Returns a shared pointer to a const TGenericSettings object that is
// copy-constructed from this configuration.
TGenericSettings::TConstPtr TGenericConfiguration::Snapshot() const {
    const TGenericSettings& currentSettings = *this;
    return std::make_shared<const TGenericSettings>(currentSettings);
}
// Tells whether `cluster` is present in the set of valid cluster names
// (the set that AddCluster inserts into).
bool TGenericConfiguration::HasCluster(TStringBuf cluster) const {
    const bool isKnown = ValidClusters.contains(cluster);
    return isKnown;
}
}
|