diff options
author | vpolka <vpolka@yandex-team.com> | 2023-11-16 14:48:56 +0300 |
---|---|---|
committer | vpolka <vpolka@yandex-team.com> | 2023-11-16 16:16:34 +0300 |
commit | 3b6c6c4a799c566414ad46638676ac561f09521f (patch) | |
tree | 83f8ec0e70452fb0b066f41da7e571f71cd84baa | |
parent | f1d6afc2ce52684185b06a1d1ea85be28690f10c (diff) | |
download | ydb-3b6c6c4a799c566414ad46638676ac561f09521f.tar.gz |
KIKIMR-18857: find query in cache by ast and use pg consts like params
31 files changed, 835 insertions, 184 deletions
diff --git a/ydb/core/kqp/common/compilation/events.h b/ydb/core/kqp/common/compilation/events.h index 04c1f684b4..7c01dfa9ff 100644 --- a/ydb/core/kqp/common/compilation/events.h +++ b/ydb/core/kqp/common/compilation/events.h @@ -6,6 +6,7 @@ #include <ydb/core/kqp/common/simple/temp_tables.h> #include <ydb/core/kqp/common/simple/kqp_event_ids.h> #include <ydb/core/kqp/common/simple/query_id.h> +#include <ydb/core/kqp/common/simple/query_ast.h> #include <ydb/core/kqp/common/kqp_user_request_context.h> #include <ydb/core/kqp/counters/kqp_counters.h> @@ -97,6 +98,15 @@ struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::Ev NLWTrace::TOrbit Orbit; }; +struct TEvParseResponse: public TEventLocal<TEvParseResponse, TKqpEvents::EvParseResponse> { + TEvParseResponse(const TKqpQueryId& query, TMaybe<TQueryAst> astResult) + : AstResult(std::move(astResult)) + , Query(query) {} + + TMaybe<TQueryAst> AstResult; + TKqpQueryId Query; +}; + struct TEvCompileInvalidateRequest: public TEventLocal<TEvCompileInvalidateRequest, TKqpEvents::EvCompileInvalidateRequest> { TEvCompileInvalidateRequest(const TString& uid, TKqpDbCountersPtr dbCounters) diff --git a/ydb/core/kqp/common/compilation/result.h b/ydb/core/kqp/common/compilation/result.h index b9178a01bc..d66543ee94 100644 --- a/ydb/core/kqp/common/compilation/result.h +++ b/ydb/core/kqp/common/compilation/result.h @@ -3,39 +3,30 @@ #include <ydb/core/kqp/common/simple/query_id.h> #include <ydb/core/kqp/common/simple/helpers.h> #include <ydb/library/yql/public/issue/yql_issue.h> +#include <ydb/library/yql/ast/yql_ast.h> namespace NKikimr::NKqp { class TPreparedQueryHolder; +struct TQueryAst; struct TKqpCompileResult { using TConstPtr = std::shared_ptr<const TKqpCompileResult>; - TKqpCompileResult(const TString& uid, TKqpQueryId&& query, const Ydb::StatusIds::StatusCode& status, - const NYql::TIssues& issues, ETableReadType maxReadType) - : Status(status) - , Issues(issues) - , Query(std::move(query)) - , Uid(uid) - , MaxReadType(maxReadType) {} - TKqpCompileResult(const TString& uid, const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues, - ETableReadType maxReadType) + ETableReadType maxReadType, TMaybe<TKqpQueryId> query = {}, std::shared_ptr<NYql::TAstParseResult> ast = {}) : Status(status) , Issues(issues) + , Query(std::move(query)) , Uid(uid) - , MaxReadType(maxReadType) {} - - static std::shared_ptr<TKqpCompileResult> Make(const TString& uid, TKqpQueryId&& query, - const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues, ETableReadType maxReadType) - { - return std::make_shared<TKqpCompileResult>(uid, std::move(query), status, issues, maxReadType); - } + , MaxReadType(maxReadType) + , Ast(std::move(ast)) {} static std::shared_ptr<TKqpCompileResult> Make(const TString& uid, const Ydb::StatusIds::StatusCode& status, - const NYql::TIssues& issues, ETableReadType maxReadType) + const NYql::TIssues& issues, ETableReadType maxReadType, TMaybe<TKqpQueryId> query = {}, + std::shared_ptr<NYql::TAstParseResult> ast = {}) { - return std::make_shared<TKqpCompileResult>(uid, status, issues, maxReadType); + return std::make_shared<TKqpCompileResult>(uid, status, issues, maxReadType, std::move(query), std::move(ast)); } Ydb::StatusIds::StatusCode Status; @@ -46,6 +37,7 @@ struct TKqpCompileResult { ETableReadType MaxReadType; bool AllowCache = true; + std::shared_ptr<NYql::TAstParseResult> Ast; std::shared_ptr<const TPreparedQueryHolder> PreparedQuery; }; diff --git a/ydb/core/kqp/common/events/events.h b/ydb/core/kqp/common/events/events.h index 26cf8f50b3..6f08219141 100644 --- a/ydb/core/kqp/common/events/events.h +++ b/ydb/core/kqp/common/events/events.h @@ -40,6 +40,7 @@ struct TEvKqp { using TEvCompileRequest = NPrivateEvents::TEvCompileRequest; using TEvRecompileRequest = NPrivateEvents::TEvRecompileRequest; using TEvCompileResponse = NPrivateEvents::TEvCompileResponse; + using TEvParseResponse = NPrivateEvents::TEvParseResponse; using TEvCompileInvalidateRequest = NPrivateEvents::TEvCompileInvalidateRequest; using TEvInitiateSessionShutdown = NKikimr::NKqp::NPrivateEvents::TEvInitiateSessionShutdown; diff --git a/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt index 97f8b468b2..44da5ffcff 100644 --- a/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt @@ -14,6 +14,7 @@ target_link_libraries(kqp-common-simple PUBLIC contrib-libs-protobuf ydb-core-base ydb-core-protos + library-yql-ast yql-dq-actors api-protos ) diff --git a/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt index 04281e2995..3330bd638f 100644 --- a/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt @@ -15,6 +15,7 @@ target_link_libraries(kqp-common-simple PUBLIC contrib-libs-protobuf ydb-core-base ydb-core-protos + library-yql-ast yql-dq-actors api-protos ) diff --git a/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt index 04281e2995..3330bd638f 100644 --- a/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt @@ -15,6 +15,7 @@ target_link_libraries(kqp-common-simple PUBLIC contrib-libs-protobuf ydb-core-base ydb-core-protos + library-yql-ast yql-dq-actors api-protos ) diff --git a/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt index 97f8b468b2..44da5ffcff 100644 --- a/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt @@ -14,6 +14,7 @@ target_link_libraries(kqp-common-simple PUBLIC contrib-libs-protobuf ydb-core-base ydb-core-protos + library-yql-ast yql-dq-actors api-protos ) diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index 2bbbfe6ee1..ef4e11d40b 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -38,6 +38,8 @@ struct TKqpEvents { EvCancelScriptExecutionResponse, EvCancelQueryRequest, EvCancelQueryResponse, + EvParseRequest, + EvParseResponse, }; static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution); diff --git a/ydb/core/kqp/common/simple/query_ast.h b/ydb/core/kqp/common/simple/query_ast.h new file mode 100644 index 0000000000..59519750f9 --- /dev/null +++ b/ydb/core/kqp/common/simple/query_ast.h @@ -0,0 +1,20 @@ +#pragma once + +#include <ydb/library/yql/ast/yql_ast.h> + +#include <util/generic/maybe.h> + +namespace NKikimr::NKqp { + +struct TQueryAst { + TQueryAst(std::shared_ptr<NYql::TAstParseResult> ast, const TMaybe<ui16>& sqlVersion, const TMaybe<bool>& deprecatedSQL) + : Ast(std::move(ast)) + , SqlVersion(sqlVersion) + , DeprecatedSQL(deprecatedSQL) {} + + std::shared_ptr<NYql::TAstParseResult> Ast; + TMaybe<ui16> SqlVersion; + TMaybe<bool> DeprecatedSQL; +}; + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/simple/query_ref.h b/ydb/core/kqp/common/simple/query_ref.h index 4493e212cc..dc1838222e 100644 --- a/ydb/core/kqp/common/simple/query_ref.h +++ b/ydb/core/kqp/common/simple/query_ref.h @@ -1,22 +1,20 @@ #pragma once - - -#include <util/generic/string.h> - -#include <map> -#include <memory> +#include "query_ast.h" +#include "settings.h" namespace NKikimr::NKqp { struct TKqpQueryRef { - TKqpQueryRef(const TString& text, std::shared_ptr<std::map<TString, Ydb::Type>> parameterTypes = {}) + TKqpQueryRef(const TString& text, std::shared_ptr<std::map<TString, Ydb::Type>> parameterTypes = {}, const TMaybe<TQueryAst>& astResult = {}) : Text(text) , ParameterTypes(parameterTypes) + , AstResult(astResult) {} // Text is owned by TKqpQueryId const TString& Text; std::shared_ptr<std::map<TString, Ydb::Type>> ParameterTypes; + TMaybe<TQueryAst> AstResult; }; } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/simple/ya.make b/ydb/core/kqp/common/simple/ya.make index b7f529b6e3..54bd274e2b 100644 --- a/ydb/core/kqp/common/simple/ya.make +++ b/ydb/core/kqp/common/simple/ya.make @@ -13,6 +13,7 @@ PEERDIR( contrib/libs/protobuf ydb/core/base ydb/core/protos + ydb/library/yql/ast ydb/library/yql/dq/actors ydb/public/api/protos ) diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 83b55ab23e..f6b3c6b71e 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -7,6 +7,7 @@ #include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/core/kqp/gateway/kqp_metadata_loader.h> #include <ydb/core/kqp/host/kqp_host.h> +#include <ydb/core/kqp/host/kqp_translate.h> #include <ydb/core/kqp/session_actor/kqp_worker_common.h> #include <ydb/library/yql/utils/actor_log/log.h> @@ -33,23 +34,6 @@ using namespace NThreading; using namespace NYql; using namespace NYql::NDq; -namespace { -NSQLTranslation::EBindingsMode RemapBindingsMode(NKikimrConfig::TTableServiceConfig::EBindingsMode mode) { - switch (mode) { - case NKikimrConfig::TTableServiceConfig::BM_ENABLED: - return NSQLTranslation::EBindingsMode::ENABLED; - case NKikimrConfig::TTableServiceConfig::BM_DISABLED: - return NSQLTranslation::EBindingsMode::DISABLED; - case NKikimrConfig::TTableServiceConfig::BM_DROP_WITH_WARNING: - return NSQLTranslation::EBindingsMode::DROP_WITH_WARNING; - case NKikimrConfig::TTableServiceConfig::BM_DROP: - return NSQLTranslation::EBindingsMode::DROP; - default: - return NSQLTranslation::EBindingsMode::ENABLED; - } -} -} - class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> { public: using TBase = TActorBootstrapped<TKqpCompileActor>; @@ -67,14 +51,15 @@ public: const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpDbCountersPtr dbCounters, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusivePtr<TUserRequestContext>& userRequestContext, - NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, bool collectFullDiagnostics) + NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, bool collectFullDiagnostics, + ECompileActorAction compileAction, TMaybe<TQueryAst> astResult) : Owner(owner) , ModuleResolverState(moduleResolverState) , Counters(counters) , FederatedQuerySetup(federatedQuerySetup) , Uid(uid) , QueryId(queryId) - , QueryRef(QueryId.Text, QueryId.QueryParameterTypes) + , QueryRef(QueryId.Text, QueryId.QueryParameterTypes, astResult) , UserToken(userToken) , DbCounters(dbCounters) , Config(MakeIntrusive<TKikimrConfiguration>()) @@ -84,6 +69,8 @@ public: , CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor") , TempTablesState(std::move(tempTablesState)) , CollectFullDiagnostics(collectFullDiagnostics) + , CompileAction(compileAction) + , AstResult(std::move(astResult)) { Config->Init(kqpSettings->DefaultSettings.GetDefaultSettings(), QueryId.Cluster, kqpSettings->Settings, false); @@ -106,6 +93,62 @@ public: } void Bootstrap(const TActorContext& ctx) { + switch(CompileAction) { + case ECompileActorAction::PARSE: + StartParsing(ctx); + break; + case ECompileActorAction::COMPILE: + StartCompilation(ctx); + break; + } + } + + void Die(const NActors::TActorContext& ctx) override { + if (TimeoutTimerActorId) { + ctx.Send(TimeoutTimerActorId, new TEvents::TEvPoisonPill()); + } + + TBase::Die(ctx); + } + +private: + STFUNC(CompileState) { + try { + switch (ev->GetTypeRewrite()) { + HFunc(TEvKqp::TEvContinueProcess, Handle); + cFunc(TEvents::TSystem::Wakeup, HandleTimeout); + default: + UnexpectedEvent("CompileState", ev->GetTypeRewrite()); + } + } catch (const yexception& e) { + InternalError(e.what()); + } + } + +private: + void SetQueryAst(const TActorContext &ctx) { + TString cluster = QueryId.Cluster; + TString kqpTablePathPrefix = Config->_KqpTablePathPrefix.Get().GetRef(); + ui16 kqpYqlSyntaxVersion = Config->_KqpYqlSyntaxVersion.Get().GetRef(); + NSQLTranslation::EBindingsMode bindingsMode = Config->BindingsMode; + bool isEnableExternalDataSources = AppData(ctx)->FeatureFlags.GetEnableExternalDataSources(); + bool isEnablePgConstsToParams = Config->EnablePgConstsToParams; + + auto astResult = ParseQuery(ConvertType(QueryId.Settings.QueryType), QueryId.Settings.Syntax, QueryId.Text, QueryId.QueryParameterTypes, QueryId.IsSql(), cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, isEnablePgConstsToParams); + YQL_ENSURE(astResult.Ast); + if (astResult.Ast->IsOk()) { + AstResult = std::move(astResult); + } + } + + void StartParsing(const TActorContext &ctx) { + SetQueryAst(ctx); + + Become(&TKqpCompileActor::CompileState); + ReplyParseResult(ctx); + } + + void StartCompilation(const TActorContext &ctx) { StartTime = TInstant::Now(); Counters->ReportCompileStart(DbCounters); @@ -194,33 +237,9 @@ public: } Continue(ctx); - Become(&TKqpCompileActor::CompileState); } - void Die(const NActors::TActorContext& ctx) override { - if (TimeoutTimerActorId) { - ctx.Send(TimeoutTimerActorId, new TEvents::TEvPoisonPill()); - } - - TBase::Die(ctx); - } - -private: - STFUNC(CompileState) { - try { - switch (ev->GetTypeRewrite()) { - HFunc(TEvKqp::TEvContinueProcess, Handle); - cFunc(TEvents::TSystem::Wakeup, HandleTimeout); - default: - UnexpectedEvent("CompileState", ev->GetTypeRewrite()); - } - } catch (const yexception& e) { - InternalError(e.what()); - } - } - -private: void Continue(const TActorContext &ctx) { TActorSystem* actorSystem = ctx.ExecutorThread.ActorSystem; TActorId selfId = ctx.SelfID; @@ -312,7 +331,7 @@ private: } void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues) { - Reply(TKqpCompileResult::Make(Uid, std::move(QueryId), status, issues, ETableReadType::Other)); + Reply(TKqpCompileResult::Make(Uid, status, issues, ETableReadType::Other, std::move(QueryId))); } void InternalError(const TString message) { @@ -332,6 +351,26 @@ private: << ", at state:" << state); } + void ReplyParseResult(const TActorContext &ctx) { + Y_UNUSED(ctx); + ALOG_DEBUG(NKikimrServices::KQP_COMPILE_ACTOR, "Send parsing result" + << ", self: " << SelfId() + << ", owner: " << Owner + << (AstResult && AstResult->Ast->IsOk() ? ", parsing is successful" : ", parsing is not successful")); + + auto responseEv = MakeHolder<TEvKqp::TEvParseResponse>(QueryId, std::move(AstResult)); + AstResult = Nothing(); + Send(Owner, responseEv.Release()); + + Counters->ReportCompileFinish(DbCounters); + + if (CompileActorSpan) { + CompileActorSpan.End(); + } + + PassAway(); + } + void Handle(TEvKqp::TEvContinueProcess::TPtr &ev, const TActorContext &ctx) { Y_ENSURE(!ev->Get()->QueryId); @@ -359,7 +398,7 @@ private: auto queryType = QueryId.Settings.QueryType; - KqpCompileResult = TKqpCompileResult::Make(Uid, std::move(QueryId), status, kqpResult.Issues(), maxReadType); + KqpCompileResult = TKqpCompileResult::Make(Uid, status, kqpResult.Issues(), maxReadType, std::move(QueryId)); if (status == Ydb::StatusIds::SUCCESS) { YQL_ENSURE(kqpResult.PreparingQuery); @@ -369,6 +408,10 @@ private: preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType); KqpCompileResult->PreparedQuery = preparedQueryHolder; KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery()); + + if (AstResult) { + KqpCompileResult->Ast = AstResult->Ast; + } } auto now = TInstant::Now(); @@ -430,6 +473,9 @@ private: TKqpTempTablesState::TConstPtr TempTablesState; bool CollectFullDiagnostics; + + ECompileActorAction CompileAction; + TMaybe<TQueryAst> AstResult; }; void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConfig& serviceConfig) { @@ -456,6 +502,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.BindingsMode = RemapBindingsMode(serviceConfig.GetBindingsMode()); kqpConfig.PredicateExtract20 = serviceConfig.GetPredicateExtract20(); kqpConfig.IndexAutoChooserMode = serviceConfig.GetIndexAutoChooseMode(); + kqpConfig.EnablePgConstsToParams = serviceConfig.GetEnablePgConstsToParams(); } IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, @@ -466,13 +513,15 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP const TString& uid, const TKqpQueryId& query, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, TKqpDbCountersPtr dbCounters, const TIntrusivePtr<TUserRequestContext>& userRequestContext, - NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, bool collectFullDiagnostics) + NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, + ECompileActorAction compileAction, TMaybe<TQueryAst> astResult, bool collectFullDiagnostics) { return new TKqpCompileActor(owner, kqpSettings, tableServiceConfig, queryServiceConfig, metadataProviderConfig, moduleResolverState, counters, uid, query, userToken, dbCounters, federatedQuerySetup, userRequestContext, - std::move(traceId), std::move(tempTablesState), collectFullDiagnostics); + std::move(traceId), std::move(tempTablesState), collectFullDiagnostics, + compileAction, std::move(astResult)); } } // namespace NKqp diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index e5b7f4255b..9ebb183fb0 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -7,7 +7,9 @@ #include <ydb/core/cms/console/configs_dispatcher.h> #include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/core/kqp/common/kqp_lwtrace_probes.h> +#include <ydb/core/kqp/common/simple/query_ast.h> #include <ydb/core/ydb_convert/ydb_convert.h> +#include <ydb/core/kqp/host/kqp_translate.h> #include <ydb/library/aclib/aclib.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -32,7 +34,7 @@ public: : List(size) , Ttl(ttl) {} - bool Insert(const TKqpCompileResult::TConstPtr& compileResult) { + void InsertQuery(const TKqpCompileResult::TConstPtr& compileResult) { Y_ENSURE(compileResult->Query); auto& query = *compileResult->Query; @@ -40,6 +42,20 @@ public: auto queryIt = QueryIndex.emplace(query, compileResult->Uid); Y_ENSURE(queryIt.second); + } + + void InsertAst(const TKqpCompileResult::TConstPtr& compileResult) { + Y_ENSURE(compileResult->Query); + Y_ENSURE(compileResult->Ast); + + AstIndex.emplace(GetQueryIdWithAst(*compileResult->Query, *compileResult->Ast), compileResult->Uid); + } + + bool Insert(const TKqpCompileResult::TConstPtr& compileResult, bool isEnableAstCache) { + InsertQuery(compileResult); + if (isEnableAstCache && compileResult->Ast) { + InsertAst(compileResult); + } auto it = Index.emplace(compileResult->Uid, TCacheEntry{compileResult, TAppData::TimeProvider->Now() + Ttl}); @@ -53,7 +69,11 @@ public: if (removedItem) { DecBytes(removedItem->Value.CompileResult->PreparedQuery->ByteSize()); - QueryIndex.erase(*removedItem->Value.CompileResult->Query); + auto queryId = *removedItem->Value.CompileResult->Query; + QueryIndex.erase(queryId); + if (removedItem->Value.CompileResult->Ast) { + AstIndex.erase(GetQueryIdWithAst(queryId, *removedItem->Value.CompileResult->Ast)); + } auto indexIt = Index.find(*removedItem); if (indexIt != Index.end()) { Index.erase(indexIt); @@ -89,6 +109,25 @@ public: } } + TKqpQueryId GetQueryIdWithAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast) { + Y_ABORT_UNLESS(ast.Root); + std::shared_ptr<std::map<TString, Ydb::Type>> astPgParams; + if (query.QueryParameterTypes || ast.PgAutoParamValues) { + astPgParams = std::make_shared<std::map<TString, Ydb::Type>>(); + if (query.QueryParameterTypes) { + for (const auto& [name, param] : *query.QueryParameterTypes) { + astPgParams->insert({name, param}); + } + } + if (ast.PgAutoParamValues) { + for (const auto& [name, param] : *ast.PgAutoParamValues) { + astPgParams->insert({name, param.Gettype()}); + } + } + } + return TKqpQueryId{query.Cluster, query.Database, ast.Root->ToString(), query.Settings, astPgParams}; + } + TKqpCompileResult::TConstPtr FindByQuery(const TKqpQueryId& query, bool promote) { auto uid = QueryIndex.FindPtr(query); if (!uid) { @@ -98,6 +137,15 @@ public: return FindByUid(*uid, promote); } + TKqpCompileResult::TConstPtr FindByAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast, bool promote) { + auto uid = AstIndex.FindPtr(GetQueryIdWithAst(query, ast)); + if (!uid) { + return nullptr; + } + + return FindByUid(*uid, promote); + } + bool EraseByUid(const TString& uid) { auto it = Index.find(TItem(uid)); if (it == Index.end()) { @@ -111,7 +159,11 @@ public: Y_ABORT_UNLESS(item->Value.CompileResult); Y_ABORT_UNLESS(item->Value.CompileResult->Query); - QueryIndex.erase(*item->Value.CompileResult->Query); + auto queryId = *item->Value.CompileResult->Query; + QueryIndex.erase(queryId); + if (item->Value.CompileResult->Ast) { + AstIndex.erase(GetQueryIdWithAst(queryId, *item->Value.CompileResult->Ast)); + } Index.erase(it); @@ -145,6 +197,7 @@ public: List = TList(List.GetMaxSize()); Index.clear(); QueryIndex.clear(); + AstIndex.clear(); ByteSize = 0; } @@ -174,6 +227,7 @@ private: TList List; THashSet<TItem, TItem::THash> Index; THashMap<TKqpQueryId, TString, THash<TKqpQueryId>> QueryIndex; + THashMap<TKqpQueryId, TString, THash<TKqpQueryId>> AstIndex; ui64 ByteSize = 0; TDuration Ttl; }; @@ -184,7 +238,9 @@ struct TKqpCompileRequest { ui64 cookie, std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext, NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {}, - TKqpTempTablesState::TConstPtr tempTablesState = {}) + TKqpTempTablesState::TConstPtr tempTablesState = {}, + ECompileActorAction action = ECompileActorAction::COMPILE, + TMaybe<TQueryAst> astResult = {}) : Sender(sender) , Query(std::move(query)) , Uid(uid) @@ -198,6 +254,8 @@ struct TKqpCompileRequest { , Cookie(cookie) , TempTablesState(std::move(tempTablesState)) , IntrestedInResult(std::move(intrestedInResult)) + , Action(action) + , AstResult(std::move(astResult)) {} TActorId Sender; @@ -215,6 +273,8 @@ struct TKqpCompileRequest { ui64 Cookie; TKqpTempTablesState::TConstPtr TempTablesState; std::shared_ptr<std::atomic<bool>> IntrestedInResult; + ECompileActorAction Action; + TMaybe<TQueryAst> AstResult; bool IsIntrestedInResult() const { return IntrestedInResult->load(); @@ -374,6 +434,7 @@ private: HFunc(TEvKqp::TEvCompileResponse, Handle); HFunc(TEvKqp::TEvCompileInvalidateRequest, Handle); HFunc(TEvKqp::TEvRecompileRequest, Handle); + HFunc(TEvKqp::TEvParseResponse, Handle); hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, HandleConfig); hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, HandleConfig); @@ -550,7 +611,7 @@ private: if (compileResult) { Counters->ReportQueryCacheHit(dbCounters, true); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache" + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from query text" << ", sender: " << ev->Sender << ", queryUid: " << compileResult->Uid); @@ -558,19 +619,17 @@ private: return; } - Counters->ReportQueryCacheHit(dbCounters, false); - CollectDiagnostics = request.CollectDiagnostics; LWTRACK(KqpCompileServiceEnqueued, ev->Get()->Orbit, ev->Get()->Query ? ev->Get()->Query->UserSid : 0); - TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query), request.KeepInCache, request.UserToken, request.Deadline, dbCounters, ev->Cookie, std::move(ev->Get()->IntrestedInResult), ev->Get()->UserRequestContext, - std::move(ev->Get()->Orbit), std::move(compileServiceSpan), std::move(ev->Get()->TempTablesState)); + std::move(ev->Get()->Orbit), std::move(compileServiceSpan), std::move(ev->Get()->TempTablesState), + TableServiceConfig.GetEnableAstCache() ? ECompileActorAction::PARSE : ECompileActorAction::COMPILE); if (!RequestsQueue.Enqueue(std::move(compileRequest))) { Counters->ReportCompileRequestRejected(dbCounters); @@ -680,7 +739,7 @@ private: if (QueryCache.FindByUid(compileResult->Uid, false)) { QueryCache.Replace(compileResult); } else if (keepInCache) { - if (QueryCache.Insert(compileResult)) { + if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) { Counters->CompileQueryCacheEvicted->Inc(); } if (compileResult->Query && compileResult->Query->Settings.IsPrepareQuery) { @@ -753,6 +812,54 @@ private: StartCheckQueriesTtlTimer(); } + void Handle(TEvKqp::TEvParseResponse::TPtr& ev, const TActorContext& ctx) { + auto& parseResult = ev->Get()->AstResult; + auto& query = ev->Get()->Query; + auto compileRequest = RequestsQueue.FinishActiveRequest(query); + if (parseResult && parseResult->Ast->IsOk()) { + auto compileResult = QueryCache.FindByAst(query, *parseResult->Ast, compileRequest.KeepInCache); + if (compileResult) { + Counters->ReportQueryCacheHit(compileRequest.DbCounters, true); + + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from ast" + << ", sender: " << compileRequest.Sender + << ", queryUid: " << compileResult->Uid); + + compileResult->Ast->PgAutoParamValues = std::move(parseResult->Ast->PgAutoParamValues); + + ReplyFromCache(compileRequest.Sender, compileResult, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); + return; + } + } + Counters->ReportQueryCacheHit(compileRequest.DbCounters, false); + + LWTRACK(KqpCompileServiceEnqueued, + compileRequest.Orbit, + compileRequest.Query.UserSid); + + compileRequest.Action = ECompileActorAction::COMPILE; + compileRequest.AstResult = std::move(parseResult); + + if (!RequestsQueue.Enqueue(std::move(compileRequest))) { + Counters->ReportCompileRequestRejected(compileRequest.DbCounters); + + LOG_WARN_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Requests queue size limit exceeded" + << ", sender: " << ev->Sender + << ", queueSize: " << RequestsQueue.Size()); + + NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << + "Exceeded maximum number of requests in compile service queue."); + ReplyError(ev->Sender, "", Ydb::StatusIds::OVERLOADED, {issue}, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); + return; + } + + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Added request to queue" + << ", sender: " << ev->Sender + << ", queueSize: " << RequestsQueue.Size()); + + ProcessQueue(ctx); + } + private: bool InsertPreparingQuery(const TKqpCompileResult::TConstPtr& compileResult, bool keepInCache) { YQL_ENSURE(compileResult->Query); @@ -773,10 +880,13 @@ private: if (QueryCache.FindByQuery(query, keepInCache)) { return false; } - auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), std::move(query), compileResult->Status, compileResult->Issues, compileResult->MaxReadType); + if (compileResult->Ast && QueryCache.FindByAst(query, *compileResult->Ast, keepInCache)) { + return false; + } + auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->Ast); newCompileResult->AllowCache = compileResult->AllowCache; newCompileResult->PreparedQuery = compileResult->PreparedQuery; - return QueryCache.Insert(newCompileResult); + return QueryCache.Insert(newCompileResult, TableServiceConfig.GetEnableAstCache()); } void ProcessQueue(const TActorContext& ctx) { @@ -808,16 +918,16 @@ private: void StartCompilation(TKqpCompileRequest&& request, const TActorContext& ctx) { auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, TableServiceConfig, QueryServiceConfig, MetadataProviderConfig, ModuleResolverState, Counters, - request.Uid, request.Query, request.UserToken, FederatedQuerySetup, request.DbCounters, request.UserRequestContext, - request.CompileServiceSpan.GetTraceId(), std::move(request.TempTablesState), CollectDiagnostics); + request.Uid, request.Query, request.UserToken, FederatedQuerySetup, request.DbCounters, request.UserRequestContext, + request.CompileServiceSpan.GetTraceId(), request.TempTablesState, request.Action, std::move(request.AstResult), CollectDiagnostics); auto compileActorId = ctx.ExecutorThread.RegisterActor(compileActor, TMailboxType::HTSwap, AppData(ctx)->UserPoolId); LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Created compile actor" << ", sender: " << request.Sender << ", compileActor: " << compileActorId); - request.CompileActor = compileActorId; + RequestsQueue.AddActiveRequest(std::move(request)); } diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.h b/ydb/core/kqp/compile_service/kqp_compile_service.h index aeed99cd62..213dc1ecff 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.h +++ b/ydb/core/kqp/compile_service/kqp_compile_service.h @@ -8,6 +8,11 @@ namespace NKikimr { namespace NKqp { +enum class ECompileActorAction { + COMPILE, + PARSE +}; + IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, @@ -30,6 +35,8 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP TKqpDbCountersPtr dbCounters, const TIntrusivePtr<TUserRequestContext>& userRequestContext, NWilson::TTraceId traceId = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, + ECompileActorAction compileAction = ECompileActorAction::COMPILE, + TMaybe<TQueryAst> astResult = {}, bool collectFullDiagnostics = false); IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid, diff --git a/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt index 1170ac5cd8..905cc9c5fe 100644 --- a/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt @@ -38,5 +38,6 @@ target_sources(core-kqp-host PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_host.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_runner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_transform.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_translate.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_type_ann.cpp ) diff --git a/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt index 4955fb9ec5..42faf8147f 100644 --- a/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt @@ -39,5 +39,6 @@ target_sources(core-kqp-host PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_host.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_runner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_transform.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_translate.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_type_ann.cpp ) diff --git a/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt index 4955fb9ec5..42faf8147f 100644 --- a/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt @@ -39,5 +39,6 @@ target_sources(core-kqp-host PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_host.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_runner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_transform.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_translate.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_type_ann.cpp ) diff --git a/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt index 1170ac5cd8..905cc9c5fe 100644 --- a/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt @@ -38,5 +38,6 @@ target_sources(core-kqp-host PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_host.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_runner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_transform.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_translate.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_type_ann.cpp ) diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index cfac07d4bb..7b2bd52ac5 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -6,7 +6,6 @@ #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/opt/kqp_query_plan.h> #include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> -#include <ydb/core/kqp/provider/yql_kikimr_results.h> #include <ydb/library/yql/core/yql_opt_proposed_by_data.h> #include <ydb/library/yql/core/services/yql_plan.h> @@ -21,7 +20,6 @@ #include <ydb/library/yql/providers/generic/provider/yql_generic_provider.h> #include <ydb/library/yql/providers/generic/provider/yql_generic_state.h> #include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> -#include <ydb/library/yql/sql/sql.h> #include <library/cpp/cache/cache.h> #include <library/cpp/random_provider/random_provider.h> @@ -904,7 +902,6 @@ public: , IsInternalCall(isInternalCall) , FederatedQuerySetup(federatedQuerySetup) , SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider)) - , ClustersMap({{Cluster, TString(KikimrProviderName)}}) , TypesCtx(MakeIntrusive<TTypeAnnotationContext>()) , PlanBuilder(CreatePlanBuilder(*TypesCtx)) , FakeWorld(ExprCtx->NewWorld(TPosition())) @@ -1064,101 +1061,31 @@ private: TExprNode::TPtr CompileQuery(const TKqpQueryRef& query, bool isSql, bool sqlAutoCommit, TExprContext& ctx, TMaybe<TSqlVersion>& sqlVersion, const TMaybe<bool>& usePgParser) const { - TAstParseResult astRes; - if (isSql) { - NSQLTranslation::TTranslationSettings settings{}; - if (usePgParser) { - settings.PgParser = *usePgParser; - } - if (sqlVersion) { - settings.SyntaxVersion = *sqlVersion; - - if (*sqlVersion > 0) { - // Restrict fallback to V0 - settings.V0Behavior = NSQLTranslation::EV0Behavior::Disable; - } - } else { - settings.SyntaxVersion = SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(); - settings.V0Behavior = NSQLTranslation::EV0Behavior::Silent; - } - - if (SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources()) { - settings.DynamicClusterProvider = NYql::KikimrProviderName; - settings.BindingsMode = SessionCtx->Config().BindingsMode; - } - - settings.InferSyntaxVersion = true; - settings.V0ForceDisable = false; - settings.WarnOnV0 = false; - settings.DefaultCluster = Cluster; - settings.ClusterMapping = ClustersMap; - auto tablePathPrefix = SessionCtx->Config()._KqpTablePathPrefix.Get().GetRef(); - if (!tablePathPrefix.empty()) { - settings.PathPrefix = tablePathPrefix; - } - settings.EndOfQueryCommit = sqlAutoCommit; - settings.Flags.insert("FlexibleTypes"); - settings.Flags.insert("AnsiLike"); - - if (SessionCtx->Query().Type == EKikimrQueryType::Scan - || SessionCtx->Query().Type == EKikimrQueryType::YqlScript - || SessionCtx->Query().Type == EKikimrQueryType::YqlScriptStreaming - || SessionCtx->Query().Type == EKikimrQueryType::Query - || SessionCtx->Query().Type == EKikimrQueryType::Script) - { - // We enable EmitAggApply for filter and aggregate pushdowns to Column Shards - settings.Flags.insert("EmitAggApply"); - } else { - settings.Flags.insert("DisableEmitStartsWith"); - } - - if (SessionCtx->Query().Type == EKikimrQueryType::Query - || SessionCtx->Query().Type == EKikimrQueryType::Script) - { - settings.Flags.insert("AnsiOptionalAs"); - settings.Flags.insert("WarnOnAnsiAliasShadowing"); - settings.Flags.insert("AnsiCurrentRow"); - settings.Flags.insert("AnsiInForEmptyOrNullableItemsCollections"); - } - - if (query.ParameterTypes) { - NSQLTranslation::TTranslationSettings versionSettings = settings; - NYql::TIssues versionIssues; - - if (ParseTranslationSettings(query.Text, versionSettings, versionIssues) && versionSettings.SyntaxVersion == 1) { - for (const auto& [paramName, paramType] : *(query.ParameterTypes)) { - auto type = NYql::ParseTypeFromYdbType(paramType, ctx); - if (type != nullptr) { - if (paramName.StartsWith("$")) { - settings.DeclaredNamedExprs[paramName.substr(1)] = NYql::FormatType(type); - } else { - settings.DeclaredNamedExprs[paramName] = NYql::FormatType(type); - } - } - } - } - } - - ui16 actualSyntaxVersion = 0; - astRes = NSQLTranslation::SqlToYql(query.Text, settings, nullptr, &actualSyntaxVersion); - TypesCtx->DeprecatedSQL = (actualSyntaxVersion == 0); - sqlVersion = actualSyntaxVersion; + std::shared_ptr<NYql::TAstParseResult> queryAst; + if (!query.AstResult) { + auto astRes = ParseQuery(SessionCtx->Query().Type, usePgParser, + query.Text, query.ParameterTypes, isSql, sqlAutoCommit, sqlVersion, TypesCtx->DeprecatedSQL, + Cluster, SessionCtx->Config()._KqpTablePathPrefix.Get().GetRef(), + SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), SessionCtx->Config().BindingsMode, + SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources(), ctx, SessionCtx->Config().EnablePgConstsToParams); + queryAst = std::make_shared<NYql::TAstParseResult>(std::move(astRes)); } else { - sqlVersion = {}; - astRes = ParseAst(query.Text); - - // Do not check SQL constraints on s-expressions input, as it may come from both V0/V1. - // Constraints were already checked on type annotation of SQL query. - TypesCtx->DeprecatedSQL = true; + queryAst = query.AstResult->Ast; + sqlVersion = query.AstResult->SqlVersion; + if (query.AstResult->DeprecatedSQL) { + TypesCtx->DeprecatedSQL = *query.AstResult->DeprecatedSQL; + } } - ctx.IssueManager.AddIssues(astRes.Issues); - if (!astRes.IsOk()) { + YQL_ENSURE(queryAst); + ctx.IssueManager.AddIssues(queryAst->Issues); + if (!queryAst->IsOk()) { return nullptr; } + YQL_ENSURE(queryAst->Root); TExprNode::TPtr result; - if (!CompileExpr(*astRes.Root, result, ctx, ModuleResolver.get(), nullptr)) { + if (!CompileExpr(*queryAst->Root, result, ctx, ModuleResolver.get(), nullptr)) { return nullptr; } @@ -1669,7 +1596,6 @@ private: std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup; TIntrusivePtr<TKikimrSessionContext> SessionCtx; - THashMap<TString, TString> ClustersMap; TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> FuncRegistryHolder; const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry; diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index 7c35b1b5c1..5423b17bb5 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -3,6 +3,7 @@ #include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/kqp/provider/yql_kikimr_provider.h> +#include <ydb/core/kqp/host/kqp_translate.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> #include <ydb/library/yql/providers/common/token_accessor/client/factory.h> diff --git a/ydb/core/kqp/host/kqp_translate.cpp b/ydb/core/kqp/host/kqp_translate.cpp new file mode 100644 index 0000000000..099c46490c --- /dev/null +++ b/ydb/core/kqp/host/kqp_translate.cpp @@ -0,0 +1,201 @@ +#include "kqp_translate.h" + +#include <ydb/library/yql/sql/sql.h> + + +namespace NKikimr { +namespace NKqp { + +NSQLTranslation::EBindingsMode RemapBindingsMode(NKikimrConfig::TTableServiceConfig::EBindingsMode mode) { + switch (mode) { + case NKikimrConfig::TTableServiceConfig::BM_ENABLED: + return NSQLTranslation::EBindingsMode::ENABLED; + case NKikimrConfig::TTableServiceConfig::BM_DISABLED: + return NSQLTranslation::EBindingsMode::DISABLED; + case NKikimrConfig::TTableServiceConfig::BM_DROP_WITH_WARNING: + return NSQLTranslation::EBindingsMode::DROP_WITH_WARNING; + case NKikimrConfig::TTableServiceConfig::BM_DROP: + return NSQLTranslation::EBindingsMode::DROP; + default: + return NSQLTranslation::EBindingsMode::ENABLED; + } +} + +NYql::EKikimrQueryType ConvertType(NKikimrKqp::EQueryType type) { + switch (type) { + case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT: + return NYql::EKikimrQueryType::YqlScript; + + case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: + return NYql::EKikimrQueryType::YqlScriptStreaming; + + case NKikimrKqp::QUERY_TYPE_SQL_DML: + case NKikimrKqp::QUERY_TYPE_AST_DML: + return NYql::EKikimrQueryType::Dml; + + case NKikimrKqp::QUERY_TYPE_SQL_DDL: + return NYql::EKikimrQueryType::Ddl; + + case NKikimrKqp::QUERY_TYPE_AST_SCAN: + case NKikimrKqp::QUERY_TYPE_SQL_SCAN: + return NYql::EKikimrQueryType::Scan; + + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: + return NYql::EKikimrQueryType::Query; + + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: + return NYql::EKikimrQueryType::Script; + + case NKikimrKqp::QUERY_TYPE_PREPARED_DML: + case NKikimrKqp::QUERY_TYPE_UNDEFINED: + YQL_ENSURE(false, "Unexpected query type: " << type); + } +} + +NSQLTranslation::TTranslationSettings GetTranslationSettings(NYql::EKikimrQueryType queryType, const TMaybe<bool>& usePgParser, bool sqlAutoCommit, + const TString& queryText, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameters, TMaybe<ui16>& sqlVersion, TString cluster, + TString kqpTablePathPrefix, ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode, bool isEnableExternalDataSources, + NYql::TExprContext& ctx, bool isEnablePgConstsToParams) { + NSQLTranslation::TTranslationSettings settings{}; + + if (usePgParser) { + settings.PgParser = *usePgParser; + } + + if (settings.PgParser) { + settings.AutoParametrizeEnabled = isEnablePgConstsToParams; + settings.AutoParametrizeEnabledScopes = {"WHERE", "VALUES", "DISTINCT ON", "JOIN ON", "GROUP BY", + "HAVING", "SELECT", "LIMIT", "OFFSET", "RANGE FUNCTION", "GROUPING", "SUBLINK TEST", + "PARTITITON BY", "FRAME", "ORDER BY"}; + } + + if (queryType == NYql::EKikimrQueryType::Scan || queryType == NYql::EKikimrQueryType::Query) { + sqlVersion = sqlVersion ? *sqlVersion : 1; + } + + if (sqlVersion) { + settings.SyntaxVersion = *sqlVersion; + + if (*sqlVersion > 0) { + // Restrict fallback to V0 + settings.V0Behavior = NSQLTranslation::EV0Behavior::Disable; + } + } else { + settings.SyntaxVersion = kqpYqlSyntaxVersion; + settings.V0Behavior = NSQLTranslation::EV0Behavior::Silent; + } + + if (isEnableExternalDataSources) { + settings.DynamicClusterProvider = NYql::KikimrProviderName; + settings.BindingsMode = bindingsMode; + } + + settings.InferSyntaxVersion = true; + settings.V0ForceDisable = false; + settings.WarnOnV0 = false; + settings.DefaultCluster = cluster; + settings.ClusterMapping = {{cluster, TString(NYql::KikimrProviderName)}}; + auto tablePathPrefix = kqpTablePathPrefix; + if (!tablePathPrefix.empty()) { + settings.PathPrefix = tablePathPrefix; + } + + settings.EndOfQueryCommit = sqlAutoCommit; + + settings.Flags.insert("FlexibleTypes"); + settings.Flags.insert("AnsiLike"); + if (queryType == NYql::EKikimrQueryType::Scan + || queryType == NYql::EKikimrQueryType::YqlScript + || queryType == NYql::EKikimrQueryType::YqlScriptStreaming + || queryType == NYql::EKikimrQueryType::Query + || queryType == NYql::EKikimrQueryType::Script) + { + // We enable EmitAggApply for filter and aggregate pushdowns to Column Shards + settings.Flags.insert("EmitAggApply"); + } else { + settings.Flags.insert("DisableEmitStartsWith"); + } + + if (queryType == NYql::EKikimrQueryType::Query || queryType == NYql::EKikimrQueryType::Script) + { + settings.Flags.insert("AnsiOptionalAs"); + settings.Flags.insert("WarnOnAnsiAliasShadowing"); + settings.Flags.insert("AnsiCurrentRow"); + settings.Flags.insert("AnsiInForEmptyOrNullableItemsCollections"); + } + + if (queryParameters) { + NSQLTranslation::TTranslationSettings versionSettings = settings; + NYql::TIssues versionIssues; + + if (ParseTranslationSettings(queryText, versionSettings, versionIssues) && versionSettings.SyntaxVersion == 1) { + for (const auto& [paramName, paramType] : *(queryParameters)) { + auto type = NYql::ParseTypeFromYdbType(paramType, ctx); + if (type != nullptr) { + if (paramName.StartsWith("$")) { + settings.DeclaredNamedExprs[paramName.substr(1)] = NYql::FormatType(type); + } else { + settings.DeclaredNamedExprs[paramName] = NYql::FormatType(type); + } + } + } + } + } + + return settings; +} + +NYql::TAstParseResult ParseQuery(NYql::EKikimrQueryType queryType, const TMaybe<bool>& usePgParser, const TString& queryText, + std::shared_ptr<std::map<TString, Ydb::Type>> queryParameters, bool isSql, bool sqlAutoCommit, + TMaybe<ui16>& sqlVersion, bool& deprecatedSQL, TString cluster, TString kqpTablePathPrefix, + ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode, bool isEnableExternalDataSources, + NYql::TExprContext& ctx, bool isEnablePgConstsToParams) { + NYql::TAstParseResult astRes; + if (isSql) { + auto settings = GetTranslationSettings(queryType, usePgParser, sqlAutoCommit, queryText, queryParameters, sqlVersion, cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, ctx, isEnablePgConstsToParams); + ui16 actualSyntaxVersion = 0; + auto ast = NSQLTranslation::SqlToYql(queryText, settings, nullptr, &actualSyntaxVersion); + deprecatedSQL = (actualSyntaxVersion == 0); + sqlVersion = actualSyntaxVersion; + return std::move(ast); + } else { + sqlVersion = {}; + deprecatedSQL = true; + return NYql::ParseAst(queryText); + + // Do not check SQL constraints on s-expressions input, as it may come from both V0/V1. + // Constraints were already checked on type annotation of SQL query. + } +} + +TQueryAst ParseQuery(NYql::EKikimrQueryType queryType, const TMaybe<Ydb::Query::Syntax>& syntax, const TString& queryText, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameters, bool isSql, + TString cluster, TString kqpTablePathPrefix, ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode, bool isEnableExternalDataSources, bool isEnablePgConstsToParams) { + bool deprecatedSQL; + TMaybe<ui16> sqlVersion; + TMaybe<bool> usePgParser; + if (syntax) + switch (*syntax) { + case Ydb::Query::Syntax::SYNTAX_YQL_V1: + usePgParser = false; + break; + case Ydb::Query::Syntax::SYNTAX_PG: + usePgParser = true; + break; + default: + break; + } + + NYql::TExprContext ctx; + bool sqlAutoCommit; + if (queryType == NYql::EKikimrQueryType::YqlScript || queryType == NYql::EKikimrQueryType::YqlScriptStreaming) { + sqlAutoCommit = true; + } else { + sqlAutoCommit = false; + } + auto astRes = ParseQuery(queryType, usePgParser, queryText, queryParameters, isSql, sqlAutoCommit, sqlVersion, deprecatedSQL, cluster, kqpTablePathPrefix, kqpYqlSyntaxVersion, bindingsMode, isEnableExternalDataSources, ctx, isEnablePgConstsToParams); + return TQueryAst(std::make_shared<NYql::TAstParseResult>(std::move(astRes)), sqlVersion, deprecatedSQL); +} + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/host/kqp_translate.h b/ydb/core/kqp/host/kqp_translate.h new file mode 100644 index 0000000000..bf218b8464 --- /dev/null +++ b/ydb/core/kqp/host/kqp_translate.h @@ -0,0 +1,28 @@ +#pragma once + +#include <ydb/core/kqp/provider/yql_kikimr_results.h> +#include <ydb/core/kqp/common/kqp.h> + +namespace NKikimr { +namespace NKqp { + +NSQLTranslation::EBindingsMode RemapBindingsMode(NKikimrConfig::TTableServiceConfig::EBindingsMode mode); + +NYql::EKikimrQueryType ConvertType(NKikimrKqp::EQueryType type); + +NSQLTranslation::TTranslationSettings GetTranslationSettings(NYql::EKikimrQueryType queryType, const TMaybe<bool>& usePgParser, bool sqlAutoCommit, + const TString& queryText, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameters, TMaybe<ui16>& sqlVersion, TString cluster, + TString kqpTablePathPrefix, ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode, bool isEnableExternalDataSources, NYql::TExprContext& ctx, + bool isEnablePgConstsToParams); + +NYql::TAstParseResult ParseQuery(NYql::EKikimrQueryType queryType, const TMaybe<bool>& usePgParser, const TString& queryText, + std::shared_ptr<std::map<TString, Ydb::Type>> queryParameters, bool isSql, bool sqlAutoCommit, TMaybe<ui16>& sqlVersion, + bool& deprecatedSQL, TString cluster, TString kqpTablePathPrefix, ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode, + bool isEnableExternalDataSources, NYql::TExprContext& ctx, bool isEnablePgConstsToParams); + +TQueryAst ParseQuery(NYql::EKikimrQueryType queryType, const TMaybe<Ydb::Query::Syntax>& syntax, const TString& queryText, + std::shared_ptr<std::map<TString, Ydb::Type>> queryParameters, bool isSql, TString cluster, TString kqpTablePathPrefix, + ui16 kqpYqlSyntaxVersion, NSQLTranslation::EBindingsMode bindingsMode, bool isEnableExternalDataSources, bool isEnablePgConstsToParams); + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/host/ya.make b/ydb/core/kqp/host/ya.make index 0b7ff0368b..cb9a5cfc95 100644 --- a/ydb/core/kqp/host/ya.make +++ b/ydb/core/kqp/host/ya.make @@ -6,6 +6,7 @@ SRCS( kqp_host.cpp kqp_runner.cpp kqp_transform.cpp + kqp_translate.cpp kqp_type_ann.cpp ) diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index 01bbc5cd71..0e0b23b7ca 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -159,6 +159,8 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi bool EnableColumnsWithDefault = false; NSQLTranslation::EBindingsMode BindingsMode = NSQLTranslation::EBindingsMode::ENABLED; NKikimrConfig::TTableServiceConfig_EIndexAutoChooseMode IndexAutoChooserMode; + bool EnableAstCache = false; + bool EnablePgConstsToParams = false; }; } diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp index 03d1ea7108..c0ef1de5b4 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.cpp +++ b/ydb/core/kqp/query_data/kqp_query_data.cpp @@ -475,7 +475,6 @@ void TQueryData::Clear() { { auto g = TypeEnv().BindAllocator(); Params.clear(); - TUnboxedParamsMap emptyMap; UnboxedData.swap(emptyMap); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 2b5c68ffcf..9dd31b0db9 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -693,7 +693,16 @@ public: } try { - QueryState->QueryData->ParseParameters(QueryState->GetYdbParameters()); + auto parameters = QueryState->GetYdbParameters(); + if (QueryState->CompileResult && QueryState->CompileResult->Ast) { + auto& params = QueryState->CompileResult->Ast->PgAutoParamValues; + if (params) { + for(const auto& [name, param] : *params) { + parameters.insert({name, param}); + } + } + } + QueryState->QueryData->ParseParameters(parameters); } catch(const yexception& ex) { ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what(); } @@ -1172,7 +1181,7 @@ public: ExecuterId = TActorId{}; if (response->GetStatus() != Ydb::StatusIds::SUCCESS) { - LOG_I("TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx); + LOG_D("TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx); auto status = response->GetStatus(); TIssues issues; diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index c185c3be20..6fd21e3ca4 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -2851,6 +2851,213 @@ Y_UNIT_TEST_SUITE(KqpPg) { )", FormatResultSetYson(result.GetResultSet(0))); } } + + Y_UNIT_TEST(CheckPgAutoParams) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + appConfig.MutableTableServiceConfig()->SetEnableAstCache(true); + auto settings = NYdb::NQuery::TExecuteQuerySettings() + .Syntax(NYdb::NQuery::ESyntax::Pg) + .StatsMode(NYdb::NQuery::EStatsMode::Basic); + + { + // Check disable setting + appConfig.MutableTableServiceConfig()->SetEnablePgConstsToParams(false); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr(serverSettings.SetWithSampleTables(false)); + auto db = kikimr.GetQueryClient(); + const auto query = Q_(R"( + CREATE TABLE PgTable ( + key int4 PRIMARY KEY, + value text + ))"); + db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + + { + const auto query = Q_(R"( + SELECT * FROM PgTable WHERE key = 1; + )"); + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats1 = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats1.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + SELECT * FROM PgTable WHERE key = 2; + )"); + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + } + + appConfig.MutableTableServiceConfig()->SetEnablePgConstsToParams(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr(serverSettings.SetWithSampleTables(false)); + auto db = kikimr.GetQueryClient(); + const auto query = Q_(R"( + CREATE TABLE PgTable ( + key int4 PRIMARY KEY, + value text + ))"); + db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + + { + // Check the same queries and differend values + { + const auto query = Q_(R"( + SELECT * FROM PgTable WHERE key = 3; + )"); + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + SELECT * FROM PgTable WHERE key = 4; + )"); + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); + } + } + + { + // Check values without table + { + const auto query = Q_(R"( + SELECT 1; + )"); + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + SELECT 'a'; + )"); + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + SELECT true; + )"); + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + SELECT 2; + )"); + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); + } + + { + const auto query = Q_(R"( + SELECT (1, 2); + )"); + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToString().Contains("alternative is not implemented yet : 138")); + } + } + + { + // Check wrong values type for table + { + const auto query = Q_(R"( + SELECT * FROM PgTable WHERE key = '3'; + )"); + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + SELECT * FROM PgTable WHERE value = 4; + )"); + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToString().Contains("Unable to find an overload for operator = with given argument type(s): (text,int4)")); + } + + { + const auto query = Q_(R"( + SELECT * FROM PgTable WHERE key = 3 and value = 4; + )"); + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToString().Contains("Unable to find an overload for operator = with given argument type(s): (text,int4)")); + } + + { + const auto query = Q_(R"( + SELECT * FROM PgTable WHERE key = 'a'; + )"); + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + } + + { + // Check insert + auto result = db.ExecuteQuery(R"( + CREATE TABLE PgTable1(id int4, value text, primary key(id)); + )", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + result = db.ExecuteQuery(R"( + CREATE TABLE PgTable2(id uint64, primary key(id)); + )", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + { + auto result = db.ExecuteQuery(R"( + INSERT INTO PgTable1 VALUES (1, 'a', 'a'); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToString().Contains("VALUES have 3 columns, INSERT INTO expects: 2")); + } + { + auto result = db.ExecuteQuery(R"( + INSERT INTO PgTable2 VALUES ('a'); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToString().Contains("Failed to convert 'id': pgunknown to Optional<Uint64>")); + } + { + auto result = db.ExecuteQuery(R"( + INSERT INTO PgTable1 VALUES ('a', 1); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetIssues().ToString().Contains("invalid input syntax for type integer: \"a\"")); + } + } + } } } // namespace NKqp diff --git a/ydb/core/kqp/ut/query/kqp_params_ut.cpp b/ydb/core/kqp/ut/query/kqp_params_ut.cpp index 185080d65e..820bf125c2 100644 --- a/ydb/core/kqp/ut/query/kqp_params_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_params_ut.cpp @@ -319,6 +319,80 @@ Y_UNIT_TEST_SUITE(KqpParams) { UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString()); } + Y_UNIT_TEST(CheckCacheByAst) { + auto query1 = Q1_(R"( + SELECT * FROM `/Root/Test` WHERE Group = 1 AND Name = "2"; + )"); + auto query2 = Q1_(R"( + select * from `/Root/Test` where Group = 1 AND Name = "2"; + )"); + + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.KeepInQueryCache(true); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + { + // Check disable setting + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableAstCache(false); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr(serverSettings.SetWithSampleTables(true)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(query1, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + + result = session.ExecuteDataQuery(query2, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableAstCache(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + + { + // Check 2 exec queries + TKikimrRunner kikimr(serverSettings.SetWithSampleTables(true)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(query1, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + + result = session.ExecuteDataQuery(query2, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); + } + { + // Check Prepare and Exec queries + TKikimrRunner kikimr(serverSettings.SetWithSampleTables(true)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto prepareResult = session.PrepareDataQuery(query1).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString()); + + auto execResult = session.ExecuteDataQuery(query2, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(execResult.GetStatus(), EStatus::SUCCESS, execResult.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*execResult.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); + } + } + Y_UNIT_TEST(ExplicitSameParameterTypesQueryCacheCheck) { TKikimrRunner kikimr; auto db = kikimr.GetTableClient(); diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index 42ca5f8303..6971597554 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -267,4 +267,7 @@ message TTableServiceConfig { optional EIndexAutoChooseMode IndexAutoChooseMode = 50 [default = DISABLED]; optional bool EnableColumnsWithDefault = 51 [default = true]; + + optional bool EnableAstCache = 52 [default = false]; + optional bool EnablePgConstsToParams = 53 [default = false]; }; diff --git a/ydb/library/yql/ast/yql_ast.cpp b/ydb/library/yql/ast/yql_ast.cpp index f260a65de6..f4405646ef 100644 --- a/ydb/library/yql/ast/yql_ast.cpp +++ b/ydb/library/yql/ast/yql_ast.cpp @@ -607,6 +607,7 @@ TAstParseResult::TAstParseResult(TAstParseResult&& other) : Pool(std::move(other.Pool)) , Root(other.Root) , Issues(std::move(other.Issues)) + , PgAutoParamValues(std::move(other.PgAutoParamValues)) { other.Root = nullptr; } @@ -617,6 +618,7 @@ TAstParseResult& TAstParseResult::operator=(TAstParseResult&& other) { Root = other.Root; other.Root = nullptr; Issues = std::move(other.Issues); + PgAutoParamValues = std::move(other.PgAutoParamValues); return *this; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp index 02f4867f0c..ce1b534cf8 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_map.cpp @@ -304,12 +304,12 @@ public: if (auto elements = list.GetElements()) { auto size = list.GetListLength(); NUdf::TUnboxedValue* items = nullptr; - const auto result = ctx.HolderFactory.CreateDirectArrayHolder(size, items); + NUdf::TUnboxedValue result = ctx.HolderFactory.CreateDirectArrayHolder(size, items); while (size--) { Item->SetValue(ctx, NUdf::TUnboxedValue(*elements++)); *items++ = NewItem->GetValue(ctx); } - return result; + return result.Release(); } return ctx.HolderFactory.Create<TListValue>(ctx, std::move(list), Item, NewItem); |