diff options
author | spuchin <spuchin@ydb.tech> | 2023-05-19 19:39:07 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2023-05-19 19:39:07 +0300 |
commit | 47e31d4cfeab1251302eaae74eb9bba78ac7386d (patch) | |
tree | abb52f1ad10d5c3d079662d9df4ffd888de79f64 | |
parent | 18b646ad279f03049dbdccd87931fca890bc9437 (diff) | |
download | ydb-47e31d4cfeab1251302eaae74eb9bba78ac7386d.tar.gz |
Add tx_control for generic query API. ()
23 files changed, 343 insertions, 140 deletions
diff --git a/ydb/core/grpc_services/query/query_helpers.h b/ydb/core/grpc_services/query/query_helpers.h deleted file mode 100644 index 630c2c65e1a..00000000000 --- a/ydb/core/grpc_services/query/query_helpers.h +++ /dev/null @@ -1,89 +0,0 @@ -#pragma once -#include <ydb/core/base/kikimr_issue.h> -#include <ydb/core/grpc_services/rpc_kqp_base.h> -#include <ydb/public/api/protos/draft/ydb_query.pb.h> - -#include <utility> - -namespace NKikimr::NGRpcService { - -namespace NQueryHelpersPrivate { - -inline bool FillQueryContent(const Ydb::Query::ExecuteQueryRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest, NYql::TIssues& issues) { - switch (req.query_case()) { - case Ydb::Query::ExecuteQueryRequest::kQueryContent: - if (!CheckQuery(req.query_content().text(), issues)) { - return false; - } - - kqpRequest.MutableRequest()->SetQuery(req.query_content().text()); - return true; - - default: - issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option")); - return false; - } -} - -inline bool FillQueryContent(const Ydb::Query::ExecuteScriptRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest, NYql::TIssues& issues) { - switch (req.script_case()) { - case Ydb::Query::ExecuteScriptRequest::kScriptContent: - if (!CheckQuery(req.script_content().text(), issues)) { - return false; - } - - kqpRequest.MutableRequest()->SetQuery(req.script_content().text()); - return true; - - case Ydb::Query::ExecuteScriptRequest::kScriptId: - issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Execution by script id is not supported yet")); - return false; - - default: - issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option")); - return false; - } -} - -inline NKikimrKqp::EQueryType GetQueryType(const Ydb::Query::ExecuteQueryRequest&) { - return NKikimrKqp::QUERY_TYPE_SQL_QUERY; -} - -inline NKikimrKqp::EQueryType GetQueryType(const Ydb::Query::ExecuteScriptRequest&) { - return NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY; -} - -} // namespace NQueryHelpersPrivate - -template <class TRequestProto> -std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest( - const TRequestProto& req, NKikimrKqp::TEvQueryRequest& kqpRequest) -{ - kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end()); - switch (req.exec_mode()) { - case Ydb::Query::EXEC_MODE_EXECUTE: - kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - break; - default: { - NYql::TIssues issues; - issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query mode")); - return {Ydb::StatusIds::BAD_REQUEST, issues}; - } - } - - kqpRequest.MutableRequest()->SetType(NQueryHelpersPrivate::GetQueryType(req)); - kqpRequest.MutableRequest()->SetKeepSession(false); - - // TODO: Use tx control from request. - kqpRequest.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - kqpRequest.MutableRequest()->MutableTxControl()->set_commit_tx(true); - - NYql::TIssues issues; - if (!NQueryHelpersPrivate::FillQueryContent(req, kqpRequest, issues)) { - return {Ydb::StatusIds::BAD_REQUEST, issues}; - } - - return {Ydb::StatusIds::SUCCESS, {}}; -} - -} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 0bc431d13ca..a50eb52f262 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -1,5 +1,4 @@ #include "service_query.h" -#include "query_helpers.h" #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> @@ -66,6 +65,105 @@ private: ui64 TotalResponsesSize_ = 0; }; +bool FillQueryContent(const Ydb::Query::ExecuteQueryRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest, + NYql::TIssues& issues) +{ + switch (req.query_case()) { + case Ydb::Query::ExecuteQueryRequest::kQueryContent: + if (!CheckQuery(req.query_content().text(), issues)) { + return false; + } + + kqpRequest.MutableRequest()->SetQuery(req.query_content().text()); + return true; + + default: + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option")); + return false; + } +} + +bool FillTxSettings(const Ydb::Query::TransactionSettings& from, Ydb::Table::TransactionSettings& to, + NYql::TIssues& issues) +{ + switch (from.tx_mode_case()) { + case Ydb::Query::TransactionSettings::kSerializableReadWrite: + to.mutable_serializable_read_write(); + break; + case Ydb::Query::TransactionSettings::kOnlineReadOnly: + to.mutable_online_read_only()->set_allow_inconsistent_reads( + from.online_read_only().allow_inconsistent_reads()); + break; + case Ydb::Query::TransactionSettings::kStaleReadOnly: + to.mutable_stale_read_only(); + break; + case Ydb::Query::TransactionSettings::kSnapshotReadOnly: + to.mutable_snapshot_read_only(); + break; + default: + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, + "Invalid tx_settings")); + return false; + } + + return true; +} + +bool FillTxControl(const Ydb::Query::TransactionControl& from, Ydb::Table::TransactionControl& to, + NYql::TIssues& issues) +{ + switch (from.tx_selector_case()) { + case Ydb::Query::TransactionControl::kTxId: + to.set_tx_id(from.tx_id()); + break; + case Ydb::Query::TransactionControl::kBeginTx: + if (!FillTxSettings(from.begin_tx(), *to.mutable_begin_tx(), issues)) { + return false; + } + break; + default: + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, + "Invalid tx_control settings")); + return false; + } + + to.set_commit_tx(from.commit_tx()); + return true; +} + +std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest( + const Ydb::Query::ExecuteQueryRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest) +{ + kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end()); + switch (req.exec_mode()) { + case Ydb::Query::EXEC_MODE_EXECUTE: + kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + break; + default: { + NYql::TIssues issues; + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query mode")); + return {Ydb::StatusIds::BAD_REQUEST, std::move(issues)}; + } + } + + kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY); + kqpRequest.MutableRequest()->SetKeepSession(false); + + if (req.has_tx_control()) { + NYql::TIssues issues; + if (!FillTxControl(req.tx_control(), *kqpRequest.MutableRequest()->MutableTxControl(), issues)) { + return {Ydb::StatusIds::BAD_REQUEST, std::move(issues)}; + } + } + + NYql::TIssues issues; + if (!FillQueryContent(req, kqpRequest, issues)) { + return {Ydb::StatusIds::BAD_REQUEST, std::move(issues)}; + } + + return {Ydb::StatusIds::SUCCESS, {}}; +} + class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> { public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -126,13 +224,13 @@ private: auto [fillStatus, fillIssues] = FillKqpRequest(*req, ev->Record); if (fillStatus != Ydb::StatusIds::SUCCESS) { - return ReplyFinishStream(fillStatus, fillIssues); + return ReplyFinishStream(fillStatus, std::move(fillIssues)); } if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) { NYql::TIssues issues; issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error")); - ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, issues); + ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, std::move(issues)); } } diff --git a/ydb/core/grpc_services/query/rpc_execute_script.cpp b/ydb/core/grpc_services/query/rpc_execute_script.cpp index 86c45645d0a..9d35a44307e 100644 --- a/ydb/core/grpc_services/query/rpc_execute_script.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_script.cpp @@ -1,5 +1,4 @@ #include "service_query.h" -#include "query_helpers.h" #include <ydb/core/base/appdata.h> #include <ydb/core/base/kikimr_issue.h> @@ -21,6 +20,60 @@ using namespace NActors; using TEvExecuteScriptRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteScriptRequest, Ydb::Operations::Operation>; +bool FillQueryContent(const Ydb::Query::ExecuteScriptRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest, + NYql::TIssues& issues) +{ + switch (req.script_case()) { + case Ydb::Query::ExecuteScriptRequest::kScriptContent: + if (!CheckQuery(req.script_content().text(), issues)) { + return false; + } + + kqpRequest.MutableRequest()->SetQuery(req.script_content().text()); + return true; + + case Ydb::Query::ExecuteScriptRequest::kScriptId: + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, + "Execution by script id is not supported yet")); + return false; + + default: + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, + "Unexpected query option")); + return false; + } +} + +std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest( + const Ydb::Query::ExecuteScriptRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest) +{ + kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end()); + switch (req.exec_mode()) { + case Ydb::Query::EXEC_MODE_EXECUTE: + kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + break; + default: { + NYql::TIssues issues; + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query mode")); + return {Ydb::StatusIds::BAD_REQUEST, std::move(issues)}; + } + } + + kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT); + kqpRequest.MutableRequest()->SetKeepSession(false); + + // TODO: Avoid explicit tx_control for script queries. + kqpRequest.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + kqpRequest.MutableRequest()->MutableTxControl()->set_commit_tx(true); + + NYql::TIssues issues; + if (!FillQueryContent(req, kqpRequest, issues)) { + return {Ydb::StatusIds::BAD_REQUEST, std::move(issues)}; + } + + return {Ydb::StatusIds::SUCCESS, {}}; +} + class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> { public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { diff --git a/ydb/core/kqp/common/kqp_timeouts.cpp b/ydb/core/kqp/common/kqp_timeouts.cpp index 819730e684c..d5e55838877 100644 --- a/ydb/core/kqp/common/kqp_timeouts.cpp +++ b/ydb/core/kqp/common/kqp_timeouts.cpp @@ -15,8 +15,8 @@ ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType, const NKikimrCon case NKikimrKqp::QUERY_TYPE_SQL_DML: case NKikimrKqp::QUERY_TYPE_PREPARED_DML: case NKikimrKqp::QUERY_TYPE_AST_DML: - case NKikimrKqp::QUERY_TYPE_SQL_QUERY: - case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: return queryLimits.GetDataQueryTimeoutMs(); case NKikimrKqp::QUERY_TYPE_SQL_SCAN: diff --git a/ydb/core/kqp/common/simple/helpers.cpp b/ydb/core/kqp/common/simple/helpers.cpp index e9694c52de3..11c06c9a3f1 100644 --- a/ydb/core/kqp/common/simple/helpers.cpp +++ b/ydb/core/kqp/common/simple/helpers.cpp @@ -9,8 +9,8 @@ bool IsSqlQuery(const NKikimrKqp::EQueryType& queryType) { case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT: case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: case NKikimrKqp::QUERY_TYPE_SQL_SCAN: - case NKikimrKqp::QUERY_TYPE_SQL_QUERY: - case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: return true; default: diff --git a/ydb/core/kqp/common/simple/query_id.cpp b/ydb/core/kqp/common/simple/query_id.cpp index 09c57a7f267..8ec1ace9224 100644 --- a/ydb/core/kqp/common/simple/query_id.cpp +++ b/ydb/core/kqp/common/simple/query_id.cpp @@ -23,8 +23,8 @@ TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const case NKikimrKqp::QUERY_TYPE_SQL_SCAN: case NKikimrKqp::QUERY_TYPE_AST_DML: case NKikimrKqp::QUERY_TYPE_AST_SCAN: - case NKikimrKqp::QUERY_TYPE_SQL_QUERY: - case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: break; default: diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 34757a637ec..c087db13806 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -126,11 +126,11 @@ public: AsyncCompileResult = KqpHost->PrepareScanQuery(QueryRef, QueryId.IsSql(), prepareSettings); break; - case NKikimrKqp::QUERY_TYPE_SQL_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: AsyncCompileResult = KqpHost->PrepareQuery(QueryRef, prepareSettings); break; - case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: AsyncCompileResult = KqpHost->PrepareFederatedQuery(QueryRef, prepareSettings); break; diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index e60e2f68ecf..2e5c72f45fe 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -102,10 +102,10 @@ void TKqpCountersBase::Init() { KqpGroup->GetCounter("Request/QueryTypeSqlScan", true); QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_AST_SCAN] = KqpGroup->GetCounter("Request/QueryTypeAstScan", true); - QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_SQL_QUERY] = - KqpGroup->GetCounter("Request/QueryTypeQuery", true); - QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_FEDERATED_QUERY] = - KqpGroup->GetCounter("Request/QueryTypeFederatedQuery", true); + QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY] = + KqpGroup->GetCounter("Request/QueryTypeGenericQuery", true); + QueryTypes[NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_SCRIPT] = + KqpGroup->GetCounter("Request/QueryTypeGenericScript", true); OtherQueryTypes = KqpGroup->GetCounter("Requests/QueryTypeOther", true); QueriesWithRangeScan = KqpGroup->GetCounter("Query/WithRangeScan", true); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp index f785a66297a..e4592c1ae22 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp @@ -443,7 +443,7 @@ Y_UNIT_TEST_SUITE(KqpProxy) { auto ev = MakeHolder<TEvKqp::TEvScriptRequest>(); auto& req = *ev->Record.MutableRequest(); req.SetQuery("SELECT 42"); - req.SetType(NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY); + req.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT); req.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); req.SetDatabase(settings.DomainName); diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 5ab09057b0b..5f3da545a24 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -374,8 +374,8 @@ public: return ( type == NKikimrKqp::QUERY_TYPE_AST_SCAN || type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || - type == NKikimrKqp::QUERY_TYPE_SQL_QUERY || - type == NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY + type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY || + type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT ); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 1c99d959823..95e09d350f6 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -271,8 +271,8 @@ public: case NKikimrKqp::QUERY_TYPE_SQL_SCAN: case NKikimrKqp::QUERY_TYPE_AST_SCAN: case NKikimrKqp::QUERY_TYPE_AST_DML: - case NKikimrKqp::QUERY_TYPE_SQL_QUERY: - case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: return true; // should not be compiled. TODO: forward to request executer @@ -648,8 +648,8 @@ public: YQL_ENSURE( type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || type == NKikimrKqp::QUERY_TYPE_AST_SCAN || - type == NKikimrKqp::QUERY_TYPE_SQL_QUERY || - type == NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY + type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY || + type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT ); break; @@ -1140,8 +1140,8 @@ public: case NKikimrKqp::QUERY_TYPE_SQL_SCAN: case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT: case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: - case NKikimrKqp::QUERY_TYPE_SQL_QUERY: - case NKikimrKqp::QUERY_TYPE_FEDERATED_QUERY: { + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: { TString text = QueryState->ExtractQueryText(); if (IsQueryAllowedToLog(text)) { auto userSID = QueryState->UserToken->GetUserSID(); diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index c3a4055f142..2111ebdd288 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -15,7 +15,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto it = db.StreamExecuteQuery(R"( SELECT 1; - )").ExtractValueSync(); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); ui64 count = 0; @@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto result = db.ExecuteQuery(R"( SELECT 1; - )").ExtractValueSync(); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([[1]])", FormatResultSetYson(result.GetResultSet(0))); @@ -53,7 +53,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto it = db.StreamExecuteQuery(R"( SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0; - )").ExtractValueSync(); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); ui64 count = 0; @@ -79,7 +79,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto result = db.ExecuteQuery(R"( SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0 ORDER BY Key; - )").ExtractValueSync(); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -94,7 +94,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto result = db.ExecuteQuery(R"( SELECT COUNT(*) FROM EightShard; - )").ExtractValueSync(); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([[24u]])", FormatResultSetYson(result.GetResultSet(0))); @@ -107,7 +107,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto result = db.ExecuteQuery(R"( SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key; SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key; - )").ExtractValueSync(); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -127,7 +127,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto result = db.ExecuteQuery(R"( SELECT COUNT(*) FROM EightShard; SELECT COUNT(*) FROM TwoShard; - )").ExtractValueSync(); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([[24u]])", FormatResultSetYson(result.GetResultSet(0))); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index d3bc30a98b9..2c155c661f3 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -37,8 +37,8 @@ enum EQueryType { QUERY_TYPE_AST_SCAN = 10; QUERY_TYPE_SQL_SCRIPT_STREAMING = 11; - QUERY_TYPE_SQL_QUERY = 12; - QUERY_TYPE_FEDERATED_QUERY = 13; + QUERY_TYPE_SQL_GENERIC_QUERY = 12; + QUERY_TYPE_SQL_GENERIC_SCRIPT = 13; }; enum EQueryAction { diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin-x86_64.txt index 4aa472691b8..646cde7da97 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin-x86_64.txt @@ -20,4 +20,5 @@ target_link_libraries(client-draft-ydb_query PUBLIC target_sources(client-draft-ydb_query PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/tx.cpp ) diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt index 57264527a05..4f212f0f10f 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt @@ -21,4 +21,5 @@ target_link_libraries(client-draft-ydb_query PUBLIC target_sources(client-draft-ydb_query PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/tx.cpp ) diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-x86_64.txt index 57264527a05..4f212f0f10f 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-x86_64.txt +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-x86_64.txt @@ -21,4 +21,5 @@ target_link_libraries(client-draft-ydb_query PUBLIC target_sources(client-draft-ydb_query PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/tx.cpp ) diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.windows-x86_64.txt index 4aa472691b8..646cde7da97 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.windows-x86_64.txt +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.windows-x86_64.txt @@ -20,4 +20,5 @@ target_link_libraries(client-draft-ydb_query PUBLIC target_sources(client-draft-ydb_query PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/tx.cpp ) diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp index 03189d4f7b4..34f63848404 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp @@ -21,12 +21,16 @@ public: // TODO: Drain sessions. } - TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TExecuteQuerySettings& settings) { - return TExecQueryImpl::StreamExecuteQuery(Connections_, DbDriverState_, query, settings); + TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TTxControl& txControl, + const TExecuteQuerySettings& settings) + { + return TExecQueryImpl::StreamExecuteQuery(Connections_, DbDriverState_, query, txControl, settings); } - TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TExecuteQuerySettings& settings) { - return TExecQueryImpl::ExecuteQuery(Connections_, DbDriverState_, query, settings); + TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TTxControl& txControl, + const TExecuteQuerySettings& settings) + { + return TExecQueryImpl::ExecuteQuery(Connections_, DbDriverState_, query, txControl, settings); } NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script, const TExecuteScriptSettings& settings) { @@ -126,15 +130,15 @@ TQueryClient::TQueryClient(const TDriver& driver, const TClientSettings& setting } TAsyncExecuteQueryResult TQueryClient::ExecuteQuery(const TString& query, - const TExecuteQuerySettings& settings) + const TTxControl& txControl, const TExecuteQuerySettings& settings) { - return Impl_->ExecuteQuery(query, settings); + return Impl_->ExecuteQuery(query, txControl, settings); } TAsyncExecuteQueryIterator TQueryClient::StreamExecuteQuery(const TString& query, - const TExecuteQuerySettings& settings) + const TTxControl& txControl, const TExecuteQuerySettings& settings) { - return Impl_->StreamExecuteQuery(query, settings); + return Impl_->StreamExecuteQuery(query, txControl, settings); } NThreading::TFuture<TScriptExecutionOperation> TQueryClient::ExecuteScript(const TString& script, diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h index 3b009ef1982..59daab44697 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h @@ -1,6 +1,7 @@ #pragma once #include "query.h" +#include "tx.h" #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> @@ -21,10 +22,10 @@ class TQueryClient { public: TQueryClient(const TDriver& driver, const TClientSettings& settings = TClientSettings()); - TAsyncExecuteQueryResult ExecuteQuery(const TString& query, + TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TTxControl& txControl, const TExecuteQuerySettings& settings = TExecuteQuerySettings()); - TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, + TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TTxControl& txControl, const TExecuteQuerySettings& settings = TExecuteQuerySettings()); NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script, diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp index 202bb329bdd..4c93bbf0a8b 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp @@ -11,6 +11,26 @@ namespace NYdb::NQuery { using namespace NThreading; +static void SetTxSettings(const TTxSettings& txSettings, Ydb::Query::TransactionSettings* proto) { + switch (txSettings.Mode_) { + case TTxSettings::TS_SERIALIZABLE_RW: + proto->mutable_serializable_read_write(); + break; + case TTxSettings::TS_ONLINE_RO: + proto->mutable_online_read_only()->set_allow_inconsistent_reads( + txSettings.OnlineSettings_.AllowInconsistentReads_); + break; + case TTxSettings::TS_STALE_RO: + proto->mutable_stale_read_only(); + break; + case TTxSettings::TS_SNAPSHOT_RO: + proto->mutable_snapshot_read_only(); + break; + default: + throw TContractViolation("Unexpected transaction mode."); + } +} + class TExecuteQueryIterator::TReaderImpl { public: using TSelf = TExecuteQueryIterator::TReaderImpl; @@ -147,12 +167,21 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable { TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryImpl( const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState, - const TString& query, const TExecuteQuerySettings& settings) + const TString& query, const TTxControl& txControl, const TExecuteQuerySettings& settings) { auto request = MakeRequest<Ydb::Query::ExecuteQueryRequest>(); request.set_exec_mode(Ydb::Query::EXEC_MODE_EXECUTE); request.mutable_query_content()->set_text(query); + auto requestTxControl = request.mutable_tx_control(); + requestTxControl->set_commit_tx(txControl.CommitTx_); + if (txControl.TxId_) { + requestTxControl->set_tx_id(*txControl.TxId_); + } else { + Y_ASSERT(txControl.TxSettings_); + SetTxSettings(*txControl.TxSettings_, requestTxControl->mutable_begin_tx()); + } + auto promise = NewPromise<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>>(); connections->StartReadStream< @@ -173,7 +202,8 @@ TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryIm } TAsyncExecuteQueryIterator TExecQueryImpl::StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, - const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings) + const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl, + const TExecuteQuerySettings& settings) { auto promise = NewPromise<TExecuteQueryIterator>(); @@ -188,14 +218,15 @@ TAsyncExecuteQueryIterator TExecQueryImpl::StreamExecuteQuery(const std::shared_ ); }; - StreamExecuteQueryImpl(connections, driverState, query, settings).Subscribe(iteratorCallback); + StreamExecuteQueryImpl(connections, driverState, query, txControl, settings).Subscribe(iteratorCallback); return promise.GetFuture(); } TAsyncExecuteQueryResult TExecQueryImpl::ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, - const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings) + const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl, + const TExecuteQuerySettings& settings) { - return StreamExecuteQuery(connections, driverState, query, settings) + return StreamExecuteQuery(connections, driverState, query, txControl, settings) .Apply([](TAsyncExecuteQueryIterator itFuture){ auto it = itFuture.ExtractValue(); diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h index dbb448c9830..ba562dfbc6e 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h @@ -3,6 +3,7 @@ #include <ydb/public/sdk/cpp/client/impl/ydb_internal/internal_header.h> #include <ydb/public/sdk/cpp/client/draft/ydb_query/query.h> +#include <ydb/public/sdk/cpp/client/draft/ydb_query/tx.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h> namespace NYdb::NQuery { @@ -10,10 +11,12 @@ namespace NYdb::NQuery { class TExecQueryImpl { public: static TAsyncExecuteQueryIterator StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, - const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings); + const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl, + const TExecuteQuerySettings& settings); static TAsyncExecuteQueryResult ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, - const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings); + const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl, + const TExecuteQuerySettings& settings); }; } // namespace NYdb::NQuery::NImpl diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/tx.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/tx.cpp new file mode 100644 index 00000000000..e9ef329d632 --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/tx.cpp @@ -0,0 +1 @@ +#include "tx.h" diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/tx.h b/ydb/public/sdk/cpp/client/draft/ydb_query/tx.h new file mode 100644 index 00000000000..1e1517b02d1 --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/tx.h @@ -0,0 +1,97 @@ +#pragma once + +#include <ydb/public/api/grpc/draft/ydb_query_v1.grpc.pb.h> + +#include <ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h> + +namespace NYdb::NQuery { + +struct TTxOnlineSettings { + using TSelf = TTxOnlineSettings; + + FLUENT_SETTING_DEFAULT(bool, AllowInconsistentReads, false); + + TTxOnlineSettings() {} +}; + +struct TTxSettings { + using TSelf = TTxSettings; + + TTxSettings() + : Mode_(TS_SERIALIZABLE_RW) {} + + static TTxSettings SerializableRW() { + return TTxSettings(TS_SERIALIZABLE_RW); + } + + static TTxSettings OnlineRO(const TTxOnlineSettings& settings = TTxOnlineSettings()) { + return TTxSettings(TS_ONLINE_RO).OnlineSettings(settings); + } + + static TTxSettings StaleRO() { + return TTxSettings(TS_STALE_RO); + } + + static TTxSettings SnapshotRO() { + return TTxSettings(TS_SNAPSHOT_RO); + } + + void Out(IOutputStream& out) const { + switch (Mode_) { + case TS_SERIALIZABLE_RW: + out << "SerializableRW"; + break; + case TS_ONLINE_RO: + out << "OnlineRO"; + break; + case TS_STALE_RO: + out << "StaleRO"; + break; + case TS_SNAPSHOT_RO: + out << "SnapshotRO"; + break; + default: + out << "Unknown"; + break; + } + } + + enum ETransactionMode { + TS_SERIALIZABLE_RW, + TS_ONLINE_RO, + TS_STALE_RO, + TS_SNAPSHOT_RO + }; + + const ETransactionMode Mode_; + FLUENT_SETTING(TTxOnlineSettings, OnlineSettings); + +private: + TTxSettings(ETransactionMode mode) + : Mode_(mode) {} +}; + +struct TTxControl { + using TSelf = TTxControl; + + static TTxControl Tx(const TString& txId) { + return TTxControl(txId); + } + + static TTxControl BeginTx(const TTxSettings& settings = TTxSettings()) { + return TTxControl(settings); + } + + const TMaybe<TString> TxId_; + const TMaybe<TTxSettings> TxSettings_; + FLUENT_SETTING_FLAG(CommitTx); + +private: + TTxControl(const TString& txId) + : TxId_(txId) {} + + TTxControl(const TTxSettings& txSettings) + : TxSettings_(txSettings) {} +}; + +} // namespace NYdb::NQuery |