diff options
| author | Daniil Cherednik <[email protected]> | 2022-04-01 11:52:06 +0300 |
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-04-01 11:52:06 +0300 |
| commit | e84a22027f7a0a962827c8d6be4968a810b042db (patch) | |
| tree | 0fa6bf8be5465a002ef4d31689865afeccb9c253 | |
| parent | 05091b7d4e382df88a2480b582e800943634518d (diff) | |
Invert grpc_request_proxy dependence for coordination service (control plane) KIKIMR-13646
ref:762156edf55afdc30a10288b74119e5056b9e5e7
| -rw-r--r-- | ydb/core/grpc_services/grpc_request_proxy.cpp | 4 | ||||
| -rw-r--r-- | ydb/core/grpc_services/grpc_request_proxy.h | 4 | ||||
| -rw-r--r-- | ydb/core/grpc_services/rpc_alter_coordination_node.cpp | 13 | ||||
| -rw-r--r-- | ydb/core/grpc_services/rpc_calls.h | 6 | ||||
| -rw-r--r-- | ydb/core/grpc_services/rpc_create_coordination_node.cpp | 13 | ||||
| -rw-r--r-- | ydb/core/grpc_services/rpc_describe_coordination_node.cpp | 15 | ||||
| -rw-r--r-- | ydb/core/grpc_services/rpc_drop_coordination_node.cpp | 13 | ||||
| -rw-r--r-- | ydb/core/grpc_services/service_coordination.h | 17 | ||||
| -rw-r--r-- | ydb/services/kesus/grpc_service.cpp | 26 |
9 files changed, 60 insertions, 51 deletions
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index deda24cdf01..e35cdda7d30 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -568,10 +568,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo HFunc(TRefreshTokenImpl, PreHandle); HFunc(TEvLoginRequest, PreHandle); HFunc(TEvListEndpointsRequest, PreHandle); - HFunc(TEvCreateCoordinationNode, PreHandle); - HFunc(TEvAlterCoordinationNode, PreHandle); - HFunc(TEvDropCoordinationNode, PreHandle); - HFunc(TEvDescribeCoordinationNode, PreHandle); HFunc(TEvReadColumnsRequest, PreHandle); HFunc(TEvGetShardLocationsRequest, PreHandle); HFunc(TEvKikhouseDescribeTableRequest, PreHandle); diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h index adeabb56a2f..d1cfe65d855 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.h +++ b/ydb/core/grpc_services/grpc_request_proxy.h @@ -50,10 +50,6 @@ public: protected: void Handle(TEvListEndpointsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvCreateCoordinationNode::TPtr& ev, const TActorContext& ctx); - void Handle(TEvAlterCoordinationNode::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDropCoordinationNode::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDescribeCoordinationNode::TPtr& ev, const TActorContext& ctx); void Handle(TEvReadColumnsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvGetShardLocationsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvKikhouseDescribeTableRequest::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/grpc_services/rpc_alter_coordination_node.cpp b/ydb/core/grpc_services/rpc_alter_coordination_node.cpp index 52cfe8c9403..58b36eda9d6 100644 --- a/ydb/core/grpc_services/rpc_alter_coordination_node.cpp +++ b/ydb/core/grpc_services/rpc_alter_coordination_node.cpp @@ -1,6 +1,6 @@ -#include "grpc_request_proxy.h" +#include "service_coordination.h" +#include <ydb/core/grpc_services/base/base.h> -#include "rpc_calls.h" #include "rpc_scheme_base.h" #include "rpc_common.h" @@ -10,11 +10,14 @@ namespace NGRpcService { using namespace NActors; using namespace Ydb; +using TEvAlterCoordinationNode = TGrpcRequestOperationCall<Ydb::Coordination::AlterNodeRequest, + Ydb::Coordination::AlterNodeResponse>; + class TAlterCoordinationNodeRPC : public TRpcSchemeRequestActor<TAlterCoordinationNodeRPC, TEvAlterCoordinationNode> { using TBase = TRpcSchemeRequestActor<TAlterCoordinationNodeRPC, TEvAlterCoordinationNode>; public: - TAlterCoordinationNodeRPC(TEvAlterCoordinationNode* msg) + TAlterCoordinationNodeRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext& ctx) { @@ -57,8 +60,8 @@ private: } }; -void TGRpcRequestProxy::Handle(TEvAlterCoordinationNode::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TAlterCoordinationNodeRPC(ev->Release().Release())); +void DoAlterCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TAlterCoordinationNodeRPC(p.release())); } } // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h index 9224eed750f..bad9473fb64 100644 --- a/ydb/core/grpc_services/rpc_calls.h +++ b/ydb/core/grpc_services/rpc_calls.h @@ -45,16 +45,13 @@ void FillYdbStatus(Ydb::Coordination::SessionResponse& resp, const NYql::TIssues using TEvListEndpointsRequest = TGRpcRequestWrapper<TRpcServices::EvListEndpoints, Ydb::Discovery::ListEndpointsRequest, Ydb::Discovery::ListEndpointsResponse, true>; -using TEvCreateCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvCreateCoordinationNode, Ydb::Coordination::CreateNodeRequest, Ydb::Coordination::CreateNodeResponse, true, TRateLimiterMode::Rps>; -using TEvAlterCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvAlterCoordinationNode, Ydb::Coordination::AlterNodeRequest, Ydb::Coordination::AlterNodeResponse, true, TRateLimiterMode::Rps>; -using TEvDropCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvDropCoordinationNode, Ydb::Coordination::DropNodeRequest, Ydb::Coordination::DropNodeResponse, true, TRateLimiterMode::Rps>; -using TEvDescribeCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvDescribeCoordinationNode, Ydb::Coordination::DescribeNodeRequest, Ydb::Coordination::DescribeNodeResponse, true, TRateLimiterMode::Rps>; using TEvReadColumnsRequest = TGRpcRequestWrapper<TRpcServices::EvReadColumns, Ydb::ClickhouseInternal::ScanRequest, Ydb::ClickhouseInternal::ScanResponse, true>; using TEvGetShardLocationsRequest = TGRpcRequestWrapper<TRpcServices::EvGetShardLocations, Ydb::ClickhouseInternal::GetShardLocationsRequest, Ydb::ClickhouseInternal::GetShardLocationsResponse, true>; using TEvKikhouseDescribeTableRequest = TGRpcRequestWrapper<TRpcServices::EvKikhouseDescribeTable, Ydb::ClickhouseInternal::DescribeTableRequest, Ydb::ClickhouseInternal::DescribeTableResponse, true>; using TEvS3ListingRequest = TGRpcRequestWrapper<TRpcServices::EvS3Listing, Ydb::S3Internal::S3ListingRequest, Ydb::S3Internal::S3ListingResponse, true>; using TEvBiStreamPingRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvBiStreamPing, Draft::Dummy::PingRequest, Draft::Dummy::PingResponse>; using TEvExperimentalStreamQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExperimentalStreamQuery, Ydb::Experimental::ExecuteStreamQueryRequest, Ydb::Experimental::ExecuteStreamQueryResponse, false>; + using TEvStreamPQWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQWrite, Ydb::PersQueue::V1::StreamingWriteClientMessage, Ydb::PersQueue::V1::StreamingWriteServerMessage>; using TEvStreamPQReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQRead, Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>; using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>; @@ -64,6 +61,7 @@ using TEvPQAlterTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQA using TEvPQDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDescribeTopic, Ydb::PersQueue::V1::DescribeTopicRequest, Ydb::PersQueue::V1::DescribeTopicResponse, true>; using TEvPQAddReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQAddReadRule, Ydb::PersQueue::V1::AddReadRuleRequest, Ydb::PersQueue::V1::AddReadRuleResponse, true>; using TEvPQRemoveReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQRemoveReadRule, Ydb::PersQueue::V1::RemoveReadRuleRequest, Ydb::PersQueue::V1::RemoveReadRuleResponse, true>; + using TEvExportToYtRequest = TGRpcRequestValidationWrapper<TRpcServices::EvExportToYt, Ydb::Export::ExportToYtRequest, Ydb::Export::ExportToYtResponse, true>; using TEvExportToS3Request = TGRpcRequestValidationWrapper<TRpcServices::EvExportToS3, Ydb::Export::ExportToS3Request, Ydb::Export::ExportToS3Response, true>; using TEvImportFromS3Request = TGRpcRequestValidationWrapper<TRpcServices::EvImportFromS3, Ydb::Import::ImportFromS3Request, Ydb::Import::ImportFromS3Response, true>; diff --git a/ydb/core/grpc_services/rpc_create_coordination_node.cpp b/ydb/core/grpc_services/rpc_create_coordination_node.cpp index 11c838592ca..f96fe2f4ef6 100644 --- a/ydb/core/grpc_services/rpc_create_coordination_node.cpp +++ b/ydb/core/grpc_services/rpc_create_coordination_node.cpp @@ -1,6 +1,6 @@ -#include "grpc_request_proxy.h" +#include "service_coordination.h" +#include <ydb/core/grpc_services/base/base.h> -#include "rpc_calls.h" #include "rpc_scheme_base.h" #include "rpc_common.h" @@ -10,11 +10,14 @@ namespace NGRpcService { using namespace NActors; using namespace Ydb; +using TEvCreateCoordinationNode = TGrpcRequestOperationCall<Ydb::Coordination::CreateNodeRequest, + Ydb::Coordination::CreateNodeResponse>; + class TCreateCoordinationNodeRPC : public TRpcSchemeRequestActor<TCreateCoordinationNodeRPC, TEvCreateCoordinationNode> { using TBase = TRpcSchemeRequestActor<TCreateCoordinationNodeRPC, TEvCreateCoordinationNode>; public: - TCreateCoordinationNodeRPC(TEvCreateCoordinationNode* msg) + TCreateCoordinationNodeRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext& ctx) { @@ -55,8 +58,8 @@ private: } }; -void TGRpcRequestProxy::Handle(TEvCreateCoordinationNode::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TCreateCoordinationNodeRPC(ev->Release().Release())); +void DoCreateCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TCreateCoordinationNodeRPC(p.release())); } } // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_describe_coordination_node.cpp b/ydb/core/grpc_services/rpc_describe_coordination_node.cpp index 04c16bf32e8..291cd267cb5 100644 --- a/ydb/core/grpc_services/rpc_describe_coordination_node.cpp +++ b/ydb/core/grpc_services/rpc_describe_coordination_node.cpp @@ -1,6 +1,6 @@ -#include "grpc_request_proxy.h" +#include "service_coordination.h" +#include <ydb/core/grpc_services/base/base.h> -#include "rpc_calls.h" #include "rpc_scheme_base.h" #include "rpc_common.h" @@ -14,11 +14,14 @@ namespace NGRpcService { using namespace NActors; using namespace Ydb; +using TEvDescribeCoordinationNode = TGrpcRequestOperationCall<Ydb::Coordination::DescribeNodeRequest, + Ydb::Coordination::DescribeNodeResponse>; + class TDescribeCoordinationNode : public TRpcSchemeRequestActor<TDescribeCoordinationNode, TEvDescribeCoordinationNode> { using TBase = TRpcSchemeRequestActor<TDescribeCoordinationNode, TEvDescribeCoordinationNode>; public: - TDescribeCoordinationNode(TEvDescribeCoordinationNode* msg) + TDescribeCoordinationNode(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext &ctx) { @@ -91,12 +94,10 @@ private: ctx.Send(MakeTxProxyID(), navigateRequest.release()); } - - }; -void TGRpcRequestProxy::Handle(TEvDescribeCoordinationNode::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TDescribeCoordinationNode(ev->Release().Release())); +void DoDescribeCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TDescribeCoordinationNode(p.release())); } } diff --git a/ydb/core/grpc_services/rpc_drop_coordination_node.cpp b/ydb/core/grpc_services/rpc_drop_coordination_node.cpp index 3632ec58723..7298df3223c 100644 --- a/ydb/core/grpc_services/rpc_drop_coordination_node.cpp +++ b/ydb/core/grpc_services/rpc_drop_coordination_node.cpp @@ -1,6 +1,6 @@ -#include "grpc_request_proxy.h" +#include "service_coordination.h" +#include <ydb/core/grpc_services/base/base.h> -#include "rpc_calls.h" #include "rpc_scheme_base.h" #include "rpc_common.h" @@ -10,11 +10,14 @@ namespace NGRpcService { using namespace NActors; using namespace Ydb; +using TEvDropCoordinationNode = TGrpcRequestOperationCall<Ydb::Coordination::DropNodeRequest, + Ydb::Coordination::DropNodeResponse>; + class TDropCoordinationNodeRPC : public TRpcSchemeRequestActor<TDropCoordinationNodeRPC, TEvDropCoordinationNode> { using TBase = TRpcSchemeRequestActor<TDropCoordinationNodeRPC, TEvDropCoordinationNode>; public: - TDropCoordinationNodeRPC(TEvDropCoordinationNode* msg) + TDropCoordinationNodeRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext &ctx) { @@ -54,8 +57,8 @@ private: } }; -void TGRpcRequestProxy::Handle(TEvDropCoordinationNode::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TDropCoordinationNodeRPC(ev->Release().Release())); +void DoDropCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TDropCoordinationNodeRPC(p.release())); } } // namespace NKikimr diff --git a/ydb/core/grpc_services/service_coordination.h b/ydb/core/grpc_services/service_coordination.h new file mode 100644 index 00000000000..94842d3b695 --- /dev/null +++ b/ydb/core/grpc_services/service_coordination.h @@ -0,0 +1,17 @@ +#pragma once +#include <memory> + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +void DoCreateCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoAlterCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDropCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDescribeCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + +} +} + diff --git a/ydb/services/kesus/grpc_service.cpp b/ydb/services/kesus/grpc_service.cpp index c5826b66274..e297e00fe01 100644 --- a/ydb/services/kesus/grpc_service.cpp +++ b/ydb/services/kesus/grpc_service.cpp @@ -7,6 +7,7 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/grpc_services/grpc_helper.h> #include <ydb/core/grpc_services/grpc_request_proxy.h> +#include <ydb/core/grpc_services/service_coordination.h> #include <ydb/core/grpc_services/rpc_calls.h> #include <ydb/core/grpc_streaming/grpc_streaming.h> #include <ydb/core/base/ticket_parser.h> @@ -644,35 +645,26 @@ void TKesusGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { #error ADD_REQUEST macro is already defined #endif -#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ +#define ADD_REQUEST(NAME, IN, OUT, CB) \ MakeIntrusive<NGRpcService::TGRpcRequest<Ydb::Coordination::IN, Ydb::Coordination::OUT, TKesusGRpcService>>( \ this, \ &Service_, \ CQ, \ [this](NGrpc::IRequestContextBase* reqCtx) { \ NGRpcService::ReportGrpcReqToMon(*ActorSystem, reqCtx->GetPeer()); \ - ACTION; \ + ActorSystem->Send(GRpcRequestProxyId, \ + new NGRpcService::TGrpcRequestOperationCall<Ydb::Coordination::IN, Ydb::Coordination::OUT> \ + (reqCtx, &CB, NGRpcService::TRequestAuxSettings{NGRpcService::TRateLimiterMode::Rps, nullptr})); \ }, \ &Ydb::Coordination::V1::CoordinationService::AsyncService::Request ## NAME, \ "Coordination/" #NAME, \ logger, \ getCounterBlock("coordination", #NAME))->Run(); - ADD_REQUEST(CreateNode, CreateNodeRequest, CreateNodeResponse, { - ActorSystem->Send(GRpcRequestProxyId, new NGRpcService::TEvCreateCoordinationNode(reqCtx)); - }); - - ADD_REQUEST(AlterNode, AlterNodeRequest, AlterNodeResponse, { - ActorSystem->Send(GRpcRequestProxyId, new NGRpcService::TEvAlterCoordinationNode(reqCtx)); - }); - - ADD_REQUEST(DropNode, DropNodeRequest, DropNodeResponse, { - ActorSystem->Send(GRpcRequestProxyId, new NGRpcService::TEvDropCoordinationNode(reqCtx)); - }); - - ADD_REQUEST(DescribeNode, DescribeNodeRequest, DescribeNodeResponse, { - ActorSystem->Send(GRpcRequestProxyId, new NGRpcService::TEvDescribeCoordinationNode(reqCtx)); - }); + ADD_REQUEST(CreateNode, CreateNodeRequest, CreateNodeResponse, NGRpcService::DoCreateCoordinationNode); + ADD_REQUEST(AlterNode, AlterNodeRequest, AlterNodeResponse, NGRpcService::DoAlterCoordinationNode); + ADD_REQUEST(DropNode, DropNodeRequest, DropNodeResponse, NGRpcService::DoDropCoordinationNode); + ADD_REQUEST(DescribeNode, DescribeNodeRequest, DescribeNodeResponse, NGRpcService::DoDescribeCoordinationNode); #undef ADD_REQUEST |
