diff options
author | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-25 16:51:47 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-25 16:51:47 +0300 |
commit | cbee4d9c75bc99f6dbd3745d56dd5270337f9782 (patch) | |
tree | f8b47675cce276c09b68df6254010c6169f5aa54 | |
parent | 42757795514a0ad28c3a4046f1024469e6dd92b7 (diff) | |
download | ydb-cbee4d9c75bc99f6dbd3745d56dd5270337f9782.tar.gz |
Invert grpc_request_proxy dependence for cms, discovery, monitoring, operation services. KIKIMR-13646
ref:ff5c0a0bbc95962962da0b812938459f346fd22f
23 files changed, 292 insertions, 203 deletions
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index 358570e4e76..14bb45680db 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -566,17 +566,7 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo switch (ev->GetTypeRewrite()) { HFunc(TRefreshTokenImpl, PreHandle); HFunc(TEvLoginRequest, PreHandle); - HFunc(TEvGetOperationRequest, PreHandle); - HFunc(TEvCancelOperationRequest, PreHandle); - HFunc(TEvForgetOperationRequest, PreHandle); - HFunc(TEvListOperationsRequest, PreHandle); - HFunc(TEvCreateTenantRequest, PreHandle); - HFunc(TEvAlterTenantRequest, PreHandle); - HFunc(TEvGetTenantStatusRequest, PreHandle); - HFunc(TEvListTenantsRequest, PreHandle); - HFunc(TEvRemoveTenantRequest, PreHandle); HFunc(TEvListEndpointsRequest, PreHandle); - HFunc(TEvDescribeTenantOptionsRequest, PreHandle); HFunc(TEvCreateCoordinationNode, PreHandle); HFunc(TEvAlterCoordinationNode, PreHandle); HFunc(TEvDropCoordinationNode, PreHandle); @@ -601,7 +591,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo HFunc(TEvImportFromS3Request, PreHandle); HFunc(TEvImportDataRequest, PreHandle); HFunc(TEvDiscoverPQClustersRequest, PreHandle); - HFunc(TEvWhoAmIRequest, PreHandle); HFunc(TEvCreateRateLimiterResource, PreHandle); HFunc(TEvAlterRateLimiterResource, PreHandle); HFunc(TEvDropRateLimiterResource, PreHandle); @@ -611,7 +600,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo HFunc(TEvKikhouseCreateSnapshotRequest, PreHandle); HFunc(TEvKikhouseRefreshSnapshotRequest, PreHandle); HFunc(TEvKikhouseDiscardSnapshotRequest, PreHandle); - HFunc(TEvSelfCheckRequest, PreHandle); HFunc(TEvCoordinationSessionRequest, PreHandle); HFunc(TEvLongTxBeginRequest, PreHandle); HFunc(TEvLongTxCommitRequest, PreHandle); diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h index 508f678330f..2d568d5f47d 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.h +++ b/ydb/core/grpc_services/grpc_request_proxy.h @@ -48,17 +48,8 @@ public: }; protected: - void Handle(TEvGetOperationRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvCancelOperationRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvForgetOperationRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvListOperationsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvCreateTenantRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvAlterTenantRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvGetTenantStatusRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvListTenantsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvRemoveTenantRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvListEndpointsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDescribeTenantOptionsRequest::TPtr& ev, const TActorContext& ctx); + void Handle(TEvCreateCoordinationNode::TPtr& ev, const TActorContext& ctx); void Handle(TEvAlterCoordinationNode::TPtr& ev, const TActorContext& ctx); void Handle(TEvDropCoordinationNode::TPtr& ev, const TActorContext& ctx); @@ -83,7 +74,6 @@ protected: void Handle(TEvImportFromS3Request::TPtr& ev, const TActorContext& ctx); void Handle(TEvImportDataRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscoverPQClustersRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvWhoAmIRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvCreateRateLimiterResource::TPtr& ev, const TActorContext& ctx); void Handle(TEvAlterRateLimiterResource::TPtr& ev, const TActorContext& ctx); void Handle(TEvDropRateLimiterResource::TPtr& ev, const TActorContext& ctx); @@ -93,7 +83,6 @@ protected: void Handle(TEvKikhouseCreateSnapshotRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvKikhouseRefreshSnapshotRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvKikhouseDiscardSnapshotRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvSelfCheckRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvLoginRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvCoordinationSessionRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvLongTxBeginRequest::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h index 4fa6b5b38ac..3f70f2f0002 100644 --- a/ydb/core/grpc_services/rpc_calls.h +++ b/ydb/core/grpc_services/rpc_calls.h @@ -7,7 +7,7 @@ #include <ydb/public/api/protos/ydb_auth.pb.h> #include <ydb/public/api/protos/ydb_clickhouse_internal.pb.h> -#include <ydb/public/api/protos/ydb_cms.pb.h> + #include <ydb/public/api/protos/ydb_coordination.pb.h> #include <ydb/public/api/protos/ydb_discovery.pb.h> #include <ydb/public/api/protos/ydb_experimental.pb.h> @@ -19,7 +19,7 @@ #include <ydb/public/api/protos/ydb_persqueue_cluster_discovery.pb.h> #include <ydb/public/api/protos/ydb_persqueue_v1.pb.h> #include <ydb/public/api/protos/ydb_rate_limiter.pb.h> -#include <ydb/public/api/protos/ydb_monitoring.pb.h> + #include <ydb/public/api/protos/yq.pb.h> #include <ydb/public/api/grpc/draft/dummy.pb.h> @@ -45,17 +45,8 @@ void FillYdbStatus(Draft::Dummy::PingResponse& resp, const NYql::TIssues& issues template <> void FillYdbStatus(Ydb::Coordination::SessionResponse& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status); -using TEvGetOperationRequest = TGRpcRequestValidationWrapper<TRpcServices::EvGetOperation, Ydb::Operations::GetOperationRequest, Ydb::Operations::GetOperationResponse, true, TRateLimiterMode::Rps>; -using TEvCancelOperationRequest = TGRpcRequestValidationWrapper<TRpcServices::EvCancelOperation, Ydb::Operations::CancelOperationRequest, Ydb::Operations::CancelOperationResponse, false, TRateLimiterMode::Rps>; -using TEvForgetOperationRequest = TGRpcRequestValidationWrapper<TRpcServices::EvForgetOperation, Ydb::Operations::ForgetOperationRequest, Ydb::Operations::ForgetOperationResponse, false, TRateLimiterMode::Rps>; -using TEvListOperationsRequest = TGRpcRequestValidationWrapper<TRpcServices::EvListOperations, Ydb::Operations::ListOperationsRequest, Ydb::Operations::ListOperationsResponse, false, TRateLimiterMode::Rps>; -using TEvCreateTenantRequest = TGRpcRequestWrapper<TRpcServices::EvCreateTenant, Ydb::Cms::CreateDatabaseRequest, Ydb::Cms::CreateDatabaseResponse, true>; -using TEvAlterTenantRequest = TGRpcRequestWrapper<TRpcServices::EvAlterTenant, Ydb::Cms::AlterDatabaseRequest, Ydb::Cms::AlterDatabaseResponse, true>; -using TEvGetTenantStatusRequest = TGRpcRequestWrapper<TRpcServices::EvGetTenantStatus, Ydb::Cms::GetDatabaseStatusRequest, Ydb::Cms::GetDatabaseStatusResponse, true>; -using TEvListTenantsRequest = TGRpcRequestWrapper<TRpcServices::EvListTenants, Ydb::Cms::ListDatabasesRequest, Ydb::Cms::ListDatabasesResponse, true>; -using TEvRemoveTenantRequest = TGRpcRequestWrapper<TRpcServices::EvRemoveTenant, Ydb::Cms::RemoveDatabaseRequest, Ydb::Cms::RemoveDatabaseResponse, true>; using TEvListEndpointsRequest = TGRpcRequestWrapper<TRpcServices::EvListEndpoints, Ydb::Discovery::ListEndpointsRequest, Ydb::Discovery::ListEndpointsResponse, true>; -using TEvDescribeTenantOptionsRequest = TGRpcRequestWrapper<TRpcServices::EvDescribeTenantOptions, Ydb::Cms::DescribeDatabaseOptionsRequest, Ydb::Cms::DescribeDatabaseOptionsResponse, true>; + using TEvCreateCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvCreateCoordinationNode, Ydb::Coordination::CreateNodeRequest, Ydb::Coordination::CreateNodeResponse, true, TRateLimiterMode::Rps>; using TEvAlterCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvAlterCoordinationNode, Ydb::Coordination::AlterNodeRequest, Ydb::Coordination::AlterNodeResponse, true, TRateLimiterMode::Rps>; using TEvDropCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvDropCoordinationNode, Ydb::Coordination::DropNodeRequest, Ydb::Coordination::DropNodeResponse, true, TRateLimiterMode::Rps>; @@ -80,7 +71,6 @@ using TEvExportToS3Request = TGRpcRequestValidationWrapper<TRpcServices::EvExpor using TEvImportFromS3Request = TGRpcRequestValidationWrapper<TRpcServices::EvImportFromS3, Ydb::Import::ImportFromS3Request, Ydb::Import::ImportFromS3Response, true>; using TEvImportDataRequest = TGRpcRequestValidationWrapper<TRpcServices::EvImportData, Ydb::Import::ImportDataRequest, Ydb::Import::ImportDataResponse, true>; using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>; -using TEvWhoAmIRequest = TGRpcRequestWrapper<TRpcServices::EvWhoAmI, Ydb::Discovery::WhoAmIRequest, Ydb::Discovery::WhoAmIResponse, true, TRateLimiterMode::Rps>; using TEvCreateRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvCreateRateLimiterResource, Ydb::RateLimiter::CreateResourceRequest, Ydb::RateLimiter::CreateResourceResponse, true, TRateLimiterMode::Rps>; using TEvAlterRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvAlterRateLimiterResource, Ydb::RateLimiter::AlterResourceRequest, Ydb::RateLimiter::AlterResourceResponse, true, TRateLimiterMode::Rps>; using TEvDropRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvDropRateLimiterResource, Ydb::RateLimiter::DropResourceRequest, Ydb::RateLimiter::DropResourceResponse, true, TRateLimiterMode::Rps>; @@ -90,7 +80,7 @@ using TEvAcquireRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvAcquir using TEvKikhouseCreateSnapshotRequest = TGRpcRequestWrapper<TRpcServices::EvKikhouseCreateSnapshot, Ydb::ClickhouseInternal::CreateSnapshotRequest, Ydb::ClickhouseInternal::CreateSnapshotResponse, true>; using TEvKikhouseRefreshSnapshotRequest = TGRpcRequestWrapper<TRpcServices::EvKikhouseRefreshSnapshot, Ydb::ClickhouseInternal::RefreshSnapshotRequest, Ydb::ClickhouseInternal::RefreshSnapshotResponse, true>; using TEvKikhouseDiscardSnapshotRequest = TGRpcRequestWrapper<TRpcServices::EvKikhouseDiscardSnapshot, Ydb::ClickhouseInternal::DiscardSnapshotRequest, Ydb::ClickhouseInternal::DiscardSnapshotResponse, true>; -using TEvSelfCheckRequest = TGRpcRequestWrapper<TRpcServices::EvSelfCheck, Ydb::Monitoring::SelfCheckRequest, Ydb::Monitoring::SelfCheckResponse, true>; + using TEvLoginRequest = TGRpcRequestWrapperNoAuth<TRpcServices::EvLogin, Ydb::Auth::LoginRequest, Ydb::Auth::LoginResponse>; using TEvCoordinationSessionRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvCoordinationSession, Ydb::Coordination::SessionRequest, Ydb::Coordination::SessionResponse>; using TEvLongTxBeginRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxBegin, Ydb::LongTx::BeginTransactionRequest, Ydb::LongTx::BeginTransactionResponse, true>; diff --git a/ydb/core/grpc_services/rpc_cancel_operation.cpp b/ydb/core/grpc_services/rpc_cancel_operation.cpp index 579f927c80f..fdb049ecc7a 100644 --- a/ydb/core/grpc_services/rpc_cancel_operation.cpp +++ b/ydb/core/grpc_services/rpc_cancel_operation.cpp @@ -1,8 +1,7 @@ -#include "grpc_request_proxy.h" +#include "service_operation.h" #include "operation_helpers.h" -#include "rpc_calls.h" #include "rpc_operation_request_base.h" - +#include <ydb/core/grpc_services/base/base.h> #include <ydb/core/tx/schemeshard/schemeshard_build_index.h> #include <ydb/core/tx/schemeshard/schemeshard_export.h> #include <ydb/core/tx/schemeshard/schemeshard_import.h> @@ -19,6 +18,9 @@ using namespace NKikimrIssues; using namespace NOperationId; using namespace Ydb; +using TEvCancelOperationRequest = TGrpcRequestNoOperationCall<Ydb::Operations::CancelOperationRequest, + Ydb::Operations::CancelOperationResponse>; + class TCancelOperationRPC: public TRpcOperationRequestActor<TCancelOperationRPC, TEvCancelOperationRequest> { TStringBuf GetLogPrefix() const override { switch (OperationId.GetKind()) { @@ -77,7 +79,7 @@ public: using TRpcOperationRequestActor::TRpcOperationRequestActor; void Bootstrap() { - const TString& id = Request->GetProtoRequest()->id(); + const TString& id = GetProtoRequest()->id(); try { OperationId = TOperationId(id); @@ -119,8 +121,8 @@ private: }; // TCancelOperationRPC -void TGRpcRequestProxy::Handle(TEvCancelOperationRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TCancelOperationRPC(ev->Release().Release())); +void DoCancelOperationRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TCancelOperationRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_cms.cpp b/ydb/core/grpc_services/rpc_cms.cpp index 78450233069..aa1aacf0898 100644 --- a/ydb/core/grpc_services/rpc_cms.cpp +++ b/ydb/core/grpc_services/rpc_cms.cpp @@ -1,11 +1,11 @@ -#include "grpc_request_proxy.h" - -#include "rpc_calls.h" +#include "service_cms.h" #include "rpc_deferrable.h" +#include <ydb/core/grpc_services/base/base.h> #include <ydb/core/base/appdata.h> #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/cms/console/console.h> +#include <ydb/public/api/protos/ydb_cms.pb.h> namespace NKikimr { namespace NGRpcService { @@ -14,6 +14,19 @@ using namespace NActors; using namespace NConsole; using namespace Ydb; +using TEvCreateTenantRequest = TGrpcRequestOperationCall<Cms::CreateDatabaseRequest, + Cms::CreateDatabaseResponse>; +using TEvAlterTenantRequest = TGrpcRequestOperationCall<Cms::AlterDatabaseRequest, + Cms::AlterDatabaseResponse>; +using TEvGetTenantStatusRequest = TGrpcRequestOperationCall<Cms::GetDatabaseStatusRequest, + Cms::GetDatabaseStatusResponse>; +using TEvListTenantsRequest = TGrpcRequestOperationCall<Cms::ListDatabasesRequest, + Cms::ListDatabasesResponse>; +using TEvRemoveTenantRequest = TGrpcRequestOperationCall<Cms::RemoveDatabaseRequest, + Cms::RemoveDatabaseResponse>; +using TEvDescribeTenantOptionsRequest = TGrpcRequestOperationCall<Cms::DescribeDatabaseOptionsRequest, + Cms::DescribeDatabaseOptionsResponse>; + template <typename TRequest, typename TCmsRequest, typename TCmsResponse> class TCmsRPC : public TRpcOperationRequestActor<TCmsRPC<TRequest, TCmsRequest, TCmsResponse>, TRequest> { using TThis = TCmsRPC<TRequest, TCmsRequest, TCmsResponse>; @@ -22,7 +35,7 @@ class TCmsRPC : public TRpcOperationRequestActor<TCmsRPC<TRequest, TCmsRequest, TActorId CmsPipe; public: - TCmsRPC(TRequest* msg) + TCmsRPC(IRequestOpCtx* msg) : TBase(msg) { } @@ -131,46 +144,46 @@ private: } }; -void TGRpcRequestProxy::Handle(TEvCreateTenantRequest::TPtr& ev, const TActorContext& ctx) -{ - ctx.Register(new TCmsRPC<TEvCreateTenantRequest, - TEvConsole::TEvCreateTenantRequest, - TEvConsole::TEvCreateTenantResponse>(ev->Release().Release())); +void DoCreateTenantRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register( + new TCmsRPC<TEvCreateTenantRequest, + TEvConsole::TEvCreateTenantRequest, + TEvConsole::TEvCreateTenantResponse>(p.release())); } -void TGRpcRequestProxy::Handle(TEvAlterTenantRequest::TPtr& ev, const TActorContext& ctx) -{ - ctx.Register(new TCmsRPC<TEvAlterTenantRequest, - TEvConsole::TEvAlterTenantRequest, - TEvConsole::TEvAlterTenantResponse>(ev->Release().Release())); +void DoAlterTenantRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register( + new TCmsRPC<TEvAlterTenantRequest, + TEvConsole::TEvAlterTenantRequest, + TEvConsole::TEvAlterTenantResponse>(p.release())); } -void TGRpcRequestProxy::Handle(TEvGetTenantStatusRequest::TPtr& ev, const TActorContext& ctx) -{ - ctx.Register(new TCmsRPC<TEvGetTenantStatusRequest, - TEvConsole::TEvGetTenantStatusRequest, - TEvConsole::TEvGetTenantStatusResponse>(ev->Release().Release())); +void DoGetTenantStatusRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register( + new TCmsRPC<TEvGetTenantStatusRequest, + TEvConsole::TEvGetTenantStatusRequest, + TEvConsole::TEvGetTenantStatusResponse>(p.release())); } -void TGRpcRequestProxy::Handle(TEvListTenantsRequest::TPtr& ev, const TActorContext& ctx) -{ - ctx.Register(new TCmsRPC<TEvListTenantsRequest, - TEvConsole::TEvListTenantsRequest, - TEvConsole::TEvListTenantsResponse>(ev->Release().Release())); +void DoListTenantsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register( + new TCmsRPC<TEvListTenantsRequest, + TEvConsole::TEvListTenantsRequest, + TEvConsole::TEvListTenantsResponse>(p.release())); } -void TGRpcRequestProxy::Handle(TEvRemoveTenantRequest::TPtr& ev, const TActorContext& ctx) -{ - ctx.Register(new TCmsRPC<TEvRemoveTenantRequest, - TEvConsole::TEvRemoveTenantRequest, - TEvConsole::TEvRemoveTenantResponse>(ev->Release().Release())); +void DoRemoveTenantRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register( + new TCmsRPC<TEvRemoveTenantRequest, + TEvConsole::TEvRemoveTenantRequest, + TEvConsole::TEvRemoveTenantResponse>(p.release())); } -void TGRpcRequestProxy::Handle(TEvDescribeTenantOptionsRequest::TPtr& ev, const TActorContext& ctx) -{ - ctx.Register(new TCmsRPC<TEvDescribeTenantOptionsRequest, - TEvConsole::TEvDescribeTenantOptionsRequest, - TEvConsole::TEvDescribeTenantOptionsResponse>(ev->Release().Release())); +void DoDescribeTenantOptionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register( + new TCmsRPC<TEvDescribeTenantOptionsRequest, + TEvConsole::TEvDescribeTenantOptionsRequest, + TEvConsole::TEvDescribeTenantOptionsResponse>(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_export.cpp b/ydb/core/grpc_services/rpc_export.cpp index f0fa7be3dfe..352d5f6bc72 100644 --- a/ydb/core/grpc_services/rpc_export.cpp +++ b/ydb/core/grpc_services/rpc_export.cpp @@ -25,7 +25,7 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, } IEventBase* MakeRequest() override { - const auto& request = *this->Request->GetProtoRequest(); + const auto& request = *this->GetProtoRequest(); auto ev = MakeHolder<TEvExport::TEvCreateExportRequest>(); ev->Record.SetTxId(this->TxId); @@ -57,7 +57,7 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, TVector<TString> paths; paths.emplace_back(this->DatabaseName); // first entry is database - ExtractPaths(paths, this->Request->GetProtoRequest()->settings()); + ExtractPaths(paths, this->GetProtoRequest()->settings()); return paths; } @@ -175,7 +175,7 @@ public: using TRpcOperationRequestActor<TDerived, TEvRequest, true>::TRpcOperationRequestActor; void Bootstrap(const TActorContext&) { - const auto& request = *this->Request->GetProtoRequest(); + const auto& request = *this->GetProtoRequest(); if (request.settings().items().empty()) { return this->Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR, "Items are not set"); diff --git a/ydb/core/grpc_services/rpc_forget_operation.cpp b/ydb/core/grpc_services/rpc_forget_operation.cpp index 8b250125cac..03da84298b0 100644 --- a/ydb/core/grpc_services/rpc_forget_operation.cpp +++ b/ydb/core/grpc_services/rpc_forget_operation.cpp @@ -1,8 +1,8 @@ -#include "grpc_request_proxy.h" +#include "service_operation.h" #include "operation_helpers.h" -#include "rpc_calls.h" #include "rpc_operation_request_base.h" +#include <ydb/core/grpc_services/base/base.h> #include <ydb/core/tx/schemeshard/schemeshard_build_index.h> #include <ydb/core/tx/schemeshard/schemeshard_export.h> #include <ydb/core/tx/schemeshard/schemeshard_import.h> @@ -19,6 +19,9 @@ using namespace NKikimrIssues; using namespace NOperationId; using namespace Ydb; +using TEvForgetOperationRequest = TGrpcRequestNoOperationCall<Ydb::Operations::ForgetOperationRequest, + Ydb::Operations::ForgetOperationResponse>; + class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC, TEvForgetOperationRequest> { TStringBuf GetLogPrefix() const override { switch (OperationId.GetKind()) { @@ -77,7 +80,7 @@ public: using TRpcOperationRequestActor::TRpcOperationRequestActor; void Bootstrap() { - const TString& id = Request->GetProtoRequest()->id(); + const TString& id = GetProtoRequest()->id(); try { OperationId = TOperationId(id); @@ -119,8 +122,8 @@ private: }; // TForgetOperationRPC -void TGRpcRequestProxy::Handle(TEvForgetOperationRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TForgetOperationRPC(ev->Release().Release())); +void DoForgetOperationRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TForgetOperationRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_get_operation.cpp b/ydb/core/grpc_services/rpc_get_operation.cpp index 6f7a1406863..b585d797dbd 100644 --- a/ydb/core/grpc_services/rpc_get_operation.cpp +++ b/ydb/core/grpc_services/rpc_get_operation.cpp @@ -1,9 +1,11 @@ -#include "grpc_request_proxy.h" +#include "service_operation.h" + #include "operation_helpers.h" #include "rpc_export_base.h" #include "rpc_import_base.h" #include "rpc_operation_request_base.h" +#include <ydb/core/grpc_services/base/base.h> #include <google/protobuf/text_format.h> #include <ydb/core/base/tablet_pipe.h> @@ -27,6 +29,9 @@ using namespace NActors; using namespace NOperationId; using namespace Ydb; +using TEvGetOperationRequest = TGrpcRequestOperationCall<Ydb::Operations::GetOperationRequest, + Ydb::Operations::GetOperationResponse>; + class TGetOperationRPC : public TRpcOperationRequestActor<TGetOperationRPC, TEvGetOperationRequest, true>, public TExportConv { @@ -69,7 +74,7 @@ public: using TRpcOperationRequestActor::TRpcOperationRequestActor; void Bootstrap(const TActorContext &ctx) { - const auto req = Request->GetProtoRequest(); + const auto req = GetProtoRequest(); try { OperationId_ = TOperationId(req->id()); @@ -153,7 +158,7 @@ private: PipeActorId_ = ctx.ExecutorThread.RegisterActor(pipeActor); auto request = MakeHolder<NConsole::TEvConsole::TEvGetOperationRequest>(); - request->Record.MutableRequest()->set_id(Request->GetProtoRequest()->id()); + request->Record.MutableRequest()->set_id(GetProtoRequest()->id()); request->Record.SetUserToken(Request->GetInternalToken()); NTabletPipe::SendData(ctx, PipeActorId_, request.Release()); } @@ -227,7 +232,7 @@ private: const TActorContext &ctx) { TEvGetOperationRequest::TResponse resp; auto deferred = resp.mutable_operation(); - deferred->set_id(Request->GetProtoRequest()->id()); + deferred->set_id(GetProtoRequest()->id()); deferred->set_ready(true); deferred->set_status(status); if (issues.size()) @@ -238,7 +243,7 @@ private: void ReplyGetOperationResponse(bool ready, const TActorContext& ctx, StatusIds::StatusCode status = StatusIds::SUCCESS) { TEvGetOperationRequest::TResponse resp; auto deferred = resp.mutable_operation(); - deferred->set_id(Request->GetProtoRequest()->id()); + deferred->set_id(GetProtoRequest()->id()); deferred->set_ready(ready); if (ready) { deferred->set_status(status); @@ -252,7 +257,7 @@ private: { TEvGetOperationRequest::TResponse resp; auto deferred = resp.mutable_operation(); - deferred->set_id(Request->GetProtoRequest()->id()); + deferred->set_id(GetProtoRequest()->id()); deferred->set_ready(ready); if (ready) { deferred->set_status(status); @@ -278,8 +283,8 @@ private: TActorId PipeActorId_; }; -void TGRpcRequestProxy::Handle(TEvGetOperationRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TGetOperationRPC(ev->Release().Release())); +void DoGetOperationRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TGetOperationRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_import.cpp b/ydb/core/grpc_services/rpc_import.cpp index 83eff924039..6fc610c5d0a 100644 --- a/ydb/core/grpc_services/rpc_import.cpp +++ b/ydb/core/grpc_services/rpc_import.cpp @@ -25,7 +25,7 @@ class TImportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, } IEventBase* MakeRequest() override { - const auto& request = *this->Request->GetProtoRequest(); + const auto& request = *(this->GetProtoRequest()); auto ev = MakeHolder<TEvImport::TEvCreateImportRequest>(); ev->Record.SetTxId(this->TxId); @@ -56,7 +56,7 @@ public: using TRpcOperationRequestActor<TDerived, TEvRequest, true>::TRpcOperationRequestActor; void Bootstrap(const TActorContext&) { - const auto& request = *this->Request->GetProtoRequest(); + const auto& request = *(this->GetProtoRequest()); if (request.settings().items().empty()) { return this->Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR, "Items are not set"); diff --git a/ydb/core/grpc_services/rpc_import_data.cpp b/ydb/core/grpc_services/rpc_import_data.cpp index 8df9b3e2c73..87c2d39c25e 100644 --- a/ydb/core/grpc_services/rpc_import_data.cpp +++ b/ydb/core/grpc_services/rpc_import_data.cpp @@ -119,7 +119,7 @@ class TImportDataRPC: public TRpcRequestActor<TImportDataRPC, TEvImportDataReque auto& entry = request->ResultSet.emplace_back(); entry.Operation = TNavigate::OpTable; - entry.Path = NKikimr::SplitPath(Request->GetProtoRequest()->path()); + entry.Path = NKikimr::SplitPath(GetProtoRequest()->path()); Send(MakeSchemeCacheID(), new TEvNavigate(request.Release())); Become(&TThis::StateResolvePath); @@ -221,7 +221,7 @@ class TImportDataRPC: public TRpcRequestActor<TImportDataRPC, TEvImportDataReque /// Process data void ProcessData() { - const auto& request = *Request->GetProtoRequest(); + const auto& request = *GetProtoRequest(); auto ev = MakeHolder<TEvDataShard::TEvUploadRowsRequest>(); ev->Record.SetTableId(KeyDesc->TableId.PathId.LocalPathId); @@ -414,7 +414,7 @@ public: using TRpcRequestActor<TImportDataRPC, TEvImportDataRequest, true>::TRpcRequestActor; void Bootstrap() { - switch (Request->GetProtoRequest()->format_case()) { + switch (GetProtoRequest()->format_case()) { case Import::ImportDataRequest::kYdbDump: break; default: diff --git a/ydb/core/grpc_services/rpc_list_operations.cpp b/ydb/core/grpc_services/rpc_list_operations.cpp index a905efd8d6f..15abef28e23 100644 --- a/ydb/core/grpc_services/rpc_list_operations.cpp +++ b/ydb/core/grpc_services/rpc_list_operations.cpp @@ -1,10 +1,11 @@ -#include "grpc_request_proxy.h" +#include "service_operation.h" + #include "operation_helpers.h" #include "rpc_export_base.h" #include "rpc_import_base.h" -#include "rpc_calls.h" #include "rpc_operation_request_base.h" +#include <ydb/core/grpc_services/base/base.h> #include <ydb/core/tx/schemeshard/schemeshard_build_index.h> #include <ydb/core/tx/schemeshard/schemeshard_export.h> #include <ydb/core/tx/schemeshard/schemeshard_import.h> @@ -21,11 +22,14 @@ using namespace NKikimrIssues; using namespace NOperationId; using namespace Ydb; +using TEvListOperationsRequest = TGrpcRequestNoOperationCall<Ydb::Operations::ListOperationsRequest, + Ydb::Operations::ListOperationsResponse>; + class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, TEvListOperationsRequest>, public TExportConv { TStringBuf GetLogPrefix() const override { - switch (ParseKind(Request->GetProtoRequest()->kind())) { + switch (ParseKind(GetProtoRequest()->kind())) { case TOperationId::EXPORT: return "[ListExports]"; case TOperationId::IMPORT: @@ -38,9 +42,9 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T } IEventBase* MakeRequest() override { - const auto& request = *Request->GetProtoRequest(); + const auto& request = *GetProtoRequest(); - switch (ParseKind(Request->GetProtoRequest()->kind())) { + switch (ParseKind(GetProtoRequest()->kind())) { case TOperationId::EXPORT: return new TEvExport::TEvListExportsRequest(DatabaseName, request.page_size(), request.page_token(), request.kind()); case TOperationId::IMPORT: @@ -112,7 +116,7 @@ public: using TRpcOperationRequestActor::TRpcOperationRequestActor; void Bootstrap() { - switch (ParseKind(Request->GetProtoRequest()->kind())) { + switch (ParseKind(GetProtoRequest()->kind())) { case TOperationId::EXPORT: case TOperationId::IMPORT: case TOperationId::BUILD_INDEX: @@ -138,8 +142,8 @@ public: }; // TListOperationsRPC -void TGRpcRequestProxy::Handle(TEvListOperationsRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TListOperationsRPC(ev->Release().Release())); +void DoListOperationsRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TListOperationsRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_login.cpp b/ydb/core/grpc_services/rpc_login.cpp index 9815aeac99b..b1ec612344b 100644 --- a/ydb/core/grpc_services/rpc_login.cpp +++ b/ydb/core/grpc_services/rpc_login.cpp @@ -12,7 +12,7 @@ namespace NGRpcService { using namespace NSchemeShard; -class TLoginRPC : public TRpcRequestActor<TLoginRPC, TEvLoginRequest> { +class TLoginRPC : public TRpcRequestActor<TLoginRPC, TEvLoginRequest, true> { public: using TRpcRequestActor::TRpcRequestActor; @@ -54,7 +54,7 @@ public: IActor* pipe = NTabletPipe::CreateClient(SelfId(), schemeShardTabletId, GetPipeClientConfig()); PipeClient = RegisterWithSameMailbox(pipe); THolder<TEvSchemeShard::TEvLogin> request = MakeHolder<TEvSchemeShard::TEvLogin>(); - const Ydb::Auth::LoginRequest* protoRequest = Request->GetProtoRequest(); + const Ydb::Auth::LoginRequest* protoRequest = GetProtoRequest(); request.Get()->Record.SetUser(protoRequest->user()); request.Get()->Record.SetPassword(protoRequest->password()); NTabletPipe::SendData(SelfId(), PipeClient, request.Release()); diff --git a/ydb/core/grpc_services/rpc_monitoring.cpp b/ydb/core/grpc_services/rpc_monitoring.cpp index 5640cdb5e22..d551da2fb76 100644 --- a/ydb/core/grpc_services/rpc_monitoring.cpp +++ b/ydb/core/grpc_services/rpc_monitoring.cpp @@ -1,9 +1,9 @@ -#include "grpc_request_proxy.h" +#include "service_monitoring.h" -#include "rpc_calls.h" #include "rpc_kqp_base.h" #include "rpc_request_base.h" +#include <ydb/core/grpc_services/base/base.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/library/yql/public/issue/yql_issue.h> @@ -15,6 +15,7 @@ #include <util/random/shuffle.h> #include <ydb/core/health_check/health_check.h> +#include <ydb/public/api/protos/ydb_monitoring.pb.h> namespace NKikimr { namespace NGRpcService { @@ -22,7 +23,9 @@ namespace NGRpcService { using namespace NActors; using namespace Ydb; -class TSelfCheckRPC : public TRpcRequestActor<TSelfCheckRPC, TEvSelfCheckRequest> { +using TEvSelfCheckRequest = TGrpcRequestOperationCall<Ydb::Monitoring::SelfCheckRequest, Ydb::Monitoring::SelfCheckResponse>; + +class TSelfCheckRPC : public TRpcRequestActor<TSelfCheckRPC, TEvSelfCheckRequest, true> { public: using TRpcRequestActor::TRpcRequestActor; @@ -31,7 +34,7 @@ public: void Bootstrap() { THolder<NHealthCheck::TEvSelfCheckRequest> request = MakeHolder<NHealthCheck::TEvSelfCheckRequest>(); - request->Request = *(Request->GetProtoRequest()); + request->Request = *GetProtoRequest(); if (Request->GetDatabaseName()) { request->Database = Request->GetDatabaseName().GetRef(); } @@ -69,8 +72,8 @@ public: } }; -void TGRpcRequestProxy::Handle(TEvSelfCheckRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TSelfCheckRPC(ev->Release().Release())); +void DoSelfCheckRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TSelfCheckRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_request_base.h b/ydb/core/grpc_services/rpc_request_base.h index b5cb84fc0b0..71a4bc1cd73 100644 --- a/ydb/core/grpc_services/rpc_request_base.h +++ b/ydb/core/grpc_services/rpc_request_base.h @@ -1,5 +1,7 @@ #pragma once +#include "grpc_request_proxy.h" +#include <ydb/core/grpc_services/base/base.h> #include <ydb/core/base/appdata.h> #include <ydb/core/base/kikimr_issue.h> #include <ydb/core/protos/services.pb.h> @@ -24,6 +26,7 @@ public: using TRequest = typename TEvRequest::TRequest; using TResponse = typename TEvRequest::TResponse; using TOperation = Ydb::Operations::Operation; + typedef typename std::conditional<HasOperation, IRequestOpCtx, IRequestNoOpCtx>::type TRequestCtx; protected: void Reply( @@ -114,12 +117,16 @@ protected: return MakeOperation(status, ErrorToIssues(error)); } + const typename TEvRequest::TRequest* GetProtoRequest() const { + return TEvRequest::GetProtoRequest(Request); + } + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::GRPC_REQ; } - explicit TRpcRequestActor(TEvRequest* ev) + explicit TRpcRequestActor(TRequestCtx* ev) : Request(ev) , DatabaseName(Request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData()))) { @@ -157,7 +164,7 @@ private: } protected: - THolder<TEvRequest> Request; + THolder<TRequestCtx> Request; const TString DatabaseName; THolder<const NACLib::TUserToken> UserToken; diff --git a/ydb/core/grpc_services/rpc_whoami.cpp b/ydb/core/grpc_services/rpc_whoami.cpp index b8b2dadfcf1..b695d15a6ed 100644 --- a/ydb/core/grpc_services/rpc_whoami.cpp +++ b/ydb/core/grpc_services/rpc_whoami.cpp @@ -1,25 +1,32 @@ -#include "grpc_request_proxy.h" -#include "rpc_calls.h" +#include "service_discovery.h" +#include <ydb/core/grpc_services/base/base.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <ydb/core/security/ticket_parser.h> - +#include <ydb/public/api/protos/ydb_discovery.pb.h> namespace NKikimr { namespace NGRpcService { +using TEvWhoAmIRequest = TGrpcRequestOperationCall<Ydb::Discovery::WhoAmIRequest, + Ydb::Discovery::WhoAmIResponse>; + class TWhoAmIRPC : public TActorBootstrapped<TWhoAmIRPC> { public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::GRPC_REQ; } - TWhoAmIRPC(TEvWhoAmIRequest::TPtr& request) - : Request(request->Release().Release()) + TWhoAmIRPC(IRequestOpCtx* request) + : Request(request) {} void Bootstrap(const TActorContext& ctx) { - TMaybe<TString> authToken = Request->GetYdbToken(); + //TODO: Do we realy realy need to make call to the ticket parser here??? + //we have done it already in grpc_request_proxy + auto req = dynamic_cast<TEvWhoAmIRequest*>(Request.get()); + Y_VERIFY(req, "Unexpected request type for TWhoAmIRPC"); + TMaybe<TString> authToken = req->GetYdbToken(); if (authToken) { TMaybe<TString> database = Request->GetDatabaseName(); ctx.Send(MakeTicketParserID(), new TEvTicketParser::TEvAuthorizeTicket({ @@ -49,7 +56,7 @@ private: if (!result.Error) { if (result.Token != nullptr) { response->set_user(result.Token->GetUserSID()); - if (Request->GetProtoRequest()->include_groups()) { + if (TEvWhoAmIRequest::GetProtoRequest(Request)->include_groups()) { for (const auto& group : result.Token->GetGroupSIDs()) { response->add_groups(group); } @@ -75,11 +82,11 @@ private: Request->ReplyWithYdbStatus(Ydb::StatusIds::GENERIC_ERROR); } - THolder<TEvWhoAmIRequest> Request; + std::unique_ptr<IRequestOpCtx> Request; }; -void TGRpcRequestProxy::Handle(TEvWhoAmIRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TWhoAmIRPC(ev)); +void DoWhoAmIRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TWhoAmIRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/service_cms.h b/ydb/core/grpc_services/service_cms.h new file mode 100644 index 00000000000..4802ba1b4af --- /dev/null +++ b/ydb/core/grpc_services/service_cms.h @@ -0,0 +1,19 @@ +#pragma once + +#include <memory> + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +void DoCreateTenantRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoAlterTenantRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoGetTenantStatusRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoListTenantsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoRemoveTenantRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDescribeTenantOptionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + +} +} diff --git a/ydb/core/grpc_services/service_discovery.h b/ydb/core/grpc_services/service_discovery.h new file mode 100644 index 00000000000..9d1647d9072 --- /dev/null +++ b/ydb/core/grpc_services/service_discovery.h @@ -0,0 +1,15 @@ +#pragma once + +#include <memory> + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +void DoListEndpointsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoWhoAmIRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + +} +} diff --git a/ydb/core/grpc_services/service_monitoring.h b/ydb/core/grpc_services/service_monitoring.h new file mode 100644 index 00000000000..6ac2928d1af --- /dev/null +++ b/ydb/core/grpc_services/service_monitoring.h @@ -0,0 +1,14 @@ +#pragma once + +#include <memory> + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +void DoSelfCheckRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + +} +} diff --git a/ydb/core/grpc_services/service_operation.h b/ydb/core/grpc_services/service_operation.h new file mode 100644 index 00000000000..ef476f7c900 --- /dev/null +++ b/ydb/core/grpc_services/service_operation.h @@ -0,0 +1,18 @@ +#pragma once + +#include <memory> + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IRequestNoOpCtx; +class IFacilityProvider; + +void DoGetOperationRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoCancelOperationRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&); +void DoForgetOperationRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&); +void DoListOperationsRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&); + +} +} diff --git a/ydb/services/cms/grpc_service.cpp b/ydb/services/cms/grpc_service.cpp index 33a7569982c..737ae3e81d0 100644 --- a/ydb/services/cms/grpc_service.cpp +++ b/ydb/services/cms/grpc_service.cpp @@ -1,8 +1,8 @@ #include "grpc_service.h" +#include <ydb/core/grpc_services/service_cms.h> #include <ydb/core/grpc_services/grpc_helper.h> -#include <ydb/core/grpc_services/grpc_request_proxy.h> -#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/base/base.h> namespace NKikimr { namespace NGRpcService { @@ -37,35 +37,28 @@ void TGRpcCmsService::DecRequest() { void TGRpcCmsService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + using namespace Ydb; + #ifdef ADD_REQUEST #error ADD_REQUEST macro already defined #endif -#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ - MakeIntrusive<TGRpcRequest<Ydb::Cms::IN, Ydb::Cms::OUT, TGRpcCmsService>>(this, &Service_, CQ_, \ - [this](NGrpc::IRequestContextBase *ctx) { \ - ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ACTION; \ - }, &Ydb::Cms::V1::CmsService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("cms", #NAME))->Run(); +#define ADD_REQUEST(NAME, CB) \ + MakeIntrusive<TGRpcRequest<Cms::NAME##Request, Cms::NAME##Response, TGRpcCmsService>> \ + (this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcRequestOperationCall<Cms::NAME##Request, Cms::NAME##Response> \ + (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); \ + }, &Cms::V1::CmsService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("cms", #NAME))->Run(); - ADD_REQUEST(CreateDatabase, CreateDatabaseRequest, CreateDatabaseResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvCreateTenantRequest(ctx)); - }) - ADD_REQUEST(AlterDatabase, AlterDatabaseRequest, AlterDatabaseResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvAlterTenantRequest(ctx)); - }) - ADD_REQUEST(GetDatabaseStatus, GetDatabaseStatusRequest, GetDatabaseStatusResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvGetTenantStatusRequest(ctx)); - }) - ADD_REQUEST(ListDatabases, ListDatabasesRequest, ListDatabasesResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvListTenantsRequest(ctx)); - }) - ADD_REQUEST(RemoveDatabase, RemoveDatabaseRequest, RemoveDatabaseResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvRemoveTenantRequest(ctx)); - }) - ADD_REQUEST(DescribeDatabaseOptions, DescribeDatabaseOptionsRequest, DescribeDatabaseOptionsResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDescribeTenantOptionsRequest(ctx)); - }) + ADD_REQUEST(CreateDatabase, DoCreateTenantRequest) + ADD_REQUEST(AlterDatabase, DoAlterTenantRequest) + ADD_REQUEST(GetDatabaseStatus, DoGetTenantStatusRequest) + ADD_REQUEST(ListDatabases, DoListTenantsRequest) + ADD_REQUEST(RemoveDatabase, DoRemoveTenantRequest) + ADD_REQUEST(DescribeDatabaseOptions, DoDescribeTenantOptionsRequest) #undef ADD_REQUEST } diff --git a/ydb/services/discovery/grpc_service.cpp b/ydb/services/discovery/grpc_service.cpp index 010322b5d9b..fdc95d95ce5 100644 --- a/ydb/services/discovery/grpc_service.cpp +++ b/ydb/services/discovery/grpc_service.cpp @@ -1,6 +1,7 @@ #include "grpc_service.h" - +#include <ydb/core/grpc_services/service_discovery.h> #include <ydb/core/grpc_services/grpc_helper.h> +#include <ydb/core/grpc_services/base/base.h> #include <ydb/core/grpc_services/grpc_request_proxy.h> #include <ydb/core/grpc_services/rpc_calls.h> @@ -44,25 +45,40 @@ TGRpcDiscoveryService::TGRpcDiscoveryService(NActors::TActorSystem *system, void TGRpcDiscoveryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); - #ifdef ADD_REQUEST - #error ADD_REQUEST macro already defined - #endif - #define ADD_REQUEST(NAME, IN, OUT, ACTION) \ - MakeIntrusive<TGRpcRequest<Ydb::Discovery::IN, Ydb::Discovery::OUT, TGRpcDiscoveryService>>(this, &Service_, CQ_, \ - [this](NGrpc::IRequestContextBase *reqCtx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, reqCtx->GetPeer(), GetSdkBuildInfo(reqCtx)); \ - ACTION; \ - }, &Ydb::Discovery::V1::DiscoveryService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("discovery", #NAME))->Run(); + using namespace Ydb; +#ifdef ADD_REQUEST +#error macro already defined +#endif +#define ADD_REQUEST(NAME, CB) \ + MakeIntrusive<TGRpcRequest<Discovery::NAME##Request, Discovery::NAME##Response, TGRpcDiscoveryService>> \ + (this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer(), GetSdkBuildInfo(ctx)); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcRequestOperationCall<Discovery::NAME##Request, Discovery::NAME##Response> \ + (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); \ + }, &Ydb::Discovery::V1::DiscoveryService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("discovery", #NAME))->Run(); + + ADD_REQUEST(WhoAmI, DoWhoAmIRequest) - ADD_REQUEST(ListEndpoints, ListEndpointsRequest, ListEndpointsResponse, { +#ifdef ADD_LEGACY_REQUEST +#error macro already defined +#endif +#define ADD_LEGACY_REQUEST(NAME, IN, OUT, ACTION) \ + MakeIntrusive<TGRpcRequest<Ydb::Discovery::IN, Ydb::Discovery::OUT, TGRpcDiscoveryService>>(this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *reqCtx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, reqCtx->GetPeer(), GetSdkBuildInfo(reqCtx)); \ + ACTION; \ + }, &Ydb::Discovery::V1::DiscoveryService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("discovery", #NAME))->Run(); + + ADD_LEGACY_REQUEST(ListEndpoints, ListEndpointsRequest, ListEndpointsResponse, { ActorSystem_->Send(GRpcRequestProxyId_, new TEvListEndpointsRequest(reqCtx)); }) - ADD_REQUEST(WhoAmI, WhoAmIRequest, WhoAmIResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvWhoAmIRequest(reqCtx)); - }) - #undef ADD_REQUEST +#undef ADD_REQUEST +#undef ADD_LEGACY_REQUEST } } // namespace NGRpcService diff --git a/ydb/services/monitoring/grpc_service.cpp b/ydb/services/monitoring/grpc_service.cpp index d09fe4f3a4f..c7fe67c7415 100644 --- a/ydb/services/monitoring/grpc_service.cpp +++ b/ydb/services/monitoring/grpc_service.cpp @@ -1,7 +1,8 @@ #include "grpc_service.h" #include <ydb/core/grpc_services/grpc_helper.h> -#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/service_monitoring.h> +#include <ydb/core/grpc_services/base/base.h> namespace NKikimr { namespace NGRpcService { @@ -43,20 +44,23 @@ void TGRpcMonitoringService::DecRequest() { void TGRpcMonitoringService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + using namespace Ydb; + #ifdef ADD_REQUEST #error ADD_REQUEST macro already defined #endif -#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ - MakeIntrusive<TGRpcRequest<Ydb::Monitoring::IN, Ydb::Monitoring::OUT, TGRpcMonitoringService>>(this, &Service_, CQ_, \ - [this](NGrpc::IRequestContextBase* reqCtx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, reqCtx->GetPeer(), GetSdkBuildInfo(reqCtx)); \ - ACTION; \ - }, &Ydb::Monitoring::V1::MonitoringService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("monitoring", #NAME))->Run(); +#define ADD_REQUEST(NAME, CB) \ + MakeIntrusive<TGRpcRequest<Monitoring::NAME##Request, Monitoring::NAME##Response, TGRpcMonitoringService>> \ + (this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer(), GetSdkBuildInfo(ctx)); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcRequestOperationCall<Monitoring::NAME##Request, Monitoring::NAME##Response> \ + (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Off, nullptr})); \ + }, &Ydb::Monitoring::V1::MonitoringService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("monitoring", #NAME))->Run(); - ADD_REQUEST(SelfCheck, SelfCheckRequest, SelfCheckResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvSelfCheckRequest(reqCtx)); - }) + ADD_REQUEST(SelfCheck, DoSelfCheckRequest) #undef ADD_REQUEST } diff --git a/ydb/services/ydb/ydb_operation.cpp b/ydb/services/ydb/ydb_operation.cpp index 602980027cb..4d7d63a4331 100644 --- a/ydb/services/ydb/ydb_operation.cpp +++ b/ydb/services/ydb/ydb_operation.cpp @@ -1,8 +1,9 @@ #include "ydb_operation.h" +#include <ydb/core/grpc_services/service_operation.h> #include <ydb/core/grpc_services/grpc_helper.h> -#include <ydb/core/grpc_services/grpc_request_proxy.h> -#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/public/api/protos/ydb_operation.pb.h> namespace NKikimr { namespace NGRpcService { @@ -33,29 +34,27 @@ void TGRpcOperationService::DecRequest() { void TGRpcOperationService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + using namespace Ydb; + #ifdef ADD_REQUEST #error ADD_REQUEST macro already defined #endif -#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ - MakeIntrusive<TGRpcRequest<Ydb::Operations::IN, Ydb::Operations::OUT, TGRpcOperationService>>(this, &Service_, CQ_, \ - [this](NGrpc::IRequestContextBase *ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ACTION; \ - }, &Ydb::Operation::V1::OperationService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("operation", #NAME))->Run(); - - ADD_REQUEST(GetOperation, GetOperationRequest, GetOperationResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvGetOperationRequest(ctx)); - }) - ADD_REQUEST(CancelOperation, CancelOperationRequest, CancelOperationResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvCancelOperationRequest(ctx)); - }) - ADD_REQUEST(ForgetOperation, ForgetOperationRequest, ForgetOperationResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvForgetOperationRequest(ctx)); - }) - ADD_REQUEST(ListOperations, ListOperationsRequest, ListOperationsResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvListOperationsRequest(ctx)); - }) +#define ADD_REQUEST(NAME, CB, TCALL) \ + MakeIntrusive<TGRpcRequest<Operations::NAME##Request, Operations::NAME##Response, TGRpcOperationService>> \ + (this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TCALL<Operations::NAME##Request, Operations::NAME##Response> \ + (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Rps, nullptr})); \ + }, &Operation::V1::OperationService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("operation", #NAME))->Run(); + + ADD_REQUEST(GetOperation, DoGetOperationRequest, TGrpcRequestOperationCall) + ADD_REQUEST(CancelOperation, DoCancelOperationRequest, TGrpcRequestNoOperationCall) + ADD_REQUEST(ForgetOperation, DoForgetOperationRequest, TGrpcRequestNoOperationCall) + ADD_REQUEST(ListOperations, DoListOperationsRequest, TGrpcRequestNoOperationCall) + #undef ADD_REQUEST } |