summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <[email protected]>2022-11-11 14:20:42 +0300
committerva-kuznecov <[email protected]>2022-11-11 14:20:42 +0300
commit8477c89551778cd9c8d3b19cb71af18423feb2c2 (patch)
treeacacf933c9aaff54164fe9b7feeebaadb39e9378
parentd14187ebf3fffb8dcfd9405ba04152bb15c4d3ff (diff)
Execute QUERY_TYPE_AST_* in SessionActor
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp7
-rw-r--r--ydb/core/kqp/host/kqp_host.h1
-rw-r--r--ydb/core/kqp/kqp.h31
-rw-r--r--ydb/core/kqp/kqp_compile_actor.cpp8
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp33
-rw-r--r--ydb/core/kqp/kqp_worker_actor.cpp2
6 files changed, 60 insertions, 22 deletions
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index a225b548aa6..654b3d371aa 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -1231,6 +1231,13 @@ public:
});
}
+ IAsyncQueryResultPtr PrepareDataQueryAst(const TString& query, const TPrepareSettings& settings) override {
+ return CheckedProcessQuery(*ExprCtx,
+ [this, &query, settings] (TExprContext& ctx) mutable {
+ return PrepareDataQueryAstInternal(query, settings, ctx);
+ });
+ }
+
TQueryResult SyncPrepareDataQuery(const TString& query, const TPrepareSettings& settings) override {
return CheckedSyncProcessQuery(
[this, &query, settings] () mutable {
diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h
index c726390327b..beec6b20900 100644
--- a/ydb/core/kqp/host/kqp_host.h
+++ b/ydb/core/kqp/host/kqp_host.h
@@ -70,6 +70,7 @@ public:
virtual TQueryResult SyncExplainDataQuery(const TString& query, bool isSql) = 0;
virtual IAsyncQueryResultPtr PrepareDataQuery(const TString& query, const TPrepareSettings& settings) = 0;
+ virtual IAsyncQueryResultPtr PrepareDataQueryAst(const TString& query, const TPrepareSettings& settings) = 0;
virtual TQueryResult SyncPrepareDataQuery(const TString& query, const TPrepareSettings& settings) = 0;
virtual IAsyncQueryResultPtr ExecuteDataQuery(const TString& txId, std::shared_ptr<const NKikimrKqp::TPreparedQuery>& query,
diff --git a/ydb/core/kqp/kqp.h b/ydb/core/kqp/kqp.h
index ce21aba33cd..a0d7f6f3de4 100644
--- a/ydb/core/kqp/kqp.h
+++ b/ydb/core/kqp/kqp.h
@@ -150,15 +150,34 @@ struct TKqpQueryId {
TString UserSid;
TString Text;
TKqpQuerySettings Settings;
- bool Scan;
+ NKikimrKqp::EQueryType QueryType;
public:
- TKqpQueryId(const TString& cluster, const TString& database, const TString& text, bool scan)
+ TKqpQueryId(const TString& cluster, const TString& database, const TString& text, NKikimrKqp::EQueryType type)
: Cluster(cluster)
, Database(database)
, Text(text)
- , Scan(scan)
- {}
+ , QueryType(type)
+ {
+ switch (QueryType) {
+ case NKikimrKqp::QUERY_TYPE_SQL_DML:
+ case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
+ case NKikimrKqp::QUERY_TYPE_AST_DML:
+ case NKikimrKqp::QUERY_TYPE_AST_SCAN:
+ break;
+ default:
+ Y_ENSURE(false, "Unsupported request type");
+ }
+
+ }
+
+ bool IsScan() const {
+ return QueryType == NKikimrKqp::QUERY_TYPE_SQL_SCAN || QueryType == NKikimrKqp::QUERY_TYPE_AST_SCAN;
+ }
+
+ bool IsSql() const {
+ return QueryType == NKikimrKqp::QUERY_TYPE_SQL_DML || QueryType == NKikimrKqp::QUERY_TYPE_SQL_SCAN;
+ }
bool operator==(const TKqpQueryId& other) const {
return
@@ -167,7 +186,7 @@ public:
UserSid == other.UserSid &&
Text == other.Text &&
Settings == other.Settings &&
- Scan == other.Scan;
+ QueryType == other.QueryType;
}
bool operator!=(const TKqpQueryId& other) {
@@ -180,7 +199,7 @@ public:
bool operator>=(const TKqpQueryId&) = delete;
size_t GetHash() const noexcept {
- auto tuple = std::make_tuple(Cluster, Database, UserSid, Text, Settings, Scan);
+ auto tuple = std::make_tuple(Cluster, Database, UserSid, Text, Settings, QueryType);
return THash<decltype(tuple)>()(tuple);
}
};
diff --git a/ydb/core/kqp/kqp_compile_actor.cpp b/ydb/core/kqp/kqp_compile_actor.cpp
index a83b0f60e31..fcc2617bc01 100644
--- a/ydb/core/kqp/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/kqp_compile_actor.cpp
@@ -108,9 +108,11 @@ public:
NCpuTime::TCpuTimer timer(CompileCpuTime);
- AsyncCompileResult = Query.Scan
- ? KqpHost->PrepareScanQuery(Query.Text, true, prepareSettings)
- : KqpHost->PrepareDataQuery(Query.Text, prepareSettings);
+ AsyncCompileResult = Query.IsScan()
+ ? KqpHost->PrepareScanQuery(Query.Text, Query.IsSql(), prepareSettings)
+ : Query.IsSql()
+ ? KqpHost->PrepareDataQuery(Query.Text, prepareSettings)
+ : KqpHost->PrepareDataQueryAst(Query.Text, prepareSettings);
Continue(ctx);
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 76cddec679b..a9ecc2a4982 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -313,15 +313,15 @@ public:
case NKikimrKqp::QUERY_TYPE_SQL_DML:
case NKikimrKqp::QUERY_TYPE_PREPARED_DML:
case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
+ case NKikimrKqp::QUERY_TYPE_AST_SCAN:
+ case NKikimrKqp::QUERY_TYPE_AST_DML:
return true;
// should not be compiled. TODO: forward to request executer
// not supported yet
case NKikimrKqp::QUERY_TYPE_SQL_DDL:
- case NKikimrKqp::QUERY_TYPE_AST_SCAN:
case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT:
case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING:
- case NKikimrKqp::QUERY_TYPE_AST_DML:
case NKikimrKqp::QUERY_TYPE_UNDEFINED:
return false;
}
@@ -519,16 +519,15 @@ public:
TMaybe<TString> uid;
bool keepInCache = false;
- bool scan = queryRequest.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN;
switch (queryRequest.GetAction()) {
case NKikimrKqp::QUERY_ACTION_EXECUTE:
- query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), scan);
- keepInCache = queryRequest.GetQueryCachePolicy().keep_in_cache();
+ query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), queryRequest.GetType());
+ keepInCache = queryRequest.GetQueryCachePolicy().keep_in_cache() && query->IsSql();
break;
case NKikimrKqp::QUERY_ACTION_PREPARE:
- query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), scan);
- keepInCache = true;
+ query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), queryRequest.GetType());
+ keepInCache = query->IsSql();
break;
case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED:
@@ -603,7 +602,9 @@ public:
QueryState->TxCtx->OnBeginQuery();
- if (queryRequest.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN) {
+ if (queryRequest.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN
+ || queryRequest.GetType() == NKikimrKqp::QUERY_TYPE_AST_SCAN
+ ) {
AcquirePersistentSnapshot();
return;
} else if (NeedSnapshot(*QueryState->TxCtx, *Config, /*rollback*/ false, QueryState->Commit,
@@ -782,12 +783,15 @@ public:
auto action = queryRequest.GetAction();
auto type = queryRequest.GetType();
- if (action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_SQL_DML) {
+ if (action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_SQL_DML
+ || action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_AST_DML)
+ {
type = NKikimrKqp::QUERY_TYPE_PREPARED_DML;
action = NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED;
}
YQL_ENSURE(action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_SQL_SCAN
+ || action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_AST_SCAN
|| action == NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED && type == NKikimrKqp::QUERY_TYPE_PREPARED_DML,
"Unexpected query action: " << action << " and type: " << type);
@@ -1234,9 +1238,12 @@ public:
QueryState->Orbit = std::move(ev->Get()->Orbit);
auto* response = ev->Get()->Record.MutableResponse();
+
LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx
<< "/" << (QueryState->PreparedQuery ? QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize() : 0)
- << " response.status: " << response->GetStatus() << " results.size: " << response->GetResult().ResultsSize());
+ << " response.status: " << response->GetStatus()
+ << " results.size: " << (response->HasResult() ? std::to_string(response->GetResult().ResultsSize()) : " <no result>"));
+
ExecuterId = TActorId{};
if (response->GetStatus() != Ydb::StatusIds::SUCCESS) {
@@ -1516,7 +1523,11 @@ public:
bool useYdbResponseFormat = QueryState->Request.GetUsePublicResponseDataFormat();
- if (QueryState->PreparedQuery) {
+ // Result for scan query is sent directly to target actor.
+ bool isScanQuery = QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_AST_SCAN
+ || QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN;
+
+ if (QueryState->PreparedQuery && !isScanQuery) {
auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) {
auto& rb = phyQuery.GetResultBindings(i);
diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp
index 6855e2d80e8..3a489054f32 100644
--- a/ydb/core/kqp/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/kqp_worker_actor.cpp
@@ -293,9 +293,7 @@ public:
if (queryRequest.GetAction() == NKikimrKqp::QUERY_ACTION_EXECUTE) {
switch (queryRequest.GetType()) {
case NKikimrKqp::QUERY_TYPE_SQL_DDL:
- case NKikimrKqp::QUERY_TYPE_AST_DML:
case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT:
- case NKikimrKqp::QUERY_TYPE_AST_SCAN:
case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING:
break;