aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@yandex-team.com>2023-07-17 18:34:48 +0300
committervitalyisaev <vitalyisaev@yandex-team.com>2023-07-17 18:34:48 +0300
commitf3c0e9b4c9d4e1be2466f3fd3652a7050c689e0b (patch)
tree3ac56aaab00f4378fad58e810c0c8f6ed09d33af
parent73554a93bde316abed1cb16560985db25564cc3f (diff)
downloadydb-f3c0e9b4c9d4e1be2466f3fd3652a7050c689e0b.tar.gz
YQ Connector: support multiple tables in a single request
1. Исправлена ошибка в generic provider, не позволявшая в одном запросе извлекать данные из нескольких таблиц (в том числе, делать JOIN). 2. Добавлены интеграционные тесты на JOIN, которые проверяют различные варианты комбинаций таблиц и источников данных.
-rw-r--r--ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json7
-rw-r--r--ydb/library/yql/providers/generic/provider/ya.make1
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp59
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp24
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp8
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp56
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp15
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp4
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_provider.h68
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_settings.h14
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_state.h65
11 files changed, 173 insertions, 148 deletions
diff --git a/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json b/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json
index 393287f800c..a426ccb1ed6 100644
--- a/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json
+++ b/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json
@@ -41,9 +41,10 @@
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "GenSourceSettings"},
"Children": [
- {"Index": 0, "Name": "Table", "Type": "TCoAtom"},
- {"Index": 1, "Name": "Token", "Type": "TCoSecureParam"},
- {"Index": 2, "Name": "Columns", "Type": "TCoAtomList"}
+ {"Index": 0, "Name": "Cluster", "Type": "TCoAtom"},
+ {"Index": 1, "Name": "Table", "Type": "TCoAtom"},
+ {"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
+ {"Index": 3, "Name": "Columns", "Type": "TCoAtomList"}
]
}
]
diff --git a/ydb/library/yql/providers/generic/provider/ya.make b/ydb/library/yql/providers/generic/provider/ya.make
index 9129151fcd4..478c925ec09 100644
--- a/ydb/library/yql/providers/generic/provider/ya.make
+++ b/ydb/library/yql/providers/generic/provider/ya.make
@@ -16,6 +16,7 @@ SRCS(
yql_generic_provider.h
yql_generic_provider_impl.h
yql_generic_settings.h
+ yql_generic_state.h
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp
index 32c4c52068b..6c6959d331f 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp
@@ -9,6 +9,11 @@
#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
#include <ydb/library/yql/utils/log/log.h>
+// You may want to change AST, graph nodes, types, but finally you'll
+// return to the existing structure, inherited from ClickHouse and S3 providers.
+// In this case please increment this counter:
+// Hours wasted: 5
+
namespace NYql {
using namespace NNodes;
@@ -25,7 +30,11 @@ namespace NYql {
}
TStatus HandleSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
- if (!EnsureArgsCount(*input, 3U, ctx)) {
+ if (!EnsureArgsCount(*input, 4, ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureAtom(*input->Child(TGenSourceSettings::idx_Cluster), ctx)) {
return TStatus::Error;
}
@@ -40,33 +49,35 @@ namespace NYql {
return TStatus::Error;
}
+ // Find requested table metadata
+ TString clusterName{input->Child(TGenSourceSettings::idx_Cluster)->Content()};
+ TString tableName{input->Child(TGenSourceSettings::idx_Table)->Content()};
+
+ auto [tableMeta, issue] = State_->GetTable(clusterName, tableName, ctx.GetPosition(input->Pos()));
+ if (issue.has_value()) {
+ ctx.AddError(issue.value());
+ return TStatus::Error;
+ }
+
// Create type annotation
- const TTypeAnnotationNode* structExprType = nullptr;
TVector<const TItemExprType*> blockRowTypeItems;
- for (const auto& table : State_->Tables) {
- const auto structExprType = table.second.ItemType;
- for (const auto& item : structExprType->GetItems()) {
- blockRowTypeItems.push_back(
- ctx.MakeType<TItemExprType>(item->GetName(), ctx.MakeType<TBlockExprType>(item->GetItemType())));
- }
-
- // FIXME: YQ-2190
- // Clickhouse provider used to work with multiple tables simultaneously;
- // I don't know what to do with others.
- break;
+ const auto structExprType = tableMeta.value()->ItemType;
+ for (const auto& item : structExprType->GetItems()) {
+ blockRowTypeItems.push_back(
+ ctx.MakeType<TItemExprType>(item->GetName(), ctx.MakeType<TBlockExprType>(item->GetItemType())));
}
blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(
BlockLengthColumnName, ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64))));
- structExprType = ctx.MakeType<TStructExprType>(blockRowTypeItems);
+ const TTypeAnnotationNode* typeAnnotationNode = ctx.MakeType<TStructExprType>(blockRowTypeItems);
// Struct column order
YQL_CLOG(INFO, ProviderGeneric)
<< "StructExprType column order:"
- << (static_cast<const TStructExprType*>(structExprType))->ToString();
+ << (static_cast<const TStructExprType*>(typeAnnotationNode))->ToString();
- auto streamExprType = ctx.MakeType<TStreamExprType>(structExprType);
+ auto streamExprType = ctx.MakeType<TStreamExprType>(typeAnnotationNode);
input->SetTypeAnn(streamExprType);
return TStatus::Ok;
@@ -116,17 +127,17 @@ namespace NYql {
}
}
- TString cluster{input->Child(TGenReadTable::idx_DataSource)->Child(1)->Content()};
- TString table{input->Child(TGenReadTable::idx_Table)->Content()};
- auto found = State_->Tables.FindPtr(std::make_pair(cluster, table));
- if (!found) {
- ctx.AddError(TIssue(ctx.GetPosition(input->Pos()),
- TStringBuilder() << "No metadata for table: `" << cluster << "`.`" << table << "`"));
+ TString clusterName{input->Child(TGenReadTable::idx_DataSource)->Child(1)->Content()};
+ TString tableName{input->Child(TGenReadTable::idx_Table)->Content()};
+
+ auto [tableMeta, issue] = State_->GetTable(clusterName, tableName, ctx.GetPosition(input->Pos()));
+ if (issue.has_value()) {
+ ctx.AddError(issue.value());
return TStatus::Error;
}
- auto itemType = found->ItemType;
- auto columnOrder = found->ColumnOrder;
+ auto itemType = tableMeta.value()->ItemType;
+ auto columnOrder = tableMeta.value()->ColumnOrder;
YQL_CLOG(INFO, ProviderGeneric) << "Custom column order:" << StateColumnOrderToString(columnOrder);
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
index ca51bfca1f8..13123410d5e 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
@@ -39,18 +39,18 @@ namespace NYql {
TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
if (const auto maybeGenReadTable = TMaybeNode<TGenReadTable>(read)) {
- const auto clReadTable = maybeGenReadTable.Cast();
- const auto token = TString("cluster:default_") += clReadTable.DataSource().Cluster().StringValue();
+ const auto genReadTable = maybeGenReadTable.Cast();
+ const auto token = TString("cluster:default_") += genReadTable.DataSource().Cluster().StringValue();
YQL_CLOG(INFO, ProviderGeneric) << "Wrap " << read->Content() << " with token: " << token;
- const auto rowType = clReadTable.Ref()
+ const auto rowType = genReadTable.Ref()
.GetTypeAnn()
->Cast<TTupleExprType>()
->GetItems()
.back()
->Cast<TListExprType>()
->GetItemType();
- auto columns = clReadTable.Columns().Ptr();
+ auto columns = genReadTable.Columns().Ptr();
if (!columns->IsList()) {
const auto pos = columns->Pos();
const auto& items = rowType->Cast<TStructExprType>()->GetItems();
@@ -64,14 +64,15 @@ namespace NYql {
// clang-format off
return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TGenSourceSettings>()
- .Table(clReadTable.Table())
+ .Cluster(genReadTable.DataSource().Cluster())
+ .Table(genReadTable.Table())
.Token<TCoSecureParam>()
.Name().Build(token)
.Build()
.Columns(std::move(columns))
.Build()
- .RowType(ExpandType(clReadTable.Pos(), *rowType, ctx))
- .DataSource(clReadTable.DataSource().Cast<TCoDataSource>())
+ .RowType(ExpandType(genReadTable.Pos(), *rowType, ctx))
+ .DataSource(genReadTable.DataSource().Cast<TCoDataSource>())
.Done().Ptr();
// clang-format on
}
@@ -122,7 +123,10 @@ namespace NYql {
auto items = select->mutable_what()->mutable_items();
- const auto& tableMeta = State_->GetTable(cluster, table);
+ auto [tableMeta, issue] = State_->GetTable(cluster, table);
+ if (issue.has_value()) {
+ ythrow yexception() << "Get table metadata: " << issue.value();
+ }
for (size_t i = 0; i < columns.Size(); i++) {
// assign column name
@@ -131,12 +135,12 @@ namespace NYql {
column->mutable_name()->assign(column_name);
// assign column type
- auto type = NConnector::GetColumnTypeByName(tableMeta.Schema, column_name);
+ auto type = NConnector::GetColumnTypeByName(tableMeta.value()->Schema, column_name);
column->mutable_type()->CopyFrom(type);
}
// store data source instance
- srcDesc.mutable_data_source_instance()->CopyFrom(tableMeta.DataSourceInstance);
+ srcDesc.mutable_data_source_instance()->CopyFrom(tableMeta.value()->DataSourceInstance);
// preserve source description for read actor
protoSettings.PackFrom(srcDesc);
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp
index 279fe4fb606..c6684516288 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp
@@ -26,7 +26,7 @@ namespace NYql {
if (ctx.Step.IsDone(TExprStep::DiscoveryIO))
return TStatus::Ok;
- if (!State_->DbResolver)
+ if (!State_->DatabaseResolver)
return TStatus::Ok;
THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> ids;
@@ -53,8 +53,8 @@ namespace NYql {
if (databaseId) {
YQL_CLOG(DEBUG, ProviderGeneric) << "found database id: " << databaseId;
const auto idKey = std::make_pair(databaseId, DataSourceKindToDatabaseType(cluster.GetKind()));
- const auto iter = State_->DatabaseIds.find(idKey);
- if (iter != State_->DatabaseIds.end()) {
+ const auto iter = State_->DatabaseAuth.find(idKey);
+ if (iter != State_->DatabaseAuth.end()) {
YQL_CLOG(DEBUG, ProviderGeneric) << "resolve database id: " << databaseId;
ids[idKey] = iter->second;
}
@@ -70,7 +70,7 @@ namespace NYql {
// FIXME: overengineered code - instead of using weak_ptr, directly copy shared_ptr in callback in this way:
// Apply([response = DbResolverResponse_](...))
const std::weak_ptr<NYql::TDatabaseResolverResponse> response = DbResolverResponse_;
- AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) {
+ AsyncFuture_ = State_->DatabaseResolver->ResolveIds(ids).Apply([response](auto future) {
if (const auto res = response.lock()) {
*res = std::move(future.ExtractValue());
}
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
index 98c3171948c..f3f2679b7a8 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
@@ -39,8 +39,7 @@ namespace NYql {
};
class TGenericLoadTableMetadataTransformer: public TGraphTransformerBase {
- using TMapType =
- std::unordered_map<std::pair<TString, TString>, TGenericTableDescription, THash<std::pair<TString, TString>>>;
+ using TMapType = std::unordered_map<TGenericState::TTableAddress, TGenericTableDescription, THash<TGenericState::TTableAddress>>;
public:
TGenericLoadTableMetadataTransformer(TGenericState::TPtr state, NConnector::IClient::TPtr client)
@@ -57,27 +56,26 @@ namespace NYql {
}
std::unordered_set<TMapType::key_type, TMapType::hasher> pendingTables;
- if (const auto& reads = FindNodes(input,
- [&](const TExprNode::TPtr& node) {
- if (const auto maybeRead = TMaybeNode<TGenRead>(node)) {
- return maybeRead.Cast().DataSource().Category().Value() ==
- GenericProviderName;
- }
- return false;
- });
- !reads.empty()) {
+ const auto& reads = FindNodes(input,
+ [&](const TExprNode::TPtr& node) {
+ if (const auto maybeRead = TMaybeNode<TGenRead>(node)) {
+ return maybeRead.Cast().DataSource().Category().Value() == GenericProviderName;
+ }
+ return false;
+ });
+ if (!reads.empty()) {
for (const auto& r : reads) {
const TGenRead read(r);
if (!read.FreeArgs().Get(2).Ref().IsCallable("MrTableConcat")) {
ctx.AddError(
- TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), TStringBuilder() << "Expected Key"));
+ TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), TStringBuilder() << "Expected key"));
return TStatus::Error;
}
const auto maybeKey = TExprBase(read.FreeArgs().Get(2).Ref().HeadPtr()).Maybe<TCoKey>();
if (!maybeKey) {
ctx.AddError(
- TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), TStringBuilder() << "Expected Key"));
+ TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), TStringBuilder() << "Expected key"));
return TStatus::Error;
}
@@ -89,11 +87,11 @@ namespace NYql {
return TStatus::Error;
}
- const auto cluster = read.DataSource().Cluster().StringValue();
+ const auto clusterName = read.DataSource().Cluster().StringValue();
const auto tableName = TString(keyArg.Tail().Head().Content());
- if (pendingTables.insert(std::make_pair(cluster, tableName)).second) {
+ if (pendingTables.insert(TGenericState::TTableAddress(clusterName, tableName)).second) {
YQL_CLOG(INFO, ProviderGeneric)
- << "Load table meta for: `" << cluster << "`.`" << tableName << "`";
+ << "Loading table meta for: `" << clusterName << "`.`" << tableName << "`";
}
}
}
@@ -107,7 +105,7 @@ namespace NYql {
const auto& clusterName = item.first;
const auto it = State_->Configuration->ClusterNamesToClusterConfigs.find(clusterName);
- YQL_ENSURE(State_->Configuration->ClusterNamesToClusterConfigs.cend() != it, "cluster not found:" << clusterName);
+ YQL_ENSURE(State_->Configuration->ClusterNamesToClusterConfigs.cend() != it, "cluster not found: " << clusterName);
const auto& clusterConfig = it->second;
@@ -162,36 +160,36 @@ namespace NYql {
for (const auto& r : reads) {
const TGenRead read(r);
- const auto cluster = read.DataSource().Cluster().StringValue();
+ const auto clusterName = read.DataSource().Cluster().StringValue();
const auto& keyArg = TExprBase(read.FreeArgs().Get(2).Ref().HeadPtr()).Cast<TCoKey>().Ref().Head();
- const auto table = TString(keyArg.Tail().Head().Content());
+ const auto tableName = TString(keyArg.Tail().Head().Content());
- const auto it = Results_.find(std::make_pair(cluster, table));
+ const auto it = Results_.find(TGenericState::TTableAddress(clusterName, tableName));
if (Results_.cend() != it) {
const auto& result = it->second.Result;
const auto& error = result->Error;
if (NConnector::ErrorIsSuccess(error)) {
- TGenericState::TTableMeta meta;
- meta.Schema = result->Schema;
- meta.DataSourceInstance = it->second.DataSourceInstance;
+ TGenericState::TTableMeta tableMeta;
+ tableMeta.Schema = result->Schema;
+ tableMeta.DataSourceInstance = it->second.DataSourceInstance;
- const auto& parse = ParseTableMeta(meta.Schema, cluster, table, ctx, meta.ColumnOrder);
+ const auto& parse = ParseTableMeta(tableMeta.Schema, clusterName, tableName, ctx, tableMeta.ColumnOrder);
if (parse.first) {
- meta.ItemType = parse.first;
+ tableMeta.ItemType = parse.first;
State_->Timezones[read.DataSource().Cluster().Value()] = ctx.AppendString(parse.second);
if (const auto ins = replaces.emplace(read.Raw(), TExprNode::TPtr()); ins.second) {
// clang-format off
ins.first->second = Build<TGenReadTable>(ctx, read.Pos())
.World(read.World())
.DataSource(read.DataSource())
- .Table().Value(table).Build()
+ .Table().Value(tableName).Build()
.Columns<TCoVoid>().Build()
.Timezone().Value(parse.second).Build()
.Done().Ptr();
// clang-format on
}
- State_->Tables.emplace(it->first, meta);
+ State_->AddTable(clusterName, tableName, std::move(tableMeta));
} else {
hasErrors = true;
break;
@@ -199,13 +197,13 @@ namespace NYql {
} else {
NConnector::ErrorToExprCtx(error, ctx, ctx.GetPosition(read.Pos()),
TStringBuilder()
- << "loading metadata for table: " << cluster << '.' << table);
+ << "Loading metadata for table: " << clusterName << '.' << tableName);
hasErrors = true;
break;
}
} else {
ctx.AddError(TIssue(ctx.GetPosition(read.Pos()),
- TStringBuilder() << "Not found result for " << cluster << '.' << table));
+ TStringBuilder() << "Not found result for " << clusterName << '.' << tableName));
hasErrors = true;
break;
}
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
index a845ad9d2e9..5141983d112 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
@@ -39,9 +39,18 @@ namespace NYql {
->Cast<TMultiExprType>()
->GetSize()) {
const auto& read = maybe.Cast();
- const auto structType =
- State_->Tables[std::make_pair(read.DataSource().Cluster().Value(), read.Table().Value())]
- .ItemType;
+
+ // Get table metadata
+ const auto [tableMeta, issue] = State_->GetTable(
+ read.DataSource().Cluster().Value(),
+ read.Table().Value(),
+ ctx.GetPosition(node.Pos()));
+ if (issue.has_value()) {
+ ctx.AddError(issue.value());
+ return node;
+ }
+
+ const auto structType = tableMeta.value()->ItemType;
YQL_ENSURE(structType->GetSize());
auto columns =
ctx.NewList(read.Pos(), {ctx.NewAtom(read.Pos(), GetLightColumn(*structType)->GetName())});
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
index e3310235249..d6ed0ab63d1 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
@@ -25,9 +25,9 @@ namespace NYql {
state->Types = typeCtx.Get();
state->FunctionRegistry = functionRegistry;
- state->DbResolver = dbResolver;
+ state->DatabaseResolver = dbResolver;
if (gatewaysConfig) {
- state->Configuration->Init(gatewaysConfig->GetGeneric(), state->DbResolver, state->DatabaseIds, typeCtx->Credentials);
+ state->Configuration->Init(gatewaysConfig->GetGeneric(), state->DatabaseResolver, state->DatabaseAuth, typeCtx->Credentials);
}
TDataProviderInfo info;
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h
index 784890ec530..8f3144f151f 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h
@@ -1,83 +1,19 @@
#pragma once
-#include "yql_generic_settings.h"
+#include "yql_generic_state.h"
#include <sstream>
#include <ydb/library/yql/core/yql_data_provider.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
-#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
-#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
-
-namespace NKikimr::NMiniKQL {
- class IFunctionRegistry;
-}
namespace NYql {
-
- struct TGenericState: public TThrRefBase {
- using TPtr = TIntrusivePtr<TGenericState>;
-
- struct TTableMeta {
- const TStructExprType* ItemType = nullptr;
- TVector<TString> ColumnOrder;
- NYql::NConnector::NApi::TSchema Schema;
- NYql::NConnector::NApi::TDataSourceInstance DataSourceInstance;
-
- TString ToString() const {
- TStringBuilder sb;
- sb << "Schema: " << Schema.ShortDebugString();
- sb << "; ColumnOrder: ";
- for (size_t i = 0; i < ColumnOrder.size(); i++) {
- sb << i << " " << ColumnOrder[i];
- }
- if (ItemType) {
- sb << "; ItemType: " << ItemType->ToString();
- } else {
- sb << "; ItemType: nullptr";
- }
-
- return sb;
- }
- };
-
- TTableMeta& GetTable(const TString& cluster, const TString& table) {
- auto search = Tables.find(std::make_pair(cluster, table));
- if (search != Tables.end()) {
- return search->second;
- }
-
- ythrow yexception() << "unknown (" << cluster << ", " << table << ") pair";
- };
-
- TString ToString() const {
- TStringBuilder sb;
- if (Tables) {
- for (const auto& kv : Tables) {
- sb << "Table '" << kv.first << "':";
- sb << kv.second.ToString() << "\n";
- }
- }
- return sb;
- }
-
- THashMap<std::pair<TString, TString>, TTableMeta> Tables;
- std::unordered_map<std::string_view, std::string_view> Timezones;
-
- TTypeAnnotationContext* Types = nullptr;
- TGenericConfiguration::TPtr Configuration = MakeIntrusive<TGenericConfiguration>();
- const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
-
- // key - database id, value - credentials to access MDB API
- NYql::IDatabaseAsyncResolver::TDatabaseAuthMap DatabaseIds;
- std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver;
- };
-
TDataProviderInitializer
GetGenericDataProviderInitializer(NConnector::IClient::TPtr genericClient,
std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver = nullptr);
TIntrusivePtr<IDataProvider> CreateGenericDataSource(TGenericState::TPtr state,
NConnector::IClient::TPtr genericClient);
+
TIntrusivePtr<IDataProvider> CreateGenericDataSink(TGenericState::TPtr state);
} // namespace NYql
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
index b6e043e4f2d..9c2516b2fa5 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
@@ -22,8 +22,8 @@ namespace NYql {
template <typename TProtoConfig>
void Init(const TProtoConfig& config,
- const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
- NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseIds,
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver,
+ NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth,
const TCredentials::TPtr& credentials)
{
TVector<TString> clusterNames(Reserve(config.ClusterMappingSize()));
@@ -35,15 +35,15 @@ namespace NYql {
this->SetValidClusters(clusterNames);
for (const auto& cluster : config.GetClusterMapping()) {
- InitCluster(cluster, dbResolver, databaseIds, credentials);
+ InitCluster(cluster, databaseResolver, databaseAuth, credentials);
}
this->FreezeDefaults();
}
private:
void InitCluster(const TGenericClusterConfig& cluster,
- const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
- NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseIds,
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver,
+ NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth,
const TCredentials::TPtr& credentials) {
const auto& clusterName = cluster.GetName();
const auto& databaseId = cluster.GetDatabaseId();
@@ -55,10 +55,10 @@ namespace NYql {
<< ", database id = " << databaseId
<< ", endpoint = " << endpoint;
- if (dbResolver && databaseId) {
+ if (databaseResolver && databaseId) {
const auto token = MakeStructuredToken(cluster, credentials);
- databaseIds[std::make_pair(databaseId, DataSourceKindToDatabaseType(cluster.GetKind()))] = NYql::TDatabaseAuth{token, /*AddBearer=*/true};
+ databaseAuth[std::make_pair(databaseId, DataSourceKindToDatabaseType(cluster.GetKind()))] = NYql::TDatabaseAuth{token, /*AddBearer=*/true};
DatabaseIdsToClusterNames[databaseId].emplace_back(clusterName);
YQL_CLOG(DEBUG, ProviderGeneric) << "database id '" << databaseId << "' added to mapping";
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_state.h b/ydb/library/yql/providers/generic/provider/yql_generic_state.h
new file mode 100644
index 00000000000..c99b6d04263
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_state.h
@@ -0,0 +1,65 @@
+#pragma once
+
+#include "yql_generic_settings.h"
+
+#include <ydb/library/yql/core/yql_data_provider.h>
+#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
+
+namespace NKikimr::NMiniKQL {
+ class IFunctionRegistry;
+}
+
+namespace NYql {
+ struct TGenericState: public TThrRefBase {
+ using TPtr = TIntrusivePtr<TGenericState>;
+
+ using TTableAddress = std::pair<TString, TString>; // std::pair<clusterName, tableName>
+
+ struct TTableMeta {
+ const TStructExprType* ItemType = nullptr;
+ TVector<TString> ColumnOrder;
+ NYql::NConnector::NApi::TSchema Schema;
+ NYql::NConnector::NApi::TDataSourceInstance DataSourceInstance;
+ };
+
+ using TGetTableResult = std::pair<std::optional<const TTableMeta*>, std::optional<TIssue>>;
+
+ void AddTable(const TStringBuf& clusterName, const TStringBuf& tableName, TTableMeta&& tableMeta) {
+ Tables_.emplace(TTableAddress(clusterName, tableName), tableMeta);
+ }
+
+ TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName) const {
+ auto result = Tables_.FindPtr(TTableAddress(clusterName, tableName));
+ if (result) {
+ return std::make_pair(result, std::nullopt);
+ }
+
+ return std::make_pair(
+ std::nullopt,
+ TIssue(TStringBuilder() << "no metadata for table " << clusterName << "." << tableName));
+ };
+
+ TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName, const TPosition& position) const {
+ auto pair = GetTable(clusterName, tableName);
+ if (pair.second.has_value()) {
+ pair.second->Position = position;
+ }
+
+ return pair;
+ }
+
+ // FIXME: not used anymore, delete it some day
+ std::unordered_map<std::string_view, std::string_view> Timezones;
+
+ TTypeAnnotationContext* Types = nullptr;
+ TGenericConfiguration::TPtr Configuration = MakeIntrusive<TGenericConfiguration>();
+ const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
+
+ // key - (database id, database type), value - credentials to access MDB API
+ NYql::IDatabaseAsyncResolver::TDatabaseAuthMap DatabaseAuth;
+ std::shared_ptr<NYql::IDatabaseAsyncResolver> DatabaseResolver;
+
+ private:
+ THashMap<TTableAddress, TTableMeta> Tables_;
+ };
+}