diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-11-16 21:12:27 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-11-16 21:42:21 +0300 |
commit | 5c9d28dd6a0bc188f88b0db8816620b67f08d789 (patch) | |
tree | 84bdedb6b16b8d85e5ced82fab55b0f3941a34b5 | |
parent | aeb7214aa4705327afbd04c9d72c3e37aab0427e (diff) | |
download | ydb-5c9d28dd6a0bc188f88b0db8816620b67f08d789.tar.gz |
Interactive transaction control support for ExecuteQuery call. KIKIMR-19883
-rw-r--r-- | ydb/core/grpc_services/query/rpc_execute_query.cpp | 31 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 39 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_query.proto | 2 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_query/client.cpp | 32 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_query/client.h | 61 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp | 57 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h | 6 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_query/query.cpp | 16 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_query/query.h | 52 |
10 files changed, 194 insertions, 106 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index dd9783f9f8..5e6e34e4c4 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -238,6 +238,7 @@ private: Ydb::Table::TransactionControl* txControl = nullptr; if (req->has_tx_control()) { + ReplyTxMeta = !req->tx_control().commit_tx(); txControl = google::protobuf::Arena::CreateMessage<Ydb::Table::TransactionControl>(Request_->GetArena()); if (!FillTxControl(req->tx_control(), *txControl, issues)) { return ReplyFinishStream(Ydb::StatusIds::BAD_REQUEST, std::move(issues)); @@ -364,20 +365,31 @@ private: if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { Request_->SetRuHeader(record.GetConsumedRu()); - } - - if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS && NeedReportStats(*Request_->GetProtoRequest())) { - Ydb::Query::ExecuteQueryResponsePart response; - response.set_status(Ydb::StatusIds::SUCCESS); auto& kqpResponse = record.GetResponse(); - FillQueryStats(*response.mutable_exec_stats(), kqpResponse); + + Ydb::Query::ExecuteQueryResponsePart response; AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), response); - TString out; - Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); - Request_->SendSerializedResult(std::move(out), record.GetYdbStatus()); + bool hasTrailingMessage = false; + + if (kqpResponse.HasTxMeta() && ReplyTxMeta) { + hasTrailingMessage = true; + response.mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id()); + } + + if (NeedReportStats(*Request_->GetProtoRequest())) { + hasTrailingMessage = true; + FillQueryStats(*response.mutable_exec_stats(), kqpResponse); + } + + if (hasTrailingMessage) { + response.set_status(Ydb::StatusIds::SUCCESS); + TString out; + Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); + Request_->SendSerializedResult(std::move(out), record.GetYdbStatus()); + } } ReplyFinishStream(record.GetYdbStatus(), issues); @@ -454,6 +466,7 @@ private: TRpcFlowControlState FlowControl_; TMap<TActorId, TProducerState> StreamProducers_; + bool ReplyTxMeta = false; }; } // namespace diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index f05c5f6be1..d5ea510f05 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -849,8 +849,8 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) { } if constexpr (std::is_same_v<TIterator, NYdb::NQuery::TExecuteQueryIterator>) { - UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.GetStats(), - "Unexpected empty query service response."); + UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.GetStats() || streamPart.GetTransaction(), + "Unexpected empty query service response."); if (streamPart.HasResultSet()) { auto resultSet = streamPart.ExtractResultSet(); diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index dc4648f12b..ef3c92e645 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -3,6 +3,7 @@ #include <ydb/public/lib/ut_helpers/ut_helpers_query.h> #include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> +#include <ydb/public/sdk/cpp/client/ydb_types/exceptions/exceptions.h> #include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h> #include <ydb/core/kqp/counters/kqp_counters.h> @@ -229,6 +230,44 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_VALUES_EQUAL_C(commitTxResult.GetStatus(), EStatus::ABORTED, commitTxResult.GetIssues().ToString()); } + Y_UNIT_TEST(ExecuteQueryInteractiveTx) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + auto sessionResult = db.GetSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(sessionResult.GetStatus(), EStatus::SUCCESS, sessionResult.GetIssues().ToString()); + auto session = sessionResult.GetSession(); + + const TString query = "UPDATE TwoShard SET Value2 = 0"; + auto result = session.ExecuteQuery(query, TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + auto transaction = result.GetTransaction(); + UNIT_ASSERT(transaction->IsActive()); + + auto checkResult = [&](TString expected) { + auto selectRes = db.ExecuteQuery( + "SELECT * FROM TwoShard ORDER BY Key", + TTxControl::BeginTx().CommitTx() + ).ExtractValueSync(); + + UNIT_ASSERT_C(selectRes.IsSuccess(), selectRes.GetIssues().ToString()); + CompareYson(expected, FormatResultSetYson(selectRes.GetResultSet(0))); + }; + checkResult(R"([[[1u];["One"];[-1]];[[2u];["Two"];[0]];[[3u];["Three"];[1]];[[4000000001u];["BigOne"];[-1]];[[4000000002u];["BigTwo"];[0]];[[4000000003u];["BigThree"];[1]]])"); + + auto txRes = transaction->Commit().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(txRes.GetStatus(), EStatus::SUCCESS, txRes.GetIssues().ToString()); + + checkResult(R"([[[1u];["One"];[0]];[[2u];["Two"];[0]];[[3u];["Three"];[0]];[[4000000001u];["BigOne"];[0]];[[4000000002u];["BigTwo"];[0]];[[4000000003u];["BigThree"];[0]]])"); + } + + Y_UNIT_TEST(ForbidInteractiveTxOnImplicitSession) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + const TString query = "SELECT 1"; + UNIT_ASSERT_EXCEPTION(db.ExecuteQuery(query, TTxControl::BeginTx()).ExtractValueSync(), NYdb::TContractViolation); + } + Y_UNIT_TEST(ExecuteRetryQuery) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); diff --git a/ydb/public/api/protos/ydb_query.proto b/ydb/public/api/protos/ydb_query.proto index 2ba8e24942..ec0bbe861f 100644 --- a/ydb/public/api/protos/ydb_query.proto +++ b/ydb/public/api/protos/ydb_query.proto @@ -184,6 +184,8 @@ message ExecuteQueryResponsePart { // Execution statistics (last part only) Ydb.TableStats.QueryStats exec_stats = 5; + + TransactionMeta tx_meta = 6; } message ExecuteScriptRequest { diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp index c508c987cb..22d0389d38 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp @@ -67,18 +67,18 @@ public: } TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TTxControl& txControl, - const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId = {}) + const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TMaybe<TSession>& session = {}) { return TExecQueryImpl::StreamExecuteQuery( - Connections_, DbDriverState_, query, txControl, params, settings, sessionId); + Connections_, DbDriverState_, query, txControl, params, settings, session); } TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TTxControl& txControl, const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, - const TString& sessionId = {}) + const TMaybe<TSession>& session = {}) { return TExecQueryImpl::ExecuteQuery( - Connections_, DbDriverState_, query, txControl, params, settings, sessionId); + Connections_, DbDriverState_, query, txControl, params, settings, session); } NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script, const TExecuteScriptSettings& settings) { @@ -610,7 +610,7 @@ TAsyncExecuteQueryResult TSession::ExecuteQuery(const TString& query, const TTxC { return NSessionPool::InjectSessionStatusInterception( SessionImpl_, - Client_->ExecuteQuery(query, txControl, {}, settings, SessionImpl_->GetId()), + Client_->ExecuteQuery(query, txControl, {}, settings, *this), true, Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_); } @@ -620,7 +620,7 @@ TAsyncExecuteQueryResult TSession::ExecuteQuery(const TString& query, const TTxC { return NSessionPool::InjectSessionStatusInterception( SessionImpl_, - Client_->ExecuteQuery(query, txControl, params, settings, SessionImpl_->GetId()), + Client_->ExecuteQuery(query, txControl, params, settings, *this), true, Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_); } @@ -630,7 +630,7 @@ TAsyncExecuteQueryIterator TSession::StreamExecuteQuery(const TString& query, co { return NSessionPool::InjectSessionStatusInterception( SessionImpl_, - Client_->StreamExecuteQuery(query, txControl, {}, settings, SessionImpl_->GetId()), + Client_->StreamExecuteQuery(query, txControl, {}, settings, *this), true, Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_); } @@ -640,7 +640,7 @@ TAsyncExecuteQueryIterator TSession::StreamExecuteQuery(const TString& query, co { return NSessionPool::InjectSessionStatusInterception( SessionImpl_, - Client_->StreamExecuteQuery(query, txControl, params, settings, SessionImpl_->GetId()), + Client_->StreamExecuteQuery(query, txControl, params, settings, *this), true, Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_); } @@ -678,4 +678,20 @@ const TTransaction& TBeginTransactionResult::GetTransaction() const { return Transaction_; } +const TVector<TResultSet>& TExecuteQueryResult::GetResultSets() const { + return ResultSets_; +} + +TResultSet TExecuteQueryResult::GetResultSet(size_t resultIndex) const { + if (resultIndex >= ResultSets_.size()) { + RaiseError(TString("Requested index out of range\n")); + } + + return ResultSets_[resultIndex]; +} + +TResultSetParser TExecuteQueryResult::GetResultSetParser(size_t resultIndex) const { + return TResultSetParser(GetResultSet(resultIndex)); +} + } // namespace NYdb::NQuery diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.h b/ydb/public/sdk/cpp/client/ydb_query/client.h index 138f91b596..b774617717 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/client.h +++ b/ydb/public/sdk/cpp/client/ydb_query/client.h @@ -152,6 +152,7 @@ private: class TTransaction { friend class TQueryClient; + friend class TExecuteQueryIterator::TReaderImpl; public: const TString& GetId() const { return TxId_; @@ -185,4 +186,64 @@ private: TTransaction Transaction_; }; +class TExecuteQueryPart : public TStreamPartStatus { +public: + bool HasResultSet() const { return ResultSet_.Defined(); } + ui64 GetResultSetIndex() const { return ResultSetIndex_; } + const TResultSet& GetResultSet() const { return *ResultSet_; } + TResultSet ExtractResultSet() { return std::move(*ResultSet_); } + + const TMaybe<TExecStats>& GetStats() const { return Stats_; } + const TMaybe<TTransaction>& GetTransaction() const { return Transaction_; } + + TExecuteQueryPart(TStatus&& status, TMaybe<TExecStats>&& queryStats, TMaybe<TTransaction>&& tx) + : TStreamPartStatus(std::move(status)) + , Stats_(std::move(queryStats)) + , Transaction_(std::move(tx)) + {} + + TExecuteQueryPart(TStatus&& status, TResultSet&& resultSet, i64 resultSetIndex, + TMaybe<TExecStats>&& queryStats, TMaybe<TTransaction>&& tx) + : TStreamPartStatus(std::move(status)) + , ResultSet_(std::move(resultSet)) + , ResultSetIndex_(resultSetIndex) + , Stats_(std::move(queryStats)) + , Transaction_(std::move(tx)) + {} + +private: + TMaybe<TResultSet> ResultSet_; + i64 ResultSetIndex_ = 0; + TMaybe<TExecStats> Stats_; + TMaybe<TTransaction> Transaction_; +}; + +class TExecuteQueryResult : public TStatus { +public: + const TVector<TResultSet>& GetResultSets() const; + TResultSet GetResultSet(size_t resultIndex) const; + TResultSetParser GetResultSetParser(size_t resultIndex) const; + + const TMaybe<TExecStats>& GetStats() const { return Stats_; } + + TMaybe<TTransaction> GetTransaction() const {return Transaction_; } + + TExecuteQueryResult(TStatus&& status) + : TStatus(std::move(status)) + {} + + TExecuteQueryResult(TStatus&& status, TVector<TResultSet>&& resultSets, + TMaybe<TExecStats>&& stats, TMaybe<TTransaction>&& tx) + : TStatus(std::move(status)) + , ResultSets_(std::move(resultSets)) + , Stats_(std::move(stats)) + , Transaction_(std::move(tx)) + {} + +private: + TVector<TResultSet> ResultSets_; + TMaybe<TExecStats> Stats_; + TMaybe<TTransaction> Transaction_; +}; + } // namespace NYdb::NQuery diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp index 63001271e0..aa3a8aa398 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp @@ -1,6 +1,7 @@ #define INCLUDE_YDB_INTERNAL_H #include "exec_query.h" +#include <ydb/public/sdk/cpp/client/ydb_query/client.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h> #include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> @@ -41,10 +42,11 @@ public: using TGRpcStatus = NGrpc::TGrpcStatus; using TBatchReadResult = std::pair<TResponse, TGRpcStatus>; - TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint) + TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint, const TMaybe<TSession>& session) : StreamProcessor_(streamProcessor) , Finished_(false) , Endpoint_(endpoint) + , Session_(session) {} ~TReaderImpl() { @@ -61,7 +63,7 @@ public: auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable { if (!grpcStatus.Ok()) { self->Finished_ = true; - promise.SetValue({TStatus(TPlainStatus(grpcStatus, self->Endpoint_)), {}}); + promise.SetValue({TStatus(TPlainStatus(grpcStatus, self->Endpoint_)), {}, {}}); } else { NYql::TIssues issues; NYql::IssuesFromMessage(self->Response_.issues(), issues); @@ -70,19 +72,25 @@ public: TStatus status{std::move(plainStatus)}; TMaybe<TExecStats> stats; + TMaybe<TTransaction> tx; if (self->Response_.has_exec_stats()) { stats = TExecStats(std::move(*self->Response_.mutable_exec_stats())); } + if (self->Response_.has_tx_meta() && self->Session_.Defined()) { + tx = TTransaction(self->Session_.GetRef(), self->Response_.tx_meta().id()); + } + if (self->Response_.has_result_set()) { promise.SetValue({ std::move(status), TResultSet(std::move(*self->Response_.mutable_result_set())), self->Response_.result_set_index(), - std::move(stats) + std::move(stats), + std::move(tx) }); } else { - promise.SetValue({std::move(status), std::move(stats)}); + promise.SetValue({std::move(status), std::move(stats), std::move(tx)}); } } }; @@ -95,6 +103,7 @@ private: TResponse Response_; bool Finished_; TString Endpoint_; + TMaybe<TSession> Session_; }; TAsyncExecuteQueryPart TExecuteQueryIterator::ReadNext() { @@ -119,6 +128,7 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable { TVector<NYql::TIssue> Issues_; TVector<Ydb::ResultSet> ResultSets_; TMaybe<TExecStats> Stats_; + TMaybe<TTransaction> Tx_; void Next() { TPtr self(this); @@ -131,10 +141,12 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable { TVector<NYql::TIssue> issues; TVector<Ydb::ResultSet> resultProtos; TMaybe<TExecStats> stats; + TMaybe<TTransaction> tx; std::swap(self->Issues_, issues); std::swap(self->ResultSets_, resultProtos); std::swap(self->Stats_, stats); + std::swap(self->Tx_, tx); TVector<TResultSet> resultSets; for (auto& proto : resultProtos) { @@ -144,10 +156,11 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable { self->Promise_.SetValue(TExecuteQueryResult( TStatus(EStatus::SUCCESS, NYql::TIssues(std::move(issues))), std::move(resultSets), - std::move(stats) + std::move(stats), + std::move(tx) )); } else { - self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {}, {})); + self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {}, {}, {})); } return; @@ -172,8 +185,12 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable { resultSet.mutable_rows()->Add(inRsProto.rows().begin(), inRsProto.rows().end()); } - if (part.GetStats().Defined()) { - self->Stats_ = part.GetStats(); + if (const auto& st = part.GetStats()) { + self->Stats_ = st; + } + + if (const auto& tx = part.GetTransaction()) { + self->Tx_ = tx; } self->Next(); @@ -184,15 +201,17 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable { TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryImpl( const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl, const ::google::protobuf::Map<TString, Ydb::TypedValue>* params, - const TExecuteQuerySettings& settings, const TString& sessionId) + const TExecuteQuerySettings& settings, const TMaybe<TSession>& session) { auto request = MakeRequest<Ydb::Query::ExecuteQueryRequest>(); request.set_exec_mode(::Ydb::Query::ExecMode(settings.ExecMode_)); request.set_stats_mode(::Ydb::Query::StatsMode(settings.StatsMode_)); request.mutable_query_content()->set_text(query); request.mutable_query_content()->set_syntax(::Ydb::Query::Syntax(settings.Syntax_)); - if (sessionId) { - request.set_session_id(sessionId); + if (session.Defined()) { + request.set_session_id(session->GetId()); + } else if ((txControl.TxSettings_.Defined() && !txControl.CommitTx_) || txControl.TxId_.Defined()) { + throw TContractViolation("Interactive tx must use explisit session"); } if (settings.ConcurrentResultSets_) { @@ -219,8 +238,8 @@ TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryIm auto promise = NewPromise<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>>(); auto rpcSettings = TRpcRequestSettings::Make(settings); - if (sessionId) { - rpcSettings.PreferredEndpoint = TEndpointKey(GetNodeIdFromSession(sessionId)); + if (session.Defined()) { + rpcSettings.PreferredEndpoint = TEndpointKey(GetNodeIdFromSession(session->GetId())); } connections->StartReadStream< @@ -242,16 +261,16 @@ TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryIm TAsyncExecuteQueryIterator TExecQueryImpl::StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl, - const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId) + const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TMaybe<TSession>& session) { auto promise = NewPromise<TExecuteQueryIterator>(); - auto iteratorCallback = [promise](TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> future) mutable { + auto iteratorCallback = [promise, session](TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> future) mutable { Y_ASSERT(future.HasValue()); auto pair = future.ExtractValue(); promise.SetValue(TExecuteQueryIterator( pair.second - ? std::make_shared<TExecuteQueryIterator::TReaderImpl>(pair.second, pair.first.Endpoint) + ? std::make_shared<TExecuteQueryIterator::TReaderImpl>(pair.second, pair.first.Endpoint, session) : nullptr, std::move(pair.first)) ); @@ -261,19 +280,19 @@ TAsyncExecuteQueryIterator TExecQueryImpl::StreamExecuteQuery(const std::shared_ ? ¶ms->GetProtoMap() : nullptr; - StreamExecuteQueryImpl(connections, driverState, query, txControl, paramsProto, settings, sessionId) + StreamExecuteQueryImpl(connections, driverState, query, txControl, paramsProto, settings, session) .Subscribe(iteratorCallback); return promise.GetFuture(); } TAsyncExecuteQueryResult TExecQueryImpl::ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl, - const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId) + const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TMaybe<TSession>& session) { auto syncSettings = settings; syncSettings.ConcurrentResultSets(true); - return StreamExecuteQuery(connections, driverState, query, txControl, params, syncSettings, sessionId) + return StreamExecuteQuery(connections, driverState, query, txControl, params, syncSettings, session) .Apply([](TAsyncExecuteQueryIterator itFuture){ auto it = itFuture.ExtractValue(); diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h index 24765225a7..e455ac1cdc 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h @@ -2,7 +2,7 @@ #include <ydb/public/sdk/cpp/client/impl/ydb_internal/internal_header.h> -#include <ydb/public/sdk/cpp/client/ydb_query/query.h> +#include <ydb/public/sdk/cpp/client/ydb_query/client.h> #include <ydb/public/sdk/cpp/client/ydb_query/tx.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h> #include <ydb/public/sdk/cpp/client/ydb_params/params.h> @@ -13,11 +13,11 @@ class TExecQueryImpl { public: static TAsyncExecuteQueryIterator StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl, - const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId); + const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TMaybe<TSession>& session); static TAsyncExecuteQueryResult ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl, - const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId); + const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TMaybe<TSession>& session); }; } // namespace NYdb::NQuery::NImpl diff --git a/ydb/public/sdk/cpp/client/ydb_query/query.cpp b/ydb/public/sdk/cpp/client/ydb_query/query.cpp index 852528faad..49066dea33 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/query.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/query.cpp @@ -33,22 +33,6 @@ std::string_view StatsModeToString(const EStatsMode statsMode) { } } -const TVector<TResultSet>& TExecuteQueryResult::GetResultSets() const { - return ResultSets_; -} - -TResultSet TExecuteQueryResult::GetResultSet(size_t resultIndex) const { - if (resultIndex >= ResultSets_.size()) { - RaiseError(TString("Requested index out of range\n")); - } - - return ResultSets_[resultIndex]; -} - -TResultSetParser TExecuteQueryResult::GetResultSetParser(size_t resultIndex) const { - return TResultSetParser(GetResultSet(resultIndex)); -} - TScriptExecutionOperation::TScriptExecutionOperation(TStatus&& status, Ydb::Operations::Operation&& operation) : TOperation(std::move(status), std::move(operation)) { diff --git a/ydb/public/sdk/cpp/client/ydb_query/query.h b/ydb/public/sdk/cpp/client/ydb_query/query.h index cfe0518cbd..2b2be8bd98 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/query.h +++ b/ydb/public/sdk/cpp/client/ydb_query/query.h @@ -49,33 +49,7 @@ enum class EExecStatus { Failed = 50, }; -class TExecuteQueryPart : public TStreamPartStatus { -public: - bool HasResultSet() const { return ResultSet_.Defined(); } - ui64 GetResultSetIndex() const { return ResultSetIndex_; } - const TResultSet& GetResultSet() const { return *ResultSet_; } - TResultSet ExtractResultSet() { return std::move(*ResultSet_); } - - const TMaybe<TExecStats>& GetStats() const { return Stats_; } - - TExecuteQueryPart(TStatus&& status, TMaybe<TExecStats>&& queryStats) - : TStreamPartStatus(std::move(status)) - , Stats_(std::move(queryStats)) - {} - - TExecuteQueryPart(TStatus&& status, TResultSet&& resultSet, i64 resultSetIndex, TMaybe<TExecStats>&& queryStats) - : TStreamPartStatus(std::move(status)) - , ResultSet_(std::move(resultSet)) - , ResultSetIndex_(resultSetIndex) - , Stats_(std::move(queryStats)) - {} - -private: - TMaybe<TResultSet> ResultSet_; - i64 ResultSetIndex_ = 0; - TMaybe<TExecStats> Stats_; -}; - +class TExecuteQueryPart; using TAsyncExecuteQueryPart = NThreading::TFuture<TExecuteQueryPart>; class TExecuteQueryIterator : public TStatus { @@ -108,28 +82,7 @@ struct TBeginTxSettings : public TRequestSettings<TBeginTxSettings> {}; struct TCommitTxSettings : public TRequestSettings<TCommitTxSettings> {}; struct TRollbackTxSettings : public TRequestSettings<TRollbackTxSettings> {}; -class TExecuteQueryResult : public TStatus { -public: - const TVector<TResultSet>& GetResultSets() const; - TResultSet GetResultSet(size_t resultIndex) const; - TResultSetParser GetResultSetParser(size_t resultIndex) const; - - const TMaybe<TExecStats>& GetStats() const { return Stats_; } - TExecuteQueryResult(TStatus&& status) - : TStatus(std::move(status)) - {} - - TExecuteQueryResult(TStatus&& status, TVector<TResultSet>&& resultSets, TMaybe<TExecStats>&& stats) - : TStatus(std::move(status)) - , ResultSets_(std::move(resultSets)) - , Stats_(std::move(stats)) - {} - -private: - TVector<TResultSet> ResultSets_; - TMaybe<TExecStats> Stats_; -}; class TCommitTransactionResult : public TStatus { public: @@ -138,7 +91,6 @@ public: class TBeginTransactionResult; -using TAsyncExecuteQueryResult = NThreading::TFuture<TExecuteQueryResult>; using TAsyncBeginTransactionResult = NThreading::TFuture<TBeginTransactionResult>; using TAsyncCommitTransactionResult = NThreading::TFuture<TCommitTransactionResult>; @@ -215,6 +167,8 @@ private: TString NextFetchToken_; }; +class TExecuteQueryResult; using TAsyncFetchScriptResultsResult = NThreading::TFuture<TFetchScriptResultsResult>; +using TAsyncExecuteQueryResult = NThreading::TFuture<TExecuteQueryResult>; } // namespace NYdb::NQuery |