diff options
author | hcpp <hcpp@ydb.tech> | 2023-05-19 17:30:32 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-05-19 17:30:32 +0300 |
commit | 81eb00dfba71fc3110d5af2eed7fa320ace33e5b (patch) | |
tree | 04af84ed414d9e9a8e35d268bea64946ccea62e3 | |
parent | af03088e36285492d768810e454fe9c83f4be74f (diff) | |
download | ydb-81eb00dfba71fc3110d5af2eed7fa320ace33e5b.tar.gz |
select from data source has been supported
New entities appeared in the ydb - external data sources. Before this review, it was impossible to read the data in the sql queries from these objects. Reading from these objects is supported in this review. An example of such a reading:
```(sql)
SELECT * FROM `/local/s3_data_source`.`/` WITH (
format="json_each_row",
schema(
key Utf8 NOT NULL,
value Utf8 NOT NULL
)
)
```
Here `/local/s3_data_source` is an external data source that describes the connection to s3 storage
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_metadata_loader.cpp | 19 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_datasource.cpp | 28 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway.h | 13 | ||||
-rw-r--r-- | ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp | 276 | ||||
-rw-r--r-- | ydb/library/yql/sql/settings/translation_settings.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/context.h | 4 |
8 files changed, 339 insertions, 13 deletions
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index d4dbcce9b9b..bad166e2bb5 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -659,12 +659,16 @@ public: TFuture<TTableMetadataResult> LoadTableMetadata(const TString& cluster, const TString& table, TLoadTableMetadataSettings settings) override { try { - if (!CheckCluster(cluster)) { + if (!settings.WithExternalDatasources_ && !CheckCluster(cluster)) { return InvalidCluster<TTableMetadataResult>(cluster); } - return MetadataLoader->LoadTableMetadata(cluster, table, settings, Database, UserToken); - + settings.WithExternalDatasources_ = !CheckCluster(cluster); + // In the case of reading from an external data source, + // we have a construction of the form: `/Root/external_data_source`.`/path_in_external_system` WITH (...) + // In this syntax, information about path_in_external_system is already known and we only need information about external_data_source. + // To do this, we go to the DefaultCluster and get information about external_data_source from scheme shard + return MetadataLoader->LoadTableMetadata(settings.WithExternalDatasources_ ? GetDefaultCluster() : cluster, settings.WithExternalDatasources_ ? cluster : table, settings, Database, UserToken); } catch (yexception& e) { return MakeFuture(ResultFromException<TTableMetadataResult>(e)); } diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index 66e26d0c0b3..eb00e3f676a 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -43,16 +43,19 @@ std::pair<TNavigate::TEntry, TString> CreateNavigateEntry(const std::pair<TIndex return {entry, pair.second}; } -std::optional<std::pair<TNavigate::TEntry, TString>> CreateNavigateExternalEntry(const TString& path) { +std::optional<std::pair<TNavigate::TEntry, TString>> CreateNavigateExternalEntry(const TString& path, bool externalDataSource) { TNavigate::TEntry entry; entry.Path = SplitPath(path); entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown; + if (externalDataSource) { + entry.Kind = NSchemeCache::TSchemeCacheNavigate::EKind::KindExternalDataSource; + } entry.SyncVersion = true; return {{entry, path}}; } -std::optional<std::pair<TNavigate::TEntry, TString>> CreateNavigateExternalEntry(const std::pair<TIndexId, TString>& pair) { - Y_UNUSED(pair); +std::optional<std::pair<TNavigate::TEntry, TString>> CreateNavigateExternalEntry(const std::pair<TIndexId, TString>& pair, bool externalDataSource) { + Y_UNUSED(pair, externalDataSource); return {}; } @@ -202,6 +205,7 @@ TTableMetadataResult GetExternalTableMetadataResult(const NSchemeCache::TSchemeC ); } + tableMeta->ExternalSource.SourceType = NYql::ESourceType::ExternalTable; tableMeta->ExternalSource.Type = description.GetSourceType(); tableMeta->ExternalSource.TableLocation = description.GetLocation(); tableMeta->ExternalSource.TableContent = description.GetContent(); @@ -223,10 +227,12 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc tableMeta->Attributes = entry.Attributes; + tableMeta->ExternalSource.SourceType = NYql::ESourceType::ExternalDataSource; tableMeta->ExternalSource.Type = description.GetSourceType(); tableMeta->ExternalSource.DataSourceLocation = description.GetLocation(); tableMeta->ExternalSource.DataSourceInstallation = description.GetInstallation(); tableMeta->ExternalSource.DataSourceAuth = description.GetAuth(); + tableMeta->ExternalSource.DataSourcePath = tableName; return result; } @@ -514,8 +520,10 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta using EStatus = NSchemeCache::TSchemeCacheNavigate::EStatus; using EKind = NSchemeCache::TSchemeCacheNavigate::EKind; - const auto entry = CreateNavigateEntry(id, settings); - const auto externalEntry = CreateNavigateExternalEntry(id); + const auto externalEntryItem = CreateNavigateExternalEntry(id, settings.WithExternalDatasources_); + Y_VERIFY(!settings.WithExternalDatasources_ || externalEntryItem, "External data source must be resolved using path only"); + const auto entry = settings.WithExternalDatasources_ ? *externalEntryItem : CreateNavigateEntry(id, settings); + const auto externalEntry = settings.WithExternalDatasources_ ? std::optional<std::pair<TNavigate::TEntry, TString>>{} : externalEntryItem; const ui64 expectedSchemaVersion = GetExpectedVersion(id); LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load table metadata from cache by path, request" << GetDebugString(id)); @@ -582,6 +590,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta promise.SetValue(externalTableMetadata); return; } + settings.WithExternalDatasources_ = true; LoadTableMetadataCache(cluster, dataSourcePath, settings, database, userToken) .Apply([promise, externalTableMetadata](const TFuture<TTableMetadataResult>& result) mutable { diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 5c1caadea31..b77525cba30 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1082,6 +1082,7 @@ private: settings.V0Behavior = NSQLTranslation::EV0Behavior::Silent; } + settings.DynamicClusterProvider = SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources() ? NYql::KikimrProviderName : TString{}; settings.InferSyntaxVersion = true; settings.V0ForceDisable = false; settings.WarnOnV0 = false; diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp index 8fcfd98ffff..65f45687112 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp @@ -136,7 +136,10 @@ public: auto& result = emplaceResult.first->second; auto future = Gateway->LoadTableMetadata(clusterName, tableName, - IKikimrGateway::TLoadTableMetadataSettings().WithTableStats(table.GetNeedsStats()).WithPrivateTables(IsInternalCall)); + IKikimrGateway::TLoadTableMetadataSettings() + .WithTableStats(table.GetNeedsStats()) + .WithPrivateTables(IsInternalCall) + .WithExternalDatasources(SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources())); futures.push_back(future.Apply([result, queryType] (const NThreading::TFuture<IKikimrGateway::TTableMetadataResult>& future) { @@ -596,7 +599,28 @@ public: } auto& tableDesc = SessionCtx->Tables().GetTable(TString{source.Cluster()}, key.GetTablePath()); - if (key.GetKeyType() == TKikimrKey::Type::Table && tableDesc.Metadata->Kind == EKikimrTableKind::External) { + if (key.GetKeyType() == TKikimrKey::Type::Table && tableDesc.Metadata->Kind == EKikimrTableKind::External && tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalDataSource) { + const auto& source = ExternalSourceFactory->GetOrCreate(tableDesc.Metadata->ExternalSource.Type); + ctx.Step.Repeat(TExprStep::DiscoveryIO) + .Repeat(TExprStep::Epochs) + .Repeat(TExprStep::Intents) + .Repeat(TExprStep::LoadTablesMetadata) + .Repeat(TExprStep::RewriteIO); + auto readArgs = read->ChildrenList(); + readArgs[1] = Build<TCoDataSource>(ctx, node->Pos()) + .Category(ctx.NewAtom(node->Pos(), source->GetName())) + .FreeArgs() + .Add(readArgs[1]->ChildrenList()[1]) + .Build() + .Done().Ptr(); + readArgs[2] = ctx.NewCallable(node->Pos(), "MrTableConcat", { readArgs[2] }); + auto newRead = ctx.ChangeChildren(*read, std::move(readArgs)); + auto retChildren = node->ChildrenList(); + retChildren[0] = newRead; + return ctx.ChangeChildren(*node, std::move(retChildren)); + } + + if (key.GetKeyType() == TKikimrKey::Type::Table && tableDesc.Metadata->Kind == EKikimrTableKind::External && tableDesc.Metadata->ExternalSource.SourceType == ESourceType::ExternalTable) { const auto& source = ExternalSourceFactory->GetOrCreate(tableDesc.Metadata->ExternalSource.Type); ctx.Step.Repeat(TExprStep::DiscoveryIO) .Repeat(TExprStep::Epochs) diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 99004025067..947d6ffc0f1 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -306,7 +306,14 @@ enum class EStoreType : ui32 { Column = 1 }; +enum class ESourceType : ui32 { + Unknown = 0, + ExternalTable = 1, + ExternalDataSource = 2 +}; + struct TExternalSource { + ESourceType SourceType = ESourceType::Unknown; TString Type; TString TableLocation; TString TableContent; @@ -657,8 +664,14 @@ public: return *this; } + TLoadTableMetadataSettings& WithExternalDatasources(bool enable) { + WithExternalDatasources_ = enable; + return *this; + } + bool RequestStats_ = false; bool WithPrivateTables_ = false; + bool WithExternalDatasources_ = false; }; class IKqpTableMetadataLoader : public std::enable_shared_from_this<IKqpTableMetadataLoader> { diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp index 03d51859a0b..b6b90a285f1 100644 --- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp @@ -14,6 +14,8 @@ #include <fmt/format.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + namespace NKikimr { namespace NKqp { @@ -144,8 +146,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { LOCATION="{object}", FORMAT="json_each_row" );)", - "external_source"_a = externalTableName, - "external_table"_a = externalDataSourceName, + "external_source"_a = externalDataSourceName, + "external_table"_a = externalTableName, "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/", "object"_a = object ); @@ -155,7 +157,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { auto db = kikimr.GetQueryClient(); auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( SELECT * FROM `{external_table}` - )", "external_table"_a=externalDataSourceName)).ExtractValueSync(); + )", "external_table"_a=externalTableName)).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); @@ -177,7 +179,275 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); } + Y_UNIT_TEST(ExecuteScriptWithDataSource) { + using namespace fmt::literals; + const TString externalDataSourceName = "/Root/external_data_source"; + const TString bucket = "test_bucket3"; + + CreateBucketWithObject(bucket, "Root/test_object", TEST_CONTENT); + + auto kikimr = DefaultKikimrRunner(); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto tc = kikimr.GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + );)", + "external_source"_a = externalDataSourceName, + "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/" + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + auto db = kikimr.GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( + SELECT * FROM `{external_source}`.`/` WITH ( + format="json_each_row", + schema( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) + ) + )", "external_source"_a = externalDataSourceName)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + } + + Y_UNIT_TEST(ExecuteScriptWithDataSourceJoinYdb) { + using namespace fmt::literals; + const TString externalDataSourceName = "/Root/external_data_source_2"; + const TString ydbTable = "/Root/ydb_table"; + const TString bucket = "test_bucket4"; + + CreateBucketWithObject(bucket, "Root/test_object", TEST_CONTENT); + + auto kikimr = DefaultKikimrRunner(); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto tc = kikimr.GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + { + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE TABLE `{ydb_table}` ( + key Utf8, + value Utf8, + PRIMARY KEY (key) + ); + )", + "external_source"_a = externalDataSourceName, + "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/", + "ydb_table"_a = ydbTable + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + const TString query = fmt::format(R"( + REPLACE INTO `{ydb_table}` (key, value) VALUES + ("1", "one"), + ("2", "two") + )", + "ydb_table"_a = ydbTable + ); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + auto db = kikimr.GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( + SELECT t1.key as key, t1.value as v1, t2.value as v2 FROM `{external_source}`.`/` WITH ( + format="json_each_row", + schema( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) + ) AS t1 JOIN `ydb_table` AS t2 ON t1.key = t2.key + )" + , "external_source"_a = externalDataSourceName + , "ydb_table"_a = ydbTable)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "one"); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "two"); + } + + Y_UNIT_TEST(ExecuteScriptWithExternalTableResolveCheckPragma) { + using namespace fmt::literals; + const TString externalDataSourceName = "/Root/external_data_source"; + const TString externalTableName = "/Root/test_binding_resolve"; + const TString bucket = "test_bucket5"; + const TString object = "Root/test_object"; + CreateBucketWithObject(bucket, object, TEST_CONTENT); + + auto kikimr = DefaultKikimrRunner(); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + + auto tc = kikimr.GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{external_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{external_source}", + LOCATION="{object}", + FORMAT="json_each_row" + );)", + "external_source"_a = externalDataSourceName, + "external_table"_a = externalTableName, + "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/", + "object"_a = object + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto db = kikimr.GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( + PRAGMA s3.JsonListSizeLimit = "10"; + PRAGMA s3.SourceCoroActor = 'true'; + PRAGMA Kikimr.OptEnableOlapPushdown = "false"; + SELECT * FROM `{external_table}` + )", "external_table"_a=externalTableName)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + } + + Y_UNIT_TEST(ExecuteScriptWithDataSourceJoinYdbCheckPragma) { + using namespace fmt::literals; + const TString externalDataSourceName = "/Root/external_data_source_2"; + const TString ydbTable = "/Root/ydb_table"; + const TString bucket = "test_bucket6"; + + CreateBucketWithObject(bucket, "Root/test_object", TEST_CONTENT); + + auto kikimr = DefaultKikimrRunner(); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto tc = kikimr.GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + { + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE TABLE `{ydb_table}` ( + key Utf8, + value Utf8, + PRIMARY KEY (key) + ); + )", + "external_source"_a = externalDataSourceName, + "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/", + "ydb_table"_a = ydbTable + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + const TString query = fmt::format(R"( + REPLACE INTO `{ydb_table}` (key, value) VALUES + ("1", "one"), + ("2", "two") + )", + "ydb_table"_a = ydbTable + ); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + auto db = kikimr.GetQueryClient(); + auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( + PRAGMA s3.JsonListSizeLimit = "10"; + PRAGMA s3.SourceCoroActor = 'true'; + PRAGMA Kikimr.OptEnableOlapPushdown = "false"; + SELECT t1.key as key, t1.value as v1, t2.value as v2 FROM `{external_source}`.`/` WITH ( + format="json_each_row", + schema( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) + ) AS t1 JOIN `ydb_table` AS t2 ON t1.key = t2.key + )" + , "external_source"_a = externalDataSourceName + , "ydb_table"_a = ydbTable)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "one"); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + UNIT_ASSERT_VALUES_EQUAL(*resultSet.ColumnParser(2).GetOptionalUtf8(), "two"); + } } } // namespace NKqp diff --git a/ydb/library/yql/sql/settings/translation_settings.h b/ydb/library/yql/sql/settings/translation_settings.h index f1d909de62a..f17c50ddc3c 100644 --- a/ydb/library/yql/sql/settings/translation_settings.h +++ b/ydb/library/yql/sql/settings/translation_settings.h @@ -90,6 +90,7 @@ namespace NSQLTranslation { ISqlFeaturePolicy::TPtr V0WarnAsError; ISqlFeaturePolicy::TPtr DqDefaultAuto; bool AssumeYdbOnClusterWithSlash; + TString DynamicClusterProvider; }; bool ParseTranslationSettings(const TString& query, NSQLTranslation::TTranslationSettings& settings, NYql::TIssues& issues); diff --git a/ydb/library/yql/sql/v1/context.h b/ydb/library/yql/sql/v1/context.h index 3ff7ccbe478..773f4763fda 100644 --- a/ydb/library/yql/sql/v1/context.h +++ b/ydb/library/yql/sql/v1/context.h @@ -138,6 +138,10 @@ namespace NSQLTranslationV1 { normalizedClusterName = cluster; return TString(NYql::KikimrProviderName); } + if (Settings.DynamicClusterProvider) { + normalizedClusterName = cluster; + return Settings.DynamicClusterProvider; + } return Nothing(); } |