aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-05-19 17:30:32 +0300
committerhcpp <hcpp@ydb.tech>2023-05-19 17:30:32 +0300
commit81eb00dfba71fc3110d5af2eed7fa320ace33e5b (patch)
tree04af84ed414d9e9a8e35d268bea64946ccea62e3
parentaf03088e36285492d768810e454fe9c83f4be74f (diff)
downloadydb-81eb00dfba71fc3110d5af2eed7fa320ace33e5b.tar.gz
select from data source has been supported
New entities appeared in the ydb - external data sources. Before this review, it was impossible to read the data in the sql queries from these objects. Reading from these objects is supported in this review. An example of such a reading: ```(sql) SELECT * FROM `/local/s3_data_source`.`/` WITH ( format="json_each_row", schema( key Utf8 NOT NULL, value Utf8 NOT NULL ) ) ``` Here `/local/s3_data_source` is an external data source that describes the connection to s3 storage
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp10
-rw-r--r--ydb/core/kqp/gateway/kqp_metadata_loader.cpp19
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp1
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasource.cpp28
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h13
-rw-r--r--ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp276
-rw-r--r--ydb/library/yql/sql/settings/translation_settings.h1
-rw-r--r--ydb/library/yql/sql/v1/context.h4
8 files changed, 339 insertions, 13 deletions
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index d4dbcce9b9b..bad166e2bb5 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -659,12 +659,16 @@ public:
TFuture<TTableMetadataResult> LoadTableMetadata(const TString& cluster, const TString& table,
TLoadTableMetadataSettings settings) override {
try {
- if (!CheckCluster(cluster)) {
+ if (!settings.WithExternalDatasources_ && !CheckCluster(cluster)) {
return InvalidCluster<TTableMetadataResult>(cluster);
}
- return MetadataLoader->LoadTableMetadata(cluster, table, settings, Database, UserToken);
-
+ settings.WithExternalDatasources_ = !CheckCluster(cluster);
+ // In the case of reading from an external data source,
+ // we have a construction of the form: `/Root/external_data_source`.`/path_in_external_system` WITH (...)
+ // In this syntax, information about path_in_external_system is already known and we only need information about external_data_source.
+ // To do this, we go to the DefaultCluster and get information about external_data_source from scheme shard
+ return MetadataLoader->LoadTableMetadata(settings.WithExternalDatasources_ ? GetDefaultCluster() : cluster, settings.WithExternalDatasources_ ? cluster : table, settings, Database, UserToken);
} catch (yexception& e) {
return MakeFuture(ResultFromException<TTableMetadataResult>(e));
}
diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
index 66e26d0c0b3..eb00e3f676a 100644
--- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
+++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
@@ -43,16 +43,19 @@ std::pair<TNavigate::TEntry, TString> CreateNavigateEntry(const std::pair<TIndex
return {entry, pair.second};
}
-std::optional<std::pair<TNavigate::TEntry, TString>> CreateNavigateExternalEntry(const TString& path) {
+std::optional<std::pair<TNavigate::TEntry, TString>> CreateNavigateExternalEntry(const TString& path, bool externalDataSource) {
TNavigate::TEntry entry;
entry.Path = SplitPath(path);
entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown;
+ if (externalDataSource) {
+ entry.Kind = NSchemeCache::TSchemeCacheNavigate::EKind::KindExternalDataSource;
+ }
entry.SyncVersion = true;
return {{entry, path}};
}
-std::optional<std::pair<TNavigate::TEntry, TString>> CreateNavigateExternalEntry(const std::pair<TIndexId, TString>& pair) {
- Y_UNUSED(pair);
+std::optional<std::pair<TNavigate::TEntry, TString>> CreateNavigateExternalEntry(const std::pair<TIndexId, TString>& pair, bool externalDataSource) {
+ Y_UNUSED(pair, externalDataSource);
return {};
}
@@ -202,6 +205,7 @@ TTableMetadataResult GetExternalTableMetadataResult(const NSchemeCache::TSchemeC
);
}
+ tableMeta->ExternalSource.SourceType = NYql::ESourceType::ExternalTable;
tableMeta->ExternalSource.Type = description.GetSourceType();
tableMeta->ExternalSource.TableLocation = description.GetLocation();
tableMeta->ExternalSource.TableContent = description.GetContent();
@@ -223,10 +227,12 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc
tableMeta->Attributes = entry.Attributes;
+ tableMeta->ExternalSource.SourceType = NYql::ESourceType::ExternalDataSource;
tableMeta->ExternalSource.Type = description.GetSourceType();
tableMeta->ExternalSource.DataSourceLocation = description.GetLocation();
tableMeta->ExternalSource.DataSourceInstallation = description.GetInstallation();
tableMeta->ExternalSource.DataSourceAuth = description.GetAuth();
+ tableMeta->ExternalSource.DataSourcePath = tableName;
return result;
}
@@ -514,8 +520,10 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus;
using EKind = NSchemeCache::TSchemeCacheNavigate::EKind;
- const auto entry = CreateNavigateEntry(id, settings);
- const auto externalEntry = CreateNavigateExternalEntry(id);
+ const auto externalEntryItem = CreateNavigateExternalEntry(id, settings.WithExternalDatasources_);
+ Y_VERIFY(!settings.WithExternalDatasources_ || externalEntryItem, "External data source must be resolved using path only");
+ const auto entry = settings.WithExternalDatasources_ ? *externalEntryItem : CreateNavigateEntry(id, settings);
+ const auto externalEntry = settings.WithExternalDatasources_ ? std::optional<std::pair<TNavigate::TEntry, TString>>{} : externalEntryItem;
const ui64 expectedSchemaVersion = GetExpectedVersion(id);
LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load table metadata from cache by path, request" << GetDebugString(id));
@@ -582,6 +590,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
promise.SetValue(externalTableMetadata);
return;
}
+ settings.WithExternalDatasources_ = true;
LoadTableMetadataCache(cluster, dataSourcePath, settings, database, userToken)
.Apply([promise, externalTableMetadata](const TFuture<TTableMetadataResult>& result) mutable
{
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 5c1caadea31..b77525cba30 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -1082,6 +1082,7 @@ private:
settings.V0Behavior = NSQLTranslation::EV0Behavior::Silent;
}
+ settings.DynamicClusterProvider = SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources() ? NYql::KikimrProviderName : TString{};
settings.InferSyntaxVersion = true;
settings.V0ForceDisable = false;
settings.WarnOnV0 = false;
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
index 8fcfd98ffff..65f45687112 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
@@ -136,7 +136,10 @@ public:
auto& result = emplaceResult.first->second;
auto future = Gateway->LoadTableMetadata(clusterName, tableName,
- IKikimrGateway::TLoadTableMetadataSettings().WithTableStats(table.GetNeedsStats()).WithPrivateTables(IsInternalCall));
+ IKikimrGateway::TLoadTableMetadataSettings()
+ .WithTableStats(table.GetNeedsStats())
+ .WithPrivateTables(IsInternalCall)
+ .WithExternalDatasources(SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources()));
futures.push_back(future.Apply([result, queryType]
(const NThreading::TFuture<IKikimrGateway::TTableMetadataResult>& future) {
@@ -596,7 +599,28 @@ public:
}
auto& tableDesc = SessionCtx->Tables().GetTable(TString{source.Cluster()}, key.GetTablePath());
- if (key.GetKeyType() == TKikimrKey::Type::Table && tableDesc.Metadata->Kind == EKikimrTableKind::External) {
+ if (key.GetKeyType() == TKikimrKey::Type::Table && tableDesc.Metadata->Kind == EKikimrTableKind::External && tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource) {
+ const auto& source = ExternalSourceFactory->GetOrCreate(tableDesc.Metadata->ExternalSource.Type);
+ ctx.Step.Repeat(TExprStep::DiscoveryIO)
+ .Repeat(TExprStep::Epochs)
+ .Repeat(TExprStep::Intents)
+ .Repeat(TExprStep::LoadTablesMetadata)
+ .Repeat(TExprStep::RewriteIO);
+ auto readArgs = read->ChildrenList();
+ readArgs[1] = Build<TCoDataSource>(ctx, node->Pos())
+ .Category(ctx.NewAtom(node->Pos(), source->GetName()))
+ .FreeArgs()
+ .Add(readArgs[1]->ChildrenList()[1])
+ .Build()
+ .Done().Ptr();
+ readArgs[2] = ctx.NewCallable(node->Pos(), "MrTableConcat", { readArgs[2] });
+ auto newRead = ctx.ChangeChildren(*read, std::move(readArgs));
+ auto retChildren = node->ChildrenList();
+ retChildren[0] = newRead;
+ return ctx.ChangeChildren(*node, std::move(retChildren));
+ }
+
+ if (key.GetKeyType() == TKikimrKey::Type::Table && tableDesc.Metadata->Kind == EKikimrTableKind::External && tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalTable) {
const auto& source = ExternalSourceFactory->GetOrCreate(tableDesc.Metadata->ExternalSource.Type);
ctx.Step.Repeat(TExprStep::DiscoveryIO)
.Repeat(TExprStep::Epochs)
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index 99004025067..947d6ffc0f1 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -306,7 +306,14 @@ enum class EStoreType : ui32 {
Column = 1
};
+enum class ESourceType : ui32 {
+ Unknown = 0,
+ ExternalTable = 1,
+ ExternalDataSource = 2
+};
+
struct TExternalSource {
+ ESourceType SourceType = ESourceType::Unknown;
TString Type;
TString TableLocation;
TString TableContent;
@@ -657,8 +664,14 @@ public:
return *this;
}
+ TLoadTableMetadataSettings& WithExternalDatasources(bool enable) {
+ WithExternalDatasources_ = enable;
+ return *this;
+ }
+
bool RequestStats_ = false;
bool WithPrivateTables_ = false;
+ bool WithExternalDatasources_ = false;
};
class IKqpTableMetadataLoader : public std::enable_shared_from_this<IKqpTableMetadataLoader> {
diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
index 03d51859a0b..b6b90a285f1 100644
--- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
@@ -14,6 +14,8 @@
#include <fmt/format.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
namespace NKikimr {
namespace NKqp {
@@ -144,8 +146,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
LOCATION="{object}",
FORMAT="json_each_row"
);)",
- "external_source"_a = externalTableName,
- "external_table"_a = externalDataSourceName,
+ "external_source"_a = externalDataSourceName,
+ "external_table"_a = externalTableName,
"location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/",
"object"_a = object
);
@@ -155,7 +157,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
auto db = kikimr.GetQueryClient();
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
SELECT * FROM `{external_table}`
- )", "external_table"_a=externalDataSourceName)).ExtractValueSync();
+ )", "external_table"_a=externalTableName)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
@@ -177,7 +179,275 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
}
+ Y_UNIT_TEST(ExecuteScriptWithDataSource) {
+ using namespace fmt::literals;
+ const TString externalDataSourceName = "/Root/external_data_source";
+ const TString bucket = "test_bucket3";
+
+ CreateBucketWithObject(bucket, "Root/test_object", TEST_CONTENT);
+
+ auto kikimr = DefaultKikimrRunner();
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+ auto tc = kikimr.GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{location}",
+ AUTH_METHOD="NONE"
+ );)",
+ "external_source"_a = externalDataSourceName,
+ "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/"
+ );
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ auto db = kikimr.GetQueryClient();
+ auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
+ SELECT * FROM `{external_source}`.`/` WITH (
+ format="json_each_row",
+ schema(
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ )
+ )
+ )", "external_source"_a = externalDataSourceName)).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+ UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
+
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync();
+ UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
+
+ TResultSetParser resultSet(results.ExtractResultSet());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2);
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1");
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo");
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
+ }
+
+ Y_UNIT_TEST(ExecuteScriptWithDataSourceJoinYdb) {
+ using namespace fmt::literals;
+ const TString externalDataSourceName = "/Root/external_data_source_2";
+ const TString ydbTable = "/Root/ydb_table";
+ const TString bucket = "test_bucket4";
+
+ CreateBucketWithObject(bucket, "Root/test_object", TEST_CONTENT);
+
+ auto kikimr = DefaultKikimrRunner();
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+ auto tc = kikimr.GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ {
+ const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{location}",
+ AUTH_METHOD="NONE"
+ );
+ CREATE TABLE `{ydb_table}` (
+ key Utf8,
+ value Utf8,
+ PRIMARY KEY (key)
+ );
+ )",
+ "external_source"_a = externalDataSourceName,
+ "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/",
+ "ydb_table"_a = ydbTable
+ );
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ const TString query = fmt::format(R"(
+ REPLACE INTO `{ydb_table}` (key, value) VALUES
+ ("1", "one"),
+ ("2", "two")
+ )",
+ "ydb_table"_a = ydbTable
+ );
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ auto db = kikimr.GetQueryClient();
+ auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
+ SELECT t1.key as key, t1.value as v1, t2.value as v2 FROM `{external_source}`.`/` WITH (
+ format="json_each_row",
+ schema(
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ )
+ ) AS t1 JOIN `ydb_table` AS t2 ON t1.key = t2.key
+ )"
+ , "external_source"_a = externalDataSourceName
+ , "ydb_table"_a = ydbTable)).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+ UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
+
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync();
+ UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
+
+ TResultSetParser resultSet(results.ExtractResultSet());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2);
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1");
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo");
+ UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "one");
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
+ UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "two");
+ }
+
+ Y_UNIT_TEST(ExecuteScriptWithExternalTableResolveCheckPragma) {
+ using namespace fmt::literals;
+ const TString externalDataSourceName = "/Root/external_data_source";
+ const TString externalTableName = "/Root/test_binding_resolve";
+ const TString bucket = "test_bucket5";
+ const TString object = "Root/test_object";
+ CreateBucketWithObject(bucket, object, TEST_CONTENT);
+
+ auto kikimr = DefaultKikimrRunner();
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+
+ auto tc = kikimr.GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{location}",
+ AUTH_METHOD="NONE"
+ );
+ CREATE EXTERNAL TABLE `{external_table}` (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ) WITH (
+ DATA_SOURCE="{external_source}",
+ LOCATION="{object}",
+ FORMAT="json_each_row"
+ );)",
+ "external_source"_a = externalDataSourceName,
+ "external_table"_a = externalTableName,
+ "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/",
+ "object"_a = object
+ );
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto db = kikimr.GetQueryClient();
+ auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
+ PRAGMA s3.JsonListSizeLimit = "10";
+ PRAGMA s3.SourceCoroActor = 'true';
+ PRAGMA Kikimr.OptEnableOlapPushdown = "false";
+ SELECT * FROM `{external_table}`
+ )", "external_table"_a=externalTableName)).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+ UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
+
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync();
+ UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
+
+ TResultSetParser resultSet(results.ExtractResultSet());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2);
+
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1");
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo");
+
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
+ }
+
+ Y_UNIT_TEST(ExecuteScriptWithDataSourceJoinYdbCheckPragma) {
+ using namespace fmt::literals;
+ const TString externalDataSourceName = "/Root/external_data_source_2";
+ const TString ydbTable = "/Root/ydb_table";
+ const TString bucket = "test_bucket6";
+
+ CreateBucketWithObject(bucket, "Root/test_object", TEST_CONTENT);
+
+ auto kikimr = DefaultKikimrRunner();
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+ auto tc = kikimr.GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ {
+ const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{location}",
+ AUTH_METHOD="NONE"
+ );
+ CREATE TABLE `{ydb_table}` (
+ key Utf8,
+ value Utf8,
+ PRIMARY KEY (key)
+ );
+ )",
+ "external_source"_a = externalDataSourceName,
+ "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/",
+ "ydb_table"_a = ydbTable
+ );
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ const TString query = fmt::format(R"(
+ REPLACE INTO `{ydb_table}` (key, value) VALUES
+ ("1", "one"),
+ ("2", "two")
+ )",
+ "ydb_table"_a = ydbTable
+ );
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ auto db = kikimr.GetQueryClient();
+ auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
+ PRAGMA s3.JsonListSizeLimit = "10";
+ PRAGMA s3.SourceCoroActor = 'true';
+ PRAGMA Kikimr.OptEnableOlapPushdown = "false";
+ SELECT t1.key as key, t1.value as v1, t2.value as v2 FROM `{external_source}`.`/` WITH (
+ format="json_each_row",
+ schema(
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ )
+ ) AS t1 JOIN `ydb_table` AS t2 ON t1.key = t2.key
+ )"
+ , "external_source"_a = externalDataSourceName
+ , "ydb_table"_a = ydbTable)).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+ UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
+
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync();
+ UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
+
+ TResultSetParser resultSet(results.ExtractResultSet());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2);
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1");
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo");
+ UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "one");
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
+ UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "two");
+ }
}
} // namespace NKqp
diff --git a/ydb/library/yql/sql/settings/translation_settings.h b/ydb/library/yql/sql/settings/translation_settings.h
index f1d909de62a..f17c50ddc3c 100644
--- a/ydb/library/yql/sql/settings/translation_settings.h
+++ b/ydb/library/yql/sql/settings/translation_settings.h
@@ -90,6 +90,7 @@ namespace NSQLTranslation {
ISqlFeaturePolicy::TPtr V0WarnAsError;
ISqlFeaturePolicy::TPtr DqDefaultAuto;
bool AssumeYdbOnClusterWithSlash;
+ TString DynamicClusterProvider;
};
bool ParseTranslationSettings(const TString& query, NSQLTranslation::TTranslationSettings& settings, NYql::TIssues& issues);
diff --git a/ydb/library/yql/sql/v1/context.h b/ydb/library/yql/sql/v1/context.h
index 3ff7ccbe478..773f4763fda 100644
--- a/ydb/library/yql/sql/v1/context.h
+++ b/ydb/library/yql/sql/v1/context.h
@@ -138,6 +138,10 @@ namespace NSQLTranslationV1 {
normalizedClusterName = cluster;
return TString(NYql::KikimrProviderName);
}
+ if (Settings.DynamicClusterProvider) {
+ normalizedClusterName = cluster;
+ return Settings.DynamicClusterProvider;
+ }
return Nothing();
}