aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@yandex-team.ru>2022-04-13 21:13:30 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-04-13 21:13:30 +0300
commit642a79321672b18325904452fa97fa708767a0c4 (patch)
treeeb0bbd479fcc0cf842af2136dfbb77c8b4546d49
parent73c0ec18907383621ed0d2df671285a8c1a99d27 (diff)
downloadydb-642a79321672b18325904452fa97fa708767a0c4.tar.gz
Invert grpc_request_proxy dependence for ratelimiter service. KIKIMR-13646
ref:3abfc579472bd8a24448293f79f620e2193df97b
-rw-r--r--ydb/core/grpc_services/grpc_request_check_actor.h1
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.cpp6
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.h6
-rw-r--r--ydb/core/grpc_services/local_rate_limiter.cpp1
-rw-r--r--ydb/core/grpc_services/rpc_calls.h8
-rw-r--r--ydb/core/grpc_services/rpc_rate_limiter_api.cpp27
-rw-r--r--ydb/core/grpc_services/service_ratelimiter.h19
-rw-r--r--ydb/core/grpc_services/service_ratelimiter_events.h23
-rw-r--r--ydb/services/rate_limiter/grpc_service.cpp27
9 files changed, 75 insertions, 43 deletions
diff --git a/ydb/core/grpc_services/grpc_request_check_actor.h b/ydb/core/grpc_services/grpc_request_check_actor.h
index e04d412fde..52342d0108 100644
--- a/ydb/core/grpc_services/grpc_request_check_actor.h
+++ b/ydb/core/grpc_services/grpc_request_check_actor.h
@@ -1,5 +1,6 @@
#pragma once
#include "defs.h"
+#include "service_ratelimiter_events.h"
#include "local_rate_limiter.h"
#include "operation_helpers.h"
#include "rpc_calls.h"
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index 7cd5e3c9ec..0ae6479fee 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -581,12 +581,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo
HFunc(TEvPQRemoveReadRuleRequest, PreHandle);
HFunc(TEvPQDescribeTopicRequest, PreHandle);
HFunc(TEvDiscoverPQClustersRequest, PreHandle);
- HFunc(TEvCreateRateLimiterResource, PreHandle);
- HFunc(TEvAlterRateLimiterResource, PreHandle);
- HFunc(TEvDropRateLimiterResource, PreHandle);
- HFunc(TEvListRateLimiterResources, PreHandle);
- HFunc(TEvDescribeRateLimiterResource, PreHandle);
- HFunc(TEvAcquireRateLimiterResource, PreHandle);
HFunc(TEvCoordinationSessionRequest, PreHandle);
HFunc(TEvProxyRuntimeEvent, PreHandle);
diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h
index 500ea9f779..3074e08799 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.h
+++ b/ydb/core/grpc_services/grpc_request_proxy.h
@@ -63,12 +63,6 @@ protected:
void Handle(TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscoverPQClustersRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvCreateRateLimiterResource::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvAlterRateLimiterResource::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDropRateLimiterResource::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvListRateLimiterResources::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDescribeRateLimiterResource::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvAcquireRateLimiterResource::TPtr& ev, const TActorContext& ctx);
void Handle(TEvLoginRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvCoordinationSessionRequest::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/grpc_services/local_rate_limiter.cpp b/ydb/core/grpc_services/local_rate_limiter.cpp
index 3a2fcb78cb..5f4a9c7d1e 100644
--- a/ydb/core/grpc_services/local_rate_limiter.cpp
+++ b/ydb/core/grpc_services/local_rate_limiter.cpp
@@ -1,4 +1,5 @@
#include "local_rate_limiter.h"
+#include "service_ratelimiter_events.h"
#include "rpc_common.h"
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h
index 949e353343..536d3a0e40 100644
--- a/ydb/core/grpc_services/rpc_calls.h
+++ b/ydb/core/grpc_services/rpc_calls.h
@@ -16,7 +16,6 @@
#include <ydb/public/api/protos/ydb_s3_internal.pb.h>
#include <ydb/public/api/protos/ydb_persqueue_cluster_discovery.pb.h>
#include <ydb/public/api/protos/ydb_persqueue_v1.pb.h>
-#include <ydb/public/api/protos/ydb_rate_limiter.pb.h>
#include <ydb/public/api/protos/yq.pb.h>
@@ -58,15 +57,10 @@ using TEvPQAddReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQ
using TEvPQRemoveReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQRemoveReadRule, Ydb::PersQueue::V1::RemoveReadRuleRequest, Ydb::PersQueue::V1::RemoveReadRuleResponse, true>;
using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>;
-using TEvCreateRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvCreateRateLimiterResource, Ydb::RateLimiter::CreateResourceRequest, Ydb::RateLimiter::CreateResourceResponse, true, TRateLimiterMode::Rps>;
-using TEvAlterRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvAlterRateLimiterResource, Ydb::RateLimiter::AlterResourceRequest, Ydb::RateLimiter::AlterResourceResponse, true, TRateLimiterMode::Rps>;
-using TEvDropRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvDropRateLimiterResource, Ydb::RateLimiter::DropResourceRequest, Ydb::RateLimiter::DropResourceResponse, true, TRateLimiterMode::Rps>;
-using TEvListRateLimiterResources = TGRpcRequestWrapper<TRpcServices::EvListRateLimiterResources, Ydb::RateLimiter::ListResourcesRequest, Ydb::RateLimiter::ListResourcesResponse, true, TRateLimiterMode::Rps>;
-using TEvDescribeRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvDescribeRateLimiterResource, Ydb::RateLimiter::DescribeResourceRequest, Ydb::RateLimiter::DescribeResourceResponse, true, TRateLimiterMode::Rps>;
-using TEvAcquireRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvAcquireRateLimiterResource, Ydb::RateLimiter::AcquireResourceRequest, Ydb::RateLimiter::AcquireResourceResponse, true>;
using TEvLoginRequest = TGRpcRequestWrapperNoAuth<TRpcServices::EvLogin, Ydb::Auth::LoginRequest, Ydb::Auth::LoginResponse>;
using TEvCoordinationSessionRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvCoordinationSession, Ydb::Coordination::SessionRequest, Ydb::Coordination::SessionResponse>;
+
} // namespace NGRpcService
} // namespace NKikimr
diff --git a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp
index 3e27a32d95..a59859b33b 100644
--- a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp
+++ b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp
@@ -1,4 +1,5 @@
-#include "grpc_request_proxy.h"
+#include "service_ratelimiter.h"
+#include "service_ratelimiter_events.h"
#include "rpc_calls.h"
#include "rpc_scheme_base.h"
@@ -494,28 +495,28 @@ public:
} // namespace
-void TGRpcRequestProxy::Handle(TEvCreateRateLimiterResource::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TCreateRateLimiterResourceRPC(ev->Release().Release()));
+void DoCreateRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TCreateRateLimiterResourceRPC(p.release()));
}
-void TGRpcRequestProxy::Handle(TEvAlterRateLimiterResource::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TAlterRateLimiterResourceRPC(ev->Release().Release()));
+void DoAlterRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TAlterRateLimiterResourceRPC(p.release()));
}
-void TGRpcRequestProxy::Handle(TEvDropRateLimiterResource::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TDropRateLimiterResourceRPC(ev->Release().Release()));
+void DoDropRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TDropRateLimiterResourceRPC(p.release()));
}
-void TGRpcRequestProxy::Handle(TEvListRateLimiterResources::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TListRateLimiterResourcesRPC(ev->Release().Release()));
+void DoListRateLimiterResources(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TListRateLimiterResourcesRPC(p.release()));
}
-void TGRpcRequestProxy::Handle(TEvDescribeRateLimiterResource::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TDescribeRateLimiterResourceRPC(ev->Release().Release()));
+void DoDescribeRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TDescribeRateLimiterResourceRPC(p.release()));
}
-void TGRpcRequestProxy::Handle(TEvAcquireRateLimiterResource::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TAcquireRateLimiterResourceRPC(ev->Release().Release()));
+void DoAcquireRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TAcquireRateLimiterResourceRPC(p.release()));
}
template<>
diff --git a/ydb/core/grpc_services/service_ratelimiter.h b/ydb/core/grpc_services/service_ratelimiter.h
new file mode 100644
index 0000000000..003c706edc
--- /dev/null
+++ b/ydb/core/grpc_services/service_ratelimiter.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <memory>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class IRequestOpCtx;
+class IFacilityProvider;
+
+void DoCreateRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoAlterRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDropRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoListRateLimiterResources(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDescribeRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoAcquireRateLimiterResource(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+
+}
+}
diff --git a/ydb/core/grpc_services/service_ratelimiter_events.h b/ydb/core/grpc_services/service_ratelimiter_events.h
new file mode 100644
index 0000000000..edd96653eb
--- /dev/null
+++ b/ydb/core/grpc_services/service_ratelimiter_events.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/public/api/protos/ydb_rate_limiter.pb.h>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+using TEvCreateRateLimiterResource = TGrpcRequestOperationCall<Ydb::RateLimiter::CreateResourceRequest,
+ Ydb::RateLimiter::CreateResourceResponse>;
+using TEvAlterRateLimiterResource = TGrpcRequestOperationCall<Ydb::RateLimiter::AlterResourceRequest,
+ Ydb::RateLimiter::AlterResourceResponse>;
+using TEvDropRateLimiterResource = TGrpcRequestOperationCall<Ydb::RateLimiter::DropResourceRequest,
+ Ydb::RateLimiter::DropResourceResponse>;
+using TEvListRateLimiterResources = TGrpcRequestOperationCall<Ydb::RateLimiter::ListResourcesRequest,
+ Ydb::RateLimiter::ListResourcesResponse>;
+using TEvDescribeRateLimiterResource = TGrpcRequestOperationCall<Ydb::RateLimiter::DescribeResourceRequest,
+ Ydb::RateLimiter::DescribeResourceResponse>;
+using TEvAcquireRateLimiterResource = TGrpcRequestOperationCall<Ydb::RateLimiter::AcquireResourceRequest,
+ Ydb::RateLimiter::AcquireResourceResponse>;
+
+}
+}
diff --git a/ydb/services/rate_limiter/grpc_service.cpp b/ydb/services/rate_limiter/grpc_service.cpp
index 480045d29a..2040c78463 100644
--- a/ydb/services/rate_limiter/grpc_service.cpp
+++ b/ydb/services/rate_limiter/grpc_service.cpp
@@ -1,8 +1,8 @@
#include "grpc_service.h"
#include <ydb/core/grpc_services/grpc_helper.h>
-#include <ydb/core/grpc_services/grpc_request_proxy.h>
-#include <ydb/core/grpc_services/rpc_calls.h>
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/service_ratelimiter.h>
namespace NKikimr::NQuoter {
@@ -34,12 +34,13 @@ void TRateLimiterGRpcService::DecRequest() {
void TRateLimiterGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = NGRpcService::CreateCounterCb(Counters, ActorSystem);
+ using namespace NGRpcService;
#ifdef SETUP_METHOD
#error SETUP_METHOD macro collision
#endif
-#define SETUP_METHOD(methodName, event) \
+#define SETUP_METHOD(methodName, cb, rps) \
MakeIntrusive<NGRpcService::TGRpcRequest< \
Ydb::RateLimiter::Y_CAT(methodName, Request), \
Ydb::RateLimiter::Y_CAT(methodName, Response), \
@@ -48,9 +49,13 @@ void TRateLimiterGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
this, \
&Service_, \
CQ, \
- [this](NGrpc::IRequestContextBase* reqCtx) { \
+ [this](NGrpc::IRequestContextBase* reqCtx) { \
NGRpcService::ReportGrpcReqToMon(*ActorSystem, reqCtx->GetPeer()); \
- ActorSystem->Send(GRpcRequestProxyId, new NGRpcService::event(reqCtx)); \
+ ActorSystem->Send(GRpcRequestProxyId, \
+ new NGRpcService::TGrpcRequestOperationCall< \
+ Ydb::RateLimiter::Y_CAT(methodName, Request), \
+ Ydb::RateLimiter::Y_CAT(methodName, Response)> \
+ (reqCtx, &cb, TRequestAuxSettings{TRateLimiterMode::rps, nullptr})); \
}, \
&Ydb::RateLimiter::V1::RateLimiterService::AsyncService::Y_CAT(Request, methodName), \
"RateLimiter/" Y_STRINGIZE(methodName), \
@@ -58,12 +63,12 @@ void TRateLimiterGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
getCounterBlock("rate_limiter", Y_STRINGIZE(methodName)) \
)->Run()
- SETUP_METHOD(CreateResource, TEvCreateRateLimiterResource);
- SETUP_METHOD(AlterResource, TEvAlterRateLimiterResource);
- SETUP_METHOD(DropResource, TEvDropRateLimiterResource);
- SETUP_METHOD(ListResources, TEvListRateLimiterResources);
- SETUP_METHOD(DescribeResource, TEvDescribeRateLimiterResource);
- SETUP_METHOD(AcquireResource, TEvAcquireRateLimiterResource);
+ SETUP_METHOD(CreateResource, DoCreateRateLimiterResource, Rps);
+ SETUP_METHOD(AlterResource, DoAlterRateLimiterResource, Rps);
+ SETUP_METHOD(DropResource, DoDropRateLimiterResource, Rps);
+ SETUP_METHOD(ListResources, DoListRateLimiterResources, Rps);
+ SETUP_METHOD(DescribeResource, DoDescribeRateLimiterResource, Rps);
+ SETUP_METHOD(AcquireResource, DoAcquireRateLimiterResource, Off);
#undef SETUP_METHOD
}