aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-04-13 12:35:28 +0300
committerhcpp <hcpp@ydb.tech>2023-04-13 12:35:28 +0300
commitcbde992ac1b298645d76d74f243af426cb74f5dc (patch)
treec7774c70c284794f062364770879932f91e17521
parent3316c0db7951eddadcca1eb0e738209b1ec5a987 (diff)
downloadydb-cbde992ac1b298645d76d74f243af426cb74f5dc.tar.gz
dynamic resolve has been added for external table
-rw-r--r--ydb/core/external_sources/external_source.h22
-rw-r--r--ydb/core/external_sources/object_storage.cpp36
-rw-r--r--ydb/core/fq/libs/result_formatter/result_formatter.cpp41
-rw-r--r--ydb/core/fq/libs/result_formatter/result_formatter.h6
-rw-r--r--ydb/core/kqp/gateway/kqp_metadata_loader.cpp268
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp4
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp1
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp1
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasource.cpp137
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h15
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp34
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.h4
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider_impl.h5
-rw-r--r--ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp73
-rw-r--r--ydb/library/yql/core/yql_data_provider.h1
-rw-r--r--ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp4
-rw-r--r--ydb/library/yql/providers/common/provider/yql_data_provider_impl.h1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp9
18 files changed, 575 insertions, 87 deletions
diff --git a/ydb/core/external_sources/external_source.h b/ydb/core/external_sources/external_source.h
index 9b5f1afe400..763fdea0ba0 100644
--- a/ydb/core/external_sources/external_source.h
+++ b/ydb/core/external_sources/external_source.h
@@ -12,8 +12,30 @@ struct TExternalSourceException: public yexception {
struct IExternalSource : public TThrRefBase {
using TPtr = TIntrusivePtr<IExternalSource>;
+ /*
+ Packs TSchema and TGeneral into a string in an arbitrary
+ format (proto, json, text, etc.). The returned string is
+ called the content; it is stored internally and is later
+ passed to the GetParamters method.
+ Can throw an exception in case of an error.
+ */
virtual TString Pack(const NKikimrExternalSources::TSchema& schema,
const NKikimrExternalSources::TGeneral& general) const = 0;
+
+ /*
+ The name of the data source that is used inside the
+ implementation during the read phase. Must match provider name.
+ */
+ virtual TString GetName() const = 0;
+
+ /*
+ Takes the content string produced by the Pack method and
+ returns a map of parameters that will be put in the AST
+ of the source. This data will also be displayed in the
+ viewer.
+ Can throw an exception in case of an error.
+ */
+ virtual TMap<TString, TString> GetParamters(const TString& content) const = 0;
};
}
diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp
index d5da28ab48d..4277114392d 100644
--- a/ydb/core/external_sources/object_storage.cpp
+++ b/ydb/core/external_sources/object_storage.cpp
@@ -1,6 +1,7 @@
#include "external_source.h"
#include <ydb/core/protos/external_sources.pb.h>
+#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
@@ -45,6 +46,41 @@ struct TObjectStorageExternalSource : public IExternalSource {
return objectStorage.SerializeAsString();
}
+ virtual TString GetName() const override {
+ return TString{NYql::S3ProviderName};
+ }
+
+ virtual TMap<TString, TString> GetParamters(const TString& content) const override {
+ NKikimrExternalSources::TObjectStorage objectStorage;
+ objectStorage.ParseFromStringOrThrow(content);
+
+ TMap<TString, TString> parameters{objectStorage.format_setting().begin(), objectStorage.format_setting().end()};
+ if (objectStorage.format()) {
+ parameters["format"] = objectStorage.format();
+ }
+
+ if (objectStorage.compression()) {
+ parameters["compression"] = objectStorage.compression();
+ }
+
+ NSc::TValue projection;
+ for (const auto& [key, value]: objectStorage.projection()) {
+ projection[key] = value;
+ }
+
+ if (!projection.DictEmpty()) {
+ parameters["projection"] = projection.ToJson();
+ }
+
+ NSc::TValue partitionedBy;
+ partitionedBy.AppendAll(objectStorage.partitioned_by());
+ if (!partitionedBy.ArrayEmpty()) {
+ parameters["partitioned_by"] = partitionedBy.ToJson();
+ }
+
+ return parameters;
+ }
+
private:
static NYql::TIssues Validate(const NKikimrExternalSources::TSchema& schema, const NKikimrExternalSources::TObjectStorage& objectStorage) {
NYql::TIssues issues;
diff --git a/ydb/core/fq/libs/result_formatter/result_formatter.cpp b/ydb/core/fq/libs/result_formatter/result_formatter.cpp
index 5cca0992cb8..8941dcb8860 100644
--- a/ydb/core/fq/libs/result_formatter/result_formatter.cpp
+++ b/ydb/core/fq/libs/result_formatter/result_formatter.cpp
@@ -79,24 +79,6 @@ NKikimr::NMiniKQL::TType* MakeListType(NKikimr::NMiniKQL::TType* underlying, NKi
return TListType::Create(underlying, env);
}
-const NYql::TTypeAnnotationNode* MakeStructType(
- const TVector<std::pair<TString, const NYql::TTypeAnnotationNode*>>& i,
- NYql::TExprContext& ctx)
-{
- TVector<const NYql::TItemExprType*> items;
- items.reserve(i.size());
- for (const auto& [k, v] : i) {
- items.push_back(ctx.MakeType<NYql::TItemExprType>(k, v));
- }
- return ctx.MakeType<NYql::TStructExprType>(items);
-}
-
-NKikimr::NMiniKQL::TType* MakeStructType(
- const TVector<std::pair<TString, NKikimr::NMiniKQL::TType*>>& items,
- NKikimr::NMiniKQL::TTypeEnvironment& env)
-{
- return TStructType::Create(items.data(), items.size(), env);
-}
const NYql::TTypeAnnotationNode* MakeTupleType(
const TVector<const NYql::TTypeAnnotationNode*>& items,
@@ -455,4 +437,27 @@ void FormatResultSet(NJson::TJsonValue& root, const NYdb::TResultSet& resultSet,
}
}
+const NYql::TTypeAnnotationNode* MakeStructType(
+ const TVector<std::pair<TString, const NYql::TTypeAnnotationNode*>>& i,
+ NYql::TExprContext& ctx)
+{
+ TVector<const NYql::TItemExprType*> items;
+ items.reserve(i.size());
+ for (const auto& [k, v] : i) {
+ items.push_back(ctx.MakeType<NYql::TItemExprType>(k, v));
+ }
+ return ctx.MakeType<NYql::TStructExprType>(items);
+}
+
+NKikimr::NMiniKQL::TType* MakeStructType(
+ const TVector<std::pair<TString, NKikimr::NMiniKQL::TType*>>& items,
+ NKikimr::NMiniKQL::TTypeEnvironment& env)
+{
+ return TStructType::Create(items.data(), items.size(), env);
+}
+
+const NYql::TTypeAnnotationNode* MakeType(NYdb::TTypeParser& parser, NYql::TExprContext& ctx) {
+ return MakeType<const NYql::TTypeAnnotationNode*, NYql::TExprContext>(parser, ctx);
+}
+
} // namespace NFq
diff --git a/ydb/core/fq/libs/result_formatter/result_formatter.h b/ydb/core/fq/libs/result_formatter/result_formatter.h
index c46a44344a6..13bcd6c6754 100644
--- a/ydb/core/fq/libs/result_formatter/result_formatter.h
+++ b/ydb/core/fq/libs/result_formatter/result_formatter.h
@@ -8,9 +8,15 @@
#include <library/cpp/json/json_writer.h>
+#include <ydb/library/yql/ast/yql_expr.h>
+#include <ydb/library/yql/minikql/mkql_node.h>
+
namespace NFq {
void FormatResultSet(NJson::TJsonValue& root, const NYdb::TResultSet& resultSet, bool typeNameAsString = false, bool prettyValueFormat = false);
TString FormatSchema(const FederatedQuery::Schema& schema);
+const NYql::TTypeAnnotationNode* MakeStructType(const TVector<std::pair<TString, const NYql::TTypeAnnotationNode*>>& i, NYql::TExprContext& ctx);
+NKikimr::NMiniKQL::TType* MakeStructType(const TVector<std::pair<TString, NKikimr::NMiniKQL::TType*>>& items, NKikimr::NMiniKQL::TTypeEnvironment& env);
+const NYql::TTypeAnnotationNode* MakeType(NYdb::TTypeParser& parser, NYql::TExprContext& ctx);
} // namespace NFq
diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
index 111e7250901..a11485f5e51 100644
--- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
+++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
@@ -43,6 +43,19 @@ std::pair<TNavigate::TEntry, TString> CreateNavigateEntry(const std::pair<TIndex
return {entry, pair.second};
}
+std::optional<std::pair<TNavigate::TEntry, TString>> CreateNavigateExternalEntry(const TString& path) {
+ TNavigate::TEntry entry;
+ entry.Path = SplitPath(path);
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown;
+ entry.SyncVersion = true;
+ return {{entry, path}};
+}
+
+std::optional<std::pair<TNavigate::TEntry, TString>> CreateNavigateExternalEntry(const std::pair<TIndexId, TString>& pair) {
+ Y_UNUSED(pair);
+ return {};
+}
+
ui64 GetExpectedVersion(const std::pair<TIndexId, TString>& pathId) {
return pathId.first.SchemaVersion;
}
@@ -80,37 +93,22 @@ void IndexProtoToMetadata(const TIndexProto& indexes, NYql::TKikimrTableMetadata
}
}
+TString GetTypeName(const NScheme::TTypeInfoMod& typeInfoMod, bool notNull) {
+ TString typeName;
+ if (typeInfoMod.TypeInfo.GetTypeId() != NScheme::NTypeIds::Pg) {
+ YQL_ENSURE(NScheme::TryGetTypeName(typeInfoMod.TypeInfo.GetTypeId(), typeName));
+ } else {
+ YQL_ENSURE(typeInfoMod.TypeInfo.GetTypeDesc(), "no pg type descriptor");
+ YQL_ENSURE(!notNull, "pg not null types are not allowed");
+ typeName = NPg::PgTypeNameFromTypeDesc(typeInfoMod.TypeInfo.GetTypeDesc(), typeInfoMod.TypeMod);
+ }
+ return typeName;
+}
-TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry,
+TTableMetadataResult GetTableMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry,
const TString& cluster, const TString& tableName) {
- using TResult = NYql::IKikimrGateway::TTableMetadataResult;
- using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus;
using EKind = NSchemeCache::TSchemeCacheNavigate::EKind;
- auto message = ToString(entry.Status);
-
- switch (entry.Status) {
- case EStatus::Ok:
- break;
- case EStatus::PathErrorUnknown:
- case EStatus::RootUnknown: {
- TTableMetadataResult result;
- result.SetSuccess();
- result.Metadata = new NYql::TKikimrTableMetadata(cluster, tableName);
- return result;
- }
- case EStatus::PathNotTable:
- case EStatus::TableCreationNotComplete:
- return ResultFromError<TResult>(YqlIssue({}, TIssuesIds::KIKIMR_SCHEME_ERROR, message));
- case EStatus::LookupError:
- case EStatus::RedirectLookupError:
- return ResultFromError<TResult>(YqlIssue({}, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, message));
- default:
- return ResultFromError<TResult>(ToString(entry.Status));
- }
-
- YQL_ENSURE(entry.Kind == EKind::KindTable || entry.Kind == EKind::KindColumnTable);
-
TTableMetadataResult result;
result.SetSuccess();
result.Metadata = new NYql::TKikimrTableMetadata(cluster, tableName);
@@ -149,15 +147,8 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache
std::map<ui32, TString, std::less<ui32>> columnOrder;
for (auto& pair : entry.Columns) {
const auto& columnDesc = pair.second;
- TString typeName;
auto notNull = entry.NotNullColumns.contains(columnDesc.Name);
- if (columnDesc.PType.GetTypeId() != NScheme::NTypeIds::Pg) {
- YQL_ENSURE(NScheme::TryGetTypeName(columnDesc.PType.GetTypeId(), typeName));
- } else {
- Y_VERIFY(columnDesc.PType.GetTypeDesc(), "no pg type descriptor");
- Y_VERIFY(!notNull, "pg not null types are not allowed");
- typeName = NPg::PgTypeNameFromTypeDesc(columnDesc.PType.GetTypeDesc(), columnDesc.PTypeMod);
- }
+ const TString typeName = GetTypeName(NScheme::TTypeInfoMod{columnDesc.PType, columnDesc.PTypeMod}, notNull);
tableMeta->Columns.emplace(
columnDesc.Name,
NYql::TKikimrColumnMetadata(
@@ -185,6 +176,125 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache
return result;
}
+TTableMetadataResult GetExternalTableMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry,
+ const TString& cluster, const TString& tableName) {
+ const auto& description = entry.ExternalTableInfo->Description;
+ TTableMetadataResult result;
+ result.SetSuccess();
+ result.Metadata = new NYql::TKikimrTableMetadata(cluster, tableName);
+ auto tableMeta = result.Metadata;
+ tableMeta->DoesExist = true;
+ tableMeta->PathId = NYql::TKikimrPathId(description.GetPathId().GetOwnerId(), description.GetPathId().GetLocalId());
+ tableMeta->SchemaVersion = description.GetVersion();
+ tableMeta->Kind = NYql::EKikimrTableKind::External;
+
+ tableMeta->Attributes = entry.Attributes;
+
+ for (auto& columnDesc : description.GetColumns()) {
+ const auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(columnDesc.GetTypeId(),
+ columnDesc.HasTypeInfo() ? &columnDesc.GetTypeInfo() : nullptr);
+ const TString typeName = GetTypeName(typeInfoMod, columnDesc.GetNotNull());
+ tableMeta->Columns.emplace(
+ columnDesc.GetName(),
+ NYql::TKikimrColumnMetadata(
+ columnDesc.GetName(), columnDesc.GetId(), typeName, columnDesc.GetNotNull(), typeInfoMod.TypeInfo, typeInfoMod.TypeMod
+ )
+ );
+ }
+
+ tableMeta->ExternalSource.Type = description.GetSourceType();
+ tableMeta->ExternalSource.TableLocation = description.GetLocation();
+ tableMeta->ExternalSource.TableContent = description.GetContent();
+ tableMeta->ExternalSource.DataSourcePath = description.GetDataSourcePath();
+ return result;
+}
+
+TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry,
+ const TString& cluster, const TString& tableName) {
+ const auto& description = entry.ExternalDataSourceInfo->Description;
+ TTableMetadataResult result;
+ result.SetSuccess();
+ result.Metadata = new NYql::TKikimrTableMetadata(cluster, tableName);
+ auto tableMeta = result.Metadata;
+ tableMeta->DoesExist = true;
+ tableMeta->PathId = NYql::TKikimrPathId(description.GetPathId().GetOwnerId(), description.GetPathId().GetLocalId());
+ tableMeta->SchemaVersion = description.GetVersion();
+ tableMeta->Kind = NYql::EKikimrTableKind::External;
+
+ tableMeta->Attributes = entry.Attributes;
+
+ tableMeta->ExternalSource.Type = description.GetSourceType();
+ tableMeta->ExternalSource.DataSourceLocation = description.GetLocation();
+ tableMeta->ExternalSource.DataSourceInstallation = description.GetInstallation();
+ tableMeta->ExternalSource.DataSourceAuth = description.GetAuth();
+ return result;
+}
+
+TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry,
+ const TString& cluster, const TString& tableName) {
+ using TResult = NYql::IKikimrGateway::TTableMetadataResult;
+ using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus;
+ using EKind = NSchemeCache::TSchemeCacheNavigate::EKind;
+
+ auto message = ToString(entry.Status);
+
+ switch (entry.Status) {
+ case EStatus::Ok:
+ break;
+ case EStatus::PathErrorUnknown:
+ case EStatus::RootUnknown: {
+ TTableMetadataResult result;
+ result.SetSuccess();
+ result.Metadata = new NYql::TKikimrTableMetadata(cluster, tableName);
+ return result;
+ }
+ case EStatus::PathNotTable:
+ case EStatus::TableCreationNotComplete:
+ return ResultFromError<TResult>(YqlIssue({}, TIssuesIds::KIKIMR_SCHEME_ERROR, message));
+ case EStatus::LookupError:
+ case EStatus::RedirectLookupError:
+ return ResultFromError<TResult>(YqlIssue({}, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, message));
+ default:
+ return ResultFromError<TResult>(ToString(entry.Status));
+ }
+
+ YQL_ENSURE(entry.Kind == EKind::KindTable || entry.Kind == EKind::KindColumnTable || entry.Kind == EKind::KindExternalTable || entry.Kind == EKind::KindExternalDataSource);
+
+ TTableMetadataResult result;
+ switch (entry.Kind) {
+ case EKind::KindExternalTable:
+ result = GetExternalTableMetadataResult(entry, cluster, tableName);
+ break;
+ case EKind::KindExternalDataSource:
+ result = GetExternalDataSourceMetadataResult(entry, cluster, tableName);
+ break;
+ default:
+ result = GetTableMetadataResult(entry, cluster, tableName);
+ }
+ return result;
+}
+
+
+TTableMetadataResult EnrichExternalTable(const TTableMetadataResult& externalTable, const TTableMetadataResult& externalDataSource) {
+ TTableMetadataResult result;
+ if (!externalTable.Success()) {
+ result.AddIssues(externalTable.Issues());
+ return result;
+ }
+ if (!externalDataSource.Success()) {
+ result.AddIssues(externalDataSource.Issues());
+ return result;
+ }
+
+ result.SetSuccess();
+ result.Metadata = externalTable.Metadata;
+ auto tableMeta = result.Metadata;
+ tableMeta->ExternalSource.DataSourceLocation = externalDataSource.Metadata->ExternalSource.DataSourceLocation;
+ tableMeta->ExternalSource.DataSourceInstallation = externalDataSource.Metadata->ExternalSource.DataSourceInstallation;
+ tableMeta->ExternalSource.DataSourceAuth = externalDataSource.Metadata->ExternalSource.DataSourceAuth;
+ return result;
+}
+
TString GetDebugString(const TString& id) {
return TStringBuilder() << " Path: " << id;
}
@@ -235,6 +345,10 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
return MakeFuture(result);
}
+ if (result.Metadata->Kind == NYql::EKikimrTableKind::External) {
+ return MakeFuture(result);
+ }
+
auto locked = ptr.lock();
if (!locked) {
result.SetStatus(TIssuesIds::KIKIMR_INDEX_METADATA_LOAD_FAILED);
@@ -374,6 +488,19 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadIndexMeta
}
}
+NSchemeCache::TSchemeCacheNavigate::TEntry& InferEntry(NKikimr::NSchemeCache::TSchemeCacheNavigate::TResultSet& resultSet) {
+ using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus;
+ using EKind = NSchemeCache::TSchemeCacheNavigate::EKind;
+
+ if (resultSet.size() != 2 || resultSet[1].Status != EStatus::Ok) {
+ return resultSet[0];
+ }
+
+ return IsIn({EKind::KindExternalDataSource, EKind::KindExternalTable}, resultSet[1].Kind)
+ ? resultSet[1]
+ : resultSet[0];
+}
+
// The type is TString or std::pair<TIndexId, TString>
template<typename TPath>
NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMetadataCache(
@@ -388,12 +515,16 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
using EKind = NSchemeCache::TSchemeCacheNavigate::EKind;
const auto entry = CreateNavigateEntry(id, settings);
+ const auto externalEntry = CreateNavigateExternalEntry(id);
const ui64 expectedSchemaVersion = GetExpectedVersion(id);
LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load table metadata from cache by path, request" << GetDebugString(id));
auto navigate = MakeHolder<TNavigate>();
navigate->ResultSet.emplace_back(entry.first);
+ if (externalEntry) {
+ navigate->ResultSet.emplace_back(externalEntry->first);
+ }
const TString& table = entry.second;
navigate->DatabaseName = database;
@@ -416,10 +547,15 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
YQL_ENSURE(response.Request);
auto& navigate = *response.Request;
- YQL_ENSURE(navigate.ResultSet.size() == 1);
- auto& entry = navigate.ResultSet[0];
+ YQL_ENSURE(1 <= navigate.ResultSet.size() && navigate.ResultSet.size() <= 2);
+ auto& entry = InferEntry(navigate.ResultSet);
+
+ if (entry.Status != EStatus::Ok) {
+ promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table));
+ return;
+ }
- if (entry.Status == EStatus::Ok && expectedSchemaVersion && entry.TableId.SchemaVersion) {
+ if (!IsIn({EKind::KindExternalDataSource, EKind::KindExternalTable}, entry.Kind) && expectedSchemaVersion && entry.TableId.SchemaVersion) {
if (entry.TableId.SchemaVersion != expectedSchemaVersion) {
const auto message = TStringBuilder()
<< "schema version mismatch during metadata loading for: "
@@ -433,22 +569,46 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
}
}
- if (entry.Status == EStatus::Ok && entry.Kind == EKind::KindIndex) {
- Y_ENSURE(entry.ListNodeEntry, "expected children list");
- Y_ENSURE(entry.ListNodeEntry->Children.size() == 1, "expected one child");
-
- TIndexId pathId = TIndexId(
- entry.ListNodeEntry->Children[0].PathId,
- entry.ListNodeEntry->Children[0].SchemaVersion
- );
-
- LoadTableMetadataCache(cluster, std::make_pair(pathId, table), settings, database, userToken)
- .Apply([promise](const TFuture<TTableMetadataResult>& result) mutable
- {
- promise.SetValue(result.GetValue());
- });
- } else {
- promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table));
+ switch (entry.Kind) {
+ case EKind::KindExternalDataSource: {
+ promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table));
+ }
+ break;
+ case EKind::KindExternalTable: {
+ YQL_ENSURE(entry.ExternalTableInfo, "expected external table info");
+ const auto& dataSourcePath = entry.ExternalTableInfo->Description.GetDataSourcePath();
+ auto externalTableMetadata = GetLoadTableMetadataResult(entry, cluster, table);
+ if (!externalTableMetadata.Success()) {
+ promise.SetValue(externalTableMetadata);
+ return;
+ }
+ LoadTableMetadataCache(cluster, dataSourcePath, settings, database, userToken)
+ .Apply([promise, externalTableMetadata](const TFuture<TTableMetadataResult>& result) mutable
+ {
+ auto externalDataSourceMetadata = result.GetValue();
+ promise.SetValue(EnrichExternalTable(externalTableMetadata, externalDataSourceMetadata));
+ });
+ }
+ break;
+ case EKind::KindIndex: {
+ Y_ENSURE(entry.ListNodeEntry, "expected children list");
+ Y_ENSURE(entry.ListNodeEntry->Children.size() == 1, "expected one child");
+
+ TIndexId pathId = TIndexId(
+ entry.ListNodeEntry->Children[0].PathId,
+ entry.ListNodeEntry->Children[0].SchemaVersion
+ );
+
+ LoadTableMetadataCache(cluster, std::make_pair(pathId, table), settings, database, userToken)
+ .Apply([promise](const TFuture<TTableMetadataResult>& result) mutable
+ {
+ promise.SetValue(result.GetValue());
+ });
+ }
+ break;
+ default: {
+ promise.SetValue(GetLoadTableMetadataResult(entry, cluster, table));
+ }
}
}
catch (yexception& e) {
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 32858840b19..a2de3e62de9 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -1,6 +1,7 @@
#include "kqp_host_impl.h"
#include <ydb/core/base/appdata.h>
+#include <ydb/core/external_sources/external_source_factory.h>
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/opt/kqp_query_plan.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h>
@@ -1477,7 +1478,7 @@ private:
// Kikimr provider
auto queryExecutor = MakeIntrusive<TKqpQueryExecutor>(Gateway, Cluster, SessionCtx, KqpRunner);
- auto kikimrDataSource = CreateKikimrDataSource(*FuncRegistry, *TypesCtx, Gateway, SessionCtx);
+ auto kikimrDataSource = CreateKikimrDataSource(*FuncRegistry, *TypesCtx, Gateway, SessionCtx, ExternalSourceFactory);
auto kikimrDataSink = CreateKikimrDataSink(*FuncRegistry, *TypesCtx, Gateway, SessionCtx, queryExecutor);
FillSettings.AllResultsBytesLimit = Nothing();
@@ -1600,6 +1601,7 @@ private:
TIntrusivePtr<TExecuteContext> ExecuteCtx;
TIntrusivePtr<IKqpRunner> KqpRunner;
+ NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory()};
};
} // namespace
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp
index a963a8bdcb5..8e745b30c66 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp
@@ -30,6 +30,7 @@ TMaybeNode<TExprBase> TryBuildTrivialReadTable(TCoFlatMap& flatmap, TKqlReadTabl
case EKikimrTableKind::SysView:
break;
case EKikimrTableKind::Olap:
+ case EKikimrTableKind::External:
case EKikimrTableKind::Unspecified:
return {};
}
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
index 107cc3d410b..22cd6719023 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
@@ -285,7 +285,6 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp
.Settings(read.Settings())
.Done();
break;
-
default:
YQL_ENSURE(false, "Unexpected table kind: " << (ui32)tableDesc.Metadata->Kind);
break;
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
index 43028606cb9..87d46207c7d 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
@@ -5,8 +5,14 @@
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
+#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
+#include <ydb/core/external_sources/external_source_factory.h>
+#include <ydb/core/fq/libs/result_formatter/result_formatter.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
+
#include <util/generic/is_in.h>
namespace NYql {
@@ -91,9 +97,13 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase {
public:
TKiSourceLoadTableMetadataTransformer(
TIntrusivePtr<IKikimrGateway> gateway,
- TIntrusivePtr<TKikimrSessionContext> sessionCtx)
+ TIntrusivePtr<TKikimrSessionContext> sessionCtx,
+ TTypeAnnotationContext& types,
+ const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory)
: Gateway(gateway)
- , SessionCtx(sessionCtx) {}
+ , SessionCtx(sessionCtx)
+ , Types(types)
+ , ExternalSourceFactory(externalSourceFactory) {}
TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
output = input;
@@ -161,6 +171,31 @@ public:
return AsyncFuture;
}
+ bool AddCluster(const std::pair<TString, TString>& table, IKikimrGateway::TTableMetadataResult& res, TExprNode::TPtr input, TExprContext& ctx) {
+ const auto& metadata = *res.Metadata;
+ if (metadata.Kind != EKikimrTableKind::External) {
+ return true;
+ }
+ auto source = ExternalSourceFactory->GetOrCreate(metadata.ExternalSource.Type);
+ auto it = Types.DataSourceMap.find(source->GetName());
+ if (it == Types.DataSourceMap.end()) {
+ TIssueScopeGuard issueScope(ctx.IssueManager, [input, &table, &ctx]() {
+ return MakeIntrusive<TIssue>(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder()
+ << "Failed to load metadata for table (data source doesn't exist): "
+ << NCommon::FullTableName(table.first, table.second)));
+ });
+
+ res.ReportIssues(ctx.IssueManager);
+ LoadResults.clear();
+ return false;
+ }
+ it->second->AddCluster(metadata.ExternalSource.DataSourcePath, {{
+ {"location", metadata.ExternalSource.DataSourceLocation },
+ {"installation", metadata.ExternalSource.DataSourceInstallation }
+ }});
+ return true;
+ }
+
TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
output = input;
YQL_ENSURE(AsyncFuture.HasValue());
@@ -189,6 +224,10 @@ public:
LoadResults.clear();
return TStatus::Error;
}
+
+ if (!AddCluster(table, res, input, ctx)) {
+ return TStatus::Error;
+ }
} else {
TIssueScopeGuard issueScope(ctx.IssueManager, [input, &table, &ctx]() {
return MakeIntrusive<TIssue>(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder()
@@ -214,6 +253,8 @@ public:
private:
TIntrusivePtr<IKikimrGateway> Gateway;
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
+ TTypeAnnotationContext& Types;
+ NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory;
THashMap<std::pair<TString, TString>, std::shared_ptr<IKikimrGateway::TTableMetadataResult>> LoadResults;
NThreading::TFuture<void> AsyncFuture;
@@ -287,14 +328,16 @@ public:
const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
TTypeAnnotationContext& types,
TIntrusivePtr<IKikimrGateway> gateway,
- TIntrusivePtr<TKikimrSessionContext> sessionCtx)
+ TIntrusivePtr<TKikimrSessionContext> sessionCtx,
+ const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory)
: FunctionRegistry(functionRegistry)
, Types(types)
, Gateway(gateway)
, SessionCtx(sessionCtx)
+ , ExternalSourceFactory(externalSourceFactory)
, ConfigurationTransformer(new TKikimrConfigurationTransformer(sessionCtx, types))
, IntentDeterminationTransformer(new TKiSourceIntentDeterminationTransformer(sessionCtx))
- , LoadTableMetadataTransformer(CreateKiSourceLoadTableMetadataTransformer(gateway, sessionCtx))
+ , LoadTableMetadataTransformer(CreateKiSourceLoadTableMetadataTransformer(gateway, sessionCtx, types, externalSourceFactory))
, TypeAnnotationTransformer(CreateKiSourceTypeAnnotationTransformer(sessionCtx, types))
, CallableExecutionTransformer(CreateKiSourceCallableExecutionTransformer(gateway, sessionCtx))
@@ -479,12 +522,54 @@ public:
return false;
}
+ static Ydb::Type CreateYdbType(const NScheme::TTypeInfo& typeInfo, bool notNull) {
+ Ydb::Type ydbType;
+ if (typeInfo.GetTypeId() == NScheme::NTypeIds::Pg) {
+ auto* typeDesc = typeInfo.GetTypeDesc();
+ auto* pg = ydbType.mutable_pg_type();
+ pg->set_type_name(NPg::PgTypeNameFromTypeDesc(typeDesc));
+ pg->set_oid(NPg::PgTypeIdFromTypeDesc(typeDesc));
+ } else {
+ auto& item = notNull
+ ? ydbType
+ : *ydbType.mutable_optional_type()->mutable_item();
+ item.set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId());
+ }
+ return ydbType;
+ }
+
+ TExprNode::TPtr BuildSettings(TPositionHandle pos, TExprContext& ctx, const TMap<TString, NYql::TKikimrColumnMetadata>& columns, const NExternalSource::IExternalSource::TPtr& source, const TString& content) {
+ TVector<std::pair<TString, const NYql::TTypeAnnotationNode*>> typedColumns;
+ typedColumns.reserve(columns.size());
+ for (const auto& [n, c] : columns) {
+ NYdb::TTypeParser parser(NYdb::TType(CreateYdbType(c.TypeInfo, c.NotNull)));
+ auto type = NFq::MakeType(parser, ctx);
+ typedColumns.emplace_back(n, type);
+ }
+
+ const TString ysonSchema = NYql::NCommon::WriteTypeToYson(NFq::MakeStructType(typedColumns, ctx), NYson::EYsonFormat::Text);
+ TExprNode::TListType items;
+ auto schema = ctx.NewAtom(pos, ysonSchema);
+ auto type = ctx.NewCallable(pos, "SqlTypeFromYson"sv, { schema });
+ auto order = ctx.NewCallable(pos, "SqlColumnOrderFromYson"sv, { schema });
+ auto userSchema = ctx.NewAtom(pos, "userschema"sv);
+ items.emplace_back(ctx.NewList(pos, {userSchema, type, order}));
+
+ for (const auto& [key, value]: source->GetParamters(content)) {
+ auto keyAtom = ctx.NewAtom(pos, key);
+ auto valueAtom = ctx.NewAtom(pos, value);
+ items.emplace_back(ctx.NewList(pos, {keyAtom, valueAtom}));
+ }
+ return ctx.NewList(pos, std::move(items));
+ }
+
TExprNode::TPtr RewriteIO(const TExprNode::TPtr& node, TExprContext& ctx) override {
auto read = node->Child(0);
if (!read->IsCallable(ReadName)) {
ythrow yexception() << "Expected Read!";
}
+ TKiDataSource source(read->ChildPtr(1));
TKikimrKey key(ctx);
if (!key.Extract(*read->Child(2))) {
return nullptr;
@@ -505,6 +590,38 @@ public:
YQL_ENSURE(false, "Unsupported Kikimr KeyType.");
}
+ auto& tableDesc = SessionCtx->Tables().GetTable(TString{source.Cluster()}, key.GetTablePath());
+ if (key.GetKeyType() == TKikimrKey::Type::Table && tableDesc.Metadata->Kind == EKikimrTableKind::External) {
+ const auto& source = ExternalSourceFactory->GetOrCreate(tableDesc.Metadata->ExternalSource.Type);
+ ctx.Step.Repeat(TExprStep::DiscoveryIO)
+ .Repeat(TExprStep::Epochs)
+ .Repeat(TExprStep::Intents)
+ .Repeat(TExprStep::LoadTablesMetadata)
+ .Repeat(TExprStep::RewriteIO);
+ TExprNode::TPtr path = ctx.NewCallable(node->Pos(), "String", { ctx.NewAtom(node->Pos(), tableDesc.Metadata->ExternalSource.TableLocation) });
+ auto table = ctx.NewList(node->Pos(), {ctx.NewAtom(node->Pos(), "table"), path});
+ auto key = ctx.NewCallable(node->Pos(), "Key", {table});
+ auto newRead = Build<TCoRead>(ctx, node->Pos())
+ .World(read->Child(0))
+ .DataSource(
+ Build<TCoDataSource>(ctx, node->Pos())
+ .Category(ctx.NewAtom(node->Pos(), source->GetName()))
+ .FreeArgs()
+ .Add(ctx.NewAtom(node->Pos(), tableDesc.Metadata->ExternalSource.DataSourcePath))
+ .Build()
+ .Done().Ptr()
+ )
+ .FreeArgs()
+ .Add(ctx.NewCallable(node->Pos(), "MrTableConcat", {key}))
+ .Add(ctx.NewCallable(node->Pos(), "Void", {}))
+ .Add(BuildSettings(node->Pos(), ctx, tableDesc.Metadata->Columns, source, tableDesc.Metadata->ExternalSource.TableContent))
+ .Build()
+ .Done().Ptr();
+ auto retChildren = node->ChildrenList();
+ retChildren[0] = newRead;
+ return ctx.ChangeChildren(*node, std::move(retChildren));
+ }
+
auto newRead = ctx.RenameNode(*read, newName);
if (auto maybeRead = TMaybeNode<TKiReadTable>(newRead)) {
@@ -613,6 +730,7 @@ private:
TTypeAnnotationContext& Types;
TIntrusivePtr<IKikimrGateway> Gateway;
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
+ NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory;
TAutoPtr<IGraphTransformer> ConfigurationTransformer;
TAutoPtr<IGraphTransformer> IntentDeterminationTransformer;
@@ -650,15 +768,18 @@ TIntrusivePtr<IDataProvider> CreateKikimrDataSource(
const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
TTypeAnnotationContext& types,
TIntrusivePtr<IKikimrGateway> gateway,
- TIntrusivePtr<TKikimrSessionContext> sessionCtx)
+ TIntrusivePtr<TKikimrSessionContext> sessionCtx,
+ const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory)
{
- return new TKikimrDataSource(functionRegistry, types, gateway, sessionCtx);
+ return new TKikimrDataSource(functionRegistry, types, gateway, sessionCtx, externalSourceFactory);
}
TAutoPtr<IGraphTransformer> CreateKiSourceLoadTableMetadataTransformer(TIntrusivePtr<IKikimrGateway> gateway,
- TIntrusivePtr<TKikimrSessionContext> sessionCtx)
+ TIntrusivePtr<TKikimrSessionContext> sessionCtx,
+ TTypeAnnotationContext& types,
+ const NExternalSource::IExternalSourceFactory::TPtr& externalSourceFactory)
{
- return new TKiSourceLoadTableMetadataTransformer(gateway, sessionCtx);
+ return new TKiSourceLoadTableMetadataTransformer(gateway, sessionCtx, types, externalSourceFactory);
}
} // namespace NYql
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index 0493fdc0312..11109ee9f45 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -283,7 +283,8 @@ enum class EKikimrTableKind : ui32 {
Unspecified = 0,
Datashard = 1,
SysView = 2,
- Olap = 3
+ Olap = 3,
+ External = 4
};
enum class ETableType : ui32 {
@@ -305,6 +306,16 @@ enum class EStoreType : ui32 {
Column = 1
};
+// External-source description carried by TKikimrTableMetadata (field ExternalSource).
+// RewriteIO reads it when the table's Kind is EKikimrTableKind::External.
+struct TExternalSource {
+ // Source type name; passed to IExternalSourceFactory::GetOrCreate in RewriteIO
+ // ("ObjectStorage" in the gateway unit test).
+ TString Type;
+ // LOCATION of the external table ("/" in the gateway unit test); becomes the
+ // String path of the rewritten read key.
+ TString TableLocation;
+ // Passed to BuildSettings together with the table columns.
+ // NOTE(review): presumably a serialized, source-specific table description — confirm.
+ TString TableContent;
+ // Scheme path of the external data source; used as the free argument of the
+ // rebuilt DataSource node.
+ TString DataSourcePath;
+ // LOCATION of the external data source ("my-bucket" in the gateway unit test).
+ TString DataSourceLocation;
+ // NOTE(review): not read by the code visible here; meaning to be confirmed.
+ TString DataSourceInstallation;
+ // NOTE(review): presumably the data source's auth settings (AUTH_METHOD) — confirm.
+ NKikimrSchemeOp::TAuth DataSourceAuth;
+};
+
struct TKikimrTableMetadata : public TThrRefBase {
bool DoesExist = false;
TString Cluster;
@@ -336,6 +347,8 @@ struct TKikimrTableMetadata : public TThrRefBase {
TVector<TColumnFamily> ColumnFamilies;
TTableSettings TableSettings;
+ TExternalSource ExternalSource;
+
TKikimrTableMetadata(const TString& cluster, const TString& table)
: Cluster(cluster)
, Name(table)
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
index 1340464e725..eaca7a69698 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
@@ -439,6 +439,40 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
TestCreateExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source");
TestDropExternalDataSource(*kikimr.GetTestServer().GetRuntime(), GetIcGateway(kikimr.GetTestServer()), "/Root/f1/f2/external_data_source");
}
+
+ // Creates an external data source and an external table on top of it, then loads the
+ // table's metadata through the IC gateway and checks the external-source fields.
+ Y_UNIT_TEST(TestLoadExternalTable) {
+     TKikimrRunner kikimr;
+     kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+
+     TString externalDataSourceName = "/Root/ExternalDataSource";
+     TString externalTableName = "/Root/ExternalTable";
+     auto ddl = TStringBuilder() << R"(
+ CREATE EXTERNAL DATA SOURCE `)" << externalDataSourceName << R"(` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="my-bucket",
+ AUTH_METHOD="NONE"
+ );
+ CREATE EXTERNAL TABLE `)" << externalTableName << R"(` (
+ Key Uint64,
+ Value String
+ ) WITH (
+ DATA_SOURCE=")" << externalDataSourceName << R"(",
+ LOCATION="/"
+ );)";
+
+     // Run the DDL through a regular table-client session.
+     auto tableClient = kikimr.GetTableClient();
+     auto session = tableClient.CreateSession().GetValueSync().GetSession();
+     auto result = session.ExecuteSchemeQuery(ddl).GetValueSync();
+     UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+     // Load the metadata of the freshly created table and dump any issues before asserting.
+     auto gateway = GetIcGateway(kikimr.GetTestServer());
+     auto metadataFuture = gateway->LoadTableMetadata(TestCluster, externalTableName, IKikimrGateway::TLoadTableMetadataSettings());
+     metadataFuture.Wait();
+     auto response = metadataFuture.GetValue();
+     response.Issues().PrintTo(Cerr);
+     UNIT_ASSERT(response.Success());
+
+     UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.Type, "ObjectStorage");
+     UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.TableLocation, "/");
+     UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.DataSourcePath, externalDataSourceName);
+     UNIT_ASSERT_VALUES_EQUAL(response.Metadata->ExternalSource.DataSourceLocation, "my-bucket");
+     UNIT_ASSERT_VALUES_EQUAL(response.Metadata->Columns.size(), 2);
+ }
}
} // namespace NYql
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h
index 560a81ccc50..1d104a4b839 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.h
@@ -3,6 +3,7 @@
#include "yql_kikimr_gateway.h"
#include "yql_kikimr_settings.h"
+#include <ydb/core/external_sources/external_source_factory.h>
#include <ydb/core/kqp/query_data/kqp_query_data.h>
#include <ydb/library/yql/ast/yql_gc_nodes.h>
#include <ydb/library/yql/core/yql_type_annotation.h>
@@ -452,7 +453,8 @@ TIntrusivePtr<IDataProvider> CreateKikimrDataSource(
const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
TTypeAnnotationContext& types,
TIntrusivePtr<IKikimrGateway> gateway,
- TIntrusivePtr<TKikimrSessionContext> sessionCtx);
+ TIntrusivePtr<TKikimrSessionContext> sessionCtx,
+ const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& sourceFactory);
TIntrusivePtr<IDataProvider> CreateKikimrDataSink(
const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
index b94e2c2eb2f..d1d4702db5c 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
@@ -2,6 +2,7 @@
#include "yql_kikimr_provider.h"
+#include <ydb/core/external_sources/external_source_factory.h>
#include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h>
#include <ydb/core/kqp/provider/yql_kikimr_results.h>
@@ -155,7 +156,9 @@ TAutoPtr<IGraphTransformer> CreateKiLogicalOptProposalTransformer(TIntrusivePtr<
TTypeAnnotationContext& types);
TAutoPtr<IGraphTransformer> CreateKiPhysicalOptProposalTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx);
TAutoPtr<IGraphTransformer> CreateKiSourceLoadTableMetadataTransformer(TIntrusivePtr<IKikimrGateway> gateway,
- TIntrusivePtr<TKikimrSessionContext> sessionCtx);
+ TIntrusivePtr<TKikimrSessionContext> sessionCtx,
+ TTypeAnnotationContext& types,
+ const NKikimr::NExternalSource::IExternalSourceFactory::TPtr& sourceFactory);
TAutoPtr<IGraphTransformer> CreateKiSinkIntentDeterminationTransformer(TIntrusivePtr<TKikimrSessionContext> sessionCtx);
TAutoPtr<IGraphTransformer> CreateKiSourceCallableExecutionTransformer(
diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
index b64aad7b937..edf8e5d2456 100644
--- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
@@ -8,6 +8,10 @@
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/S3Client.h>
+#include <ydb/library/yql/utils/log/log.h>
+
+#include <fmt/format.h>
+
namespace NKikimr {
namespace NKqp {
@@ -61,7 +65,6 @@ void CreateBucketWithObject(const TString& bucket, const TString& object, const
Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
Y_UNIT_TEST(ExecuteScript) {
- Cerr << "S3 endpoint: " << GetEnv("S3_ENDPOINT") << Endl;
CreateBucketWithObject("test_bucket", "Root/test_object", TEST_CONTENT);
SetEnv("TEST_S3_BUCKET", "test_bucket");
SetEnv("TEST_S3_OBJECT", "test_object");
@@ -101,6 +104,74 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
}
+
+ // Creates an external data source over an S3 bucket and an external table bound to one of
+ // its objects, then runs a script that selects from the table by its scheme path and
+ // checks the returned rows.
+ Y_UNIT_TEST(ExecuteScriptWithExternalTableResolve) {
+     using namespace fmt::literals;
+     const TString externalDataSourceName = "/Root/external_data_source";
+     const TString externalTableName = "/Root/test_binding_resolve";
+     const TString bucket = "test_bucket1";
+     const TString object = "Root/test_object";
+
+     CreateBucketWithObject(bucket, object, TEST_CONTENT);
+
+     auto kikimr = DefaultKikimrRunner();
+     kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+
+     auto tc = kikimr.GetTableClient();
+     auto session = tc.CreateSession().GetValueSync().GetSession();
+     // Each placeholder is bound to the variable of the same meaning, so the data source and
+     // the table are created under the paths their names announce.
+     const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{location}",
+ AUTH_METHOD="NONE"
+ );
+ CREATE EXTERNAL TABLE `{external_table}` (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ) WITH (
+ DATA_SOURCE="{external_source}",
+ LOCATION="{object}",
+ FORMAT="json_each_row"
+ );)",
+         "external_source"_a = externalDataSourceName,
+         "external_table"_a = externalTableName,
+         "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/",
+         "object"_a = object
+         );
+     auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+     UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+     auto db = kikimr.GetQueryClient();
+     auto executeScriptResult = db.ExecuteScript(fmt::format(R"(
+ SELECT * FROM `{external_table}`
+ )", "external_table"_a=externalTableName)).ExtractValueSync();
+     UNIT_ASSERT_VALUES_EQUAL_C(executeScriptResult.Status().GetStatus(), EStatus::SUCCESS, executeScriptResult.Status().GetIssues().ToString());
+     UNIT_ASSERT(executeScriptResult.Metadata().ExecutionId);
+
+     // Poll until a result set is available. Until then a failed fetch is only accepted
+     // as BAD_REQUEST with "Results are not ready".
+     TMaybe<TFetchScriptResultsResult> results;
+     do {
+         Sleep(TDuration::MilliSeconds(50));
+         TAsyncFetchScriptResultsResult future = db.FetchScriptResults(executeScriptResult.Metadata().ExecutionId);
+         results.ConstructInPlace(future.ExtractValueSync());
+         if (!results->IsSuccess()) {
+             UNIT_ASSERT_C(results->GetStatus() == NYdb::EStatus::BAD_REQUEST, results->GetStatus() << ": " << results->GetIssues().ToString());
+             UNIT_ASSERT_STRING_CONTAINS(results->GetIssues().ToOneLineString(), "Results are not ready");
+         }
+     } while (!results->HasResultSet());
+     TResultSetParser resultSet(results->ExtractResultSet());
+     UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
+     UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2);
+
+     UNIT_ASSERT(resultSet.TryNextRow());
+     UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1");
+     UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo");
+
+     UNIT_ASSERT(resultSet.TryNextRow());
+     UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
+     UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
+ }
+
+
}
} // namespace NKqp
diff --git a/ydb/library/yql/core/yql_data_provider.h b/ydb/library/yql/core/yql_data_provider.h
index ac606ffcb57..8919fc17f5c 100644
--- a/ydb/library/yql/core/yql_data_provider.h
+++ b/ydb/library/yql/core/yql_data_provider.h
@@ -104,6 +104,7 @@ public:
virtual IGraphTransformer& GetConfigurationTransformer() = 0;
virtual TExprNode::TPtr GetClusterInfo(const TString& cluster, TExprContext& ctx) = 0;
virtual const THashMap<TString, TString>* GetClusterTokens() = 0;
+ // Registers a cluster under `name` with provider-specific string properties.
+ // TDataProviderBase's default ignores the call; the S3 data source override reads the
+ // "location", "serviceAccountId", "serviceAccountIdSignature" and "authToken" keys.
+ virtual void AddCluster(const TString& name, const THashMap<TString, TString>& properties) = 0;
//-- discovery & rewrite
virtual IGraphTransformer& GetIODiscoveryTransformer() = 0;
diff --git a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp
index 279d11ad515..f43c44a7cc9 100644
--- a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp
@@ -88,6 +88,10 @@ TExprNode::TPtr TDataProviderBase::GetClusterInfo(const TString& cluster, TExprC
return {};
}
+// Default AddCluster: does nothing with either argument.
+void TDataProviderBase::AddCluster(const TString& name, const THashMap<TString, TString>& properties) {
+    Y_UNUSED(name);
+    Y_UNUSED(properties);
+}
+
const THashMap<TString, TString>* TDataProviderBase::GetClusterTokens() {
return nullptr;
}
diff --git a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h
index afdaa465099..e661c757915 100644
--- a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h
+++ b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h
@@ -44,6 +44,7 @@ public:
bool Initialize(TExprContext& ctx) override;
IGraphTransformer& GetConfigurationTransformer() override;
TExprNode::TPtr GetClusterInfo(const TString& cluster, TExprContext& ctx) override;
+ void AddCluster(const TString& name, const THashMap<TString, TString>& properties) override;
const THashMap<TString, TString>* GetClusterTokens() override;
IGraphTransformer& GetIODiscoveryTransformer() override;
IGraphTransformer& GetEpochsTransformer() override;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp
index 88bdb04cb9a..7d5672179b2 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp
@@ -1,8 +1,9 @@
#include "yql_s3_provider_impl.h"
#include "yql_s3_dq_integration.h"
-#include <ydb/library/yql/providers/common/config/yql_setting.h>
#include <ydb/library/yql/providers/common/config/yql_configuration_transformer.h>
+#include <ydb/library/yql/providers/common/config/yql_setting.h>
+#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
@@ -28,6 +29,12 @@ public:
, DqIntegration_(CreateS3DqIntegration(State_))
{}
+ // Stores the cluster's URL and a structured service-account token under `name`.
+ // Every property falls back to an empty string when its key is absent.
+ void AddCluster(const TString& name, const THashMap<TString, TString>& properties) override {
+     const TString serviceAccountId = properties.Value("serviceAccountId", "");
+     const TString serviceAccountIdSignature = properties.Value("serviceAccountIdSignature", "");
+     const TString authToken = properties.Value("authToken", "");
+
+     auto& clusterSettings = State_->Configuration->Clusters[name];
+     clusterSettings.Url = properties.Value("location", "");
+     State_->Configuration->Tokens[name] =
+         ComposeStructuredTokenJsonForServiceAccount(serviceAccountId, serviceAccountIdSignature, authToken);
+ }
+
TStringBuf GetName() const override {
return S3ProviderName;
}