diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-05-26 23:26:06 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-05-26 23:26:06 +0300 |
commit | 4eb34319ae07ef0fd4a7e7a9f3bd07c15dbb724d (patch) | |
tree | 6052f659b0379330eb0774c0f46836b42fdbffdf | |
parent | 025c78f0683aef81d9d392bfbf4bb5b12da684a9 (diff) | |
download | ydb-4eb34319ae07ef0fd4a7e7a9f3bd07c15dbb724d.tar.gz |
Support ExecuteQuery with federated data sources
-rw-r--r-- | ydb/core/kqp/common/kqp_yql.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_yql.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp | 90 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 2 |
9 files changed, 103 insertions, 29 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp index 7a5a8417f1..32294b86b9 100644 --- a/ydb/core/kqp/common/kqp_yql.cpp +++ b/ydb/core/kqp/common/kqp_yql.cpp @@ -15,9 +15,9 @@ static EPhysicalQueryType GetPhysicalQueryType(const TStringBuf& value) { } else if (value == "scan_query") { return EPhysicalQueryType::Scan; } else if (value == "query") { - return EPhysicalQueryType::Query; - } else if (value == "federated_query") { - return EPhysicalQueryType::FederatedQuery; + return EPhysicalQueryType::GenericQuery; + } else if (value == "script") { + return EPhysicalQueryType::GenericScript; } else { YQL_ENSURE(false, "Unknown physical query type: " << value); } @@ -31,10 +31,10 @@ static TStringBuf PhysicalQueryTypeToString(EPhysicalQueryType type) { return "data_query"; case EPhysicalQueryType::Scan: return "scan_query"; - case EPhysicalQueryType::Query: + case EPhysicalQueryType::GenericQuery: return "query"; - case EPhysicalQueryType::FederatedQuery: - return "federated_query"; + case EPhysicalQueryType::GenericScript: + return "script"; } YQL_ENSURE(false, "Unexpected physical query type: " << type); diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h index 5ba9190ec5..293f98c90d 100644 --- a/ydb/core/kqp/common/kqp_yql.h +++ b/ydb/core/kqp/common/kqp_yql.h @@ -12,8 +12,8 @@ enum class EPhysicalQueryType { Unspecified, Data, Scan, - Query, - FederatedQuery, + GenericQuery, + GenericScript, }; struct TKqpPhyQuerySettings { diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 7158e4ea90..a1fedcc1cd 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -127,11 +127,11 @@ public: break; case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: - AsyncCompileResult = KqpHost->PrepareQuery(QueryRef, prepareSettings); + AsyncCompileResult = KqpHost->PrepareGenericQuery(QueryRef, prepareSettings); break; case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: - AsyncCompileResult = KqpHost->PrepareFederatedQuery(QueryRef, prepareSettings); + AsyncCompileResult = KqpHost->PrepareGenericScript(QueryRef, prepareSettings); break; default: diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 0ca852010b..b7da42e058 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -967,14 +967,14 @@ public: }); } - IAsyncQueryResultPtr PrepareQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) override { + IAsyncQueryResultPtr PrepareGenericQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) override { return CheckedProcessQuery(*ExprCtx, [this, &query, settings] (TExprContext& ctx) mutable { return PrepareQueryInternal(query, EKikimrQueryType::Query, settings, ctx); }); } - IAsyncQueryResultPtr PrepareFederatedQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) override { + IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings) override { return CheckedProcessQuery(*ExprCtx, [this, &query, settings] (TExprContext& ctx) mutable { return PrepareQueryInternal(query, EKikimrQueryType::Script, settings, ctx); @@ -1059,7 +1059,7 @@ private: NSQLTranslation::TTranslationSettings settings{}; // TODO: remove this test crutch when dynamic bindings discovery will be implemented // YQ-1964 - if (SessionCtx->Query().Type == EKikimrQueryType::Script && GetEnv("TEST_S3_CONNECTION")) { + if ((SessionCtx->Query().Type == EKikimrQueryType::Script || SessionCtx->Query().Type == EKikimrQueryType::Query) && GetEnv("TEST_S3_CONNECTION")) { NSQLTranslation::TTableBindingSettings binding; binding.ClusterType = "s3"; binding.Settings["cluster"] = GetEnv("TEST_S3_CONNECTION"); @@ -1501,7 +1501,7 @@ private: } void Init(EKikimrQueryType queryType) { - if (queryType == EKikimrQueryType::Script) { + if (queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query) { InitS3Provider(); } diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index 5c1cf64438..a599f88178 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -62,10 +62,10 @@ public: virtual IAsyncQueryResultPtr ExplainScanQuery(const TKqpQueryRef& query, bool isSql) = 0; /* Generic queries */ - virtual IAsyncQueryResultPtr PrepareQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0; + virtual IAsyncQueryResultPtr PrepareGenericQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0; /* Federated queries */ - virtual IAsyncQueryResultPtr PrepareFederatedQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0; + virtual IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0; /* Scripting */ virtual IAsyncQueryResultPtr ValidateYqlScript(const TKqpQueryRef& script) = 0; diff --git a/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp b/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp index bb64d15129..a07f408842 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp @@ -34,11 +34,11 @@ public: break; } case EKikimrQueryType::Query: { - querySettings.Type = EPhysicalQueryType::Query; + querySettings.Type = EPhysicalQueryType::GenericQuery; break; } case EKikimrQueryType::Script: { - querySettings.Type = EPhysicalQueryType::FederatedQuery; + querySettings.Type = EPhysicalQueryType::GenericScript; break; } default: { diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index ee3487a959..9c4f31b9a1 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -46,8 +46,8 @@ NKqpProto::TKqpPhyQuery::EType GetPhyQueryType(const EPhysicalQueryType& type) { switch (type) { case EPhysicalQueryType::Data: return NKqpProto::TKqpPhyQuery::TYPE_DATA; case EPhysicalQueryType::Scan: return NKqpProto::TKqpPhyQuery::TYPE_SCAN; - case EPhysicalQueryType::Query: return NKqpProto::TKqpPhyQuery::TYPE_QUERY; - case EPhysicalQueryType::FederatedQuery: return NKqpProto::TKqpPhyQuery::TYPE_FEDERATED_QUERY; + case EPhysicalQueryType::GenericQuery: return NKqpProto::TKqpPhyQuery::TYPE_QUERY; + case EPhysicalQueryType::GenericScript: return NKqpProto::TKqpPhyQuery::TYPE_SCRIPT; case EPhysicalQueryType::Unspecified: break; diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp index b6b90a285f..6e6bd8c7ce 100644 --- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp @@ -123,7 +123,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { const TString externalDataSourceName = "/Root/external_data_source"; const TString externalTableName = "/Root/test_binding_resolve"; const TString bucket = "test_bucket1"; - const TString object = "Root/test_object"; + const TString object = "test_object"; CreateBucketWithObject(bucket, object, TEST_CONTENT); @@ -154,10 +154,12 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { auto result = session.ExecuteSchemeQuery(query).GetValueSync(); UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + const TString sql = fmt::format(R"( + SELECT * FROM `{external_table}` + )", "external_table"_a=externalTableName); + auto db = kikimr.GetQueryClient(); - auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( - SELECT * FROM `{external_table}` - )", "external_table"_a=externalTableName)).ExtractValueSync(); + auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); @@ -179,12 +181,84 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); } + Y_UNIT_TEST(ExecuteQueryWithExternalTableResolve) { + using namespace fmt::literals; + const TString externalDataSourceName = "/Root/external_data_source"; + const TString externalTableName = "/Root/test_binding_resolve"; + const TString bucket = "test_bucket_execute_query"; + const TString object = "test_object"; + + CreateBucketWithObject(bucket, object, TEST_CONTENT); + + auto kikimr = DefaultKikimrRunner(); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + + auto tc = kikimr.GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{external_table}` ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="{external_source}", + LOCATION="{object}", + FORMAT="json_each_row" + );)", + "external_source"_a = externalDataSourceName, + "external_table"_a = externalTableName, + "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/", + "object"_a = object + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + const TString sql = fmt::format(R"( + SELECT * FROM `{external_table}` + )", "external_table"_a=externalTableName); + + auto db = kikimr.GetQueryClient(); + auto executeQueryIterator = db.StreamExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + + size_t currentRow = 0; + while (true) { + auto part = executeQueryIterator.ReadNext().ExtractValueSync(); + if (!part.IsSuccess()) { + UNIT_ASSERT_C(part.EOS(), part.GetIssues().ToString()); + break; + } + + UNIT_ASSERT(part.HasResultSet()); + + auto result = part.GetResultSet(); + + TResultSetParser resultSet(result); + while (resultSet.TryNextRow()) { + if (currentRow == 0) { + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo"); + } else if (currentRow == 1) { + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world"); + } else { + UNIT_ASSERT(false); + } + ++currentRow; + } + UNIT_ASSERT(currentRow > 0); + } + } + Y_UNIT_TEST(ExecuteScriptWithDataSource) { using namespace fmt::literals; const TString externalDataSourceName = "/Root/external_data_source"; const TString bucket = "test_bucket3"; - CreateBucketWithObject(bucket, "Root/test_object", TEST_CONTENT); + CreateBucketWithObject(bucket, "test_object", TEST_CONTENT); auto kikimr = DefaultKikimrRunner(); kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); @@ -236,7 +310,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { const TString ydbTable = "/Root/ydb_table"; const TString bucket = "test_bucket4"; - CreateBucketWithObject(bucket, "Root/test_object", TEST_CONTENT); + CreateBucketWithObject(bucket, "test_object", TEST_CONTENT); auto kikimr = DefaultKikimrRunner(); kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); @@ -311,7 +385,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { const TString externalDataSourceName = "/Root/external_data_source"; const TString externalTableName = "/Root/test_binding_resolve"; const TString bucket = "test_bucket5"; - const TString object = "Root/test_object"; + const TString object = "test_object"; CreateBucketWithObject(bucket, object, TEST_CONTENT); @@ -376,7 +450,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { const TString ydbTable = "/Root/ydb_table"; const TString bucket = "test_bucket6"; - CreateBucketWithObject(bucket, "Root/test_object", TEST_CONTENT); + CreateBucketWithObject(bucket, "test_object", TEST_CONTENT); auto kikimr = DefaultKikimrRunner(); kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 3d9aa849c5..3149502895 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -370,7 +370,7 @@ message TKqpPhyQuery { TYPE_DATA = 1; TYPE_SCAN = 2; TYPE_QUERY = 3; - TYPE_FEDERATED_QUERY = 4; + TYPE_SCRIPT = 4; }; EType Type = 1; |