aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2022-10-26 14:13:07 +0300
committerdcherednik <dcherednik@ydb.tech>2022-10-26 14:13:07 +0300
commit71b018f828ad9f35f0b391747dd7eaa200793d1d (patch)
treea72222a98aa62b30d83638b0f7714c504cca229e
parent5e7625c7515775f88c42a8581489fb9efc993c3d (diff)
downloadydb-71b018f828ad9f35f0b391747dd7eaa200793d1d.tar.gz
ydb format support for session actor.
-rw-r--r--library/cpp/grpc/server/grpc_request.h5
-rw-r--r--library/cpp/grpc/server/grpc_request_base.h3
-rw-r--r--ydb/core/grpc_services/base/base.h12
-rw-r--r--ydb/core/grpc_services/local_rpc/local_rpc.h4
-rw-r--r--ydb/core/grpc_services/rpc_deferrable.h4
-rw-r--r--ydb/core/grpc_services/rpc_execute_data_query.cpp14
-rw-r--r--ydb/core/grpc_services/rpc_execute_yql_script.cpp2
-rw-r--r--ydb/core/grpc_services/rpc_kqp_base.h38
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp2
-rw-r--r--ydb/core/kqp/kqp.h10
-rw-r--r--ydb/core/kqp/kqp_response.cpp31
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp19
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.cpp3
-rw-r--r--ydb/core/kqp/proxy/kqp_proxy_service.h3
-rw-r--r--ydb/core/kqp/ut/kqp_params_ut.cpp4
-rw-r--r--ydb/core/protos/kqp.proto4
-rw-r--r--ydb/core/public_http/grpc_request_context_wrapper.cpp4
-rw-r--r--ydb/core/public_http/grpc_request_context_wrapper.h1
-rw-r--r--ydb/core/viewer/json_query.h2
19 files changed, 115 insertions, 50 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index b79a0d94fab..a3a5c291f07 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -179,6 +179,11 @@ public:
return Request_;
}
+ NProtoBuf::Message* GetRequestMut() override {
+ return Request_;
+ }
+
+
TAuthState& GetAuthState() override {
return AuthState_;
}
diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h
index 8827d028b34..105f9515d0a 100644
--- a/library/cpp/grpc/server/grpc_request_base.h
+++ b/library/cpp/grpc/server/grpc_request_base.h
@@ -53,6 +53,9 @@ public:
//! Get pointer to the request's message.
virtual const NProtoBuf::Message* GetRequest() const = 0;
+ //! Get mutable pointer to the request's message.
+ virtual NProtoBuf::Message* GetRequestMut() = 0;
+
//! Get current auth state
virtual TAuthState& GetAuthState() = 0;
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 4bf9ed201ae..c98440185a7 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -380,6 +380,7 @@ class IRequestCtx : public virtual IRequestCtxBase {
public:
virtual void SetClientLostAction(std::function<void()>&& cb) = 0;
virtual const google::protobuf::Message* GetRequest() const = 0;
+ virtual google::protobuf::Message* GetRequestMut() = 0;
virtual google::protobuf::Arena* GetArena() = 0;
virtual const TMaybe<TString> GetRequestType() const = 0;
virtual void SetRuHeader(ui64 ru) = 0;
@@ -1004,6 +1005,13 @@ public:
return request;
}
+ template <typename T>
+ static TRequest* GetProtoRequestMut(const T& req) {
+ auto request = dynamic_cast<TRequest*>(req->GetRequestMut());
+ Y_VERIFY(request != nullptr, "Wrong using of TGRpcRequestWrapper");
+ return request;
+ }
+
const TRequest* GetProtoRequest() const {
return GetProtoRequest(this);
}
@@ -1094,6 +1102,10 @@ public:
return Ctx_->GetRequest();
}
+ google::protobuf::Message* GetRequestMut() override {
+ return Ctx_->GetRequestMut();
+ }
+
void SetRespHook(TRespHook&& hook) override {
RespHook = std::move(hook);
}
diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h
index b39996b84f3..b70b9b420ca 100644
--- a/ydb/core/grpc_services/local_rpc/local_rpc.h
+++ b/ydb/core/grpc_services/local_rpc/local_rpc.h
@@ -137,6 +137,10 @@ public:
return &Request;
}
+ google::protobuf::Message* GetRequestMut() override {
+ return &Request;
+ }
+
void SetClientLostAction(std::function<void()>&&) override {}
void AddServerHint(const TString&) override {}
diff --git a/ydb/core/grpc_services/rpc_deferrable.h b/ydb/core/grpc_services/rpc_deferrable.h
index 644e7b84c40..306f12620f4 100644
--- a/ydb/core/grpc_services/rpc_deferrable.h
+++ b/ydb/core/grpc_services/rpc_deferrable.h
@@ -47,6 +47,10 @@ public:
return TRequest::GetProtoRequest(Request_);
}
+ typename TRequest::TRequest* GetProtoRequestMut() {
+ return TRequest::GetProtoRequestMut(Request_);
+ }
+
Ydb::Operations::OperationParams::OperationMode GetOperationMode() const {
return GetProtoRequest()->operation_params().operation_mode();
}
diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp
index 5f62ffefae6..dca4c52bcb9 100644
--- a/ydb/core/grpc_services/rpc_execute_data_query.cpp
+++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp
@@ -57,6 +57,8 @@ public:
SetAuthToken(ev, *Request_);
SetDatabase(ev, *Request_);
+ ev->Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
+
NYql::TIssues issues;
if (CheckSession(req->session_id(), issues)) {
ev->Record.MutableRequest()->SetSessionId(req->session_id());
@@ -77,9 +79,8 @@ public:
if (req->parametersSize() != 0) {
try {
- NKikimrMiniKQL::TParams params;
- ConvertYdbParamsToMiniKQLParams(req->parameters(), params);
- ev->Record.MutableRequest()->MutableParameters()->CopyFrom(params);
+ ConvertYdbParamsToMiniKQLParams(req->parameters(), *ev->Record.MutableRequest()->MutableParameters());
+ //ev->Record.MutableRequest()->MutableYdbParameters()->swap(*req->mutable_parameters());
} catch (const std::exception& ex) {
auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Failed to parse query parameters.");
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(NYql::ExceptionToIssue(ex)));
@@ -220,11 +221,14 @@ public:
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
const auto& kqpResponse = record.GetResponse();
const auto& issueMessage = kqpResponse.GetQueryIssues();
-
auto queryResult = TEvExecuteDataQueryRequest::AllocateResult<Ydb::Table::ExecuteQueryResult>(Request_);
try {
- ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
+ if (kqpResponse.GetYdbResults().size()) {
+ queryResult->mutable_result_sets()->CopyFrom(kqpResponse.GetYdbResults());
+ } else {
+ NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
+ }
ConvertQueryStats(kqpResponse, queryResult);
if (kqpResponse.HasTxMeta()) {
queryResult->mutable_tx_meta()->CopyFrom(kqpResponse.GetTxMeta());
diff --git a/ydb/core/grpc_services/rpc_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_execute_yql_script.cpp
index 4bd8f86c9a5..1102709c57f 100644
--- a/ydb/core/grpc_services/rpc_execute_yql_script.cpp
+++ b/ydb/core/grpc_services/rpc_execute_yql_script.cpp
@@ -100,7 +100,7 @@ public:
auto queryResult = TEvExecuteYqlScriptRequest::AllocateResult<TResult>(Request_);
try {
- ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
+ NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
} catch (const std::exception& ex) {
NYql::TIssues issues;
issues.AddIssue(NYql::ExceptionToIssue(ex));
diff --git a/ydb/core/grpc_services/rpc_kqp_base.h b/ydb/core/grpc_services/rpc_kqp_base.h
index 8f5787aa9f5..16da82b632e 100644
--- a/ydb/core/grpc_services/rpc_kqp_base.h
+++ b/ydb/core/grpc_services/rpc_kqp_base.h
@@ -74,44 +74,6 @@ inline bool CheckQuery(const TString& query, NYql::TIssues& issues) {
void FillQueryStats(Ydb::TableStats::QueryStats& queryStats, const NKikimrKqp::TQueryResponse& kqpResponse);
-inline void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to) {
- const auto& type = from.GetType();
- TStackVec<NKikimrMiniKQL::TType> columnTypes;
- Y_ENSURE(type.GetKind() == NKikimrMiniKQL::ETypeKind::Struct);
- for (const auto& member : type.GetStruct().GetMember()) {
- if (member.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::List) {
- for (const auto& column : member.GetType().GetList().GetItem().GetStruct().GetMember()) {
- auto columnMeta = to->add_columns();
- columnMeta->set_name(column.GetName());
- columnTypes.push_back(column.GetType());
- ConvertMiniKQLTypeToYdbType(column.GetType(), *columnMeta->mutable_type());
- }
- }
- }
- for (const auto& responseStruct : from.GetValue().GetStruct()) {
- for (const auto& row : responseStruct.GetList()) {
- auto newRow = to->add_rows();
- ui32 columnCount = static_cast<ui32>(row.StructSize());
- Y_ENSURE(columnCount == columnTypes.size());
- for (ui32 i = 0; i < columnCount; i++) {
- const auto& column = row.GetStruct(i);
- ConvertMiniKQLValueToYdbValue(columnTypes[i], column, *newRow->add_items());
- }
- }
- if (responseStruct.Getvalue_valueCase() == NKikimrMiniKQL::TValue::kBool) {
- to->set_truncated(responseStruct.GetBool());
- }
- }
-}
-
-template<typename TFrom, typename TTo>
-inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) {
- const auto& results = from.GetResults();
- for (const auto& result : results) {
- ConvertKqpQueryResultToDbResult(result, to->add_result_sets());
- }
-}
-
template <typename TDerived, typename TRequest>
class TRpcKqpRequestActor : public TRpcOperationRequestActor<TDerived, TRequest> {
using TBase = TRpcOperationRequestActor<TDerived, TRequest>;
diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
index bc854fe3bb7..03c8a61cdd3 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
@@ -214,7 +214,7 @@ private:
auto result = response.mutable_result();
try {
- ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set());
+ NKqp::ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set());
} catch (std::exception ex) {
ReplyFinishStream(ex.what(), ctx);
}
diff --git a/ydb/core/kqp/kqp.h b/ydb/core/kqp/kqp.h
index 9168308a314..e34a7c46114 100644
--- a/ydb/core/kqp/kqp.h
+++ b/ydb/core/kqp/kqp.h
@@ -16,6 +16,16 @@
namespace NKikimr {
namespace NKqp {
+void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to);
+
+template<typename TFrom, typename TTo>
+inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) {
+ const auto& results = from.GetResults();
+ for (const auto& result : results) {
+ ConvertKqpQueryResultToDbResult(result, to->add_result_sets());
+ }
+}
+
enum class ETableReadType {
Other = 0,
Scan = 1,
diff --git a/ydb/core/kqp/kqp_response.cpp b/ydb/core/kqp/kqp_response.cpp
index 8d96beaac4b..d98da6d1b8b 100644
--- a/ydb/core/kqp/kqp_response.cpp
+++ b/ydb/core/kqp/kqp_response.cpp
@@ -1,4 +1,5 @@
#include "kqp_impl.h"
+#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
@@ -51,6 +52,36 @@ bool HasSchemeOrFatalIssues(const TIssue& issue) {
} // namespace
+void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to) {
+ const auto& type = from.GetType();
+ TStackVec<NKikimrMiniKQL::TType> columnTypes;
+ Y_ENSURE(type.GetKind() == NKikimrMiniKQL::ETypeKind::Struct);
+ for (const auto& member : type.GetStruct().GetMember()) {
+ if (member.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::List) {
+ for (const auto& column : member.GetType().GetList().GetItem().GetStruct().GetMember()) {
+ auto columnMeta = to->add_columns();
+ columnMeta->set_name(column.GetName());
+ columnTypes.push_back(column.GetType());
+ ConvertMiniKQLTypeToYdbType(column.GetType(), *columnMeta->mutable_type());
+ }
+ }
+ }
+ for (const auto& responseStruct : from.GetValue().GetStruct()) {
+ for (const auto& row : responseStruct.GetList()) {
+ auto newRow = to->add_rows();
+ ui32 columnCount = static_cast<ui32>(row.StructSize());
+ Y_ENSURE(columnCount == columnTypes.size());
+ for (ui32 i = 0; i < columnCount; i++) {
+ const auto& column = row.GetStruct(i);
+ ConvertMiniKQLValueToYdbValue(columnTypes[i], column, *newRow->add_items());
+ }
+ }
+ if (responseStruct.Getvalue_valueCase() == NKikimrMiniKQL::TValue::kBool) {
+ to->set_truncated(responseStruct.GetBool());
+ }
+ }
+}
+
TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const TIssue& issue) {
if (issue.GetSeverity() == TSeverityIds::S_FATAL) {
return Ydb::StatusIds::INTERNAL_ERROR;
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 2fb94cabe65..70357b9320b 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -12,6 +12,7 @@
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
#include <ydb/core/kqp/provider/yql_kikimr_results.h>
#include <ydb/core/kqp/rm/kqp_snapshot_manager.h>
+#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/core/util/ulid.h>
@@ -334,6 +335,16 @@ public:
QueryState->StartTime = TInstant::Now();
QueryState->UserToken = event.GetUserToken();
QueryState->QueryDeadlines = GetQueryDeadlines(queryRequest);
+
+ if (!queryRequest.HasParameters() && queryRequest.YdbParametersSize()) {
+ try {
+ ConvertYdbParamsToMiniKQLParams(queryRequest.GetYdbParameters(), *queryRequest.MutableParameters());
+ } catch (const std::exception& ex) {
+ ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST)
+ << "Failed to parse query parameters. "<< ex.what();
+ }
+ }
+
QueryState->ParametersSize = queryRequest.GetParameters().ByteSize();
QueryState->RequestActorId = ActorIdFromProto(event.GetRequestActorId());
QueryState->KeepSession = Settings.LongSession || queryRequest.GetKeepSession();
@@ -1460,6 +1471,8 @@ public:
response->SetPreparedQuery(queryId);
}
+ bool useYdbResponseFormat = QueryState->Request.GetUsePublicResponseDataFormat();
+
if (QueryState->PreparedQuery) {
auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) {
@@ -1484,7 +1497,11 @@ public:
auto* protoRes = KikimrResultToProto(txResults[txIndex][resultIndex], {},
fillSettings.value_or(FillSettings), arena.get());
- response->AddResults()->Swap(protoRes);
+ if (useYdbResponseFormat) {
+ ConvertKqpQueryResultToDbResult(*protoRes, response->AddYdbResults());
+ } else {
+ response->AddResults()->Swap(protoRes);
+ }
}
}
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
index 3ccce5cef4e..034b69f16ab 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp
@@ -18,6 +18,7 @@
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/public/lib/operation_id/operation_id.h>
#include <ydb/core/node_whiteboard/node_whiteboard.h>
+#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/library/yql/utils/actor_log/log.h>
#include <ydb/library/yql/core/services/mounts/yql_mounts.h>
@@ -522,6 +523,8 @@ public:
if (sessionInfo) {
targetId = sessionInfo->WorkerId;
} else {
+ // No local session. Disable public format due to compatibility.
+ request.SetUsePublicResponseDataFormat(false);
targetId = TryGetSessionTargetActor(request.GetSessionId(), requestInfo, requestId);
if (!targetId) {
return;
diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.h b/ydb/core/kqp/proxy/kqp_proxy_service.h
index e6d44a176ca..e8fec4eed61 100644
--- a/ydb/core/kqp/proxy/kqp_proxy_service.h
+++ b/ydb/core/kqp/proxy/kqp_proxy_service.h
@@ -155,7 +155,8 @@ public:
ReadySessions[1].push_back(sessionId);
}
- auto result = LocalSessions.emplace(sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos)));
+ auto result = LocalSessions.emplace(sessionId,
+ TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos)));
SessionsCountPerDatabase[database]++;
Y_VERIFY(result.second, "Duplicate session id!");
TargetIdIndex.emplace(workerId, sessionId);
diff --git a/ydb/core/kqp/ut/kqp_params_ut.cpp b/ydb/core/kqp/ut/kqp_params_ut.cpp
index 021f97e2d71..d00d5ddbb54 100644
--- a/ydb/core/kqp/ut/kqp_params_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_params_ut.cpp
@@ -274,8 +274,8 @@ Y_UNIT_TEST_SUITE(KqpParams) {
UNIT_ASSERT_C(actual == expected1 || actual == expected2, "expected: " << expected1 << ", got: " << actual);
}
- Y_UNIT_TEST_NEW_ENGINE(InvalidJson) {
- TKikimrRunner kikimr;
+ Y_UNIT_TEST_QUAD(InvalidJson, UseNewEngine, UseSessionActor) {
+ auto kikimr = KikimrRunnerEnableSessionActor(UseNewEngine && UseSessionActor);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 8476ed6d0a0..143c37f74c1 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -102,6 +102,8 @@ message TQueryRequest {
optional NYql.NDqProto.EDqStatsMode StatsMode = 20; // deprecated
optional Ydb.Table.QueryStatsCollection.Mode CollectStats = 21;
optional TTopicOperations TopicOperations = 22;
+ optional bool UsePublicResponseDataFormat = 23;
+ map<string, Ydb.TypedValue> YdbParameters = 24;
}
message TKqpPathIdProto {
@@ -262,6 +264,7 @@ message TQueryResponse {
repeated TParameterDescription QueryParameters = 10;
optional Ydb.Table.TransactionMeta TxMeta = 11;
optional NKqpProto.TKqpStatsQuery QueryStats = 12;
+ repeated Ydb.ResultSet YdbResults = 13;
}
message TEvQueryResponse {
@@ -383,6 +386,7 @@ message TExecuterTxResult {
optional NKikimrMiniKQL.TResult Locks = 4;
reserved 5; // (deprecated) Stats
optional NYql.NDqProto.TDqExecutionStats Stats = 6;
+ repeated Ydb.ResultSet YdbResults = 7;
};
message TExecuterTxResponse {
diff --git a/ydb/core/public_http/grpc_request_context_wrapper.cpp b/ydb/core/public_http/grpc_request_context_wrapper.cpp
index 2b6913259e1..a7b0225c4ae 100644
--- a/ydb/core/public_http/grpc_request_context_wrapper.cpp
+++ b/ydb/core/public_http/grpc_request_context_wrapper.cpp
@@ -19,6 +19,10 @@ namespace NKikimr::NPublicHttp {
return Request.get();
}
+ NProtoBuf::Message* TGrpcRequestContextWrapper::GetRequestMut() {
+ return Request.get();
+ }
+
NGrpc::TAuthState& TGrpcRequestContextWrapper::GetAuthState() {
return AuthState;
}
diff --git a/ydb/core/public_http/grpc_request_context_wrapper.h b/ydb/core/public_http/grpc_request_context_wrapper.h
index 70c11898726..25605e2c20c 100644
--- a/ydb/core/public_http/grpc_request_context_wrapper.h
+++ b/ydb/core/public_http/grpc_request_context_wrapper.h
@@ -25,6 +25,7 @@ private:
public:
TGrpcRequestContextWrapper(const THttpRequestContext& requestContext, std::unique_ptr<NProtoBuf::Message> request, TReplySender replySender);
virtual const NProtoBuf::Message* GetRequest() const;
+ virtual NProtoBuf::Message* GetRequestMut();
virtual NGrpc::TAuthState& GetAuthState();
virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0);
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0);
diff --git a/ydb/core/viewer/json_query.h b/ydb/core/viewer/json_query.h
index fc7566e71cf..abc35c8b60b 100644
--- a/ydb/core/viewer/json_query.h
+++ b/ydb/core/viewer/json_query.h
@@ -295,7 +295,7 @@ private:
try {
for (const auto& result : response.GetResults()) {
Ydb::ResultSet resultSet;
- NGRpcService::ConvertKqpQueryResultToDbResult(result, &resultSet);
+ NKqp::ConvertKqpQueryResultToDbResult(result, &resultSet);
ResultSets.emplace_back(std::move(resultSet));
}
}