aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2023-05-19 19:39:07 +0300
committerspuchin <spuchin@ydb.tech>2023-05-19 19:39:07 +0300
commit47e31d4cfeab1251302eaae74eb9bba78ac7386d (patch)
treeabb52f1ad10d5c3d079662d9df4ffd888de79f64
parent18b646ad279f03049dbdccd87931fca890bc9437 (diff)
downloadydb-47e31d4cfeab1251302eaae74eb9bba78ac7386d.tar.gz
Add tx_control for generic query API. ()
-rw-r--r--ydb/core/grpc_services/query/query_helpers.h89
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp104
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_script.cpp55
-rw-r--r--ydb/core/kqp/common/kqp_timeouts.cpp4
-rw-r--r--ydb/core/kqp/common/simple/helpers.cpp4
-rw-r--r--ydb/core/kqp/common/simple/query_id.cpp4
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp4
-rw-r--r--ydb/core/kqp/counters/kqp_counters.cpp8
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp2
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h4
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp12
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp14
-rw-r--r--ydb/core/protos/kqp.proto4
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp20
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/client.h5
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp41
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h7
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/tx.cpp1
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/tx.h97
23 files changed, 343 insertions, 140 deletions
diff --git a/ydb/core/grpc_services/query/query_helpers.h b/ydb/core/grpc_services/query/query_helpers.h
deleted file mode 100644
index 630c2c65e1a..00000000000
--- a/ydb/core/grpc_services/query/query_helpers.h
+++ /dev/null
@@ -1,89 +0,0 @@
-#pragma once
-#include <ydb/core/base/kikimr_issue.h>
-#include <ydb/core/grpc_services/rpc_kqp_base.h>
-#include <ydb/public/api/protos/draft/ydb_query.pb.h>
-
-#include <utility>
-
-namespace NKikimr::NGRpcService {
-
-namespace NQueryHelpersPrivate {
-
-inline bool FillQueryContent(const Ydb::Query::ExecuteQueryRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest, NYql::TIssues& issues) {
- switch (req.query_case()) {
- case Ydb::Query::ExecuteQueryRequest::kQueryContent:
- if (!CheckQuery(req.query_content().text(), issues)) {
- return false;
- }
-
- kqpRequest.MutableRequest()->SetQuery(req.query_content().text());
- return true;
-
- default:
- issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option"));
- return false;
- }
-}
-
-inline bool FillQueryContent(const Ydb::Query::ExecuteScriptRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest, NYql::TIssues& issues) {
- switch (req.script_case()) {
- case Ydb::Query::ExecuteScriptRequest::kScriptContent:
- if (!CheckQuery(req.script_content().text(), issues)) {
- return false;
- }
-
- kqpRequest.MutableRequest()->SetQuery(req.script_content().text());
- return true;
-
- case Ydb::Query::ExecuteScriptRequest::kScriptId:
- issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Execution by script id is not supported yet"));
- return false;
-
- default:
- issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option"));
- return false;
- }
-}
-
-inline NKikimrKqp::EQueryType GetQueryType(const Ydb::Query::ExecuteQueryRequest&) {
- return NKikimrKqp::QUERY_TYPE_SQL_QUERY;
-}
-
-inline NKikimrKqp::EQueryType GetQueryType(const Ydb::Query::ExecuteScriptRequest&) {
- return NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY;
-}
-
-} // namespace NQueryHelpersPrivate
-
-template <class TRequestProto>
-std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest(
- const TRequestProto& req, NKikimrKqp::TEvQueryRequest& kqpRequest)
-{
- kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end());
- switch (req.exec_mode()) {
- case Ydb::Query::EXEC_MODE_EXECUTE:
- kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
- break;
- default: {
- NYql::TIssues issues;
- issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query mode"));
- return {Ydb::StatusIds::BAD_REQUEST, issues};
- }
- }
-
- kqpRequest.MutableRequest()->SetType(NQueryHelpersPrivate::GetQueryType(req));
- kqpRequest.MutableRequest()->SetKeepSession(false);
-
- // TODO: Use tx control from request.
- kqpRequest.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
- kqpRequest.MutableRequest()->MutableTxControl()->set_commit_tx(true);
-
- NYql::TIssues issues;
- if (!NQueryHelpersPrivate::FillQueryContent(req, kqpRequest, issues)) {
- return {Ydb::StatusIds::BAD_REQUEST, issues};
- }
-
- return {Ydb::StatusIds::SUCCESS, {}};
-}
-
-} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index 0bc431d13ca..a50eb52f262 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -1,5 +1,4 @@
#include "service_query.h"
-#include "query_helpers.h"
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
@@ -66,6 +65,105 @@ private:
ui64 TotalResponsesSize_ = 0;
};
+bool FillQueryContent(const Ydb::Query::ExecuteQueryRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest,
+ NYql::TIssues& issues)
+{
+ switch (req.query_case()) {
+ case Ydb::Query::ExecuteQueryRequest::kQueryContent:
+ if (!CheckQuery(req.query_content().text(), issues)) {
+ return false;
+ }
+
+ kqpRequest.MutableRequest()->SetQuery(req.query_content().text());
+ return true;
+
+ default:
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option"));
+ return false;
+ }
+}
+
+bool FillTxSettings(const Ydb::Query::TransactionSettings& from, Ydb::Table::TransactionSettings& to,
+ NYql::TIssues& issues)
+{
+ switch (from.tx_mode_case()) {
+ case Ydb::Query::TransactionSettings::kSerializableReadWrite:
+ to.mutable_serializable_read_write();
+ break;
+ case Ydb::Query::TransactionSettings::kOnlineReadOnly:
+ to.mutable_online_read_only()->set_allow_inconsistent_reads(
+ from.online_read_only().allow_inconsistent_reads());
+ break;
+ case Ydb::Query::TransactionSettings::kStaleReadOnly:
+ to.mutable_stale_read_only();
+ break;
+ case Ydb::Query::TransactionSettings::kSnapshotReadOnly:
+ to.mutable_snapshot_read_only();
+ break;
+ default:
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
+ "Invalid tx_settings"));
+ return false;
+ }
+
+ return true;
+}
+
+bool FillTxControl(const Ydb::Query::TransactionControl& from, Ydb::Table::TransactionControl& to,
+ NYql::TIssues& issues)
+{
+ switch (from.tx_selector_case()) {
+ case Ydb::Query::TransactionControl::kTxId:
+ to.set_tx_id(from.tx_id());
+ break;
+ case Ydb::Query::TransactionControl::kBeginTx:
+ if (!FillTxSettings(from.begin_tx(), *to.mutable_begin_tx(), issues)) {
+ return false;
+ }
+ break;
+ default:
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
+ "Invalid tx_control settings"));
+ return false;
+ }
+
+ to.set_commit_tx(from.commit_tx());
+ return true;
+}
+
+std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest(
+ const Ydb::Query::ExecuteQueryRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest)
+{
+ kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end());
+ switch (req.exec_mode()) {
+ case Ydb::Query::EXEC_MODE_EXECUTE:
+ kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ break;
+ default: {
+ NYql::TIssues issues;
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query mode"));
+ return {Ydb::StatusIds::BAD_REQUEST, std::move(issues)};
+ }
+ }
+
+ kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY);
+ kqpRequest.MutableRequest()->SetKeepSession(false);
+
+ if (req.has_tx_control()) {
+ NYql::TIssues issues;
+ if (!FillTxControl(req.tx_control(), *kqpRequest.MutableRequest()->MutableTxControl(), issues)) {
+ return {Ydb::StatusIds::BAD_REQUEST, std::move(issues)};
+ }
+ }
+
+ NYql::TIssues issues;
+ if (!FillQueryContent(req, kqpRequest, issues)) {
+ return {Ydb::StatusIds::BAD_REQUEST, std::move(issues)};
+ }
+
+ return {Ydb::StatusIds::SUCCESS, {}};
+}
+
class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -126,13 +224,13 @@ private:
auto [fillStatus, fillIssues] = FillKqpRequest(*req, ev->Record);
if (fillStatus != Ydb::StatusIds::SUCCESS) {
- return ReplyFinishStream(fillStatus, fillIssues);
+ return ReplyFinishStream(fillStatus, std::move(fillIssues));
}
if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
NYql::TIssues issues;
issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error"));
- ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, issues);
+ ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, std::move(issues));
}
}
diff --git a/ydb/core/grpc_services/query/rpc_execute_script.cpp b/ydb/core/grpc_services/query/rpc_execute_script.cpp
index 86c45645d0a..9d35a44307e 100644
--- a/ydb/core/grpc_services/query/rpc_execute_script.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_script.cpp
@@ -1,5 +1,4 @@
#include "service_query.h"
-#include "query_helpers.h"
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/kikimr_issue.h>
@@ -21,6 +20,60 @@ using namespace NActors;
using TEvExecuteScriptRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteScriptRequest,
Ydb::Operations::Operation>;
+bool FillQueryContent(const Ydb::Query::ExecuteScriptRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest,
+ NYql::TIssues& issues)
+{
+ switch (req.script_case()) {
+ case Ydb::Query::ExecuteScriptRequest::kScriptContent:
+ if (!CheckQuery(req.script_content().text(), issues)) {
+ return false;
+ }
+
+ kqpRequest.MutableRequest()->SetQuery(req.script_content().text());
+ return true;
+
+ case Ydb::Query::ExecuteScriptRequest::kScriptId:
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
+ "Execution by script id is not supported yet"));
+ return false;
+
+ default:
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
+ "Unexpected query option"));
+ return false;
+ }
+}
+
+std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest(
+ const Ydb::Query::ExecuteScriptRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest)
+{
+ kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end());
+ switch (req.exec_mode()) {
+ case Ydb::Query::EXEC_MODE_EXECUTE:
+ kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ break;
+ default: {
+ NYql::TIssues issues;
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query mode"));
+ return {Ydb::StatusIds::BAD_REQUEST, std::move(issues)};
+ }
+ }
+
+ kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT);
+ kqpRequest.MutableRequest()->SetKeepSession(false);
+
+ // TODO: Avoid explicit tx_control for script queries.
+ kqpRequest.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ kqpRequest.MutableRequest()->MutableTxControl()->set_commit_tx(true);
+
+ NYql::TIssues issues;
+ if (!FillQueryContent(req, kqpRequest, issues)) {
+ return {Ydb::StatusIds::BAD_REQUEST, std::move(issues)};
+ }
+
+ return {Ydb::StatusIds::SUCCESS, {}};
+}
+
class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
diff --git a/ydb/core/kqp/common/kqp_timeouts.cpp b/ydb/core/kqp/common/kqp_timeouts.cpp
index 819730e684c..d5e55838877 100644
--- a/ydb/core/kqp/common/kqp_timeouts.cpp
+++ b/ydb/core/kqp/common/kqp_timeouts.cpp
@@ -15,8 +15,8 @@ ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType, const NKikimrCon
case NKikimrKqp::QUERY_TYPE_SQL_DML:
case NKikimrKqp::QUERY_TYPE_PREPARED_DML:
case NKikimrKqp::QUERY_TYPE_AST_DML:
- case NKikimrKqp::QUERY_TYPE_SQL_QUERY:
- case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
return queryLimits.GetDataQueryTimeoutMs();
case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
diff --git a/ydb/core/kqp/common/simple/helpers.cpp b/ydb/core/kqp/common/simple/helpers.cpp
index e9694c52de3..11c06c9a3f1 100644
--- a/ydb/core/kqp/common/simple/helpers.cpp
+++ b/ydb/core/kqp/common/simple/helpers.cpp
@@ -9,8 +9,8 @@ bool IsSqlQuery(const NKikimrKqp::EQueryType& queryType) {
case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT:
case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING:
case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
- case NKikimrKqp::QUERY_TYPE_SQL_QUERY:
- case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
return true;
default:
diff --git a/ydb/core/kqp/common/simple/query_id.cpp b/ydb/core/kqp/common/simple/query_id.cpp
index 09c57a7f267..8ec1ace9224 100644
--- a/ydb/core/kqp/common/simple/query_id.cpp
+++ b/ydb/core/kqp/common/simple/query_id.cpp
@@ -23,8 +23,8 @@ TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const
case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
case NKikimrKqp::QUERY_TYPE_AST_DML:
case NKikimrKqp::QUERY_TYPE_AST_SCAN:
- case NKikimrKqp::QUERY_TYPE_SQL_QUERY:
- case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
break;
default:
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 34757a637ec..c087db13806 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -126,11 +126,11 @@ public:
AsyncCompileResult = KqpHost->PrepareScanQuery(QueryRef, QueryId.IsSql(), prepareSettings);
break;
- case NKikimrKqp::QUERY_TYPE_SQL_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
AsyncCompileResult = KqpHost->PrepareQuery(QueryRef, prepareSettings);
break;
- case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
AsyncCompileResult = KqpHost->PrepareFederatedQuery(QueryRef, prepareSettings);
break;
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index e60e2f68ecf..2e5c72f45fe 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -102,10 +102,10 @@ void TKqpCountersBase::Init() {
KqpGroup->GetCounter("Request/QueryTypeSqlScan", true);
QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_AST_SCAN] =
KqpGroup->GetCounter("Request/QueryTypeAstScan", true);
- QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_SQL_QUERY] =
- KqpGroup->GetCounter("Request/QueryTypeQuery", true);
- QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_FEDERATED_QUERY] =
- KqpGroup->GetCounter("Request/QueryTypeFederatedQuery", true);
+ QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY] =
+ KqpGroup->GetCounter("Request/QueryTypeGenericQuery", true);
+ QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_SCRIPT] =
+ KqpGroup->GetCounter("Request/QueryTypeGenericScript", true);
OtherQueryTypes = KqpGroup->GetCounter("Requests/QueryTypeOther", true);
QueriesWithRangeScan = KqpGroup->GetCounter("Query/WithRangeScan", true);
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp
index f785a66297a..e4592c1ae22 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp
@@ -443,7 +443,7 @@ Y_UNIT_TEST_SUITE(KqpProxy) {
auto ev = MakeHolder<TEvKqp::TEvScriptRequest>();
auto& req = *ev->Record.MutableRequest();
req.SetQuery("SELECT 42");
- req.SetType(NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY);
+ req.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT);
req.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
req.SetDatabase(settings.DomainName);
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 5ab09057b0b..5f3da545a24 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -374,8 +374,8 @@ public:
return (
type == NKikimrKqp::QUERY_TYPE_AST_SCAN ||
type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
- type == NKikimrKqp::QUERY_TYPE_SQL_QUERY ||
- type == NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY
+ type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY ||
+ type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT
);
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 1c99d959823..95e09d350f6 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -271,8 +271,8 @@ public:
case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
case NKikimrKqp::QUERY_TYPE_AST_SCAN:
case NKikimrKqp::QUERY_TYPE_AST_DML:
- case NKikimrKqp::QUERY_TYPE_SQL_QUERY:
- case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
return true;
// should not be compiled. TODO: forward to request executer
@@ -648,8 +648,8 @@ public:
YQL_ENSURE(
type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
type == NKikimrKqp::QUERY_TYPE_AST_SCAN ||
- type == NKikimrKqp::QUERY_TYPE_SQL_QUERY ||
- type == NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY
+ type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY ||
+ type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT
);
break;
@@ -1140,8 +1140,8 @@ public:
case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT:
case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING:
- case NKikimrKqp::QUERY_TYPE_SQL_QUERY:
- case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY: {
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: {
TString text = QueryState->ExtractQueryText();
if (IsQueryAllowedToLog(text)) {
auto userSID = QueryState->UserToken->GetUserSID();
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index c3a4055f142..2111ebdd288 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -15,7 +15,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto it = db.StreamExecuteQuery(R"(
SELECT 1;
- )").ExtractValueSync();
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
ui64 count = 0;
@@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto result = db.ExecuteQuery(R"(
SELECT 1;
- )").ExtractValueSync();
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([[1]])", FormatResultSetYson(result.GetResultSet(0)));
@@ -53,7 +53,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto it = db.StreamExecuteQuery(R"(
SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0;
- )").ExtractValueSync();
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
ui64 count = 0;
@@ -79,7 +79,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto result = db.ExecuteQuery(R"(
SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0 ORDER BY Key;
- )").ExtractValueSync();
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -94,7 +94,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto result = db.ExecuteQuery(R"(
SELECT COUNT(*) FROM EightShard;
- )").ExtractValueSync();
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([[24u]])", FormatResultSetYson(result.GetResultSet(0)));
@@ -107,7 +107,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto result = db.ExecuteQuery(R"(
SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key;
SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key;
- )").ExtractValueSync();
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -127,7 +127,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto result = db.ExecuteQuery(R"(
SELECT COUNT(*) FROM EightShard;
SELECT COUNT(*) FROM TwoShard;
- )").ExtractValueSync();
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([[24u]])", FormatResultSetYson(result.GetResultSet(0)));
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index d3bc30a98b9..2c155c661f3 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -37,8 +37,8 @@ enum EQueryType {
QUERY_TYPE_AST_SCAN = 10;
QUERY_TYPE_SQL_SCRIPT_STREAMING = 11;
- QUERY_TYPE_SQL_QUERY = 12;
- QUERY_TYPE_FEDERATED_QUERY = 13;
+ QUERY_TYPE_SQL_GENERIC_QUERY = 12;
+ QUERY_TYPE_SQL_GENERIC_SCRIPT = 13;
};
enum EQueryAction {
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin-x86_64.txt
index 4aa472691b8..646cde7da97 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin-x86_64.txt
@@ -20,4 +20,5 @@ target_link_libraries(client-draft-ydb_query PUBLIC
target_sources(client-draft-ydb_query PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/tx.cpp
)
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt
index 57264527a05..4f212f0f10f 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt
@@ -21,4 +21,5 @@ target_link_libraries(client-draft-ydb_query PUBLIC
target_sources(client-draft-ydb_query PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/tx.cpp
)
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-x86_64.txt
index 57264527a05..4f212f0f10f 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-x86_64.txt
@@ -21,4 +21,5 @@ target_link_libraries(client-draft-ydb_query PUBLIC
target_sources(client-draft-ydb_query PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/tx.cpp
)
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.windows-x86_64.txt
index 4aa472691b8..646cde7da97 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.windows-x86_64.txt
@@ -20,4 +20,5 @@ target_link_libraries(client-draft-ydb_query PUBLIC
target_sources(client-draft-ydb_query PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/tx.cpp
)
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
index 03189d4f7b4..34f63848404 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
@@ -21,12 +21,16 @@ public:
// TODO: Drain sessions.
}
- TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TExecuteQuerySettings& settings) {
- return TExecQueryImpl::StreamExecuteQuery(Connections_, DbDriverState_, query, settings);
+ TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TTxControl& txControl,
+ const TExecuteQuerySettings& settings)
+ {
+ return TExecQueryImpl::StreamExecuteQuery(Connections_, DbDriverState_, query, txControl, settings);
}
- TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TExecuteQuerySettings& settings) {
- return TExecQueryImpl::ExecuteQuery(Connections_, DbDriverState_, query, settings);
+ TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TTxControl& txControl,
+ const TExecuteQuerySettings& settings)
+ {
+ return TExecQueryImpl::ExecuteQuery(Connections_, DbDriverState_, query, txControl, settings);
}
NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script, const TExecuteScriptSettings& settings) {
@@ -126,15 +130,15 @@ TQueryClient::TQueryClient(const TDriver& driver, const TClientSettings& setting
}
TAsyncExecuteQueryResult TQueryClient::ExecuteQuery(const TString& query,
- const TExecuteQuerySettings& settings)
+ const TTxControl& txControl, const TExecuteQuerySettings& settings)
{
- return Impl_->ExecuteQuery(query, settings);
+ return Impl_->ExecuteQuery(query, txControl, settings);
}
TAsyncExecuteQueryIterator TQueryClient::StreamExecuteQuery(const TString& query,
- const TExecuteQuerySettings& settings)
+ const TTxControl& txControl, const TExecuteQuerySettings& settings)
{
- return Impl_->StreamExecuteQuery(query, settings);
+ return Impl_->StreamExecuteQuery(query, txControl, settings);
}
NThreading::TFuture<TScriptExecutionOperation> TQueryClient::ExecuteScript(const TString& script,
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h
index 3b009ef1982..59daab44697 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h
@@ -1,6 +1,7 @@
#pragma once
#include "query.h"
+#include "tx.h"
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
@@ -21,10 +22,10 @@ class TQueryClient {
public:
TQueryClient(const TDriver& driver, const TClientSettings& settings = TClientSettings());
- TAsyncExecuteQueryResult ExecuteQuery(const TString& query,
+ TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TTxControl& txControl,
const TExecuteQuerySettings& settings = TExecuteQuerySettings());
- TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query,
+ TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TTxControl& txControl,
const TExecuteQuerySettings& settings = TExecuteQuerySettings());
NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script,
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp
index 202bb329bdd..4c93bbf0a8b 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp
@@ -11,6 +11,26 @@ namespace NYdb::NQuery {
using namespace NThreading;
+static void SetTxSettings(const TTxSettings& txSettings, Ydb::Query::TransactionSettings* proto) {
+ switch (txSettings.Mode_) {
+ case TTxSettings::TS_SERIALIZABLE_RW:
+ proto->mutable_serializable_read_write();
+ break;
+ case TTxSettings::TS_ONLINE_RO:
+ proto->mutable_online_read_only()->set_allow_inconsistent_reads(
+ txSettings.OnlineSettings_.AllowInconsistentReads_);
+ break;
+ case TTxSettings::TS_STALE_RO:
+ proto->mutable_stale_read_only();
+ break;
+ case TTxSettings::TS_SNAPSHOT_RO:
+ proto->mutable_snapshot_read_only();
+ break;
+ default:
+ throw TContractViolation("Unexpected transaction mode.");
+ }
+}
+
class TExecuteQueryIterator::TReaderImpl {
public:
using TSelf = TExecuteQueryIterator::TReaderImpl;
@@ -147,12 +167,21 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryImpl(
const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState,
- const TString& query, const TExecuteQuerySettings& settings)
+ const TString& query, const TTxControl& txControl, const TExecuteQuerySettings& settings)
{
auto request = MakeRequest<Ydb::Query::ExecuteQueryRequest>();
request.set_exec_mode(Ydb::Query::EXEC_MODE_EXECUTE);
request.mutable_query_content()->set_text(query);
+ auto requestTxControl = request.mutable_tx_control();
+ requestTxControl->set_commit_tx(txControl.CommitTx_);
+ if (txControl.TxId_) {
+ requestTxControl->set_tx_id(*txControl.TxId_);
+ } else {
+ Y_ASSERT(txControl.TxSettings_);
+ SetTxSettings(*txControl.TxSettings_, requestTxControl->mutable_begin_tx());
+ }
+
auto promise = NewPromise<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>>();
connections->StartReadStream<
@@ -173,7 +202,8 @@ TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryIm
}
TAsyncExecuteQueryIterator TExecQueryImpl::StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
- const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings)
+ const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl,
+ const TExecuteQuerySettings& settings)
{
auto promise = NewPromise<TExecuteQueryIterator>();
@@ -188,14 +218,15 @@ TAsyncExecuteQueryIterator TExecQueryImpl::StreamExecuteQuery(const std::shared_
);
};
- StreamExecuteQueryImpl(connections, driverState, query, settings).Subscribe(iteratorCallback);
+ StreamExecuteQueryImpl(connections, driverState, query, txControl, settings).Subscribe(iteratorCallback);
return promise.GetFuture();
}
TAsyncExecuteQueryResult TExecQueryImpl::ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
- const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings)
+ const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl,
+ const TExecuteQuerySettings& settings)
{
- return StreamExecuteQuery(connections, driverState, query, settings)
+ return StreamExecuteQuery(connections, driverState, query, txControl, settings)
.Apply([](TAsyncExecuteQueryIterator itFuture){
auto it = itFuture.ExtractValue();
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h
index dbb448c9830..ba562dfbc6e 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h
@@ -3,6 +3,7 @@
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/internal_header.h>
#include <ydb/public/sdk/cpp/client/draft/ydb_query/query.h>
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/tx.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h>
namespace NYdb::NQuery {
@@ -10,10 +11,12 @@ namespace NYdb::NQuery {
class TExecQueryImpl {
public:
static TAsyncExecuteQueryIterator StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
- const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings);
+ const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl,
+ const TExecuteQuerySettings& settings);
static TAsyncExecuteQueryResult ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
- const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings);
+ const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl,
+ const TExecuteQuerySettings& settings);
};
} // namespace NYdb::NQuery::NImpl
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/tx.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/tx.cpp
new file mode 100644
index 00000000000..e9ef329d632
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/tx.cpp
@@ -0,0 +1 @@
+#include "tx.h"
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/tx.h b/ydb/public/sdk/cpp/client/draft/ydb_query/tx.h
new file mode 100644
index 00000000000..1e1517b02d1
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/tx.h
@@ -0,0 +1,97 @@
+#pragma once
+
+#include <ydb/public/api/grpc/draft/ydb_query_v1.grpc.pb.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h>
+
+namespace NYdb::NQuery {
+
+struct TTxOnlineSettings {
+ using TSelf = TTxOnlineSettings;
+
+ FLUENT_SETTING_DEFAULT(bool, AllowInconsistentReads, false);
+
+ TTxOnlineSettings() {}
+};
+
+struct TTxSettings {
+ using TSelf = TTxSettings;
+
+ TTxSettings()
+ : Mode_(TS_SERIALIZABLE_RW) {}
+
+ static TTxSettings SerializableRW() {
+ return TTxSettings(TS_SERIALIZABLE_RW);
+ }
+
+ static TTxSettings OnlineRO(const TTxOnlineSettings& settings = TTxOnlineSettings()) {
+ return TTxSettings(TS_ONLINE_RO).OnlineSettings(settings);
+ }
+
+ static TTxSettings StaleRO() {
+ return TTxSettings(TS_STALE_RO);
+ }
+
+ static TTxSettings SnapshotRO() {
+ return TTxSettings(TS_SNAPSHOT_RO);
+ }
+
+ void Out(IOutputStream& out) const {
+ switch (Mode_) {
+ case TS_SERIALIZABLE_RW:
+ out << "SerializableRW";
+ break;
+ case TS_ONLINE_RO:
+ out << "OnlineRO";
+ break;
+ case TS_STALE_RO:
+ out << "StaleRO";
+ break;
+ case TS_SNAPSHOT_RO:
+ out << "SnapshotRO";
+ break;
+ default:
+ out << "Unknown";
+ break;
+ }
+ }
+
+ enum ETransactionMode {
+ TS_SERIALIZABLE_RW,
+ TS_ONLINE_RO,
+ TS_STALE_RO,
+ TS_SNAPSHOT_RO
+ };
+
+ const ETransactionMode Mode_;
+ FLUENT_SETTING(TTxOnlineSettings, OnlineSettings);
+
+private:
+ TTxSettings(ETransactionMode mode)
+ : Mode_(mode) {}
+};
+
+struct TTxControl {
+ using TSelf = TTxControl;
+
+ static TTxControl Tx(const TString& txId) {
+ return TTxControl(txId);
+ }
+
+ static TTxControl BeginTx(const TTxSettings& settings = TTxSettings()) {
+ return TTxControl(settings);
+ }
+
+ const TMaybe<TString> TxId_;
+ const TMaybe<TTxSettings> TxSettings_;
+ FLUENT_SETTING_FLAG(CommitTx);
+
+private:
+ TTxControl(const TString& txId)
+ : TxId_(txId) {}
+
+ TTxControl(const TTxSettings& txSettings)
+ : TxSettings_(txSettings) {}
+};
+
+} // namespace NYdb::NQuery