diff options
author | yuryalekseev <[email protected]> | 2023-05-11 18:01:57 +0300 |
---|---|---|
committer | yuryalekseev <[email protected]> | 2023-05-11 18:01:57 +0300 |
commit | 27c981a547473853468293ef69a23f7a3b766edb (patch) | |
tree | f1692757ec574daa131c19c7e83348e578e2fdec | |
parent | f92d86b7c6a95421b7ba1796565b3e4c2780cf34 (diff) |
Pass query parameter types over to translation settings in kqp.
26 files changed, 469 insertions, 116 deletions
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index 9894f5f0b64..a6ec8203340 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -3,6 +3,7 @@ #include "kqp_event_ids.h" #include "simple/helpers.h" #include "simple/query_id.h" +#include "simple/query_ref.h" #include "simple/settings.h" #include "simple/services.h" #include "events/events.h" @@ -19,10 +20,16 @@ #include <ydb/library/yql/dq/actors/dq.h> #include <ydb/library/yql/public/issue/yql_issue.h> #include <ydb/public/api/protos/ydb_status_codes.pb.h> +#include <ydb/public/api/protos/ydb_value.pb.h> #include <ydb/public/api/protos/draft/ydb_query.pb.h> +#include <google/protobuf/util/message_differencer.h> + #include <util/generic/guid.h> #include <util/generic/ptr.h> +#include <util/generic/string.h> + +#include <map> namespace NKikimr::NKqp { diff --git a/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt index 6833c55e76a..e67856840ed 100644 --- a/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/common/simple/CMakeLists.darwin-x86_64.txt @@ -11,9 +11,11 @@ add_library(kqp-common-simple) target_link_libraries(kqp-common-simple PUBLIC contrib-libs-cxxsupp yutil + contrib-libs-protobuf + ydb-core-base ydb-core-protos yql-dq-actors - ydb-core-base + api-protos ) target_sources(kqp-common-simple PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/helpers.cpp diff --git a/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt index f9217daae45..b8ddc513eb4 100644 --- a/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/common/simple/CMakeLists.linux-aarch64.txt @@ -12,9 +12,11 @@ target_link_libraries(kqp-common-simple PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil + contrib-libs-protobuf + ydb-core-base ydb-core-protos yql-dq-actors - ydb-core-base + api-protos ) target_sources(kqp-common-simple PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/helpers.cpp diff --git a/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt index f9217daae45..b8ddc513eb4 100644 --- a/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/common/simple/CMakeLists.linux-x86_64.txt @@ -12,9 +12,11 @@ target_link_libraries(kqp-common-simple PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil + contrib-libs-protobuf + ydb-core-base ydb-core-protos yql-dq-actors - ydb-core-base + api-protos ) target_sources(kqp-common-simple PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/helpers.cpp diff --git a/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt index 6833c55e76a..e67856840ed 100644 --- a/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/common/simple/CMakeLists.windows-x86_64.txt @@ -11,9 +11,11 @@ add_library(kqp-common-simple) target_link_libraries(kqp-common-simple PUBLIC contrib-libs-cxxsupp yutil + contrib-libs-protobuf + ydb-core-base ydb-core-protos yql-dq-actors - ydb-core-base + api-protos ) target_sources(kqp-common-simple PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/simple/helpers.cpp diff --git a/ydb/core/kqp/common/simple/query_id.cpp b/ydb/core/kqp/common/simple/query_id.cpp index af7b3e139af..09c57a7f267 100644 --- a/ydb/core/kqp/common/simple/query_id.cpp +++ b/ydb/core/kqp/common/simple/query_id.cpp @@ -1,14 +1,22 @@ #include "query_id.h" #include "helpers.h" + +#include <ydb/public/api/protos/ydb_value.pb.h> +#include <google/protobuf/util/message_differencer.h> + #include <util/generic/yexception.h> +#include <map> +#include <memory> + namespace NKikimr::NKqp { -TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const TString& text, NKikimrKqp::EQueryType type) +TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const TString& text, NKikimrKqp::EQueryType type, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameterTypes) : Cluster(cluster) , Database(database) , Text(text) , QueryType(type) + , QueryParameterTypes(queryParameterTypes) { switch (QueryType) { case NKikimrKqp::QUERY_TYPE_SQL_DML: @@ -28,4 +36,40 @@ bool TKqpQueryId::IsSql() const { return IsSqlQuery(QueryType); } +bool TKqpQueryId::operator==(const TKqpQueryId& other) const { + if (!(Cluster == other.Cluster && + Database == other.Database && + UserSid == other.UserSid && + Text == other.Text && + Settings == other.Settings && + QueryType == other.QueryType && + !QueryParameterTypes == !other.QueryParameterTypes)) { + return false; + } + + if (!QueryParameterTypes) { + return true; + } + + if (QueryParameterTypes->size() != other.QueryParameterTypes->size()) { + return false; + } + + for (auto it = QueryParameterTypes->begin(), otherIt = other.QueryParameterTypes->begin(); it != QueryParameterTypes->end(); ++it, ++otherIt) { + if (it->first != otherIt->first) { + return false; + } + + const auto& type = it->second; + const auto& otherType = otherIt->second; + + // we can't use type.SerializeAsString() == otherType.SerializeAsString() here since serialization of protobufs is unstable + if (!google::protobuf::util::MessageDifferencer::Equals(type, otherType)) { + return false; + } + } + + return true; +} + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/simple/query_id.h b/ydb/core/kqp/common/simple/query_id.h index d73e36d0000..9d966725777 100644 --- a/ydb/core/kqp/common/simple/query_id.h +++ b/ydb/core/kqp/common/simple/query_id.h @@ -1,7 +1,12 @@ #pragma once #include "settings.h" -#include <util/generic/string.h> #include <ydb/core/protos/kqp.pb.h> +#include <ydb/public/api/protos/ydb_value.pb.h> + +#include <util/generic/string.h> + +#include <map> +#include <memory> namespace NKikimr::NKqp { @@ -12,21 +17,14 @@ struct TKqpQueryId { TString Text; TKqpQuerySettings Settings; NKikimrKqp::EQueryType QueryType; + std::shared_ptr<std::map<TString, Ydb::Type>> QueryParameterTypes; public: - TKqpQueryId(const TString& cluster, const TString& database, const TString& text, NKikimrKqp::EQueryType type); + TKqpQueryId(const TString& cluster, const TString& database, const TString& text, NKikimrKqp::EQueryType type, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameterTypes); bool IsSql() const; - bool operator==(const TKqpQueryId& other) const { - return - Cluster == other.Cluster && - Database == other.Database && - UserSid == other.UserSid && - Text == other.Text && - Settings == other.Settings && - QueryType == other.QueryType; - } + bool operator==(const TKqpQueryId& other) const; bool operator!=(const TKqpQueryId& other) { return !(*this == other); @@ -38,7 +36,7 @@ public: bool operator>=(const TKqpQueryId&) = delete; size_t GetHash() const noexcept { - auto tuple = std::make_tuple(Cluster, Database, UserSid, Text, Settings, QueryType); + auto tuple = std::make_tuple(Cluster, Database, UserSid, Text, Settings, QueryType, QueryParameterTypes ? QueryParameterTypes->size() : 0u); return THash<decltype(tuple)>()(tuple); } }; diff --git a/ydb/core/kqp/common/simple/query_ref.h b/ydb/core/kqp/common/simple/query_ref.h new file mode 100644 index 00000000000..4c65926f7aa --- /dev/null +++ b/ydb/core/kqp/common/simple/query_ref.h @@ -0,0 +1,23 @@ +#pragma once + +#include <ydb/public/api/protos/ydb_value.pb.h> + +#include <util/generic/string.h> + +#include <map> +#include <memory> + +namespace NKikimr::NKqp { + +struct TKqpQueryRef { + TKqpQueryRef(const TString& text, std::shared_ptr<std::map<TString, Ydb::Type>> parameterTypes = {}) + : Text(text) + , ParameterTypes(parameterTypes) + {} + + // Text is owned by TKqpQueryId + const TString& Text; + std::shared_ptr<std::map<TString, Ydb::Type>> ParameterTypes; +}; + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 38cfd658a3d..423a55de80a 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -43,7 +43,7 @@ public: TKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, const TTableServiceConfig& serviceConfig, NYql::IHTTPGateway::TPtr httpGateway, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, - const TString& uid, const TKqpQueryId& query, + const TString& uid, const TKqpQueryId& queryId, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpDbCountersPtr dbCounters, NWilson::TTraceId traceId) : Owner(owner) @@ -51,17 +51,18 @@ public: , ModuleResolverState(moduleResolverState) , Counters(counters) , Uid(uid) - , Query(query) + , QueryId(queryId) + , QueryRef(QueryId.Text, QueryId.QueryParameterTypes) , UserToken(userToken) , DbCounters(dbCounters) , Config(MakeIntrusive<TKikimrConfiguration>()) , CompilationTimeout(TDuration::MilliSeconds(serviceConfig.GetCompileTimeoutMs())) , CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor") { - Config->Init(kqpSettings->DefaultSettings.GetDefaultSettings(), Query.Cluster, kqpSettings->Settings, false); + Config->Init(kqpSettings->DefaultSettings.GetDefaultSettings(), QueryId.Cluster, kqpSettings->Settings, false); - if (!Query.Database.empty()) { - Config->_KqpTablePathPrefix = Query.Database; + if (!QueryId.Database.empty()) { + Config->_KqpTablePathPrefix = QueryId.Database; } ApplyServiceConfig(*Config, serviceConfig); @@ -80,9 +81,9 @@ public: LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "Start compilation" << ", self: " << ctx.SelfID - << ", cluster: " << Query.Cluster - << ", database: " << Query.Database - << ", text: \"" << EscapeC(Query.Text) << "\"" + << ", cluster: " << QueryId.Cluster + << ", database: " << QueryId.Database + << ", text: \"" << EscapeC(QueryId.Text) << "\"" << ", startTime: " << StartTime); TimeoutTimerActorId = CreateLongTimer(ctx, CompilationTimeout, new IEventHandle(SelfId(), SelfId(), @@ -96,45 +97,45 @@ public: counters->TxProxyMon = new NTxProxy::TTxProxyMon(AppData(ctx)->Counters); std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(TlsActivationContext->ActorSystem(), true); - Gateway = CreateKikimrIcGateway(Query.Cluster, Query.Database, std::move(loader), + Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Database, std::move(loader), ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters); - Gateway->SetToken(Query.Cluster, UserToken); + Gateway->SetToken(QueryId.Cluster, UserToken); Config->FeatureFlags = AppData(ctx)->FeatureFlags; - KqpHost = CreateKqpHost(Gateway, Query.Cluster, Query.Database, Config, ModuleResolverState->ModuleResolver, - HttpGateway, AppData(ctx)->FunctionRegistry, false, Query.Settings.IsInternalCall); + KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver, + HttpGateway, AppData(ctx)->FunctionRegistry, false); IKqpHost::TPrepareSettings prepareSettings; - prepareSettings.DocumentApiRestricted = Query.Settings.DocumentApiRestricted; - prepareSettings.IsInternalCall = Query.Settings.IsInternalCall; + prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted; + prepareSettings.IsInternalCall = QueryId.Settings.IsInternalCall; NCpuTime::TCpuTimer timer(CompileCpuTime); - switch (Query.QueryType) { + switch (QueryId.QueryType) { case NKikimrKqp::QUERY_TYPE_SQL_DML: - AsyncCompileResult = KqpHost->PrepareDataQuery(Query.Text, prepareSettings); + AsyncCompileResult = KqpHost->PrepareDataQuery(QueryRef, prepareSettings); break; case NKikimrKqp::QUERY_TYPE_AST_DML: - AsyncCompileResult = KqpHost->PrepareDataQueryAst(Query.Text, prepareSettings); + AsyncCompileResult = KqpHost->PrepareDataQueryAst(QueryRef, prepareSettings); break; case NKikimrKqp::QUERY_TYPE_SQL_SCAN: case NKikimrKqp::QUERY_TYPE_AST_SCAN: - AsyncCompileResult = KqpHost->PrepareScanQuery(Query.Text, Query.IsSql(), prepareSettings); + AsyncCompileResult = KqpHost->PrepareScanQuery(QueryRef, QueryId.IsSql(), prepareSettings); break; case NKikimrKqp::QUERY_TYPE_SQL_QUERY: - AsyncCompileResult = KqpHost->PrepareQuery(Query.Text, prepareSettings); + AsyncCompileResult = KqpHost->PrepareQuery(QueryRef, prepareSettings); break; case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY: - AsyncCompileResult = KqpHost->PrepareFederatedQuery(Query.Text, prepareSettings); + AsyncCompileResult = KqpHost->PrepareFederatedQuery(QueryRef, prepareSettings); break; default: - YQL_ENSURE(false, "Unexpected query type: " << Query.QueryType); + YQL_ENSURE(false, "Unexpected query type: " << QueryId.QueryType); } Continue(ctx); @@ -187,14 +188,21 @@ private: replayMessage.InsertValue("query_id", Uid); replayMessage.InsertValue("version", "1.0"); - replayMessage.InsertValue("query_text", EscapeC(Query.Text)); + replayMessage.InsertValue("query_text", EscapeC(QueryId.Text)); + NJson::TJsonValue queryParameterTypes(NJson::JSON_MAP); + if (QueryId.QueryParameterTypes) { + for (const auto& [paramName, paramType] : *QueryId.QueryParameterTypes) { + queryParameterTypes[paramName] = Base64Encode(paramType.SerializeAsString()); + } + } + replayMessage.InsertValue("query_parameter_types", std::move(queryParameterTypes)); replayMessage.InsertValue("table_metadata", TString(NJson::WriteJson(tablesMeta, false))); replayMessage.InsertValue("created_at", ToString(TlsActivationContext->ActorSystem()->Timestamp().Seconds())); replayMessage.InsertValue("query_syntax", ToString(Config->_KqpYqlSyntaxVersion.Get().GetRef())); - replayMessage.InsertValue("query_database", Query.Database); - replayMessage.InsertValue("query_cluster", Query.Cluster); + replayMessage.InsertValue("query_database", QueryId.Database); + replayMessage.InsertValue("query_cluster", QueryId.Cluster); replayMessage.InsertValue("query_plan", queryPlan); - replayMessage.InsertValue("query_type", ToString(Query.QueryType)); + replayMessage.InsertValue("query_type", ToString(QueryId.QueryType)); TString message(NJson::WriteJson(replayMessage, /*formatOutput*/ false)); LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPILE_ACTOR, "[" << SelfId() << "]: " << "Built the replay message " << message); @@ -230,7 +238,7 @@ private: } void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues) { - Reply(TKqpCompileResult::Make(Uid, std::move(Query), status, issues, ETableReadType::Other)); + Reply(TKqpCompileResult::Make(Uid, std::move(QueryId), status, issues, ETableReadType::Other)); } void InternalError(const TString message) { @@ -264,7 +272,7 @@ private: auto kqpResult = std::move(AsyncCompileResult->GetResult()); auto status = GetYdbStatus(kqpResult); - auto database = Query.Database; + auto database = QueryId.Database; if (kqpResult.SqlVersion) { Counters->ReportSqlVersion(DbCounters, *kqpResult.SqlVersion); } @@ -275,14 +283,16 @@ private: ETableReadType maxReadType = ExtractMostHeavyReadType(kqpResult.QueryPlan); - KqpCompileResult = TKqpCompileResult::Make(Uid, std::move(Query), status, kqpResult.Issues(), maxReadType); + auto queryType = QueryId.QueryType; + + KqpCompileResult = TKqpCompileResult::Make(Uid, std::move(QueryId), status, kqpResult.Issues(), maxReadType); if (status == Ydb::StatusIds::SUCCESS) { YQL_ENSURE(kqpResult.PreparingQuery); { auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>( kqpResult.PreparingQuery.release(), AppData()->FunctionRegistry); - preparedQueryHolder->MutableLlvmSettings().Fill(Config, Query.QueryType); + preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType); KqpCompileResult->PreparedQuery = preparedQueryHolder; } @@ -307,9 +317,9 @@ private: void HandleTimeout() { ALOG_NOTICE(NKikimrServices::KQP_COMPILE_ACTOR, "Compilation timeout" << ", self: " << SelfId() - << ", cluster: " << Query.Cluster - << ", database: " << Query.Database - << ", text: \"" << EscapeC(Query.Text) << "\"" + << ", cluster: " << QueryId.Cluster + << ", database: " << QueryId.Database + << ", text: \"" << EscapeC(QueryId.Text) << "\"" << ", startTime: " << StartTime); NYql::TIssue issue(NYql::TPosition(), "Query compilation timed out."); @@ -322,7 +332,8 @@ private: TIntrusivePtr<TModuleResolverState> ModuleResolverState; TIntrusivePtr<TKqpCounters> Counters; TString Uid; - TKqpQueryId Query; + TKqpQueryId QueryId; + TKqpQueryRef QueryRef; TIntrusiveConstPtr<NACLib::TUserToken> UserToken; TKqpDbCountersPtr DbCounters; TKikimrConfiguration::TPtr Config; @@ -365,7 +376,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP TKqpDbCountersPtr dbCounters, NWilson::TTraceId traceId) { return new TKqpCompileActor(owner, kqpSettings, serviceConfig, std::move(httpGateway), moduleResolverState, counters, uid, - std::move(query), userToken, dbCounters, std::move(traceId)); + query, userToken, dbCounters, std::move(traceId)); } } // namespace NKqp diff --git a/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt index 3e80449fbe0..a970dc1965c 100644 --- a/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt @@ -19,6 +19,7 @@ target_link_libraries(core-kqp-host PUBLIC core-kqp-opt core-kqp-provider tx-long_tx_service-public + library-yql-ast yql-core-services yql-minikql-invoke_builtins library-yql-sql diff --git a/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt index 267052dcc1d..99f93f4f5fd 100644 --- a/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt @@ -20,6 +20,7 @@ target_link_libraries(core-kqp-host PUBLIC core-kqp-opt core-kqp-provider tx-long_tx_service-public + library-yql-ast yql-core-services yql-minikql-invoke_builtins library-yql-sql diff --git a/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt index 267052dcc1d..99f93f4f5fd 100644 --- a/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt @@ -20,6 +20,7 @@ target_link_libraries(core-kqp-host PUBLIC core-kqp-opt core-kqp-provider tx-long_tx_service-public + library-yql-ast yql-core-services yql-minikql-invoke_builtins library-yql-sql diff --git a/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt index 3e80449fbe0..a970dc1965c 100644 --- a/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt @@ -19,6 +19,7 @@ target_link_libraries(core-kqp-host PUBLIC core-kqp-opt core-kqp-provider tx-long_tx_service-public + library-yql-ast yql-core-services yql-minikql-invoke_builtins library-yql-sql diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 9f5a678f65a..3e3cec7e914 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -2,10 +2,13 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/external_sources/external_source_factory.h> +#include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/opt/kqp_query_plan.h> #include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> +#include <ydb/core/kqp/provider/yql_kikimr_results.h> +#include <ydb/library/yql/ast/yql_type_string.h> #include <ydb/library/yql/core/yql_opt_proposed_by_data.h> #include <ydb/library/yql/core/services/yql_plan.h> #include <ydb/library/yql/core/services/yql_transform_pipeline.h> @@ -310,16 +313,17 @@ public: using TResult = IKqpHost::TQueryResult; TAsyncPrepareYqlResult(TExprNode* queryRoot, TExprContext& exprCtx, IGraphTransformer& transformer, - TIntrusivePtr<TKikimrQueryContext> queryCtx, const TString& queryText, TMaybe<TSqlVersion> sqlVersion) + TIntrusivePtr<TKikimrQueryContext> queryCtx, const TKqpQueryRef& query, TMaybe<TSqlVersion> sqlVersion) : TKqpAsyncResultBase(queryRoot, exprCtx, transformer) , QueryCtx(queryCtx) - , QueryText(queryText) + , QueryText(query.Text) , SqlVersion(sqlVersion) {} void FillResult(TResult& prepareResult) const override { YQL_ENSURE(QueryCtx->PrepareOnly); YQL_ENSURE(QueryCtx->PreparingQuery); + // TODO: it's a const function, why do we move from class members? prepareResult.PreparingQuery = std::move(QueryCtx->PreparingQuery); prepareResult.PreparingQuery->SetText(std::move(QueryText)); prepareResult.SqlVersion = SqlVersion; @@ -907,77 +911,77 @@ public: SessionCtx->SetDatabase(database); } - IAsyncQueryResultPtr ExecuteSchemeQuery(const TString& query, bool isSql) override { + IAsyncQueryResultPtr ExecuteSchemeQuery(const TKqpQueryRef& query, bool isSql) override { return CheckedProcessQuery(*ExprCtx, [this, &query, isSql] (TExprContext& ctx) { return ExecuteSchemeQueryInternal(query, isSql, ctx); }); } - TQueryResult SyncExecuteSchemeQuery(const TString& query, bool isSql) override { + TQueryResult SyncExecuteSchemeQuery(const TKqpQueryRef& query, bool isSql) override { return CheckedSyncProcessQuery( [this, &query, isSql] () { return ExecuteSchemeQuery(query, isSql); }); } - IAsyncQueryResultPtr ExplainDataQuery(const TString& query, bool isSql) override { + IAsyncQueryResultPtr ExplainDataQuery(const TKqpQueryRef& query, bool isSql) override { return CheckedProcessQuery(*ExprCtx, [this, &query, isSql] (TExprContext& ctx) { return ExplainDataQueryInternal(query, isSql, ctx); }); } - IAsyncQueryResultPtr ExplainScanQuery(const TString& query, bool isSql) override { + IAsyncQueryResultPtr ExplainScanQuery(const TKqpQueryRef& query, bool isSql) override { return CheckedProcessQuery(*ExprCtx, [this, &query, isSql] (TExprContext& ctx) { return ExplainScanQueryInternal(query, isSql, ctx); }); } - TQueryResult SyncExplainDataQuery(const TString& query, bool isSql) override { + TQueryResult SyncExplainDataQuery(const TKqpQueryRef& query, bool isSql) override { return CheckedSyncProcessQuery( [this, &query, isSql] () { return ExplainDataQuery(query, isSql); }); } - IAsyncQueryResultPtr PrepareDataQuery(const TString& query, const TPrepareSettings& settings) override { + IAsyncQueryResultPtr PrepareDataQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) override { return CheckedProcessQuery(*ExprCtx, [this, &query, settings] (TExprContext& ctx) mutable { return PrepareDataQueryInternal(query, settings, ctx); }); } - IAsyncQueryResultPtr PrepareDataQueryAst(const TString& query, const TPrepareSettings& settings) override { + IAsyncQueryResultPtr PrepareDataQueryAst(const TKqpQueryRef& query, const TPrepareSettings& settings) override { return CheckedProcessQuery(*ExprCtx, [this, &query, settings] (TExprContext& ctx) mutable { return PrepareDataQueryAstInternal(query, settings, ctx); }); } - TQueryResult SyncPrepareDataQuery(const TString& query, const TPrepareSettings& settings) override { + TQueryResult SyncPrepareDataQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) override { return CheckedSyncProcessQuery( [this, &query, settings] () mutable { return PrepareDataQuery(query, settings); }); } - IAsyncQueryResultPtr PrepareQuery(const TString& query, const TPrepareSettings& settings) override { + IAsyncQueryResultPtr PrepareQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) override { return CheckedProcessQuery(*ExprCtx, [this, &query, settings] (TExprContext& ctx) mutable { return PrepareQueryInternal(query, EKikimrQueryType::Query, settings, ctx); }); } - IAsyncQueryResultPtr PrepareFederatedQuery(const TString& query, const TPrepareSettings& settings) override { + IAsyncQueryResultPtr PrepareFederatedQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) override { return CheckedProcessQuery(*ExprCtx, [this, &query, settings] (TExprContext& ctx) mutable { return PrepareQueryInternal(query, EKikimrQueryType::Script, settings, ctx); }); } - IAsyncQueryResultPtr ExecuteYqlScript(const TString& script, NKikimrMiniKQL::TParams&& parameters, + IAsyncQueryResultPtr ExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, const TExecScriptSettings& settings) override { return CheckedProcessQuery(*ExprCtx, @@ -986,7 +990,7 @@ public: }); } - TQueryResult SyncExecuteYqlScript(const TString& script, NKikimrMiniKQL::TParams&& parameters, + TQueryResult SyncExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, const TExecScriptSettings& settings) override { return CheckedSyncProcessQuery( @@ -995,7 +999,7 @@ public: }); } - IAsyncQueryResultPtr StreamExecuteYqlScript(const TString& script, NKikimrMiniKQL::TParams&& parameters, + IAsyncQueryResultPtr StreamExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, const NActors::TActorId& target, const TExecScriptSettings& settings) override { return CheckedProcessQuery(*ExprCtx, @@ -1004,42 +1008,42 @@ public: }); } - IAsyncQueryResultPtr ValidateYqlScript(const TString& script) override { + IAsyncQueryResultPtr ValidateYqlScript(const TKqpQueryRef& script) override { return CheckedProcessQuery(*ExprCtx, [this, &script](TExprContext& ctx) mutable { return ValidateYqlScriptInternal(script, ctx); }); } - TQueryResult SyncValidateYqlScript(const TString& script) override { + TQueryResult SyncValidateYqlScript(const TKqpQueryRef& script) override { return CheckedSyncProcessQuery( [this, &script]() mutable { return ValidateYqlScript(script); }); } - IAsyncQueryResultPtr ExplainYqlScript(const TString& script) override { + IAsyncQueryResultPtr ExplainYqlScript(const TKqpQueryRef& script) override { return CheckedProcessQuery(*ExprCtx, [this, &script] (TExprContext& ctx) mutable { return ExplainYqlScriptInternal(script, ctx); }); } - TQueryResult SyncExplainYqlScript(const TString& script) override { + TQueryResult SyncExplainYqlScript(const TKqpQueryRef& script) override { return CheckedSyncProcessQuery( [this, &script] () mutable { return ExplainYqlScript(script); }); } - IAsyncQueryResultPtr PrepareScanQuery(const TString& query, bool isSql, const TPrepareSettings& /*settings*/) override { + IAsyncQueryResultPtr PrepareScanQuery(const TKqpQueryRef& query, bool isSql, const TPrepareSettings& /*settings*/) override { return CheckedProcessQuery(*ExprCtx, [this, &query, isSql] (TExprContext& ctx) mutable { return PrepareScanQueryInternal(query, isSql, ctx); }); } - TQueryResult SyncPrepareScanQuery(const TString& query, bool isSql, const TPrepareSettings& settings) override { + TQueryResult SyncPrepareScanQuery(const TKqpQueryRef& query, bool isSql, const TPrepareSettings& settings) override { return CheckedSyncProcessQuery( [this, &query, isSql, settings] () mutable { return PrepareScanQuery(query, isSql, settings); @@ -1047,12 +1051,12 @@ public: } private: - TExprNode::TPtr CompileQuery(const TString& query, bool isSql, bool sqlAutoCommit, TExprContext& ctx, + TExprNode::TPtr CompileQuery(const TKqpQueryRef& query, bool isSql, bool sqlAutoCommit, TExprContext& ctx, TMaybe<TSqlVersion>& sqlVersion) const { TAstParseResult astRes; if (isSql) { - NSQLTranslation::TTranslationSettings settings; + NSQLTranslation::TTranslationSettings settings{}; // TODO: remove this test crutch when dynamic bindings discovery will be implemented // YQ-1964 if (SessionCtx->Query().Type == EKikimrQueryType::Script && GetEnv("TEST_S3_CONNECTION")) { @@ -1100,13 +1104,31 @@ private: settings.Flags.insert("DisableEmitStartsWith"); } + if (query.ParameterTypes) { + NSQLTranslation::TTranslationSettings versionSettings = settings; + NYql::TIssues versionIssues; + + if (ParseTranslationSettings(query.Text, versionSettings, versionIssues) && versionSettings.SyntaxVersion == 1) { + for (const auto& [paramName, paramType] : *(query.ParameterTypes)) { + auto type = NYql::ParseTypeFromYdbType(paramType, ctx); + if (type != nullptr) { + if (paramName.StartsWith("$")) { + settings.DeclaredNamedExprs[paramName.substr(1)] = NYql::FormatType(type); + } else { + settings.DeclaredNamedExprs[paramName] = NYql::FormatType(type); + } + } + } + } + } + ui16 actualSyntaxVersion = 0; - astRes = NSQLTranslation::SqlToYql(query, settings, nullptr, &actualSyntaxVersion); + astRes = NSQLTranslation::SqlToYql(query.Text, settings, nullptr, &actualSyntaxVersion); TypesCtx->DeprecatedSQL = (actualSyntaxVersion == 0); sqlVersion = actualSyntaxVersion; } else { sqlVersion = {}; - astRes = ParseAst(query); + astRes = ParseAst(query.Text); // Do not check SQL constraints on s-expressions input, as it may come from both V0/V1. // Constraints were already checked on type annotation of SQL query. @@ -1128,7 +1150,7 @@ private: return result; } - TExprNode::TPtr CompileYqlQuery(const TString& query, bool isSql, bool sqlAutoCommit, TExprContext& ctx, + TExprNode::TPtr CompileYqlQuery(const TKqpQueryRef& query, bool isSql, bool sqlAutoCommit, TExprContext& ctx, TMaybe<TSqlVersion>& sqlVersion) const { auto queryExpr = CompileQuery(query, isSql, sqlAutoCommit, ctx, sqlVersion); @@ -1195,7 +1217,7 @@ private: return true; } - IAsyncQueryResultPtr ExecuteSchemeQueryInternal(const TString& query, bool isSql, TExprContext& ctx) { + IAsyncQueryResultPtr ExecuteSchemeQueryInternal(const TKqpQueryRef& query, bool isSql, TExprContext& ctx) { SetupYqlTransformer(EKikimrQueryType::Ddl); TMaybe<TSqlVersion> sqlVersion; @@ -1208,7 +1230,7 @@ private: *ResultProviderConfig, *PlanBuilder, sqlVersion); } - IAsyncQueryResultPtr ExplainDataQueryInternal(const TString& query, bool isSql, TExprContext& ctx) { + IAsyncQueryResultPtr ExplainDataQueryInternal(const TKqpQueryRef& query, bool isSql, TExprContext& ctx) { if (isSql) { return PrepareDataQueryInternal(query, {}, ctx); } @@ -1234,11 +1256,11 @@ private: }); } - IAsyncQueryResultPtr ExplainScanQueryInternal(const TString& query, bool isSql, TExprContext& ctx) { + IAsyncQueryResultPtr ExplainScanQueryInternal(const TKqpQueryRef& query, bool isSql, TExprContext& ctx) { return PrepareScanQueryInternal(query, isSql, ctx); } - IAsyncQueryResultPtr PrepareDataQueryInternal(const TString& query, const TPrepareSettings& settings, + IAsyncQueryResultPtr PrepareDataQueryInternal(const TKqpQueryRef& query, const TPrepareSettings& settings, TExprContext& ctx) { SetupYqlTransformer(EKikimrQueryType::Dml); @@ -1259,10 +1281,10 @@ private: } return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(), - query, sqlVersion); + query.Text, sqlVersion); } - IAsyncQueryResultPtr PrepareDataQueryAstInternal(const TString& queryAst, const TPrepareSettings& settings, + IAsyncQueryResultPtr PrepareDataQueryAstInternal(const TKqpQueryRef& queryAst, const TPrepareSettings& settings, TExprContext& ctx) { IKikimrQueryExecutor::TExecuteSettings execSettings; @@ -1289,7 +1311,7 @@ private: SessionCtx, *ExecuteCtx); } - IAsyncQueryResultPtr PrepareQueryInternal(const TString& query, EKikimrQueryType queryType, const TPrepareSettings& settings, + IAsyncQueryResultPtr PrepareQueryInternal(const TKqpQueryRef& query, EKikimrQueryType queryType, const TPrepareSettings& settings, TExprContext& ctx) { SetupYqlTransformer(queryType); @@ -1311,10 +1333,10 @@ private: } return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(), - query, sqlVersion); + query.Text, sqlVersion); } - IAsyncQueryResultPtr PrepareScanQueryInternal(const TString& query, bool isSql, TExprContext& ctx, + IAsyncQueryResultPtr PrepareScanQueryInternal(const TKqpQueryRef& query, bool isSql, TExprContext& ctx, EKikimrStatsMode statsMode = EKikimrStatsMode::None) { return isSql @@ -1322,7 +1344,7 @@ private: : PrepareScanQueryAstInternal(query, ctx); } - IAsyncQueryResultPtr PrepareScanQueryInternal(const TString& query, TExprContext& ctx, EKikimrStatsMode statsMode = EKikimrStatsMode::None) { + IAsyncQueryResultPtr PrepareScanQueryInternal(const TKqpQueryRef& query, TExprContext& ctx, EKikimrStatsMode statsMode = EKikimrStatsMode::None) { SetupYqlTransformer(EKikimrQueryType::Scan); SessionCtx->Query().PrepareOnly = true; @@ -1336,10 +1358,10 @@ private: } return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(), - query, sqlVersion); + query.Text, sqlVersion); } - IAsyncQueryResultPtr PrepareScanQueryAstInternal(const TString& queryAst, TExprContext& ctx) { + IAsyncQueryResultPtr PrepareScanQueryAstInternal(const TKqpQueryRef& queryAst, TExprContext& ctx) { IKikimrQueryExecutor::TExecuteSettings settings; SetupDataQueryAstTransformer(settings, EKikimrQueryType::Scan); @@ -1358,7 +1380,7 @@ private: SessionCtx, *ExecuteCtx); } - IAsyncQueryResultPtr ExecuteYqlScriptInternal(const TString& script, NKikimrMiniKQL::TParams&& parameters, + IAsyncQueryResultPtr ExecuteYqlScriptInternal(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, const TExecScriptSettings& settings, TExprContext& ctx) { SetupYqlTransformer(EKikimrQueryType::YqlScript); @@ -1382,7 +1404,7 @@ private: *ResultProviderConfig, *PlanBuilder, sqlVersion); } - IAsyncQueryResultPtr StreamExecuteYqlScriptInternal(const TString& script, NKikimrMiniKQL::TParams&& parameters, + IAsyncQueryResultPtr StreamExecuteYqlScriptInternal(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, const NActors::TActorId& target,const TExecScriptSettings& settings, TExprContext& ctx) { SetupYqlTransformer(EKikimrQueryType::YqlScriptStreaming); @@ -1408,7 +1430,7 @@ private: *ResultProviderConfig, *PlanBuilder, sqlVersion); } - IAsyncQueryResultPtr ValidateYqlScriptInternal(const TString& script, TExprContext& ctx) { + IAsyncQueryResultPtr ValidateYqlScriptInternal(const TKqpQueryRef& script, TExprContext& ctx) { SetupSession(EKikimrQueryType::YqlScript); SessionCtx->Query().PrepareOnly = true; @@ -1433,7 +1455,7 @@ private: return MakeIntrusive<TAsyncValidateYqlResult>(scriptExpr.Get(), SessionCtx, ctx, transformer, sqlVersion); } - IAsyncQueryResultPtr ExplainYqlScriptInternal(const TString& script, TExprContext& ctx) { + IAsyncQueryResultPtr ExplainYqlScriptInternal(const TKqpQueryRef& script, TExprContext& ctx) { SetupYqlTransformer(EKikimrQueryType::YqlScript); SessionCtx->Query().PrepareOnly = true; diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index 325e98aa860..6c1b2344e1e 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -8,6 +8,8 @@ namespace NKikimr { namespace NKqp { +struct TKqpQueryRef; + class IKqpHost : public TThrRefBase { public: using TQueryResult = IKqpGateway::TQueryResult; @@ -35,42 +37,42 @@ public: virtual ~IKqpHost() {} /* Data queries */ - virtual IAsyncQueryResultPtr ExplainDataQuery(const TString& query, bool isSql) = 0; - virtual TQueryResult SyncExplainDataQuery(const TString& query, bool isSql) = 0; + virtual IAsyncQueryResultPtr ExplainDataQuery(const TKqpQueryRef& query, bool isSql) = 0; + virtual TQueryResult SyncExplainDataQuery(const TKqpQueryRef& query, bool isSql) = 0; - virtual IAsyncQueryResultPtr PrepareDataQuery(const TString& query, const TPrepareSettings& settings) = 0; - virtual IAsyncQueryResultPtr PrepareDataQueryAst(const TString& query, const TPrepareSettings& settings) = 0; - virtual TQueryResult SyncPrepareDataQuery(const TString& query, const TPrepareSettings& settings) = 0; + virtual IAsyncQueryResultPtr PrepareDataQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0; + virtual IAsyncQueryResultPtr PrepareDataQueryAst(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0; + virtual TQueryResult SyncPrepareDataQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0; /* Scheme queries */ - virtual IAsyncQueryResultPtr ExecuteSchemeQuery(const TString& query, bool isSql) = 0; - virtual TQueryResult SyncExecuteSchemeQuery(const TString& query, bool isSql) = 0; + virtual IAsyncQueryResultPtr ExecuteSchemeQuery(const TKqpQueryRef& query, bool isSql) = 0; + virtual TQueryResult SyncExecuteSchemeQuery(const TKqpQueryRef& query, bool isSql) = 0; /* Scan queries */ - virtual IAsyncQueryResultPtr PrepareScanQuery(const TString& query, bool isSql, const TPrepareSettings& settings) = 0; - virtual TQueryResult SyncPrepareScanQuery(const TString& query, bool isSql, const TPrepareSettings& settings) = 0; + virtual IAsyncQueryResultPtr PrepareScanQuery(const TKqpQueryRef& query, bool isSql, const TPrepareSettings& settings) = 0; + virtual TQueryResult SyncPrepareScanQuery(const TKqpQueryRef& query, bool isSql, const TPrepareSettings& settings) = 0; - virtual IAsyncQueryResultPtr ExplainScanQuery(const TString& query, bool isSql) = 0; + virtual IAsyncQueryResultPtr ExplainScanQuery(const TKqpQueryRef& query, bool isSql) = 0; /* Generic queries */ - virtual IAsyncQueryResultPtr PrepareQuery(const TString& query, const TPrepareSettings& settings) = 0; + virtual IAsyncQueryResultPtr PrepareQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0; /* Federated queries */ - virtual IAsyncQueryResultPtr PrepareFederatedQuery(const TString& query, const TPrepareSettings& settings) = 0; + virtual IAsyncQueryResultPtr PrepareFederatedQuery(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0; /* Scripting */ - virtual IAsyncQueryResultPtr ValidateYqlScript(const TString& script) = 0; - virtual TQueryResult SyncValidateYqlScript(const TString& script) = 0; + virtual IAsyncQueryResultPtr ValidateYqlScript(const TKqpQueryRef& script) = 0; + virtual TQueryResult SyncValidateYqlScript(const TKqpQueryRef& script) = 0; - virtual IAsyncQueryResultPtr ExplainYqlScript(const TString& script) = 0; - virtual TQueryResult SyncExplainYqlScript(const TString& script) = 0; + virtual IAsyncQueryResultPtr ExplainYqlScript(const TKqpQueryRef& script) = 0; + virtual TQueryResult SyncExplainYqlScript(const TKqpQueryRef& script) = 0; - virtual IAsyncQueryResultPtr ExecuteYqlScript(const TString& script, NKikimrMiniKQL::TParams&& parameters, + virtual IAsyncQueryResultPtr ExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, const TExecScriptSettings& settings) = 0; - virtual TQueryResult SyncExecuteYqlScript(const TString& script, NKikimrMiniKQL::TParams&& parameters, + virtual TQueryResult SyncExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, const TExecScriptSettings& settings) = 0; - virtual IAsyncQueryResultPtr StreamExecuteYqlScript(const TString& script, NKikimrMiniKQL::TParams&& parameters, + virtual IAsyncQueryResultPtr StreamExecuteYqlScript(const TKqpQueryRef& script, NKikimrMiniKQL::TParams&& parameters, const NActors::TActorId& target, const TExecScriptSettings& settings) = 0; }; diff --git a/ydb/core/kqp/provider/yql_kikimr_results.cpp b/ydb/core/kqp/provider/yql_kikimr_results.cpp index 81ec7e96344..1e1c4ae7f37 100644 --- a/ydb/core/kqp/provider/yql_kikimr_results.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_results.cpp @@ -714,4 +714,114 @@ TExprNode::TPtr ParseKikimrProtoValue(const NKikimrMiniKQL::TType& type, const N } } +const TTypeAnnotationNode* ParseTypeFromYdbType(const Ydb::Type& type, TExprContext& ctx) { + switch (type.type_case()) { + case Ydb::Type::kVoidType: { + return ctx.MakeType<TVoidExprType>(); + } + + case Ydb::Type::kTypeId: { + auto slot = NKikimr::NUdf::FindDataSlot(type.type_id()); + if (!slot) { + ctx.AddError(TIssue(TPosition(), TStringBuilder() << "Unsupported data type: " + << type.ShortDebugString())); + + return nullptr; + } + return ctx.MakeType<TDataExprType>(*slot); + } + + case Ydb::Type::kDecimalType: { + auto slot = NKikimr::NUdf::FindDataSlot(NYql::NProto::TypeIds::Decimal); + if (!slot) { + ctx.AddError(TIssue(TPosition(), TStringBuilder() << "Unsupported data type: " + << type.ShortDebugString())); + + return nullptr; + } + + return ctx.MakeType<TDataExprParamsType>(*slot, ToString(type.decimal_type().precision()), ToString(type.decimal_type().scale())); + } + + case Ydb::Type::kOptionalType: { + auto itemType = ParseTypeFromYdbType(type.optional_type().item(), ctx); + if (!itemType) { + return nullptr; + } + + return ctx.MakeType<TOptionalExprType>(itemType); + } + + case Ydb::Type::kTupleType: { + TTypeAnnotationNode::TListType tupleItems; + + for (auto& element : type.tuple_type().Getelements()) { + auto elementType = ParseTypeFromYdbType(element, ctx); + if (!elementType) { + return nullptr; + } + + tupleItems.push_back(elementType); + } + + return ctx.MakeType<TTupleExprType>(tupleItems); + } + + case Ydb::Type::kListType: { + auto itemType = ParseTypeFromYdbType(type.list_type().item(), ctx); + if (!itemType) { + return nullptr; + } + + return ctx.MakeType<TListExprType>(itemType); + } + + case Ydb::Type::kStructType: { + TVector<const TItemExprType*> structMembers; + for (auto& member : type.struct_type().Getmembers()) { + auto memberType = ParseTypeFromYdbType(member.Gettype(), ctx); + if (!memberType) { + return nullptr; + } + + structMembers.push_back(ctx.MakeType<TItemExprType>(member.Getname(), memberType)); + } + + return ctx.MakeType<TStructExprType>(structMembers); + } + + case Ydb::Type::kDictType: { + auto keyType = ParseTypeFromYdbType(type.dict_type().key(), ctx); + if (!keyType) { + return nullptr; + } + + auto payloadType = ParseTypeFromYdbType(type.dict_type().payload(), ctx); + if (!payloadType) { + return nullptr; + } + + return ctx.MakeType<TDictExprType>(keyType, payloadType); + } + + case Ydb::Type::kPgType: { + if (!type.pg_type().type_name().empty()) { + const auto& typeName = type.pg_type().type_name(); + auto* typeDesc = NKikimr::NPg::TypeDescFromPgTypeName(typeName); + NKikimr::NPg::PgTypeIdFromTypeDesc(typeDesc); + return ctx.MakeType<TPgExprType>(NKikimr::NPg::PgTypeIdFromTypeDesc(typeDesc)); + } + return ctx.MakeType<TPgExprType>(type.pg_type().Getoid()); + } + + case Ydb::Type::kNullType: + [[fallthrough]]; + default: { + ctx.AddError(TIssue(TPosition(), TStringBuilder() << "Unsupported protobuf type: " + << type.ShortDebugString())); + return nullptr; + } + } +} + } // namespace NYql diff --git a/ydb/core/kqp/provider/yql_kikimr_results.h b/ydb/core/kqp/provider/yql_kikimr_results.h index 25e17225c04..edee530f9e7 100644 --- a/ydb/core/kqp/provider/yql_kikimr_results.h +++ b/ydb/core/kqp/provider/yql_kikimr_results.h @@ -17,4 +17,6 @@ bool ExportTypeToKikimrProto(const TTypeAnnotationNode& type, NKikimrMiniKQL::TT TExprNode::TPtr ParseKikimrProtoValue(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, TPositionHandle pos, TExprContext& ctx); +const TTypeAnnotationNode* ParseTypeFromYdbType(const Ydb::Type& input, TExprContext& ctx); + } // namespace NYql diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index c18909c74f5..a39209ee845 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -127,12 +127,12 @@ std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest() bool keepInCache = false; switch (GetAction()) { case NKikimrKqp::QUERY_ACTION_EXECUTE: - query = TKqpQueryId(Cluster, Database, GetQuery(), GetType()); + query = TKqpQueryId(Cluster, Database, GetQuery(), GetType(), GetQueryParameterTypes()); keepInCache = GetQueryKeepInCache() && query->IsSql(); break; case NKikimrKqp::QUERY_ACTION_PREPARE: - query = TKqpQueryId(Cluster, Database, GetQuery(), GetType()); + query = TKqpQueryId(Cluster, Database, GetQuery(), GetType(), GetQueryParameterTypes()); keepInCache = query->IsSql(); break; @@ -166,11 +166,11 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque switch (GetAction()) { case NKikimrKqp::QUERY_ACTION_EXECUTE: - query = TKqpQueryId(Cluster, Database, GetQuery(), GetType()); + query = TKqpQueryId(Cluster, Database, GetQuery(), GetType(), GetQueryParameterTypes()); break; case NKikimrKqp::QUERY_ACTION_PREPARE: - query = TKqpQueryId(Cluster, Database, GetQuery(), GetType()); + query = TKqpQueryId(Cluster, Database, GetQuery(), GetType(), GetQueryParameterTypes()); break; case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 993bbe5b16e..abd838b2f65 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -14,6 +14,10 @@ #include <ydb/core/kqp/session_actor/kqp_tx.h> #include <util/generic/noncopyable.h> +#include <util/generic/string.h> + +#include <map> +#include <memory> namespace NKikimr::NKqp { @@ -42,6 +46,14 @@ public: , UserToken(ev->Get()->GetUserToken()) { RequestEv.reset(ev->Release().Release()); + + if (AppData()->FeatureFlags.GetEnableImplicitQueryParameterTypes() && !RequestEv->GetYdbParameters().empty()) { + QueryParameterTypes = std::make_shared<std::map<TString, Ydb::Type>>(); + for (const auto& [name, typedValue] : RequestEv->GetYdbParameters()) { + QueryParameterTypes->insert({name, typedValue.Gettype()}); + } + } + SetQueryDeadlines(config); auto action = GetAction(); KqpSessionSpan = NWilson::TSpan( @@ -94,6 +106,8 @@ public: TDuration CpuTime; std::optional<NCpuTime::TCpuTimer> CurrentTimer; + std::shared_ptr<std::map<TString, Ydb::Type>> QueryParameterTypes; + NKikimrKqp::EQueryAction GetAction() const { return RequestEv->GetAction(); } @@ -114,6 +128,10 @@ public: return RequestEv->GetType(); } + std::shared_ptr<std::map<TString, Ydb::Type>> GetQueryParameterTypes() const { + return QueryParameterTypes; + } + void EnsureAction() { YQL_ENSURE(RequestEv->HasAction()); } diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 82083c9b428..dd372477f93 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -2,6 +2,7 @@ #include <ydb/core/client/minikql_compile/mkql_compile_service.h> #include <ydb/core/kqp/common/kqp_yql.h> +#include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/gateway/kqp_metadata_loader.h> #include <ydb/core/kqp/host/kqp_host.h> diff --git a/ydb/core/kqp/ut/query/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/ut/query/CMakeLists.darwin-x86_64.txt index 97ab78319b3..ec1ad6a0891 100644 --- a/ydb/core/kqp/ut/query/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/ut/query/CMakeLists.darwin-x86_64.txt @@ -20,6 +20,7 @@ target_link_libraries(ydb-core-kqp-ut-query PUBLIC library-cpp-cpuid_check cpp-testing-unittest_main ydb-core-kqp + cpp-client-ydb_proto kqp-ut-common yql-sql-pg_dummy ) diff --git a/ydb/core/kqp/ut/query/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/query/CMakeLists.linux-aarch64.txt index 232ef60d67b..38ad4a0b6f7 100644 --- a/ydb/core/kqp/ut/query/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/ut/query/CMakeLists.linux-aarch64.txt @@ -20,6 +20,7 @@ target_link_libraries(ydb-core-kqp-ut-query PUBLIC yutil cpp-testing-unittest_main ydb-core-kqp + cpp-client-ydb_proto kqp-ut-common yql-sql-pg_dummy ) diff --git a/ydb/core/kqp/ut/query/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/ut/query/CMakeLists.linux-x86_64.txt index 37721fc9b91..1201b9132d2 100644 --- a/ydb/core/kqp/ut/query/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/ut/query/CMakeLists.linux-x86_64.txt @@ -21,6 +21,7 @@ target_link_libraries(ydb-core-kqp-ut-query PUBLIC library-cpp-cpuid_check cpp-testing-unittest_main ydb-core-kqp + cpp-client-ydb_proto kqp-ut-common yql-sql-pg_dummy ) diff --git a/ydb/core/kqp/ut/query/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/ut/query/CMakeLists.windows-x86_64.txt index e50e070e87a..b6a1a94bc2b 100644 --- a/ydb/core/kqp/ut/query/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/ut/query/CMakeLists.windows-x86_64.txt @@ -20,6 +20,7 @@ target_link_libraries(ydb-core-kqp-ut-query PUBLIC library-cpp-cpuid_check cpp-testing-unittest_main ydb-core-kqp + cpp-client-ydb_proto kqp-ut-common yql-sql-pg_dummy ) diff --git a/ydb/core/kqp/ut/query/kqp_params_ut.cpp b/ydb/core/kqp/ut/query/kqp_params_ut.cpp index 8e41e15aafe..5fd78a8bcb7 100644 --- a/ydb/core/kqp/ut/query/kqp_params_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_params_ut.cpp @@ -1,4 +1,5 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> namespace NKikimr { namespace NKqp { @@ -101,6 +102,103 @@ Y_UNIT_TEST_SUITE(KqpParams) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); } + Y_UNIT_TEST(ImplicitParameterTypes) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto params = db.GetParamsBuilder() + .AddParam("$name") + .String("Sergey") + .Build() + .AddParam("$group") + .Int32(1) + .Build() + .Build(); + + // don't DECLARE parameter types in text query + auto result = session.ExecuteDataQuery(Q1_(R"( + SELECT * FROM `/Root/Test` WHERE Group = $group AND Name = $name; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + Y_UNIT_TEST(ExplicitSameParameterTypesQueryCacheCheck) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + // enable query cache + NYdb::NTable::TExecDataQuerySettings execSettings{}; + execSettings.KeepInQueryCache(true); + // enable extraction of cache status from the reply + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + for (int i = 0; i < 2; ++i) { + auto params = db.GetParamsBuilder().AddParam("$group").Int32(1).Build().Build(); + auto result = session.ExecuteDataQuery(Q1_(R"( + DECLARE $group AS Int32; + SELECT * FROM `/Root/Test` WHERE Group = $group; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params, execSettings).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(result.GetStats().Defined(), true); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), i); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(ImplicitSameParameterTypesQueryCacheCheck) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + // enable query cache + NYdb::NTable::TExecDataQuerySettings execSettings{}; + execSettings.KeepInQueryCache(true); + // enable extraction of cache status from the reply + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + for (int i = 0; i < 2; ++i) { + auto params = db.GetParamsBuilder().AddParam("$group").Int32(1).Build().Build(); + // don't DECLARE parameter type in text query + auto result = session.ExecuteDataQuery(Q1_(R"( + SELECT * FROM `/Root/Test` WHERE Group = $group; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params, execSettings).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(result.GetStats().Defined(), true); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), i); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(ImplicitDifferentParameterTypesQueryCacheCheck) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + // enable query cache + NYdb::NTable::TExecDataQuerySettings execSettings{}; + execSettings.KeepInQueryCache(true); + // enable extraction of cache status from the reply + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + // two queries differ only by parameter type + for (const auto& params : { db.GetParamsBuilder().AddParam("$group").Int32(1).Build().Build(), db.GetParamsBuilder().AddParam("$group").Uint32(1).Build().Build() }) { + // don't DECLARE parameter type in text query + auto result = session.ExecuteDataQuery(Q1_(R"( + SELECT * FROM `/Root/Test` WHERE Group = $group; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params, execSettings).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(result.GetStats().Defined(), true); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + } + Y_UNIT_TEST(DefaultParameterValue) { TKikimrRunner kikimr; auto db = kikimr.GetTableClient(); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 9905bfe8ac5..13e505831d0 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -803,6 +803,7 @@ message TFeatureFlags { optional bool EnableSeparationComputeActorsFromRead = 90 [default = false]; optional bool EnablePQConfigTransactionsAtSchemeShard = 91 [default = false]; optional bool EnableScriptExecutionOperations = 92 [default = false]; + optional bool EnableImplicitQueryParameterTypes = 93 [default = true]; } |