diff options
author | va-kuznecov <[email protected]> | 2022-11-11 14:20:42 +0300 |
---|---|---|
committer | va-kuznecov <[email protected]> | 2022-11-11 14:20:42 +0300 |
commit | 8477c89551778cd9c8d3b19cb71af18423feb2c2 (patch) | |
tree | acacf933c9aaff54164fe9b7feeebaadb39e9378 | |
parent | d14187ebf3fffb8dcfd9405ba04152bb15c4d3ff (diff) |
Execute QUERY_TYPE_AST_* in SessionActor
-rw-r--r-- | ydb/core/kqp/host/kqp_host.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/kqp.h | 31 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compile_actor.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 33 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_worker_actor.cpp | 2 |
6 files changed, 60 insertions, 22 deletions
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index a225b548aa6..654b3d371aa 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1231,6 +1231,13 @@ public: }); } + IAsyncQueryResultPtr PrepareDataQueryAst(const TString& query, const TPrepareSettings& settings) override { + return CheckedProcessQuery(*ExprCtx, + [this, &query, settings] (TExprContext& ctx) mutable { + return PrepareDataQueryAstInternal(query, settings, ctx); + }); + } + TQueryResult SyncPrepareDataQuery(const TString& query, const TPrepareSettings& settings) override { return CheckedSyncProcessQuery( [this, &query, settings] () mutable { diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index c726390327b..beec6b20900 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -70,6 +70,7 @@ public: virtual TQueryResult SyncExplainDataQuery(const TString& query, bool isSql) = 0; virtual IAsyncQueryResultPtr PrepareDataQuery(const TString& query, const TPrepareSettings& settings) = 0; + virtual IAsyncQueryResultPtr PrepareDataQueryAst(const TString& query, const TPrepareSettings& settings) = 0; virtual TQueryResult SyncPrepareDataQuery(const TString& query, const TPrepareSettings& settings) = 0; virtual IAsyncQueryResultPtr ExecuteDataQuery(const TString& txId, std::shared_ptr<const NKikimrKqp::TPreparedQuery>& query, diff --git a/ydb/core/kqp/kqp.h b/ydb/core/kqp/kqp.h index ce21aba33cd..a0d7f6f3de4 100644 --- a/ydb/core/kqp/kqp.h +++ b/ydb/core/kqp/kqp.h @@ -150,15 +150,34 @@ struct TKqpQueryId { TString UserSid; TString Text; TKqpQuerySettings Settings; - bool Scan; + NKikimrKqp::EQueryType QueryType; public: - TKqpQueryId(const TString& cluster, const TString& database, const TString& text, bool scan) + TKqpQueryId(const TString& cluster, const TString& database, const TString& text, NKikimrKqp::EQueryType type) : Cluster(cluster) , Database(database) , Text(text) - , Scan(scan) - {} + , QueryType(type) + { + switch (QueryType) { + case NKikimrKqp::QUERY_TYPE_SQL_DML: + case NKikimrKqp::QUERY_TYPE_SQL_SCAN: + case NKikimrKqp::QUERY_TYPE_AST_DML: + case NKikimrKqp::QUERY_TYPE_AST_SCAN: + break; + default: + Y_ENSURE(false, "Unsupported request type"); + } + + } + + bool IsScan() const { + return QueryType == NKikimrKqp::QUERY_TYPE_SQL_SCAN || QueryType == NKikimrKqp::QUERY_TYPE_AST_SCAN; + } + + bool IsSql() const { + return QueryType == NKikimrKqp::QUERY_TYPE_SQL_DML || QueryType == NKikimrKqp::QUERY_TYPE_SQL_SCAN; + } bool operator==(const TKqpQueryId& other) const { return @@ -167,7 +186,7 @@ public: UserSid == other.UserSid && Text == other.Text && Settings == other.Settings && - Scan == other.Scan; + QueryType == other.QueryType; } bool operator!=(const TKqpQueryId& other) { @@ -180,7 +199,7 @@ public: bool operator>=(const TKqpQueryId&) = delete; size_t GetHash() const noexcept { - auto tuple = std::make_tuple(Cluster, Database, UserSid, Text, Settings, Scan); + auto tuple = std::make_tuple(Cluster, Database, UserSid, Text, Settings, QueryType); return THash<decltype(tuple)>()(tuple); } }; diff --git a/ydb/core/kqp/kqp_compile_actor.cpp b/ydb/core/kqp/kqp_compile_actor.cpp index a83b0f60e31..fcc2617bc01 100644 --- a/ydb/core/kqp/kqp_compile_actor.cpp +++ b/ydb/core/kqp/kqp_compile_actor.cpp @@ -108,9 +108,11 @@ public: NCpuTime::TCpuTimer timer(CompileCpuTime); - AsyncCompileResult = Query.Scan - ? KqpHost->PrepareScanQuery(Query.Text, true, prepareSettings) - : KqpHost->PrepareDataQuery(Query.Text, prepareSettings); + AsyncCompileResult = Query.IsScan() + ? KqpHost->PrepareScanQuery(Query.Text, Query.IsSql(), prepareSettings) + : Query.IsSql() + ? KqpHost->PrepareDataQuery(Query.Text, prepareSettings) + : KqpHost->PrepareDataQueryAst(Query.Text, prepareSettings); Continue(ctx); diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 76cddec679b..a9ecc2a4982 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -313,15 +313,15 @@ public: case NKikimrKqp::QUERY_TYPE_SQL_DML: case NKikimrKqp::QUERY_TYPE_PREPARED_DML: case NKikimrKqp::QUERY_TYPE_SQL_SCAN: + case NKikimrKqp::QUERY_TYPE_AST_SCAN: + case NKikimrKqp::QUERY_TYPE_AST_DML: return true; // should not be compiled. TODO: forward to request executer // not supported yet case NKikimrKqp::QUERY_TYPE_SQL_DDL: - case NKikimrKqp::QUERY_TYPE_AST_SCAN: case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT: case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: - case NKikimrKqp::QUERY_TYPE_AST_DML: case NKikimrKqp::QUERY_TYPE_UNDEFINED: return false; } @@ -519,16 +519,15 @@ public: TMaybe<TString> uid; bool keepInCache = false; - bool scan = queryRequest.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN; switch (queryRequest.GetAction()) { case NKikimrKqp::QUERY_ACTION_EXECUTE: - query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), scan); - keepInCache = queryRequest.GetQueryCachePolicy().keep_in_cache(); + query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), queryRequest.GetType()); + keepInCache = queryRequest.GetQueryCachePolicy().keep_in_cache() && query->IsSql(); break; case NKikimrKqp::QUERY_ACTION_PREPARE: - query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), scan); - keepInCache = true; + query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), queryRequest.GetType()); + keepInCache = query->IsSql(); break; case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: @@ -603,7 +602,9 @@ public: QueryState->TxCtx->OnBeginQuery(); - if (queryRequest.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN) { + if (queryRequest.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN + || queryRequest.GetType() == NKikimrKqp::QUERY_TYPE_AST_SCAN + ) { AcquirePersistentSnapshot(); return; } else if (NeedSnapshot(*QueryState->TxCtx, *Config, /*rollback*/ false, QueryState->Commit, @@ -782,12 +783,15 @@ public: auto action = queryRequest.GetAction(); auto type = queryRequest.GetType(); - if (action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_SQL_DML) { + if (action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_SQL_DML + || action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_AST_DML) + { type = NKikimrKqp::QUERY_TYPE_PREPARED_DML; action = NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED; } YQL_ENSURE(action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_SQL_SCAN + || action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_AST_SCAN || action == NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED && type == NKikimrKqp::QUERY_TYPE_PREPARED_DML, "Unexpected query action: " << action << " and type: " << type); @@ -1234,9 +1238,12 @@ public: QueryState->Orbit = std::move(ev->Get()->Orbit); auto* response = ev->Get()->Record.MutableResponse(); + LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << "/" << (QueryState->PreparedQuery ? QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize() : 0) - << " response.status: " << response->GetStatus() << " results.size: " << response->GetResult().ResultsSize()); + << " response.status: " << response->GetStatus() + << " results.size: " << (response->HasResult() ? std::to_string(response->GetResult().ResultsSize()) : " <no result>")); + ExecuterId = TActorId{}; if (response->GetStatus() != Ydb::StatusIds::SUCCESS) { @@ -1516,7 +1523,11 @@ public: bool useYdbResponseFormat = QueryState->Request.GetUsePublicResponseDataFormat(); - if (QueryState->PreparedQuery) { + // Result for scan query is sent directly to target actor. + bool isScanQuery = QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_AST_SCAN + || QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN; + + if (QueryState->PreparedQuery && !isScanQuery) { auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) { auto& rb = phyQuery.GetResultBindings(i); diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp index 6855e2d80e8..3a489054f32 100644 --- a/ydb/core/kqp/kqp_worker_actor.cpp +++ b/ydb/core/kqp/kqp_worker_actor.cpp @@ -293,9 +293,7 @@ public: if (queryRequest.GetAction() == NKikimrKqp::QUERY_ACTION_EXECUTE) { switch (queryRequest.GetType()) { case NKikimrKqp::QUERY_TYPE_SQL_DDL: - case NKikimrKqp::QUERY_TYPE_AST_DML: case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT: - case NKikimrKqp::QUERY_TYPE_AST_SCAN: case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: break; |