diff options
author | spuchin <spuchin@ydb.tech> | 2022-12-27 13:32:14 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2022-12-27 13:32:14 +0300 |
commit | 4c087567ebe51fc9ca768f5e5cc1fa54e234083c (patch) | |
tree | 7ea613767195c90c81c4fe0d752fec52e7362faf | |
parent | 6430ddcd067d7ce86670847f0cd6eed645978b7b (diff) | |
download | ydb-4c087567ebe51fc9ca768f5e5cc1fa54e234083c.tar.gz |
ExecuteQuery stub. ()
38 files changed, 1064 insertions, 3 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 6dec2e73f6..c35be9b29e 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -100,6 +100,7 @@ #include <ydb/services/ydb/ydb_logstore.h> #include <ydb/services/ydb/ydb_long_tx.h> #include <ydb/services/ydb/ydb_operation.h> +#include <ydb/services/ydb/ydb_query.h> #include <ydb/services/ydb/ydb_scheme.h> #include <ydb/services/ydb/ydb_scripting.h> #include <ydb/services/ydb/ydb_table.h> @@ -577,6 +578,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { names["logstore"] = &hasLogStore; TServiceCfg hasAuth = services.empty(); names["auth"] = &hasAuth; + TServiceCfg hasQueryService = services.empty(); + names["query_service"] = &hasQueryService; std::unordered_set<TString> enabled; for (const auto& name : services) { @@ -805,6 +808,11 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId)); } + if (hasQueryService) { + server.AddService(new NGRpcService::TGRpcYdbQueryService(ActorSystem.Get(), Counters, + grpcRequestProxyId, hasDataStreams.IsRlAllowed())); + } + if (hasLogStore) { server.AddService(new NGRpcService::TGRpcYdbLogStoreService(ActorSystem.Get(), Counters, grpcRequestProxyId, hasLogStore.IsRlAllowed())); diff --git a/ydb/core/grpc_services/CMakeLists.darwin.txt b/ydb/core/grpc_services/CMakeLists.darwin.txt index e42e68aba8..5ce37acb12 100644 --- a/ydb/core/grpc_services/CMakeLists.darwin.txt +++ b/ydb/core/grpc_services/CMakeLists.darwin.txt @@ -87,6 +87,7 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_drop_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_discovery.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_data_query.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_scheme_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_yql_script.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_explain_yql_script.cpp diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt index 6d1930bcca..c5bdf1447c 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt @@ -88,6 +88,7 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_drop_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_discovery.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_data_query.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_scheme_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_yql_script.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_explain_yql_script.cpp diff --git a/ydb/core/grpc_services/CMakeLists.linux.txt b/ydb/core/grpc_services/CMakeLists.linux.txt index 6d1930bcca..c5bdf1447c 100644 --- a/ydb/core/grpc_services/CMakeLists.linux.txt +++ b/ydb/core/grpc_services/CMakeLists.linux.txt @@ -88,6 +88,7 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_drop_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_discovery.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_data_query.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_scheme_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_execute_yql_script.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_explain_yql_script.cpp diff --git a/ydb/core/grpc_services/base/base_service.h b/ydb/core/grpc_services/base/base_service.h index 52db0b46df..1c6f6f6432 100644 --- a/ydb/core/grpc_services/base/base_service.h +++ b/ydb/core/grpc_services/base/base_service.h @@ -1,6 +1,8 @@ #pragma once +#include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/grpc/server/grpc_request_base.h> +#include <library/cpp/grpc/server/grpc_server.h> #include <library/cpp/grpc/server/logger.h> namespace NKikimr { diff --git a/ydb/core/grpc_services/rpc_execute_query.cpp b/ydb/core/grpc_services/rpc_execute_query.cpp new file mode 100644 index 0000000000..cf57107c11 --- /dev/null +++ b/ydb/core/grpc_services/rpc_execute_query.cpp @@ -0,0 +1,334 @@ +#include "service_query.h" + +#include "rpc_kqp_base.h" + +#include <ydb/core/actorlib_impl/long_timer.h> +#include <ydb/core/base/appdata.h> +#include <ydb/core/base/kikimr_issue.h> +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/kqp/executer_actor/kqp_executer.h> +#include <ydb/public/api/protos/draft/ydb_query.pb.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NKikimr::NGRpcService { + +namespace { + +using namespace NActors; + +using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQueryRequest, + Ydb::Query::ExecuteQueryResponsePart>; + +std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest( + const Ydb::Query::ExecuteQueryRequest& req, NKikimrKqp::TEvQueryRequest& kqpRequest) +{ + kqpRequest.MutableRequest()->MutableYdbParameters()->insert(req.parameters().begin(), req.parameters().end()); + switch (req.exec_mode()) { + case Ydb::Query::EXEC_MODE_EXECUTE: + kqpRequest.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + break; + default: { + NYql::TIssues issues; + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query mode")); + return {Ydb::StatusIds::BAD_REQUEST, issues}; + } + } + + // TODO: Use new type of query (QUERY_TYPE_SQL_QUERY) + kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_SCAN); + kqpRequest.MutableRequest()->SetKeepSession(false); + + switch (req.query_case()) { + case Ydb::Query::ExecuteQueryRequest::kQueryContent: { + NYql::TIssues issues; + if (!CheckQuery(req.query_content().text(), issues)) { + return {Ydb::StatusIds::BAD_REQUEST, issues}; + } + + kqpRequest.MutableRequest()->SetQuery(req.query_content().text()); + break; + } + + default: { + NYql::TIssues issues; + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Unexpected query option")); + return {Ydb::StatusIds::BAD_REQUEST, issues}; + } + } + + return {Ydb::StatusIds::SUCCESS, {}}; +} + +class RpcFlowControlState { +public: + RpcFlowControlState(ui64 inflightLimitBytes) + : InflightLimitBytes_(inflightLimitBytes) {} + + void PushResponse(ui64 responseSizeBytes) { + ResponseSizeQueue_.push(responseSizeBytes); + TotalResponsesSize_ += responseSizeBytes; + } + + void PopResponse() { + Y_ENSURE(!ResponseSizeQueue_.empty()); + TotalResponsesSize_ -= ResponseSizeQueue_.front(); + ResponseSizeQueue_.pop(); + } + + size_t QueueSize() const { + return ResponseSizeQueue_.size(); + } + + ui64 FreeSpaceBytes() const { + return TotalResponsesSize_ < InflightLimitBytes_ + ? InflightLimitBytes_ - TotalResponsesSize_ + : 0; + } + + ui64 InflightBytes() const { + return TotalResponsesSize_; + } + + ui64 InflightLimitBytes() const { + return InflightLimitBytes_; + } + +private: + const ui64 InflightLimitBytes_; + + TQueue<ui64> ResponseSizeQueue_; + ui64 TotalResponsesSize_ = 0; +}; + +class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::GRPC_STREAM_REQ; + } + + TExecuteQueryRPC(TEvExecuteQueryRequest* request, ui64 inflightLimitBytes) + : Request_(request) + , FlowControl_(inflightLimitBytes) {} + + void Bootstrap(const TActorContext &ctx) { + this->Become(&TExecuteQueryRPC::StateWork); + + auto selfId = this->SelfId(); + auto as = TActivationContext::ActorSystem(); + + Request_->SetClientLostAction([selfId, as]() { + as->Send(selfId, new TEvents::TEvWakeup(EWakeupTag::ClientLostTag)); + }); + + Request_->SetStreamingNotify([selfId, as](size_t left) { + as->Send(selfId, new TRpcServices::TEvGrpcNextReply(left)); + }); + + Proceed(ctx); + } + +private: + void StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { + try { + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvWakeup, Handle); + HFunc(TRpcServices::TEvGrpcNextReply, Handle); + HFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle); + HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); + HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); + default: + UnexpectedEvent(__func__, ev, ctx); + } + } catch (const yexception& ex) { + InternalError(ex.what(), ctx); + } + } + + void Proceed(const TActorContext &ctx) { + const auto req = Request_->GetProtoRequest(); + const auto traceId = Request_->GetTraceId(); + + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + SetAuthToken(ev, *Request_); + SetDatabase(ev, *Request_); + SetRlPath(ev, *Request_); + + if (traceId) { + ev->Record.SetTraceId(traceId.GetRef()); + } + + ActorIdToProto(this->SelfId(), ev->Record.MutableRequestActorId()); + + auto [fillStatus, fillIssues] = FillKqpRequest(*req, ev->Record); + if (fillStatus != Ydb::StatusIds::SUCCESS) { + return ReplyFinishStream(fillStatus, fillIssues, ctx); + } + + if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) { + NYql::TIssues issues; + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error")); + ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, issues, ctx); + } + } + + void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { + switch ((EWakeupTag) ev->Get()->Tag) { + case EWakeupTag::ClientLostTag: + return HandleClientLost(ctx); + } + } + + void Handle(TRpcServices::TEvGrpcNextReply::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " NextReply" + << ", left: " << ev->Get()->LeftInQueue + << ", queue: " << FlowControl_.QueueSize() + << ", inflight bytes: " << FlowControl_.InflightBytes() + << ", limit bytes: " << FlowControl_.InflightLimitBytes()); + + while (FlowControl_.QueueSize() > ev->Get()->LeftInQueue) { + FlowControl_.PopResponse(); + } + + ui64 freeSpaceBytes = FlowControl_.FreeSpaceBytes(); + if (ResumeWithSeqNo_ && freeSpaceBytes > 0) { + LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, " + << ", seqNo: " << *ResumeWithSeqNo_ + << ", freeSpace: " << freeSpaceBytes + << ", executer: " << ExecuterActorId_); + + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetSeqNo(*ResumeWithSeqNo_); + resp->Record.SetFreeSpace(freeSpaceBytes); + + ctx.Send(ExecuterActorId_, resp.Release()); + + ResumeWithSeqNo_.Clear(); + } + } + + void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev, const TActorContext& ctx) { + ExecuterActorId_ = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId()); + LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "ExecuterActorId: " << ExecuterActorId_); + } + + void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) { + Ydb::Query::ExecuteQueryResponsePart response; + response.set_status(Ydb::StatusIds::SUCCESS); + response.set_result_set_index(0); + response.mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet()); + + TString out; + Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); + + Request_->SendSerializedResult(std::move(out), Ydb::StatusIds::SUCCESS); + + auto freeSpaceBytes = FlowControl_.FreeSpaceBytes(); + if (freeSpaceBytes == 0) { + ResumeWithSeqNo_ = ev->Get()->Record.GetSeqNo(); + } + + LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Send stream data ack" + << ", seqNo: " << ev->Get()->Record.GetSeqNo() + << ", freeSpace: " << freeSpaceBytes + << ", to: " << ev->Sender + << ", queue: " << FlowControl_.QueueSize()); + + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + resp->Record.SetFreeSpace(freeSpaceBytes); + + ctx.Send(ev->Sender, resp.Release()); + } + + void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + auto& record = ev->Get()->Record.GetRef(); + + NYql::TIssues issues; + const auto& issueMessage = record.GetResponse().GetQueryIssues(); + NYql::IssuesFromMessage(issueMessage, issues); + + ReplyFinishStream(record.GetYdbStatus(), issues, ctx); + } + +private: + void HandleClientLost(const TActorContext& ctx) { + // TODO: Abort query execution. + Y_UNUSED(ctx); + } + + void ReplyFinishStream(Ydb::StatusIds::StatusCode status, const NYql::TIssue& issue, const TActorContext& ctx) { + google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issuesMessage; + NYql::IssueToMessage(issue, issuesMessage.Add()); + + ReplyFinishStream(status, issuesMessage, ctx); + } + + void ReplyFinishStream(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues, const TActorContext& ctx) { + google::protobuf::RepeatedPtrField<TYdbIssueMessageType> issuesMessage; + for (auto& issue : issues) { + auto item = issuesMessage.Add(); + NYql::IssueToMessage(issue, item); + } + + ReplyFinishStream(status, issuesMessage, ctx); + } + + void ReplyFinishStream(Ydb::StatusIds::StatusCode status, + const google::protobuf::RepeatedPtrField<TYdbIssueMessageType>& message, const TActorContext& ctx) + { + LOG_INFO_S(ctx, NKikimrServices::RPC_REQUEST, "Finish grpc stream, status: " + << Ydb::StatusIds::StatusCode_Name(status)); + + // Skip sending empty result in case of success status - simplify client logic + if (status != Ydb::StatusIds::SUCCESS) { + TString out; + Ydb::Query::ExecuteQueryResponsePart response; + response.set_status(status); + response.mutable_issues()->CopyFrom(message); + Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); + Request_->SendSerializedResult(std::move(out), status); + } + + Request_->FinishStream(); + this->PassAway(); + } + + void InternalError(const TString& message, const TActorContext& ctx) { + LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, "Internal error, message: " << message); + + auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, message); + ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, issue, ctx); + } + + void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev, const TActorContext& ctx) { + InternalError(TStringBuilder() << "TExecuteQueryRPC in state " << state << " received unexpected event " << + TypeName(*ev.Get()->GetBase()) << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()), ctx); + } + +private: + enum EWakeupTag : ui64 { + ClientLostTag = 1, + }; + +private: + std::unique_ptr<TEvExecuteQueryRequest> Request_; + + RpcFlowControlState FlowControl_; + TMaybe<ui64> ResumeWithSeqNo_; + + TActorId ExecuterActorId_; +}; + +} // namespace + +void DoExecuteQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) { + // Use default channel buffer size as inflight limit + ui64 inflightLimitBytes = f.GetAppConfig().GetTableServiceConfig().GetResourceManager().GetChannelBufferSize(); + + auto* req = dynamic_cast<TEvExecuteQueryRequest*>(p.release()); + Y_VERIFY(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + TActivationContext::AsActorContext().Register(new TExecuteQueryRPC(req, inflightLimitBytes)); +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/service_query.h b/ydb/core/grpc_services/service_query.h new file mode 100644 index 0000000000..fb6740648f --- /dev/null +++ b/ydb/core/grpc_services/service_query.h @@ -0,0 +1,13 @@ +#pragma once + +#include <memory> + +namespace NKikimr::NGRpcService { + +class IRequestOpCtx; +class IRequestNoOpCtx; +class IFacilityProvider; + +void DoExecuteQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&); + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/kqp/ut/common/CMakeLists.darwin.txt b/ydb/core/kqp/ut/common/CMakeLists.darwin.txt index 5b3d6bc3d8..ca9b96e849 100644 --- a/ydb/core/kqp/ut/common/CMakeLists.darwin.txt +++ b/ydb/core/kqp/ut/common/CMakeLists.darwin.txt @@ -19,6 +19,7 @@ target_link_libraries(kqp-ut-common PUBLIC yql-utils-backtrace public-lib-yson_value cpp-client-draft + client-draft-ydb_query cpp-client-ydb_proto cpp-client-ydb_scheme cpp-client-ydb_table diff --git a/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt index eab66e487a..4f6c34f949 100644 --- a/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt @@ -20,6 +20,7 @@ target_link_libraries(kqp-ut-common PUBLIC yql-utils-backtrace public-lib-yson_value cpp-client-draft + client-draft-ydb_query cpp-client-ydb_proto cpp-client-ydb_scheme cpp-client-ydb_table diff --git a/ydb/core/kqp/ut/common/CMakeLists.linux.txt b/ydb/core/kqp/ut/common/CMakeLists.linux.txt index eab66e487a..4f6c34f949 100644 --- a/ydb/core/kqp/ut/common/CMakeLists.linux.txt +++ b/ydb/core/kqp/ut/common/CMakeLists.linux.txt @@ -20,6 +20,7 @@ target_link_libraries(kqp-ut-common PUBLIC yql-utils-backtrace public-lib-yson_value cpp-client-draft + client-draft-ydb_query cpp-client-ydb_proto cpp-client-ydb_scheme cpp-client-ydb_table diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index d2a1dedd85..558cdc6491 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -6,6 +6,7 @@ #include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h> +#include <ydb/public/sdk/cpp/client/draft/ydb_query/client.h> #include <library/cpp/yson/node/node_io.h> @@ -51,14 +52,14 @@ void N(NUnitTest::TTestContext&) template <bool ForceVersionV1> -TString Query(const TString& tmpl) { +TString MakeQuery(const TString& tmpl) { return TStringBuilder() << (ForceVersionV1 ? "--!syntax_v1\n" : "") << tmpl; } -#define Q_(expr) Query<false>(expr) -#define Q1_(expr) Query<true>(expr) +#define Q_(expr) MakeQuery<false>(expr) +#define Q1_(expr) MakeQuery<true>(expr) namespace NKikimr { namespace NKqp { @@ -133,6 +134,10 @@ public: .UseQueryCache(false)); } + NYdb::NQuery::TQueryClient GetQueryClient() const { + return NYdb::NQuery::TQueryClient(*Driver); + } + bool IsUsingSnapshotReads() const { return Server->GetRuntime()->GetAppData().FeatureFlags.GetEnableMvccSnapshotReads(); } diff --git a/ydb/core/kqp/ut/service/CMakeLists.darwin.txt b/ydb/core/kqp/ut/service/CMakeLists.darwin.txt index 2072a651c4..dd508390b9 100644 --- a/ydb/core/kqp/ut/service/CMakeLists.darwin.txt +++ b/ydb/core/kqp/ut/service/CMakeLists.darwin.txt @@ -34,6 +34,7 @@ target_link_options(ydb-core-kqp-ut-service PRIVATE ) target_sources(ydb-core-kqp-ut-service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_document_api_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_service_ut.cpp ) add_test( diff --git a/ydb/core/kqp/ut/service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/service/CMakeLists.linux-aarch64.txt index 291528848c..40ec0e2702 100644 --- a/ydb/core/kqp/ut/service/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/ut/service/CMakeLists.linux-aarch64.txt @@ -37,6 +37,7 @@ target_link_options(ydb-core-kqp-ut-service PRIVATE ) target_sources(ydb-core-kqp-ut-service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_document_api_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_service_ut.cpp ) add_test( diff --git a/ydb/core/kqp/ut/service/CMakeLists.linux.txt b/ydb/core/kqp/ut/service/CMakeLists.linux.txt index 0083566c3e..43db5b00b2 100644 --- a/ydb/core/kqp/ut/service/CMakeLists.linux.txt +++ b/ydb/core/kqp/ut/service/CMakeLists.linux.txt @@ -39,6 +39,7 @@ target_link_options(ydb-core-kqp-ut-service PRIVATE ) target_sources(ydb-core-kqp-ut-service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_document_api_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/service/kqp_service_ut.cpp ) add_test( diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp new file mode 100644 index 0000000000..d21ad049b7 --- /dev/null +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -0,0 +1,50 @@ +#include <ydb/core/kqp/ut/common/kqp_ut_common.h> + +namespace NKikimr { +namespace NKqp { + +using namespace NYdb; +using namespace NYdb::NQuery; + +Y_UNIT_TEST_SUITE(KqpQueryService) { + Y_UNIT_TEST(StreamExecuteQuery) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + auto it = db.StreamExecuteQuery(R"( + SELECT 1; + )").ExtractValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + ui64 count = 0; + for (;;) { + auto streamPart = it.ReadNext().GetValueSync(); + if (!streamPart.IsSuccess()) { + UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); + break; + } + + if (streamPart.HasResultSet()) { + auto resultSet = streamPart.ExtractResultSet(); + count += resultSet.RowsCount(); + } + } + + UNIT_ASSERT_VALUES_EQUAL(count, 1); + } + + Y_UNIT_TEST(ExecuteQuery) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + auto result = db.ExecuteQuery(R"( + SELECT 1; + )").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + CompareYson(R"([[1]])", FormatResultSetYson(result.GetResultSet(0))); + } +} + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 83e7240c3a..0efb648e4b 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -19,6 +19,7 @@ #include <ydb/services/ydb/ydb_export.h> #include <ydb/services/ydb/ydb_import.h> #include <ydb/services/ydb/ydb_operation.h> +#include <ydb/services/ydb/ydb_query.h> #include <ydb/services/ydb/ydb_scheme.h> #include <ydb/services/ydb/ydb_scripting.h> #include <ydb/services/ydb/ydb_table.h> @@ -338,6 +339,7 @@ namespace Tests { GRpcServer->AddService(new NGRpcService::TGRpcYdbLongTxService(system, counters, grpcRequestProxyId, true)); GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxyId, true)); GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxyId, true)); if (Settings->EnableYq) { GRpcServer->AddService(new NGRpcService::TGRpcYandexQueryService(system, counters, grpcRequestProxyId)); GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxyId)); diff --git a/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin.txt b/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin.txt index de0df9620e..88788ca375 100644 --- a/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin.txt +++ b/ydb/public/sdk/cpp/client/draft/CMakeLists.darwin.txt @@ -7,6 +7,7 @@ add_subdirectory(ut) +add_subdirectory(ydb_query) add_library(cpp-client-draft) target_link_libraries(cpp-client-draft PUBLIC diff --git a/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-aarch64.txt index 6dbee5efe4..1d43bb8918 100644 --- a/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/draft/CMakeLists.linux-aarch64.txt @@ -7,6 +7,7 @@ add_subdirectory(ut) +add_subdirectory(ydb_query) add_library(cpp-client-draft) target_link_libraries(cpp-client-draft PUBLIC diff --git a/ydb/public/sdk/cpp/client/draft/CMakeLists.linux.txt b/ydb/public/sdk/cpp/client/draft/CMakeLists.linux.txt index 6dbee5efe4..1d43bb8918 100644 --- a/ydb/public/sdk/cpp/client/draft/CMakeLists.linux.txt +++ b/ydb/public/sdk/cpp/client/draft/CMakeLists.linux.txt @@ -7,6 +7,7 @@ add_subdirectory(ut) +add_subdirectory(ydb_query) add_library(cpp-client-draft) target_link_libraries(cpp-client-draft PUBLIC diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin.txt new file mode 100644 index 0000000000..d436332b10 --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.darwin.txt @@ -0,0 +1,22 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(impl) + +add_library(client-draft-ydb_query) +target_link_libraries(client-draft-ydb_query PUBLIC + contrib-libs-cxxsupp + yutil + draft-ydb_query-impl + cpp-client-ydb_common_client + cpp-client-ydb_table +) +target_sources(client-draft-ydb_query PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp +) diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..49ec853da9 --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux-aarch64.txt @@ -0,0 +1,23 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(impl) + +add_library(client-draft-ydb_query) +target_link_libraries(client-draft-ydb_query PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + draft-ydb_query-impl + cpp-client-ydb_common_client + cpp-client-ydb_table +) +target_sources(client-draft-ydb_query PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp +) diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux.txt new file mode 100644 index 0000000000..49ec853da9 --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.linux.txt @@ -0,0 +1,23 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(impl) + +add_library(client-draft-ydb_query) +target_link_libraries(client-draft-ydb_query PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + draft-ydb_query-impl + cpp-client-ydb_common_client + cpp-client-ydb_table +) +target_sources(client-draft-ydb_query PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp +) diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.txt new file mode 100644 index 0000000000..3e0811fb22 --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp new file mode 100644 index 0000000000..50d49a48e4 --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp @@ -0,0 +1,49 @@ +#include "client.h" + +#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> +#include <ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h> + +namespace NYdb::NQuery { + +class TQueryClient::TImpl: public TClientImplCommon<TQueryClient::TImpl> { +public: + TImpl(std::shared_ptr<TGRpcConnectionsImpl>&& connections, const TClientSettings& settings) + : TClientImplCommon(std::move(connections), settings) + , Settings_(settings) + { + } + + ~TImpl() { + // TODO: Drain sessions. + } + + TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TExecuteQuerySettings& settings) { + return TExecQueryImpl::StreamExecuteQuery(Connections_, DbDriverState_, query, settings); + } + + TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TExecuteQuerySettings& settings) { + return TExecQueryImpl::ExecuteQuery(Connections_, DbDriverState_, query, settings); + } + +private: + TClientSettings Settings_; +}; + +TQueryClient::TQueryClient(const TDriver& driver, const TClientSettings& settings) + : Impl_(new TQueryClient::TImpl(CreateInternalInterface(driver), settings)) +{ +} + +TAsyncExecuteQueryResult TQueryClient::ExecuteQuery(const TString& query, + const TExecuteQuerySettings& settings) +{ + return Impl_->ExecuteQuery(query, settings); +} + +TAsyncExecuteQueryIterator TQueryClient::StreamExecuteQuery(const TString& query, + const TExecuteQuerySettings& settings) +{ + return Impl_->StreamExecuteQuery(query, settings); +} + +} // namespace NYdb::NQuery diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h new file mode 100644 index 0000000000..2ce4e15126 --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h @@ -0,0 +1,35 @@ +#pragma once + +#include "query.h" + +#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> + +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> + +namespace NYdb { + class TProtoAccessor; +} + +namespace NYdb::NQuery { + +struct TClientSettings : public TCommonClientSettingsBase<TClientSettings> { + using TSelf = TClientSettings; +}; + +class TQueryClient { +public: + TQueryClient(const TDriver& driver, const TClientSettings& settings = TClientSettings()); + + TAsyncExecuteQueryResult ExecuteQuery(const TString& query, + const TExecuteQuerySettings& settings = TExecuteQuerySettings()); + + TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, + const TExecuteQuerySettings& settings = TExecuteQuerySettings()); + +private: + class TImpl; + std::shared_ptr<TImpl> Impl_; +}; + +} // namespace NYdb::NQuery diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.darwin.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.darwin.txt new file mode 100644 index 0000000000..4e2c3d0522 --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.darwin.txt @@ -0,0 +1,21 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(draft-ydb_query-impl) +target_link_libraries(draft-ydb_query-impl PUBLIC + contrib-libs-cxxsupp + yutil + api-grpc-draft + api-protos + client-ydb_common_client-impl + cpp-client-ydb_proto +) +target_sources(draft-ydb_query-impl PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp +) diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..b9530d4d0b --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.linux-aarch64.txt @@ -0,0 +1,22 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(draft-ydb_query-impl) +target_link_libraries(draft-ydb_query-impl PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + api-grpc-draft + api-protos + client-ydb_common_client-impl + cpp-client-ydb_proto +) +target_sources(draft-ydb_query-impl PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp +) diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.linux.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.linux.txt new file mode 100644 index 0000000000..b9530d4d0b --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.linux.txt @@ -0,0 +1,22 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(draft-ydb_query-impl) +target_link_libraries(draft-ydb_query-impl PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + api-grpc-draft + api-protos + client-ydb_common_client-impl + cpp-client-ydb_proto +) +target_sources(draft-ydb_query-impl PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp +) diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.txt b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.txt new file mode 100644 index 0000000000..3e0811fb22 --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp new file mode 100644 index 0000000000..00bba6c120 --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.cpp @@ -0,0 +1,215 @@ +#define INCLUDE_YDB_INTERNAL_H +#include "exec_query.h" + +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h> +#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> +#undef INCLUDE_YDB_INTERNAL_H + +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> + +namespace NYdb::NQuery { + +using namespace NThreading; + +class TExecuteQueryIterator::TReaderImpl { +public: + using TSelf = TExecuteQueryIterator::TReaderImpl; + using TResponse = Ydb::Query::ExecuteQueryResponsePart; + using TStreamProcessorPtr = NGrpc::IStreamRequestReadProcessor<TResponse>::TPtr; + using TReadCallback = NGrpc::IStreamRequestReadProcessor<TResponse>::TReadCallback; + using TGRpcStatus = NGrpc::TGrpcStatus; + using TBatchReadResult = std::pair<TResponse, TGRpcStatus>; + + TReaderImpl(TStreamProcessorPtr streamProcessor, const TString& endpoint) + : StreamProcessor_(streamProcessor) + , Finished_(false) + , Endpoint_(endpoint) + {} + + ~TReaderImpl() { + StreamProcessor_->Cancel(); + } + + bool IsFinished() const { + return Finished_; + } + + TAsyncExecuteQueryPart ReadNext(std::shared_ptr<TSelf> self) { + auto promise = NThreading::NewPromise<TExecuteQueryPart>(); + // Capture self - guarantee no dtor call during the read + auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable { + if (!grpcStatus.Ok()) { + self->Finished_ = true; + promise.SetValue({TStatus(TPlainStatus(grpcStatus, self->Endpoint_))}); + } else { + NYql::TIssues issues; + NYql::IssuesFromMessage(self->Response_.issues(), issues); + EStatus clientStatus = static_cast<EStatus>(self->Response_.status()); + // TODO: Add headers for streaming calls. + TPlainStatus plainStatus{clientStatus, std::move(issues), self->Endpoint_, {}}; + TStatus status{std::move(plainStatus)}; + + if (self->Response_.has_result_set()) { + promise.SetValue({ + std::move(status), + TResultSet(std::move(*self->Response_.mutable_result_set())), + self->Response_.result_set_index() + }); + } else { + promise.SetValue({std::move(status)}); + } + } + }; + + StreamProcessor_->Read(&Response_, readCb); + return promise.GetFuture(); + } +private: + TStreamProcessorPtr StreamProcessor_; + TResponse Response_; + bool Finished_; + TString Endpoint_; +}; + +TAsyncExecuteQueryPart TExecuteQueryIterator::ReadNext() { + if (ReaderImpl_->IsFinished()) { + RaiseError("Attempt to perform read on invalid or finished stream"); + } + + return ReaderImpl_->ReadNext(ReaderImpl_); +} + +using TExecuteQueryProcessorPtr = TExecuteQueryIterator::TReaderImpl::TStreamProcessorPtr; + +struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable { + using TPtr = TIntrusivePtr<TExecuteQueryBuffer>; + + TExecuteQueryBuffer(TExecuteQueryIterator&& iterator) + : Promise_(NewPromise<TExecuteQueryResult>()) + , Iterator_(std::move(iterator)) {} + + TPromise<TExecuteQueryResult> Promise_; + TExecuteQueryIterator Iterator_; + TVector<NYql::TIssue> Issues_; + TVector<Ydb::ResultSet> ResultSets_; + + void Next() { + TPtr self(this); + + Iterator_.ReadNext().Subscribe([self](TAsyncExecuteQueryPart partFuture) mutable { + auto part = partFuture.ExtractValue(); + + if (!part.IsSuccess()) { + if (part.EOS()) { + TVector<NYql::TIssue> issues; + TVector<Ydb::ResultSet> resultProtos; + + std::swap(self->Issues_, issues); + std::swap(self->ResultSets_, resultProtos); + + TVector<TResultSet> resultSets; + for (auto& proto : resultProtos) { + resultSets.emplace_back(std::move(proto)); + } + + self->Promise_.SetValue(TExecuteQueryResult( + TStatus(EStatus::SUCCESS, NYql::TIssues(std::move(issues))), + std::move(resultSets) + )); + } else { + self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {})); + } + + return; + } + + if (part.HasResultSet()) { + // TODO: Support multi-results + Y_ENSURE(part.GetResultSetIndex() == 0); + + auto inRs = part.ExtractResultSet(); + auto& inRsProto = TProtoAccessor::GetProto(inRs); + + if (self->ResultSets_.empty()) { + self->ResultSets_.resize(1); + } + + auto& resultSet = self->ResultSets_[0]; + if (resultSet.columns().empty()) { + resultSet.mutable_columns()->CopyFrom(inRsProto.columns()); + } + + resultSet.mutable_rows()->Add(inRsProto.rows().begin(), inRsProto.rows().end()); + } + + self->Next(); + }); + } +}; + +TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryImpl( + const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState, + const TString& query, const TExecuteQuerySettings& settings) +{ + auto request = MakeRequest<Ydb::Query::ExecuteQueryRequest>(); + request.set_exec_mode(Ydb::Query::EXEC_MODE_EXECUTE); + request.mutable_query_content()->set_text(query); + + auto promise = NewPromise<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>>(); + + connections->StartReadStream< + Ydb::Query::V1::QueryService, + Ydb::Query::ExecuteQueryRequest, + Ydb::Query::ExecuteQueryResponsePart> + ( + std::move(request), + [promise] (TPlainStatus status, TExecuteQueryProcessorPtr processor) mutable { + promise.SetValue(std::make_pair(status, processor)); + }, + &Ydb::Query::V1::QueryService::Stub::AsyncExecuteQuery, + driverState, + TRpcRequestSettings::Make(settings) + ); + + return promise.GetFuture(); +} + +TAsyncExecuteQueryIterator TExecQueryImpl::StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, + const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings) +{ + auto promise = NewPromise<TExecuteQueryIterator>(); + + auto iteratorCallback = [promise](TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> future) mutable { + Y_ASSERT(future.HasValue()); + auto pair = future.ExtractValue(); + promise.SetValue(TExecuteQueryIterator( + pair.second + ? std::make_shared<TExecuteQueryIterator::TReaderImpl>(pair.second, pair.first.Endpoint) + : nullptr, + std::move(pair.first)) + ); + }; + + StreamExecuteQueryImpl(connections, driverState, query, settings).Subscribe(iteratorCallback); + return promise.GetFuture(); +} + +TAsyncExecuteQueryResult TExecQueryImpl::ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, + const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings) +{ + return StreamExecuteQuery(connections, driverState, query, settings) + .Apply([](TAsyncExecuteQueryIterator itFuture){ + auto it = itFuture.ExtractValue(); + + if (!it.IsSuccess()) { + return MakeFuture<TExecuteQueryResult>(std::move(it)); + } + + auto buffer = MakeIntrusive<TExecuteQueryBuffer>(std::move(it)); + buffer->Next(); + + return buffer->Promise_.GetFuture(); + }); +} + +} // namespace NYdb::NQuery diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h new file mode 100644 index 0000000000..dbb448c983 --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h @@ -0,0 +1,19 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/internal_header.h> + +#include <ydb/public/sdk/cpp/client/draft/ydb_query/query.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h> + +namespace NYdb::NQuery { + +class TExecQueryImpl { +public: + static TAsyncExecuteQueryIterator StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, + const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings); + + static TAsyncExecuteQueryResult ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections, + const TDbDriverStatePtr& driverState, const TString& query, const TExecuteQuerySettings& settings); +}; + +} // namespace NYdb::NQuery::NImpl diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp new file mode 100644 index 0000000000..bf1a8e517b --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/query.cpp @@ -0,0 +1,17 @@ +#include "query.h" + +namespace NYdb::NQuery { + +const TVector<TResultSet>& TExecuteQueryResult::GetResultSets() const { + return ResultSets_; +} + +TResultSet TExecuteQueryResult::GetResultSet(size_t resultIndex) const { + if (resultIndex >= ResultSets_.size()) { + RaiseError(TString("Requested index out of range\n")); + } + + return ResultSets_[resultIndex]; +} + +} // namespace NYdb::NQuery diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/query.h b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h new file mode 100644 index 0000000000..4c0f0a71e3 --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h @@ -0,0 +1,81 @@ +#pragma once + +#include <ydb/public/api/grpc/draft/ydb_query_v1.grpc.pb.h> + +#include <ydb/public/sdk/cpp/client/ydb_result/result.h> +#include <ydb/public/sdk/cpp/client/ydb_types/request_settings.h> +#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h> + +#include <library/cpp/threading/future/future.h> + +namespace NYdb::NQuery { + +class TExecuteQueryPart : public TStreamPartStatus { +public: + bool HasResultSet() const { return ResultSet_.Defined(); } + ui64 GetResultSetIndex() const { return ResultSetIndex_; } + const TResultSet& GetResultSet() const { return *ResultSet_; } + TResultSet ExtractResultSet() { return std::move(*ResultSet_); } + + TExecuteQueryPart(TStatus&& status) + : TStreamPartStatus(std::move(status)) + {} + + TExecuteQueryPart(TStatus&& status, TResultSet&& resultSet, i64 resultSetIndex) + : TStreamPartStatus(std::move(status)) + , ResultSet_(std::move(resultSet)) + , ResultSetIndex_(resultSetIndex) + {} + +private: + TMaybe<TResultSet> ResultSet_; + i64 ResultSetIndex_ = 0; +}; + +using TAsyncExecuteQueryPart = NThreading::TFuture<TExecuteQueryPart>; + +class TExecuteQueryIterator : public TStatus { + friend class TExecQueryImpl; +public: + class TReaderImpl; + + TAsyncExecuteQueryPart ReadNext(); + +private: + TExecuteQueryIterator( + std::shared_ptr<TReaderImpl> impl, + TPlainStatus&& status) + : TStatus(std::move(status)) + , ReaderImpl_(impl) {} + + std::shared_ptr<TReaderImpl> ReaderImpl_; +}; + +using TAsyncExecuteQueryIterator = NThreading::TFuture<TExecuteQueryIterator>; + +struct TExecuteQuerySettings : public TRequestSettings<TExecuteQuerySettings> { +}; + +class TExecuteQueryResult : public TStatus { +public: + const TVector<TResultSet>& GetResultSets() const; + TResultSet GetResultSet(size_t resultIndex) const; + + TResultSetParser GetResultSetParser(size_t resultIndex) const; + + TExecuteQueryResult(TStatus&& status) + : TStatus(std::move(status)) + {} + + TExecuteQueryResult(TStatus&& status, TVector<TResultSet>&& resultSets) + : TStatus(std::move(status)) + , ResultSets_(std::move(resultSets)) + {} + +private: + TVector<TResultSet> ResultSets_; +}; + +using TAsyncExecuteQueryResult = NThreading::TFuture<TExecuteQueryResult>; + +} // namespace NYdb::NQuery diff --git a/ydb/services/ydb/CMakeLists.darwin.txt b/ydb/services/ydb/CMakeLists.darwin.txt index f42a596347..9ae9e8c513 100644 --- a/ydb/services/ydb/CMakeLists.darwin.txt +++ b/ydb/services/ydb/CMakeLists.darwin.txt @@ -39,6 +39,7 @@ target_sources(ydb-services-ydb PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_import.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_operation.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_query.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table.cpp diff --git a/ydb/services/ydb/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/CMakeLists.linux-aarch64.txt index 63a005d2ef..a609f4dece 100644 --- a/ydb/services/ydb/CMakeLists.linux-aarch64.txt +++ b/ydb/services/ydb/CMakeLists.linux-aarch64.txt @@ -40,6 +40,7 @@ target_sources(ydb-services-ydb PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_import.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_operation.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_query.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table.cpp diff --git a/ydb/services/ydb/CMakeLists.linux.txt b/ydb/services/ydb/CMakeLists.linux.txt index 63a005d2ef..a609f4dece 100644 --- a/ydb/services/ydb/CMakeLists.linux.txt +++ b/ydb/services/ydb/CMakeLists.linux.txt @@ -40,6 +40,7 @@ target_sources(ydb-services-ydb PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_import.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_logstore.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_operation.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_query.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scheme.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_scripting.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_table.cpp diff --git a/ydb/services/ydb/ydb_query.cpp b/ydb/services/ydb/ydb_query.cpp new file mode 100644 index 0000000000..97de31837c --- /dev/null +++ b/ydb/services/ydb/ydb_query.cpp @@ -0,0 +1,35 @@ +#include "ydb_query.h" + +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/grpc_services/counters/counters.h> +#include <ydb/core/grpc_services/grpc_helper.h> +#include <ydb/core/grpc_services/service_query.h> + +namespace NKikimr::NGRpcService { + +void TGRpcYdbQueryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { + using Ydb::Query::ExecuteQueryRequest; + using Ydb::Query::ExecuteQueryResponsePart; + + auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + +#ifdef ADD_REQUEST +#error ADD_REQUEST macro already defined +#endif +#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ + MakeIntrusive<TGRpcRequest<Ydb::Query::IN, Ydb::Query::OUT, TGRpcYdbQueryService>>(this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ACTION; \ + }, &Ydb::Query::V1::QueryService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("query", #NAME))->Run(); + + ADD_REQUEST(ExecuteQuery, ExecuteQueryRequest, ExecuteQueryResponsePart, { + ActorSystem_->Send(GRpcRequestProxyId_, + new TGrpcRequestNoOperationCall<ExecuteQueryRequest, ExecuteQueryResponsePart> + (ctx, &DoExecuteQueryRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); + }) +#undef ADD_REQUEST +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/services/ydb/ydb_query.h b/ydb/services/ydb/ydb_query.h new file mode 100644 index 0000000000..a866d6070a --- /dev/null +++ b/ydb/services/ydb/ydb_query.h @@ -0,0 +1,18 @@ +#pragma once + +#include <ydb/core/grpc_services/base/base_service.h> +#include <ydb/public/api/grpc/draft/ydb_query_v1.grpc.pb.h> + +namespace NKikimr::NGRpcService { + +class TGRpcYdbQueryService + : public TGrpcServiceBase<Ydb::Query::V1::QueryService> +{ +public: + using TGrpcServiceBase<Ydb::Query::V1::QueryService>::TGrpcServiceBase; + +private: + void SetupIncomingRequests(NGrpc::TLoggerPtr logger); +}; + +} // namespace NKikimr::NGRpcService |