diff options
author | spuchin <spuchin@ydb.tech> | 2023-01-08 21:40:07 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2023-01-08 21:40:07 +0300 |
commit | ddec3014fde819da31b3c2ce50321b44e77de0c4 (patch) | |
tree | 18ef9f72c3c5dfc9b0d330374bd322c17cb827dd | |
parent | 5b8946eafc3ffcfe351d1e070a395bee8337dead (diff) | |
download | ydb-ddec3014fde819da31b3c2ce50321b44e77de0c4.tar.gz |
Add separate KQP query type for generic query. ()
22 files changed, 275 insertions, 54 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index c432aa49d7c..27fbc44d33e 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -34,8 +34,7 @@ std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest( } } - // TODO: Use new type of query (QUERY_TYPE_SQL_QUERY) - kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_SCAN); + kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_QUERY); kqpRequest.MutableRequest()->SetKeepSession(false); switch (req.query_case()) { diff --git a/ydb/core/kqp/common/kqp.cpp b/ydb/core/kqp/common/kqp.cpp index 040bc5f9f40..d54a2c04b56 100644 --- a/ydb/core/kqp/common/kqp.cpp +++ b/ydb/core/kqp/common/kqp.cpp @@ -6,6 +6,23 @@ namespace NKikimr::NKqp { +bool IsSqlQuery(const NKikimrKqp::EQueryType& queryType) { + switch (queryType) { + case NKikimrKqp::QUERY_TYPE_SQL_DML: + case NKikimrKqp::QUERY_TYPE_SQL_DDL: + case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT: + case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: + case NKikimrKqp::QUERY_TYPE_SQL_SCAN: + case NKikimrKqp::QUERY_TYPE_SQL_QUERY: + return true; + + default: + break; + } + + return false; +} + TKqpShutdownController::TKqpShutdownController(NActors::TActorId kqpProxyActorId, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, bool enableGraceful) : KqpProxyActorId_(kqpProxyActorId) , EnableGraceful(enableGraceful) diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index 7e7fe78adc9..7038cd03906 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -145,6 +145,8 @@ struct TKqpQuerySettings { } }; +bool IsSqlQuery(const NKikimrKqp::EQueryType& queryType); + struct TKqpQueryId { TString Cluster; TString Database; @@ -165,19 +167,17 @@ public: case NKikimrKqp::QUERY_TYPE_SQL_SCAN: case NKikimrKqp::QUERY_TYPE_AST_DML: case NKikimrKqp::QUERY_TYPE_AST_SCAN: - break; + case NKikimrKqp::QUERY_TYPE_SQL_QUERY: + break; + default: - Y_ENSURE(false, "Unsupported request type"); + Y_ENSURE(false, "Unsupported request type"); } } - bool IsScan() const { - return QueryType == NKikimrKqp::QUERY_TYPE_SQL_SCAN || QueryType == NKikimrKqp::QUERY_TYPE_AST_SCAN; - } - bool IsSql() const { - return QueryType == NKikimrKqp::QUERY_TYPE_SQL_DML || QueryType == NKikimrKqp::QUERY_TYPE_SQL_SCAN; + return IsSqlQuery(QueryType); } bool operator==(const TKqpQueryId& other) const { diff --git a/ydb/core/kqp/common/kqp_timeouts.cpp b/ydb/core/kqp/common/kqp_timeouts.cpp index c8ad9c584aa..f66f9c7a3b8 100644 --- a/ydb/core/kqp/common/kqp_timeouts.cpp +++ b/ydb/core/kqp/common/kqp_timeouts.cpp @@ -15,6 +15,7 @@ ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType, const NKikimrCon case NKikimrKqp::QUERY_TYPE_SQL_DML: case NKikimrKqp::QUERY_TYPE_PREPARED_DML: case NKikimrKqp::QUERY_TYPE_AST_DML: + case NKikimrKqp::QUERY_TYPE_SQL_QUERY: return queryLimits.GetDataQueryTimeoutMs(); case NKikimrKqp::QUERY_TYPE_SQL_SCAN: diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp index 8cbaaecba95..a7b5196d997 100644 --- a/ydb/core/kqp/common/kqp_yql.cpp +++ b/ydb/core/kqp/common/kqp_yql.cpp @@ -13,6 +13,8 @@ static EPhysicalQueryType GetPhysicalQueryType(const TStringBuf& value) { return EPhysicalQueryType::Data; } else if (value == "scan_query") { return EPhysicalQueryType::Scan; + } else if (value == "query") { + return EPhysicalQueryType::Query; } else { YQL_ENSURE(false, "Unknown physical query type: " << value); } @@ -26,6 +28,8 @@ static TStringBuf PhysicalQueryTypeToString(EPhysicalQueryType type) { return "data_query"; case EPhysicalQueryType::Scan: return "scan_query"; + case EPhysicalQueryType::Query: + return "query"; } YQL_ENSURE(false, "Unexpected physical query type: " << type); @@ -67,6 +71,8 @@ static EPhysicalTxType GetPhysicalTxType(const TStringBuf& value) { return EPhysicalTxType::Data; } else if (value == "scan") { return EPhysicalTxType::Scan; + } else if (value == "generic") { + return EPhysicalTxType::Generic; } else { YQL_ENSURE(false, "Unknown physical tx type: " << value); } @@ -82,6 +88,8 @@ static TStringBuf PhysicalTxTypeToString(EPhysicalTxType type) { return "data"; case EPhysicalTxType::Scan: return "scan"; + case EPhysicalTxType::Generic: + return "generic"; } YQL_ENSURE(false, "Unexpected physical tx type: " << type); diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h index 735e93360ab..eb2d22948ef 100644 --- a/ydb/core/kqp/common/kqp_yql.h +++ b/ydb/core/kqp/common/kqp_yql.h @@ -10,7 +10,8 @@ const TStringBuf KqpEffectTag = "KqpEffect"; enum class EPhysicalQueryType { Unspecified, Data, - Scan + Scan, + Query }; struct TKqpPhyQuerySettings { @@ -25,7 +26,8 @@ enum class EPhysicalTxType { Unspecified, Compute, Data, - Scan + Scan, + Generic }; struct TKqpPhyTxSettings { diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 221fd387153..5da3f59da87 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -107,11 +107,27 @@ public: NCpuTime::TCpuTimer timer(CompileCpuTime); - AsyncCompileResult = Query.IsScan() - ? KqpHost->PrepareScanQuery(Query.Text, Query.IsSql(), prepareSettings) - : Query.IsSql() - ? KqpHost->PrepareDataQuery(Query.Text, prepareSettings) - : KqpHost->PrepareDataQueryAst(Query.Text, prepareSettings); + switch (Query.QueryType) { + case NKikimrKqp::QUERY_TYPE_SQL_DML: + AsyncCompileResult = KqpHost->PrepareDataQuery(Query.Text, prepareSettings); + break; + + case NKikimrKqp::QUERY_TYPE_AST_DML: + AsyncCompileResult = KqpHost->PrepareDataQueryAst(Query.Text, prepareSettings); + break; + + case NKikimrKqp::QUERY_TYPE_SQL_SCAN: + case NKikimrKqp::QUERY_TYPE_AST_SCAN: + AsyncCompileResult = KqpHost->PrepareScanQuery(Query.Text, Query.IsSql(), prepareSettings); + break; + + case NKikimrKqp::QUERY_TYPE_SQL_QUERY: + AsyncCompileResult = KqpHost->PrepareQuery(Query.Text, prepareSettings); + break; + + default: + YQL_ENSURE(false, "Unexpected query type: " << Query.QueryType); + } Continue(ctx); diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 9553ead0320..034804a1d91 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -102,6 +102,8 @@ void TKqpCountersBase::Init() { KqpGroup->GetCounter("Request/QueryTypeSqlScan", true); QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_AST_SCAN] = KqpGroup->GetCounter("Request/QueryTypeAstScan", true); + QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_SQL_QUERY] = + KqpGroup->GetCounter("Request/QueryTypeQuery", true); OtherQueryTypes = KqpGroup->GetCounter("Requests/QueryTypeOther", true); QueriesWithRangeScan = KqpGroup->GetCounter("Query/WithRangeScan", true); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 7e58c735738..4ee793167b7 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -207,6 +207,11 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt data = false; break; + case NKqpProto::TKqpPhyTx::TYPE_GENERIC: + // TODO: Use separate executer. + data = false; + break; + default: YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)tx.Body->GetType()); } diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 79bc0d4a2cf..17e2f55f5bc 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -677,6 +677,8 @@ public: return KqpRunner->PrepareDataQuery(cluster, query, ctx, settings); case EKikimrQueryType::Scan: return KqpRunner->PrepareScanQuery(cluster, query, ctx, settings); + case EKikimrQueryType::Query: + return KqpRunner->PrepareQuery(cluster, query, ctx, settings); case EKikimrQueryType::YqlScript: case EKikimrQueryType::YqlScriptStreaming: break; @@ -1024,6 +1026,13 @@ public: }); } + IAsyncQueryResultPtr PrepareQuery(const TString& query, const TPrepareSettings& settings) override { + return CheckedProcessQuery(*ExprCtx, + [this, &query, settings] (TExprContext& ctx) mutable { + return PrepareQueryInternal(query, settings, ctx); + }); + } + IAsyncQueryResultPtr ExecuteYqlScript(const TString& script, NKikimrMiniKQL::TParams&& parameters, const TExecScriptSettings& settings) override { @@ -1317,6 +1326,29 @@ private: SessionCtx, *ExecuteCtx); } + IAsyncQueryResultPtr PrepareQueryInternal(const TString& query, const TPrepareSettings& settings, + TExprContext& ctx) + { + SetupYqlTransformer(); + + SessionCtx->Query().Type = EKikimrQueryType::Query; + SessionCtx->Query().PrepareOnly = true; + SessionCtx->Query().PreparingQuery = std::make_unique<NKikimrKqp::TPreparedQuery>(); + if (settings.DocumentApiRestricted) { + SessionCtx->Query().DocumentApiRestricted = *settings.DocumentApiRestricted; + } + + // TODO: Support PG + TMaybe<TSqlVersion> sqlVersion = 1; + auto queryExpr = CompileYqlQuery(query, /* isSql */ true, /* sqlAutoCommit */ false, ctx, sqlVersion); + if (!queryExpr) { + return nullptr; + } + + return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(), + query, sqlVersion); + } + IAsyncQueryResultPtr PrepareScanQueryInternal(const TString& query, bool isSql, TExprContext& ctx, EKikimrStatsMode statsMode = EKikimrStatsMode::None) { diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index 80ee24fd2f1..daf6321ff11 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -49,6 +49,9 @@ public: virtual IAsyncQueryResultPtr ExplainScanQuery(const TString& query, bool isSql) = 0; + /* Generic queries */ + virtual IAsyncQueryResultPtr PrepareQuery(const TString& query, const TPrepareSettings& settings) = 0; + /* Scripting */ virtual IAsyncQueryResultPtr ValidateYqlScript(const TString& script) = 0; virtual TQueryResult SyncValidateYqlScript(const TString& script) = 0; diff --git a/ydb/core/kqp/host/kqp_host_impl.h b/ydb/core/kqp/host/kqp_host_impl.h index 079d9597683..d0da15c6c38 100644 --- a/ydb/core/kqp/host/kqp_host_impl.h +++ b/ydb/core/kqp/host/kqp_host_impl.h @@ -236,6 +236,10 @@ public: virtual TIntrusivePtr<TAsyncQueryResult> PrepareScanQuery(const TString& cluster, const NYql::TExprNode::TPtr& query, NYql::TExprContext& ctx, const NYql::IKikimrQueryExecutor::TExecuteSettings& settings) = 0; + + virtual TIntrusivePtr<TAsyncQueryResult> PrepareQuery(const TString& cluster, + const NYql::TExprNode::TPtr& query, NYql::TExprContext& ctx, + const NYql::IKikimrQueryExecutor::TExecuteSettings& settings) = 0; }; TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index 211e1f97b6f..22485389317 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -174,6 +174,31 @@ public: return PrepareQueryInternal(cluster, dataQueryBlocks, ctx, scanSettings); } + TIntrusivePtr<TAsyncQueryResult> PrepareQuery(const TString& cluster, const TExprNode::TPtr& query, + TExprContext& ctx, const IKikimrQueryExecutor::TExecuteSettings& settings) override + { + YQL_ENSURE(TransformCtx->QueryCtx->Type == EKikimrQueryType::Query); + YQL_ENSURE(TransformCtx->QueryCtx->PrepareOnly); + YQL_ENSURE(TransformCtx->QueryCtx->PreparingQuery); + YQL_ENSURE(TMaybeNode<TKiDataQueryBlocks>(query)); + + TKiDataQueryBlocks dataQueryBlocks(query); + + const auto& queryBlock = dataQueryBlocks.Arg(0); + if (queryBlock.Results().Size() != 1) { + ctx.AddError(YqlIssue(ctx.GetPosition(dataQueryBlocks.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED, + "Multiple result sets not yet supported.")); + return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); + } + if (queryBlock.Effects().ArgCount() > 0) { + ctx.AddError(YqlIssue(ctx.GetPosition(dataQueryBlocks.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED, + "Data modifications not yet supported.")); + return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); + } + + return PrepareQueryInternal(cluster, dataQueryBlocks, ctx, settings); + } + private: TIntrusivePtr<TAsyncQueryResult> PrepareQueryInternal(const TString& cluster, const TKiDataQueryBlocks& dataQueryBlocks, TExprContext& ctx, @@ -217,6 +242,7 @@ private: switch (queryType) { case EKikimrQueryType::Dml: case EKikimrQueryType::Scan: + case EKikimrQueryType::Query: break; default: YQL_ENSURE(false, "PrepareQueryNewEngine, unexpected query type: " << queryType); diff --git a/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp b/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp index 06ec2b12a4f..e3409d0cf12 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp @@ -33,6 +33,10 @@ public: querySettings.Type = EPhysicalQueryType::Scan; break; } + case EKikimrQueryType::Query: { + querySettings.Type = EPhysicalQueryType::Query; + break; + } default: { YQL_ENSURE(false, "Unexpected query type: " << KqpCtx->QueryCtx->Type); } diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index c6cb7dd2431..b78d57d42e3 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -26,7 +26,7 @@ TAutoPtr<NYql::IGraphTransformer> CreateKqpBuildPhyStagesTransformer(bool allowD class TKqpBuildTxTransformer : public TSyncTransformerBase { public: TKqpBuildTxTransformer() - : QueryType(EKikimrQueryType::Dml) + : QueryType(EKikimrQueryType::Unspecified) , IsPrecompute(false) {} void Init(EKikimrQueryType queryType, bool isPrecompute) { @@ -68,6 +68,14 @@ private: return EPhysicalTxType::Scan; } + if (QueryType == EKikimrQueryType::Query) { + if (IsPrecompute && allStagesArePure) { + return EPhysicalTxType::Compute; + } + + return EPhysicalTxType::Generic; + } + if (allStagesArePure) { return EPhysicalTxType::Compute; } diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 8b71f0cc458..9079b5040b5 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -67,6 +67,7 @@ enum class EKikimrQueryType { YqlInternal, Scan, YqlScriptStreaming, + Query, }; struct TKikimrQueryContext : TThrRefBase { @@ -308,6 +309,13 @@ public: return false; } + if (queryType == EKikimrQueryType::Query && (newOp & KikimrSchemeOps())) { + TString message = TStringBuilder() << "Operation '" << newOp + << "' can't be performed in query"; + ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); + return false; + } + if (queryType == EKikimrQueryType::Ddl && (newOp & KikimrDataOps())) { TString message = TStringBuilder() << "Operation '" << newOp << "' can't be performed in scheme query"; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 9dee2b8d195..8299c070ba3 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -92,22 +92,6 @@ std::optional<ui32> TryDecodeYdbSessionId(const TString& sessionId) { return std::nullopt; } -bool IsSqlQuery(const NKikimrKqp::EQueryType& queryType) { - switch (queryType) { - case NKikimrKqp::QUERY_TYPE_SQL_DML: - case NKikimrKqp::QUERY_TYPE_SQL_DDL: - case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT: - case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: - case NKikimrKqp::QUERY_TYPE_SQL_SCAN: - return true; - - default: - break; - } - - return false; -} - TString EncodeSessionId(ui32 nodeId, const TString& id) { Ydb::TOperationId opId; opId.SetKind(NOperationId::TOperationId::SESSION_YQL); diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 2b4096755ce..cdcd87ea577 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -29,6 +29,7 @@ NKqpProto::TKqpPhyTx::EType GetPhyTxType(const EPhysicalTxType& type) { case EPhysicalTxType::Compute: return NKqpProto::TKqpPhyTx::TYPE_COMPUTE; case EPhysicalTxType::Data: return NKqpProto::TKqpPhyTx::TYPE_DATA; case EPhysicalTxType::Scan: return NKqpProto::TKqpPhyTx::TYPE_SCAN; + case EPhysicalTxType::Generic: return NKqpProto::TKqpPhyTx::TYPE_GENERIC; case EPhysicalTxType::Unspecified: break; @@ -41,6 +42,7 @@ NKqpProto::TKqpPhyQuery::EType GetPhyQueryType(const EPhysicalQueryType& type) { switch (type) { case EPhysicalQueryType::Data: return NKqpProto::TKqpPhyQuery::TYPE_DATA; case EPhysicalQueryType::Scan: return NKqpProto::TKqpPhyQuery::TYPE_SCAN; + case EPhysicalQueryType::Query: return NKqpProto::TKqpPhyQuery::TYPE_QUERY; case EPhysicalQueryType::Unspecified: break; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index b9e1afcc8cb..0f1f5861b54 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -400,6 +400,7 @@ public: case NKikimrKqp::QUERY_TYPE_SQL_SCAN: case NKikimrKqp::QUERY_TYPE_AST_SCAN: case NKikimrKqp::QUERY_TYPE_AST_DML: + case NKikimrKqp::QUERY_TYPE_SQL_QUERY: return true; // should not be compiled. TODO: forward to request executer @@ -707,6 +708,10 @@ public: ) { AcquirePersistentSnapshot(); return; + } else if (queryRequest.GetType() == NKikimrKqp::QUERY_TYPE_SQL_QUERY) { + // TODO: Switch to MVCC snapshots after moving to separate executer. + AcquirePersistentSnapshot(); + return; } else if (NeedSnapshot(*QueryState->TxCtx, *Config, /*rollback*/ false, QueryState->Commit, QueryState->PreparedQuery->GetPhysicalQuery())) { @@ -897,10 +902,22 @@ public: action = NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED; } - YQL_ENSURE(action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_SQL_SCAN - || action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_AST_SCAN - || action == NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED && type == NKikimrKqp::QUERY_TYPE_PREPARED_DML, - "Unexpected query action: " << action << " and type: " << type); + switch (action) { + case NKikimrKqp::QUERY_ACTION_EXECUTE: + YQL_ENSURE( + type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || + type == NKikimrKqp::QUERY_TYPE_AST_SCAN || + type == NKikimrKqp::QUERY_TYPE_SQL_QUERY + ); + break; + + case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: + YQL_ENSURE(type == NKikimrKqp::QUERY_TYPE_PREPARED_DML); + break; + + default: + YQL_ENSURE(false, "Unexpected query action: " << action); + } ParseParameters(QueryState->Request.GetParameters()); ParseParameters(QueryState->Request.GetYdbParameters()); @@ -948,7 +965,7 @@ public: Cleanup(IsFatalError(record.GetYdbStatus())); } - IKqpGateway::TExecPhysicalRequest PrepareRequest(TKqpQueryState *queryState, TTxAllocatorState::TPtr alloc) { + IKqpGateway::TExecPhysicalRequest PrepareBaseRequest(TKqpQueryState *queryState, TTxAllocatorState::TPtr alloc) { IKqpGateway::TExecPhysicalRequest request(alloc); if (queryState) { @@ -971,13 +988,15 @@ public: IKqpGateway::TExecPhysicalRequest PreparePureRequest(TKqpQueryState *queryState) { - auto request = PrepareRequest(queryState, queryState->TxCtx->TxAlloc); + auto request = PrepareBaseRequest(queryState, queryState->TxCtx->TxAlloc); request.NeedTxId = false; return request; } - IKqpGateway::TExecPhysicalRequest PreparePhysicalRequest(TKqpQueryState *queryState, TTxAllocatorState::TPtr alloc) { - auto request = PrepareRequest(queryState, alloc); + IKqpGateway::TExecPhysicalRequest PreparePhysicalRequest(TKqpQueryState *queryState, + TTxAllocatorState::TPtr alloc) + { + auto request = PrepareBaseRequest(queryState, alloc); if (queryState) { request.Snapshot = queryState->TxCtx->GetSnapshot(); @@ -990,7 +1009,7 @@ public: } IKqpGateway::TExecPhysicalRequest PrepareScanRequest(TKqpQueryState *queryState) { - auto request = PrepareRequest(queryState, queryState->TxCtx->TxAlloc); + auto request = PrepareBaseRequest(queryState, queryState->TxCtx->TxAlloc); request.MaxComputeActors = Config->_KqpMaxComputeActors.Get().GetRef(); request.DisableLlvmForUdfStages = Config->DisableLlvmForUdfStages(); @@ -1001,6 +1020,42 @@ public: return request; } + IKqpGateway::TExecPhysicalRequest PrepareGenericRequest(TKqpQueryState *queryState) { + auto request = PrepareBaseRequest(queryState, queryState->TxCtx->TxAlloc); + + YQL_ENSURE(queryState); + request.Snapshot = queryState->TxCtx->GetSnapshot(); + + return request; + } + + IKqpGateway::TExecPhysicalRequest PrepareRequest(std::shared_ptr<const NKqpProto::TKqpPhyTx> tx, bool pure, + TKqpQueryState *queryState) + { + if (pure) { + YQL_ENSURE(tx); + return PreparePureRequest(QueryState.get()); + } + + if (!tx) { + return PreparePhysicalRequest(QueryState.get(), queryState->TxCtx->TxAlloc); + } + + switch (tx->GetType()) { + case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: + // TODO: Compute is always pure, should not depend on number of stages. + return PreparePhysicalRequest(QueryState.get(), queryState->TxCtx->TxAlloc); + case NKqpProto::TKqpPhyTx::TYPE_DATA: + return PreparePhysicalRequest(QueryState.get(), queryState->TxCtx->TxAlloc); + case NKqpProto::TKqpPhyTx::TYPE_SCAN: + return PrepareScanRequest(QueryState.get()); + case NKqpProto::TKqpPhyTx::TYPE_GENERIC: + return PrepareGenericRequest(QueryState.get()); + default: + YQL_ENSURE(false, "Unexpected physical tx type: " << (int)tx->GetType()); + } + } + void ValidateParameter(const TString& name, const NKikimrMiniKQL::TType& type) { auto& txCtx = QueryState->TxCtx; YQL_ENSURE(txCtx); @@ -1176,14 +1231,10 @@ public: return true; }; + bool pure = tx && calcPure(*tx); + auto request = PrepareRequest(tx, pure, QueryState.get()); - auto request = pure - ? PreparePureRequest(QueryState.get()) - : (tx && tx->GetType() == NKqpProto::TKqpPhyTx::TYPE_SCAN) - ? PrepareScanRequest(QueryState.get()) - : PreparePhysicalRequest(QueryState.get(), QueryState->TxCtx->TxAlloc); - ; LOG_D("ExecutePhyTx, tx: " << (void*)tx.get() << " commit: " << commit << " txCtx.DeferredEffects.size(): " << txCtx.DeferredEffects.Size()); @@ -1198,6 +1249,7 @@ public: case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: case NKqpProto::TKqpPhyTx::TYPE_DATA: case NKqpProto::TKqpPhyTx::TYPE_SCAN: + case NKqpProto::TKqpPhyTx::TYPE_GENERIC: break; default: YQL_ENSURE(false, "Unexpected physical tx type in data query: " << (ui32)tx->GetType()); @@ -1453,7 +1505,8 @@ public: case NKikimrKqp::QUERY_TYPE_PREPARED_DML: case NKikimrKqp::QUERY_TYPE_SQL_SCAN: case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT: - case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: { + case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: + case NKikimrKqp::QUERY_TYPE_SQL_QUERY: { TString text = ExtractQueryText(); if (IsQueryAllowedToLog(text)) { auto userSID = NACLib::TUserToken(QueryState->UserToken).GetUserSID(); @@ -1632,10 +1685,12 @@ public: bool useYdbResponseFormat = QueryState->Request.GetUsePublicResponseDataFormat(); // Result for scan query is sent directly to target actor. - bool isScanQuery = QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_AST_SCAN - || QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN; + bool streamResult = + QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_AST_SCAN || + QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN || + QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_SQL_QUERY; - if (QueryState->PreparedQuery && !isScanQuery) { + if (QueryState->PreparedQuery && !streamResult) { auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) { auto& rb = phyQuery.GetResultBindings(i); diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index d21ad049b7f..50de1d25b3c 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -7,7 +7,7 @@ using namespace NYdb; using namespace NYdb::NQuery; Y_UNIT_TEST_SUITE(KqpQueryService) { - Y_UNIT_TEST(StreamExecuteQuery) { + Y_UNIT_TEST(StreamExecuteQueryPure) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); @@ -33,7 +33,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_VALUES_EQUAL(count, 1); } - Y_UNIT_TEST(ExecuteQuery) { + Y_UNIT_TEST(ExecuteQueryPure) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); @@ -44,6 +44,47 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { CompareYson(R"([[1]])", FormatResultSetYson(result.GetResultSet(0))); } + + Y_UNIT_TEST(StreamExecuteQuery) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + auto it = db.StreamExecuteQuery(R"( + SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0; + )").ExtractValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + ui64 count = 0; + for (;;) { + auto streamPart = it.ReadNext().GetValueSync(); + if (!streamPart.IsSuccess()) { + UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); + break; + } + + if (streamPart.HasResultSet()) { + auto resultSet = streamPart.ExtractResultSet(); + count += resultSet.RowsCount(); + } + } + + UNIT_ASSERT_VALUES_EQUAL(count, 2); + } + + Y_UNIT_TEST(ExecuteQuery) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + auto result = db.ExecuteQuery(R"( + SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0; + )").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + CompareYson(R"([ + [[3u];[1]]; + [[4000000003u];[1]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } } } // namespace NKqp diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index a675b231400..622a4ab2bc0 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -36,6 +36,8 @@ enum EQueryType { QUERY_TYPE_SQL_SCAN = 9; QUERY_TYPE_AST_SCAN = 10; QUERY_TYPE_SQL_SCRIPT_STREAMING = 11; + + QUERY_TYPE_SQL_QUERY = 12; }; enum EQueryAction { diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index c049b2cd9ab..854dafe4756 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -287,6 +287,7 @@ message TKqpPhyTx { TYPE_COMPUTE = 1; TYPE_DATA = 2; TYPE_SCAN = 3; + TYPE_GENERIC = 4; }; EType Type = 1; @@ -325,6 +326,7 @@ message TKqpPhyQuery { TYPE_UNSPECIFIED = 0; TYPE_DATA = 1; TYPE_SCAN = 2; + TYPE_QUERY = 3; }; EType Type = 1; |