aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2022-12-27 13:32:14 +0300
committerspuchin <spuchin@ydb.tech>2022-12-27 13:32:14 +0300
commit4c087567ebe51fc9ca768f5e5cc1fa54e234083c (patch)
tree7ea613767195c90c81c4fe0d752fec52e7362faf
parent6430ddcd067d7ce86670847f0cd6eed645978b7b (diff)
downloadydb-4c087567ebe51fc9ca768f5e5cc1fa54e234083c.tar.gz
ExecuteQuery stub. ()
-rw-r--r--ydb/core/driver_lib/run/run.cpp8
-rw-r--r--ydb/core/grpc_services/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux.txt1
-rw-r--r--ydb/core/grpc_services/base/base_service.h2
-rw-r--r--ydb/core/grpc_services/rpc_execute_query.cpp334
-rw-r--r--ydb/core/grpc_services/service_query.h13
-rw-r--r--ydb/core/kqp/ut/common/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/ut/common/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h11
-rw-r--r--ydb/core/kqp/ut/service/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/ut/service/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/ut/service/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp50
-rw-r--r--ydb/core/testlib/test_client.cpp2
-rw-r--r--ydb/public/sdk/cpp/client/draft/CMakeLists.darwin.txt1
-rw-r--r--ydb/public/sdk/cpp/client/draft/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/draft/CMakeLists.linux.txt1
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin.txt22
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt23
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux.txt23
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.txt15
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp49
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/client.h35
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.darwin.txt21
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.linux-aarch64.txt22
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.linux.txt22
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.txt15
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp215
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h19
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp17
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/query.h81
-rw-r--r--ydb/services/ydb/CMakeLists.darwin.txt1
-rw-r--r--ydb/services/ydb/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/ydb/CMakeLists.linux.txt1
-rw-r--r--ydb/services/ydb/ydb_query.cpp35
-rw-r--r--ydb/services/ydb/ydb_query.h18
38 files changed, 1064 insertions, 3 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 6dec2e73f6..c35be9b29e 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -100,6 +100,7 @@
#include <ydb/services/ydb/ydb_logstore.h>
#include <ydb/services/ydb/ydb_long_tx.h>
#include <ydb/services/ydb/ydb_operation.h>
+#include <ydb/services/ydb/ydb_query.h>
#include <ydb/services/ydb/ydb_scheme.h>
#include <ydb/services/ydb/ydb_scripting.h>
#include <ydb/services/ydb/ydb_table.h>
@@ -577,6 +578,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
names["logstore"] = &hasLogStore;
TServiceCfg hasAuth = services.empty();
names["auth"] = &hasAuth;
+ TServiceCfg hasQueryService = services.empty();
+ names["query_service"] = &hasQueryService;
std::unordered_set<TString> enabled;
for (const auto& name : services) {
@@ -805,6 +808,11 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId));
}
+ if (hasQueryService) {
+ server.AddService(new NGRpcService::TGRpcYdbQueryService(ActorSystem.Get(), Counters,
+ grpcRequestProxyId, hasDataStreams.IsRlAllowed()));
+ }
+
if (hasLogStore) {
server.AddService(new NGRpcService::TGRpcYdbLogStoreService(ActorSystem.Get(), Counters,
grpcRequestProxyId, hasLogStore.IsRlAllowed()));
diff --git a/ydb/core/grpc_services/CMakeLists.darwin.txt b/ydb/core/grpc_services/CMakeLists.darwin.txt
index e42e68aba8..5ce37acb12 100644
--- a/ydb/core/grpc_services/CMakeLists.darwin.txt
+++ b/ydb/core/grpc_services/CMakeLists.darwin.txt
@@ -87,6 +87,7 @@ target_sources(ydb-core-grpc_services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_drop_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_discovery.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_data_query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_scheme_query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_yql_script.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_explain_yql_script.cpp
diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
index 6d1930bcca..c5bdf1447c 100644
--- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
@@ -88,6 +88,7 @@ target_sources(ydb-core-grpc_services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_drop_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_discovery.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_data_query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_scheme_query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_yql_script.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_explain_yql_script.cpp
diff --git a/ydb/core/grpc_services/CMakeLists.linux.txt b/ydb/core/grpc_services/CMakeLists.linux.txt
index 6d1930bcca..c5bdf1447c 100644
--- a/ydb/core/grpc_services/CMakeLists.linux.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux.txt
@@ -88,6 +88,7 @@ target_sources(ydb-core-grpc_services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_drop_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_discovery.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_data_query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_scheme_query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_yql_script.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_explain_yql_script.cpp
diff --git a/ydb/core/grpc_services/base/base_service.h b/ydb/core/grpc_services/base/base_service.h
index 52db0b46df..1c6f6f6432 100644
--- a/ydb/core/grpc_services/base/base_service.h
+++ b/ydb/core/grpc_services/base/base_service.h
@@ -1,6 +1,8 @@
#pragma once
+#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/grpc/server/grpc_request_base.h>
+#include <library/cpp/grpc/server/grpc_server.h>
#include <library/cpp/grpc/server/logger.h>
namespace NKikimr {
diff --git a/ydb/core/grpc_services/rpc_execute_query.cpp b/ydb/core/grpc_services/rpc_execute_query.cpp
new file mode 100644
index 0000000000..cf57107c11
--- /dev/null
+++ b/ydb/core/grpc_services/rpc_execute_query.cpp
@@ -0,0 +1,334 @@
+#include "service_query.h"
+
+#include "rpc_kqp_base.h"
+
+#include <ydb/core/actorlib_impl/long_timer.h>
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/kikimr_issue.h>
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/kqp/executer_actor/kqp_executer.h>
+#include <ydb/public/api/protos/draft/ydb_query.pb.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NGRpcService {
+
+namespace {
+
+using namespace NActors;
+
+using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQueryRequest,
+ Ydb::Query::ExecuteQueryResponsePart>;
+
+std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest(
+ const Ydb::Query::ExecuteQueryRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest)
+{
+ kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end());
+ switch (req.exec_mode()) {
+ case Ydb::Query::EXEC_MODE_EXECUTE:
+ kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ break;
+ default: {
+ NYql::TIssues issues;
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query mode"));
+ return {Ydb::StatusIds::BAD_REQUEST, issues};
+ }
+ }
+
+ // TODO: Use new type of query (QUERY_TYPE_SQL_QUERY)
+ kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_SCAN);
+ kqpRequest.MutableRequest()->SetKeepSession(false);
+
+ switch (req.query_case()) {
+ case Ydb::Query::ExecuteQueryRequest::kQueryContent: {
+ NYql::TIssues issues;
+ if (!CheckQuery(req.query_content().text(), issues)) {
+ return {Ydb::StatusIds::BAD_REQUEST, issues};
+ }
+
+ kqpRequest.MutableRequest()->SetQuery(req.query_content().text());
+ break;
+ }
+
+ default: {
+ NYql::TIssues issues;
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option"));
+ return {Ydb::StatusIds::BAD_REQUEST, issues};
+ }
+ }
+
+ return {Ydb::StatusIds::SUCCESS, {}};
+}
+
+class RpcFlowControlState {
+public:
+ RpcFlowControlState(ui64 inflightLimitBytes)
+ : InflightLimitBytes_(inflightLimitBytes) {}
+
+ void PushResponse(ui64 responseSizeBytes) {
+ ResponseSizeQueue_.push(responseSizeBytes);
+ TotalResponsesSize_ += responseSizeBytes;
+ }
+
+ void PopResponse() {
+ Y_ENSURE(!ResponseSizeQueue_.empty());
+ TotalResponsesSize_ -= ResponseSizeQueue_.front();
+ ResponseSizeQueue_.pop();
+ }
+
+ size_t QueueSize() const {
+ return ResponseSizeQueue_.size();
+ }
+
+ ui64 FreeSpaceBytes() const {
+ return TotalResponsesSize_ < InflightLimitBytes_
+ ? InflightLimitBytes_ - TotalResponsesSize_
+ : 0;
+ }
+
+ ui64 InflightBytes() const {
+ return TotalResponsesSize_;
+ }
+
+ ui64 InflightLimitBytes() const {
+ return InflightLimitBytes_;
+ }
+
+private:
+ const ui64 InflightLimitBytes_;
+
+ TQueue<ui64> ResponseSizeQueue_;
+ ui64 TotalResponsesSize_ = 0;
+};
+
+class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::GRPC_STREAM_REQ;
+ }
+
+ TExecuteQueryRPC(TEvExecuteQueryRequest* request, ui64 inflightLimitBytes)
+ : Request_(request)
+ , FlowControl_(inflightLimitBytes) {}
+
+ void Bootstrap(const TActorContext &ctx) {
+ this->Become(&TExecuteQueryRPC::StateWork);
+
+ auto selfId = this->SelfId();
+ auto as = TActivationContext::ActorSystem();
+
+ Request_->SetClientLostAction([selfId, as]() {
+ as->Send(selfId, new TEvents::TEvWakeup(EWakeupTag::ClientLostTag));
+ });
+
+ Request_->SetStreamingNotify([selfId, as](size_t left) {
+ as->Send(selfId, new TRpcServices::TEvGrpcNextReply(left));
+ });
+
+ Proceed(ctx);
+ }
+
+private:
+ void StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
+ try {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvents::TEvWakeup, Handle);
+ HFunc(TRpcServices::TEvGrpcNextReply, Handle);
+ HFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle);
+ HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
+ HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
+ default:
+ UnexpectedEvent(__func__, ev, ctx);
+ }
+ } catch (const yexception& ex) {
+ InternalError(ex.what(), ctx);
+ }
+ }
+
+ void Proceed(const TActorContext &ctx) {
+ const auto req = Request_->GetProtoRequest();
+ const auto traceId = Request_->GetTraceId();
+
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ SetAuthToken(ev, *Request_);
+ SetDatabase(ev, *Request_);
+ SetRlPath(ev, *Request_);
+
+ if (traceId) {
+ ev->Record.SetTraceId(traceId.GetRef());
+ }
+
+ ActorIdToProto(this->SelfId(), ev->Record.MutableRequestActorId());
+
+ auto [fillStatus, fillIssues] = FillKqpRequest(*req, ev->Record);
+ if (fillStatus != Ydb::StatusIds::SUCCESS) {
+ return ReplyFinishStream(fillStatus, fillIssues, ctx);
+ }
+
+ if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
+ NYql::TIssues issues;
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error"));
+ ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, issues, ctx);
+ }
+ }
+
+ void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) {
+ switch ((EWakeupTag) ev->Get()->Tag) {
+ case EWakeupTag::ClientLostTag:
+ return HandleClientLost(ctx);
+ }
+ }
+
+ void Handle(TRpcServices::TEvGrpcNextReply::TPtr& ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " NextReply"
+ << ", left: " << ev->Get()->LeftInQueue
+ << ", queue: " << FlowControl_.QueueSize()
+ << ", inflight bytes: " << FlowControl_.InflightBytes()
+ << ", limit bytes: " << FlowControl_.InflightLimitBytes());
+
+ while (FlowControl_.QueueSize() > ev->Get()->LeftInQueue) {
+ FlowControl_.PopResponse();
+ }
+
+ ui64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
+ if (ResumeWithSeqNo_ && freeSpaceBytes > 0) {
+ LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, "
+ << ", seqNo: " << *ResumeWithSeqNo_
+ << ", freeSpace: " << freeSpaceBytes
+ << ", executer: " << ExecuterActorId_);
+
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetSeqNo(*ResumeWithSeqNo_);
+ resp->Record.SetFreeSpace(freeSpaceBytes);
+
+ ctx.Send(ExecuterActorId_, resp.Release());
+
+ ResumeWithSeqNo_.Clear();
+ }
+ }
+
+ void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev, const TActorContext& ctx) {
+ ExecuterActorId_ = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId());
+ LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "ExecuterActorId: " << ExecuterActorId_);
+ }
+
+ void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) {
+ Ydb::Query::ExecuteQueryResponsePart response;
+ response.set_status(Ydb::StatusIds::SUCCESS);
+ response.set_result_set_index(0);
+ response.mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());
+
+ TString out;
+ Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
+
+ Request_->SendSerializedResult(std::move(out), Ydb::StatusIds::SUCCESS);
+
+ auto freeSpaceBytes = FlowControl_.FreeSpaceBytes();
+ if (freeSpaceBytes == 0) {
+ ResumeWithSeqNo_ = ev->Get()->Record.GetSeqNo();
+ }
+
+ LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Send stream data ack"
+ << ", seqNo: " << ev->Get()->Record.GetSeqNo()
+ << ", freeSpace: " << freeSpaceBytes
+ << ", to: " << ev->Sender
+ << ", queue: " << FlowControl_.QueueSize());
+
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+ resp->Record.SetFreeSpace(freeSpaceBytes);
+
+ ctx.Send(ev->Sender, resp.Release());
+ }
+
+ void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ auto& record = ev->Get()->Record.GetRef();
+
+ NYql::TIssues issues;
+ const auto& issueMessage = record.GetResponse().GetQueryIssues();
+ NYql::IssuesFromMessage(issueMessage, issues);
+
+ ReplyFinishStream(record.GetYdbStatus(), issues, ctx);
+ }
+
+private:
+ void HandleClientLost(const TActorContext& ctx) {
+ // TODO: Abort query execution.
+ Y_UNUSED(ctx);
+ }
+
+ void ReplyFinishStream(Ydb::StatusIds::StatusCode status, const NYql::TIssue& issue, const TActorContext& ctx) {
+ google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issuesMessage;
+ NYql::IssueToMessage(issue, issuesMessage.Add());
+
+ ReplyFinishStream(status, issuesMessage, ctx);
+ }
+
+ void ReplyFinishStream(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues, const TActorContext& ctx) {
+ google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issuesMessage;
+ for (auto& issue : issues) {
+ auto item = issuesMessage.Add();
+ NYql::IssueToMessage(issue, item);
+ }
+
+ ReplyFinishStream(status, issuesMessage, ctx);
+ }
+
+ void ReplyFinishStream(Ydb::StatusIds::StatusCode status,
+ const google::protobuf::RepeatedPtrField<TYdbIssueMessageType>& message, const TActorContext& ctx)
+ {
+ LOG_INFO_S(ctx, NKikimrServices::RPC_REQUEST, "Finish grpc stream, status: "
+ << Ydb::StatusIds::StatusCode_Name(status));
+
+ // Skip sending empty result in case of success status - simplify client logic
+ if (status != Ydb::StatusIds::SUCCESS) {
+ TString out;
+ Ydb::Query::ExecuteQueryResponsePart response;
+ response.set_status(status);
+ response.mutable_issues()->CopyFrom(message);
+ Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
+ Request_->SendSerializedResult(std::move(out), status);
+ }
+
+ Request_->FinishStream();
+ this->PassAway();
+ }
+
+ void InternalError(const TString& message, const TActorContext& ctx) {
+ LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, "Internal error, message: " << message);
+
+ auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, message);
+ ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, issue, ctx);
+ }
+
+ void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev, const TActorContext& ctx) {
+ InternalError(TStringBuilder() << "TExecuteQueryRPC in state " << state << " received unexpected event " <<
+ TypeName(*ev.Get()->GetBase()) << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()), ctx);
+ }
+
+private:
+ enum EWakeupTag : ui64 {
+ ClientLostTag = 1,
+ };
+
+private:
+ std::unique_ptr<TEvExecuteQueryRequest> Request_;
+
+ RpcFlowControlState FlowControl_;
+ TMaybe<ui64> ResumeWithSeqNo_;
+
+ TActorId ExecuterActorId_;
+};
+
+} // namespace
+
+void DoExecuteQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
+ // Use default channel buffer size as inflight limit
+ ui64 inflightLimitBytes = f.GetAppConfig().GetTableServiceConfig().GetResourceManager().GetChannelBufferSize();
+
+ auto* req = dynamic_cast<TEvExecuteQueryRequest*>(p.release());
+ Y_VERIFY(req != nullptr, "Wrong using of TGRpcRequestWrapper");
+ TActivationContext::AsActorContext().Register(new TExecuteQueryRPC(req, inflightLimitBytes));
+}
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/service_query.h b/ydb/core/grpc_services/service_query.h
new file mode 100644
index 0000000000..fb6740648f
--- /dev/null
+++ b/ydb/core/grpc_services/service_query.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include <memory>
+
+namespace NKikimr::NGRpcService {
+
+class IRequestOpCtx;
+class IRequestNoOpCtx;
+class IFacilityProvider;
+
+void DoExecuteQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&);
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/kqp/ut/common/CMakeLists.darwin.txt b/ydb/core/kqp/ut/common/CMakeLists.darwin.txt
index 5b3d6bc3d8..ca9b96e849 100644
--- a/ydb/core/kqp/ut/common/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/ut/common/CMakeLists.darwin.txt
@@ -19,6 +19,7 @@ target_link_libraries(kqp-ut-common PUBLIC
yql-utils-backtrace
public-lib-yson_value
cpp-client-draft
+ client-draft-ydb_query
cpp-client-ydb_proto
cpp-client-ydb_scheme
cpp-client-ydb_table
diff --git a/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt
index eab66e487a..4f6c34f949 100644
--- a/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt
@@ -20,6 +20,7 @@ target_link_libraries(kqp-ut-common PUBLIC
yql-utils-backtrace
public-lib-yson_value
cpp-client-draft
+ client-draft-ydb_query
cpp-client-ydb_proto
cpp-client-ydb_scheme
cpp-client-ydb_table
diff --git a/ydb/core/kqp/ut/common/CMakeLists.linux.txt b/ydb/core/kqp/ut/common/CMakeLists.linux.txt
index eab66e487a..4f6c34f949 100644
--- a/ydb/core/kqp/ut/common/CMakeLists.linux.txt
+++ b/ydb/core/kqp/ut/common/CMakeLists.linux.txt
@@ -20,6 +20,7 @@ target_link_libraries(kqp-ut-common PUBLIC
yql-utils-backtrace
public-lib-yson_value
cpp-client-draft
+ client-draft-ydb_query
cpp-client-ydb_proto
cpp-client-ydb_scheme
cpp-client-ydb_table
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index d2a1dedd85..558cdc6491 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -6,6 +6,7 @@
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h>
#include <library/cpp/yson/node/node_io.h>
@@ -51,14 +52,14 @@
void N(NUnitTest::TTestContext&)
template <bool ForceVersionV1>
-TString Query(const TString& tmpl) {
+TString MakeQuery(const TString& tmpl) {
return TStringBuilder()
<< (ForceVersionV1 ? "--!syntax_v1\n" : "")
<< tmpl;
}
-#define Q_(expr) Query<false>(expr)
-#define Q1_(expr) Query<true>(expr)
+#define Q_(expr) MakeQuery<false>(expr)
+#define Q1_(expr) MakeQuery<true>(expr)
namespace NKikimr {
namespace NKqp {
@@ -133,6 +134,10 @@ public:
.UseQueryCache(false));
}
+ NYdb::NQuery::TQueryClient GetQueryClient() const {
+ return NYdb::NQuery::TQueryClient(*Driver);
+ }
+
bool IsUsingSnapshotReads() const {
return Server->GetRuntime()->GetAppData().FeatureFlags.GetEnableMvccSnapshotReads();
}
diff --git a/ydb/core/kqp/ut/service/CMakeLists.darwin.txt b/ydb/core/kqp/ut/service/CMakeLists.darwin.txt
index 2072a651c4..dd508390b9 100644
--- a/ydb/core/kqp/ut/service/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/ut/service/CMakeLists.darwin.txt
@@ -34,6 +34,7 @@ target_link_options(ydb-core-kqp-ut-service PRIVATE
)
target_sources(ydb-core-kqp-ut-service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_document_api_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_service_ut.cpp
)
add_test(
diff --git a/ydb/core/kqp/ut/service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/service/CMakeLists.linux-aarch64.txt
index 291528848c..40ec0e2702 100644
--- a/ydb/core/kqp/ut/service/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/ut/service/CMakeLists.linux-aarch64.txt
@@ -37,6 +37,7 @@ target_link_options(ydb-core-kqp-ut-service PRIVATE
)
target_sources(ydb-core-kqp-ut-service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_document_api_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_service_ut.cpp
)
add_test(
diff --git a/ydb/core/kqp/ut/service/CMakeLists.linux.txt b/ydb/core/kqp/ut/service/CMakeLists.linux.txt
index 0083566c3e..43db5b00b2 100644
--- a/ydb/core/kqp/ut/service/CMakeLists.linux.txt
+++ b/ydb/core/kqp/ut/service/CMakeLists.linux.txt
@@ -39,6 +39,7 @@ target_link_options(ydb-core-kqp-ut-service PRIVATE
)
target_sources(ydb-core-kqp-ut-service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_document_api_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_service_ut.cpp
)
add_test(
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
new file mode 100644
index 0000000000..d21ad049b7
--- /dev/null
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -0,0 +1,50 @@
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+using namespace NYdb;
+using namespace NYdb::NQuery;
+
+Y_UNIT_TEST_SUITE(KqpQueryService) {
+ Y_UNIT_TEST(StreamExecuteQuery) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ auto it = db.StreamExecuteQuery(R"(
+ SELECT 1;
+ )").ExtractValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ ui64 count = 0;
+ for (;;) {
+ auto streamPart = it.ReadNext().GetValueSync();
+ if (!streamPart.IsSuccess()) {
+ UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
+ break;
+ }
+
+ if (streamPart.HasResultSet()) {
+ auto resultSet = streamPart.ExtractResultSet();
+ count += resultSet.RowsCount();
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(count, 1);
+ }
+
+ Y_UNIT_TEST(ExecuteQuery) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ auto result = db.ExecuteQuery(R"(
+ SELECT 1;
+ )").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ CompareYson(R"([[1]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+}
+
+} // namespace NKqp
+} // namespace NKikimr
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 83e7240c3a..0efb648e4b 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -19,6 +19,7 @@
#include <ydb/services/ydb/ydb_export.h>
#include <ydb/services/ydb/ydb_import.h>
#include <ydb/services/ydb/ydb_operation.h>
+#include <ydb/services/ydb/ydb_query.h>
#include <ydb/services/ydb/ydb_scheme.h>
#include <ydb/services/ydb/ydb_scripting.h>
#include <ydb/services/ydb/ydb_table.h>
@@ -338,6 +339,7 @@ namespace Tests {
GRpcServer->AddService(new NGRpcService::TGRpcYdbLongTxService(system, counters, grpcRequestProxyId, true));
GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxyId, true));
GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxyId, true));
if (Settings->EnableYq) {
GRpcServer->AddService(new NGRpcService::TGRpcYandexQueryService(system, counters, grpcRequestProxyId));
GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxyId));
diff --git a/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin.txt b/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin.txt
index de0df9620e..88788ca375 100644
--- a/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin.txt
+++ b/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin.txt
@@ -7,6 +7,7 @@
add_subdirectory(ut)
+add_subdirectory(ydb_query)
add_library(cpp-client-draft)
target_link_libraries(cpp-client-draft PUBLIC
diff --git a/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-aarch64.txt
index 6dbee5efe4..1d43bb8918 100644
--- a/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-aarch64.txt
@@ -7,6 +7,7 @@
add_subdirectory(ut)
+add_subdirectory(ydb_query)
add_library(cpp-client-draft)
target_link_libraries(cpp-client-draft PUBLIC
diff --git a/ydb/public/sdk/cpp/client/draft/CMakeLists.linux.txt b/ydb/public/sdk/cpp/client/draft/CMakeLists.linux.txt
index 6dbee5efe4..1d43bb8918 100644
--- a/ydb/public/sdk/cpp/client/draft/CMakeLists.linux.txt
+++ b/ydb/public/sdk/cpp/client/draft/CMakeLists.linux.txt
@@ -7,6 +7,7 @@
add_subdirectory(ut)
+add_subdirectory(ydb_query)
add_library(cpp-client-draft)
target_link_libraries(cpp-client-draft PUBLIC
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin.txt
new file mode 100644
index 0000000000..d436332b10
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin.txt
@@ -0,0 +1,22 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(impl)
+
+add_library(client-draft-ydb_query)
+target_link_libraries(client-draft-ydb_query PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ draft-ydb_query-impl
+ cpp-client-ydb_common_client
+ cpp-client-ydb_table
+)
+target_sources(client-draft-ydb_query PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..49ec853da9
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,23 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(impl)
+
+add_library(client-draft-ydb_query)
+target_link_libraries(client-draft-ydb_query PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ draft-ydb_query-impl
+ cpp-client-ydb_common_client
+ cpp-client-ydb_table
+)
+target_sources(client-draft-ydb_query PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux.txt
new file mode 100644
index 0000000000..49ec853da9
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux.txt
@@ -0,0 +1,23 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(impl)
+
+add_library(client-draft-ydb_query)
+target_link_libraries(client-draft-ydb_query PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ draft-ydb_query-impl
+ cpp-client-ydb_common_client
+ cpp-client-ydb_table
+)
+target_sources(client-draft-ydb_query PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.txt
new file mode 100644
index 0000000000..3e0811fb22
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
new file mode 100644
index 0000000000..50d49a48e4
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
@@ -0,0 +1,49 @@
+#include "client.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h>
+
+namespace NYdb::NQuery {
+
+class TQueryClient::TImpl: public TClientImplCommon<TQueryClient::TImpl> {
+public:
+ TImpl(std::shared_ptr<TGRpcConnectionsImpl>&& connections, const TClientSettings& settings)
+ : TClientImplCommon(std::move(connections), settings)
+ , Settings_(settings)
+ {
+ }
+
+ ~TImpl() {
+ // TODO: Drain sessions.
+ }
+
+ TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TExecuteQuerySettings& settings) {
+ return TExecQueryImpl::StreamExecuteQuery(Connections_, DbDriverState_, query, settings);
+ }
+
+ TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TExecuteQuerySettings& settings) {
+ return TExecQueryImpl::ExecuteQuery(Connections_, DbDriverState_, query, settings);
+ }
+
+private:
+ TClientSettings Settings_;
+};
+
+TQueryClient::TQueryClient(const TDriver& driver, const TClientSettings& settings)
+ : Impl_(new TQueryClient::TImpl(CreateInternalInterface(driver), settings))
+{
+}
+
+TAsyncExecuteQueryResult TQueryClient::ExecuteQuery(const TString& query,
+ const TExecuteQuerySettings& settings)
+{
+ return Impl_->ExecuteQuery(query, settings);
+}
+
+TAsyncExecuteQueryIterator TQueryClient::StreamExecuteQuery(const TString& query,
+ const TExecuteQuerySettings& settings)
+{
+ return Impl_->StreamExecuteQuery(query, settings);
+}
+
+} // namespace NYdb::NQuery
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h
new file mode 100644
index 0000000000..2ce4e15126
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h
@@ -0,0 +1,35 @@
+#pragma once
+
+#include "query.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+
+namespace NYdb {
+ class TProtoAccessor;
+}
+
+namespace NYdb::NQuery {
+
+struct TClientSettings : public TCommonClientSettingsBase<TClientSettings> {
+ using TSelf = TClientSettings;
+};
+
+class TQueryClient {
+public:
+ TQueryClient(const TDriver& driver, const TClientSettings& settings = TClientSettings());
+
+ TAsyncExecuteQueryResult ExecuteQuery(const TString& query,
+ const TExecuteQuerySettings& settings = TExecuteQuerySettings());
+
+ TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query,
+ const TExecuteQuerySettings& settings = TExecuteQuerySettings());
+
+private:
+ class TImpl;
+ std::shared_ptr<TImpl> Impl_;
+};
+
+} // namespace NYdb::NQuery
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.darwin.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.darwin.txt
new file mode 100644
index 0000000000..4e2c3d0522
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.darwin.txt
@@ -0,0 +1,21 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(draft-ydb_query-impl)
+target_link_libraries(draft-ydb_query-impl PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ api-grpc-draft
+ api-protos
+ client-ydb_common_client-impl
+ cpp-client-ydb_proto
+)
+target_sources(draft-ydb_query-impl PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..b9530d4d0b
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,22 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(draft-ydb_query-impl)
+target_link_libraries(draft-ydb_query-impl PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ api-grpc-draft
+ api-protos
+ client-ydb_common_client-impl
+ cpp-client-ydb_proto
+)
+target_sources(draft-ydb_query-impl PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.linux.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.linux.txt
new file mode 100644
index 0000000000..b9530d4d0b
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.linux.txt
@@ -0,0 +1,22 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(draft-ydb_query-impl)
+target_link_libraries(draft-ydb_query-impl PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ api-grpc-draft
+ api-protos
+ client-ydb_common_client-impl
+ cpp-client-ydb_proto
+)
+target_sources(draft-ydb_query-impl PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.txt
new file mode 100644
index 0000000000..3e0811fb22
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp
new file mode 100644
index 0000000000..00bba6c120
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp
@@ -0,0 +1,215 @@
+#define INCLUDE_YDB_INTERNAL_H
+#include "exec_query.h"
+
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
+#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
+#undef INCLUDE_YDB_INTERNAL_H
+
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
+
+namespace NYdb::NQuery {
+
+using namespace NThreading;
+
+class TExecuteQueryIterator::TReaderImpl {
+public:
+ using TSelf = TExecuteQueryIterator::TReaderImpl;
+ using TResponse = Ydb::Query::ExecuteQueryResponsePart;
+ using TStreamProcessorPtr = NGrpc::IStreamRequestReadProcessor<TResponse>::TPtr;
+ using TReadCallback = NGrpc::IStreamRequestReadProcessor<TResponse>::TReadCallback;
+ using TGRpcStatus = NGrpc::TGrpcStatus;
+ using TBatchReadResult = std::pair<TResponse, TGRpcStatus>;
+
+ TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint)
+ : StreamProcessor_(streamProcessor)
+ , Finished_(false)
+ , Endpoint_(endpoint)
+ {}
+
+ ~TReaderImpl() {
+ StreamProcessor_->Cancel();
+ }
+
+ bool IsFinished() const {
+ return Finished_;
+ }
+
+ TAsyncExecuteQueryPart ReadNext(std::shared_ptr<TSelf> self) {
+ auto promise = NThreading::NewPromise<TExecuteQueryPart>();
+ // Capture self - guarantee no dtor call during the read
+ auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable {
+ if (!grpcStatus.Ok()) {
+ self->Finished_ = true;
+ promise.SetValue({TStatus(TPlainStatus(grpcStatus, self->Endpoint_))});
+ } else {
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(self->Response_.issues(), issues);
+ EStatus clientStatus = static_cast<EStatus>(self->Response_.status());
+ // TODO: Add headers for streaming calls.
+ TPlainStatus plainStatus{clientStatus, std::move(issues), self->Endpoint_, {}};
+ TStatus status{std::move(plainStatus)};
+
+ if (self->Response_.has_result_set()) {
+ promise.SetValue({
+ std::move(status),
+ TResultSet(std::move(*self->Response_.mutable_result_set())),
+ self->Response_.result_set_index()
+ });
+ } else {
+ promise.SetValue({std::move(status)});
+ }
+ }
+ };
+
+ StreamProcessor_->Read(&Response_, readCb);
+ return promise.GetFuture();
+ }
+private:
+ TStreamProcessorPtr StreamProcessor_;
+ TResponse Response_;
+ bool Finished_;
+ TString Endpoint_;
+};
+
+TAsyncExecuteQueryPart TExecuteQueryIterator::ReadNext() {
+ if (ReaderImpl_->IsFinished()) {
+ RaiseError("Attempt to perform read on invalid or finished stream");
+ }
+
+ return ReaderImpl_->ReadNext(ReaderImpl_);
+}
+
+using TExecuteQueryProcessorPtr = TExecuteQueryIterator::TReaderImpl::TStreamProcessorPtr;
+
+struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
+ using TPtr = TIntrusivePtr<TExecuteQueryBuffer>;
+
+ TExecuteQueryBuffer(TExecuteQueryIterator&& iterator)
+ : Promise_(NewPromise<TExecuteQueryResult>())
+ , Iterator_(std::move(iterator)) {}
+
+ TPromise<TExecuteQueryResult> Promise_;
+ TExecuteQueryIterator Iterator_;
+ TVector<NYql::TIssue> Issues_;
+ TVector<Ydb::ResultSet> ResultSets_;
+
+ void Next() {
+ TPtr self(this);
+
+ Iterator_.ReadNext().Subscribe([self](TAsyncExecuteQueryPart partFuture) mutable {
+ auto part = partFuture.ExtractValue();
+
+ if (!part.IsSuccess()) {
+ if (part.EOS()) {
+ TVector<NYql::TIssue> issues;
+ TVector<Ydb::ResultSet> resultProtos;
+
+ std::swap(self->Issues_, issues);
+ std::swap(self->ResultSets_, resultProtos);
+
+ TVector<TResultSet> resultSets;
+ for (auto& proto : resultProtos) {
+ resultSets.emplace_back(std::move(proto));
+ }
+
+ self->Promise_.SetValue(TExecuteQueryResult(
+ TStatus(EStatus::SUCCESS, NYql::TIssues(std::move(issues))),
+ std::move(resultSets)
+ ));
+ } else {
+ self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {}));
+ }
+
+ return;
+ }
+
+ if (part.HasResultSet()) {
+ // TODO: Support multi-results
+ Y_ENSURE(part.GetResultSetIndex() == 0);
+
+ auto inRs = part.ExtractResultSet();
+ auto& inRsProto = TProtoAccessor::GetProto(inRs);
+
+ if (self->ResultSets_.empty()) {
+ self->ResultSets_.resize(1);
+ }
+
+ auto& resultSet = self->ResultSets_[0];
+ if (resultSet.columns().empty()) {
+ resultSet.mutable_columns()->CopyFrom(inRsProto.columns());
+ }
+
+ resultSet.mutable_rows()->Add(inRsProto.rows().begin(), inRsProto.rows().end());
+ }
+
+ self->Next();
+ });
+ }
+};
+
+TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryImpl(
+ const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState,
+ const TString& query, const TExecuteQuerySettings& settings)
+{
+ auto request = MakeRequest<Ydb::Query::ExecuteQueryRequest>();
+ request.set_exec_mode(Ydb::Query::EXEC_MODE_EXECUTE);
+ request.mutable_query_content()->set_text(query);
+
+ auto promise = NewPromise<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>>();
+
+ connections->StartReadStream<
+ Ydb::Query::V1::QueryService,
+ Ydb::Query::ExecuteQueryRequest,
+ Ydb::Query::ExecuteQueryResponsePart>
+ (
+ std::move(request),
+ [promise] (TPlainStatus status, TExecuteQueryProcessorPtr processor) mutable {
+ promise.SetValue(std::make_pair(status, processor));
+ },
+ &Ydb::Query::V1::QueryService::Stub::AsyncExecuteQuery,
+ driverState,
+ TRpcRequestSettings::Make(settings)
+ );
+
+ return promise.GetFuture();
+}
+
+TAsyncExecuteQueryIterator TExecQueryImpl::StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
+ const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings)
+{
+ auto promise = NewPromise<TExecuteQueryIterator>();
+
+ auto iteratorCallback = [promise](TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> future) mutable {
+ Y_ASSERT(future.HasValue());
+ auto pair = future.ExtractValue();
+ promise.SetValue(TExecuteQueryIterator(
+ pair.second
+ ? std::make_shared<TExecuteQueryIterator::TReaderImpl>(pair.second, pair.first.Endpoint)
+ : nullptr,
+ std::move(pair.first))
+ );
+ };
+
+ StreamExecuteQueryImpl(connections, driverState, query, settings).Subscribe(iteratorCallback);
+ return promise.GetFuture();
+}
+
+TAsyncExecuteQueryResult TExecQueryImpl::ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
+ const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings)
+{
+ return StreamExecuteQuery(connections, driverState, query, settings)
+ .Apply([](TAsyncExecuteQueryIterator itFuture){
+ auto it = itFuture.ExtractValue();
+
+ if (!it.IsSuccess()) {
+ return MakeFuture<TExecuteQueryResult>(std::move(it));
+ }
+
+ auto buffer = MakeIntrusive<TExecuteQueryBuffer>(std::move(it));
+ buffer->Next();
+
+ return buffer->Promise_.GetFuture();
+ });
+}
+
+} // namespace NYdb::NQuery
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h
new file mode 100644
index 0000000000..dbb448c983
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/internal_header.h>
+
+#include <ydb/public/sdk/cpp/client/draft/ydb_query/query.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h>
+
+namespace NYdb::NQuery {
+
+class TExecQueryImpl {
+public:
+ static TAsyncExecuteQueryIterator StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
+ const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings);
+
+ static TAsyncExecuteQueryResult ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
+ const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings);
+};
+
+} // namespace NYdb::NQuery::NImpl
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp
new file mode 100644
index 0000000000..bf1a8e517b
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp
@@ -0,0 +1,17 @@
+#include "query.h"
+
+namespace NYdb::NQuery {
+
+const TVector<TResultSet>& TExecuteQueryResult::GetResultSets() const {
+ return ResultSets_;
+}
+
+TResultSet TExecuteQueryResult::GetResultSet(size_t resultIndex) const {
+ if (resultIndex >= ResultSets_.size()) {
+ RaiseError(TString("Requested index out of range\n"));
+ }
+
+ return ResultSets_[resultIndex];
+}
+
+} // namespace NYdb::NQuery
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/query.h b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h
new file mode 100644
index 0000000000..4c0f0a71e3
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h
@@ -0,0 +1,81 @@
+#pragma once
+
+#include <ydb/public/api/grpc/draft/ydb_query_v1.grpc.pb.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/request_settings.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
+
+#include <library/cpp/threading/future/future.h>
+
+namespace NYdb::NQuery {
+
+class TExecuteQueryPart : public TStreamPartStatus {
+public:
+ bool HasResultSet() const { return ResultSet_.Defined(); }
+ ui64 GetResultSetIndex() const { return ResultSetIndex_; }
+ const TResultSet& GetResultSet() const { return *ResultSet_; }
+ TResultSet ExtractResultSet() { return std::move(*ResultSet_); }
+
+ TExecuteQueryPart(TStatus&& status)
+ : TStreamPartStatus(std::move(status))
+ {}
+
+ TExecuteQueryPart(TStatus&& status, TResultSet&& resultSet, i64 resultSetIndex)
+ : TStreamPartStatus(std::move(status))
+ , ResultSet_(std::move(resultSet))
+ , ResultSetIndex_(resultSetIndex)
+ {}
+
+private:
+ TMaybe<TResultSet> ResultSet_;
+ i64 ResultSetIndex_ = 0;
+};
+
+using TAsyncExecuteQueryPart = NThreading::TFuture<TExecuteQueryPart>;
+
+class TExecuteQueryIterator : public TStatus {
+ friend class TExecQueryImpl;
+public:
+ class TReaderImpl;
+
+ TAsyncExecuteQueryPart ReadNext();
+
+private:
+ TExecuteQueryIterator(
+ std::shared_ptr<TReaderImpl> impl,
+ TPlainStatus&& status)
+ : TStatus(std::move(status))
+ , ReaderImpl_(impl) {}
+
+ std::shared_ptr<TReaderImpl> ReaderImpl_;
+};
+
+using TAsyncExecuteQueryIterator = NThreading::TFuture<TExecuteQueryIterator>;
+
+struct TExecuteQuerySettings : public TRequestSettings<TExecuteQuerySettings> {
+};
+
+class TExecuteQueryResult : public TStatus {
+public:
+ const TVector<TResultSet>& GetResultSets() const;
+ TResultSet GetResultSet(size_t resultIndex) const;
+
+ TResultSetParser GetResultSetParser(size_t resultIndex) const;
+
+ TExecuteQueryResult(TStatus&& status)
+ : TStatus(std::move(status))
+ {}
+
+ TExecuteQueryResult(TStatus&& status, TVector<TResultSet>&& resultSets)
+ : TStatus(std::move(status))
+ , ResultSets_(std::move(resultSets))
+ {}
+
+private:
+ TVector<TResultSet> ResultSets_;
+};
+
+using TAsyncExecuteQueryResult = NThreading::TFuture<TExecuteQueryResult>;
+
+} // namespace NYdb::NQuery
diff --git a/ydb/services/ydb/CMakeLists.darwin.txt b/ydb/services/ydb/CMakeLists.darwin.txt
index f42a596347..9ae9e8c513 100644
--- a/ydb/services/ydb/CMakeLists.darwin.txt
+++ b/ydb/services/ydb/CMakeLists.darwin.txt
@@ -39,6 +39,7 @@ target_sources(ydb-services-ydb PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_import.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_operation.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_query.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scheme.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table.cpp
diff --git a/ydb/services/ydb/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/CMakeLists.linux-aarch64.txt
index 63a005d2ef..a609f4dece 100644
--- a/ydb/services/ydb/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/ydb/CMakeLists.linux-aarch64.txt
@@ -40,6 +40,7 @@ target_sources(ydb-services-ydb PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_import.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_operation.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_query.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scheme.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table.cpp
diff --git a/ydb/services/ydb/CMakeLists.linux.txt b/ydb/services/ydb/CMakeLists.linux.txt
index 63a005d2ef..a609f4dece 100644
--- a/ydb/services/ydb/CMakeLists.linux.txt
+++ b/ydb/services/ydb/CMakeLists.linux.txt
@@ -40,6 +40,7 @@ target_sources(ydb-services-ydb PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_import.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_operation.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_query.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scheme.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table.cpp
diff --git a/ydb/services/ydb/ydb_query.cpp b/ydb/services/ydb/ydb_query.cpp
new file mode 100644
index 0000000000..97de31837c
--- /dev/null
+++ b/ydb/services/ydb/ydb_query.cpp
@@ -0,0 +1,35 @@
+#include "ydb_query.h"
+
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/counters/counters.h>
+#include <ydb/core/grpc_services/grpc_helper.h>
+#include <ydb/core/grpc_services/service_query.h>
+
+namespace NKikimr::NGRpcService {
+
+void TGRpcYdbQueryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
+ using Ydb::Query::ExecuteQueryRequest;
+ using Ydb::Query::ExecuteQueryResponsePart;
+
+ auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
+
+#ifdef ADD_REQUEST
+#error ADD_REQUEST macro already defined
+#endif
+#define ADD_REQUEST(NAME, IN, OUT, ACTION) \
+ MakeIntrusive<TGRpcRequest<Ydb::Query::IN, Ydb::Query::OUT, TGRpcYdbQueryService>>(this, &Service_, CQ_, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
+ ACTION; \
+ }, &Ydb::Query::V1::QueryService::AsyncService::Request ## NAME, \
+ #NAME, logger, getCounterBlock("query", #NAME))->Run();
+
+ ADD_REQUEST(ExecuteQuery, ExecuteQueryRequest, ExecuteQueryResponsePart, {
+ ActorSystem_->Send(GRpcRequestProxyId_,
+ new TGrpcRequestNoOperationCall<ExecuteQueryRequest, ExecuteQueryResponsePart>
+ (ctx, &DoExecuteQueryRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
+ })
+#undef ADD_REQUEST
+}
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/services/ydb/ydb_query.h b/ydb/services/ydb/ydb_query.h
new file mode 100644
index 0000000000..a866d6070a
--- /dev/null
+++ b/ydb/services/ydb/ydb_query.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <ydb/core/grpc_services/base/base_service.h>
+#include <ydb/public/api/grpc/draft/ydb_query_v1.grpc.pb.h>
+
+namespace NKikimr::NGRpcService {
+
+class TGRpcYdbQueryService
+ : public TGrpcServiceBase<Ydb::Query::V1::QueryService>
+{
+public:
+ using TGrpcServiceBase<Ydb::Query::V1::QueryService>::TGrpcServiceBase;
+
+private:
+ void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
+};
+
+} // namespace NKikimr::NGRpcService