diff options
author | vitalyisaev <vitalyisaev@yandex-team.com> | 2023-08-29 13:24:18 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@yandex-team.com> | 2023-08-29 13:46:06 +0300 |
commit | 0e130abb0980cee82bd511ac6f197a350e22c3a1 (patch) | |
tree | 0b52312e511327065c4f6de98d6b264e35f816ba | |
parent | baf6a87500751b9ab0cd6c83b83543cdc3931b17 (diff) | |
download | ydb-0e130abb0980cee82bd511ac6f197a350e22c3a1.tar.gz |
YQ Connector: prepare generic provider for KQP integration
18 files changed, 190 insertions, 142 deletions
diff --git a/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json b/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json index a426ccb1ed6..a36acc049e8 100644 --- a/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json +++ b/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json @@ -32,8 +32,7 @@ {"Index": 0, "Name": "World", "Type": "TExprBase"}, {"Index": 1, "Name": "DataSource", "Type": "TGenDataSource"}, {"Index": 2, "Name": "Table", "Type": "TCoAtom"}, - {"Index": 3, "Name": "Columns", "Type": "TExprBase"}, - {"Index": 4, "Name": "Timezone", "Type": "TCoAtom"} + {"Index": 3, "Name": "Columns", "Type": "TExprBase"} ] }, { diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt index d41aad8c1c6..df173a76dfc 100644 --- a/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt @@ -51,4 +51,6 @@ target_sources(providers-generic-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp ) diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt index 712f9b54de7..d208cb547b6 100644 --- a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt @@ -52,4 +52,6 @@ target_sources(providers-generic-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp ) diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt index 712f9b54de7..d208cb547b6 100644 --- a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt @@ -52,4 +52,6 @@ target_sources(providers-generic-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp ) diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt index d41aad8c1c6..df173a76dfc 100644 --- a/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt @@ -51,4 +51,6 @@ target_sources(providers-generic-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp ) diff --git a/ydb/library/yql/providers/generic/provider/ya.make b/ydb/library/yql/providers/generic/provider/ya.make index 478c925ec09..62dbf63531b 100644 --- a/ydb/library/yql/providers/generic/provider/ya.make +++ b/ydb/library/yql/providers/generic/provider/ya.make @@ -16,7 +16,9 @@ SRCS( yql_generic_provider.h yql_generic_provider_impl.h yql_generic_settings.h + yql_generic_settings.cpp yql_generic_state.h + yql_generic_state.cpp ) YQL_LAST_ABI_VERSION() diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp index 7b5de4b2dd3..8919b68d452 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp @@ -2,12 +2,13 @@ #include "yql_generic_provider_impl.h" #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> +#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> #include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <ydb/library/yql/providers/generic/connector/libcpp/client.h> #include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h> #include <ydb/library/yql/utils/log/log.h> -#include <ydb/library/yql/providers/generic/connector/libcpp/client.h> namespace NYql { @@ -17,10 +18,10 @@ namespace NYql { class TGenericDataSource: public TDataProviderBase { public: - TGenericDataSource(TGenericState::TPtr state, NConnector::IClient::TPtr client) + TGenericDataSource(TGenericState::TPtr state) : State_(state) , IODiscoveryTransformer_(CreateGenericIODiscoveryTransformer(State_)) - , LoadMetaDataTransformer_(CreateGenericLoadTableMetadataTransformer(State_, std::move(client))) + , LoadMetaDataTransformer_(CreateGenericLoadTableMetadataTransformer(State_)) , TypeAnnotationTransformer_(CreateGenericDataSourceTypeAnnotationTransformer(State_)) , DqIntegration_(CreateGenericDqIntegration(State_)) { @@ -138,8 +139,8 @@ namespace NYql { } - TIntrusivePtr<IDataProvider> CreateGenericDataSource(TGenericState::TPtr state, NConnector::IClient::TPtr client) { - return new TGenericDataSource(std::move(state), std::move(client)); + TIntrusivePtr<IDataProvider> CreateGenericDataSource(TGenericState::TPtr state) { + return new TGenericDataSource(std::move(state)); } } // namespace NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp index 801ae451642..c36aaa1eca6 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp @@ -90,7 +90,7 @@ namespace NYql { TStatus HandleReadTable(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { Y_UNUSED(output); - if (!EnsureArgsCount(*input, 5, ctx)) { + if (!EnsureArgsCount(*input, 4, ctx)) { return TStatus::Error; } @@ -106,10 +106,6 @@ namespace NYql { return TStatus::Error; } - if (!EnsureAtom(*input->Child(TGenReadTable::idx_Timezone), ctx)) { - return TStatus::Error; - } - TMaybe<THashSet<TStringBuf>> columnSet; auto columns = input->Child(TGenReadTable::idx_Columns); if (!columns->IsCallable(TCoVoid::CallableName())) { diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index af5325d7d56..9ef5bdef89e 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -11,6 +11,7 @@ #include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> #include <ydb/library/yql/providers/generic/connector/libcpp/utils.h> +#include <ydb/library/yql/utils/log/log.h> namespace NYql { diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp index c6684516288..83fbf8a405b 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp @@ -6,6 +6,7 @@ #include <ydb/library/yql/core/yql_graph_transformer.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h> +#include <ydb/library/yql/utils/log/log.h> namespace NYql { diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp index a7416e27c6e..de548cf5d78 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp @@ -42,9 +42,8 @@ namespace NYql { using TMapType = std::unordered_map<TGenericState::TTableAddress, TGenericTableDescription, THash<TGenericState::TTableAddress>>; public: - TGenericLoadTableMetadataTransformer(TGenericState::TPtr state, NConnector::IClient::TPtr client) + TGenericLoadTableMetadataTransformer(TGenericState::TPtr state) : State_(std::move(state)) - , Client_(std::move(client)) { } @@ -127,7 +126,7 @@ namespace NYql { dsi->set_use_tls(clusterConfig.GetUseSsl()); // NOTE: errors will be checked further in DoApplyAsyncChanges - Results_.emplace(item, TGenericTableDescription(request.data_source_instance(), Client_->DescribeTable(request))); + Results_.emplace(item, TGenericTableDescription(request.data_source_instance(), State_->GenericClient->DescribeTable(request))); // FIXME: for the sake of simplicity, asynchronous workflow is broken now. Fix it some day. auto promise = NThreading::NewPromise(); @@ -177,9 +176,8 @@ namespace NYql { const auto& parse = ParseTableMeta(tableMeta.Schema, clusterName, tableName, ctx, tableMeta.ColumnOrder); - if (parse.first) { - tableMeta.ItemType = parse.first; - State_->Timezones[read.DataSource().Cluster().Value()] = ctx.AppendString(parse.second); + if (parse) { + tableMeta.ItemType = parse; if (const auto ins = replaces.emplace(read.Raw(), TExprNode::TPtr()); ins.second) { // clang-format off ins.first->second = Build<TGenReadTable>(ctx, read.Pos()) @@ -187,7 +185,6 @@ namespace NYql { .DataSource(read.DataSource()) .Table().Value(tableName).Build() .Columns<TCoVoid>().Build() - .Timezone().Value(parse.second).Build() .Done().Ptr(); // clang-format on } @@ -224,16 +221,16 @@ namespace NYql { } private: - std::pair<const TStructExprType*, TString> ParseTableMeta(const NConnector::NApi::TSchema& schema, - const std::string_view& cluster, - const std::string_view& table, TExprContext& ctx, - TVector<TString>& columnOrder) try { + const TStructExprType* ParseTableMeta(const NConnector::NApi::TSchema& schema, + const std::string_view& cluster, + const std::string_view& table, TExprContext& ctx, + TVector<TString>& columnOrder) try { TVector<const TItemExprType*> items; auto columns = schema.columns(); if (columns.empty()) { ctx.AddError(TIssue({}, TStringBuilder() << "Table " << cluster << '.' << table << " doesn't exist.")); - return {nullptr, {}}; + return nullptr; } for (auto i = 0; i < columns.size(); i++) { @@ -246,23 +243,21 @@ namespace NYql { columnOrder.emplace_back(columns.Get(i).name()); } // FIXME: handle on Connector's side? - return std::make_pair(ctx.MakeType<TStructExprType>(items), TString("Europe/Moscow")); + return ctx.MakeType<TStructExprType>(items); } catch (std::exception&) { ctx.AddError(TIssue({}, TStringBuilder() << "Failed to parse table metadata: " << CurrentExceptionMessage())); - return {nullptr, {}}; + return nullptr; } private: const TGenericState::TPtr State_; - const NConnector::IClient::TPtr Client_; TMapType Results_; NThreading::TFuture<void> AsyncFuture_; }; - THolder<IGraphTransformer> CreateGenericLoadTableMetadataTransformer(TGenericState::TPtr state, - NConnector::IClient::TPtr client) { - return MakeHolder<TGenericLoadTableMetadataTransformer>(std::move(state), std::move(client)); + THolder<IGraphTransformer> CreateGenericLoadTableMetadataTransformer(TGenericState::TPtr state) { + return MakeHolder<TGenericLoadTableMetadataTransformer>(std::move(state)); } } // namespace NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp index d6ed0ab63d1..d3122f5d938 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp @@ -21,19 +21,17 @@ namespace NYql { Y_UNUSED(progressWriter); Y_UNUSED(operationOptions); - auto state = MakeIntrusive<TGenericState>(); - - state->Types = typeCtx.Get(); - state->FunctionRegistry = functionRegistry; - state->DatabaseResolver = dbResolver; - if (gatewaysConfig) { - state->Configuration->Init(gatewaysConfig->GetGeneric(), state->DatabaseResolver, state->DatabaseAuth, typeCtx->Credentials); - } + auto state = MakeIntrusive<TGenericState>( + typeCtx.Get(), + functionRegistry, + dbResolver, + genericClient, + gatewaysConfig); TDataProviderInfo info; info.Names.insert({TString{GenericProviderName}}); - info.Source = CreateGenericDataSource(state, genericClient); + info.Source = CreateGenericDataSource(state); info.Sink = CreateGenericDataSink(state); return info; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h index 8f3144f151f..d990b2084bb 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h @@ -7,12 +7,12 @@ #include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> namespace NYql { - TDataProviderInitializer - GetGenericDataProviderInitializer(NConnector::IClient::TPtr genericClient, - std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver = nullptr); + TDataProviderInitializer GetGenericDataProviderInitializer( + NConnector::IClient::TPtr genericClient, // required + std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver = nullptr // can be missing in on-prem installations + ); - TIntrusivePtr<IDataProvider> CreateGenericDataSource(TGenericState::TPtr state, - NConnector::IClient::TPtr genericClient); + TIntrusivePtr<IDataProvider> CreateGenericDataSource(TGenericState::TPtr state); TIntrusivePtr<IDataProvider> CreateGenericDataSink(TGenericState::TPtr state); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider_impl.h b/ydb/library/yql/providers/generic/provider/yql_generic_provider_impl.h index 73aa4e77ddd..1ad7e3f7b9b 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_provider_impl.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider_impl.h @@ -10,7 +10,7 @@ namespace NYql { THolder<IGraphTransformer> CreateGenericIODiscoveryTransformer(TGenericState::TPtr state); - THolder<IGraphTransformer> CreateGenericLoadTableMetadataTransformer(TGenericState::TPtr state, NConnector::IClient::TPtr client); + THolder<IGraphTransformer> CreateGenericLoadTableMetadataTransformer(TGenericState::TPtr state); THolder<TVisitorTransformerBase> CreateGenericDataSourceTypeAnnotationTransformer(TGenericState::TPtr state); THolder<TVisitorTransformerBase> CreateGenericDataSinkTypeAnnotationTransformer(TGenericState::TPtr state); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp new file mode 100644 index 00000000000..fbf995061fb --- /dev/null +++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp @@ -0,0 +1,84 @@ +#include "yql_generic_settings.h" + +#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h> +#include <ydb/library/yql/utils/log/log.h> + +namespace NYql { + + void TGenericConfiguration::Init(const NYql::TGenericGatewayConfig& gatewayConfig, + const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver, + NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, + const TCredentials::TPtr& credentials) + { + for (const auto& cluster : gatewayConfig.GetClusterMapping()) { + AddCluster(cluster, databaseResolver, databaseAuth, credentials); + } + + // TODO: check if it's necessary + this->FreezeDefaults(); + } + + void TGenericConfiguration::AddCluster(const TGenericClusterConfig& clusterConfig, + const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver, + NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, + const TCredentials::TPtr& credentials) { + const auto& clusterName = clusterConfig.GetName(); + const auto& databaseId = clusterConfig.GetDatabaseId(); + const auto& endpoint = clusterConfig.GetEndpoint(); + + YQL_CLOG(DEBUG, ProviderGeneric) + << "add cluster" + << ": name = " << clusterName + << ", database id = " << databaseId + << ", endpoint = " << endpoint; + + // if cluster FQDN's is not known + if (databaseResolver && databaseId) { + const auto token = MakeStructuredToken(clusterConfig, credentials); + + databaseAuth[std::make_pair(databaseId, DataSourceKindToDatabaseType(clusterConfig.GetKind()))] = + NYql::TDatabaseAuth{token, /*AddBearer=*/true}; + + DatabaseIdsToClusterNames[databaseId].emplace_back(clusterName); + YQL_CLOG(DEBUG, ProviderGeneric) << "database id '" << databaseId << "' added to mapping"; + } + + // NOTE: Tokens map is filled just because it's required by YQL engine code. + // The only reason for provider to store these tokens is + // to keep compatibility with YQL engine. + // Real credentials are stored in TGenericClusterConfig. + Tokens[clusterConfig.GetName()] = " "; + + // preserve cluster config entirely for the further use + ClusterNamesToClusterConfigs[clusterName] = clusterConfig; + + // Add cluster to the list of valid clusters + this->ValidClusters.insert(clusterConfig.GetName()); + } + + // Structured tokens are used to access MDB API. They can be constructed either from IAM tokens, or from SA credentials. + TString TGenericConfiguration::MakeStructuredToken(const TGenericClusterConfig& cluster, const TCredentials::TPtr& credentials) const { + TStructuredTokenBuilder b; + + const auto iamToken = credentials->FindCredentialContent("default_" + cluster.name(), "default_generic", cluster.GetToken()); + if (iamToken) { + return b.SetIAMToken(iamToken).ToJson(); + } + + if (cluster.HasServiceAccountId() && cluster.HasServiceAccountIdSignature()) { + return b.SetServiceAccountIdAuth(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature()).ToJson(); + } + + ythrow yexception() << "you should either provide IAM Token via credential system or cluster config, " + "or set (ServiceAccountId && ServiceAccountIdSignature) in cluster config"; + } + + TGenericSettings::TConstPtr TGenericConfiguration::Snapshot() const { + return std::make_shared<const TGenericSettings>(*this); + } + + bool TGenericConfiguration::HasCluster(TStringBuf cluster) const { + return ValidClusters.contains(cluster); + } + +}
\ No newline at end of file diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h index 9c2516b2fa5..685f0004ee4 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h @@ -4,9 +4,6 @@ #include <ydb/library/yql/providers/common/config/yql_setting.h> #include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> -#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h> -#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h> -#include <ydb/library/yql/utils/log/log.h> namespace NYql { @@ -20,83 +17,23 @@ namespace NYql { TGenericConfiguration(){}; TGenericConfiguration(const TGenericConfiguration&) = delete; - template <typename TProtoConfig> - void Init(const TProtoConfig& config, + void Init(const NYql::TGenericGatewayConfig& gatewayConfig, const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver, NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, - const TCredentials::TPtr& credentials) - { - TVector<TString> clusterNames(Reserve(config.ClusterMappingSize())); + const TCredentials::TPtr& credentials); - for (auto& cluster : config.GetClusterMapping()) { - clusterNames.push_back(cluster.GetName()); - ClusterNamesToClusterConfigs[cluster.GetName()] = cluster; - } - this->SetValidClusters(clusterNames); + void AddCluster(const TGenericClusterConfig& clusterConfig, + const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver, + NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, + const TCredentials::TPtr& credentials); - for (const auto& cluster : config.GetClusterMapping()) { - InitCluster(cluster, databaseResolver, databaseAuth, credentials); - } - this->FreezeDefaults(); - } + TGenericSettings::TConstPtr Snapshot() const; + bool HasCluster(TStringBuf cluster) const; private: - void InitCluster(const TGenericClusterConfig& cluster, - const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver, - NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, - const TCredentials::TPtr& credentials) { - const auto& clusterName = cluster.GetName(); - const auto& databaseId = cluster.GetDatabaseId(); - const auto& endpoint = cluster.GetEndpoint(); - - YQL_CLOG(DEBUG, ProviderGeneric) - << "initialize cluster" - << ": name = " << clusterName - << ", database id = " << databaseId - << ", endpoint = " << endpoint; - - if (databaseResolver && databaseId) { - const auto token = MakeStructuredToken(cluster, credentials); - - databaseAuth[std::make_pair(databaseId, DataSourceKindToDatabaseType(cluster.GetKind()))] = NYql::TDatabaseAuth{token, /*AddBearer=*/true}; - - DatabaseIdsToClusterNames[databaseId].emplace_back(clusterName); - YQL_CLOG(DEBUG, ProviderGeneric) << "database id '" << databaseId << "' added to mapping"; - } - - // NOTE: Tokens map is filled just because it's required by YQL legacy code. - // The only reason for provider to store these tokens is - // to keep compatibility with YQL engine. - // Real credentials are stored in TGenericClusterConfig. - Tokens[cluster.GetName()] = " "; - } - - // Structured tokens are used to access MDB API. They can be constructed from two - TString MakeStructuredToken(const TGenericClusterConfig& cluster, const TCredentials::TPtr& credentials) { - TStructuredTokenBuilder b; - - const auto iamToken = credentials->FindCredentialContent("default_" + cluster.name(), "default_generic", cluster.GetToken()); - if (iamToken) { - return b.SetIAMToken(iamToken).ToJson(); - } - - if (cluster.HasServiceAccountId() && cluster.HasServiceAccountIdSignature()) { - return b.SetServiceAccountIdAuth(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature()).ToJson(); - } - - ythrow yexception() << "you should either provide IAM Token via credential system or cluster config, " - "or set (ServiceAccountId && ServiceAccountIdSignature) in cluster config"; - } + TString MakeStructuredToken(const TGenericClusterConfig& clusterConfig, const TCredentials::TPtr& credentials) const; public: - TGenericSettings::TConstPtr Snapshot() const { - return std::make_shared<const TGenericSettings>(*this); - } - - bool HasCluster(TStringBuf cluster) const { - return ValidClusters.contains(cluster); - } - THashMap<TString, TString> Tokens; THashMap<TString, TGenericClusterConfig> ClusterNamesToClusterConfigs; // cluster name -> cluster config THashMap<TString, TVector<TString>> DatabaseIdsToClusterNames; // database id -> cluster name diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp new file mode 100644 index 00000000000..a2d06296b90 --- /dev/null +++ b/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp @@ -0,0 +1,28 @@ +#include "yql_generic_state.h" + +namespace NYql { + void TGenericState::AddTable(const TStringBuf& clusterName, const TStringBuf& tableName, TTableMeta&& tableMeta) { + Tables_.emplace(TTableAddress(clusterName, tableName), tableMeta); + } + + TGenericState::TGetTableResult TGenericState::GetTable(const TStringBuf& clusterName, const TStringBuf& tableName) const { + auto result = Tables_.FindPtr(TTableAddress(clusterName, tableName)); + if (result) { + return std::make_pair(result, std::nullopt); + } + + return std::make_pair( + std::nullopt, + TIssue(TStringBuilder() << "no metadata for table " << clusterName << "." << tableName)); + }; + + TGenericState::TGetTableResult TGenericState::GetTable(const TStringBuf& clusterName, const TStringBuf& tableName, const TPosition& position) const { + auto pair = TGenericState::GetTable(clusterName, tableName); + if (pair.second.has_value()) { + pair.second->Position = position; + } + + return pair; + } + +} diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_state.h b/ydb/library/yql/providers/generic/provider/yql_generic_state.h index c99b6d04263..f1c51892d47 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_state.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_state.h @@ -24,41 +24,39 @@ namespace NYql { using TGetTableResult = std::pair<std::optional<const TTableMeta*>, std::optional<TIssue>>; - void AddTable(const TStringBuf& clusterName, const TStringBuf& tableName, TTableMeta&& tableMeta) { - Tables_.emplace(TTableAddress(clusterName, tableName), tableMeta); - } - - TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName) const { - auto result = Tables_.FindPtr(TTableAddress(clusterName, tableName)); - if (result) { - return std::make_pair(result, std::nullopt); - } - - return std::make_pair( - std::nullopt, - TIssue(TStringBuilder() << "no metadata for table " << clusterName << "." << tableName)); - }; - - TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName, const TPosition& position) const { - auto pair = GetTable(clusterName, tableName); - if (pair.second.has_value()) { - pair.second->Position = position; + TGenericState() = delete; + + TGenericState( + TTypeAnnotationContext* types, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + const std::shared_ptr<NYql::IDatabaseAsyncResolver>& databaseResolver, + const NConnector::IClient::TPtr& genericClient, + const TGatewaysConfig* gatewaysConfig = nullptr) + : Types(types) + , Configuration(MakeIntrusive<TGenericConfiguration>()) + , FunctionRegistry(functionRegistry) + , DatabaseResolver(databaseResolver) + , GenericClient(genericClient) + { + if (gatewaysConfig) { + Configuration->Init(gatewaysConfig->GetGeneric(), databaseResolver, DatabaseAuth, types->Credentials); } - - return pair; } - // FIXME: not used anymore, delete it some day - std::unordered_map<std::string_view, std::string_view> Timezones; + void AddTable(const TStringBuf& clusterName, const TStringBuf& tableName, TTableMeta&& tableMeta); + TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName) const; + TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName, const TPosition& position) const; - TTypeAnnotationContext* Types = nullptr; + TTypeAnnotationContext* Types; TGenericConfiguration::TPtr Configuration = MakeIntrusive<TGenericConfiguration>(); - const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; + const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry; // key - (database id, database type), value - credentials to access MDB API NYql::IDatabaseAsyncResolver::TDatabaseAuthMap DatabaseAuth; std::shared_ptr<NYql::IDatabaseAsyncResolver> DatabaseResolver; + NConnector::IClient::TPtr GenericClient; + private: THashMap<TTableAddress, TTableMeta> Tables_; }; |