aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2024-07-08 15:04:34 +0300
committerGitHub <noreply@github.com>2024-07-08 15:04:34 +0300
commit6e9dd6ac5156f9af648e967854f73d8c631bcd43 (patch)
tree6ab5edd2e5436822093dcf68f58d3609286a0774
parentf94fdc342be22a31c40a6e49f600f87abd166f90 (diff)
downloadydb-6e9dd6ac5156f9af648e967854f73d8c631bcd43.tar.gz
Several CTAS (#6330)
-rw-r--r--ydb/core/kqp/common/compilation/events.h10
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp3
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.cpp17
-rw-r--r--ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp55
4 files changed, 79 insertions, 6 deletions
diff --git a/ydb/core/kqp/common/compilation/events.h b/ydb/core/kqp/common/compilation/events.h
index 062148adf75..7b43ccd4e31 100644
--- a/ydb/core/kqp/common/compilation/events.h
+++ b/ydb/core/kqp/common/compilation/events.h
@@ -76,7 +76,8 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
const TMaybe<TKqpQueryId>& query, bool isQueryActionPrepare, TInstant deadline,
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
- NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing())
+ NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing(),
+ bool split = false, NYql::TExprContext* splitCtx = nullptr, NYql::TExprNode::TPtr splitExpr = nullptr)
: UserToken(userToken)
, Uid(uid)
, Query(query)
@@ -90,6 +91,9 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
, TempTablesState(std::move(tempTablesState))
, IntrestedInResult(std::move(intrestedInResult))
, QueryAst(queryAst)
+ , Split(split)
+ , SplitCtx(splitCtx)
+ , SplitExpr(splitExpr)
{
}
@@ -110,6 +114,10 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
std::shared_ptr<std::atomic<bool>> IntrestedInResult;
TMaybe<TQueryAst> QueryAst;
+ bool Split = false;
+
+ NYql::TExprContext* SplitCtx = nullptr;
+ NYql::TExprNode::TPtr SplitExpr = nullptr;
};
struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 776d2c53458..ccce7d60b74 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -175,8 +175,7 @@ private:
YQL_ENSURE(PerStatementResult);
const auto prepareSettings = PrepareCompilationSettings(ctx);
-
- auto result = KqpHost->SplitQuery(QueryId.Text, prepareSettings);
+ auto result = KqpHost->SplitQuery(QueryRef, prepareSettings);
Become(&TKqpCompileActor::CompileState);
ReplySplitResult(ctx, std::move(result));
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
index ff52fe2277f..c6dc3b19d36 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
@@ -611,6 +611,7 @@ private:
<< ", queryUid: " << (request.Uid ? *request.Uid : "<empty>")
<< ", queryText: \"" << (request.Query ? EscapeC(request.Query->Text) : "<empty>") << "\""
<< ", keepInCache: " << request.KeepInCache
+ << ", split: " << request.Split
<< *request.UserRequestContext);
*Counters->CompileQueryCacheSize = QueryCache.Size();
@@ -697,7 +698,7 @@ private:
request.Deadline,
ev->Get()->Split
? ECompileActorAction::SPLIT
- : TableServiceConfig.GetEnableAstCache()
+ : (TableServiceConfig.GetEnableAstCache() && !request.QueryAst)
? ECompileActorAction::PARSE
: ECompileActorAction::COMPILE);
TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query),
@@ -760,7 +761,16 @@ private:
NWilson::TSpan compileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService");
- TKqpCompileSettings compileSettings(true, request.IsQueryActionPrepare, false, request.Deadline, TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE);
+ TKqpCompileSettings compileSettings(
+ true,
+ request.IsQueryActionPrepare,
+ false,
+ request.Deadline,
+ ev->Get()->Split
+ ? ECompileActorAction::SPLIT
+ : (TableServiceConfig.GetEnableAstCache() && !request.QueryAst)
+ ? ECompileActorAction::PARSE
+ : ECompileActorAction::COMPILE);
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, request.Query ? *request.Query : *compileResult->Query,
compileSettings, request.UserToken, dbCounters, request.GUCSettings, request.ApplicationName,
ev->Cookie, std::move(ev->Get()->IntrestedInResult),
@@ -824,6 +834,7 @@ private:
if (compileResult->NeedToSplit) {
Reply(compileRequest.Sender, compileResult, compileStats, ctx,
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
+ ProcessQueue(ctx);
return;
}
@@ -961,7 +972,6 @@ private:
compileRequest.Orbit,
compileRequest.Query.UserSid);
- compileRequest.CompileSettings.Action = ECompileActorAction::COMPILE;
compileRequest.QueryAst = std::move(queryAst);
if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
@@ -994,6 +1004,7 @@ private:
return;
}
+ compileRequest.CompileSettings.Action = ECompileActorAction::COMPILE;
CompileByAst(astStatements.front(), compileRequest, ctx);
}
diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
index 1469403863a..f9f6a626a45 100644
--- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
@@ -2566,6 +2566,61 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
}
+ Y_UNIT_TEST(SeveralCTAS) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
+ appConfig.MutableTableServiceConfig()->SetEnableAstCache(true);
+ appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
+ appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
+ appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
+ appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
+ auto setting = NKikimrKqp::TKqpSetting();
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetKqpSettings({setting})
+ .SetWithSampleTables(false)
+ .SetEnableTempTables(true);
+
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetQueryClient();
+
+ {
+ auto result = db.ExecuteQuery(R"(
+ CREATE TABLE Table1 (
+ PRIMARY KEY (Key)
+ ) AS SELECT 1u AS Key, "1" AS Value1, "1" AS Value2;
+ CREATE TABLE Table2 (
+ PRIMARY KEY (Key)
+ ) AS SELECT 2u AS Key, "2" AS Value1, "2" AS Value2;
+ CREATE TABLE Table3 (
+ PRIMARY KEY (Key)
+ ) AS SELECT * FROM Table2 UNION ALL SELECT * FROM Table1;
+ SELECT * FROM Table1 ORDER BY Key;
+ SELECT * FROM Table2 ORDER BY Key;
+ SELECT * FROM Table3 ORDER BY Key;
+ )", TTxControl::NoTx(), TExecuteQuerySettings().ClientTimeout(TDuration::MilliSeconds(500))).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 3);
+ // Results are empty. Snapshot was taken before tables were created, so we don't see changes after snapshot.
+ // This will be fixed in future, for example, by implicit commit before/after each ddl statement.
+ CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
+ CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(1)));
+ CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(2)));
+
+ result = db.ExecuteQuery(R"(
+ SELECT * FROM Table1 ORDER BY Key;
+ SELECT * FROM Table2 ORDER BY Key;
+ SELECT * FROM Table3 ORDER BY Key;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 3);
+ CompareYson(R"([[[1u];["1"];["1"]]])", FormatResultSetYson(result.GetResultSet(0)));
+ CompareYson(R"([[[2u];["2"];["2"]]])", FormatResultSetYson(result.GetResultSet(1)));
+ // Also empty now(
+ CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(2)));
+ }
+ }
+
Y_UNIT_TEST(TableSink_ReplaceFromSelectOlap) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);