summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <[email protected]>2022-04-01 11:52:06 +0300
committerDaniil Cherednik <[email protected]>2022-04-01 11:52:06 +0300
commite84a22027f7a0a962827c8d6be4968a810b042db (patch)
tree0fa6bf8be5465a002ef4d31689865afeccb9c253
parent05091b7d4e382df88a2480b582e800943634518d (diff)
Invert grpc_request_proxy dependence for coordination service (control plane) KIKIMR-13646
ref:762156edf55afdc30a10288b74119e5056b9e5e7
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.cpp4
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.h4
-rw-r--r--ydb/core/grpc_services/rpc_alter_coordination_node.cpp13
-rw-r--r--ydb/core/grpc_services/rpc_calls.h6
-rw-r--r--ydb/core/grpc_services/rpc_create_coordination_node.cpp13
-rw-r--r--ydb/core/grpc_services/rpc_describe_coordination_node.cpp15
-rw-r--r--ydb/core/grpc_services/rpc_drop_coordination_node.cpp13
-rw-r--r--ydb/core/grpc_services/service_coordination.h17
-rw-r--r--ydb/services/kesus/grpc_service.cpp26
9 files changed, 60 insertions, 51 deletions
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index deda24cdf01..e35cdda7d30 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -568,10 +568,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo
HFunc(TRefreshTokenImpl, PreHandle);
HFunc(TEvLoginRequest, PreHandle);
HFunc(TEvListEndpointsRequest, PreHandle);
- HFunc(TEvCreateCoordinationNode, PreHandle);
- HFunc(TEvAlterCoordinationNode, PreHandle);
- HFunc(TEvDropCoordinationNode, PreHandle);
- HFunc(TEvDescribeCoordinationNode, PreHandle);
HFunc(TEvReadColumnsRequest, PreHandle);
HFunc(TEvGetShardLocationsRequest, PreHandle);
HFunc(TEvKikhouseDescribeTableRequest, PreHandle);
diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h
index adeabb56a2f..d1cfe65d855 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.h
+++ b/ydb/core/grpc_services/grpc_request_proxy.h
@@ -50,10 +50,6 @@ public:
protected:
void Handle(TEvListEndpointsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvCreateCoordinationNode::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvAlterCoordinationNode::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDropCoordinationNode::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDescribeCoordinationNode::TPtr& ev, const TActorContext& ctx);
void Handle(TEvReadColumnsRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvGetShardLocationsRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvKikhouseDescribeTableRequest::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/grpc_services/rpc_alter_coordination_node.cpp b/ydb/core/grpc_services/rpc_alter_coordination_node.cpp
index 52cfe8c9403..58b36eda9d6 100644
--- a/ydb/core/grpc_services/rpc_alter_coordination_node.cpp
+++ b/ydb/core/grpc_services/rpc_alter_coordination_node.cpp
@@ -1,6 +1,6 @@
-#include "grpc_request_proxy.h"
+#include "service_coordination.h"
+#include <ydb/core/grpc_services/base/base.h>
-#include "rpc_calls.h"
#include "rpc_scheme_base.h"
#include "rpc_common.h"
@@ -10,11 +10,14 @@ namespace NGRpcService {
using namespace NActors;
using namespace Ydb;
+using TEvAlterCoordinationNode = TGrpcRequestOperationCall<Ydb::Coordination::AlterNodeRequest,
+ Ydb::Coordination::AlterNodeResponse>;
+
class TAlterCoordinationNodeRPC : public TRpcSchemeRequestActor<TAlterCoordinationNodeRPC, TEvAlterCoordinationNode> {
using TBase = TRpcSchemeRequestActor<TAlterCoordinationNodeRPC, TEvAlterCoordinationNode>;
public:
- TAlterCoordinationNodeRPC(TEvAlterCoordinationNode* msg)
+ TAlterCoordinationNodeRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext& ctx) {
@@ -57,8 +60,8 @@ private:
}
};
-void TGRpcRequestProxy::Handle(TEvAlterCoordinationNode::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TAlterCoordinationNodeRPC(ev->Release().Release()));
+void DoAlterCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TAlterCoordinationNodeRPC(p.release()));
}
} // namespace NKikimr
diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h
index 9224eed750f..bad9473fb64 100644
--- a/ydb/core/grpc_services/rpc_calls.h
+++ b/ydb/core/grpc_services/rpc_calls.h
@@ -45,16 +45,13 @@ void FillYdbStatus(Ydb::Coordination::SessionResponse& resp, const NYql::TIssues
using TEvListEndpointsRequest = TGRpcRequestWrapper<TRpcServices::EvListEndpoints, Ydb::Discovery::ListEndpointsRequest, Ydb::Discovery::ListEndpointsResponse, true>;
-using TEvCreateCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvCreateCoordinationNode, Ydb::Coordination::CreateNodeRequest, Ydb::Coordination::CreateNodeResponse, true, TRateLimiterMode::Rps>;
-using TEvAlterCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvAlterCoordinationNode, Ydb::Coordination::AlterNodeRequest, Ydb::Coordination::AlterNodeResponse, true, TRateLimiterMode::Rps>;
-using TEvDropCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvDropCoordinationNode, Ydb::Coordination::DropNodeRequest, Ydb::Coordination::DropNodeResponse, true, TRateLimiterMode::Rps>;
-using TEvDescribeCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvDescribeCoordinationNode, Ydb::Coordination::DescribeNodeRequest, Ydb::Coordination::DescribeNodeResponse, true, TRateLimiterMode::Rps>;
using TEvReadColumnsRequest = TGRpcRequestWrapper<TRpcServices::EvReadColumns, Ydb::ClickhouseInternal::ScanRequest, Ydb::ClickhouseInternal::ScanResponse, true>;
using TEvGetShardLocationsRequest = TGRpcRequestWrapper<TRpcServices::EvGetShardLocations, Ydb::ClickhouseInternal::GetShardLocationsRequest, Ydb::ClickhouseInternal::GetShardLocationsResponse, true>;
using TEvKikhouseDescribeTableRequest = TGRpcRequestWrapper<TRpcServices::EvKikhouseDescribeTable, Ydb::ClickhouseInternal::DescribeTableRequest, Ydb::ClickhouseInternal::DescribeTableResponse, true>;
using TEvS3ListingRequest = TGRpcRequestWrapper<TRpcServices::EvS3Listing, Ydb::S3Internal::S3ListingRequest, Ydb::S3Internal::S3ListingResponse, true>;
using TEvBiStreamPingRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvBiStreamPing, Draft::Dummy::PingRequest, Draft::Dummy::PingResponse>;
using TEvExperimentalStreamQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExperimentalStreamQuery, Ydb::Experimental::ExecuteStreamQueryRequest, Ydb::Experimental::ExecuteStreamQueryResponse, false>;
+
using TEvStreamPQWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQWrite, Ydb::PersQueue::V1::StreamingWriteClientMessage, Ydb::PersQueue::V1::StreamingWriteServerMessage>;
using TEvStreamPQReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQRead, Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>;
using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>;
@@ -64,6 +61,7 @@ using TEvPQAlterTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQA
using TEvPQDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDescribeTopic, Ydb::PersQueue::V1::DescribeTopicRequest, Ydb::PersQueue::V1::DescribeTopicResponse, true>;
using TEvPQAddReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQAddReadRule, Ydb::PersQueue::V1::AddReadRuleRequest, Ydb::PersQueue::V1::AddReadRuleResponse, true>;
using TEvPQRemoveReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQRemoveReadRule, Ydb::PersQueue::V1::RemoveReadRuleRequest, Ydb::PersQueue::V1::RemoveReadRuleResponse, true>;
+
using TEvExportToYtRequest = TGRpcRequestValidationWrapper<TRpcServices::EvExportToYt, Ydb::Export::ExportToYtRequest, Ydb::Export::ExportToYtResponse, true>;
using TEvExportToS3Request = TGRpcRequestValidationWrapper<TRpcServices::EvExportToS3, Ydb::Export::ExportToS3Request, Ydb::Export::ExportToS3Response, true>;
using TEvImportFromS3Request = TGRpcRequestValidationWrapper<TRpcServices::EvImportFromS3, Ydb::Import::ImportFromS3Request, Ydb::Import::ImportFromS3Response, true>;
diff --git a/ydb/core/grpc_services/rpc_create_coordination_node.cpp b/ydb/core/grpc_services/rpc_create_coordination_node.cpp
index 11c838592ca..f96fe2f4ef6 100644
--- a/ydb/core/grpc_services/rpc_create_coordination_node.cpp
+++ b/ydb/core/grpc_services/rpc_create_coordination_node.cpp
@@ -1,6 +1,6 @@
-#include "grpc_request_proxy.h"
+#include "service_coordination.h"
+#include <ydb/core/grpc_services/base/base.h>
-#include "rpc_calls.h"
#include "rpc_scheme_base.h"
#include "rpc_common.h"
@@ -10,11 +10,14 @@ namespace NGRpcService {
using namespace NActors;
using namespace Ydb;
+using TEvCreateCoordinationNode = TGrpcRequestOperationCall<Ydb::Coordination::CreateNodeRequest,
+ Ydb::Coordination::CreateNodeResponse>;
+
class TCreateCoordinationNodeRPC : public TRpcSchemeRequestActor<TCreateCoordinationNodeRPC, TEvCreateCoordinationNode> {
using TBase = TRpcSchemeRequestActor<TCreateCoordinationNodeRPC, TEvCreateCoordinationNode>;
public:
- TCreateCoordinationNodeRPC(TEvCreateCoordinationNode* msg)
+ TCreateCoordinationNodeRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext& ctx) {
@@ -55,8 +58,8 @@ private:
}
};
-void TGRpcRequestProxy::Handle(TEvCreateCoordinationNode::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TCreateCoordinationNodeRPC(ev->Release().Release()));
+void DoCreateCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TCreateCoordinationNodeRPC(p.release()));
}
} // namespace NKikimr
diff --git a/ydb/core/grpc_services/rpc_describe_coordination_node.cpp b/ydb/core/grpc_services/rpc_describe_coordination_node.cpp
index 04c16bf32e8..291cd267cb5 100644
--- a/ydb/core/grpc_services/rpc_describe_coordination_node.cpp
+++ b/ydb/core/grpc_services/rpc_describe_coordination_node.cpp
@@ -1,6 +1,6 @@
-#include "grpc_request_proxy.h"
+#include "service_coordination.h"
+#include <ydb/core/grpc_services/base/base.h>
-#include "rpc_calls.h"
#include "rpc_scheme_base.h"
#include "rpc_common.h"
@@ -14,11 +14,14 @@ namespace NGRpcService {
using namespace NActors;
using namespace Ydb;
+using TEvDescribeCoordinationNode = TGrpcRequestOperationCall<Ydb::Coordination::DescribeNodeRequest,
+ Ydb::Coordination::DescribeNodeResponse>;
+
class TDescribeCoordinationNode : public TRpcSchemeRequestActor<TDescribeCoordinationNode, TEvDescribeCoordinationNode> {
using TBase = TRpcSchemeRequestActor<TDescribeCoordinationNode, TEvDescribeCoordinationNode>;
public:
- TDescribeCoordinationNode(TEvDescribeCoordinationNode* msg)
+ TDescribeCoordinationNode(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext &ctx) {
@@ -91,12 +94,10 @@ private:
ctx.Send(MakeTxProxyID(), navigateRequest.release());
}
-
-
};
-void TGRpcRequestProxy::Handle(TEvDescribeCoordinationNode::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TDescribeCoordinationNode(ev->Release().Release()));
+void DoDescribeCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TDescribeCoordinationNode(p.release()));
}
}
diff --git a/ydb/core/grpc_services/rpc_drop_coordination_node.cpp b/ydb/core/grpc_services/rpc_drop_coordination_node.cpp
index 3632ec58723..7298df3223c 100644
--- a/ydb/core/grpc_services/rpc_drop_coordination_node.cpp
+++ b/ydb/core/grpc_services/rpc_drop_coordination_node.cpp
@@ -1,6 +1,6 @@
-#include "grpc_request_proxy.h"
+#include "service_coordination.h"
+#include <ydb/core/grpc_services/base/base.h>
-#include "rpc_calls.h"
#include "rpc_scheme_base.h"
#include "rpc_common.h"
@@ -10,11 +10,14 @@ namespace NGRpcService {
using namespace NActors;
using namespace Ydb;
+using TEvDropCoordinationNode = TGrpcRequestOperationCall<Ydb::Coordination::DropNodeRequest,
+ Ydb::Coordination::DropNodeResponse>;
+
class TDropCoordinationNodeRPC : public TRpcSchemeRequestActor<TDropCoordinationNodeRPC, TEvDropCoordinationNode> {
using TBase = TRpcSchemeRequestActor<TDropCoordinationNodeRPC, TEvDropCoordinationNode>;
public:
- TDropCoordinationNodeRPC(TEvDropCoordinationNode* msg)
+ TDropCoordinationNodeRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext &ctx) {
@@ -54,8 +57,8 @@ private:
}
};
-void TGRpcRequestProxy::Handle(TEvDropCoordinationNode::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TDropCoordinationNodeRPC(ev->Release().Release()));
+void DoDropCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TDropCoordinationNodeRPC(p.release()));
}
} // namespace NKikimr
diff --git a/ydb/core/grpc_services/service_coordination.h b/ydb/core/grpc_services/service_coordination.h
new file mode 100644
index 00000000000..94842d3b695
--- /dev/null
+++ b/ydb/core/grpc_services/service_coordination.h
@@ -0,0 +1,17 @@
+#pragma once
+#include <memory>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class IRequestOpCtx;
+class IFacilityProvider;
+
+void DoCreateCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoAlterCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDropCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDescribeCoordinationNode(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+
+}
+}
+
diff --git a/ydb/services/kesus/grpc_service.cpp b/ydb/services/kesus/grpc_service.cpp
index c5826b66274..e297e00fe01 100644
--- a/ydb/services/kesus/grpc_service.cpp
+++ b/ydb/services/kesus/grpc_service.cpp
@@ -7,6 +7,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/grpc_services/grpc_helper.h>
#include <ydb/core/grpc_services/grpc_request_proxy.h>
+#include <ydb/core/grpc_services/service_coordination.h>
#include <ydb/core/grpc_services/rpc_calls.h>
#include <ydb/core/grpc_streaming/grpc_streaming.h>
#include <ydb/core/base/ticket_parser.h>
@@ -644,35 +645,26 @@ void TKesusGRpcService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
#error ADD_REQUEST macro is already defined
#endif
-#define ADD_REQUEST(NAME, IN, OUT, ACTION) \
+#define ADD_REQUEST(NAME, IN, OUT, CB) \
MakeIntrusive<NGRpcService::TGRpcRequest<Ydb::Coordination::IN, Ydb::Coordination::OUT, TKesusGRpcService>>( \
this, \
&Service_, \
CQ, \
[this](NGrpc::IRequestContextBase* reqCtx) { \
NGRpcService::ReportGrpcReqToMon(*ActorSystem, reqCtx->GetPeer()); \
- ACTION; \
+ ActorSystem->Send(GRpcRequestProxyId, \
+ new NGRpcService::TGrpcRequestOperationCall<Ydb::Coordination::IN, Ydb::Coordination::OUT> \
+ (reqCtx, &CB, NGRpcService::TRequestAuxSettings{NGRpcService::TRateLimiterMode::Rps, nullptr})); \
}, \
&Ydb::Coordination::V1::CoordinationService::AsyncService::Request ## NAME, \
"Coordination/" #NAME, \
logger, \
getCounterBlock("coordination", #NAME))->Run();
- ADD_REQUEST(CreateNode, CreateNodeRequest, CreateNodeResponse, {
- ActorSystem->Send(GRpcRequestProxyId, new NGRpcService::TEvCreateCoordinationNode(reqCtx));
- });
-
- ADD_REQUEST(AlterNode, AlterNodeRequest, AlterNodeResponse, {
- ActorSystem->Send(GRpcRequestProxyId, new NGRpcService::TEvAlterCoordinationNode(reqCtx));
- });
-
- ADD_REQUEST(DropNode, DropNodeRequest, DropNodeResponse, {
- ActorSystem->Send(GRpcRequestProxyId, new NGRpcService::TEvDropCoordinationNode(reqCtx));
- });
-
- ADD_REQUEST(DescribeNode, DescribeNodeRequest, DescribeNodeResponse, {
- ActorSystem->Send(GRpcRequestProxyId, new NGRpcService::TEvDescribeCoordinationNode(reqCtx));
- });
+ ADD_REQUEST(CreateNode, CreateNodeRequest, CreateNodeResponse, NGRpcService::DoCreateCoordinationNode);
+ ADD_REQUEST(AlterNode, AlterNodeRequest, AlterNodeResponse, NGRpcService::DoAlterCoordinationNode);
+ ADD_REQUEST(DropNode, DropNodeRequest, DropNodeResponse, NGRpcService::DoDropCoordinationNode);
+ ADD_REQUEST(DescribeNode, DescribeNodeRequest, DescribeNodeResponse, NGRpcService::DoDescribeCoordinationNode);
#undef ADD_REQUEST