diff options
author | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-04-13 21:13:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-04-13 21:13:30 +0300 |
commit | 642a79321672b18325904452fa97fa708767a0c4 (patch) | |
tree | eb0bbd479fcc0cf842af2136dfbb77c8b4546d49 | |
parent | 73c0ec18907383621ed0d2df671285a8c1a99d27 (diff) | |
download | ydb-642a79321672b18325904452fa97fa708767a0c4.tar.gz |
Invert grpc_request_proxy dependence for ratelimiter service. KIKIMR-13646
ref:3abfc579472bd8a24448293f79f620e2193df97b
-rw-r--r-- | ydb/core/grpc_services/grpc_request_check_actor.h | 1 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_request_proxy.cpp | 6 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_request_proxy.h | 6 | ||||
-rw-r--r-- | ydb/core/grpc_services/local_rate_limiter.cpp | 1 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_calls.h | 8 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_rate_limiter_api.cpp | 27 | ||||
-rw-r--r-- | ydb/core/grpc_services/service_ratelimiter.h | 19 | ||||
-rw-r--r-- | ydb/core/grpc_services/service_ratelimiter_events.h | 23 | ||||
-rw-r--r-- | ydb/services/rate_limiter/grpc_service.cpp | 27 |
9 files changed, 75 insertions, 43 deletions
diff --git a/ydb/core/grpc_services/grpc_request_check_actor.h b/ydb/core/grpc_services/grpc_request_check_actor.h index e04d412fde..52342d0108 100644 --- a/ydb/core/grpc_services/grpc_request_check_actor.h +++ b/ydb/core/grpc_services/grpc_request_check_actor.h @@ -1,5 +1,6 @@ #pragma once #include "defs.h" +#include "service_ratelimiter_events.h" #include "local_rate_limiter.h" #include "operation_helpers.h" #include "rpc_calls.h" diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index 7cd5e3c9ec..0ae6479fee 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -581,12 +581,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo HFunc(TEvPQRemoveReadRuleRequest, PreHandle); HFunc(TEvPQDescribeTopicRequest, PreHandle); HFunc(TEvDiscoverPQClustersRequest, PreHandle); - HFunc(TEvCreateRateLimiterResource, PreHandle); - HFunc(TEvAlterRateLimiterResource, PreHandle); - HFunc(TEvDropRateLimiterResource, PreHandle); - HFunc(TEvListRateLimiterResources, PreHandle); - HFunc(TEvDescribeRateLimiterResource, PreHandle); - HFunc(TEvAcquireRateLimiterResource, PreHandle); HFunc(TEvCoordinationSessionRequest, PreHandle); HFunc(TEvProxyRuntimeEvent, PreHandle); diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h index 500ea9f779..3074e08799 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.h +++ b/ydb/core/grpc_services/grpc_request_proxy.h @@ -63,12 +63,6 @@ protected: void Handle(TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscoverPQClustersRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvCreateRateLimiterResource::TPtr& ev, const TActorContext& ctx); - void Handle(TEvAlterRateLimiterResource::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDropRateLimiterResource::TPtr& ev, const TActorContext& ctx); - void Handle(TEvListRateLimiterResources::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDescribeRateLimiterResource::TPtr& ev, const TActorContext& ctx); - void Handle(TEvAcquireRateLimiterResource::TPtr& ev, const TActorContext& ctx); void Handle(TEvLoginRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvCoordinationSessionRequest::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/grpc_services/local_rate_limiter.cpp b/ydb/core/grpc_services/local_rate_limiter.cpp index 3a2fcb78cb..5f4a9c7d1e 100644 --- a/ydb/core/grpc_services/local_rate_limiter.cpp +++ b/ydb/core/grpc_services/local_rate_limiter.cpp @@ -1,4 +1,5 @@ #include "local_rate_limiter.h" +#include "service_ratelimiter_events.h" #include "rpc_common.h" #include <ydb/core/grpc_services/local_rpc/local_rpc.h> diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h index 949e353343..536d3a0e40 100644 --- a/ydb/core/grpc_services/rpc_calls.h +++ b/ydb/core/grpc_services/rpc_calls.h @@ -16,7 +16,6 @@ #include <ydb/public/api/protos/ydb_s3_internal.pb.h> #include <ydb/public/api/protos/ydb_persqueue_cluster_discovery.pb.h> #include <ydb/public/api/protos/ydb_persqueue_v1.pb.h> -#include <ydb/public/api/protos/ydb_rate_limiter.pb.h> #include <ydb/public/api/protos/yq.pb.h> @@ -58,15 +57,10 @@ using TEvPQAddReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQ using TEvPQRemoveReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQRemoveReadRule, Ydb::PersQueue::V1::RemoveReadRuleRequest, Ydb::PersQueue::V1::RemoveReadRuleResponse, true>; using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>; -using TEvCreateRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvCreateRateLimiterResource, Ydb::RateLimiter::CreateResourceRequest, Ydb::RateLimiter::CreateResourceResponse, true, TRateLimiterMode::Rps>; -using TEvAlterRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvAlterRateLimiterResource, Ydb::RateLimiter::AlterResourceRequest, Ydb::RateLimiter::AlterResourceResponse, true, TRateLimiterMode::Rps>; -using TEvDropRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvDropRateLimiterResource, Ydb::RateLimiter::DropResourceRequest, Ydb::RateLimiter::DropResourceResponse, true, TRateLimiterMode::Rps>; -using TEvListRateLimiterResources = TGRpcRequestWrapper<TRpcServices::EvListRateLimiterResources, Ydb::RateLimiter::ListResourcesRequest, Ydb::RateLimiter::ListResourcesResponse, true, TRateLimiterMode::Rps>; -using TEvDescribeRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvDescribeRateLimiterResource, Ydb::RateLimiter::DescribeResourceRequest, Ydb::RateLimiter::DescribeResourceResponse, true, TRateLimiterMode::Rps>; -using TEvAcquireRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvAcquireRateLimiterResource, Ydb::RateLimiter::AcquireResourceRequest, Ydb::RateLimiter::AcquireResourceResponse, true>; using TEvLoginRequest = TGRpcRequestWrapperNoAuth<TRpcServices::EvLogin, Ydb::Auth::LoginRequest, Ydb::Auth::LoginResponse>; using TEvCoordinationSessionRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvCoordinationSession, Ydb::Coordination::SessionRequest, Ydb::Coordination::SessionResponse>; + } // namespace NGRpcService } // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp index 3e27a32d95..a59859b33b 100644 --- a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp +++ b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp @@ -1,4 +1,5 @@ -#include "grpc_request_proxy.h" +#include "service_ratelimiter.h" +#include "service_ratelimiter_events.h" #include "rpc_calls.h" #include "rpc_scheme_base.h" @@ -494,28 +495,28 @@ public: } // namespace -void TGRpcRequestProxy::Handle(TEvCreateRateLimiterResource::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TCreateRateLimiterResourceRPC(ev->Release().Release())); +void DoCreateRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TCreateRateLimiterResourceRPC(p.release())); } -void TGRpcRequestProxy::Handle(TEvAlterRateLimiterResource::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TAlterRateLimiterResourceRPC(ev->Release().Release())); +void DoAlterRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TAlterRateLimiterResourceRPC(p.release())); } -void TGRpcRequestProxy::Handle(TEvDropRateLimiterResource::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TDropRateLimiterResourceRPC(ev->Release().Release())); +void DoDropRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TDropRateLimiterResourceRPC(p.release())); } -void TGRpcRequestProxy::Handle(TEvListRateLimiterResources::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TListRateLimiterResourcesRPC(ev->Release().Release())); +void DoListRateLimiterResources(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TListRateLimiterResourcesRPC(p.release())); } -void TGRpcRequestProxy::Handle(TEvDescribeRateLimiterResource::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TDescribeRateLimiterResourceRPC(ev->Release().Release())); +void DoDescribeRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TDescribeRateLimiterResourceRPC(p.release())); } -void TGRpcRequestProxy::Handle(TEvAcquireRateLimiterResource::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TAcquireRateLimiterResourceRPC(ev->Release().Release())); +void DoAcquireRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TAcquireRateLimiterResourceRPC(p.release())); } template<> diff --git a/ydb/core/grpc_services/service_ratelimiter.h b/ydb/core/grpc_services/service_ratelimiter.h new file mode 100644 index 0000000000..003c706edc --- /dev/null +++ b/ydb/core/grpc_services/service_ratelimiter.h @@ -0,0 +1,19 @@ +#pragma once + +#include <memory> + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +void DoCreateRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoAlterRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDropRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoListRateLimiterResources(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDescribeRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoAcquireRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + +} +} diff --git a/ydb/core/grpc_services/service_ratelimiter_events.h b/ydb/core/grpc_services/service_ratelimiter_events.h new file mode 100644 index 0000000000..edd96653eb --- /dev/null +++ b/ydb/core/grpc_services/service_ratelimiter_events.h @@ -0,0 +1,23 @@ +#pragma once + +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/public/api/protos/ydb_rate_limiter.pb.h> + +namespace NKikimr { +namespace NGRpcService { + +using TEvCreateRateLimiterResource = TGrpcRequestOperationCall<Ydb::RateLimiter::CreateResourceRequest, + Ydb::RateLimiter::CreateResourceResponse>; +using TEvAlterRateLimiterResource = TGrpcRequestOperationCall<Ydb::RateLimiter::AlterResourceRequest, + Ydb::RateLimiter::AlterResourceResponse>; +using TEvDropRateLimiterResource = TGrpcRequestOperationCall<Ydb::RateLimiter::DropResourceRequest, + Ydb::RateLimiter::DropResourceResponse>; +using TEvListRateLimiterResources = TGrpcRequestOperationCall<Ydb::RateLimiter::ListResourcesRequest, + Ydb::RateLimiter::ListResourcesResponse>; +using TEvDescribeRateLimiterResource = TGrpcRequestOperationCall<Ydb::RateLimiter::DescribeResourceRequest, + Ydb::RateLimiter::DescribeResourceResponse>; +using TEvAcquireRateLimiterResource = TGrpcRequestOperationCall<Ydb::RateLimiter::AcquireResourceRequest, + Ydb::RateLimiter::AcquireResourceResponse>; + +} +} diff --git a/ydb/services/rate_limiter/grpc_service.cpp b/ydb/services/rate_limiter/grpc_service.cpp index 480045d29a..2040c78463 100644 --- a/ydb/services/rate_limiter/grpc_service.cpp +++ b/ydb/services/rate_limiter/grpc_service.cpp @@ -1,8 +1,8 @@ #include "grpc_service.h" #include <ydb/core/grpc_services/grpc_helper.h> -#include <ydb/core/grpc_services/grpc_request_proxy.h> -#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/grpc_services/service_ratelimiter.h> namespace NKikimr::NQuoter { @@ -34,12 +34,13 @@ void TRateLimiterGRpcService::DecRequest() { void TRateLimiterGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = NGRpcService::CreateCounterCb(Counters, ActorSystem); + using namespace NGRpcService; #ifdef SETUP_METHOD #error SETUP_METHOD macro collision #endif -#define SETUP_METHOD(methodName, event) \ +#define SETUP_METHOD(methodName, cb, rps) \ MakeIntrusive<NGRpcService::TGRpcRequest< \ Ydb::RateLimiter::Y_CAT(methodName, Request), \ Ydb::RateLimiter::Y_CAT(methodName, Response), \ @@ -48,9 +49,13 @@ void TRateLimiterGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { this, \ &Service_, \ CQ, \ - [this](NGrpc::IRequestContextBase* reqCtx) { \ + [this](NGrpc::IRequestContextBase* reqCtx) { \ NGRpcService::ReportGrpcReqToMon(*ActorSystem, reqCtx->GetPeer()); \ - ActorSystem->Send(GRpcRequestProxyId, new NGRpcService::event(reqCtx)); \ + ActorSystem->Send(GRpcRequestProxyId, \ + new NGRpcService::TGrpcRequestOperationCall< \ + Ydb::RateLimiter::Y_CAT(methodName, Request), \ + Ydb::RateLimiter::Y_CAT(methodName, Response)> \ + (reqCtx, &cb, TRequestAuxSettings{TRateLimiterMode::rps, nullptr})); \ }, \ &Ydb::RateLimiter::V1::RateLimiterService::AsyncService::Y_CAT(Request, methodName), \ "RateLimiter/" Y_STRINGIZE(methodName), \ @@ -58,12 +63,12 @@ void TRateLimiterGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { getCounterBlock("rate_limiter", Y_STRINGIZE(methodName)) \ )->Run() - SETUP_METHOD(CreateResource, TEvCreateRateLimiterResource); - SETUP_METHOD(AlterResource, TEvAlterRateLimiterResource); - SETUP_METHOD(DropResource, TEvDropRateLimiterResource); - SETUP_METHOD(ListResources, TEvListRateLimiterResources); - SETUP_METHOD(DescribeResource, TEvDescribeRateLimiterResource); - SETUP_METHOD(AcquireResource, TEvAcquireRateLimiterResource); + SETUP_METHOD(CreateResource, DoCreateRateLimiterResource, Rps); + SETUP_METHOD(AlterResource, DoAlterRateLimiterResource, Rps); + SETUP_METHOD(DropResource, DoDropRateLimiterResource, Rps); + SETUP_METHOD(ListResources, DoListRateLimiterResources, Rps); + SETUP_METHOD(DescribeResource, DoDescribeRateLimiterResource, Rps); + SETUP_METHOD(AcquireResource, DoAcquireRateLimiterResource, Off); #undef SETUP_METHOD } |