diff options
author | vitalyisaev <vitalyisaev@yandex-team.com> | 2023-07-17 18:34:48 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@yandex-team.com> | 2023-07-17 18:34:48 +0300 |
commit | f3c0e9b4c9d4e1be2466f3fd3652a7050c689e0b (patch) | |
tree | 3ac56aaab00f4378fad58e810c0c8f6ed09d33af | |
parent | 73554a93bde316abed1cb16560985db25564cc3f (diff) | |
download | ydb-f3c0e9b4c9d4e1be2466f3fd3652a7050c689e0b.tar.gz |
YQ Connector: support multiple tables in a single request
1. Исправлена ошибка в generic provider, не позволявшая в одном запросе извлекать данные из нескольких таблиц (в том числе, делать JOIN).
2. Добавлены интеграционные тесты на JOIN, которые проверяют различные варианты комбинаций таблиц и источников данных.
11 files changed, 173 insertions, 148 deletions
diff --git a/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json b/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json index 393287f800c..a426ccb1ed6 100644 --- a/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json +++ b/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json @@ -41,9 +41,10 @@ "Base": "TCallable", "Match": {"Type": "Callable", "Name": "GenSourceSettings"}, "Children": [ - {"Index": 0, "Name": "Table", "Type": "TCoAtom"}, - {"Index": 1, "Name": "Token", "Type": "TCoSecureParam"}, - {"Index": 2, "Name": "Columns", "Type": "TCoAtomList"} + {"Index": 0, "Name": "Cluster", "Type": "TCoAtom"}, + {"Index": 1, "Name": "Table", "Type": "TCoAtom"}, + {"Index": 2, "Name": "Token", "Type": "TCoSecureParam"}, + {"Index": 3, "Name": "Columns", "Type": "TCoAtomList"} ] } ] diff --git a/ydb/library/yql/providers/generic/provider/ya.make b/ydb/library/yql/providers/generic/provider/ya.make index 9129151fcd4..478c925ec09 100644 --- a/ydb/library/yql/providers/generic/provider/ya.make +++ b/ydb/library/yql/providers/generic/provider/ya.make @@ -16,6 +16,7 @@ SRCS( yql_generic_provider.h yql_generic_provider_impl.h yql_generic_settings.h + yql_generic_state.h ) YQL_LAST_ABI_VERSION() diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp index 32c4c52068b..6c6959d331f 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp @@ -9,6 +9,11 @@ #include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h> #include <ydb/library/yql/utils/log/log.h> +// You may want to change AST, graph nodes, types, but finally you'll +// return to the existing structure, inherited from ClickHouse and S3 providers. +// In this case please increment this counter: +// Hours wasted: 5 + namespace NYql { using namespace NNodes; @@ -25,7 +30,11 @@ namespace NYql { } TStatus HandleSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) { - if (!EnsureArgsCount(*input, 3U, ctx)) { + if (!EnsureArgsCount(*input, 4, ctx)) { + return TStatus::Error; + } + + if (!EnsureAtom(*input->Child(TGenSourceSettings::idx_Cluster), ctx)) { return TStatus::Error; } @@ -40,33 +49,35 @@ namespace NYql { return TStatus::Error; } + // Find requested table metadata + TString clusterName{input->Child(TGenSourceSettings::idx_Cluster)->Content()}; + TString tableName{input->Child(TGenSourceSettings::idx_Table)->Content()}; + + auto [tableMeta, issue] = State_->GetTable(clusterName, tableName, ctx.GetPosition(input->Pos())); + if (issue.has_value()) { + ctx.AddError(issue.value()); + return TStatus::Error; + } + // Create type annotation - const TTypeAnnotationNode* structExprType = nullptr; TVector<const TItemExprType*> blockRowTypeItems; - for (const auto& table : State_->Tables) { - const auto structExprType = table.second.ItemType; - for (const auto& item : structExprType->GetItems()) { - blockRowTypeItems.push_back( - ctx.MakeType<TItemExprType>(item->GetName(), ctx.MakeType<TBlockExprType>(item->GetItemType()))); - } - - // FIXME: YQ-2190 - // Clickhouse provider used to work with multiple tables simultaneously; - // I don't know what to do with others. - break; + const auto structExprType = tableMeta.value()->ItemType; + for (const auto& item : structExprType->GetItems()) { + blockRowTypeItems.push_back( + ctx.MakeType<TItemExprType>(item->GetName(), ctx.MakeType<TBlockExprType>(item->GetItemType()))); } blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>( BlockLengthColumnName, ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64)))); - structExprType = ctx.MakeType<TStructExprType>(blockRowTypeItems); + const TTypeAnnotationNode* typeAnnotationNode = ctx.MakeType<TStructExprType>(blockRowTypeItems); // Struct column order YQL_CLOG(INFO, ProviderGeneric) << "StructExprType column order:" - << (static_cast<const TStructExprType*>(structExprType))->ToString(); + << (static_cast<const TStructExprType*>(typeAnnotationNode))->ToString(); - auto streamExprType = ctx.MakeType<TStreamExprType>(structExprType); + auto streamExprType = ctx.MakeType<TStreamExprType>(typeAnnotationNode); input->SetTypeAnn(streamExprType); return TStatus::Ok; @@ -116,17 +127,17 @@ namespace NYql { } } - TString cluster{input->Child(TGenReadTable::idx_DataSource)->Child(1)->Content()}; - TString table{input->Child(TGenReadTable::idx_Table)->Content()}; - auto found = State_->Tables.FindPtr(std::make_pair(cluster, table)); - if (!found) { - ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), - TStringBuilder() << "No metadata for table: `" << cluster << "`.`" << table << "`")); + TString clusterName{input->Child(TGenReadTable::idx_DataSource)->Child(1)->Content()}; + TString tableName{input->Child(TGenReadTable::idx_Table)->Content()}; + + auto [tableMeta, issue] = State_->GetTable(clusterName, tableName, ctx.GetPosition(input->Pos())); + if (issue.has_value()) { + ctx.AddError(issue.value()); return TStatus::Error; } - auto itemType = found->ItemType; - auto columnOrder = found->ColumnOrder; + auto itemType = tableMeta.value()->ItemType; + auto columnOrder = tableMeta.value()->ColumnOrder; YQL_CLOG(INFO, ProviderGeneric) << "Custom column order:" << StateColumnOrderToString(columnOrder); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index ca51bfca1f8..13123410d5e 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -39,18 +39,18 @@ namespace NYql { TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override { if (const auto maybeGenReadTable = TMaybeNode<TGenReadTable>(read)) { - const auto clReadTable = maybeGenReadTable.Cast(); - const auto token = TString("cluster:default_") += clReadTable.DataSource().Cluster().StringValue(); + const auto genReadTable = maybeGenReadTable.Cast(); + const auto token = TString("cluster:default_") += genReadTable.DataSource().Cluster().StringValue(); YQL_CLOG(INFO, ProviderGeneric) << "Wrap " << read->Content() << " with token: " << token; - const auto rowType = clReadTable.Ref() + const auto rowType = genReadTable.Ref() .GetTypeAnn() ->Cast<TTupleExprType>() ->GetItems() .back() ->Cast<TListExprType>() ->GetItemType(); - auto columns = clReadTable.Columns().Ptr(); + auto columns = genReadTable.Columns().Ptr(); if (!columns->IsList()) { const auto pos = columns->Pos(); const auto& items = rowType->Cast<TStructExprType>()->GetItems(); @@ -64,14 +64,15 @@ namespace NYql { // clang-format off return Build<TDqSourceWrap>(ctx, read->Pos()) .Input<TGenSourceSettings>() - .Table(clReadTable.Table()) + .Cluster(genReadTable.DataSource().Cluster()) + .Table(genReadTable.Table()) .Token<TCoSecureParam>() .Name().Build(token) .Build() .Columns(std::move(columns)) .Build() - .RowType(ExpandType(clReadTable.Pos(), *rowType, ctx)) - .DataSource(clReadTable.DataSource().Cast<TCoDataSource>()) + .RowType(ExpandType(genReadTable.Pos(), *rowType, ctx)) + .DataSource(genReadTable.DataSource().Cast<TCoDataSource>()) .Done().Ptr(); // clang-format on } @@ -122,7 +123,10 @@ namespace NYql { auto items = select->mutable_what()->mutable_items(); - const auto& tableMeta = State_->GetTable(cluster, table); + auto [tableMeta, issue] = State_->GetTable(cluster, table); + if (issue.has_value()) { + ythrow yexception() << "Get table metadata: " << issue.value(); + } for (size_t i = 0; i < columns.Size(); i++) { // assign column name @@ -131,12 +135,12 @@ namespace NYql { column->mutable_name()->assign(column_name); // assign column type - auto type = NConnector::GetColumnTypeByName(tableMeta.Schema, column_name); + auto type = NConnector::GetColumnTypeByName(tableMeta.value()->Schema, column_name); column->mutable_type()->CopyFrom(type); } // store data source instance - srcDesc.mutable_data_source_instance()->CopyFrom(tableMeta.DataSourceInstance); + srcDesc.mutable_data_source_instance()->CopyFrom(tableMeta.value()->DataSourceInstance); // preserve source description for read actor protoSettings.PackFrom(srcDesc); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp index 279fe4fb606..c6684516288 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp @@ -26,7 +26,7 @@ namespace NYql { if (ctx.Step.IsDone(TExprStep::DiscoveryIO)) return TStatus::Ok; - if (!State_->DbResolver) + if (!State_->DatabaseResolver) return TStatus::Ok; THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> ids; @@ -53,8 +53,8 @@ namespace NYql { if (databaseId) { YQL_CLOG(DEBUG, ProviderGeneric) << "found database id: " << databaseId; const auto idKey = std::make_pair(databaseId, DataSourceKindToDatabaseType(cluster.GetKind())); - const auto iter = State_->DatabaseIds.find(idKey); - if (iter != State_->DatabaseIds.end()) { + const auto iter = State_->DatabaseAuth.find(idKey); + if (iter != State_->DatabaseAuth.end()) { YQL_CLOG(DEBUG, ProviderGeneric) << "resolve database id: " << databaseId; ids[idKey] = iter->second; } @@ -70,7 +70,7 @@ namespace NYql { // FIXME: overengineered code - instead of using weak_ptr, directly copy shared_ptr in callback in this way: // Apply([response = DbResolverResponse_](...)) const std::weak_ptr<NYql::TDatabaseResolverResponse> response = DbResolverResponse_; - AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) { + AsyncFuture_ = State_->DatabaseResolver->ResolveIds(ids).Apply([response](auto future) { if (const auto res = response.lock()) { *res = std::move(future.ExtractValue()); } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp index 98c3171948c..f3f2679b7a8 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp @@ -39,8 +39,7 @@ namespace NYql { }; class TGenericLoadTableMetadataTransformer: public TGraphTransformerBase { - using TMapType = - std::unordered_map<std::pair<TString, TString>, TGenericTableDescription, THash<std::pair<TString, TString>>>; + using TMapType = std::unordered_map<TGenericState::TTableAddress, TGenericTableDescription, THash<TGenericState::TTableAddress>>; public: TGenericLoadTableMetadataTransformer(TGenericState::TPtr state, NConnector::IClient::TPtr client) @@ -57,27 +56,26 @@ namespace NYql { } std::unordered_set<TMapType::key_type, TMapType::hasher> pendingTables; - if (const auto& reads = FindNodes(input, - [&](const TExprNode::TPtr& node) { - if (const auto maybeRead = TMaybeNode<TGenRead>(node)) { - return maybeRead.Cast().DataSource().Category().Value() == - GenericProviderName; - } - return false; - }); - !reads.empty()) { + const auto& reads = FindNodes(input, + [&](const TExprNode::TPtr& node) { + if (const auto maybeRead = TMaybeNode<TGenRead>(node)) { + return maybeRead.Cast().DataSource().Category().Value() == GenericProviderName; + } + return false; + }); + if (!reads.empty()) { for (const auto& r : reads) { const TGenRead read(r); if (!read.FreeArgs().Get(2).Ref().IsCallable("MrTableConcat")) { ctx.AddError( - TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), TStringBuilder() << "Expected Key")); + TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), TStringBuilder() << "Expected key")); return TStatus::Error; } const auto maybeKey = TExprBase(read.FreeArgs().Get(2).Ref().HeadPtr()).Maybe<TCoKey>(); if (!maybeKey) { ctx.AddError( - TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), TStringBuilder() << "Expected Key")); + TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), TStringBuilder() << "Expected key")); return TStatus::Error; } @@ -89,11 +87,11 @@ namespace NYql { return TStatus::Error; } - const auto cluster = read.DataSource().Cluster().StringValue(); + const auto clusterName = read.DataSource().Cluster().StringValue(); const auto tableName = TString(keyArg.Tail().Head().Content()); - if (pendingTables.insert(std::make_pair(cluster, tableName)).second) { + if (pendingTables.insert(TGenericState::TTableAddress(clusterName, tableName)).second) { YQL_CLOG(INFO, ProviderGeneric) - << "Load table meta for: `" << cluster << "`.`" << tableName << "`"; + << "Loading table meta for: `" << clusterName << "`.`" << tableName << "`"; } } } @@ -107,7 +105,7 @@ namespace NYql { const auto& clusterName = item.first; const auto it = State_->Configuration->ClusterNamesToClusterConfigs.find(clusterName); - YQL_ENSURE(State_->Configuration->ClusterNamesToClusterConfigs.cend() != it, "cluster not found:" << clusterName); + YQL_ENSURE(State_->Configuration->ClusterNamesToClusterConfigs.cend() != it, "cluster not found: " << clusterName); const auto& clusterConfig = it->second; @@ -162,36 +160,36 @@ namespace NYql { for (const auto& r : reads) { const TGenRead read(r); - const auto cluster = read.DataSource().Cluster().StringValue(); + const auto clusterName = read.DataSource().Cluster().StringValue(); const auto& keyArg = TExprBase(read.FreeArgs().Get(2).Ref().HeadPtr()).Cast<TCoKey>().Ref().Head(); - const auto table = TString(keyArg.Tail().Head().Content()); + const auto tableName = TString(keyArg.Tail().Head().Content()); - const auto it = Results_.find(std::make_pair(cluster, table)); + const auto it = Results_.find(TGenericState::TTableAddress(clusterName, tableName)); if (Results_.cend() != it) { const auto& result = it->second.Result; const auto& error = result->Error; if (NConnector::ErrorIsSuccess(error)) { - TGenericState::TTableMeta meta; - meta.Schema = result->Schema; - meta.DataSourceInstance = it->second.DataSourceInstance; + TGenericState::TTableMeta tableMeta; + tableMeta.Schema = result->Schema; + tableMeta.DataSourceInstance = it->second.DataSourceInstance; - const auto& parse = ParseTableMeta(meta.Schema, cluster, table, ctx, meta.ColumnOrder); + const auto& parse = ParseTableMeta(tableMeta.Schema, clusterName, tableName, ctx, tableMeta.ColumnOrder); if (parse.first) { - meta.ItemType = parse.first; + tableMeta.ItemType = parse.first; State_->Timezones[read.DataSource().Cluster().Value()] = ctx.AppendString(parse.second); if (const auto ins = replaces.emplace(read.Raw(), TExprNode::TPtr()); ins.second) { // clang-format off ins.first->second = Build<TGenReadTable>(ctx, read.Pos()) .World(read.World()) .DataSource(read.DataSource()) - .Table().Value(table).Build() + .Table().Value(tableName).Build() .Columns<TCoVoid>().Build() .Timezone().Value(parse.second).Build() .Done().Ptr(); // clang-format on } - State_->Tables.emplace(it->first, meta); + State_->AddTable(clusterName, tableName, std::move(tableMeta)); } else { hasErrors = true; break; @@ -199,13 +197,13 @@ namespace NYql { } else { NConnector::ErrorToExprCtx(error, ctx, ctx.GetPosition(read.Pos()), TStringBuilder() - << "loading metadata for table: " << cluster << '.' << table); + << "Loading metadata for table: " << clusterName << '.' << tableName); hasErrors = true; break; } } else { ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), - TStringBuilder() << "Not found result for " << cluster << '.' << table)); + TStringBuilder() << "Not found result for " << clusterName << '.' << tableName)); hasErrors = true; break; } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp index a845ad9d2e9..5141983d112 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp @@ -39,9 +39,18 @@ namespace NYql { ->Cast<TMultiExprType>() ->GetSize()) { const auto& read = maybe.Cast(); - const auto structType = - State_->Tables[std::make_pair(read.DataSource().Cluster().Value(), read.Table().Value())] - .ItemType; + + // Get table metadata + const auto [tableMeta, issue] = State_->GetTable( + read.DataSource().Cluster().Value(), + read.Table().Value(), + ctx.GetPosition(node.Pos())); + if (issue.has_value()) { + ctx.AddError(issue.value()); + return node; + } + + const auto structType = tableMeta.value()->ItemType; YQL_ENSURE(structType->GetSize()); auto columns = ctx.NewList(read.Pos(), {ctx.NewAtom(read.Pos(), GetLightColumn(*structType)->GetName())}); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp index e3310235249..d6ed0ab63d1 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp @@ -25,9 +25,9 @@ namespace NYql { state->Types = typeCtx.Get(); state->FunctionRegistry = functionRegistry; - state->DbResolver = dbResolver; + state->DatabaseResolver = dbResolver; if (gatewaysConfig) { - state->Configuration->Init(gatewaysConfig->GetGeneric(), state->DbResolver, state->DatabaseIds, typeCtx->Credentials); + state->Configuration->Init(gatewaysConfig->GetGeneric(), state->DatabaseResolver, state->DatabaseAuth, typeCtx->Credentials); } TDataProviderInfo info; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h index 784890ec530..8f3144f151f 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h @@ -1,83 +1,19 @@ #pragma once -#include "yql_generic_settings.h" +#include "yql_generic_state.h" #include <sstream> #include <ydb/library/yql/core/yql_data_provider.h> #include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> -#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> -#include <ydb/library/yql/providers/generic/connector/libcpp/client.h> - -namespace NKikimr::NMiniKQL { - class IFunctionRegistry; -} namespace NYql { - - struct TGenericState: public TThrRefBase { - using TPtr = TIntrusivePtr<TGenericState>; - - struct TTableMeta { - const TStructExprType* ItemType = nullptr; - TVector<TString> ColumnOrder; - NYql::NConnector::NApi::TSchema Schema; - NYql::NConnector::NApi::TDataSourceInstance DataSourceInstance; - - TString ToString() const { - TStringBuilder sb; - sb << "Schema: " << Schema.ShortDebugString(); - sb << "; ColumnOrder: "; - for (size_t i = 0; i < ColumnOrder.size(); i++) { - sb << i << " " << ColumnOrder[i]; - } - if (ItemType) { - sb << "; ItemType: " << ItemType->ToString(); - } else { - sb << "; ItemType: nullptr"; - } - - return sb; - } - }; - - TTableMeta& GetTable(const TString& cluster, const TString& table) { - auto search = Tables.find(std::make_pair(cluster, table)); - if (search != Tables.end()) { - return search->second; - } - - ythrow yexception() << "unknown (" << cluster << ", " << table << ") pair"; - }; - - TString ToString() const { - TStringBuilder sb; - if (Tables) { - for (const auto& kv : Tables) { - sb << "Table '" << kv.first << "':"; - sb << kv.second.ToString() << "\n"; - } - } - return sb; - } - - THashMap<std::pair<TString, TString>, TTableMeta> Tables; - std::unordered_map<std::string_view, std::string_view> Timezones; - - TTypeAnnotationContext* Types = nullptr; - TGenericConfiguration::TPtr Configuration = MakeIntrusive<TGenericConfiguration>(); - const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; - - // key - database id, value - credentials to access MDB API - NYql::IDatabaseAsyncResolver::TDatabaseAuthMap DatabaseIds; - std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver; - }; - TDataProviderInitializer GetGenericDataProviderInitializer(NConnector::IClient::TPtr genericClient, std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver = nullptr); TIntrusivePtr<IDataProvider> CreateGenericDataSource(TGenericState::TPtr state, NConnector::IClient::TPtr genericClient); + TIntrusivePtr<IDataProvider> CreateGenericDataSink(TGenericState::TPtr state); } // namespace NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h index b6e043e4f2d..9c2516b2fa5 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h @@ -22,8 +22,8 @@ namespace NYql { template <typename TProtoConfig> void Init(const TProtoConfig& config, - const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, - NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseIds, + const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver, + NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, const TCredentials::TPtr& credentials) { TVector<TString> clusterNames(Reserve(config.ClusterMappingSize())); @@ -35,15 +35,15 @@ namespace NYql { this->SetValidClusters(clusterNames); for (const auto& cluster : config.GetClusterMapping()) { - InitCluster(cluster, dbResolver, databaseIds, credentials); + InitCluster(cluster, databaseResolver, databaseAuth, credentials); } this->FreezeDefaults(); } private: void InitCluster(const TGenericClusterConfig& cluster, - const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, - NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseIds, + const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver, + NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, const TCredentials::TPtr& credentials) { const auto& clusterName = cluster.GetName(); const auto& databaseId = cluster.GetDatabaseId(); @@ -55,10 +55,10 @@ namespace NYql { << ", database id = " << databaseId << ", endpoint = " << endpoint; - if (dbResolver && databaseId) { + if (databaseResolver && databaseId) { const auto token = MakeStructuredToken(cluster, credentials); - databaseIds[std::make_pair(databaseId, DataSourceKindToDatabaseType(cluster.GetKind()))] = NYql::TDatabaseAuth{token, /*AddBearer=*/true}; + databaseAuth[std::make_pair(databaseId, DataSourceKindToDatabaseType(cluster.GetKind()))] = NYql::TDatabaseAuth{token, /*AddBearer=*/true}; DatabaseIdsToClusterNames[databaseId].emplace_back(clusterName); YQL_CLOG(DEBUG, ProviderGeneric) << "database id '" << databaseId << "' added to mapping"; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_state.h b/ydb/library/yql/providers/generic/provider/yql_generic_state.h new file mode 100644 index 00000000000..c99b6d04263 --- /dev/null +++ b/ydb/library/yql/providers/generic/provider/yql_generic_state.h @@ -0,0 +1,65 @@ +#pragma once + +#include "yql_generic_settings.h" + +#include <ydb/library/yql/core/yql_data_provider.h> +#include <ydb/library/yql/providers/generic/connector/libcpp/client.h> + +namespace NKikimr::NMiniKQL { + class IFunctionRegistry; +} + +namespace NYql { + struct TGenericState: public TThrRefBase { + using TPtr = TIntrusivePtr<TGenericState>; + + using TTableAddress = std::pair<TString, TString>; // std::pair<clusterName, tableName> + + struct TTableMeta { + const TStructExprType* ItemType = nullptr; + TVector<TString> ColumnOrder; + NYql::NConnector::NApi::TSchema Schema; + NYql::NConnector::NApi::TDataSourceInstance DataSourceInstance; + }; + + using TGetTableResult = std::pair<std::optional<const TTableMeta*>, std::optional<TIssue>>; + + void AddTable(const TStringBuf& clusterName, const TStringBuf& tableName, TTableMeta&& tableMeta) { + Tables_.emplace(TTableAddress(clusterName, tableName), tableMeta); + } + + TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName) const { + auto result = Tables_.FindPtr(TTableAddress(clusterName, tableName)); + if (result) { + return std::make_pair(result, std::nullopt); + } + + return std::make_pair( + std::nullopt, + TIssue(TStringBuilder() << "no metadata for table " << clusterName << "." << tableName)); + }; + + TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName, const TPosition& position) const { + auto pair = GetTable(clusterName, tableName); + if (pair.second.has_value()) { + pair.second->Position = position; + } + + return pair; + } + + // FIXME: not used anymore, delete it some day + std::unordered_map<std::string_view, std::string_view> Timezones; + + TTypeAnnotationContext* Types = nullptr; + TGenericConfiguration::TPtr Configuration = MakeIntrusive<TGenericConfiguration>(); + const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; + + // key - (database id, database type), value - credentials to access MDB API + NYql::IDatabaseAsyncResolver::TDatabaseAuthMap DatabaseAuth; + std::shared_ptr<NYql::IDatabaseAsyncResolver> DatabaseResolver; + + private: + THashMap<TTableAddress, TTableMeta> Tables_; + }; +} |