aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-05-26 23:26:06 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-05-26 23:26:06 +0300
commit4eb34319ae07ef0fd4a7e7a9f3bd07c15dbb724d (patch)
tree6052f659b0379330eb0774c0f46836b42fdbffdf
parent025c78f0683aef81d9d392bfbf4bb5b12da684a9 (diff)
downloadydb-4eb34319ae07ef0fd4a7e7a9f3bd07c15dbb724d.tar.gz
Support ExecuteQuery with federated data sources
-rw-r--r--ydb/core/kqp/common/kqp_yql.cpp12
-rw-r--r--ydb/core/kqp/common/kqp_yql.h4
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp4
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp8
-rw-r--r--ydb/core/kqp/host/kqp_host.h4
-rw-r--r--ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp4
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp4
-rw-r--r--ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp90
-rw-r--r--ydb/core/protos/kqp_physical.proto2
9 files changed, 103 insertions, 29 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp
index 7a5a8417f1..32294b86b9 100644
--- a/ydb/core/kqp/common/kqp_yql.cpp
+++ b/ydb/core/kqp/common/kqp_yql.cpp
@@ -15,9 +15,9 @@ static EPhysicalQueryType GetPhysicalQueryType(const TStringBuf& value) {
} else if (value == "scan_query") {
return EPhysicalQueryType::Scan;
} else if (value == "query") {
- return EPhysicalQueryType::Query;
- } else if (value == "federated_query") {
- return EPhysicalQueryType::FederatedQuery;
+ return EPhysicalQueryType::GenericQuery;
+ } else if (value == "script") {
+ return EPhysicalQueryType::GenericScript;
} else {
YQL_ENSURE(false, "Unknown physical query type: " << value);
}
@@ -31,10 +31,10 @@ static TStringBuf PhysicalQueryTypeToString(EPhysicalQueryType type) {
return "data_query";
case EPhysicalQueryType::Scan:
return "scan_query";
- case EPhysicalQueryType::Query:
+ case EPhysicalQueryType::GenericQuery:
return "query";
- case EPhysicalQueryType::FederatedQuery:
- return "federated_query";
+ case EPhysicalQueryType::GenericScript:
+ return "script";
}
YQL_ENSURE(false, "Unexpected physical query type: " << type);
diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h
index 5ba9190ec5..293f98c90d 100644
--- a/ydb/core/kqp/common/kqp_yql.h
+++ b/ydb/core/kqp/common/kqp_yql.h
@@ -12,8 +12,8 @@ enum class EPhysicalQueryType {
Unspecified,
Data,
Scan,
- Query,
- FederatedQuery,
+ GenericQuery,
+ GenericScript,
};
struct TKqpPhyQuerySettings {
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 7158e4ea90..a1fedcc1cd 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -127,11 +127,11 @@ public:
break;
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
- AsyncCompileResult = KqpHost->PrepareQuery(QueryRef, prepareSettings);
+ AsyncCompileResult = KqpHost->PrepareGenericQuery(QueryRef, prepareSettings);
break;
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
- AsyncCompileResult = KqpHost->PrepareFederatedQuery(QueryRef, prepareSettings);
+ AsyncCompileResult = KqpHost->PrepareGenericScript(QueryRef, prepareSettings);
break;
default:
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 0ca852010b..b7da42e058 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -967,14 +967,14 @@ public:
});
}
- IAsyncQueryResultPtr PrepareQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) override {
+ IAsyncQueryResultPtr PrepareGenericQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) override {
return CheckedProcessQuery(*ExprCtx,
[this, &query, settings] (TExprContext& ctx) mutable {
return PrepareQueryInternal(query, EKikimrQueryType::Query, settings, ctx);
});
}
- IAsyncQueryResultPtr PrepareFederatedQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) override {
+ IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings) override {
return CheckedProcessQuery(*ExprCtx,
[this, &query, settings] (TExprContext& ctx) mutable {
return PrepareQueryInternal(query, EKikimrQueryType::Script, settings, ctx);
@@ -1059,7 +1059,7 @@ private:
NSQLTranslation::TTranslationSettings settings{};
// TODO: remove this test crutch when dynamic bindings discovery will be implemented // YQ-1964
- if (SessionCtx->Query().Type == EKikimrQueryType::Script && GetEnv("TEST_S3_CONNECTION")) {
+ if ((SessionCtx->Query().Type == EKikimrQueryType::Script || SessionCtx->Query().Type == EKikimrQueryType::Query) && GetEnv("TEST_S3_CONNECTION")) {
NSQLTranslation::TTableBindingSettings binding;
binding.ClusterType = "s3";
binding.Settings["cluster"] = GetEnv("TEST_S3_CONNECTION");
@@ -1501,7 +1501,7 @@ private:
}
void Init(EKikimrQueryType queryType) {
- if (queryType == EKikimrQueryType::Script) {
+ if (queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query) {
InitS3Provider();
}
diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h
index 5c1cf64438..a599f88178 100644
--- a/ydb/core/kqp/host/kqp_host.h
+++ b/ydb/core/kqp/host/kqp_host.h
@@ -62,10 +62,10 @@ public:
virtual IAsyncQueryResultPtr ExplainScanQuery(const TKqpQueryRef& query, bool isSql) = 0;
/* Generic queries */
- virtual IAsyncQueryResultPtr PrepareQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0;
+ virtual IAsyncQueryResultPtr PrepareGenericQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0;
/* Federated queries */
- virtual IAsyncQueryResultPtr PrepareFederatedQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0;
+ virtual IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0;
/* Scripting */
virtual IAsyncQueryResultPtr ValidateYqlScript(const TKqpQueryRef& script) = 0;
diff --git a/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp b/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp
index bb64d15129..a07f408842 100644
--- a/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp
@@ -34,11 +34,11 @@ public:
break;
}
case EKikimrQueryType::Query: {
- querySettings.Type = EPhysicalQueryType::Query;
+ querySettings.Type = EPhysicalQueryType::GenericQuery;
break;
}
case EKikimrQueryType::Script: {
- querySettings.Type = EPhysicalQueryType::FederatedQuery;
+ querySettings.Type = EPhysicalQueryType::GenericScript;
break;
}
default: {
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index ee3487a959..9c4f31b9a1 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -46,8 +46,8 @@ NKqpProto::TKqpPhyQuery::EType GetPhyQueryType(const EPhysicalQueryType& type) {
switch (type) {
case EPhysicalQueryType::Data: return NKqpProto::TKqpPhyQuery::TYPE_DATA;
case EPhysicalQueryType::Scan: return NKqpProto::TKqpPhyQuery::TYPE_SCAN;
- case EPhysicalQueryType::Query: return NKqpProto::TKqpPhyQuery::TYPE_QUERY;
- case EPhysicalQueryType::FederatedQuery: return NKqpProto::TKqpPhyQuery::TYPE_FEDERATED_QUERY;
+ case EPhysicalQueryType::GenericQuery: return NKqpProto::TKqpPhyQuery::TYPE_QUERY;
+ case EPhysicalQueryType::GenericScript: return NKqpProto::TKqpPhyQuery::TYPE_SCRIPT;
case EPhysicalQueryType::Unspecified:
break;
diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
index b6b90a285f..6e6bd8c7ce 100644
--- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
@@ -123,7 +123,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
const TString externalDataSourceName = "/Root/external_data_source";
const TString externalTableName = "/Root/test_binding_resolve";
const TString bucket = "test_bucket1";
- const TString object = "Root/test_object";
+ const TString object = "test_object";
CreateBucketWithObject(bucket, object, TEST_CONTENT);
@@ -154,10 +154,12 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ const TString sql = fmt::format(R"(
+ SELECT * FROM `{external_table}`
+ )", "external_table"_a=externalTableName);
+
auto db = kikimr.GetQueryClient();
- auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
- SELECT * FROM `{external_table}`
- )", "external_table"_a=externalTableName)).ExtractValueSync();
+ auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
@@ -179,12 +181,84 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
}
+ Y_UNIT_TEST(ExecuteQueryWithExternalTableResolve) {
+ using namespace fmt::literals;
+ const TString externalDataSourceName = "/Root/external_data_source";
+ const TString externalTableName = "/Root/test_binding_resolve";
+ const TString bucket = "test_bucket_execute_query";
+ const TString object = "test_object";
+
+ CreateBucketWithObject(bucket, object, TEST_CONTENT);
+
+ auto kikimr = DefaultKikimrRunner();
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+
+ auto tc = kikimr.GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{location}",
+ AUTH_METHOD="NONE"
+ );
+ CREATE EXTERNAL TABLE `{external_table}` (
+ key Utf8 NOT NULL,
+ value Utf8 NOT NULL
+ ) WITH (
+ DATA_SOURCE="{external_source}",
+ LOCATION="{object}",
+ FORMAT="json_each_row"
+ );)",
+ "external_source"_a = externalDataSourceName,
+ "external_table"_a = externalTableName,
+ "location"_a = GetEnv("S3_ENDPOINT") + "/" + bucket + "/",
+ "object"_a = object
+ );
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ const TString sql = fmt::format(R"(
+ SELECT * FROM `{external_table}`
+ )", "external_table"_a=externalTableName);
+
+ auto db = kikimr.GetQueryClient();
+ auto executeQueryIterator = db.StreamExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+
+ size_t currentRow = 0;
+ while (true) {
+ auto part = executeQueryIterator.ReadNext().ExtractValueSync();
+ if (!part.IsSuccess()) {
+ UNIT_ASSERT_C(part.EOS(), part.GetIssues().ToString());
+ break;
+ }
+
+ UNIT_ASSERT(part.HasResultSet());
+
+ auto result = part.GetResultSet();
+
+ TResultSetParser resultSet(result);
+ while (resultSet.TryNextRow()) {
+ if (currentRow == 0) {
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "1");
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "trololo");
+ } else if (currentRow == 1) {
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetUtf8(), "2");
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetUtf8(), "hello world");
+ } else {
+ UNIT_ASSERT(false);
+ }
+ ++currentRow;
+ }
+ UNIT_ASSERT(currentRow > 0);
+ }
+ }
+
Y_UNIT_TEST(ExecuteScriptWithDataSource) {
using namespace fmt::literals;
const TString externalDataSourceName = "/Root/external_data_source";
const TString bucket = "test_bucket3";
- CreateBucketWithObject(bucket, "Root/test_object", TEST_CONTENT);
+ CreateBucketWithObject(bucket, "test_object", TEST_CONTENT);
auto kikimr = DefaultKikimrRunner();
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
@@ -236,7 +310,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
const TString ydbTable = "/Root/ydb_table";
const TString bucket = "test_bucket4";
- CreateBucketWithObject(bucket, "Root/test_object", TEST_CONTENT);
+ CreateBucketWithObject(bucket, "test_object", TEST_CONTENT);
auto kikimr = DefaultKikimrRunner();
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
@@ -311,7 +385,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
const TString externalDataSourceName = "/Root/external_data_source";
const TString externalTableName = "/Root/test_binding_resolve";
const TString bucket = "test_bucket5";
- const TString object = "Root/test_object";
+ const TString object = "test_object";
CreateBucketWithObject(bucket, object, TEST_CONTENT);
@@ -376,7 +450,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
const TString ydbTable = "/Root/ydb_table";
const TString bucket = "test_bucket6";
- CreateBucketWithObject(bucket, "Root/test_object", TEST_CONTENT);
+ CreateBucketWithObject(bucket, "test_object", TEST_CONTENT);
auto kikimr = DefaultKikimrRunner();
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 3d9aa849c5..3149502895 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -370,7 +370,7 @@ message TKqpPhyQuery {
TYPE_DATA = 1;
TYPE_SCAN = 2;
TYPE_QUERY = 3;
- TYPE_FEDERATED_QUERY = 4;
+ TYPE_SCRIPT = 4;
};
EType Type = 1;