aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-11-16 21:12:27 +0300
committerdcherednik <dcherednik@ydb.tech>2023-11-16 21:42:21 +0300
commit5c9d28dd6a0bc188f88b0db8816620b67f08d789 (patch)
tree84bdedb6b16b8d85e5ced82fab55b0f3941a34b5
parentaeb7214aa4705327afbd04c9d72c3e37aab0427e (diff)
downloadydb-5c9d28dd6a0bc188f88b0db8816620b67f08d789.tar.gz
Interactive transaction control support for ExecuteQuery call. KIKIMR-19883
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp31
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp4
-rw-r--r--ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp39
-rw-r--r--ydb/public/api/protos/ydb_query.proto2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/client.cpp32
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/client.h61
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp57
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/query.cpp16
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/query.h52
10 files changed, 194 insertions, 106 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index dd9783f9f8..5e6e34e4c4 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -238,6 +238,7 @@ private:
Ydb::Table::TransactionControl* txControl = nullptr;
if (req->has_tx_control()) {
+ ReplyTxMeta = !req->tx_control().commit_tx();
txControl = google::protobuf::Arena::CreateMessage<Ydb::Table::TransactionControl>(Request_->GetArena());
if (!FillTxControl(req->tx_control(), *txControl, issues)) {
return ReplyFinishStream(Ydb::StatusIds::BAD_REQUEST, std::move(issues));
@@ -364,20 +365,31 @@ private:
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Request_->SetRuHeader(record.GetConsumedRu());
- }
-
- if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS && NeedReportStats(*Request_->GetProtoRequest())) {
- Ydb::Query::ExecuteQueryResponsePart response;
- response.set_status(Ydb::StatusIds::SUCCESS);
auto& kqpResponse = record.GetResponse();
- FillQueryStats(*response.mutable_exec_stats(), kqpResponse);
+
+ Ydb::Query::ExecuteQueryResponsePart response;
AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), response);
- TString out;
- Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
- Request_->SendSerializedResult(std::move(out), record.GetYdbStatus());
+ bool hasTrailingMessage = false;
+
+ if (kqpResponse.HasTxMeta() && ReplyTxMeta) {
+ hasTrailingMessage = true;
+ response.mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id());
+ }
+
+ if (NeedReportStats(*Request_->GetProtoRequest())) {
+ hasTrailingMessage = true;
+ FillQueryStats(*response.mutable_exec_stats(), kqpResponse);
+ }
+
+ if (hasTrailingMessage) {
+ response.set_status(Ydb::StatusIds::SUCCESS);
+ TString out;
+ Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
+ Request_->SendSerializedResult(std::move(out), record.GetYdbStatus());
+ }
}
ReplyFinishStream(record.GetYdbStatus(), issues);
@@ -454,6 +466,7 @@ private:
TRpcFlowControlState FlowControl_;
TMap<TActorId, TProducerState> StreamProducers_;
+ bool ReplyTxMeta = false;
};
} // namespace
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index f05c5f6be1..d5ea510f05 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -849,8 +849,8 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) {
}
if constexpr (std::is_same_v<TIterator, NYdb::NQuery::TExecuteQueryIterator>) {
- UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.GetStats(),
- "Unexpected empty query service response.");
+ UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.GetStats() || streamPart.GetTransaction(),
+ "Unexpected empty query service response.");
if (streamPart.HasResultSet()) {
auto resultSet = streamPart.ExtractResultSet();
diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
index dc4648f12b..ef3c92e645 100644
--- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
@@ -3,6 +3,7 @@
#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/exceptions/exceptions.h>
#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
@@ -229,6 +230,44 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_VALUES_EQUAL_C(commitTxResult.GetStatus(), EStatus::ABORTED, commitTxResult.GetIssues().ToString());
}
+ Y_UNIT_TEST(ExecuteQueryInteractiveTx) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+ auto sessionResult = db.GetSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(sessionResult.GetStatus(), EStatus::SUCCESS, sessionResult.GetIssues().ToString());
+ auto session = sessionResult.GetSession();
+
+ const TString query = "UPDATE TwoShard SET Value2 = 0";
+ auto result = session.ExecuteQuery(query, TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ auto transaction = result.GetTransaction();
+ UNIT_ASSERT(transaction->IsActive());
+
+ auto checkResult = [&](TString expected) {
+ auto selectRes = db.ExecuteQuery(
+ "SELECT * FROM TwoShard ORDER BY Key",
+ TTxControl::BeginTx().CommitTx()
+ ).ExtractValueSync();
+
+ UNIT_ASSERT_C(selectRes.IsSuccess(), selectRes.GetIssues().ToString());
+ CompareYson(expected, FormatResultSetYson(selectRes.GetResultSet(0)));
+ };
+ checkResult(R"([[[1u];["One"];[-1]];[[2u];["Two"];[0]];[[3u];["Three"];[1]];[[4000000001u];["BigOne"];[-1]];[[4000000002u];["BigTwo"];[0]];[[4000000003u];["BigThree"];[1]]])");
+
+ auto txRes = transaction->Commit().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(txRes.GetStatus(), EStatus::SUCCESS, txRes.GetIssues().ToString());
+
+ checkResult(R"([[[1u];["One"];[0]];[[2u];["Two"];[0]];[[3u];["Three"];[0]];[[4000000001u];["BigOne"];[0]];[[4000000002u];["BigTwo"];[0]];[[4000000003u];["BigThree"];[0]]])");
+ }
+
+ Y_UNIT_TEST(ForbidInteractiveTxOnImplicitSession) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ const TString query = "SELECT 1";
+ UNIT_ASSERT_EXCEPTION(db.ExecuteQuery(query, TTxControl::BeginTx()).ExtractValueSync(), NYdb::TContractViolation);
+ }
+
Y_UNIT_TEST(ExecuteRetryQuery) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
diff --git a/ydb/public/api/protos/ydb_query.proto b/ydb/public/api/protos/ydb_query.proto
index 2ba8e24942..ec0bbe861f 100644
--- a/ydb/public/api/protos/ydb_query.proto
+++ b/ydb/public/api/protos/ydb_query.proto
@@ -184,6 +184,8 @@ message ExecuteQueryResponsePart {
// Execution statistics (last part only)
Ydb.TableStats.QueryStats exec_stats = 5;
+
+ TransactionMeta tx_meta = 6;
}
message ExecuteScriptRequest {
diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
index c508c987cb..22d0389d38 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
@@ -67,18 +67,18 @@ public:
}
TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TTxControl& txControl,
- const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId = {})
+ const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TMaybe<TSession>& session = {})
{
return TExecQueryImpl::StreamExecuteQuery(
- Connections_, DbDriverState_, query, txControl, params, settings, sessionId);
+ Connections_, DbDriverState_, query, txControl, params, settings, session);
}
TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TTxControl& txControl,
const TMaybe<TParams>& params, const TExecuteQuerySettings& settings,
- const TString& sessionId = {})
+ const TMaybe<TSession>& session = {})
{
return TExecQueryImpl::ExecuteQuery(
- Connections_, DbDriverState_, query, txControl, params, settings, sessionId);
+ Connections_, DbDriverState_, query, txControl, params, settings, session);
}
NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script, const TExecuteScriptSettings& settings) {
@@ -610,7 +610,7 @@ TAsyncExecuteQueryResult TSession::ExecuteQuery(const TString& query, const TTxC
{
return NSessionPool::InjectSessionStatusInterception(
SessionImpl_,
- Client_->ExecuteQuery(query, txControl, {}, settings, SessionImpl_->GetId()),
+ Client_->ExecuteQuery(query, txControl, {}, settings, *this),
true,
Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_);
}
@@ -620,7 +620,7 @@ TAsyncExecuteQueryResult TSession::ExecuteQuery(const TString& query, const TTxC
{
return NSessionPool::InjectSessionStatusInterception(
SessionImpl_,
- Client_->ExecuteQuery(query, txControl, params, settings, SessionImpl_->GetId()),
+ Client_->ExecuteQuery(query, txControl, params, settings, *this),
true,
Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_);
}
@@ -630,7 +630,7 @@ TAsyncExecuteQueryIterator TSession::StreamExecuteQuery(const TString& query, co
{
return NSessionPool::InjectSessionStatusInterception(
SessionImpl_,
- Client_->StreamExecuteQuery(query, txControl, {}, settings, SessionImpl_->GetId()),
+ Client_->StreamExecuteQuery(query, txControl, {}, settings, *this),
true,
Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_);
}
@@ -640,7 +640,7 @@ TAsyncExecuteQueryIterator TSession::StreamExecuteQuery(const TString& query, co
{
return NSessionPool::InjectSessionStatusInterception(
SessionImpl_,
- Client_->StreamExecuteQuery(query, txControl, params, settings, SessionImpl_->GetId()),
+ Client_->StreamExecuteQuery(query, txControl, params, settings, *this),
true,
Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_);
}
@@ -678,4 +678,20 @@ const TTransaction& TBeginTransactionResult::GetTransaction() const {
return Transaction_;
}
+const TVector<TResultSet>& TExecuteQueryResult::GetResultSets() const {
+ return ResultSets_;
+}
+
+TResultSet TExecuteQueryResult::GetResultSet(size_t resultIndex) const {
+ if (resultIndex >= ResultSets_.size()) {
+ RaiseError(TString("Requested index out of range\n"));
+ }
+
+ return ResultSets_[resultIndex];
+}
+
+TResultSetParser TExecuteQueryResult::GetResultSetParser(size_t resultIndex) const {
+ return TResultSetParser(GetResultSet(resultIndex));
+}
+
} // namespace NYdb::NQuery
diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.h b/ydb/public/sdk/cpp/client/ydb_query/client.h
index 138f91b596..b774617717 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/client.h
+++ b/ydb/public/sdk/cpp/client/ydb_query/client.h
@@ -152,6 +152,7 @@ private:
class TTransaction {
friend class TQueryClient;
+ friend class TExecuteQueryIterator::TReaderImpl;
public:
const TString& GetId() const {
return TxId_;
@@ -185,4 +186,64 @@ private:
TTransaction Transaction_;
};
+class TExecuteQueryPart : public TStreamPartStatus {
+public:
+ bool HasResultSet() const { return ResultSet_.Defined(); }
+ ui64 GetResultSetIndex() const { return ResultSetIndex_; }
+ const TResultSet& GetResultSet() const { return *ResultSet_; }
+ TResultSet ExtractResultSet() { return std::move(*ResultSet_); }
+
+ const TMaybe<TExecStats>& GetStats() const { return Stats_; }
+ const TMaybe<TTransaction>& GetTransaction() const { return Transaction_; }
+
+ TExecuteQueryPart(TStatus&& status, TMaybe<TExecStats>&& queryStats, TMaybe<TTransaction>&& tx)
+ : TStreamPartStatus(std::move(status))
+ , Stats_(std::move(queryStats))
+ , Transaction_(std::move(tx))
+ {}
+
+ TExecuteQueryPart(TStatus&& status, TResultSet&& resultSet, i64 resultSetIndex,
+ TMaybe<TExecStats>&& queryStats, TMaybe<TTransaction>&& tx)
+ : TStreamPartStatus(std::move(status))
+ , ResultSet_(std::move(resultSet))
+ , ResultSetIndex_(resultSetIndex)
+ , Stats_(std::move(queryStats))
+ , Transaction_(std::move(tx))
+ {}
+
+private:
+ TMaybe<TResultSet> ResultSet_;
+ i64 ResultSetIndex_ = 0;
+ TMaybe<TExecStats> Stats_;
+ TMaybe<TTransaction> Transaction_;
+};
+
+class TExecuteQueryResult : public TStatus {
+public:
+ const TVector<TResultSet>& GetResultSets() const;
+ TResultSet GetResultSet(size_t resultIndex) const;
+ TResultSetParser GetResultSetParser(size_t resultIndex) const;
+
+ const TMaybe<TExecStats>& GetStats() const { return Stats_; }
+
+ TMaybe<TTransaction> GetTransaction() const {return Transaction_; }
+
+ TExecuteQueryResult(TStatus&& status)
+ : TStatus(std::move(status))
+ {}
+
+ TExecuteQueryResult(TStatus&& status, TVector<TResultSet>&& resultSets,
+ TMaybe<TExecStats>&& stats, TMaybe<TTransaction>&& tx)
+ : TStatus(std::move(status))
+ , ResultSets_(std::move(resultSets))
+ , Stats_(std::move(stats))
+ , Transaction_(std::move(tx))
+ {}
+
+private:
+ TVector<TResultSet> ResultSets_;
+ TMaybe<TExecStats> Stats_;
+ TMaybe<TTransaction> Transaction_;
+};
+
} // namespace NYdb::NQuery
diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
index 63001271e0..aa3a8aa398 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
@@ -1,6 +1,7 @@
#define INCLUDE_YDB_INTERNAL_H
#include "exec_query.h"
+#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h>
#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
@@ -41,10 +42,11 @@ public:
using TGRpcStatus = NGrpc::TGrpcStatus;
using TBatchReadResult = std::pair<TResponse, TGRpcStatus>;
- TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint)
+ TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint, const TMaybe<TSession>& session)
: StreamProcessor_(streamProcessor)
, Finished_(false)
, Endpoint_(endpoint)
+ , Session_(session)
{}
~TReaderImpl() {
@@ -61,7 +63,7 @@ public:
auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable {
if (!grpcStatus.Ok()) {
self->Finished_ = true;
- promise.SetValue({TStatus(TPlainStatus(grpcStatus, self->Endpoint_)), {}});
+ promise.SetValue({TStatus(TPlainStatus(grpcStatus, self->Endpoint_)), {}, {}});
} else {
NYql::TIssues issues;
NYql::IssuesFromMessage(self->Response_.issues(), issues);
@@ -70,19 +72,25 @@ public:
TStatus status{std::move(plainStatus)};
TMaybe<TExecStats> stats;
+ TMaybe<TTransaction> tx;
if (self->Response_.has_exec_stats()) {
stats = TExecStats(std::move(*self->Response_.mutable_exec_stats()));
}
+ if (self->Response_.has_tx_meta() && self->Session_.Defined()) {
+ tx = TTransaction(self->Session_.GetRef(), self->Response_.tx_meta().id());
+ }
+
if (self->Response_.has_result_set()) {
promise.SetValue({
std::move(status),
TResultSet(std::move(*self->Response_.mutable_result_set())),
self->Response_.result_set_index(),
- std::move(stats)
+ std::move(stats),
+ std::move(tx)
});
} else {
- promise.SetValue({std::move(status), std::move(stats)});
+ promise.SetValue({std::move(status), std::move(stats), std::move(tx)});
}
}
};
@@ -95,6 +103,7 @@ private:
TResponse Response_;
bool Finished_;
TString Endpoint_;
+ TMaybe<TSession> Session_;
};
TAsyncExecuteQueryPart TExecuteQueryIterator::ReadNext() {
@@ -119,6 +128,7 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
TVector<NYql::TIssue> Issues_;
TVector<Ydb::ResultSet> ResultSets_;
TMaybe<TExecStats> Stats_;
+ TMaybe<TTransaction> Tx_;
void Next() {
TPtr self(this);
@@ -131,10 +141,12 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
TVector<NYql::TIssue> issues;
TVector<Ydb::ResultSet> resultProtos;
TMaybe<TExecStats> stats;
+ TMaybe<TTransaction> tx;
std::swap(self->Issues_, issues);
std::swap(self->ResultSets_, resultProtos);
std::swap(self->Stats_, stats);
+ std::swap(self->Tx_, tx);
TVector<TResultSet> resultSets;
for (auto& proto : resultProtos) {
@@ -144,10 +156,11 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
self->Promise_.SetValue(TExecuteQueryResult(
TStatus(EStatus::SUCCESS, NYql::TIssues(std::move(issues))),
std::move(resultSets),
- std::move(stats)
+ std::move(stats),
+ std::move(tx)
));
} else {
- self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {}, {}));
+ self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {}, {}, {}));
}
return;
@@ -172,8 +185,12 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
resultSet.mutable_rows()->Add(inRsProto.rows().begin(), inRsProto.rows().end());
}
- if (part.GetStats().Defined()) {
- self->Stats_ = part.GetStats();
+ if (const auto& st = part.GetStats()) {
+ self->Stats_ = st;
+ }
+
+ if (const auto& tx = part.GetTransaction()) {
+ self->Tx_ = tx;
}
self->Next();
@@ -184,15 +201,17 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryImpl(
const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState,
const TString& query, const TTxControl& txControl, const ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
- const TExecuteQuerySettings& settings, const TString& sessionId)
+ const TExecuteQuerySettings& settings, const TMaybe<TSession>& session)
{
auto request = MakeRequest<Ydb::Query::ExecuteQueryRequest>();
request.set_exec_mode(::Ydb::Query::ExecMode(settings.ExecMode_));
request.set_stats_mode(::Ydb::Query::StatsMode(settings.StatsMode_));
request.mutable_query_content()->set_text(query);
request.mutable_query_content()->set_syntax(::Ydb::Query::Syntax(settings.Syntax_));
- if (sessionId) {
- request.set_session_id(sessionId);
+ if (session.Defined()) {
+ request.set_session_id(session->GetId());
+ } else if ((txControl.TxSettings_.Defined() && !txControl.CommitTx_) || txControl.TxId_.Defined()) {
+ throw TContractViolation("Interactive tx must use explisit session");
}
if (settings.ConcurrentResultSets_) {
@@ -219,8 +238,8 @@ TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryIm
auto promise = NewPromise<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>>();
auto rpcSettings = TRpcRequestSettings::Make(settings);
- if (sessionId) {
- rpcSettings.PreferredEndpoint = TEndpointKey(GetNodeIdFromSession(sessionId));
+ if (session.Defined()) {
+ rpcSettings.PreferredEndpoint = TEndpointKey(GetNodeIdFromSession(session->GetId()));
}
connections->StartReadStream<
@@ -242,16 +261,16 @@ TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryIm
TAsyncExecuteQueryIterator TExecQueryImpl::StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl,
- const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId)
+ const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TMaybe<TSession>& session)
{
auto promise = NewPromise<TExecuteQueryIterator>();
- auto iteratorCallback = [promise](TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> future) mutable {
+ auto iteratorCallback = [promise, session](TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> future) mutable {
Y_ASSERT(future.HasValue());
auto pair = future.ExtractValue();
promise.SetValue(TExecuteQueryIterator(
pair.second
- ? std::make_shared<TExecuteQueryIterator::TReaderImpl>(pair.second, pair.first.Endpoint)
+ ? std::make_shared<TExecuteQueryIterator::TReaderImpl>(pair.second, pair.first.Endpoint, session)
: nullptr,
std::move(pair.first))
);
@@ -261,19 +280,19 @@ TAsyncExecuteQueryIterator TExecQueryImpl::StreamExecuteQuery(const std::shared_
? &params->GetProtoMap()
: nullptr;
- StreamExecuteQueryImpl(connections, driverState, query, txControl, paramsProto, settings, sessionId)
+ StreamExecuteQueryImpl(connections, driverState, query, txControl, paramsProto, settings, session)
.Subscribe(iteratorCallback);
return promise.GetFuture();
}
TAsyncExecuteQueryResult TExecQueryImpl::ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl,
- const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId)
+ const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TMaybe<TSession>& session)
{
auto syncSettings = settings;
syncSettings.ConcurrentResultSets(true);
- return StreamExecuteQuery(connections, driverState, query, txControl, params, syncSettings, sessionId)
+ return StreamExecuteQuery(connections, driverState, query, txControl, params, syncSettings, session)
.Apply([](TAsyncExecuteQueryIterator itFuture){
auto it = itFuture.ExtractValue();
diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h
index 24765225a7..e455ac1cdc 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h
+++ b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h
@@ -2,7 +2,7 @@
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/internal_header.h>
-#include <ydb/public/sdk/cpp/client/ydb_query/query.h>
+#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#include <ydb/public/sdk/cpp/client/ydb_query/tx.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h>
#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
@@ -13,11 +13,11 @@ class TExecQueryImpl {
public:
static TAsyncExecuteQueryIterator StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl,
- const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId);
+ const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TMaybe<TSession>& session);
static TAsyncExecuteQueryResult ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl,
- const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId);
+ const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TMaybe<TSession>& session);
};
} // namespace NYdb::NQuery::NImpl
diff --git a/ydb/public/sdk/cpp/client/ydb_query/query.cpp b/ydb/public/sdk/cpp/client/ydb_query/query.cpp
index 852528faad..49066dea33 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/query.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/query.cpp
@@ -33,22 +33,6 @@ std::string_view StatsModeToString(const EStatsMode statsMode) {
}
}
-const TVector<TResultSet>& TExecuteQueryResult::GetResultSets() const {
- return ResultSets_;
-}
-
-TResultSet TExecuteQueryResult::GetResultSet(size_t resultIndex) const {
- if (resultIndex >= ResultSets_.size()) {
- RaiseError(TString("Requested index out of range\n"));
- }
-
- return ResultSets_[resultIndex];
-}
-
-TResultSetParser TExecuteQueryResult::GetResultSetParser(size_t resultIndex) const {
- return TResultSetParser(GetResultSet(resultIndex));
-}
-
TScriptExecutionOperation::TScriptExecutionOperation(TStatus&& status, Ydb::Operations::Operation&& operation)
: TOperation(std::move(status), std::move(operation))
{
diff --git a/ydb/public/sdk/cpp/client/ydb_query/query.h b/ydb/public/sdk/cpp/client/ydb_query/query.h
index cfe0518cbd..2b2be8bd98 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/query.h
+++ b/ydb/public/sdk/cpp/client/ydb_query/query.h
@@ -49,33 +49,7 @@ enum class EExecStatus {
Failed = 50,
};
-class TExecuteQueryPart : public TStreamPartStatus {
-public:
- bool HasResultSet() const { return ResultSet_.Defined(); }
- ui64 GetResultSetIndex() const { return ResultSetIndex_; }
- const TResultSet& GetResultSet() const { return *ResultSet_; }
- TResultSet ExtractResultSet() { return std::move(*ResultSet_); }
-
- const TMaybe<TExecStats>& GetStats() const { return Stats_; }
-
- TExecuteQueryPart(TStatus&& status, TMaybe<TExecStats>&& queryStats)
- : TStreamPartStatus(std::move(status))
- , Stats_(std::move(queryStats))
- {}
-
- TExecuteQueryPart(TStatus&& status, TResultSet&& resultSet, i64 resultSetIndex, TMaybe<TExecStats>&& queryStats)
- : TStreamPartStatus(std::move(status))
- , ResultSet_(std::move(resultSet))
- , ResultSetIndex_(resultSetIndex)
- , Stats_(std::move(queryStats))
- {}
-
-private:
- TMaybe<TResultSet> ResultSet_;
- i64 ResultSetIndex_ = 0;
- TMaybe<TExecStats> Stats_;
-};
-
+class TExecuteQueryPart;
using TAsyncExecuteQueryPart = NThreading::TFuture<TExecuteQueryPart>;
class TExecuteQueryIterator : public TStatus {
@@ -108,28 +82,7 @@ struct TBeginTxSettings : public TRequestSettings<TBeginTxSettings> {};
struct TCommitTxSettings : public TRequestSettings<TCommitTxSettings> {};
struct TRollbackTxSettings : public TRequestSettings<TRollbackTxSettings> {};
-class TExecuteQueryResult : public TStatus {
-public:
- const TVector<TResultSet>& GetResultSets() const;
- TResultSet GetResultSet(size_t resultIndex) const;
- TResultSetParser GetResultSetParser(size_t resultIndex) const;
-
- const TMaybe<TExecStats>& GetStats() const { return Stats_; }
- TExecuteQueryResult(TStatus&& status)
- : TStatus(std::move(status))
- {}
-
- TExecuteQueryResult(TStatus&& status, TVector<TResultSet>&& resultSets, TMaybe<TExecStats>&& stats)
- : TStatus(std::move(status))
- , ResultSets_(std::move(resultSets))
- , Stats_(std::move(stats))
- {}
-
-private:
- TVector<TResultSet> ResultSets_;
- TMaybe<TExecStats> Stats_;
-};
class TCommitTransactionResult : public TStatus {
public:
@@ -138,7 +91,6 @@ public:
class TBeginTransactionResult;
-using TAsyncExecuteQueryResult = NThreading::TFuture<TExecuteQueryResult>;
using TAsyncBeginTransactionResult = NThreading::TFuture<TBeginTransactionResult>;
using TAsyncCommitTransactionResult = NThreading::TFuture<TCommitTransactionResult>;
@@ -215,6 +167,8 @@ private:
TString NextFetchToken_;
};
+class TExecuteQueryResult;
using TAsyncFetchScriptResultsResult = NThreading::TFuture<TFetchScriptResultsResult>;
+using TAsyncExecuteQueryResult = NThreading::TFuture<TExecuteQueryResult>;
} // namespace NYdb::NQuery