diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-03-02 17:47:09 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-03-02 17:47:09 +0300 |
commit | 9a024452fbd14773e9b17693833e41b062a31742 (patch) | |
tree | 12f3b0d5a7b4385b5d07ccf7281885319ba7159e | |
parent | 9f2f2091b3bef79e2599a80d4bb0d962d1e20b3a (diff) | |
download | ydb-9a024452fbd14773e9b17693833e41b062a31742.tar.gz |
Introduce new query type for ExecuteScript API method in EQueryType, EKikimrQueryType, EPhysicalQueryType, TKqpPhyQuery
-rw-r--r-- | ydb/core/grpc_services/query/query_helpers.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_timeouts.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_yql.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_yql.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_actor.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/counters/kqp_counters.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_runner.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_build_txs.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 10 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 1 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 1 |
18 files changed, 48 insertions, 14 deletions
diff --git a/ydb/core/grpc_services/query/query_helpers.h b/ydb/core/grpc_services/query/query_helpers.h index c706ee34163..630c2c65e1a 100644 --- a/ydb/core/grpc_services/query/query_helpers.h +++ b/ydb/core/grpc_services/query/query_helpers.h @@ -50,7 +50,7 @@ inline NKikimrKqp::EQueryType GetQueryType(const Ydb::Query::ExecuteQueryRequest } inline NKikimrKqp::EQueryType GetQueryType(const Ydb::Query::ExecuteScriptRequest&) { - return NKikimrKqp::QUERY_TYPE_SQL_QUERY; // TODO: make new query type + return NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY; } } // namespace NQueryHelpersPrivate diff --git a/ydb/core/kqp/common/kqp.cpp b/ydb/core/kqp/common/kqp.cpp index d54a2c04b56..f52e4f122ab 100644 --- a/ydb/core/kqp/common/kqp.cpp +++ b/ydb/core/kqp/common/kqp.cpp @@ -14,6 +14,7 @@ bool IsSqlQuery(const NKikimrKqp::EQueryType& queryType) { case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: case NKikimrKqp::QUERY_TYPE_SQL_SCAN: case NKikimrKqp::QUERY_TYPE_SQL_QUERY: + case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY: return true; default: @@ -64,4 +65,3 @@ void TKqpShutdownController::Stop() { } } // namespace NKikimr::NKqp - diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index a0d303ebbf3..16e4666af4e 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -171,6 +171,7 @@ public: case NKikimrKqp::QUERY_TYPE_AST_DML: case NKikimrKqp::QUERY_TYPE_AST_SCAN: case NKikimrKqp::QUERY_TYPE_SQL_QUERY: + case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY: break; default: diff --git a/ydb/core/kqp/common/kqp_timeouts.cpp b/ydb/core/kqp/common/kqp_timeouts.cpp index f66f9c7a3b8..819730e684c 100644 --- a/ydb/core/kqp/common/kqp_timeouts.cpp +++ b/ydb/core/kqp/common/kqp_timeouts.cpp @@ -16,6 +16,7 @@ ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType, const NKikimrCon case NKikimrKqp::QUERY_TYPE_PREPARED_DML: case NKikimrKqp::QUERY_TYPE_AST_DML: case NKikimrKqp::QUERY_TYPE_SQL_QUERY: + case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY: return queryLimits.GetDataQueryTimeoutMs(); case NKikimrKqp::QUERY_TYPE_SQL_SCAN: diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp index 2334424b4ab..ec488a54686 100644 --- a/ydb/core/kqp/common/kqp_yql.cpp +++ b/ydb/core/kqp/common/kqp_yql.cpp @@ -16,6 +16,8 @@ static EPhysicalQueryType GetPhysicalQueryType(const TStringBuf& value) { return EPhysicalQueryType::Scan; } else if (value == "query") { return EPhysicalQueryType::Query; + } else if (value == "federated_query") { + return EPhysicalQueryType::FederatedQuery; } else { YQL_ENSURE(false, "Unknown physical query type: " << value); } @@ -31,6 +33,8 @@ static TStringBuf PhysicalQueryTypeToString(EPhysicalQueryType type) { return "scan_query"; case EPhysicalQueryType::Query: return "query"; + case EPhysicalQueryType::FederatedQuery: + return "federated_query"; } YQL_ENSURE(false, "Unexpected physical query type: " << type); diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h index 0c29de4df31..3713e742273 100644 --- a/ydb/core/kqp/common/kqp_yql.h +++ b/ydb/core/kqp/common/kqp_yql.h @@ -12,7 +12,8 @@ enum class EPhysicalQueryType { Unspecified, Data, Scan, - Query + Query, + FederatedQuery, }; struct TKqpPhyQuerySettings { diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 8a7e8515827..ce503ca9d9d 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -122,6 +122,7 @@ public: break; case NKikimrKqp::QUERY_TYPE_SQL_QUERY: + case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY: AsyncCompileResult = KqpHost->PrepareQuery(Query.Text, prepareSettings); break; diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 3ed8577d707..35964922a8a 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -104,6 +104,8 @@ void TKqpCountersBase::Init() { KqpGroup->GetCounter("Request/QueryTypeAstScan", true); QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_SQL_QUERY] = KqpGroup->GetCounter("Request/QueryTypeQuery", true); + QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_FEDERATED_QUERY] = + KqpGroup->GetCounter("Request/QueryTypeFederatedQuery", true); OtherQueryTypes = KqpGroup->GetCounter("Requests/QueryTypeOther", true); QueriesWithRangeScan = KqpGroup->GetCounter("Query/WithRangeScan", true); diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index a6d39e32d12..3f062a22dd1 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -679,6 +679,7 @@ public: case EKikimrQueryType::Scan: return KqpRunner->PrepareScanQuery(cluster, query, ctx, settings); case EKikimrQueryType::Query: + case EKikimrQueryType::FederatedQuery: return KqpRunner->PrepareQuery(cluster, query, ctx, settings); case EKikimrQueryType::YqlScript: case EKikimrQueryType::YqlScriptStreaming: @@ -1032,7 +1033,14 @@ public: IAsyncQueryResultPtr PrepareQuery(const TString& query, const TPrepareSettings& settings) override { return CheckedProcessQuery(*ExprCtx, [this, &query, settings] (TExprContext& ctx) mutable { - return PrepareQueryInternal(query, settings, ctx); + return PrepareQueryInternal(query, EKikimrQueryType::Query, settings, ctx); + }); + } + + IAsyncQueryResultPtr PrepareFederatedQuery(const TString& query, const TPrepareSettings& settings) override { + return CheckedProcessQuery(*ExprCtx, + [this, &query, settings] (TExprContext& ctx) mutable { + return PrepareQueryInternal(query, EKikimrQueryType::FederatedQuery, settings, ctx); }); } @@ -1329,12 +1337,12 @@ private: SessionCtx, *ExecuteCtx); } - IAsyncQueryResultPtr PrepareQueryInternal(const TString& query, const TPrepareSettings& settings, + IAsyncQueryResultPtr PrepareQueryInternal(const TString& query, EKikimrQueryType queryType, const TPrepareSettings& settings, TExprContext& ctx) { SetupYqlTransformer(); - SessionCtx->Query().Type = EKikimrQueryType::Query; + SessionCtx->Query().Type = queryType; SessionCtx->Query().PrepareOnly = true; SessionCtx->Query().PreparingQuery = std::make_unique<NKikimrKqp::TPreparedQuery>(); if (settings.DocumentApiRestricted) { diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index daf6321ff11..3cf7788250c 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -52,6 +52,9 @@ public: /* Generic queries */ virtual IAsyncQueryResultPtr PrepareQuery(const TString& query, const TPrepareSettings& settings) = 0; + /* Federated queries */ + virtual IAsyncQueryResultPtr PrepareFederatedQuery(const TString& query, const TPrepareSettings& settings) = 0; + /* Scripting */ virtual IAsyncQueryResultPtr ValidateYqlScript(const TString& script) = 0; virtual TQueryResult SyncValidateYqlScript(const TString& script) = 0; diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index 34650514068..ecc6307d29d 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -177,7 +177,7 @@ public: TIntrusivePtr<TAsyncQueryResult> PrepareQuery(const TString& cluster, const TExprNode::TPtr& query, TExprContext& ctx, const IKikimrQueryExecutor::TExecuteSettings& settings) override { - YQL_ENSURE(TransformCtx->QueryCtx->Type == EKikimrQueryType::Query); + YQL_ENSURE(TransformCtx->QueryCtx->Type == EKikimrQueryType::Query || TransformCtx->QueryCtx->Type == EKikimrQueryType::FederatedQuery); YQL_ENSURE(TransformCtx->QueryCtx->PrepareOnly); YQL_ENSURE(TransformCtx->QueryCtx->PreparingQuery); YQL_ENSURE(TMaybeNode<TKiDataQueryBlocks>(query)); @@ -243,6 +243,7 @@ private: case EKikimrQueryType::Dml: case EKikimrQueryType::Scan: case EKikimrQueryType::Query: + case EKikimrQueryType::FederatedQuery: break; default: YQL_ENSURE(false, "PrepareQueryNewEngine, unexpected query type: " << queryType); diff --git a/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp b/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp index e3409d0cf12..122f37196bd 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp @@ -37,6 +37,10 @@ public: querySettings.Type = EPhysicalQueryType::Query; break; } + case EKikimrQueryType::FederatedQuery: { + querySettings.Type = EPhysicalQueryType::FederatedQuery; + break; + } default: { YQL_ENSURE(false, "Unexpected query type: " << KqpCtx->QueryCtx->Type); } diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index 4a3c500f3ad..013341dc86f 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -69,7 +69,7 @@ private: return EPhysicalTxType::Scan; } - if (QueryType == EKikimrQueryType::Query) { + if (QueryType == EKikimrQueryType::Query || QueryType == EKikimrQueryType::FederatedQuery) { if (IsPrecompute && allStagesArePure) { return EPhysicalTxType::Compute; } diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 9b920d1499a..18555063d18 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -67,6 +67,7 @@ enum class EKikimrQueryType { Scan, YqlScriptStreaming, Query, + FederatedQuery, }; struct TKikimrQueryContext : TThrRefBase { @@ -308,7 +309,7 @@ public: return false; } - if (queryType == EKikimrQueryType::Query && (newOp & KikimrSchemeOps())) { + if ((queryType == EKikimrQueryType::Query || queryType == EKikimrQueryType::FederatedQuery) && (newOp & KikimrSchemeOps())) { TString message = TStringBuilder() << "Operation '" << newOp << "' can't be performed in query"; ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 38fd6788bdb..b266ed427e9 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -44,6 +44,7 @@ NKqpProto::TKqpPhyQuery::EType GetPhyQueryType(const EPhysicalQueryType& type) { case EPhysicalQueryType::Data: return NKqpProto::TKqpPhyQuery::TYPE_DATA; case EPhysicalQueryType::Scan: return NKqpProto::TKqpPhyQuery::TYPE_SCAN; case EPhysicalQueryType::Query: return NKqpProto::TKqpPhyQuery::TYPE_QUERY; + case EPhysicalQueryType::FederatedQuery: return NKqpProto::TKqpPhyQuery::TYPE_FEDERATED_QUERY; case EPhysicalQueryType::Unspecified: break; @@ -524,7 +525,7 @@ public: [](const TItemExprType* first, const TItemExprType* second) { return first->GetName() < second->GetName(); }); - inputsParams.erase(std::unique(inputsParams.begin(), inputsParams.end(), + inputsParams.erase(std::unique(inputsParams.begin(), inputsParams.end(), [](const TItemExprType* first, const TItemExprType* second) { return first->GetName() == second->GetName(); }), @@ -715,7 +716,7 @@ private: for (auto&& i : *txProto.MutableStages()) { i.MutableProgram()->MutableSettings()->SetLevelDataPrediction(rPredictor.GetLevelDataVolume(i.GetProgram().GetSettings().GetStageLevel())); } - + YQL_ENSURE(hasEffectStage == txSettings.WithEffects); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 23b4b87a2c4..8d738d35a4d 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -219,7 +219,8 @@ struct TKqpQueryState { return ( type == NKikimrKqp::QUERY_TYPE_AST_SCAN || type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || - type == NKikimrKqp::QUERY_TYPE_SQL_QUERY + type == NKikimrKqp::QUERY_TYPE_SQL_QUERY || + type == NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY ); } @@ -437,6 +438,7 @@ public: case NKikimrKqp::QUERY_TYPE_AST_SCAN: case NKikimrKqp::QUERY_TYPE_AST_DML: case NKikimrKqp::QUERY_TYPE_SQL_QUERY: + case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY: return true; // should not be compiled. TODO: forward to request executer @@ -885,7 +887,8 @@ public: YQL_ENSURE( type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || type == NKikimrKqp::QUERY_TYPE_AST_SCAN || - type == NKikimrKqp::QUERY_TYPE_SQL_QUERY + type == NKikimrKqp::QUERY_TYPE_SQL_QUERY || + type == NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY ); break; @@ -1462,7 +1465,8 @@ public: case NKikimrKqp::QUERY_TYPE_SQL_SCAN: case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT: case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: - case NKikimrKqp::QUERY_TYPE_SQL_QUERY: { + case NKikimrKqp::QUERY_TYPE_SQL_QUERY: + case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY: { TString text = QueryState->ExtractQueryText(); if (IsQueryAllowedToLog(text)) { auto userSID = NACLib::TUserToken(QueryState->UserToken).GetUserSID(); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 5220ae755b9..887c91aeb8d 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -38,6 +38,7 @@ enum EQueryType { QUERY_TYPE_SQL_SCRIPT_STREAMING = 11; QUERY_TYPE_SQL_QUERY = 12; + QUERY_TYPE_FEDERATED_QUERY = 13; }; enum EQueryAction { diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 48cba779ba7..b2598b4e227 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -355,6 +355,7 @@ message TKqpPhyQuery { TYPE_DATA = 1; TYPE_SCAN = 2; TYPE_QUERY = 3; + TYPE_FEDERATED_QUERY = 4; }; EType Type = 1; |