diff options
author | hcpp <hcpp@ydb.tech> | 2023-04-13 12:35:28 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-04-13 12:35:28 +0300 |
commit | cbde992ac1b298645d76d74f243af426cb74f5dc (patch) | |
tree | c7774c70c284794f062364770879932f91e17521 | |
parent | 3316c0db7951eddadcca1eb0e738209b1ec5a987 (diff) | |
download | ydb-cbde992ac1b298645d76d74f243af426cb74f5dc.tar.gz |
dynamic resolve has been added for external table
18 files changed, 575 insertions, 87 deletions
diff --git a/ydb/core/external_sources/external_source.h b/ydb/core/external_sources/external_source.h index 9b5f1afe400..763fdea0ba0 100644 --- a/ydb/core/external_sources/external_source.h +++ b/ydb/core/external_sources/external_source.h @@ -12,8 +12,30 @@ struct TExternalSourceException: public yexception { struct IExternalSource : public TThrRefBase { using TPtr = TIntrusivePtr<IExternalSource>; + /* + Packs TSchema, TGeneral into some string in arbitrary + format: proto, json, text, and others. The output returns a + string called content. Further, this string will be stored inside. + After that, it is passed to the GetParamters method. + Can throw an exception in case of an error. + */ virtual TString Pack(const NKikimrExternalSources::TSchema& schema, const NKikimrExternalSources::TGeneral& general) const = 0; + + /* + The name of the data source that is used inside the + implementation during the read phase. Must match provider name. + */ + virtual TString GetName() const = 0; + + /* + At the input, a string with the name of the content is passed, + which is obtained from the Pack method and returns a list of + parameters that will be put in the AST of the source. Also, + this data will be displayed in the viewer. + Can throw an exception in case of an error + */ + virtual TMap<TString, TString> GetParamters(const TString& content) const = 0; }; } diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index d5da28ab48d..4277114392d 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -1,6 +1,7 @@ #include "external_source.h" #include <ydb/core/protos/external_sources.pb.h> +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h> #include <ydb/public/api/protos/ydb_status_codes.pb.h> #include <ydb/public/sdk/cpp/client/ydb_value/value.h> @@ -45,6 +46,41 @@ struct TObjectStorageExternalSource : public IExternalSource { return objectStorage.SerializeAsString(); } + virtual TString GetName() const override { + return TString{NYql::S3ProviderName}; + } + + virtual TMap<TString, TString> GetParamters(const TString& content) const override { + NKikimrExternalSources::TObjectStorage objectStorage; + objectStorage.ParseFromStringOrThrow(content); + + TMap<TString, TString> parameters{objectStorage.format_setting().begin(), objectStorage.format_setting().end()}; + if (objectStorage.format()) { + parameters["format"] = objectStorage.format(); + } + + if (objectStorage.compression()) { + parameters["compression"] = objectStorage.compression(); + } + + NSc::TValue projection; + for (const auto& [key, value]: objectStorage.projection()) { + projection[key] = value; + } + + if (!projection.DictEmpty()) { + parameters["projection"] = projection.ToJson(); + } + + NSc::TValue partitionedBy; + partitionedBy.AppendAll(objectStorage.partitioned_by()); + if (!partitionedBy.ArrayEmpty()) { + parameters["partitioned_by"] = partitionedBy.ToJson(); + } + + return parameters; + } + private: static NYql::TIssues Validate(const NKikimrExternalSources::TSchema& schema, const NKikimrExternalSources::TObjectStorage& objectStorage) { NYql::TIssues issues; diff --git a/ydb/core/fq/libs/result_formatter/result_formatter.cpp b/ydb/core/fq/libs/result_formatter/result_formatter.cpp index 5cca0992cb8..8941dcb8860 100644 --- a/ydb/core/fq/libs/result_formatter/result_formatter.cpp +++ b/ydb/core/fq/libs/result_formatter/result_formatter.cpp @@ -79,24 +79,6 @@ NKikimr::NMiniKQL::TType* MakeListType(NKikimr::NMiniKQL::TType* underlying, NKi return TListType::Create(underlying, env); } -const NYql::TTypeAnnotationNode* MakeStructType( - const TVector<std::pair<TString, const NYql::TTypeAnnotationNode*>>& i, - NYql::TExprContext& ctx) -{ - TVector<const NYql::TItemExprType*> items; - items.reserve(i.size()); - for (const auto& [k, v] : i) { - items.push_back(ctx.MakeType<NYql::TItemExprType>(k, v)); - } - return ctx.MakeType<NYql::TStructExprType>(items); -} - -NKikimr::NMiniKQL::TType* MakeStructType( - const TVector<std::pair<TString, NKikimr::NMiniKQL::TType*>>& items, - NKikimr::NMiniKQL::TTypeEnvironment& env) -{ - return TStructType::Create(items.data(), items.size(), env); -} const NYql::TTypeAnnotationNode* MakeTupleType( const TVector<const NYql::TTypeAnnotationNode*>& items, @@ -455,4 +437,27 @@ void FormatResultSet(NJson::TJsonValue& root, const NYdb::TResultSet& resultSet, } } +const NYql::TTypeAnnotationNode* MakeStructType( + const TVector<std::pair<TString, const NYql::TTypeAnnotationNode*>>& i, + NYql::TExprContext& ctx) +{ + TVector<const NYql::TItemExprType*> items; + items.reserve(i.size()); + for (const auto& [k, v] : i) { + items.push_back(ctx.MakeType<NYql::TItemExprType>(k, v)); + } + return ctx.MakeType<NYql::TStructExprType>(items); +} + +NKikimr::NMiniKQL::TType* MakeStructType( + const TVector<std::pair<TString, NKikimr::NMiniKQL::TType*>>& items, + NKikimr::NMiniKQL::TTypeEnvironment& env) +{ + return TStructType::Create(items.data(), items.size(), env); +} + +const NYql::TTypeAnnotationNode* MakeType(NYdb::TTypeParser& parser, NYql::TExprContext& ctx) { + return MakeType<const NYql::TTypeAnnotationNode*, NYql::TExprContext>(parser, ctx); +} + } // namespace NFq diff --git a/ydb/core/fq/libs/result_formatter/result_formatter.h b/ydb/core/fq/libs/result_formatter/result_formatter.h index c46a44344a6..13bcd6c6754 100644 --- a/ydb/core/fq/libs/result_formatter/result_formatter.h +++ b/ydb/core/fq/libs/result_formatter/result_formatter.h @@ -8,9 +8,15 @@ #include <library/cpp/json/json_writer.h> +#include <ydb/library/yql/ast/yql_expr.h> +#include <ydb/library/yql/minikql/mkql_node.h> + namespace NFq { void FormatResultSet(NJson::TJsonValue& root, const NYdb::TResultSet& resultSet, bool typeNameAsString = false, bool prettyValueFormat = false); TString FormatSchema(const FederatedQuery::Schema& schema); +const NYql::TTypeAnnotationNode* MakeStructType(const TVector<std::pair<TString, const NYql::TTypeAnnotationNode*>>& i, NYql::TExprContext& ctx); +NKikimr::NMiniKQL::TType* MakeStructType(const TVector<std::pair<TString, NKikimr::NMiniKQL::TType*>>& items, NKikimr::NMiniKQL::TTypeEnvironment& env); +const NYql::TTypeAnnotationNode* MakeType(NYdb::TTypeParser& parser, NYql::TExprContext& ctx); } // namespace NFq diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index 111e7250901..a11485f5e51 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -43,6 +43,19 @@ std::pair<TNavigate::TEntry, TString> CreateNavigateEntry(const std::pair<TIndex return {entry, pair.second}; } +std::optional<std::pair<TNavigate::TEntry, TString>> CreateNavigateExternalEntry(const TString& path) { + TNavigate::TEntry entry; + entry.Path = SplitPath(path); + entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown; + entry.SyncVersion = true; + return {{entry, path}}; +} + +std::optional<std::pair<TNavigate::TEntry, TString>> CreateNavigateExternalEntry(const std::pair<TIndexId, TString>& pair) { + Y_UNUSED(pair); + return {}; +} + ui64 GetExpectedVersion(const std::pair<TIndexId, TString>& pathId) { return pathId.first.SchemaVersion; } @@ -80,37 +93,22 @@ void IndexProtoToMetadata(const TIndexProto& indexes, NYql::TKikimrTableMetadata } } +TString GetTypeName(const NScheme::TTypeInfoMod& typeInfoMod, bool notNull) { + TString typeName; + if (typeInfoMod.TypeInfo.GetTypeId() != NScheme::NTypeIds::Pg) { + YQL_ENSURE(NScheme::TryGetTypeName(typeInfoMod.TypeInfo.GetTypeId(), typeName)); + } else { + YQL_ENSURE(typeInfoMod.TypeInfo.GetTypeDesc(), "no pg type descriptor"); + YQL_ENSURE(!notNull, "pg not null types are not allowed"); + typeName = NPg::PgTypeNameFromTypeDesc(typeInfoMod.TypeInfo.GetTypeDesc(), typeInfoMod.TypeMod); + } + return typeName; +} -TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, +TTableMetadataResult GetTableMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TString& cluster, const TString& tableName) { - using TResult = NYql::IKikimrGateway::TTableMetadataResult; - using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus; using EKind = NSchemeCache::TSchemeCacheNavigate::EKind; - auto message = ToString(entry.Status); - - switch (entry.Status) { - case EStatus::Ok: - break; - case EStatus::PathErrorUnknown: - case EStatus::RootUnknown: { - TTableMetadataResult result; - result.SetSuccess(); - result.Metadata = new NYql::TKikimrTableMetadata(cluster, tableName); - return result; - } - case EStatus::PathNotTable: - case EStatus::TableCreationNotComplete: - return ResultFromError<TResult>(YqlIssue({}, TIssuesIds::KIKIMR_SCHEME_ERROR, message)); - case EStatus::LookupError: - case EStatus::RedirectLookupError: - return ResultFromError<TResult>(YqlIssue({}, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, message)); - default: - return ResultFromError<TResult>(ToString(entry.Status)); - } - - YQL_ENSURE(entry.Kind == EKind::KindTable || entry.Kind == EKind::KindColumnTable); - TTableMetadataResult result; result.SetSuccess(); result.Metadata = new NYql::TKikimrTableMetadata(cluster, tableName); @@ -149,15 +147,8 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache std::map<ui32, TString, std::less<ui32>> columnOrder; for (auto& pair : entry.Columns) { const auto& columnDesc = pair.second; - TString typeName; auto notNull = entry.NotNullColumns.contains(columnDesc.Name); - if (columnDesc.PType.GetTypeId() != NScheme::NTypeIds::Pg) { - YQL_ENSURE(NScheme::TryGetTypeName(columnDesc.PType.GetTypeId(), typeName)); - } else { - Y_VERIFY(columnDesc.PType.GetTypeDesc(), "no pg type descriptor"); - Y_VERIFY(!notNull, "pg not null types are not allowed"); - typeName = NPg::PgTypeNameFromTypeDesc(columnDesc.PType.GetTypeDesc(), columnDesc.PTypeMod); - } + const TString typeName = GetTypeName(NScheme::TTypeInfoMod{columnDesc.PType, columnDesc.PTypeMod}, notNull); tableMeta->Columns.emplace( columnDesc.Name, NYql::TKikimrColumnMetadata( @@ -185,6 +176,125 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache return result; } +TTableMetadataResult GetExternalTableMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, + const TString& cluster, const TString& tableName) { + const auto& description = entry.ExternalTableInfo->Description; + TTableMetadataResult result; + result.SetSuccess(); + result.Metadata = new NYql::TKikimrTableMetadata(cluster, tableName); + auto tableMeta = result.Metadata; + tableMeta->DoesExist = true; + tableMeta->PathId = NYql::TKikimrPathId(description.GetPathId().GetOwnerId(), description.GetPathId().GetLocalId()); + tableMeta->SchemaVersion = description.GetVersion(); + tableMeta->Kind = NYql::EKikimrTableKind::External; + + tableMeta->Attributes = entry.Attributes; + + for (auto& columnDesc : description.GetColumns()) { + const auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(columnDesc.GetTypeId(), + columnDesc.HasTypeInfo() ? &columnDesc.GetTypeInfo() : nullptr); + const TString typeName = GetTypeName(typeInfoMod, columnDesc.GetNotNull()); + tableMeta->Columns.emplace( + columnDesc.GetName(), + NYql::TKikimrColumnMetadata( + columnDesc.GetName(), columnDesc.GetId(), typeName, columnDesc.GetNotNull(), typeInfoMod.TypeInfo, typeInfoMod.TypeMod + ) + ); + } + + tableMeta->ExternalSource.Type = description.GetSourceType(); + tableMeta->ExternalSource.TableLocation = description.GetLocation(); + tableMeta->ExternalSource.TableContent = description.GetContent(); + tableMeta->ExternalSource.DataSourcePath = description.GetDataSourcePath(); + return result; +} + +TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, + const TString& cluster, const TString& tableName) { + const auto& description = entry.ExternalDataSourceInfo->Description; + TTableMetadataResult result; + result.SetSuccess(); + result.Metadata = new NYql::TKikimrTableMetadata(cluster, tableName); + auto tableMeta = result.Metadata; + tableMeta->DoesExist = true; + tableMeta->PathId = NYql::TKikimrPathId(description.GetPathId().GetOwnerId(), description.GetPathId().GetLocalId()); + tableMeta->SchemaVersion = description.GetVersion(); + tableMeta->Kind = NYql::EKikimrTableKind::External; + + tableMeta->Attributes = entry.Attributes; + + tableMeta->ExternalSource.Type = description.GetSourceType(); + tableMeta->ExternalSource.DataSourceLocation = description.GetLocation(); + tableMeta->ExternalSource.DataSourceInstallation = description.GetInstallation(); + tableMeta->ExternalSource.DataSourceAuth = description.GetAuth(); + return result; +} + +TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, + const TString& cluster, const TString& tableName) { + using TResult = NYql::IKikimrGateway::TTableMetadataResult; + using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus; + using EKind = NSchemeCache::TSchemeCacheNavigate::EKind; + + auto message = ToString(entry.Status); + + switch (entry.Status) { + case EStatus::Ok: + break; + case EStatus::PathErrorUnknown: + case EStatus::RootUnknown: { + TTableMetadataResult result; + result.SetSuccess(); + result.Metadata = new NYql::TKikimrTableMetadata(cluster, tableName); + return result; + } + case EStatus::PathNotTable: + case EStatus::TableCreationNotComplete: + return ResultFromError<TResult>(YqlIssue({}, TIssuesIds::KIKIMR_SCHEME_ERROR, message)); + case EStatus::LookupError: + case EStatus::RedirectLookupError: + return ResultFromError<TResult>(YqlIssue({}, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, message)); + default: + return ResultFromError<TResult>(ToString(entry.Status)); + } + + YQL_ENSURE(entry.Kind == EKind::KindTable || entry.Kind == EKind::KindColumnTable || entry.Kind == EKind::KindExternalTable || entry.Kind == EKind::KindExternalDataSource); + + TTableMetadataResult result; + switch (entry.Kind) { + case EKind::KindExternalTable: + result = GetExternalTableMetadataResult(entry, cluster, tableName); + break; + case EKind::KindExternalDataSource: + result = GetExternalDataSourceMetadataResult(entry, cluster, tableName); + break; + default: + result = GetTableMetadataResult(entry, cluster, tableName); + } + return result; +} + + +TTableMetadataResult EnrichExternalTable(const TTableMetadataResult& externalTable, const TTableMetadataResult& externalDataSource) { + TTableMetadataResult result; + if (!externalTable.Success()) { + result.AddIssues(externalTable.Issues()); + return result; + } + if (!externalDataSource.Success()) { + result.AddIssues(externalDataSource.Issues()); + return result; + } + + result.SetSuccess(); + result.Metadata = externalTable.Metadata; + auto tableMeta = result.Metadata; + tableMeta->ExternalSource.DataSourceLocation = externalDataSource.Metadata->ExternalSource.DataSourceLocation; + tableMeta->ExternalSource.DataSourceInstallation = externalDataSource.Metadata->ExternalSource.DataSourceInstallation; + tableMeta->ExternalSource.DataSourceAuth = externalDataSource.Metadata->ExternalSource.DataSourceAuth; + return result; +} + TString GetDebugString(const TString& id) { return TStringBuilder() << " Path: " << id; } @@ -235,6 +345,10 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta return MakeFuture(result); } + if (result.Metadata->Kind == NYql::EKikimrTableKind::External) { + return MakeFuture(result); + } + auto locked = ptr.lock(); if (!locked) { result.SetStatus(TIssuesIds::KIKIMR_INDEX_METADATA_LOAD_FAILED); @@ -374,6 +488,19 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadIndexMeta } } +NSchemeCache::TSchemeCacheNavigate::TEntry& InferEntry(NKikimr::NSchemeCache::TSchemeCacheNavigate::TResultSet& resultSet) { + using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus; + using EKind = NSchemeCache::TSchemeCacheNavigate::EKind; + + if (resultSet.size() != 2 || resultSet[1].Status != EStatus::Ok) { + return resultSet[0]; + } + + return IsIn({EKind::KindExternalDataSource, EKind::KindExternalTable}, resultSet[1].Kind) + ? resultSet[1] + : resultSet[0]; +} + // The type is TString or std::pair<TIndexId, TString> template<typename TPath> NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMetadataCache( @@ -388,12 +515,16 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta using EKind = NSchemeCache::TSchemeCacheNavigate::EKind; const auto entry = CreateNavigateEntry(id, settings); + const auto externalEntry = CreateNavigateExternalEntry(id); const ui64 expectedSchemaVersion = GetExpectedVersion(id); LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load table metadata from cache by path, request" << GetDebugString(id)); auto navigate = MakeHolder<TNavigate>(); navigate->ResultSet.emplace_back(entry.first); + if (externalEntry) { + navigate->ResultSet.emplace_back(externalEntry->first); + } const TString& table = entry.second; navigate->DatabaseName = database; @@ -416,10 +547,15 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta YQL_ENSURE(response.Request); auto& navigate = *response.Request; - YQL_ENSURE(navigate.ResultSet.size() == 1); - auto& entry = navigate.ResultSet[0]; + YQL_ENSURE(1 <= navigate.ResultSet.size() && navigate.ResultSet.size() <= 2); + auto& entry = InferEntry(navigate.ResultSet); + + if (entry.Status != EStatus::Ok) { + promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table)); + return; + } - if (entry.Status == EStatus::Ok && expectedSchemaVersion && entry.TableId.SchemaVersion) { + if (!IsIn({EKind::KindExternalDataSource, EKind::KindExternalTable}, entry.Kind) && expectedSchemaVersion && entry.TableId.SchemaVersion) { if (entry.TableId.SchemaVersion != expectedSchemaVersion) { const auto message = TStringBuilder() << "schema version mismatch during metadata loading for: " @@ -433,22 +569,46 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta } } - if (entry.Status == EStatus::Ok && entry.Kind == EKind::KindIndex) { - Y_ENSURE(entry.ListNodeEntry, "expected children list"); - Y_ENSURE(entry.ListNodeEntry->Children.size() == 1, "expected one child"); - - TIndexId pathId = TIndexId( - entry.ListNodeEntry->Children[0].PathId, - entry.ListNodeEntry->Children[0].SchemaVersion - ); - - LoadTableMetadataCache(cluster, std::make_pair(pathId, table), settings, database, userToken) - .Apply([promise](const TFuture<TTableMetadataResult>& result) mutable - { - promise.SetValue(result.GetValue()); - }); - } else { - promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table)); + switch (entry.Kind) { + case EKind::KindExternalDataSource: { + promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table)); + } + break; + case EKind::KindExternalTable: { + YQL_ENSURE(entry.ExternalTableInfo, "expected external table info"); + const auto& dataSourcePath = entry.ExternalTableInfo->Description.GetDataSourcePath(); + auto externalTableMetadata = GetLoadTableMetadataResult(entry, cluster, table); + if (!externalTableMetadata.Success()) { + promise.SetValue(externalTableMetadata); + return; + } + LoadTableMetadataCache(cluster, dataSourcePath, settings, database, userToken) + .Apply([promise, externalTableMetadata](const TFuture<TTableMetadataResult>& result) mutable + { + auto externalDataSourceMetadata = result.GetValue(); + promise.SetValue(EnrichExternalTable(externalTableMetadata, externalDataSourceMetadata)); + }); + } + break; + case EKind::KindIndex: { + Y_ENSURE(entry.ListNodeEntry, "expected children list"); + Y_ENSURE(entry.ListNodeEntry->Children.size() == 1, "expected one child"); + + TIndexId pathId = TIndexId( + entry.ListNodeEntry->Children[0].PathId, + entry.ListNodeEntry->Children[0].SchemaVersion + ); + + LoadTableMetadataCache(cluster, std::make_pair(pathId, table), settings, database, userToken) + .Apply([promise](const TFuture<TTableMetadataResult>& result) mutable + { + promise.SetValue(result.GetValue()); + }); + } + break; + default: { + promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table)); + } } } catch (yexception& e) { diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 32858840b19..a2de3e62de9 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1,6 +1,7 @@ #include "kqp_host_impl.h" #include <ydb/core/base/appdata.h> +#include <ydb/core/external_sources/external_source_factory.h> #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/opt/kqp_query_plan.h> #include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> @@ -1477,7 +1478,7 @@ private: // Kikimr provider auto queryExecutor = MakeIntrusive<TKqpQueryExecutor>(Gateway, Cluster, SessionCtx, KqpRunner); - auto kikimrDataSource = CreateKikimrDataSource(*FuncRegistry, *TypesCtx, Gateway, SessionCtx); + auto kikimrDataSource = CreateKikimrDataSource(*FuncRegistry, *TypesCtx, Gateway, SessionCtx, ExternalSourceFactory); auto kikimrDataSink = CreateKikimrDataSink(*FuncRegistry, *TypesCtx, Gateway, SessionCtx, queryExecutor); FillSettings.AllResultsBytesLimit = Nothing(); @@ -1600,6 +1601,7 @@ private: TIntrusivePtr<TExecuteContext> ExecuteCtx; TIntrusivePtr<IKqpRunner> KqpRunner; + NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory()}; }; } // namespace diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp index a963a8bdcb5..8e745b30c66 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp @@ -30,6 +30,7 @@ TMaybeNode<TExprBase> TryBuildTrivialReadTable(TCoFlatMap& flatmap, TKqlReadTabl case EKikimrTableKind::SysView: break; case EKikimrTableKind::Olap: + case EKikimrTableKind::External: case EKikimrTableKind::Unspecified: return {}; } diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index 107cc3d410b..22cd6719023 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -285,7 +285,6 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp .Settings(read.Settings()) .Done(); break; - default: YQL_ENSURE(false, "Unexpected table kind: " << (ui32)tableDesc.Metadata->Kind); break; diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp index 43028606cb9..87d46207c7d 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp @@ -5,8 +5,14 @@ #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/core/yql_expr_type_annotation.h> +#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h> #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> +#include <ydb/core/external_sources/external_source_factory.h> +#include <ydb/core/fq/libs/result_formatter/result_formatter.h> + +#include <ydb/public/sdk/cpp/client/ydb_value/value.h> + #include <util/generic/is_in.h> namespace NYql { @@ -91,9 +97,13 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase { public: TKiSourceLoadTableMetadataTransformer( TIntrusivePtr<IKikimrGateway> gateway, - TIntrusivePtr<TKikimrSessionContext> sessionCtx) + TIntrusivePtr<TKikimrSessionContext> sessionCtx, + TTypeAnnotationContext& types, + const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory) : Gateway(gateway) - , SessionCtx(sessionCtx) {} + , SessionCtx(sessionCtx) + , Types(types) + , ExternalSourceFactory(externalSourceFactory) {} TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { output = input; @@ -161,6 +171,31 @@ public: return AsyncFuture; } + bool AddCluster(const std::pair<TString, TString>& table, IKikimrGateway::TTableMetadataResult& res, TExprNode::TPtr input, TExprContext& ctx) { + const auto& metadata = *res.Metadata; + if (metadata.Kind != EKikimrTableKind::External) { + return true; + } + auto source = ExternalSourceFactory->GetOrCreate(metadata.ExternalSource.Type); + auto it = Types.DataSourceMap.find(source->GetName()); + if (it == Types.DataSourceMap.end()) { + TIssueScopeGuard issueScope(ctx.IssueManager, [input, &table, &ctx]() { + return MakeIntrusive<TIssue>(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() + << "Failed to load metadata for table (data source doesn't exist): " + << NCommon::FullTableName(table.first, table.second))); + }); + + res.ReportIssues(ctx.IssueManager); + LoadResults.clear(); + return false; + } + it->second->AddCluster(metadata.ExternalSource.DataSourcePath, {{ + {"location", metadata.ExternalSource.DataSourceLocation }, + {"installation", metadata.ExternalSource.DataSourceInstallation } + }}); + return true; + } + TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { output = input; YQL_ENSURE(AsyncFuture.HasValue()); @@ -189,6 +224,10 @@ public: LoadResults.clear(); return TStatus::Error; } + + if (!AddCluster(table, res, input, ctx)) { + return TStatus::Error; + } } else { TIssueScopeGuard issueScope(ctx.IssueManager, [input, &table, &ctx]() { return MakeIntrusive<TIssue>(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() @@ -214,6 +253,8 @@ public: private: TIntrusivePtr<IKikimrGateway> Gateway; TIntrusivePtr<TKikimrSessionContext> SessionCtx; + TTypeAnnotationContext& Types; + NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory; THashMap<std::pair<TString, TString>, std::shared_ptr<IKikimrGateway::TTableMetadataResult>> LoadResults; NThreading::TFuture<void> AsyncFuture; @@ -287,14 +328,16 @@ public: const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, TTypeAnnotationContext& types, TIntrusivePtr<IKikimrGateway> gateway, - TIntrusivePtr<TKikimrSessionContext> sessionCtx) + TIntrusivePtr<TKikimrSessionContext> sessionCtx, + const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory) : FunctionRegistry(functionRegistry) , Types(types) , Gateway(gateway) , SessionCtx(sessionCtx) + , ExternalSourceFactory(externalSourceFactory) , ConfigurationTransformer(new TKikimrConfigurationTransformer(sessionCtx, types)) , IntentDeterminationTransformer(new TKiSourceIntentDeterminationTransformer(sessionCtx)) - , LoadTableMetadataTransformer(CreateKiSourceLoadTableMetadataTransformer(gateway, sessionCtx)) + , LoadTableMetadataTransformer(CreateKiSourceLoadTableMetadataTransformer(gateway, sessionCtx, types, externalSourceFactory)) , TypeAnnotationTransformer(CreateKiSourceTypeAnnotationTransformer(sessionCtx, types)) , CallableExecutionTransformer(CreateKiSourceCallableExecutionTransformer(gateway, sessionCtx)) @@ -479,12 +522,54 @@ public: return false; } + static Ydb::Type CreateYdbType(const NScheme::TTypeInfo& typeInfo, bool notNull) { + Ydb::Type ydbType; + if (typeInfo.GetTypeId() == NScheme::NTypeIds::Pg) { + auto* typeDesc = typeInfo.GetTypeDesc(); + auto* pg = ydbType.mutable_pg_type(); + pg->set_type_name(NPg::PgTypeNameFromTypeDesc(typeDesc)); + pg->set_oid(NPg::PgTypeIdFromTypeDesc(typeDesc)); + } else { + auto& item = notNull + ? ydbType + : *ydbType.mutable_optional_type()->mutable_item(); + item.set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId()); + } + return ydbType; + } + + TExprNode::TPtr BuildSettings(TPositionHandle pos, TExprContext& ctx, const TMap<TString, NYql::TKikimrColumnMetadata>& columns, const NExternalSource::IExternalSource::TPtr& source, const TString& content) { + TVector<std::pair<TString, const NYql::TTypeAnnotationNode*>> typedColumns; + typedColumns.reserve(columns.size()); + for (const auto& [n, c] : columns) { + NYdb::TTypeParser parser(NYdb::TType(CreateYdbType(c.TypeInfo, c.NotNull))); + auto type = NFq::MakeType(parser, ctx); + typedColumns.emplace_back(n, type); + } + + const TString ysonSchema = NYql::NCommon::WriteTypeToYson(NFq::MakeStructType(typedColumns, ctx), NYson::EYsonFormat::Text); + TExprNode::TListType items; + auto schema = ctx.NewAtom(pos, ysonSchema); + auto type = ctx.NewCallable(pos, "SqlTypeFromYson"sv, { schema }); + auto order = ctx.NewCallable(pos, "SqlColumnOrderFromYson"sv, { schema }); + auto userSchema = ctx.NewAtom(pos, "userschema"sv); + items.emplace_back(ctx.NewList(pos, {userSchema, type, order})); + + for (const auto& [key, value]: source->GetParamters(content)) { + auto keyAtom = ctx.NewAtom(pos, key); + auto valueAtom = ctx.NewAtom(pos, value); + items.emplace_back(ctx.NewList(pos, {keyAtom, valueAtom})); + } + return ctx.NewList(pos, std::move(items)); + } + TExprNode::TPtr RewriteIO(const TExprNode::TPtr& node, TExprContext& ctx) override { auto read = node->Child(0); if (!read->IsCallable(ReadName)) { ythrow yexception() << "Expected Read!"; } + TKiDataSource source(read->ChildPtr(1)); TKikimrKey key(ctx); if (!key.Extract(*read->Child(2))) { return nullptr; @@ -505,6 +590,38 @@ public: YQL_ENSURE(false, "Unsupported Kikimr KeyType."); } + auto& tableDesc = SessionCtx->Tables().GetTable(TString{source.Cluster()}, key.GetTablePath()); + if (key.GetKeyType() == TKikimrKey::Type::Table && tableDesc.Metadata->Kind == EKikimrTableKind::External) { + const auto& source = ExternalSourceFactory->GetOrCreate(tableDesc.Metadata->ExternalSource.Type); + ctx.Step.Repeat(TExprStep::DiscoveryIO) + .Repeat(TExprStep::Epochs) + .Repeat(TExprStep::Intents) + .Repeat(TExprStep::LoadTablesMetadata) + .Repeat(TExprStep::RewriteIO); + TExprNode::TPtr path = ctx.NewCallable(node->Pos(), "String", { ctx.NewAtom(node->Pos(), tableDesc.Metadata->ExternalSource.TableLocation) }); + auto table = ctx.NewList(node->Pos(), {ctx.NewAtom(node->Pos(), "table"), path}); + auto key = ctx.NewCallable(node->Pos(), "Key", {table}); + auto newRead = Build<TCoRead>(ctx, node->Pos()) + .World(read->Child(0)) + .DataSource( + Build<TCoDataSource>(ctx, node->Pos()) + .Category(ctx.NewAtom(node->Pos(), source->GetName())) + .FreeArgs() + .Add(ctx.NewAtom(node->Pos(), tableDesc.Metadata->ExternalSource.DataSourcePath)) + .Build() + .Done().Ptr() + ) + .FreeArgs() + .Add(ctx.NewCallable(node->Pos(), "MrTableConcat", {key})) + .Add(ctx.NewCallable(node->Pos(), "Void", {})) + .Add(BuildSettings(node->Pos(), ctx, tableDesc.Metadata->Columns, source, tableDesc.Metadata->ExternalSource.TableContent)) + .Build() + .Done().Ptr(); + auto retChildren = node->ChildrenList(); + retChildren[0] = newRead; + return ctx.ChangeChildren(*node, std::move(retChildren)); + } + auto newRead = ctx.RenameNode(*read, newName); if (auto maybeRead = TMaybeNode<TKiReadTable>(newRead)) { @@ -613,6 +730,7 @@ private: TTypeAnnotationContext& Types; TIntrusivePtr<IKikimrGateway> Gateway; TIntrusivePtr<TKikimrSessionContext> SessionCtx; + NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory; TAutoPtr<IGraphTransformer> ConfigurationTransformer; TAutoPtr<IGraphTransformer> IntentDeterminationTransformer; @@ -650,15 +768,18 @@ TIntrusivePtr<IDataProvider> CreateKikimrDataSource( const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, TTypeAnnotationContext& types, TIntrusivePtr<IKikimrGateway> gateway, - TIntrusivePtr<TKikimrSessionContext> sessionCtx) + TIntrusivePtr<TKikimrSessionContext> sessionCtx, + const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory) { - return new TKikimrDataSource(functionRegistry, types, gateway, sessionCtx); + return new TKikimrDataSource(functionRegistry, types, gateway, sessionCtx, externalSourceFactory); } TAutoPtr<IGraphTransformer> CreateKiSourceLoadTableMetadataTransformer(TIntrusivePtr<IKikimrGateway> gateway, - TIntrusivePtr<TKikimrSessionContext> sessionCtx) + TIntrusivePtr<TKikimrSessionContext> sessionCtx, + TTypeAnnotationContext& types, + const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory) { - return new TKiSourceLoadTableMetadataTransformer(gateway, sessionCtx); + return new TKiSourceLoadTableMetadataTransformer(gateway, sessionCtx, types, externalSourceFactory); } } // namespace NYql diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 0493fdc0312..11109ee9f45 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -283,7 +283,8 @@ enum class EKikimrTableKind : ui32 { Unspecified = 0, Datashard = 1, SysView = 2, - Olap = 3 + Olap = 3, + External = 4 }; enum class ETableType : ui32 { @@ -305,6 +306,16 @@ enum class EStoreType : ui32 { Column = 1 }; +struct TExternalSource { + TString Type; + TString TableLocation; + TString TableContent; + TString DataSourcePath; + TString DataSourceLocation; + TString DataSourceInstallation; + NKikimrSchemeOp::TAuth DataSourceAuth; +}; + struct TKikimrTableMetadata : public TThrRefBase { bool DoesExist = false; TString Cluster; @@ -336,6 +347,8 @@ struct TKikimrTableMetadata : public TThrRefBase { TVector<TColumnFamily> ColumnFamilies; TTableSettings TableSettings; + TExternalSource ExternalSource; + TKikimrTableMetadata(const TString& cluster, const TString& table) : Cluster(cluster) , Name(table) diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp index 1340464e725..eaca7a69698 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp @@ -439,6 +439,40 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) { TestCreateExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source"); TestDropExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source"); } + + Y_UNIT_TEST(TestLoadExternalTable) { + TKikimrRunner kikimr; + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + TString externalDataSourceName = "/Root/ExternalDataSource"; + TString externalTableName = "/Root/ExternalTable"; + auto query = TStringBuilder() << R"( + CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="my-bucket", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `)" << externalTableName << R"(` ( + Key Uint64, + Value String + ) WITH ( + DATA_SOURCE=")" << externalDataSourceName << R"(", + LOCATION="/" + );)"; + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + auto responseFuture = GetIcGateway(kikimr.GetTestServer())->LoadTableMetadata(TestCluster, externalTableName, IKikimrGateway::TLoadTableMetadataSettings()); + responseFuture.Wait(); + auto response = responseFuture.GetValue(); + response.Issues().PrintTo(Cerr); + UNIT_ASSERT(response.Success()); + UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.Type, "ObjectStorage"); + UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.TableLocation, "/"); + UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.DataSourcePath, externalDataSourceName); + UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.DataSourceLocation, "my-bucket"); + UNIT_ASSERT_VALUES_EQUAL(response.Metadata->Columns.size(), 2); + } } } // namespace NYql diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 560a81ccc50..1d104a4b839 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -3,6 +3,7 @@ #include "yql_kikimr_gateway.h" #include "yql_kikimr_settings.h" +#include <ydb/core/external_sources/external_source_factory.h> #include <ydb/core/kqp/query_data/kqp_query_data.h> #include <ydb/library/yql/ast/yql_gc_nodes.h> #include <ydb/library/yql/core/yql_type_annotation.h> @@ -452,7 +453,8 @@ TIntrusivePtr<IDataProvider> CreateKikimrDataSource( const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, TTypeAnnotationContext& types, TIntrusivePtr<IKikimrGateway> gateway, - TIntrusivePtr<TKikimrSessionContext> sessionCtx); + TIntrusivePtr<TKikimrSessionContext> sessionCtx, + const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& sourceFactory); TIntrusivePtr<IDataProvider> CreateKikimrDataSink( const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h index b94e2c2eb2f..d1d4702db5c 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h @@ -2,6 +2,7 @@ #include "yql_kikimr_provider.h" +#include <ydb/core/external_sources/external_source_factory.h> #include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h> #include <ydb/core/kqp/provider/yql_kikimr_results.h> @@ -155,7 +156,9 @@ TAutoPtr<IGraphTransformer> CreateKiLogicalOptProposalTransformer(TIntrusivePtr< TTypeAnnotationContext& types); TAutoPtr<IGraphTransformer> CreateKiPhysicalOptProposalTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx); TAutoPtr<IGraphTransformer> CreateKiSourceLoadTableMetadataTransformer(TIntrusivePtr<IKikimrGateway> gateway, - TIntrusivePtr<TKikimrSessionContext> sessionCtx); + TIntrusivePtr<TKikimrSessionContext> sessionCtx, + TTypeAnnotationContext& types, + const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& sourceFactory); TAutoPtr<IGraphTransformer> CreateKiSinkIntentDeterminationTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx); TAutoPtr<IGraphTransformer> CreateKiSourceCallableExecutionTransformer( diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp index b64aad7b937..edf8e5d2456 100644 --- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp @@ -8,6 +8,10 @@ #include <aws/s3/model/PutObjectRequest.h> #include <aws/s3/S3Client.h> +#include <ydb/library/yql/utils/log/log.h> + +#include <fmt/format.h> + namespace NKikimr { namespace NKqp { @@ -61,7 +65,6 @@ void CreateBucketWithObject(const TString& bucket, const TString& object, const Y_UNIT_TEST_SUITE(KqpFederatedQuery) { Y_UNIT_TEST(ExecuteScript) { - Cerr << "S3 endpoint: " << GetEnv("S3_ENDPOINT") << Endl; CreateBucketWithObject("test_bucket", "Root/test_object", TEST_CONTENT); SetEnv("TEST_S3_BUCKET", "test_bucket"); SetEnv("TEST_S3_OBJECT", "test_object"); @@ -101,6 +104,74 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); } + + Y_UNIT_TEST(ExecuteScriptWithExternalTableResolve) { + using namespace fmt::literals; + const TString externalDataSourceName = "/Root/external_data_source"; + const TString externalTableName = "/Root/test_binding_resolve"; + const TString bucket = "test_bucket1"; + const TString object = "Root/test_object"; + + CreateBucketWithObject(bucket, object, TEST_CONTENT); + + auto kikimr = DefaultKikimrRunner(); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + + auto tc = kikimr.GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{external_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{external_source}", + LOCATION="{object}", + FORMAT="json_each_row" + );)", + "external_source"_a = externalTableName, + "external_table"_a = externalDataSourceName, + "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/", + "object"_a = object + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto db = kikimr.GetQueryClient(); + auto executeScrptsResult = db.ExecuteScript(fmt::format(R"( + SELECT * FROM `{external_table}` + )", "external_table"_a=externalDataSourceName)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(executeScrptsResult.Status().GetStatus(), EStatus::SUCCESS, executeScrptsResult.Status().GetIssues().ToString()); + UNIT_ASSERT(executeScrptsResult.Metadata().ExecutionId); + + TMaybe<TFetchScriptResultsResult> results; + do { + Sleep(TDuration::MilliSeconds(50)); + TAsyncFetchScriptResultsResult future = db.FetchScriptResults(executeScrptsResult.Metadata().ExecutionId); + results.ConstructInPlace(future.ExtractValueSync()); + if (!results->IsSuccess()) { + UNIT_ASSERT_C(results->GetStatus() == NYdb::EStatus::BAD_REQUEST, results->GetStatus() << ": " << results->GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(results->GetIssues().ToOneLineString(), "Results are not ready"); + } + } while (!results->HasResultSet()); + TResultSetParser resultSet(results->ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + } + + } } // namespace NKqp diff --git a/ydb/library/yql/core/yql_data_provider.h b/ydb/library/yql/core/yql_data_provider.h index ac606ffcb57..8919fc17f5c 100644 --- a/ydb/library/yql/core/yql_data_provider.h +++ b/ydb/library/yql/core/yql_data_provider.h @@ -104,6 +104,7 @@ public: virtual IGraphTransformer& GetConfigurationTransformer() = 0; virtual TExprNode::TPtr GetClusterInfo(const TString& cluster, TExprContext& ctx) = 0; virtual const THashMap<TString, TString>* GetClusterTokens() = 0; + virtual void AddCluster(const TString& name, const THashMap<TString, TString>& properties) = 0; //-- discovery & rewrite virtual IGraphTransformer& GetIODiscoveryTransformer() = 0; diff --git a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp index 279d11ad515..f43c44a7cc9 100644 --- a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp +++ b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp @@ -88,6 +88,10 @@ TExprNode::TPtr TDataProviderBase::GetClusterInfo(const TString& cluster, TExprC return {}; } +void TDataProviderBase::AddCluster(const TString& name, const THashMap<TString, TString>& properties) { + Y_UNUSED(name, properties); +} + const THashMap<TString, TString>* TDataProviderBase::GetClusterTokens() { return nullptr; } diff --git a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h index afdaa465099..e661c757915 100644 --- a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h +++ b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h @@ -44,6 +44,7 @@ public: bool Initialize(TExprContext& ctx) override; IGraphTransformer& GetConfigurationTransformer() override; TExprNode::TPtr GetClusterInfo(const TString& cluster, TExprContext& ctx) override; + void AddCluster(const TString& name, const THashMap<TString, TString>& properties) override; const THashMap<TString, TString>* GetClusterTokens() override; IGraphTransformer& GetIODiscoveryTransformer() override; IGraphTransformer& GetEpochsTransformer() override; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp index 88bdb04cb9a..7d5672179b2 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp @@ -1,8 +1,9 @@ #include "yql_s3_provider_impl.h" #include "yql_s3_dq_integration.h" -#include <ydb/library/yql/providers/common/config/yql_setting.h> #include <ydb/library/yql/providers/common/config/yql_configuration_transformer.h> +#include <ydb/library/yql/providers/common/config/yql_setting.h> +#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h> #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> @@ -28,6 +29,12 @@ public: , DqIntegration_(CreateS3DqIntegration(State_)) {} + void AddCluster(const TString& name, const THashMap<TString, TString>& properties) override { + auto& settings = State_->Configuration->Clusters[name]; + settings.Url = properties.Value("location", ""); + State_->Configuration->Tokens[name] = ComposeStructuredTokenJsonForServiceAccount(properties.Value("serviceAccountId", ""), properties.Value("serviceAccountIdSignature", ""), properties.Value("authToken", "")); + } + TStringBuf GetName() const override { return S3ProviderName; } |