diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2024-07-08 15:04:34 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-08 15:04:34 +0300 |
commit | 6e9dd6ac5156f9af648e967854f73d8c631bcd43 (patch) | |
tree | 6ab5edd2e5436822093dcf68f58d3609286a0774 | |
parent | f94fdc342be22a31c40a6e49f600f87abd166f90 (diff) | |
download | ydb-6e9dd6ac5156f9af648e967854f73d8c631bcd43.tar.gz |
Several CTAS (#6330)
-rw-r--r-- | ydb/core/kqp/common/compilation/events.h | 10 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_actor.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_service.cpp | 17 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 55 |
4 files changed, 79 insertions, 6 deletions
diff --git a/ydb/core/kqp/common/compilation/events.h b/ydb/core/kqp/common/compilation/events.h index 062148adf75..7b43ccd4e31 100644 --- a/ydb/core/kqp/common/compilation/events.h +++ b/ydb/core/kqp/common/compilation/events.h @@ -76,7 +76,8 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents:: const TMaybe<TKqpQueryId>& query, bool isQueryActionPrepare, TInstant deadline, TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName, std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext, - NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing()) + NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing(), + bool split = false, NYql::TExprContext* splitCtx = nullptr, NYql::TExprNode::TPtr splitExpr = nullptr) : UserToken(userToken) , Uid(uid) , Query(query) @@ -90,6 +91,9 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents:: , TempTablesState(std::move(tempTablesState)) , IntrestedInResult(std::move(intrestedInResult)) , QueryAst(queryAst) + , Split(split) + , SplitCtx(splitCtx) + , SplitExpr(splitExpr) { } @@ -110,6 +114,10 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents:: std::shared_ptr<std::atomic<bool>> IntrestedInResult; TMaybe<TQueryAst> QueryAst; + bool Split = false; + + NYql::TExprContext* SplitCtx = nullptr; + NYql::TExprNode::TPtr SplitExpr = nullptr; }; struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> { diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 776d2c53458..ccce7d60b74 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -175,8 +175,7 @@ private: YQL_ENSURE(PerStatementResult); const auto prepareSettings = PrepareCompilationSettings(ctx); - - auto result = KqpHost->SplitQuery(QueryId.Text, prepareSettings); + auto result = KqpHost->SplitQuery(QueryRef, prepareSettings); Become(&TKqpCompileActor::CompileState); ReplySplitResult(ctx, std::move(result)); diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index ff52fe2277f..c6dc3b19d36 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -611,6 +611,7 @@ private: << ", queryUid: " << (request.Uid ? *request.Uid : "<empty>") << ", queryText: \"" << (request.Query ? EscapeC(request.Query->Text) : "<empty>") << "\"" << ", keepInCache: " << request.KeepInCache + << ", split: " << request.Split << *request.UserRequestContext); *Counters->CompileQueryCacheSize = QueryCache.Size(); @@ -697,7 +698,7 @@ private: request.Deadline, ev->Get()->Split ? ECompileActorAction::SPLIT - : TableServiceConfig.GetEnableAstCache() + : (TableServiceConfig.GetEnableAstCache() && !request.QueryAst) ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE); TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query), @@ -760,7 +761,16 @@ private: NWilson::TSpan compileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService"); - TKqpCompileSettings compileSettings(true, request.IsQueryActionPrepare, false, request.Deadline, TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE); + TKqpCompileSettings compileSettings( + true, + request.IsQueryActionPrepare, + false, + request.Deadline, + ev->Get()->Split + ? ECompileActorAction::SPLIT + : (TableServiceConfig.GetEnableAstCache() && !request.QueryAst) + ? ECompileActorAction::PARSE + : ECompileActorAction::COMPILE); TKqpCompileRequest compileRequest(ev->Sender, request.Uid, request.Query ? *request.Query : *compileResult->Query, compileSettings, request.UserToken, dbCounters, request.GUCSettings, request.ApplicationName, ev->Cookie, std::move(ev->Get()->IntrestedInResult), @@ -824,6 +834,7 @@ private: if (compileResult->NeedToSplit) { Reply(compileRequest.Sender, compileResult, compileStats, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt)); + ProcessQueue(ctx); return; } @@ -961,7 +972,6 @@ private: compileRequest.Orbit, compileRequest.Query.UserSid); - compileRequest.CompileSettings.Action = ECompileActorAction::COMPILE; compileRequest.QueryAst = std::move(queryAst); if (!RequestsQueue.Enqueue(std::move(compileRequest))) { @@ -994,6 +1004,7 @@ private: return; } + compileRequest.CompileSettings.Action = ECompileActorAction::COMPILE; CompileByAst(astStatements.front(), compileRequest, ctx); } diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 1469403863a..f9f6a626a45 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -2566,6 +2566,61 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } } + Y_UNIT_TEST(SeveralCTAS) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + appConfig.MutableTableServiceConfig()->SetEnableAstCache(true); + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true); + appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}) + .SetWithSampleTables(false) + .SetEnableTempTables(true); + + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetQueryClient(); + + { + auto result = db.ExecuteQuery(R"( + CREATE TABLE Table1 ( + PRIMARY KEY (Key) + ) AS SELECT 1u AS Key, "1" AS Value1, "1" AS Value2; + CREATE TABLE Table2 ( + PRIMARY KEY (Key) + ) AS SELECT 2u AS Key, "2" AS Value1, "2" AS Value2; + CREATE TABLE Table3 ( + PRIMARY KEY (Key) + ) AS SELECT * FROM Table2 UNION ALL SELECT * FROM Table1; + SELECT * FROM Table1 ORDER BY Key; + SELECT * FROM Table2 ORDER BY Key; + SELECT * FROM Table3 ORDER BY Key; + )", TTxControl::NoTx(), TExecuteQuerySettings().ClientTimeout(TDuration::MilliSeconds(500))).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 3); + // Results are empty. Snapshot was taken before tables were created, so we don't see changes after snapshot. + // This will be fixed in future, for example, by implicit commit before/after each ddl statement. + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(1))); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(2))); + + result = db.ExecuteQuery(R"( + SELECT * FROM Table1 ORDER BY Key; + SELECT * FROM Table2 ORDER BY Key; + SELECT * FROM Table3 ORDER BY Key; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 3); + CompareYson(R"([[[1u];["1"];["1"]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[[2u];["2"];["2"]]])", FormatResultSetYson(result.GetResultSet(1))); + // Also empty now( + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(2))); + } + } + Y_UNIT_TEST(TableSink_ReplaceFromSelectOlap) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); |