aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@yandex-team.com>2023-08-29 13:24:18 +0300
committervitalyisaev <vitalyisaev@yandex-team.com>2023-08-29 13:46:06 +0300
commit0e130abb0980cee82bd511ac6f197a350e22c3a1 (patch)
tree0b52312e511327065c4f6de98d6b264e35f816ba
parentbaf6a87500751b9ab0cd6c83b83543cdc3931b17 (diff)
downloadydb-0e130abb0980cee82bd511ac6f197a350e22c3a1.tar.gz
YQ Connector: prepare generic provider for KQP integration
-rw-r--r--ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json3
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/library/yql/providers/generic/provider/ya.make2
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp11
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp6
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp1
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp1
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp31
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp16
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_provider.h10
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_provider_impl.h2
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp84
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_settings.h81
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_state.cpp28
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_state.h48
18 files changed, 190 insertions, 142 deletions
diff --git a/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json b/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json
index a426ccb1ed6..a36acc049e8 100644
--- a/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json
+++ b/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json
@@ -32,8 +32,7 @@
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "DataSource", "Type": "TGenDataSource"},
{"Index": 2, "Name": "Table", "Type": "TCoAtom"},
- {"Index": 3, "Name": "Columns", "Type": "TExprBase"},
- {"Index": 4, "Name": "Timezone", "Type": "TCoAtom"}
+ {"Index": 3, "Name": "Columns", "Type": "TExprBase"}
]
},
{
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt
index d41aad8c1c6..df173a76dfc 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt
@@ -51,4 +51,6 @@ target_sources(providers-generic-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp
)
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt
index 712f9b54de7..d208cb547b6 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt
@@ -52,4 +52,6 @@ target_sources(providers-generic-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp
)
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt
index 712f9b54de7..d208cb547b6 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt
@@ -52,4 +52,6 @@ target_sources(providers-generic-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp
)
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt
index d41aad8c1c6..df173a76dfc 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt
@@ -51,4 +51,6 @@ target_sources(providers-generic-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp
)
diff --git a/ydb/library/yql/providers/generic/provider/ya.make b/ydb/library/yql/providers/generic/provider/ya.make
index 478c925ec09..62dbf63531b 100644
--- a/ydb/library/yql/providers/generic/provider/ya.make
+++ b/ydb/library/yql/providers/generic/provider/ya.make
@@ -16,7 +16,9 @@ SRCS(
yql_generic_provider.h
yql_generic_provider_impl.h
yql_generic_settings.h
+ yql_generic_settings.cpp
yql_generic_state.h
+ yql_generic_state.cpp
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp
index 7b5de4b2dd3..8919b68d452 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp
@@ -2,12 +2,13 @@
#include "yql_generic_provider_impl.h"
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
#include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
+#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
#include <ydb/library/yql/utils/log/log.h>
-#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
namespace NYql {
@@ -17,10 +18,10 @@ namespace NYql {
class TGenericDataSource: public TDataProviderBase {
public:
- TGenericDataSource(TGenericState::TPtr state, NConnector::IClient::TPtr client)
+ TGenericDataSource(TGenericState::TPtr state)
: State_(state)
, IODiscoveryTransformer_(CreateGenericIODiscoveryTransformer(State_))
- , LoadMetaDataTransformer_(CreateGenericLoadTableMetadataTransformer(State_, std::move(client)))
+ , LoadMetaDataTransformer_(CreateGenericLoadTableMetadataTransformer(State_))
, TypeAnnotationTransformer_(CreateGenericDataSourceTypeAnnotationTransformer(State_))
, DqIntegration_(CreateGenericDqIntegration(State_))
{
@@ -138,8 +139,8 @@ namespace NYql {
}
- TIntrusivePtr<IDataProvider> CreateGenericDataSource(TGenericState::TPtr state, NConnector::IClient::TPtr client) {
- return new TGenericDataSource(std::move(state), std::move(client));
+ TIntrusivePtr<IDataProvider> CreateGenericDataSource(TGenericState::TPtr state) {
+ return new TGenericDataSource(std::move(state));
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp
index 801ae451642..c36aaa1eca6 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp
@@ -90,7 +90,7 @@ namespace NYql {
TStatus HandleReadTable(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 5, ctx)) {
+ if (!EnsureArgsCount(*input, 4, ctx)) {
return TStatus::Error;
}
@@ -106,10 +106,6 @@ namespace NYql {
return TStatus::Error;
}
- if (!EnsureAtom(*input->Child(TGenReadTable::idx_Timezone), ctx)) {
- return TStatus::Error;
- }
-
TMaybe<THashSet<TStringBuf>> columnSet;
auto columns = input->Child(TGenReadTable::idx_Columns);
if (!columns->IsCallable(TCoVoid::CallableName())) {
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
index af5325d7d56..9ef5bdef89e 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
@@ -11,6 +11,7 @@
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/utils.h>
+#include <ydb/library/yql/utils/log/log.h>
namespace NYql {
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp
index c6684516288..83fbf8a405b 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp
@@ -6,6 +6,7 @@
#include <ydb/library/yql/core/yql_graph_transformer.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
+#include <ydb/library/yql/utils/log/log.h>
namespace NYql {
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
index a7416e27c6e..de548cf5d78 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
@@ -42,9 +42,8 @@ namespace NYql {
using TMapType = std::unordered_map<TGenericState::TTableAddress, TGenericTableDescription, THash<TGenericState::TTableAddress>>;
public:
- TGenericLoadTableMetadataTransformer(TGenericState::TPtr state, NConnector::IClient::TPtr client)
+ TGenericLoadTableMetadataTransformer(TGenericState::TPtr state)
: State_(std::move(state))
- , Client_(std::move(client))
{
}
@@ -127,7 +126,7 @@ namespace NYql {
dsi->set_use_tls(clusterConfig.GetUseSsl());
// NOTE: errors will be checked further in DoApplyAsyncChanges
- Results_.emplace(item, TGenericTableDescription(request.data_source_instance(), Client_->DescribeTable(request)));
+ Results_.emplace(item, TGenericTableDescription(request.data_source_instance(), State_->GenericClient->DescribeTable(request)));
// FIXME: for the sake of simplicity, asynchronous workflow is broken now. Fix it some day.
auto promise = NThreading::NewPromise();
@@ -177,9 +176,8 @@ namespace NYql {
const auto& parse = ParseTableMeta(tableMeta.Schema, clusterName, tableName, ctx, tableMeta.ColumnOrder);
- if (parse.first) {
- tableMeta.ItemType = parse.first;
- State_->Timezones[read.DataSource().Cluster().Value()] = ctx.AppendString(parse.second);
+ if (parse) {
+ tableMeta.ItemType = parse;
if (const auto ins = replaces.emplace(read.Raw(), TExprNode::TPtr()); ins.second) {
// clang-format off
ins.first->second = Build<TGenReadTable>(ctx, read.Pos())
@@ -187,7 +185,6 @@ namespace NYql {
.DataSource(read.DataSource())
.Table().Value(tableName).Build()
.Columns<TCoVoid>().Build()
- .Timezone().Value(parse.second).Build()
.Done().Ptr();
// clang-format on
}
@@ -224,16 +221,16 @@ namespace NYql {
}
private:
- std::pair<const TStructExprType*, TString> ParseTableMeta(const NConnector::NApi::TSchema& schema,
- const std::string_view& cluster,
- const std::string_view& table, TExprContext& ctx,
- TVector<TString>& columnOrder) try {
+ const TStructExprType* ParseTableMeta(const NConnector::NApi::TSchema& schema,
+ const std::string_view& cluster,
+ const std::string_view& table, TExprContext& ctx,
+ TVector<TString>& columnOrder) try {
TVector<const TItemExprType*> items;
auto columns = schema.columns();
if (columns.empty()) {
ctx.AddError(TIssue({}, TStringBuilder() << "Table " << cluster << '.' << table << " doesn't exist."));
- return {nullptr, {}};
+ return nullptr;
}
for (auto i = 0; i < columns.size(); i++) {
@@ -246,23 +243,21 @@ namespace NYql {
columnOrder.emplace_back(columns.Get(i).name());
}
// FIXME: handle on Connector's side?
- return std::make_pair(ctx.MakeType<TStructExprType>(items), TString("Europe/Moscow"));
+ return ctx.MakeType<TStructExprType>(items);
} catch (std::exception&) {
ctx.AddError(TIssue({}, TStringBuilder() << "Failed to parse table metadata: " << CurrentExceptionMessage()));
- return {nullptr, {}};
+ return nullptr;
}
private:
const TGenericState::TPtr State_;
- const NConnector::IClient::TPtr Client_;
TMapType Results_;
NThreading::TFuture<void> AsyncFuture_;
};
- THolder<IGraphTransformer> CreateGenericLoadTableMetadataTransformer(TGenericState::TPtr state,
- NConnector::IClient::TPtr client) {
- return MakeHolder<TGenericLoadTableMetadataTransformer>(std::move(state), std::move(client));
+ THolder<IGraphTransformer> CreateGenericLoadTableMetadataTransformer(TGenericState::TPtr state) {
+ return MakeHolder<TGenericLoadTableMetadataTransformer>(std::move(state));
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
index d6ed0ab63d1..d3122f5d938 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
@@ -21,19 +21,17 @@ namespace NYql {
Y_UNUSED(progressWriter);
Y_UNUSED(operationOptions);
- auto state = MakeIntrusive<TGenericState>();
-
- state->Types = typeCtx.Get();
- state->FunctionRegistry = functionRegistry;
- state->DatabaseResolver = dbResolver;
- if (gatewaysConfig) {
- state->Configuration->Init(gatewaysConfig->GetGeneric(), state->DatabaseResolver, state->DatabaseAuth, typeCtx->Credentials);
- }
+ auto state = MakeIntrusive<TGenericState>(
+ typeCtx.Get(),
+ functionRegistry,
+ dbResolver,
+ genericClient,
+ gatewaysConfig);
TDataProviderInfo info;
info.Names.insert({TString{GenericProviderName}});
- info.Source = CreateGenericDataSource(state, genericClient);
+ info.Source = CreateGenericDataSource(state);
info.Sink = CreateGenericDataSink(state);
return info;
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h
index 8f3144f151f..d990b2084bb 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h
@@ -7,12 +7,12 @@
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
namespace NYql {
- TDataProviderInitializer
- GetGenericDataProviderInitializer(NConnector::IClient::TPtr genericClient,
- std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver = nullptr);
+ TDataProviderInitializer GetGenericDataProviderInitializer(
+ NConnector::IClient::TPtr genericClient, // required
+ std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver = nullptr // can be missing in on-prem installations
+ );
- TIntrusivePtr<IDataProvider> CreateGenericDataSource(TGenericState::TPtr state,
- NConnector::IClient::TPtr genericClient);
+ TIntrusivePtr<IDataProvider> CreateGenericDataSource(TGenericState::TPtr state);
TIntrusivePtr<IDataProvider> CreateGenericDataSink(TGenericState::TPtr state);
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider_impl.h b/ydb/library/yql/providers/generic/provider/yql_generic_provider_impl.h
index 73aa4e77ddd..1ad7e3f7b9b 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_provider_impl.h
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider_impl.h
@@ -10,7 +10,7 @@
namespace NYql {
THolder<IGraphTransformer> CreateGenericIODiscoveryTransformer(TGenericState::TPtr state);
- THolder<IGraphTransformer> CreateGenericLoadTableMetadataTransformer(TGenericState::TPtr state, NConnector::IClient::TPtr client);
+ THolder<IGraphTransformer> CreateGenericLoadTableMetadataTransformer(TGenericState::TPtr state);
THolder<TVisitorTransformerBase> CreateGenericDataSourceTypeAnnotationTransformer(TGenericState::TPtr state);
THolder<TVisitorTransformerBase> CreateGenericDataSinkTypeAnnotationTransformer(TGenericState::TPtr state);
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
new file mode 100644
index 00000000000..fbf995061fb
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
@@ -0,0 +1,84 @@
+#include "yql_generic_settings.h"
+
+#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
+#include <ydb/library/yql/utils/log/log.h>
+
+namespace NYql {
+
+ void TGenericConfiguration::Init(const NYql::TGenericGatewayConfig& gatewayConfig,
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver,
+ NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth,
+ const TCredentials::TPtr& credentials)
+ {
+ for (const auto& cluster : gatewayConfig.GetClusterMapping()) {
+ AddCluster(cluster, databaseResolver, databaseAuth, credentials);
+ }
+
+ // TODO: check if it's necessary
+ this->FreezeDefaults();
+ }
+
+ void TGenericConfiguration::AddCluster(const TGenericClusterConfig& clusterConfig,
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver,
+ NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth,
+ const TCredentials::TPtr& credentials) {
+ const auto& clusterName = clusterConfig.GetName();
+ const auto& databaseId = clusterConfig.GetDatabaseId();
+ const auto& endpoint = clusterConfig.GetEndpoint();
+
+ YQL_CLOG(DEBUG, ProviderGeneric)
+ << "add cluster"
+ << ": name = " << clusterName
+ << ", database id = " << databaseId
+ << ", endpoint = " << endpoint;
+
+ // if cluster FQDN's is not known
+ if (databaseResolver && databaseId) {
+ const auto token = MakeStructuredToken(clusterConfig, credentials);
+
+ databaseAuth[std::make_pair(databaseId, DataSourceKindToDatabaseType(clusterConfig.GetKind()))] =
+ NYql::TDatabaseAuth{token, /*AddBearer=*/true};
+
+ DatabaseIdsToClusterNames[databaseId].emplace_back(clusterName);
+ YQL_CLOG(DEBUG, ProviderGeneric) << "database id '" << databaseId << "' added to mapping";
+ }
+
+ // NOTE: Tokens map is filled just because it's required by YQL engine code.
+ // The only reason for provider to store these tokens is
+ // to keep compatibility with YQL engine.
+ // Real credentials are stored in TGenericClusterConfig.
+ Tokens[clusterConfig.GetName()] = " ";
+
+ // preserve cluster config entirely for the further use
+ ClusterNamesToClusterConfigs[clusterName] = clusterConfig;
+
+ // Add cluster to the list of valid clusters
+ this->ValidClusters.insert(clusterConfig.GetName());
+ }
+
+ // Structured tokens are used to access MDB API. They can be constructed either from IAM tokens, or from SA credentials.
+ TString TGenericConfiguration::MakeStructuredToken(const TGenericClusterConfig& cluster, const TCredentials::TPtr& credentials) const {
+ TStructuredTokenBuilder b;
+
+ const auto iamToken = credentials->FindCredentialContent("default_" + cluster.name(), "default_generic", cluster.GetToken());
+ if (iamToken) {
+ return b.SetIAMToken(iamToken).ToJson();
+ }
+
+ if (cluster.HasServiceAccountId() && cluster.HasServiceAccountIdSignature()) {
+ return b.SetServiceAccountIdAuth(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature()).ToJson();
+ }
+
+ ythrow yexception() << "you should either provide IAM Token via credential system or cluster config, "
+ "or set (ServiceAccountId && ServiceAccountIdSignature) in cluster config";
+ }
+
+ TGenericSettings::TConstPtr TGenericConfiguration::Snapshot() const {
+ return std::make_shared<const TGenericSettings>(*this);
+ }
+
+ bool TGenericConfiguration::HasCluster(TStringBuf cluster) const {
+ return ValidClusters.contains(cluster);
+ }
+
+} \ No newline at end of file
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
index 9c2516b2fa5..685f0004ee4 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
@@ -4,9 +4,6 @@
#include <ydb/library/yql/providers/common/config/yql_setting.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
-#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
-#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
-#include <ydb/library/yql/utils/log/log.h>
namespace NYql {
@@ -20,83 +17,23 @@ namespace NYql {
TGenericConfiguration(){};
TGenericConfiguration(const TGenericConfiguration&) = delete;
- template <typename TProtoConfig>
- void Init(const TProtoConfig& config,
+ void Init(const NYql::TGenericGatewayConfig& gatewayConfig,
const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver,
NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth,
- const TCredentials::TPtr& credentials)
- {
- TVector<TString> clusterNames(Reserve(config.ClusterMappingSize()));
+ const TCredentials::TPtr& credentials);
- for (auto& cluster : config.GetClusterMapping()) {
- clusterNames.push_back(cluster.GetName());
- ClusterNamesToClusterConfigs[cluster.GetName()] = cluster;
- }
- this->SetValidClusters(clusterNames);
+ void AddCluster(const TGenericClusterConfig& clusterConfig,
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver,
+ NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth,
+ const TCredentials::TPtr& credentials);
- for (const auto& cluster : config.GetClusterMapping()) {
- InitCluster(cluster, databaseResolver, databaseAuth, credentials);
- }
- this->FreezeDefaults();
- }
+ TGenericSettings::TConstPtr Snapshot() const;
+ bool HasCluster(TStringBuf cluster) const;
private:
- void InitCluster(const TGenericClusterConfig& cluster,
- const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver,
- NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth,
- const TCredentials::TPtr& credentials) {
- const auto& clusterName = cluster.GetName();
- const auto& databaseId = cluster.GetDatabaseId();
- const auto& endpoint = cluster.GetEndpoint();
-
- YQL_CLOG(DEBUG, ProviderGeneric)
- << "initialize cluster"
- << ": name = " << clusterName
- << ", database id = " << databaseId
- << ", endpoint = " << endpoint;
-
- if (databaseResolver && databaseId) {
- const auto token = MakeStructuredToken(cluster, credentials);
-
- databaseAuth[std::make_pair(databaseId, DataSourceKindToDatabaseType(cluster.GetKind()))] = NYql::TDatabaseAuth{token, /*AddBearer=*/true};
-
- DatabaseIdsToClusterNames[databaseId].emplace_back(clusterName);
- YQL_CLOG(DEBUG, ProviderGeneric) << "database id '" << databaseId << "' added to mapping";
- }
-
- // NOTE: Tokens map is filled just because it's required by YQL legacy code.
- // The only reason for provider to store these tokens is
- // to keep compatibility with YQL engine.
- // Real credentials are stored in TGenericClusterConfig.
- Tokens[cluster.GetName()] = " ";
- }
-
- // Structured tokens are used to access MDB API. They can be constructed from two
- TString MakeStructuredToken(const TGenericClusterConfig& cluster, const TCredentials::TPtr& credentials) {
- TStructuredTokenBuilder b;
-
- const auto iamToken = credentials->FindCredentialContent("default_" + cluster.name(), "default_generic", cluster.GetToken());
- if (iamToken) {
- return b.SetIAMToken(iamToken).ToJson();
- }
-
- if (cluster.HasServiceAccountId() && cluster.HasServiceAccountIdSignature()) {
- return b.SetServiceAccountIdAuth(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature()).ToJson();
- }
-
- ythrow yexception() << "you should either provide IAM Token via credential system or cluster config, "
- "or set (ServiceAccountId && ServiceAccountIdSignature) in cluster config";
- }
+ TString MakeStructuredToken(const TGenericClusterConfig& clusterConfig, const TCredentials::TPtr& credentials) const;
public:
- TGenericSettings::TConstPtr Snapshot() const {
- return std::make_shared<const TGenericSettings>(*this);
- }
-
- bool HasCluster(TStringBuf cluster) const {
- return ValidClusters.contains(cluster);
- }
-
THashMap<TString, TString> Tokens;
THashMap<TString, TGenericClusterConfig> ClusterNamesToClusterConfigs; // cluster name -> cluster config
THashMap<TString, TVector<TString>> DatabaseIdsToClusterNames; // database id -> cluster name
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp
new file mode 100644
index 00000000000..a2d06296b90
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp
@@ -0,0 +1,28 @@
+#include "yql_generic_state.h"
+
+namespace NYql {
+ void TGenericState::AddTable(const TStringBuf& clusterName, const TStringBuf& tableName, TTableMeta&& tableMeta) {
+ Tables_.emplace(TTableAddress(clusterName, tableName), tableMeta);
+ }
+
+ TGenericState::TGetTableResult TGenericState::GetTable(const TStringBuf& clusterName, const TStringBuf& tableName) const {
+ auto result = Tables_.FindPtr(TTableAddress(clusterName, tableName));
+ if (result) {
+ return std::make_pair(result, std::nullopt);
+ }
+
+ return std::make_pair(
+ std::nullopt,
+ TIssue(TStringBuilder() << "no metadata for table " << clusterName << "." << tableName));
+ };
+
+ TGenericState::TGetTableResult TGenericState::GetTable(const TStringBuf& clusterName, const TStringBuf& tableName, const TPosition& position) const {
+ auto pair = TGenericState::GetTable(clusterName, tableName);
+ if (pair.second.has_value()) {
+ pair.second->Position = position;
+ }
+
+ return pair;
+ }
+
+}
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_state.h b/ydb/library/yql/providers/generic/provider/yql_generic_state.h
index c99b6d04263..f1c51892d47 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_state.h
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_state.h
@@ -24,41 +24,39 @@ namespace NYql {
using TGetTableResult = std::pair<std::optional<const TTableMeta*>, std::optional<TIssue>>;
- void AddTable(const TStringBuf& clusterName, const TStringBuf& tableName, TTableMeta&& tableMeta) {
- Tables_.emplace(TTableAddress(clusterName, tableName), tableMeta);
- }
-
- TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName) const {
- auto result = Tables_.FindPtr(TTableAddress(clusterName, tableName));
- if (result) {
- return std::make_pair(result, std::nullopt);
- }
-
- return std::make_pair(
- std::nullopt,
- TIssue(TStringBuilder() << "no metadata for table " << clusterName << "." << tableName));
- };
-
- TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName, const TPosition& position) const {
- auto pair = GetTable(clusterName, tableName);
- if (pair.second.has_value()) {
- pair.second->Position = position;
+ TGenericState() = delete;
+
+ TGenericState(
+ TTypeAnnotationContext* types,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver>& databaseResolver,
+ const NConnector::IClient::TPtr& genericClient,
+ const TGatewaysConfig* gatewaysConfig = nullptr)
+ : Types(types)
+ , Configuration(MakeIntrusive<TGenericConfiguration>())
+ , FunctionRegistry(functionRegistry)
+ , DatabaseResolver(databaseResolver)
+ , GenericClient(genericClient)
+ {
+ if (gatewaysConfig) {
+ Configuration->Init(gatewaysConfig->GetGeneric(), databaseResolver, DatabaseAuth, types->Credentials);
}
-
- return pair;
}
- // FIXME: not used anymore, delete it some day
- std::unordered_map<std::string_view, std::string_view> Timezones;
+ void AddTable(const TStringBuf& clusterName, const TStringBuf& tableName, TTableMeta&& tableMeta);
+ TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName) const;
+ TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName, const TPosition& position) const;
- TTypeAnnotationContext* Types = nullptr;
+ TTypeAnnotationContext* Types;
TGenericConfiguration::TPtr Configuration = MakeIntrusive<TGenericConfiguration>();
- const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
+ const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
// key - (database id, database type), value - credentials to access MDB API
NYql::IDatabaseAsyncResolver::TDatabaseAuthMap DatabaseAuth;
std::shared_ptr<NYql::IDatabaseAsyncResolver> DatabaseResolver;
+ NConnector::IClient::TPtr GenericClient;
+
private:
THashMap<TTableAddress, TTableMeta> Tables_;
};