aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2023-01-08 21:40:07 +0300
committerspuchin <spuchin@ydb.tech>2023-01-08 21:40:07 +0300
commitddec3014fde819da31b3c2ce50321b44e77de0c4 (patch)
tree18ef9f72c3c5dfc9b0d330374bd322c17cb827dd
parent5b8946eafc3ffcfe351d1e070a395bee8337dead (diff)
downloadydb-ddec3014fde819da31b3c2ce50321b44e77de0c4.tar.gz
Add separate KQP query type for generic query. ()
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp3
-rw-r--r--ydb/core/kqp/common/kqp.cpp17
-rw-r--r--ydb/core/kqp/common/kqp.h14
-rw-r--r--ydb/core/kqp/common/kqp_timeouts.cpp1
-rw-r--r--ydb/core/kqp/common/kqp_yql.cpp8
-rw-r--r--ydb/core/kqp/common/kqp_yql.h6
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp26
-rw-r--r--ydb/core/kqp/counters/kqp_counters.cpp2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp5
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp32
-rw-r--r--ydb/core/kqp/host/kqp_host.h3
-rw-r--r--ydb/core/kqp/host/kqp_host_impl.h4
-rw-r--r--ydb/core/kqp/host/kqp_runner.cpp26
-rw-r--r--ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp4
-rw-r--r--ydb/core/kqp/opt/kqp_opt_build_txs.cpp10
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.h8
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp16
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp2
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp93
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp45
-rw-r--r--ydb/core/protos/kqp.proto2
-rw-r--r--ydb/core/protos/kqp_physical.proto2
22 files changed, 275 insertions, 54 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index c432aa49d7c..27fbc44d33e 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -34,8 +34,7 @@ std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest(
}
}
- // TODO: Use new type of query (QUERY_TYPE_SQL_QUERY)
- kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_SCAN);
+ kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_QUERY);
kqpRequest.MutableRequest()->SetKeepSession(false);
switch (req.query_case()) {
diff --git a/ydb/core/kqp/common/kqp.cpp b/ydb/core/kqp/common/kqp.cpp
index 040bc5f9f40..d54a2c04b56 100644
--- a/ydb/core/kqp/common/kqp.cpp
+++ b/ydb/core/kqp/common/kqp.cpp
@@ -6,6 +6,23 @@
namespace NKikimr::NKqp {
+bool IsSqlQuery(const NKikimrKqp::EQueryType& queryType) {
+ switch (queryType) {
+ case NKikimrKqp::QUERY_TYPE_SQL_DML:
+ case NKikimrKqp::QUERY_TYPE_SQL_DDL:
+ case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT:
+ case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING:
+ case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
+ case NKikimrKqp::QUERY_TYPE_SQL_QUERY:
+ return true;
+
+ default:
+ break;
+ }
+
+ return false;
+}
+
TKqpShutdownController::TKqpShutdownController(NActors::TActorId kqpProxyActorId, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, bool enableGraceful)
: KqpProxyActorId_(kqpProxyActorId)
, EnableGraceful(enableGraceful)
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index 7e7fe78adc9..7038cd03906 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -145,6 +145,8 @@ struct TKqpQuerySettings {
}
};
+bool IsSqlQuery(const NKikimrKqp::EQueryType& queryType);
+
struct TKqpQueryId {
TString Cluster;
TString Database;
@@ -165,19 +167,17 @@ public:
case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
case NKikimrKqp::QUERY_TYPE_AST_DML:
case NKikimrKqp::QUERY_TYPE_AST_SCAN:
- break;
+ case NKikimrKqp::QUERY_TYPE_SQL_QUERY:
+ break;
+
default:
- Y_ENSURE(false, "Unsupported request type");
+ Y_ENSURE(false, "Unsupported request type");
}
}
- bool IsScan() const {
- return QueryType == NKikimrKqp::QUERY_TYPE_SQL_SCAN || QueryType == NKikimrKqp::QUERY_TYPE_AST_SCAN;
- }
-
bool IsSql() const {
- return QueryType == NKikimrKqp::QUERY_TYPE_SQL_DML || QueryType == NKikimrKqp::QUERY_TYPE_SQL_SCAN;
+ return IsSqlQuery(QueryType);
}
bool operator==(const TKqpQueryId& other) const {
diff --git a/ydb/core/kqp/common/kqp_timeouts.cpp b/ydb/core/kqp/common/kqp_timeouts.cpp
index c8ad9c584aa..f66f9c7a3b8 100644
--- a/ydb/core/kqp/common/kqp_timeouts.cpp
+++ b/ydb/core/kqp/common/kqp_timeouts.cpp
@@ -15,6 +15,7 @@ ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType, const NKikimrCon
case NKikimrKqp::QUERY_TYPE_SQL_DML:
case NKikimrKqp::QUERY_TYPE_PREPARED_DML:
case NKikimrKqp::QUERY_TYPE_AST_DML:
+ case NKikimrKqp::QUERY_TYPE_SQL_QUERY:
return queryLimits.GetDataQueryTimeoutMs();
case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp
index 8cbaaecba95..a7b5196d997 100644
--- a/ydb/core/kqp/common/kqp_yql.cpp
+++ b/ydb/core/kqp/common/kqp_yql.cpp
@@ -13,6 +13,8 @@ static EPhysicalQueryType GetPhysicalQueryType(const TStringBuf& value) {
return EPhysicalQueryType::Data;
} else if (value == "scan_query") {
return EPhysicalQueryType::Scan;
+ } else if (value == "query") {
+ return EPhysicalQueryType::Query;
} else {
YQL_ENSURE(false, "Unknown physical query type: " << value);
}
@@ -26,6 +28,8 @@ static TStringBuf PhysicalQueryTypeToString(EPhysicalQueryType type) {
return "data_query";
case EPhysicalQueryType::Scan:
return "scan_query";
+ case EPhysicalQueryType::Query:
+ return "query";
}
YQL_ENSURE(false, "Unexpected physical query type: " << type);
@@ -67,6 +71,8 @@ static EPhysicalTxType GetPhysicalTxType(const TStringBuf& value) {
return EPhysicalTxType::Data;
} else if (value == "scan") {
return EPhysicalTxType::Scan;
+ } else if (value == "generic") {
+ return EPhysicalTxType::Generic;
} else {
YQL_ENSURE(false, "Unknown physical tx type: " << value);
}
@@ -82,6 +88,8 @@ static TStringBuf PhysicalTxTypeToString(EPhysicalTxType type) {
return "data";
case EPhysicalTxType::Scan:
return "scan";
+ case EPhysicalTxType::Generic:
+ return "generic";
}
YQL_ENSURE(false, "Unexpected physical tx type: " << type);
diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h
index 735e93360ab..eb2d22948ef 100644
--- a/ydb/core/kqp/common/kqp_yql.h
+++ b/ydb/core/kqp/common/kqp_yql.h
@@ -10,7 +10,8 @@ const TStringBuf KqpEffectTag = "KqpEffect";
enum class EPhysicalQueryType {
Unspecified,
Data,
- Scan
+ Scan,
+ Query
};
struct TKqpPhyQuerySettings {
@@ -25,7 +26,8 @@ enum class EPhysicalTxType {
Unspecified,
Compute,
Data,
- Scan
+ Scan,
+ Generic
};
struct TKqpPhyTxSettings {
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 221fd387153..5da3f59da87 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -107,11 +107,27 @@ public:
NCpuTime::TCpuTimer timer(CompileCpuTime);
- AsyncCompileResult = Query.IsScan()
- ? KqpHost->PrepareScanQuery(Query.Text, Query.IsSql(), prepareSettings)
- : Query.IsSql()
- ? KqpHost->PrepareDataQuery(Query.Text, prepareSettings)
- : KqpHost->PrepareDataQueryAst(Query.Text, prepareSettings);
+ switch (Query.QueryType) {
+ case NKikimrKqp::QUERY_TYPE_SQL_DML:
+ AsyncCompileResult = KqpHost->PrepareDataQuery(Query.Text, prepareSettings);
+ break;
+
+ case NKikimrKqp::QUERY_TYPE_AST_DML:
+ AsyncCompileResult = KqpHost->PrepareDataQueryAst(Query.Text, prepareSettings);
+ break;
+
+ case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
+ case NKikimrKqp::QUERY_TYPE_AST_SCAN:
+ AsyncCompileResult = KqpHost->PrepareScanQuery(Query.Text, Query.IsSql(), prepareSettings);
+ break;
+
+ case NKikimrKqp::QUERY_TYPE_SQL_QUERY:
+ AsyncCompileResult = KqpHost->PrepareQuery(Query.Text, prepareSettings);
+ break;
+
+ default:
+ YQL_ENSURE(false, "Unexpected query type: " << Query.QueryType);
+ }
Continue(ctx);
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index 9553ead0320..034804a1d91 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -102,6 +102,8 @@ void TKqpCountersBase::Init() {
KqpGroup->GetCounter("Request/QueryTypeSqlScan", true);
QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_AST_SCAN] =
KqpGroup->GetCounter("Request/QueryTypeAstScan", true);
+ QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_SQL_QUERY] =
+ KqpGroup->GetCounter("Request/QueryTypeQuery", true);
OtherQueryTypes = KqpGroup->GetCounter("Requests/QueryTypeOther", true);
QueriesWithRangeScan = KqpGroup->GetCounter("Query/WithRangeScan", true);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 7e58c735738..4ee793167b7 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -207,6 +207,11 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
data = false;
break;
+ case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
+ // TODO: Use separate executer.
+ data = false;
+ break;
+
default:
YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)tx.Body->GetType());
}
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 79bc0d4a2cf..17e2f55f5bc 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -677,6 +677,8 @@ public:
return KqpRunner->PrepareDataQuery(cluster, query, ctx, settings);
case EKikimrQueryType::Scan:
return KqpRunner->PrepareScanQuery(cluster, query, ctx, settings);
+ case EKikimrQueryType::Query:
+ return KqpRunner->PrepareQuery(cluster, query, ctx, settings);
case EKikimrQueryType::YqlScript:
case EKikimrQueryType::YqlScriptStreaming:
break;
@@ -1024,6 +1026,13 @@ public:
});
}
+ IAsyncQueryResultPtr PrepareQuery(const TString& query, const TPrepareSettings& settings) override {
+ return CheckedProcessQuery(*ExprCtx,
+ [this, &query, settings] (TExprContext& ctx) mutable {
+ return PrepareQueryInternal(query, settings, ctx);
+ });
+ }
+
IAsyncQueryResultPtr ExecuteYqlScript(const TString& script, NKikimrMiniKQL::TParams&& parameters,
const TExecScriptSettings& settings) override
{
@@ -1317,6 +1326,29 @@ private:
SessionCtx, *ExecuteCtx);
}
+ IAsyncQueryResultPtr PrepareQueryInternal(const TString& query, const TPrepareSettings& settings,
+ TExprContext& ctx)
+ {
+ SetupYqlTransformer();
+
+ SessionCtx->Query().Type = EKikimrQueryType::Query;
+ SessionCtx->Query().PrepareOnly = true;
+ SessionCtx->Query().PreparingQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
+ if (settings.DocumentApiRestricted) {
+ SessionCtx->Query().DocumentApiRestricted = *settings.DocumentApiRestricted;
+ }
+
+ // TODO: Support PG
+ TMaybe<TSqlVersion> sqlVersion = 1;
+ auto queryExpr = CompileYqlQuery(query, /* isSql */ true, /* sqlAutoCommit */ false, ctx, sqlVersion);
+ if (!queryExpr) {
+ return nullptr;
+ }
+
+ return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(),
+ query, sqlVersion);
+ }
+
IAsyncQueryResultPtr PrepareScanQueryInternal(const TString& query, bool isSql, TExprContext& ctx,
EKikimrStatsMode statsMode = EKikimrStatsMode::None)
{
diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h
index 80ee24fd2f1..daf6321ff11 100644
--- a/ydb/core/kqp/host/kqp_host.h
+++ b/ydb/core/kqp/host/kqp_host.h
@@ -49,6 +49,9 @@ public:
virtual IAsyncQueryResultPtr ExplainScanQuery(const TString& query, bool isSql) = 0;
+ /* Generic queries */
+ virtual IAsyncQueryResultPtr PrepareQuery(const TString& query, const TPrepareSettings& settings) = 0;
+
/* Scripting */
virtual IAsyncQueryResultPtr ValidateYqlScript(const TString& script) = 0;
virtual TQueryResult SyncValidateYqlScript(const TString& script) = 0;
diff --git a/ydb/core/kqp/host/kqp_host_impl.h b/ydb/core/kqp/host/kqp_host_impl.h
index 079d9597683..d0da15c6c38 100644
--- a/ydb/core/kqp/host/kqp_host_impl.h
+++ b/ydb/core/kqp/host/kqp_host_impl.h
@@ -236,6 +236,10 @@ public:
virtual TIntrusivePtr<TAsyncQueryResult> PrepareScanQuery(const TString& cluster,
const NYql::TExprNode::TPtr& query, NYql::TExprContext& ctx,
const NYql::IKikimrQueryExecutor::TExecuteSettings& settings) = 0;
+
+ virtual TIntrusivePtr<TAsyncQueryResult> PrepareQuery(const TString& cluster,
+ const NYql::TExprNode::TPtr& query, NYql::TExprContext& ctx,
+ const NYql::IKikimrQueryExecutor::TExecuteSettings& settings) = 0;
};
TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp
index 211e1f97b6f..22485389317 100644
--- a/ydb/core/kqp/host/kqp_runner.cpp
+++ b/ydb/core/kqp/host/kqp_runner.cpp
@@ -174,6 +174,31 @@ public:
return PrepareQueryInternal(cluster, dataQueryBlocks, ctx, scanSettings);
}
+ TIntrusivePtr<TAsyncQueryResult> PrepareQuery(const TString& cluster, const TExprNode::TPtr& query,
+ TExprContext& ctx, const IKikimrQueryExecutor::TExecuteSettings& settings) override
+ {
+ YQL_ENSURE(TransformCtx->QueryCtx->Type == EKikimrQueryType::Query);
+ YQL_ENSURE(TransformCtx->QueryCtx->PrepareOnly);
+ YQL_ENSURE(TransformCtx->QueryCtx->PreparingQuery);
+ YQL_ENSURE(TMaybeNode<TKiDataQueryBlocks>(query));
+
+ TKiDataQueryBlocks dataQueryBlocks(query);
+
+ const auto& queryBlock = dataQueryBlocks.Arg(0);
+ if (queryBlock.Results().Size() != 1) {
+ ctx.AddError(YqlIssue(ctx.GetPosition(dataQueryBlocks.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED,
+ "Multiple result sets not yet supported."));
+ return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
+ }
+ if (queryBlock.Effects().ArgCount() > 0) {
+ ctx.AddError(YqlIssue(ctx.GetPosition(dataQueryBlocks.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED,
+ "Data modifications not yet supported."));
+ return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
+ }
+
+ return PrepareQueryInternal(cluster, dataQueryBlocks, ctx, settings);
+ }
+
private:
TIntrusivePtr<TAsyncQueryResult> PrepareQueryInternal(const TString& cluster,
const TKiDataQueryBlocks& dataQueryBlocks, TExprContext& ctx,
@@ -217,6 +242,7 @@ private:
switch (queryType) {
case EKikimrQueryType::Dml:
case EKikimrQueryType::Scan:
+ case EKikimrQueryType::Query:
break;
default:
YQL_ENSURE(false, "PrepareQueryNewEngine, unexpected query type: " << queryType);
diff --git a/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp b/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp
index 06ec2b12a4f..e3409d0cf12 100644
--- a/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_build_phy_query.cpp
@@ -33,6 +33,10 @@ public:
querySettings.Type = EPhysicalQueryType::Scan;
break;
}
+ case EKikimrQueryType::Query: {
+ querySettings.Type = EPhysicalQueryType::Query;
+ break;
+ }
default: {
YQL_ENSURE(false, "Unexpected query type: " << KqpCtx->QueryCtx->Type);
}
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
index c6cb7dd2431..b78d57d42e3 100644
--- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
@@ -26,7 +26,7 @@ TAutoPtr<NYql::IGraphTransformer> CreateKqpBuildPhyStagesTransformer(bool allowD
class TKqpBuildTxTransformer : public TSyncTransformerBase {
public:
TKqpBuildTxTransformer()
- : QueryType(EKikimrQueryType::Dml)
+ : QueryType(EKikimrQueryType::Unspecified)
, IsPrecompute(false) {}
void Init(EKikimrQueryType queryType, bool isPrecompute) {
@@ -68,6 +68,14 @@ private:
return EPhysicalTxType::Scan;
}
+ if (QueryType == EKikimrQueryType::Query) {
+ if (IsPrecompute && allStagesArePure) {
+ return EPhysicalTxType::Compute;
+ }
+
+ return EPhysicalTxType::Generic;
+ }
+
if (allStagesArePure) {
return EPhysicalTxType::Compute;
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h
index 8b71f0cc458..9079b5040b5 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.h
@@ -67,6 +67,7 @@ enum class EKikimrQueryType {
YqlInternal,
Scan,
YqlScriptStreaming,
+ Query,
};
struct TKikimrQueryContext : TThrRefBase {
@@ -308,6 +309,13 @@ public:
return false;
}
+ if (queryType == EKikimrQueryType::Query && (newOp & KikimrSchemeOps())) {
+ TString message = TStringBuilder() << "Operation '" << newOp
+ << "' can't be performed in query";
+ ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
+ return false;
+ }
+
if (queryType == EKikimrQueryType::Ddl && (newOp & KikimrDataOps())) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' can't be performed in scheme query";
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 9dee2b8d195..8299c070ba3 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -92,22 +92,6 @@ std::optional<ui32> TryDecodeYdbSessionId(const TString& sessionId) {
return std::nullopt;
}
-bool IsSqlQuery(const NKikimrKqp::EQueryType& queryType) {
- switch (queryType) {
- case NKikimrKqp::QUERY_TYPE_SQL_DML:
- case NKikimrKqp::QUERY_TYPE_SQL_DDL:
- case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT:
- case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING:
- case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
- return true;
-
- default:
- break;
- }
-
- return false;
-}
-
TString EncodeSessionId(ui32 nodeId, const TString& id) {
Ydb::TOperationId opId;
opId.SetKind(NOperationId::TOperationId::SESSION_YQL);
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index 2b4096755ce..cdcd87ea577 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -29,6 +29,7 @@ NKqpProto::TKqpPhyTx::EType GetPhyTxType(const EPhysicalTxType& type) {
case EPhysicalTxType::Compute: return NKqpProto::TKqpPhyTx::TYPE_COMPUTE;
case EPhysicalTxType::Data: return NKqpProto::TKqpPhyTx::TYPE_DATA;
case EPhysicalTxType::Scan: return NKqpProto::TKqpPhyTx::TYPE_SCAN;
+ case EPhysicalTxType::Generic: return NKqpProto::TKqpPhyTx::TYPE_GENERIC;
case EPhysicalTxType::Unspecified:
break;
@@ -41,6 +42,7 @@ NKqpProto::TKqpPhyQuery::EType GetPhyQueryType(const EPhysicalQueryType& type) {
switch (type) {
case EPhysicalQueryType::Data: return NKqpProto::TKqpPhyQuery::TYPE_DATA;
case EPhysicalQueryType::Scan: return NKqpProto::TKqpPhyQuery::TYPE_SCAN;
+ case EPhysicalQueryType::Query: return NKqpProto::TKqpPhyQuery::TYPE_QUERY;
case EPhysicalQueryType::Unspecified:
break;
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index b9e1afcc8cb..0f1f5861b54 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -400,6 +400,7 @@ public:
case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
case NKikimrKqp::QUERY_TYPE_AST_SCAN:
case NKikimrKqp::QUERY_TYPE_AST_DML:
+ case NKikimrKqp::QUERY_TYPE_SQL_QUERY:
return true;
// should not be compiled. TODO: forward to request executer
@@ -707,6 +708,10 @@ public:
) {
AcquirePersistentSnapshot();
return;
+ } else if (queryRequest.GetType() == NKikimrKqp::QUERY_TYPE_SQL_QUERY) {
+ // TODO: Switch to MVCC snapshots after moving to separate executer.
+ AcquirePersistentSnapshot();
+ return;
} else if (NeedSnapshot(*QueryState->TxCtx, *Config, /*rollback*/ false, QueryState->Commit,
QueryState->PreparedQuery->GetPhysicalQuery()))
{
@@ -897,10 +902,22 @@ public:
action = NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED;
}
- YQL_ENSURE(action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_SQL_SCAN
- || action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_AST_SCAN
- || action == NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED && type == NKikimrKqp::QUERY_TYPE_PREPARED_DML,
- "Unexpected query action: " << action << " and type: " << type);
+ switch (action) {
+ case NKikimrKqp::QUERY_ACTION_EXECUTE:
+ YQL_ENSURE(
+ type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
+ type == NKikimrKqp::QUERY_TYPE_AST_SCAN ||
+ type == NKikimrKqp::QUERY_TYPE_SQL_QUERY
+ );
+ break;
+
+ case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED:
+ YQL_ENSURE(type == NKikimrKqp::QUERY_TYPE_PREPARED_DML);
+ break;
+
+ default:
+ YQL_ENSURE(false, "Unexpected query action: " << action);
+ }
ParseParameters(QueryState->Request.GetParameters());
ParseParameters(QueryState->Request.GetYdbParameters());
@@ -948,7 +965,7 @@ public:
Cleanup(IsFatalError(record.GetYdbStatus()));
}
- IKqpGateway::TExecPhysicalRequest PrepareRequest(TKqpQueryState *queryState, TTxAllocatorState::TPtr alloc) {
+ IKqpGateway::TExecPhysicalRequest PrepareBaseRequest(TKqpQueryState *queryState, TTxAllocatorState::TPtr alloc) {
IKqpGateway::TExecPhysicalRequest request(alloc);
if (queryState) {
@@ -971,13 +988,15 @@ public:
IKqpGateway::TExecPhysicalRequest PreparePureRequest(TKqpQueryState *queryState) {
- auto request = PrepareRequest(queryState, queryState->TxCtx->TxAlloc);
+ auto request = PrepareBaseRequest(queryState, queryState->TxCtx->TxAlloc);
request.NeedTxId = false;
return request;
}
- IKqpGateway::TExecPhysicalRequest PreparePhysicalRequest(TKqpQueryState *queryState, TTxAllocatorState::TPtr alloc) {
- auto request = PrepareRequest(queryState, alloc);
+ IKqpGateway::TExecPhysicalRequest PreparePhysicalRequest(TKqpQueryState *queryState,
+ TTxAllocatorState::TPtr alloc)
+ {
+ auto request = PrepareBaseRequest(queryState, alloc);
if (queryState) {
request.Snapshot = queryState->TxCtx->GetSnapshot();
@@ -990,7 +1009,7 @@ public:
}
IKqpGateway::TExecPhysicalRequest PrepareScanRequest(TKqpQueryState *queryState) {
- auto request = PrepareRequest(queryState, queryState->TxCtx->TxAlloc);
+ auto request = PrepareBaseRequest(queryState, queryState->TxCtx->TxAlloc);
request.MaxComputeActors = Config->_KqpMaxComputeActors.Get().GetRef();
request.DisableLlvmForUdfStages = Config->DisableLlvmForUdfStages();
@@ -1001,6 +1020,42 @@ public:
return request;
}
+ IKqpGateway::TExecPhysicalRequest PrepareGenericRequest(TKqpQueryState *queryState) {
+ auto request = PrepareBaseRequest(queryState, queryState->TxCtx->TxAlloc);
+
+ YQL_ENSURE(queryState);
+ request.Snapshot = queryState->TxCtx->GetSnapshot();
+
+ return request;
+ }
+
+ IKqpGateway::TExecPhysicalRequest PrepareRequest(std::shared_ptr<const NKqpProto::TKqpPhyTx> tx, bool pure,
+ TKqpQueryState *queryState)
+ {
+ if (pure) {
+ YQL_ENSURE(tx);
+ return PreparePureRequest(QueryState.get());
+ }
+
+ if (!tx) {
+ return PreparePhysicalRequest(QueryState.get(), queryState->TxCtx->TxAlloc);
+ }
+
+ switch (tx->GetType()) {
+ case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
+ // TODO: Compute is always pure, should not depend on number of stages.
+ return PreparePhysicalRequest(QueryState.get(), queryState->TxCtx->TxAlloc);
+ case NKqpProto::TKqpPhyTx::TYPE_DATA:
+ return PreparePhysicalRequest(QueryState.get(), queryState->TxCtx->TxAlloc);
+ case NKqpProto::TKqpPhyTx::TYPE_SCAN:
+ return PrepareScanRequest(QueryState.get());
+ case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
+ return PrepareGenericRequest(QueryState.get());
+ default:
+ YQL_ENSURE(false, "Unexpected physical tx type: " << (int)tx->GetType());
+ }
+ }
+
void ValidateParameter(const TString& name, const NKikimrMiniKQL::TType& type) {
auto& txCtx = QueryState->TxCtx;
YQL_ENSURE(txCtx);
@@ -1176,14 +1231,10 @@ public:
return true;
};
+
bool pure = tx && calcPure(*tx);
+ auto request = PrepareRequest(tx, pure, QueryState.get());
- auto request = pure
- ? PreparePureRequest(QueryState.get())
- : (tx && tx->GetType() == NKqpProto::TKqpPhyTx::TYPE_SCAN)
- ? PrepareScanRequest(QueryState.get())
- : PreparePhysicalRequest(QueryState.get(), QueryState->TxCtx->TxAlloc);
- ;
LOG_D("ExecutePhyTx, tx: " << (void*)tx.get() << " commit: " << commit
<< " txCtx.DeferredEffects.size(): " << txCtx.DeferredEffects.Size());
@@ -1198,6 +1249,7 @@ public:
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
case NKqpProto::TKqpPhyTx::TYPE_DATA:
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
+ case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
break;
default:
YQL_ENSURE(false, "Unexpected physical tx type in data query: " << (ui32)tx->GetType());
@@ -1453,7 +1505,8 @@ public:
case NKikimrKqp::QUERY_TYPE_PREPARED_DML:
case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT:
- case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: {
+ case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING:
+ case NKikimrKqp::QUERY_TYPE_SQL_QUERY: {
TString text = ExtractQueryText();
if (IsQueryAllowedToLog(text)) {
auto userSID = NACLib::TUserToken(QueryState->UserToken).GetUserSID();
@@ -1632,10 +1685,12 @@ public:
bool useYdbResponseFormat = QueryState->Request.GetUsePublicResponseDataFormat();
// Result for scan query is sent directly to target actor.
- bool isScanQuery = QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_AST_SCAN
- || QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN;
+ bool streamResult =
+ QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_AST_SCAN ||
+ QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
+ QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_SQL_QUERY;
- if (QueryState->PreparedQuery && !isScanQuery) {
+ if (QueryState->PreparedQuery && !streamResult) {
auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) {
auto& rb = phyQuery.GetResultBindings(i);
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index d21ad049b7f..50de1d25b3c 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -7,7 +7,7 @@ using namespace NYdb;
using namespace NYdb::NQuery;
Y_UNIT_TEST_SUITE(KqpQueryService) {
- Y_UNIT_TEST(StreamExecuteQuery) {
+ Y_UNIT_TEST(StreamExecuteQueryPure) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
@@ -33,7 +33,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_VALUES_EQUAL(count, 1);
}
- Y_UNIT_TEST(ExecuteQuery) {
+ Y_UNIT_TEST(ExecuteQueryPure) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
@@ -44,6 +44,47 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
CompareYson(R"([[1]])", FormatResultSetYson(result.GetResultSet(0)));
}
+
+ Y_UNIT_TEST(StreamExecuteQuery) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ auto it = db.StreamExecuteQuery(R"(
+ SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0;
+ )").ExtractValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ ui64 count = 0;
+ for (;;) {
+ auto streamPart = it.ReadNext().GetValueSync();
+ if (!streamPart.IsSuccess()) {
+ UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
+ break;
+ }
+
+ if (streamPart.HasResultSet()) {
+ auto resultSet = streamPart.ExtractResultSet();
+ count += resultSet.RowsCount();
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(count, 2);
+ }
+
+ Y_UNIT_TEST(ExecuteQuery) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ auto result = db.ExecuteQuery(R"(
+ SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0;
+ )").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ CompareYson(R"([
+ [[3u];[1]];
+ [[4000000003u];[1]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
}
} // namespace NKqp
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index a675b231400..622a4ab2bc0 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -36,6 +36,8 @@ enum EQueryType {
QUERY_TYPE_SQL_SCAN = 9;
QUERY_TYPE_AST_SCAN = 10;
QUERY_TYPE_SQL_SCRIPT_STREAMING = 11;
+
+ QUERY_TYPE_SQL_QUERY = 12;
};
enum EQueryAction {
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index c049b2cd9ab..854dafe4756 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -287,6 +287,7 @@ message TKqpPhyTx {
TYPE_COMPUTE = 1;
TYPE_DATA = 2;
TYPE_SCAN = 3;
+ TYPE_GENERIC = 4;
};
EType Type = 1;
@@ -325,6 +326,7 @@ message TKqpPhyQuery {
TYPE_UNSPECIFIED = 0;
TYPE_DATA = 1;
TYPE_SCAN = 2;
+ TYPE_QUERY = 3;
};
EType Type = 1;