diff options
author | dcherednik <[email protected]> | 2023-06-26 19:39:22 +0300 |
---|---|---|
committer | dcherednik <[email protected]> | 2023-06-26 19:39:22 +0300 |
commit | 6c579bbd8c764857a7a200b32e7d1f95738a667b (patch) | |
tree | 0aa701a7e273e91c03a2c44b92bbc8ab64a6113f | |
parent | bb35bb7376709e02fc037b9c1e93f5a416287950 (diff) |
QueryService create session implementation
QueryService create session implementation.
move rpc_create_session
63 files changed, 362 insertions, 220 deletions
diff --git a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt index 518f9b5671e..a1f41aa61da 100644 --- a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt @@ -86,7 +86,6 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_copy_tables.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_export.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_create_coordination_node.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_create_session.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_create_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_delete_session.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_describe_coordination_node.cpp @@ -136,6 +135,7 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_whoami.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/table_settings.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_script.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt index 2316cc559bd..42b6878c872 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt @@ -87,7 +87,6 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_copy_tables.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_export.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_create_coordination_node.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_create_session.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_create_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_delete_session.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_describe_coordination_node.cpp @@ -137,6 +136,7 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_whoami.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/table_settings.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_script.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp diff --git a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt index 2316cc559bd..42b6878c872 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt @@ -87,7 +87,6 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_copy_tables.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_export.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_create_coordination_node.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_create_session.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_create_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_delete_session.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_describe_coordination_node.cpp @@ -137,6 +136,7 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_whoami.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/table_settings.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_script.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp diff --git a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt index 518f9b5671e..a1f41aa61da 100644 --- a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt @@ -86,7 +86,6 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_copy_tables.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_export.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_create_coordination_node.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_create_session.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_create_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_delete_session.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_describe_coordination_node.cpp @@ -136,6 +135,7 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_whoami.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/table_settings.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_execute_script.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp diff --git a/ydb/core/grpc_services/local_rate_limiter.cpp b/ydb/core/grpc_services/local_rate_limiter.cpp index fffcbf6bccc..d77fd26f984 100644 --- a/ydb/core/grpc_services/local_rate_limiter.cpp +++ b/ydb/core/grpc_services/local_rate_limiter.cpp @@ -1,6 +1,6 @@ #include "local_rate_limiter.h" #include "service_ratelimiter_events.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include <ydb/core/grpc_services/local_rpc/local_rpc.h> diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 48fed86dd11..6fedd688690 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -462,7 +462,9 @@ private: } // namespace -void DoExecuteQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) { +namespace NQuery { + +void DoExecuteQuery(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) { // Use default channel buffer size as inflight limit ui64 inflightLimitBytes = f.GetAppConfig()->GetTableServiceConfig().GetResourceManager().GetChannelBufferSize(); @@ -471,4 +473,6 @@ void DoExecuteQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityPr f.RegisterActor(new TExecuteQueryRPC(req, inflightLimitBytes)); } +} + } // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/query/rpc_execute_script.cpp b/ydb/core/grpc_services/query/rpc_execute_script.cpp index 9d35a44307e..85a8ade5c24 100644 --- a/ydb/core/grpc_services/query/rpc_execute_script.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_script.cpp @@ -168,11 +168,15 @@ private: } // namespace -void DoExecuteScriptRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) { +namespace NQuery { + +void DoExecuteScript(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) { Y_UNUSED(f); auto* req = dynamic_cast<TEvExecuteScriptRequest*>(p.release()); Y_VERIFY(req != nullptr, "Wrong using of TGRpcRequestWrapper"); f.RegisterActor(new TExecuteScriptRPC(req)); } +} + } // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp index e67901e92ac..8a093102ce7 100644 --- a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp +++ b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp @@ -186,6 +186,8 @@ private: } // namespace +namespace NQuery { + void DoFetchScriptResults(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) { Y_UNUSED(f); auto* req = dynamic_cast<TEvFetchScriptResultsRequest*>(p.release()); @@ -193,4 +195,6 @@ void DoFetchScriptResults(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityPro TActivationContext::AsActorContext().Register(new TFetchScriptResultsRPC(req)); } +} + } // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/query/service_query.h b/ydb/core/grpc_services/query/service_query.h index 11880839903..d9225c5b815 100644 --- a/ydb/core/grpc_services/query/service_query.h +++ b/ydb/core/grpc_services/query/service_query.h @@ -8,8 +8,13 @@ class IRequestOpCtx; class IRequestNoOpCtx; class IFacilityProvider; -void DoExecuteQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f); -void DoExecuteScriptRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f); +namespace NQuery { + +void DoExecuteQuery(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f); +void DoExecuteScript(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f); void DoFetchScriptResults(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&); +void DoCreateSession(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f); + +} // namespace NQuery } // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/rpc_alter_coordination_node.cpp b/ydb/core/grpc_services/rpc_alter_coordination_node.cpp index bfa595d1ef7..cb96d28e8ec 100644 --- a/ydb/core/grpc_services/rpc_alter_coordination_node.cpp +++ b/ydb/core/grpc_services/rpc_alter_coordination_node.cpp @@ -2,7 +2,7 @@ #include <ydb/core/grpc_services/base/base.h> #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" namespace NKikimr { namespace NGRpcService { diff --git a/ydb/core/grpc_services/rpc_alter_table.cpp b/ydb/core/grpc_services/rpc_alter_table.cpp index 6e8e52d768b..9422ddeb602 100644 --- a/ydb/core/grpc_services/rpc_alter_table.cpp +++ b/ydb/core/grpc_services/rpc_alter_table.cpp @@ -2,7 +2,7 @@ #include <ydb/core/grpc_services/base/base.h> #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "operation_helpers.h" #include "table_settings.h" #include "service_table.h" diff --git a/ydb/core/grpc_services/rpc_begin_transaction.cpp b/ydb/core/grpc_services/rpc_begin_transaction.cpp index 187c79b9a71..e0a3f7b9cc4 100644 --- a/ydb/core/grpc_services/rpc_begin_transaction.cpp +++ b/ydb/core/grpc_services/rpc_begin_transaction.cpp @@ -3,7 +3,7 @@ #include "rpc_calls.h" #include "rpc_kqp_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "service_table.h" #include <ydb/library/yql/public/issue/yql_issue_message.h> diff --git a/ydb/core/grpc_services/rpc_commit_transaction.cpp b/ydb/core/grpc_services/rpc_commit_transaction.cpp index 8d0b27fd2d6..5ebd5e24e09 100644 --- a/ydb/core/grpc_services/rpc_commit_transaction.cpp +++ b/ydb/core/grpc_services/rpc_commit_transaction.cpp @@ -3,7 +3,7 @@ #include "rpc_calls.h" #include "rpc_kqp_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "service_table.h" #include <ydb/library/yql/public/issue/yql_issue_message.h> diff --git a/ydb/core/grpc_services/rpc_common.h b/ydb/core/grpc_services/rpc_common/rpc_common.h index 3044bc5a1d0..7288ce5618b 100644 --- a/ydb/core/grpc_services/rpc_common.h +++ b/ydb/core/grpc_services/rpc_common/rpc_common.h @@ -1,10 +1,10 @@ #pragma once -#include "grpc_request_proxy.h" #include <ydb/core/base/path.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/util/proto_duration.h> +#include "ydb/core/grpc_services/grpc_request_proxy.h" namespace NKikimr { namespace NGRpcService { diff --git a/ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp b/ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp new file mode 100644 index 00000000000..1baa27feceb --- /dev/null +++ b/ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp @@ -0,0 +1,224 @@ +#include <ydb/core/grpc_services/service_table.h> +#include <ydb/core/grpc_services/query/service_query.h> + +#include <ydb/core/kqp/common/events/events.h> +#include <ydb/core/kqp/common/simple/services.h> + +#include "rpc_common.h" + +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> + +#include <ydb/library/yql/public/issue/yql_issue_message.h> +#include <ydb/library/yql/public/issue/yql_issue.h> +#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h> + +#include <ydb/public/api/protos/draft/ydb_query.pb.h> + +namespace NKikimr { +namespace NGRpcService { + +using namespace NActors; +using namespace Ydb; +using namespace NKqp; + +using TEvCreateSessionRequest = TGrpcRequestOperationCall<Ydb::Table::CreateSessionRequest, + Ydb::Table::CreateSessionResponse>; + +class TCreateSessionRPC : public TActorBootstrapped<TCreateSessionRPC> { +public: + TCreateSessionRPC(IRequestCtx* msg) + : Request(msg) {} + + void Bootstrap(const TActorContext&) { + Become(&TCreateSessionRPC::StateWork); + + auto now = TInstant::Now(); + const auto& deadline = Request->GetDeadline(); + + if (deadline <= now) { + LOG_WARN_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY, + SelfId() << " Request deadline has expired for " << now - deadline << " seconds"); + + Reply(Ydb::StatusIds::TIMEOUT); + return; + } + + auto selfId = this->SelfId(); + auto as = TActivationContext::ActorSystem(); + + Request->SetFinishAction([selfId, as]() { + as->Send(selfId, new TEvents::TEvWakeup); + }); + + CreateSessionImpl(); + } + +private: + void CreateSessionImpl() { + const auto& traceId = Request->GetTraceId(); + auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); + + ev->Record.SetDeadlineUs(Request->GetDeadline().MicroSeconds()); + + if (traceId) { + ev->Record.SetTraceId(traceId.GetRef()); + } + + if (Request->HasClientCapability(NYdb::YDB_CLIENT_CAPABILITY_SESSION_BALANCER)) { + ev->Record.SetCanCreateRemoteSession(true); + ev->Record.SetSupportsBalancing(true); + } + + SetDatabase(ev, *Request); + + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), ev.Release()); + } + + void StateWork(TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle); + hFunc(NKqp::TEvKqp::TEvProcessResponse, Handle); + hFunc(TEvents::TEvWakeup, Handle); + } + } + + void Handle(TEvents::TEvWakeup::TPtr&) { + ClientLost = true; + } + + void DoCloseSession(const TActorContext& ctx, const TString& sessionId) { + Ydb::Table::DeleteSessionRequest request; + request.set_session_id(sessionId); + + auto cb = [](const Ydb::Table::DeleteSessionResponse&){}; + + auto database = Request->GetDatabaseName().GetOrElse(""); + + using TEvDeleteSessionRequest = TGrpcRequestOperationCall<Ydb::Table::DeleteSessionRequest, + Ydb::Table::DeleteSessionResponse>; + + auto actorId = NRpcService::DoLocalRpcSameMailbox<TEvDeleteSessionRequest>( + std::move(request), std::move(cb), database, Request->GetSerializedToken(), ctx); + + LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY, + SelfId() << " Client lost, session " << sessionId << " will be closed by " << actorId); + } + + void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->Record; + if (record.GetResourceExhausted()) { + Request->ReplyWithRpcStatus(grpc::StatusCode::RESOURCE_EXHAUSTED, record.GetError()); + Die(ctx); + return; + } + + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + const auto& kqpResponse = record.GetResponse(); + if (ClientLost) { + DoCloseSession(ctx, kqpResponse.GetSessionId()); + // We already lost the client, so the client should not see this status + Reply(Ydb::StatusIds::INTERNAL_ERROR); + } else { + SendSessionResult(kqpResponse.GetSessionId()); + PassAway(); + return; + } + } else { + return ReplyResponseError(record); + } + } + +private: + virtual void SendSessionResult(const TString& id) = 0; + + template<typename TResp> + void ReplyResponseError(const TResp& kqpResponse) { + if (kqpResponse.HasError()) { + Request->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, kqpResponse.GetError())); + } + return Reply(kqpResponse.GetYdbStatus()); + } + + void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev) { + const auto& record = ev->Get()->Record; + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + // KQP should not send TEvProcessResponse with SUCCESS for CreateSession rpc. + // We expect TEvKqp::TEvCreateSessionResponse instead. + static const TString err = "Unexpected TEvProcessResponse with success status for CreateSession request"; + Request->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, err)); + Reply(Ydb::StatusIds::INTERNAL_ERROR); + } else { + return ReplyResponseError(record); + } + } + + void Reply(Ydb::StatusIds::StatusCode status) { + Request->ReplyWithYdbStatus(status); + this->PassAway(); + } + + void Reply(Ydb::StatusIds::StatusCode status, NProtoBuf::Message* resp) { + Request->Reply(resp, status); + this->PassAway(); + } + +protected: + std::shared_ptr<IRequestCtx> Request; + +private: + bool ClientLost = false; +}; + +class TCreateSessionTableService : public TCreateSessionRPC { + using TCtx = IRequestOpCtx; + +public: + using TCreateSessionRPC::TCreateSessionRPC; + static TCreateSessionRPC* New(TCtx* ctx) { + return new TCreateSessionTableService(ctx); + } + +private: + void SendSessionResult(const TString& id) override { + Ydb::Table::CreateSessionResult result; + result.set_session_id(id); + static_cast<TCtx*>(Request.get())->SendResult(result, Ydb::StatusIds::SUCCESS); + }; +}; + +class TCreateSessionQueryService : public TCreateSessionRPC { +public: + using TCreateSessionRPC::TCreateSessionRPC; + static TCreateSessionRPC* New(IRequestNoOpCtx* ctx) { + return new TCreateSessionQueryService(ctx); + } + +private: + void SendSessionResult(const TString& id) override { + using TRes = Ydb::Query::CreateSessionResponse; + auto res = google::protobuf::Arena::CreateMessage<TRes>(Request->GetArena());; + res->set_status(Ydb::StatusIds::SUCCESS); + res->set_session_id(id); + Request->Reply(res, Ydb::StatusIds::SUCCESS); + }; +}; + +void DoCreateSessionRequest(std::unique_ptr<IRequestOpCtx> ctx, const IFacilityProvider& provider) { + provider.RegisterActor(TCreateSessionTableService::New(ctx.release())); +} + +template<> +IActor* TEvCreateSessionRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { + return TCreateSessionTableService::New(msg); +} + +namespace NQuery { + +void DoCreateSession(std::unique_ptr<IRequestNoOpCtx> ctx, const IFacilityProvider& provider) { + provider.RegisterActor(TCreateSessionQueryService::New(ctx.release())); +} + +} + +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_copy_table.cpp b/ydb/core/grpc_services/rpc_copy_table.cpp index 747040c1a46..729be8e20d2 100644 --- a/ydb/core/grpc_services/rpc_copy_table.cpp +++ b/ydb/core/grpc_services/rpc_copy_table.cpp @@ -3,7 +3,7 @@ #include "rpc_calls.h" #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" namespace NKikimr { namespace NGRpcService { diff --git a/ydb/core/grpc_services/rpc_copy_tables.cpp b/ydb/core/grpc_services/rpc_copy_tables.cpp index 16eb52d8887..a31c0175589 100644 --- a/ydb/core/grpc_services/rpc_copy_tables.cpp +++ b/ydb/core/grpc_services/rpc_copy_tables.cpp @@ -3,7 +3,7 @@ #include "rpc_calls.h" #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" namespace NKikimr { namespace NGRpcService { diff --git a/ydb/core/grpc_services/rpc_create_coordination_node.cpp b/ydb/core/grpc_services/rpc_create_coordination_node.cpp index cdcaea463c3..883e22790ba 100644 --- a/ydb/core/grpc_services/rpc_create_coordination_node.cpp +++ b/ydb/core/grpc_services/rpc_create_coordination_node.cpp @@ -2,7 +2,7 @@ #include <ydb/core/grpc_services/base/base.h> #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" namespace NKikimr { namespace NGRpcService { diff --git a/ydb/core/grpc_services/rpc_create_session.cpp b/ydb/core/grpc_services/rpc_create_session.cpp deleted file mode 100644 index 81f6daaba8b..00000000000 --- a/ydb/core/grpc_services/rpc_create_session.cpp +++ /dev/null @@ -1,160 +0,0 @@ -#include "service_table.h" -#include <ydb/core/grpc_services/base/base.h> -#include "rpc_calls.h" -#include "rpc_common.h" -#include "rpc_kqp_base.h" -#include "service_table.h" - -#include <ydb/core/grpc_services/local_rpc/local_rpc.h> - -#include <ydb/library/yql/public/issue/yql_issue_message.h> -#include <ydb/library/yql/public/issue/yql_issue.h> -#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h> - -namespace NKikimr { -namespace NGRpcService { - -using namespace NActors; -using namespace Ydb; -using namespace NKqp; - -using TEvCreateSessionRequest = TGrpcRequestOperationCall<Ydb::Table::CreateSessionRequest, - Ydb::Table::CreateSessionResponse>; - -class TCreateSessionRPC : public TRpcKqpRequestActor<TCreateSessionRPC, TEvCreateSessionRequest> { - using TBase = TRpcKqpRequestActor<TCreateSessionRPC, TEvCreateSessionRequest>; - -public: - TCreateSessionRPC(IRequestOpCtx* msg) - : TBase(msg) {} - - void Bootstrap(const TActorContext& ctx) { - TBase::Bootstrap(ctx); - - Become(&TCreateSessionRPC::StateWork); - - auto now = TInstant::Now(); - - if (Request().GetDeadline() <= now) { - LOG_WARN_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY, - SelfId() << " Request deadline has expired for " << now - Request().GetDeadline() << " seconds"); - - Reply(Ydb::StatusIds::TIMEOUT, ctx); - return; - } - - auto selfId = this->SelfId(); - auto as = TActivationContext::ActorSystem(); - - Request_->SetFinishAction([selfId, as]() { - as->Send(selfId, new TEvents::TEvWakeup(EWakeupTag::WakeupTagClientLost)); - }); - - CreateSessionImpl(); - } - -private: - void CreateSessionImpl() { - const auto traceId = Request().GetTraceId(); - auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); - - ev->Record.SetDeadlineUs(Request().GetDeadline().MicroSeconds()); - - if (traceId) { - ev->Record.SetTraceId(traceId.GetRef()); - } - - if (Request().HasClientCapability(NYdb::YDB_CLIENT_CAPABILITY_SESSION_BALANCER)) { - ev->Record.SetCanCreateRemoteSession(true); - ev->Record.SetSupportsBalancing(true); - } - - SetDatabase(ev, Request()); - - Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), ev.Release()); - } - - void StateWork(TAutoPtr<IEventHandle>& ev) { - switch (ev->GetTypeRewrite()) { - HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle); - HFunc(TEvents::TEvWakeup, Handle); - // Overide default forget action which terminate this actor on client disconnect - hFunc(TRpcServices::TEvForgetOperation, HandleForget); - default: TBase::StateWork(ev); - } - } - - void HandleForget(TRpcServices::TEvForgetOperation::TPtr &ev) { - Y_UNUSED(ev); - } - - void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { - switch ((EWakeupTag) ev->Get()->Tag) { - case EWakeupTag::WakeupTagClientLost: - return HandleClientLost(); - default: TBase::HandleWakeup(ev, ctx); - } - } - - void HandleClientLost() { - ClientLost = true; - } - - void DoCloseSession(const TActorContext& ctx, const TString& sessionId) { - Ydb::Table::DeleteSessionRequest request; - request.set_session_id(sessionId); - - auto cb = [](const Ydb::Table::DeleteSessionResponse&){}; - - auto database = Request_->GetDatabaseName().GetOrElse(""); - - using TEvDeleteSessionRequest = TGrpcRequestOperationCall<Ydb::Table::DeleteSessionRequest, - Ydb::Table::DeleteSessionResponse>; - - auto actorId = NRpcService::DoLocalRpcSameMailbox<TEvDeleteSessionRequest>( - std::move(request), std::move(cb), database, Request_->GetSerializedToken(), ctx); - - LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY, - SelfId() << " Client lost, session " << sessionId << " will be closed by " << actorId); - } - - void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record; - if (record.GetResourceExhausted()) { - Request().ReplyWithRpcStatus(grpc::StatusCode::RESOURCE_EXHAUSTED, record.GetError()); - Die(ctx); - return; - } - - if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { - const auto& kqpResponse = record.GetResponse(); - Ydb::Table::CreateSessionResult result; - if (ClientLost) { - DoCloseSession(ctx, kqpResponse.GetSessionId()); - // We already lost the client, so the client should not see this status - Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx); - } else { - result.set_session_id(kqpResponse.GetSessionId()); - return ReplyWithResult(Ydb::StatusIds::SUCCESS, result, ctx); - } - } else { - return OnQueryResponseError(record, ctx); - } - } -private: - - bool ClientLost = false; - -}; - -void DoCreateSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& provider) { - provider.RegisterActor(new TCreateSessionRPC(p.release())); -} - -template<> -IActor* TEvCreateSessionRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { - return new TCreateSessionRPC(msg); -} - -} // namespace NGRpcService -} // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_create_table.cpp b/ydb/core/grpc_services/rpc_create_table.cpp index 211eb9e4fae..857110255ec 100644 --- a/ydb/core/grpc_services/rpc_create_table.cpp +++ b/ydb/core/grpc_services/rpc_create_table.cpp @@ -2,7 +2,7 @@ #include <ydb/core/grpc_services/base/base.h> #include "rpc_calls.h" #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "table_settings.h" #include <ydb/core/cms/console/configs_dispatcher.h> diff --git a/ydb/core/grpc_services/rpc_deferrable.h b/ydb/core/grpc_services/rpc_deferrable.h index a2f60085792..a65d7153966 100644 --- a/ydb/core/grpc_services/rpc_deferrable.h +++ b/ydb/core/grpc_services/rpc_deferrable.h @@ -4,7 +4,7 @@ #include "grpc_request_proxy.h" #include "cancelation/cancelation.h" #include "cancelation/cancelation_event.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/base/kikimr_issue.h> @@ -27,6 +27,18 @@ private: typedef TActorBootstrapped<TDerived> TBase; typedef typename std::conditional<IsOperation, IRequestOpCtx, IRequestNoOpCtx>::type TRequestBase; + template<typename TIn, typename TOut> + void Fill(const TIn* in, TOut* out) { + auto& operationParams = in->operation_params(); + out->OperationTimeout_ = GetDuration(operationParams.operation_timeout()); + out->CancelAfter_ = GetDuration(operationParams.cancel_after()); + out->ReportCostInfo_ = operationParams.report_cost_info() == Ydb::FeatureFlag::ENABLED; + } + + template<typename TOut> + void Fill(const NProtoBuf::Message*, TOut*) { + } + public: enum EWakeupTag { WakeupTagTimeout = 10, @@ -39,10 +51,11 @@ public: TRpcRequestWithOperationParamsActor(TRequestBase* request) : Request_(request) { - auto& operationParams = GetProtoRequest()->operation_params(); - OperationTimeout_ = GetDuration(operationParams.operation_timeout()); - CancelAfter_ = GetDuration(operationParams.cancel_after()); - ReportCostInfo_ = operationParams.report_cost_info() == Ydb::FeatureFlag::ENABLED; + Fill(GetProtoRequest(), this); + //auto& operationParams = GetProtoRequest()->operation_params(); + //OperationTimeout_ = GetDuration(operationParams.operation_timeout()); + //CancelAfter_ = GetDuration(operationParams.cancel_after()); + //ReportCostInfo_ = operationParams.report_cost_info() == Ydb::FeatureFlag::ENABLED; } const typename TRequest::TRequest* GetProtoRequest() const { diff --git a/ydb/core/grpc_services/rpc_describe_coordination_node.cpp b/ydb/core/grpc_services/rpc_describe_coordination_node.cpp index 8e35b4a450d..cdd9609bc2c 100644 --- a/ydb/core/grpc_services/rpc_describe_coordination_node.cpp +++ b/ydb/core/grpc_services/rpc_describe_coordination_node.cpp @@ -2,7 +2,7 @@ #include <ydb/core/grpc_services/base/base.h> #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include <ydb/core/protos/flat_tx_scheme.pb.h> #include <ydb/core/tx/schemeshard/schemeshard.h> diff --git a/ydb/core/grpc_services/rpc_describe_path.cpp b/ydb/core/grpc_services/rpc_describe_path.cpp index 0ac91a25f1b..01f25e6a632 100644 --- a/ydb/core/grpc_services/rpc_describe_path.cpp +++ b/ydb/core/grpc_services/rpc_describe_path.cpp @@ -2,7 +2,7 @@ #include <ydb/core/grpc_services/base/base.h> #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include <ydb/core/protos/flat_tx_scheme.pb.h> #include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> diff --git a/ydb/core/grpc_services/rpc_describe_table.cpp b/ydb/core/grpc_services/rpc_describe_table.cpp index 9b882a357e1..cb499db8ed1 100644 --- a/ydb/core/grpc_services/rpc_describe_table.cpp +++ b/ydb/core/grpc_services/rpc_describe_table.cpp @@ -5,7 +5,7 @@ #include "rpc_scheme_base.h" #include "service_table.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/ydb_convert/table_description.h> #include <ydb/core/ydb_convert/ydb_convert.h> diff --git a/ydb/core/grpc_services/rpc_describe_table_options.cpp b/ydb/core/grpc_services/rpc_describe_table_options.cpp index 46eef9302e6..122be242636 100644 --- a/ydb/core/grpc_services/rpc_describe_table_options.cpp +++ b/ydb/core/grpc_services/rpc_describe_table_options.cpp @@ -3,7 +3,7 @@ #include "rpc_calls.h" #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "service_table.h" #include <ydb/core/cms/console/configs_dispatcher.h> diff --git a/ydb/core/grpc_services/rpc_drop_coordination_node.cpp b/ydb/core/grpc_services/rpc_drop_coordination_node.cpp index 443e4ff2678..08914e8c8cb 100644 --- a/ydb/core/grpc_services/rpc_drop_coordination_node.cpp +++ b/ydb/core/grpc_services/rpc_drop_coordination_node.cpp @@ -2,7 +2,7 @@ #include <ydb/core/grpc_services/base/base.h> #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" namespace NKikimr { namespace NGRpcService { diff --git a/ydb/core/grpc_services/rpc_drop_table.cpp b/ydb/core/grpc_services/rpc_drop_table.cpp index aa0740613e9..a09fcdcfd39 100644 --- a/ydb/core/grpc_services/rpc_drop_table.cpp +++ b/ydb/core/grpc_services/rpc_drop_table.cpp @@ -4,7 +4,7 @@ #include "service_table.h" #include "rpc_calls.h" #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" namespace NKikimr { namespace NGRpcService { diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp index 2337f1dcb56..201a61629b9 100644 --- a/ydb/core/grpc_services/rpc_execute_data_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp @@ -1,7 +1,7 @@ #include "service_table.h" #include <ydb/core/grpc_services/base/base.h> #include "rpc_kqp_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "service_table.h" #include <ydb/core/grpc_services/base/base.h> diff --git a/ydb/core/grpc_services/rpc_execute_scheme_query.cpp b/ydb/core/grpc_services/rpc_execute_scheme_query.cpp index 7fff4348bfb..3e8e292a8ed 100644 --- a/ydb/core/grpc_services/rpc_execute_scheme_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_scheme_query.cpp @@ -3,7 +3,7 @@ #include "rpc_calls.h" #include "rpc_kqp_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "service_table.h" #include <ydb/core/protos/console_config.pb.h> diff --git a/ydb/core/grpc_services/rpc_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_execute_yql_script.cpp index 8d07dfad7d4..2d51d0780d2 100644 --- a/ydb/core/grpc_services/rpc_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_execute_yql_script.cpp @@ -1,6 +1,6 @@ #include "service_yql_scripting.h" #include "rpc_kqp_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include <ydb/public/api/protos/ydb_scripting.pb.h> diff --git a/ydb/core/grpc_services/rpc_explain_data_query.cpp b/ydb/core/grpc_services/rpc_explain_data_query.cpp index a74d652c5e4..c6b7a1a0dba 100644 --- a/ydb/core/grpc_services/rpc_explain_data_query.cpp +++ b/ydb/core/grpc_services/rpc_explain_data_query.cpp @@ -3,7 +3,7 @@ #include "rpc_calls.h" #include "rpc_kqp_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "service_table.h" #include <ydb/core/protos/console_config.pb.h> diff --git a/ydb/core/grpc_services/rpc_explain_yql_script.cpp b/ydb/core/grpc_services/rpc_explain_yql_script.cpp index 5c3d604351f..8494ad758ff 100644 --- a/ydb/core/grpc_services/rpc_explain_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_explain_yql_script.cpp @@ -1,6 +1,6 @@ #include "service_yql_scripting.h" #include "rpc_kqp_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include <ydb/public/api/protos/ydb_scripting.pb.h> diff --git a/ydb/core/grpc_services/rpc_fq.cpp b/ydb/core/grpc_services/rpc_fq.cpp index 200347e2a6f..35e098f5d13 100644 --- a/ydb/core/grpc_services/rpc_fq.cpp +++ b/ydb/core/grpc_services/rpc_fq.cpp @@ -1,4 +1,4 @@ -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "rpc_deferrable.h" #include <ydb/core/grpc_services/service_fq.h> diff --git a/ydb/core/grpc_services/rpc_fq_internal.cpp b/ydb/core/grpc_services/rpc_fq_internal.cpp index 10f1a2b5778..6cc80f3777e 100644 --- a/ydb/core/grpc_services/rpc_fq_internal.cpp +++ b/ydb/core/grpc_services/rpc_fq_internal.cpp @@ -1,5 +1,5 @@ #include "service_fq_internal.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "rpc_deferrable.h" #include <ydb/core/fq/libs/events/events.h> diff --git a/ydb/core/grpc_services/rpc_get_shard_locations.cpp b/ydb/core/grpc_services/rpc_get_shard_locations.cpp index 351fd6a004c..acaa3f5af41 100644 --- a/ydb/core/grpc_services/rpc_get_shard_locations.cpp +++ b/ydb/core/grpc_services/rpc_get_shard_locations.cpp @@ -1,6 +1,6 @@ #include "grpc_request_proxy.h" #include "rpc_calls.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include <ydb/library/aclib/aclib.h> #include <ydb/core/base/tablet_pipe.h> diff --git a/ydb/core/grpc_services/rpc_import_data.cpp b/ydb/core/grpc_services/rpc_import_data.cpp index 82cc482ee23..88021a1b58d 100644 --- a/ydb/core/grpc_services/rpc_import_data.cpp +++ b/ydb/core/grpc_services/rpc_import_data.cpp @@ -1,6 +1,6 @@ #include "grpc_request_proxy.h" #include "rpc_calls.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "rpc_request_base.h" #include <ydb/public/api/protos/ydb_import.pb.h> diff --git a/ydb/core/grpc_services/rpc_keep_alive.cpp b/ydb/core/grpc_services/rpc_keep_alive.cpp index 0811455f350..d020684c177 100644 --- a/ydb/core/grpc_services/rpc_keep_alive.cpp +++ b/ydb/core/grpc_services/rpc_keep_alive.cpp @@ -3,7 +3,7 @@ #include "rpc_calls.h" #include "rpc_kqp_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "service_table.h" diff --git a/ydb/core/grpc_services/rpc_keyvalue.cpp b/ydb/core/grpc_services/rpc_keyvalue.cpp index aea0d568c92..fa504922bd6 100644 --- a/ydb/core/grpc_services/rpc_keyvalue.cpp +++ b/ydb/core/grpc_services/rpc_keyvalue.cpp @@ -4,7 +4,7 @@ #include <ydb/core/base/path.h> #include <ydb/core/grpc_services/rpc_scheme_base.h> -#include <ydb/core/grpc_services/rpc_common.h> +#include <ydb/core/grpc_services/rpc_common/rpc_common.h> #include <ydb/core/keyvalue/keyvalue_events.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/core/mind/local.h> diff --git a/ydb/core/grpc_services/rpc_kh_describe.cpp b/ydb/core/grpc_services/rpc_kh_describe.cpp index 4f2c59582a0..b1c4e0dc390 100644 --- a/ydb/core/grpc_services/rpc_kh_describe.cpp +++ b/ydb/core/grpc_services/rpc_kh_describe.cpp @@ -1,7 +1,7 @@ #include "service_coordination.h" #include <ydb/core/grpc_services/base/base.h> -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "resolve_local_db_table.h" #include <ydb/library/aclib/aclib.h> @@ -9,6 +9,7 @@ #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> #include <ydb/core/base/tablet_pipecache.h> +#include <ydb/public/api/protos/ydb_clickhouse_internal.pb.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> diff --git a/ydb/core/grpc_services/rpc_kh_snapshots.cpp b/ydb/core/grpc_services/rpc_kh_snapshots.cpp index fbc6e471841..b6f1be215d6 100644 --- a/ydb/core/grpc_services/rpc_kh_snapshots.cpp +++ b/ydb/core/grpc_services/rpc_kh_snapshots.cpp @@ -4,7 +4,7 @@ #include "rpc_kh_snapshots.h" #include "resolve_local_db_table.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "rpc_deferrable.h" #include <ydb/core/actorlib_impl/long_timer.h> diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp index 576c863c7be..847be3a1634 100644 --- a/ydb/core/grpc_services/rpc_load_rows.cpp +++ b/ydb/core/grpc_services/rpc_load_rows.cpp @@ -1,6 +1,6 @@ #include <ydb/core/grpc_services/base/base.h> -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "service_table.h" #include <ydb/core/tx/tx_proxy/upload_rows_common_impl.h> diff --git a/ydb/core/grpc_services/rpc_log_store.cpp b/ydb/core/grpc_services/rpc_log_store.cpp index fc5e1d9f37a..3b967470737 100644 --- a/ydb/core/grpc_services/rpc_log_store.cpp +++ b/ydb/core/grpc_services/rpc_log_store.cpp @@ -1,5 +1,5 @@ #include "service_logstore.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "rpc_scheme_base.h" #include <ydb/core/ydb_convert/table_description.h> diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index 7d40b17835b..e3d9e95dc10 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -1,4 +1,4 @@ -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "rpc_deferrable.h" #include "service_longtx.h" diff --git a/ydb/core/grpc_services/rpc_make_directory.cpp b/ydb/core/grpc_services/rpc_make_directory.cpp index 0d118557d79..b14e9e10d3b 100644 --- a/ydb/core/grpc_services/rpc_make_directory.cpp +++ b/ydb/core/grpc_services/rpc_make_directory.cpp @@ -1,7 +1,7 @@ #include "service_scheme.h" #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include <ydb/core/grpc_services/base/base.h> #include <ydb/core/protos/flat_tx_scheme.pb.h> #include <ydb/public/api/protos/ydb_scheme.pb.h> diff --git a/ydb/core/grpc_services/rpc_modify_permissions.cpp b/ydb/core/grpc_services/rpc_modify_permissions.cpp index ac7181fc168..5c6091f725a 100644 --- a/ydb/core/grpc_services/rpc_modify_permissions.cpp +++ b/ydb/core/grpc_services/rpc_modify_permissions.cpp @@ -1,7 +1,7 @@ #include "service_scheme.h" #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include <ydb/core/protos/flat_tx_scheme.pb.h> #include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/ydb_convert/ydb_convert.h> diff --git a/ydb/core/grpc_services/rpc_prepare_data_query.cpp b/ydb/core/grpc_services/rpc_prepare_data_query.cpp index b4b6459a72c..62a196fa172 100644 --- a/ydb/core/grpc_services/rpc_prepare_data_query.cpp +++ b/ydb/core/grpc_services/rpc_prepare_data_query.cpp @@ -3,7 +3,7 @@ #include "rpc_calls.h" #include "rpc_kqp_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "service_table.h" #include <ydb/core/protos/console_config.pb.h> diff --git a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp index c91bb009bc8..ebd720b0eec 100644 --- a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp +++ b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp @@ -3,7 +3,7 @@ #include "rpc_calls.h" #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include <ydb/core/base/quoter.h> #include <ydb/core/kesus/tablet/events.h> diff --git a/ydb/core/grpc_services/rpc_read_columns.cpp b/ydb/core/grpc_services/rpc_read_columns.cpp index 5f17fd72f08..4bbc49f88ed 100644 --- a/ydb/core/grpc_services/rpc_read_columns.cpp +++ b/ydb/core/grpc_services/rpc_read_columns.cpp @@ -1,7 +1,7 @@ #include "service_coordination.h" #include <ydb/core/grpc_services/base/base.h> -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "rpc_kh_snapshots.h" #include "resolve_local_db_table.h" #include <ydb/core/tx/scheme_cache/scheme_cache.h> @@ -13,6 +13,7 @@ #include <ydb/core/sys_view/scan.h> #include <ydb/core/formats/factory.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> +#include <ydb/public/api/protos/ydb_clickhouse_internal.pb.h> #include <util/string/vector.h> diff --git a/ydb/core/grpc_services/rpc_read_rows.cpp b/ydb/core/grpc_services/rpc_read_rows.cpp index 92b8bca53fa..275bbbf77a8 100644 --- a/ydb/core/grpc_services/rpc_read_rows.cpp +++ b/ydb/core/grpc_services/rpc_read_rows.cpp @@ -1,6 +1,6 @@ #include <ydb/core/grpc_services/base/base.h> -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "service_table.h" #include <ydb/core/kqp/common/kqp_ru_calc.h> diff --git a/ydb/core/grpc_services/rpc_read_table.cpp b/ydb/core/grpc_services/rpc_read_table.cpp index ad888c08c67..37ee81207b6 100644 --- a/ydb/core/grpc_services/rpc_read_table.cpp +++ b/ydb/core/grpc_services/rpc_read_table.cpp @@ -1,7 +1,7 @@ #include "service_table.h" #include <ydb/core/grpc_services/base/base.h> -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "rpc_calls.h" #include "rpc_kqp_base.h" #include "local_rate_limiter.h" diff --git a/ydb/core/grpc_services/rpc_remove_directory.cpp b/ydb/core/grpc_services/rpc_remove_directory.cpp index 2590e5c0f5e..e8037efbed1 100644 --- a/ydb/core/grpc_services/rpc_remove_directory.cpp +++ b/ydb/core/grpc_services/rpc_remove_directory.cpp @@ -1,7 +1,7 @@ #include "service_scheme.h" #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include <ydb/core/grpc_services/base/base.h> #include <ydb/public/api/protos/ydb_scheme.pb.h> diff --git a/ydb/core/grpc_services/rpc_rename_tables.cpp b/ydb/core/grpc_services/rpc_rename_tables.cpp index 40c2be4f5db..e5fd056d48b 100644 --- a/ydb/core/grpc_services/rpc_rename_tables.cpp +++ b/ydb/core/grpc_services/rpc_rename_tables.cpp @@ -4,7 +4,7 @@ #include "service_table.h" #include "rpc_calls.h" #include "rpc_scheme_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" namespace NKikimr { namespace NGRpcService { diff --git a/ydb/core/grpc_services/rpc_rollback_transaction.cpp b/ydb/core/grpc_services/rpc_rollback_transaction.cpp index 78a151749d4..136947f1f3b 100644 --- a/ydb/core/grpc_services/rpc_rollback_transaction.cpp +++ b/ydb/core/grpc_services/rpc_rollback_transaction.cpp @@ -3,7 +3,7 @@ #include "rpc_calls.h" #include "rpc_kqp_base.h" -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "service_table.h" #include <ydb/library/yql/public/issue/yql_issue_message.h> diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index f93d44e8803..1f81cb8b5ce 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -1,7 +1,7 @@ #include "service_table.h" #include <ydb/core/grpc_services/base/base.h> -#include "rpc_common.h" +#include "rpc_common/rpc_common.h" #include "rpc_kqp_base.h" #include "service_table.h" diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index f7265ec1574..e45e685f0c2 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -23,7 +23,6 @@ SRCS( rpc_copy_tables.cpp rpc_export.cpp rpc_create_coordination_node.cpp - rpc_create_session.cpp rpc_create_table.cpp rpc_delete_session.cpp rpc_describe_coordination_node.cpp @@ -74,6 +73,8 @@ SRCS( rpc_whoami.cpp table_settings.cpp + rpc_common/rpc_common_kqp_session.cpp + query/rpc_execute_query.cpp query/rpc_execute_script.cpp query/rpc_fetch_script_results.cpp diff --git a/ydb/services/persqueue_v1/rpc_calls.h b/ydb/services/persqueue_v1/rpc_calls.h index 5afd9e4fb6d..75fac9e8802 100644 --- a/ydb/services/persqueue_v1/rpc_calls.h +++ b/ydb/services/persqueue_v1/rpc_calls.h @@ -6,7 +6,6 @@ #include <ydb/core/grpc_services/rpc_calls.h> #include <ydb/core/grpc_services/rpc_scheme_base.h> -#include <ydb/core/grpc_services/rpc_common.h> namespace NKikimr::NGRpcService { diff --git a/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt index b73b9d81828..ca652056594 100644 --- a/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/ydb/ut/CMakeLists.darwin-x86_64.txt @@ -65,6 +65,7 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/cert_gen.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_query_ut.cpp ) set_property( TARGET diff --git a/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt index 017cd291baa..8f4a9f440b8 100644 --- a/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/services/ydb/ut/CMakeLists.linux-aarch64.txt @@ -68,6 +68,7 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/cert_gen.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_query_ut.cpp ) set_property( TARGET diff --git a/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt index 77c88076457..99673c83ec4 100644 --- a/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/services/ydb/ut/CMakeLists.linux-x86_64.txt @@ -69,6 +69,7 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/cert_gen.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_query_ut.cpp ) set_property( TARGET diff --git a/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt b/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt index 64ee11bf384..f851c69d196 100644 --- a/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/services/ydb/ut/CMakeLists.windows-x86_64.txt @@ -58,6 +58,7 @@ target_sources(ydb-services-ydb-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_olapstore_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_monitoring_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ydb/cert_gen.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/ydb_query_ut.cpp ) set_property( TARGET diff --git a/ydb/services/ydb/ut/ya.make b/ydb/services/ydb/ut/ya.make index 7d3c20fa4e2..ebf7b4976a0 100644 --- a/ydb/services/ydb/ut/ya.make +++ b/ydb/services/ydb/ut/ya.make @@ -28,6 +28,7 @@ SRCS( ydb_olapstore_ut.cpp ydb_monitoring_ut.cpp cert_gen.cpp + ydb_query_ut.cpp ) PEERDIR( diff --git a/ydb/services/ydb/ydb_query.cpp b/ydb/services/ydb/ydb_query.cpp index 698417e2033..89d2202d5b3 100644 --- a/ydb/services/ydb/ydb_query.cpp +++ b/ydb/services/ydb/ydb_query.cpp @@ -9,6 +9,7 @@ namespace NKikimr::NGRpcService { void TGRpcYdbQueryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { using namespace Ydb::Query; + using namespace NQuery; auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); @@ -26,13 +27,13 @@ void TGRpcYdbQueryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { ADD_REQUEST(ExecuteQuery, ExecuteQueryRequest, ExecuteQueryResponsePart, { ActorSystem_->Send(GRpcRequestProxyId_, new TGrpcRequestNoOperationCall<ExecuteQueryRequest, ExecuteQueryResponsePart> - (ctx, &DoExecuteQueryRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); + (ctx, &DoExecuteQuery, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); }) ADD_REQUEST(ExecuteScript, ExecuteScriptRequest, Ydb::Operations::Operation, { ActorSystem_->Send(GRpcRequestProxyId_, new TGrpcRequestNoOperationCall<ExecuteScriptRequest, Ydb::Operations::Operation> - (ctx, &DoExecuteScriptRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); + (ctx, &DoExecuteScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); }) ADD_REQUEST(FetchScriptResults, FetchScriptResultsRequest, FetchScriptResultsResponse, { @@ -40,6 +41,12 @@ void TGRpcYdbQueryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { new TGrpcRequestNoOperationCall<FetchScriptResultsRequest, FetchScriptResultsResponse> (ctx, &DoFetchScriptResults, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); }) + + ADD_REQUEST(CreateSession, CreateSessionRequest, CreateSessionResponse, { + ActorSystem_->Send(GRpcRequestProxyId_, + new TGrpcRequestNoOperationCall<CreateSessionRequest, CreateSessionResponse> + (ctx, &DoCreateSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); + }) #undef ADD_REQUEST } diff --git a/ydb/services/ydb/ydb_query_ut.cpp b/ydb/services/ydb/ydb_query_ut.cpp new file mode 100644 index 00000000000..095253d041d --- /dev/null +++ b/ydb/services/ydb/ydb_query_ut.cpp @@ -0,0 +1,34 @@ +#include "ydb_common_ut.h" + +#include <ydb/public/api/grpc/draft/ydb_query_v1.grpc.pb.h> + +using namespace NYdb; + +Y_UNIT_TEST_SUITE(YdbQueryService) { + Y_UNIT_TEST(TestCreateSession) { + TKikimrWithGrpcAndRootSchema server; + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto clientConfig = NGRpcProxy::TGRpcClientConfig(location); + bool allDoneOk = false; + + { + NGrpc::TGRpcClientLow clientLow; + auto connection = clientLow.CreateGRpcServiceConnection<Ydb::Query::V1::QueryService>(clientConfig); + + Ydb::Query::CreateSessionRequest request; + + NGrpc::TResponseCallback<Ydb::Query::CreateSessionResponse> responseCb = + [&allDoneOk](NGrpc::TGrpcStatus&& grpcStatus, Ydb::Query::CreateSessionResponse&& response) -> void { + UNIT_ASSERT(!grpcStatus.InternalError); + UNIT_ASSERT(grpcStatus.GRpcStatusCode == 0); + UNIT_ASSERT_VALUES_EQUAL(response.status(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT(response.session_id() != ""); + allDoneOk = true; + }; + + connection->DoRequest(request, std::move(responseCb), &Ydb::Query::V1::QueryService::Stub::AsyncCreateSession); + } + UNIT_ASSERT(allDoneOk); + } +} |